In [1]:
import os
import re
from collections import defaultdict
from itertools import chain
from itertools import tee
from os.path import join as pj
from typing import List, Union, Tuple, Dict

import numpy as np
import pandas as pd

In [2]:
DATA_FOLDER = pj('thesis', 'NLP_Course', 'HW1', 'data')
START_TOKEN = '<s>'
END_TOKEN = '</s>'

In [15]:

def get_tweets_from_file(data_file_path: Union[str, os.PathLike]) -> List[str]:
    return pd.read_csv(data_file_path, encoding='utf-8')['tweet_text'].tolist()


def preprocess() -> List[str]:
    vocabulary = {START_TOKEN, END_TOKEN}
    for f in os.listdir(DATA_FOLDER):
        if f.endswith('.csv'):
            print(f)
            for tweet in get_tweets_from_file(data_file_path=pj(DATA_FOLDER, f)):
                vocabulary.update(tweet)

    return list(vocabulary)

In [16]:
_vocabulary = preprocess()
len(_vocabulary), _vocabulary[:10]

en.csv
fr.csv
tl.csv
pt.csv
es.csv
it.csv
nl.csv
in.csv


(1804, ['🦄', '😞', '🌫', '녀', '🐓', '🗾', '●', 'А', '🐶', 'p'])

In [5]:
def pad_sequence(sequence: str, n: int, pad_left: bool = True, pad_right: bool = True,
                 left_pad_symbol: str = START_TOKEN, right_pad_symbol: str = END_TOKEN):
    sequence = iter(sequence)
    if pad_left:
        sequence = chain((left_pad_symbol,) * (n - 1), sequence)
    if pad_right:
        sequence = chain(sequence, (right_pad_symbol,) * (n - 1))
    return sequence


def ngram(sequence: str, n: int) -> zip:
    sequence = pad_sequence(sequence=sequence, n=n)

    iterables = tee(sequence, n)

    for i, sub_iterable in enumerate(iterables):
        for _ in range(i):
            next(sub_iterable, None)

    return zip(*iterables)


def get_ngram_model(grams: List[Tuple[str, ...]],
                    vocab_length: int, add_one: bool = False) -> Dict[str, Dict[str, float]]:
    model = defaultdict(lambda: defaultdict(lambda: 1e-8))

    for gram in grams:
        list_gram = list(gram)
        last_ch = list_gram.pop(len(list_gram) - 1)
        seq = ''.join(list_gram)
        model[seq][last_ch] += 1

    for seq in model.keys():
        counter = 0
        for last_ch in model[seq].keys():
            counter += model[seq][last_ch]
        for last_ch in model[seq].keys():
            model[seq][last_ch] = (model[seq][last_ch] + (1 * add_one)) / (counter + (vocab_length * add_one))

    return model


def lm(n: int, vocabulary: List[str], data_file_path: Union[str, os.PathLike],
       add_one: bool = False) -> Dict[str, Dict[str, float]]:
    # n - the n-gram to use (e.g., 1 - unigram, 2 - bigram, etc.)
    # vocabulary - the vocabulary list (which you should use for calculating add_one smoothing)
    # data_file_path - the data_file from which we record probabilities for our model
    # add_one - True/False (use add_one smoothing or not)
    v_len = len(vocabulary)

    tweets = get_tweets_from_file(data_file_path=data_file_path)
    grams = [gram for seq in tweets for gram in ngram(sequence=seq, n=n)]

    return get_ngram_model(grams=grams, vocab_length=v_len, add_one=add_one)


In [6]:
_model_3 = lm(n=3, vocabulary=_vocabulary, data_file_path=pj(DATA_FOLDER, 'en.csv'), add_one=False)


In [7]:
def _eval(n: int, model: Dict[str, Dict[str, float]], data_file: str) -> float:
    # n - the n-gram that you used to build your model (must be the same number)
    # model - the dictionary (model) to use for calculating perplexity
    # data_file - the tweets file that you wish to calculate a perplexity score for

    tweets = get_tweets_from_file(data_file_path=pj(DATA_FOLDER, data_file))
    probs = []

    for tweet in tweets:
        # Start of the sentence.
        probs += [model[START_TOKEN * (n - i - 1) + tweet[:i]][tweet[i]]
                  for i in range(n - 1)]
        # n-grams of the sentence.
        probs += [model[tweet[i:i + n - 1]][tweet[i + n - 1]]
                  for i in range(len(tweet) - n + 1)]
        # End of sentence.
        last_idx = len(tweet) - n + 1
        probs += [model[tweet[last_idx + i:last_idx + n] + END_TOKEN * i][END_TOKEN]
                  for i in range(n - 1)]

    return np.power(2, - np.log2(probs).mean())


_eval(n=3, model=_model_3, data_file='en.csv')

8.695300683714246

In [8]:
def match(n: int, add_one: bool) -> pd.DataFrame:
    # n - the n-gram to use for creating n-gram models
    # add_one - use add_one smoothing or not

    #TODO

    languages = {os.path.splitext(fn)[0]: fn
                 for fn in os.listdir(DATA_FOLDER) if fn.endswith('.csv')}
    languages.pop('test')
    languages.pop('tests')

    vocabulary = preprocess()

    models = {}
    languages_sorted = sorted(list(languages.keys()))
    for language in languages_sorted:
        models[language] = lm(n=n, vocabulary=vocabulary,
                              data_file_path=pj(DATA_FOLDER, languages[language]), add_one=add_one)

    df = pd.DataFrame(columns=languages_sorted)
    for lang_model in languages_sorted:
        for lang_test in languages_sorted:
            prep = _eval(n=n, model=models[lang_model], data_file=languages[lang_test])
            df.loc[lang_model, lang_test] = round(prep, 4)

    return df


