In [None]:
import re
import nltk
import numpy as np

In [None]:
from nltk.corpus import reuters
from nltk import ngrams
from collections import Counter, defaultdict

In [None]:
!wget https://gist.githubusercontent.com/jardondiego/f91298aa14142b505e71b9ee46d21fcf/raw/f32234be7180be78d7a045f8bf2bbb70edbf0614/quijote.txt

In [None]:
def get_data(file):
    """
    Read file and return a list of lines without empty lines
    """
    with open(file, 'r', encoding='utf-8') as f:
        lines = f.readlines()
        no_empty_lines = [line.strip() for line in lines if line.strip()]
        no_special_chars = [re.sub(r'[^\w\s]', '', line) for line in no_empty_lines]
        lowercased = [line.lower() for line in no_special_chars]
        tokenized = [ sentence.split() for sentence in lowercased ]
    return tokenized

dataset = get_data("quijote.txt")
print(dataset[:5])


### Preprocesamiento

In [None]:
def preprocess_corpus(corpus: list[str]) -> list[str]:
    """Función de preprocesamiento

    Agrega tokens de inicio y fin, normaliza todo a minusculas
    """
    preprocessed_corpus = []
    for sent in corpus:
        result = [word.lower() for word in sent]
        # Al final de la oración
        result.append("<EOS>")
        result.insert(0, "<BOS>")
        preprocessed_corpus.append(result)
    return preprocessed_corpus

In [None]:
def get_words_freqs(corpus: list[list[str]]):
    words_freqs = {}
    for sentence in corpus:
        for word in sentence:
            words_freqs[word] = words_freqs.get(word, 0) + 1
    return words_freqs

In [None]:
UNK_LABEL = "<UNK>"
def get_words_indexes(words_freqs: dict) -> dict:
    result = {}
    for idx, word in enumerate(words_freqs.keys()):
        # Happax legomena happends
        if words_freqs[word] == 1:
            # Temp index for unknowns
            result[UNK_LABEL] = len(words_freqs)
        else:
            result[word] = idx

    return {word: idx for idx, word in enumerate(result.keys())}, {idx: word for idx, word in enumerate(result.keys())}

In [None]:
corpus = preprocess_corpus(get_data("quijote.txt"))

In [None]:
len(corpus)

In [None]:
words_freqs = get_words_freqs(corpus)

In [None]:
words_freqs["the"]

In [None]:
len(words_freqs)

In [None]:
count = 0
for word, freq in words_freqs.items():
    if freq == 1 and count <= 10:
        print(word, freq)
        count += 1

In [None]:
words_indexes, index_to_word = get_words_indexes(words_freqs)

In [None]:
words_indexes["the"]

In [None]:
index_to_word[16]

In [None]:
len(words_indexes)

In [None]:
len(index_to_word)

In [None]:
def get_word_id(words_indexes: dict, word: str) -> int:
    unk_word_id = words_indexes[UNK_LABEL]
    return words_indexes.get(word, unk_word_id)

### Obtenemos trigramas

Convertiremos los trigramas obtenidos a secuencias de idx, y preparamos el conjunto de entrenamiento $x$ y $y$

- x: Contexto
- y: Predicción de la siguiente palabra

In [None]:
def get_train_test_data(corpus: list[list[str]], words_indexes: dict, n: int) -> tuple[list, list]:
    x_train = []
    y_train = []
    for sent in corpus:
        n_grams = ngrams(sent, n)
        for w1, w2, w3 in n_grams:
            x_train.append([get_word_id(words_indexes, w1), get_word_id(words_indexes, w2)])
            y_train.append([get_word_id(words_indexes, w3)])
    return x_train, y_train

In [None]:
# cargamos bibliotecas
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
import time

In [None]:
# Setup de parametros
EMBEDDING_DIM = 200
CONTEXT_SIZE = 2
BATCH_SIZE = 256
H = 100
torch.manual_seed(19)
# Tamaño del Vocabulario
V = len(words_indexes)

In [None]:
x_train, y_train = get_train_test_data(corpus, words_indexes, n=3)

In [None]:
train_set = np.concatenate((x_train, y_train), axis=1)
# partimos los datos de entrada en batches
train_loader = DataLoader(train_set, batch_size = BATCH_SIZE)

In [None]:
# Trigram Neural Network Model
class TrigramModel(nn.Module):
    """Clase padre: https://pytorch.org/docs/stable/generated/torch.nn.Module.html"""

    def __init__(self, vocab_size, embedding_dim, context_size, h):
        super(TrigramModel, self).__init__()
        self.context_size = context_size
        self.embedding_dim = embedding_dim
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.linear1 = nn.Linear(context_size * embedding_dim, h)
        self.linear2 = nn.Linear(h, vocab_size)

    def forward(self, inputs):
        # x': concatenation of x1 and x2 embeddings   -->
        #self.embeddings regresa un vector por cada uno de los índices que se les pase como entrada. view() les cambia el tamaño para concatenarlos
        embeds = self.embeddings(inputs).view((-1,self.context_size * self.embedding_dim))
        # h: tanh(W_1.x' + b)  -->
        out = torch.tanh(self.linear1(embeds))
        # W_2.h                 -->
        out = self.linear2(out)
        # log_softmax(W_2.h)      -->
        # dim=1 para que opere sobre renglones, pues al usar batchs tenemos varios vectores de salida
        log_probs = F.log_softmax(out, dim=1)

        return log_probs

