# Natural Language Models

In [84]:
import os
import time
import shutil
import random
from typing import Tuple
from argparse import Namespace
import matplotlib.pyplot as plt

# Preprocessing
import nltk
from nltk.corpus import stopwords
from nltk import ngrams
from nltk.tokenize import TweetTokenizer
from nltk import FreqDist
import pandas as pd
import numpy as np

# Pytorch
from torch.utils.data import DataLoader, TensorDataset 
import torch
import torch.nn as nn
import torch.nn.functional as F

# scikit-learn
from sklearn.metrics import accuracy_score

In [85]:
seed = 1111
random.seed(seed) # python seed
np.random.seed(seed) #numpy seed
torch.manual_seed(seed)
torch.backends.cudnn.benchmark = False 

In [86]:
X_train = pd.read_csv("mex_train.txt", sep="\r\n", engine="python", header=None).loc[:,0]
# print(X_train)
X_val = pd.read_csv("mex_train.txt", sep="\r\n", engine="python", header=None).loc[:,0]
# print(X_val)

In [87]:
args = Namespace()
args.N = 4

In [88]:
from nltk import FreqDist
from nltk.tokenize import TweetTokenizer
import numpy as np

class NgramData():
    def __init__(self, N:int, vocab_max:int=5000, tokenizer=None, embeddings_model=None):
        self.tokenizer = tokenizer if tokenizer else self.default_tokenizer
        self.punct = set(['.',',', ';', ':', '-', '^', '>>', '!', '¡', '¿', '?', '"', '\'','...','<url>', '*', '@usuario'])
        self.N = N
        self.vocab_max = vocab_max
        self.UNK = "<unk>"
        self.SOS = "<s>"
        self.EOS = "</s>"
        self.embeddings_model = embeddings_model

    def get_vocab_size(self) -> int:
        return len(self.vocab)

    def default_tokenizer(self, doc:str) -> list:
        return doc.split(" ")

    def remove_word(self, word:str)-> bool:
        word = word.lower()
        is_punct = True if word in self.punct else False
        is_digit = word.isnumeric()
        return is_punct or is_digit

    def get_vocab(self, corpus:list) -> set:
        freq_dist = FreqDist([w.lower() for sentence in corpus\
                    for w in self.tokenizer(sentence)\
                    if not self.remove_word(w)])
        sorted_words = self.sortFreqDict(freq_dist)[:self.vocab_max-3]
        return set(sorted_words)

    def sortFreqDict(self, freq_dist) -> list:
        freq_dict = dict(freq_dist)
        return sorted(freq_dict, key=freq_dict.get, reverse=True)

    def fit(self, corpus:list) -> None:
        self.vocab = self.get_vocab(corpus)
        self.vocab.add(self.UNK)
        self.vocab.add(self.SOS)
        self.vocab.add(self.EOS)

        self.w2id = {}
        self.id2w = {}

        if self.embeddings_model is not None:
            self.embedding_matrix = np.empty([len(self.vocab), self.embeddings_model.vector_size])

        id = 0
        for doc in corpus:
            for word in self.tokenizer(doc):
                word_ = word.lower()
                if word_ in self.vocab and not word_ in self.w2id:
                    self.w2id[word_] = id
                    self.id2w[id] = word_

                    if self.embeddings_model is not None:
                        if word_ in self.embeddings_model:
                            self.embedding_matrix[id] = self.embeddings_model[word_]
                        else:
                            self.embedding_matrix[id] = np.random.rand(self.embeddings_model.vector_size)

                        id += 1

        # Always add special tokens
        self.w2id.update(
            {
                self.UNK: id,
                self.SOS: id+1,
                self.EOS: id+2
            }
        )
        self.id2w.update(
            {
                id: self.UNK,
                id+1: self.SOS,
                id+2: self.EOS
            }
        )

    def transform(self, corpus: list) -> Tuple[np.ndarray, np.ndarray]:
        X_ngrams = []
        y = []

        for doc in corpus:
            doc_ngram = self.get_ngram_doc(doc)
            for words_window in doc_ngram:
                words_window_ids = [self.w2id[w] for w in words_window]
                X_ngrams.append(list(words_window_ids[:-1]))
        return np.array(X_ngrams), np.array(y)

    def get_ngram_doc(self, doc:str)-> list:
        doc_tokens = self.tokenizer(doc)
        doc_tokens = self.replace_unk(doc_tokens)
        doc_tokens = [w.lower() for w in doc_tokens]
        doc_tokens = [self.SOS]*(self.N-1) + doc_tokens + [self.EOS]
        return list(ngrams(doc_tokens, self.N))

    def replace_unk(self, doc_tokens: list) -> list:
        for i, token in enumerate(doc_tokens):
            if token.lower() not in self.vocab:
                doc_tokens[i] = self.UNK
        return doc_tokens

In [89]:
tk = TweetTokenizer()
ngram_data = NgramData(args.N, 5000, tk.tokenize)
ngram_data.fit(X_train)

In [90]:
print(f'Vocab Size: {ngram_data.get_vocab_size()}')

Vocab Size: 5000


In [91]:
X_ngram_train, y_ngram_train = ngram_data.transform(X_train)
X_ngram_val, y_ngram_val = ngram_data.transform(X_val)

In [92]:
X_ngram_train

array([[1, 1, 1],
       [1, 1, 0],
       [1, 0, 0],
       ...,
       [0, 0, 0],
       [0, 0, 0],
       [0, 0, 0]])

In [93]:
y_ngram_train

array([], dtype=float64)

In [99]:
print(f'training obs x:, {X_ngram_train.shape}, y: {y_ngram_train.shape}')
print(f'validation obs x:, {X_ngram_val.shape}, y:{y_ngram_val.shape}')

training obs x:, (106964, 3), y: (0,)
validation obs x:, (106964, 3), y:(0,)