In [9]:
def run_match():
    for n in [1, 2, 3, 4]:
        for add_one in [True, False]:
            print(f"{'-' * 20} n = {n}, add one = {add_one} {'-' * 20}")
            print(match(n=n, add_one=add_one))
            print('\n')

#TODO
run_match()

In [10]:
def match2(n: int, add_one: bool) -> Dict[str, Dict[str, Dict[str, float]]]:
    # n - the n-gram to use for creating n-gram models
    # add_one - use add_one smoothing or not

    #TODO

    languages = {os.path.splitext(fn)[0]: fn
                 for fn in os.listdir(DATA_FOLDER) if fn.endswith('.csv')}
    languages.pop('test')
    languages.pop('tests')

    vocabulary = preprocess()

    models = {}
    languages_sorted = sorted(list(languages.keys()))
    for language in languages_sorted:
        models[language] = lm(n=n, vocabulary=vocabulary,
                              data_file_path=pj(DATA_FOLDER, languages[language]), add_one=add_one)

    df = pd.DataFrame(columns=languages_sorted)
    for lang_model in languages_sorted:
        for lang_test in languages_sorted:
            prep = _eval(n=n, model=models[lang_model], data_file=languages[lang_test])
            df.loc[lang_model, lang_test] = round(prep, 4)

    return models


def run_match2() -> Dict[str, Dict[str, Dict[str, Dict[str, float]]]]:
    all_models = {}
    for n in [1, 2, 3, 4, 5]:
        for add_one in [True, False]:
            all_models[f'{n}_{str(add_one)[0]}'] = match2(n=n, add_one=add_one)

    return all_models

In [11]:
_all_models = run_match2()
_all_models.keys()

dict_keys(['1_T', '1_F', '2_T', '2_F', '3_T', '3_F', '4_T', '4_F', '5_T', '5_F'])

In [12]:
_df_test = pd.read_csv(pj(DATA_FOLDER, 'test.csv'))
_tweets, _labels = _df_test['tweet_text'].tolist(), _df_test['label'].tolist()

In [13]:
def sum_word_prob(models: Dict[str, Dict[str, Dict[str, Dict[str, float]]]], word: str) -> Dict[str, float]:
    word_sum = defaultdict(lambda: 0.0)

    def _s(_m: Dict[str, Dict[str, Dict[str, Dict[str, float]]]], _w: str, _ws: Dict[str, float], gram_value: int):
        for i in range(len(_w) - gram_value + 1):
            for lang in models[f'{gram_value}_T'].keys():
                _ws[lang] += _m[f'{gram_value}_T'][lang][_w[i: i + gram_value - 1]][_w[i + gram_value - 1]]
                _ws[lang] += _m[f'{gram_value}_F'][lang][_w[i: i + gram_value - 1]][_w[i + gram_value - 1]]

    for j in list(set([m.split('_')[0] for m in models.keys()])):
        _s(_m=models, _w=word, _ws=word_sum, gram_value=int(j))

    return word_sum


def sum_words_prob(models: Dict[str, Dict[str, Dict[str, Dict[str, float]]]], sequence: str) -> Dict[str, float]:
    t = sequence[:]
    t = re.sub('#[^ ]*', '', t)
    t = re.sub('[0-9]*', '', t)
    t = re.sub('https://t.co/[a-zA-Z0-9]*', '', t)
    words = re.findall("[^ ]+", t, flags=re.IGNORECASE)

    sums = defaultdict(lambda: 0.0)

    for word in words:
        for l, v in sum_word_prob(models=models, word=word).items():
            sums[l] += v

    return sums


def classify(models: Dict[str, Dict[str, Dict[str, Dict[str, float]]]],
             tweets: List[str]) -> List[str]:
    def _classify(_models: Dict[str, Dict[str, Dict[str, Dict[str, float]]]], sequence: str) -> str:
        swords = sum_words_prob(models=models, sequence=sequence)
        if len(swords) == 0:
            return 'en'  # there are few without words so just returning en can be any language, it doens't matter

        return sorted([(l, v) for l, v in swords.items()], key=lambda x: x[1])[-1][0]

    predictions = []
    for t in tweets:
        predictions.append(_classify(_models=models, sequence=t))

    return predictions

In [14]:
def calc_f1(result: Dict[str, List[str]]):
    from sklearn.metrics import f1_score

    labels, predictions = result['labels'], result['predictions']
    correct = sum([1 for l, p in zip(labels, predictions) if l.__eq__(p)])
    print(f'Total samples: {len(labels)}')
    print(f'Classified correct: {correct} & Classified wrong: {len(labels) - correct}.')
    print(f'F1 score: {round(f1_score(labels, predictions, average="macro"), 5)}')

In [15]:
clasification_result = classify(models=_all_models, tweets=_tweets)

In [16]:
calc_f1(result={'labels': _labels, 'predictions': clasification_result})

Total samples: 7999
Classified correct: 7228 & Classified wrong: 771.
F1 score: 0.90382