### Entrenamiento

In [None]:
# 1. Pérdida. Negative log-likelihood loss
loss_function = nn.NLLLoss()

#Otras opciones de función de pérdida (tendrían que usar softmax sin log):
#nn.CrossEntropyLoss()


# 2. Instanciar el modelo
model = TrigramModel(V, EMBEDDING_DIM, CONTEXT_SIZE, H)

# 3. Optimización. ADAM optimizer
optimizer = optim.Adam(model.parameters(), lr = 2e-3)

#Otras opciones de optimizador:
#optimizer = optim.SGD(model.parameters(), lr=0.1)


# ------------------------- TRAIN & SAVE MODEL ------------------------
# En la práctica sólo correremos una epoch por restricciones de recursos
EPOCHS = 1
for epoch in range(EPOCHS):
    st = time.time()
    print("\n--- Training model Epoch: {} ---".format(epoch))
    for it, data_tensor in enumerate(train_loader):
        context_tensor = data_tensor[:,0:2]
        target_tensor = data_tensor[:,2]

        model.zero_grad() #reinicializar los gradientes
        #FORWARD:
        # get log probabilities over next words
        log_probs = model(context_tensor)


        # compute loss function
        loss = loss_function(log_probs, target_tensor)

        #BACKWARD:
        # backward pass and update gradient
        loss.backward()
        optimizer.step()

        if it % 500 == 0:
            print("Training Iteration {} of epoch {} complete. Loss: {}; Time taken (s): {}".format(it, epoch, loss.item(), (time.time()-st)))
            st = time.time()
            #barch_size x len(vocab)

    # saving model
    model_path = 'model_{}.dat'.format(epoch)
    torch.save(model.state_dict(), model_path)
    print(f"Model saved for epoch={epoch} at {model_path}")

In [None]:
def get_model(path: str) -> TrigramModel:
    model_loaded = TrigramModel(V, EMBEDDING_DIM, CONTEXT_SIZE, H)
    model_loaded.load_state_dict(torch.load(path))
    model_loaded.eval()
    return model_loaded

In [None]:
PATH = "drive/MyDrive/LM_neuronal/model_0.dat"

In [None]:
model = get_model(PATH)
W1 = "<BOS>"
W2 = "mi"

IDX1 = get_word_id(words_indexes, W1)
IDX2 = get_word_id(words_indexes, W2)

#Obtenemos Log probabidades p(W3|W2,W1)
probs = model(torch.tensor([[IDX1,  IDX2]])).detach().tolist()

In [None]:
len(probs[0])

In [None]:
# Creamos diccionario con {idx: logprob}
model_probs = {}
for idx, p in enumerate(probs[0]):
  model_probs[idx] = p

# Sort:
model_probs_sorted = sorted(((prob, idx) for idx, prob in model_probs.items()), reverse=True)

# Printing word  and prob (retrieving the idx):
topcandidates = 0
for prob, idx in model_probs_sorted:
  #Retrieve the word associated with that idx
  word = index_to_word[idx]
  print(idx, word, prob)

  topcandidates += 1

  if topcandidates > 100:
    break

In [None]:
index_to_word.get(model_probs_sorted[0][0])

## Generacion de lenguaje

In [None]:
def get_likely_words(model: TrigramModel, context: str, words_indexes: dict, index_to_word: dict, top_count: int=10) -> list[tuple]:
    model_probs = {}
    words = context.split()
    idx_word_1 = get_word_id(words_indexes, words[0])
    idx_word_2 = get_word_id(words_indexes, words[1])
    probs = model(torch.tensor([[idx_word_1, idx_word_2]])).detach().tolist()

    for idx, p in enumerate(probs[0]):
        model_probs[idx] = p

    # Strategy: Sort and get top-K words to generate text
    return sorted(((prob, index_to_word[idx]) for idx, prob in model_probs.items()), reverse=True)[:top_count]

In [None]:
sentence = "esto es"
get_likely_words(model, sentence, words_indexes, index_to_word, 3)

In [None]:
from random import randint

def get_next_word(words: list[tuple[float, str]]) -> str:
    # From a top-K list of words get a random word
    return words[randint(0, len(words)-1)][1]

In [None]:
get_next_word(get_likely_words(model, sentence, words_indexes, index_to_word))

In [None]:
MAX_TOKENS = 30
TOP_COUNT = 10
def generate_text(model: TrigramModel, history: str, words_indexes: dict, index_to_word: dict, tokens_count: int=0) -> None:
    next_word = get_next_word(get_likely_words(model, history, words_indexes, index_to_word, top_count=TOP_COUNT))
    print(next_word, end=" ")
    tokens_count += 1
    if tokens_count == MAX_TOKENS or next_word == "<EOS>":
        return
    generate_text(model, history.split()[1]+ " " + next_word, words_indexes, index_to_word, tokens_count)

In [None]:
sentence = "<BOS> el"
print(sentence, end=" ")
generate_text(model, sentence, words_indexes, index_to_word)