In [None]:
import json
import re
import warnings
from collections import namedtuple
from collections import defaultdict, Counter
from random import random, seed, sample
from typing import List, Dict
from urllib.parse import urljoin

import numpy as np
import requests
from recordtype import recordtype
from tqdm import tqdm
# from tqdm.notebook import tqdm

from cleaner import Cleaner
from sanity.sanity import check

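**NOTE:** If you are on the corporate network (corpnet), uncomment the lines in the next cell to set the `http_proxy` and `https_proxy` environment variables before running the rest of the notebook.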

In [None]:
# Proxy settings for the corporate network; uncomment both lines to enable them.
# %env http_proxy=http://clientproxy.corproot.net:8079
# %env https_proxy=http://clientproxy.corproot.net:8079

Entities are collected for German only (`LANGUAGE = 'de'`).

In [None]:
# constants
# Flag for the filtered/unfiltered export variant; it selects the output file name in the export cell.
FILTERING = True
# Language code sent as the `language` query parameter and stored on every exported entity.
LANGUAGE = 'de'
# Base URL of the entity-types endpoint. The trailing slash matters: relative paths
# are resolved against this URL with urljoin.
ENTITY_URL = 'https://plato-entity-management-staging.scapp-corp.swisscom.com/solutions/tv/entity-types/'
# Entity types that are skipped when the list of types is fetched.
IGNORE_ENTITY_TYPES = ['CallingNumberType', 'DossierName', 'SpotifyGenre']
# NOTE(review): only referenced from commented-out code in the visible cells — confirm it is still needed.
cleaner = Cleaner()

Definition of the `Entity` record, the data structure used throughout the rest of the analysis.

In [None]:
# Mutable record for one entity; the field names are also the keys read from each API entity dict.
Entity = recordtype('Entity', ['value', 'type', 'popularity', 'aliases'])

### Fetch Entity Types from Entity Management API

In [None]:
# Fetch the entity types available for LANGUAGE, skipping the ignored ones.
url = urljoin(ENTITY_URL, f'?language={LANGUAGE}')
# The timeout keeps the cell from hanging forever when the host is unreachable
# (e.g. the proxy settings above are missing on the corporate network).
response = requests.get(url=url, timeout=30)
entity_types = []
if response.status_code == 200:
    data = response.json()
    entity_types = [entity['type'] for entity in data if entity['type'] not in IGNORE_ENTITY_TYPES]
else:
    # Without this, a failed request silently yields an empty export further down.
    warnings.warn(f'Could not fetch entity types: HTTP {response.status_code} from {url}')

### Utility functions

In [None]:
def find_space_separated_abbreviations(text: str) -> List:
    """
    Finds abbreviations in text written with space among their letters.
    E.g. 'Go to S R F 1' finds 'SRF' as abbreviation.

    An abbreviation is a run of two or more standalone capital letters (A-Z)
    with exactly one space between neighbouring letters. Isolated capitals
    are ignored, so 'A x B y C D' finds only 'CD'.

    :param text: the text to scan
    :return: the abbreviations (spaces removed) in order of appearance
    """
    # \b on both ends makes sure the first and last letter are not part of a
    # longer word; the inner letters are delimited by the single spaces.
    spaced_run = re.compile(r'\b[A-Z](?: [A-Z])+\b')
    return [match.group().replace(' ', '') for match in spaced_run.finditer(text)]

def restore_abbreviations_in_text(text: str) -> str:
    """
    Restores malformed abbreviations in text.
    E.g. 'Go to S R F 1' becomes 'Go to SRF 1'.

    A malformed abbreviation is a run of two or more standalone capital
    letters (A-Z) separated by single spaces. Each run is collapsed where it
    was matched, so text that merely contains the same characters inside a
    longer token (e.g. the 'S R' in 'XS R') is left alone.

    :param text: the text to repair
    :return: the text with every spaced-out run joined into one token
    """
    spaced_run = re.compile(r'\b[A-Z](?: [A-Z])+\b')
    # Collapse every run in a single pass by dropping its inner spaces.
    return spaced_run.sub(lambda match: match.group().replace(' ', ''), text)

In [None]:
def remove_noisy_tags(text: str) -> str:
    """
    Removes the CH, D, F, I, HD and UHD tags from the string.

    Only whole words are removed and matching is case-insensitive, so a
    standalone 'hd' or 'd' is removed as well. The whitespace around a
    removed tag is left in place.

    :param text: the text to clean
    :return: the text without the tags
    """
    return re.sub(r'\b(?:CH|D|F|I|HD|UHD)\b', '', text, flags=re.IGNORECASE)

In [None]:
def normalize_text(text: str) -> str:
    """
    Normalizes the text in four steps, applied in this order:
        1. malformed (spaced-out) abbreviations are restored
        2. noisy tags are removed
        3. every run of whitespace is replaced with a single space
        4. leading and trailing whitespace is stripped
    """
    restored = restore_abbreviations_in_text(text=text)
    without_tags = remove_noisy_tags(text=restored)
    single_spaced = re.sub(r'\s+', ' ', without_tags)
    return single_spaced.strip()

In [None]:
def filter_aliases(aliases: List) -> List:
    """
    Filters list of aliases to keep only the useful ones.
    It is used to remove all the noisy aliases given by tv that are useful for ASR.

    E.g. ['s r f 1', 'SRF 1'] becomes ['SRF 1']

    An alias is dropped only when all of the following hold:
        * it is entirely lowercase and contains a standalone single letter
        * upper-casing it and restoring spaced-out abbreviations changes it
        * the restored form is, ignoring case, another alias in the list
    Every other alias is kept, in its original order.

    :param aliases: the aliases to filter; the list itself is not modified
    :return: a new list with the spelled-out duplicates removed
    """
    single_letter = re.compile(r'\b[a-z]\b')
    lowercased_aliases = {alias.lower() for alias in aliases}
    kept = []
    for alias in aliases:
        # search() returns None when nothing matches; finditer() returns an
        # iterator, which is always truthy and therefore no test at all.
        if alias.islower() and single_letter.search(alias):
            spelled_out = alias.upper().strip()
            collapsed = restore_abbreviations_in_text(text=spelled_out).strip()
            # Without the "changed" check an alias would match itself in the
            # lowercase set and every lowercase alias would be dropped.
            if collapsed != spelled_out and collapsed.lower() in lowercased_aliases:
                continue
        kept.append(alias)
    return kept

In [None]:
def get_entities_from_api(entity_type: str, timeout: float = 30.0) -> List:
    """
    Fetches the entities of one entity type for LANGUAGE from the Entity Management API.

    :param entity_type: name of the entity type, used as a path segment below ENTITY_URL
    :param timeout: seconds to wait for the server before requests gives up
    :return: the decoded JSON payload on HTTP 200, otherwise an empty list
             (a warning is emitted so the gap does not go unnoticed)
    """
    url = urljoin(ENTITY_URL, f'{entity_type}/entities?language={LANGUAGE}')
    response = requests.get(url=url, timeout=timeout)
    entities = []
    if response.status_code == 200:
        entities = response.json()
    else:
        warnings.warn(f'Could not fetch entities for {entity_type}: HTTP {response.status_code} from {url}')
    return entities

In [None]:
def parse_entities(entities: List[Dict]) -> List[Entity]:
    """
    Converts raw entity dicts into Entity records with cleaned value and aliases.

    For every entity the value is normalized, and the aliases are de-duplicated,
    have their spaced-out abbreviations restored and are passed through
    filter_aliases. Aliases keep their first-seen order so that repeated runs
    produce the same output.

    :param entities: dicts holding at least the keys named in Entity._fields
    :return: one Entity per input dict, in input order
    :raises KeyError: if a dict lacks one of the Entity fields
    """
    entities_obj = []
    for entity in entities:
        entity_obj = Entity(**{key: entity[key] for key in Entity._fields})
        # NOTE(review): the value goes through normalize_text twice. One pass is
        # not always a fixed point (removing a tag can leave two single capitals
        # next to each other, e.g. 'A CH B'), so confirm before simplifying.
        entity_obj.value = normalize_text(text=normalize_text(text=entity_obj.value))
        # dict.fromkeys de-duplicates like set() but keeps a stable order.
        unique_aliases = dict.fromkeys(entity_obj.aliases)
        restored_aliases = [restore_abbreviations_in_text(text=alias) for alias in unique_aliases]
        entity_obj.aliases = list(dict.fromkeys(filter_aliases(aliases=restored_aliases)))
        entities_obj.append(entity_obj)
    return entities_obj

### Fetch Entity Values from Entity Management API

In [None]:
# Maximum number of entities kept per entity type. The collection loop sorts the
# entities by descending popularity before slicing, so the most popular ones are kept.
# Entity types that are not listed here are not truncated.
entity_filtering_threshold = {
    'AppName': 30,
    'BroadcastName': 500,
    'LocalsearchLocation': 200,
    'ParticipantName': 150,
    'RadioChannelName': 30,
    'SeriesName': 500,
    'SportParticipantName': 100,
    'TvChannelName': 60,
    'VodName': 500,
    'FirstName': 100,
    'LastName': 100
}

In [None]:
# # mock entity_types
# # uncomment if you want to do one quick experiment with one entity type
# entity_types = ['TvChannelName']

# Collect the cleaned entities of every entity type into one flat list of dicts.
cleaned_entities_from_db = []

for entity_type in tqdm(entity_types):
    entities_in_json = get_entities_from_api(entity_type=entity_type)
    # Most popular first, so that the truncation below keeps the top entries.
    entities_in_json = sorted(entities_in_json, key=lambda item: item['popularity'], reverse=True)
    # Truncate only when FILTERING is on; otherwise the file exported as
    # "unfiltered" would still contain the truncated data.
    if FILTERING and entity_type in entity_filtering_threshold:
        entities_in_json = entities_in_json[:entity_filtering_threshold[entity_type]]
    entities = parse_entities(entities=entities_in_json)
    for entity in entities:
        cleaned_entity = {
            'type': entity_type,
            'language': LANGUAGE,
            'popularity': entity.popularity,
            'value': normalize_text(text=entity.value),
            'aliases': [normalize_text(text=alias) for alias in entity.aliases]
            #'value': cleaner.normalize_text(text=entity.value, language=LANGUAGE),
            #'aliases': [normalize_text(text=cleaner.normalize_text(text=alias, language=LANGUAGE)) for alias in entity.aliases]
        }
        cleaned_entities_from_db.append(cleaned_entity)

In [None]:
# The FILTERING flag decides which of the two output file names is used;
# the collected entities are then written as indented, non-ASCII-escaped UTF-8 JSON.
file_name = f'_{LANGUAGE}_filtered_entities.json' if FILTERING else f'_{LANGUAGE}_unfiltered_entities.json'

with open(file_name, 'w', encoding='utf-8') as output_file:
    json.dump(cleaned_entities_from_db, output_file, indent=4, ensure_ascii=False)