https://jamesmccaffrey.wordpress.com/2013/11/05/why-you-should-use-cross-entropy-error-instead-of-classification-error-or-mean-squared-error-for-neural-network-classifier-training/

https://www.analyticsvidhya.com/blog/2021/08/predict-the-next-word-of-your-text-using-long-short-term-memory-lstm/

https://jaketae.github.io/study/pytorch-rnn/

https://www.analyticsvidhya.com/blog/2021/03/introduction-to-long-short-term-memory-lstm/

https://towardsdatascience.com/bert-explained-state-of-the-art-language-model-for-nlp-f8b21a9b6270

In [73]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from net import Net_CBOW
import torch.optim as optim
from matplotlib import pyplot as plt
import sys


In [126]:
version = "april22_3000datalim_20epoch"

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
net = torch.load(f"saves/model_{version}.pt", map_location=device)
vocab = torch.load(f"saves/vocab_{version}.pt")
embeddings_df = torch.load(f"saves/emb_{version}.pt")
embeddings_mat = torch.Tensor(np.array(embeddings_df))

In [127]:
def embed(word):
    if word not in vocab:
        v = embeddings_df.loc["<unk>"]
    else:
        v = embeddings_df.loc[word]
    v = np.array(v)
    return v
def lookup_id(word, vocab=vocab):
    if word not in vocab:
        return vocab["<unk>"]
    return vocab[word]
def lookup_token(word_id, vocab=vocab):
    for word in vocab:
        if vocab[word] == word_id:
            return word
    return None
def get_top_similar(word_vec, embeddings_n = embeddings_norm, topN: int = 10, include_self=True):
    word_vec = np.reshape(word_vec, (len(word_vec), 1))
    dists = np.matmul(embeddings_n, word_vec).flatten()
    if not include_self:
        topN_ids = np.argsort(-dists)[1 : topN+1]
    else:
        topN_ids = np.argsort(-dists)[0 : topN]

    topN_dict = {}
    for sim_word_id in topN_ids:
        sim_word = "<unk>"
        for k in vocab:
            if vocab[k] == sim_word_id:
                sim_word = k
                break
        topN_dict[sim_word] = dists[sim_word_id]
    return topN_dict


In [128]:
get_top_similar(embeddings_df.loc["no"], topN=10)

{'no': tensor(1.0000),
 '50': tensor(0.7276),
 'thought': tensor(0.6556),
 'these': tensor(0.6263),
 'others': tensor(0.6258),
 'more': tensor(0.6221),
 'often': tensor(0.6187),
 'small': tensor(0.6130),
 'black': tensor(0.6128),
 '...': tensor(0.6102)}

In [129]:
from datasets import load_dataset
wikitext2 = load_dataset("wikitext", "wikitext-2-v1")
sys.getsizeof(wikitext2)

208

In [130]:
text_train = wikitext2["train"]['text']
text_train = [item.lower().strip() for item in text_train if len(item) > 0]
text_test = wikitext2["test"]['text']
text_test = [item.lower().strip() for item in text_test if len(item) > 0]
len(text_test)

2891

In [131]:
text_train = [item.split(" ") + ["\n"] for item in text_train if "=" not in item]
text_test = [item.split(" ") + ["\n"] for item in text_test if "=" not in item]

In [132]:
enc_text_train = [[lookup_id(word) for word in paragraph] for paragraph in text_train]
enc_text_test = [[lookup_id(word) for word in paragraph] for paragraph in text_test]
sys.getsizeof(enc_text_test)

18232

In [148]:
SCANNING_WINDOW = 5
def get_data(index, window, data):
    x = []
    for word_id in data[index-window:index]:
        x.append([list(embeddings_df.iloc[word_id])])
    
    b = data[index]
    y = list(embeddings_df.iloc[b])
        
    x = torch.tensor(x).view(SCANNING_WINDOW,1,len(x[0][0]))
    y = torch.tensor(y)
    return x,y

In [149]:
x_train = [[get_data(i, SCANNING_WINDOW, paragraph)[0] for i in range(SCANNING_WINDOW, len(paragraph)) if paragraph[i] != vocab["<unk>"]] for paragraph in enc_text_train[:250]]
print("x -> y")
y_train = [[get_data(i, SCANNING_WINDOW, paragraph)[1] for i in range(SCANNING_WINDOW, len(paragraph)) if paragraph[i] != vocab["<unk>"]] for paragraph in enc_text_train[:250]]
sys.getsizeof(x_train)

x -> y


2200

In [150]:
x_test = [[get_data(i, SCANNING_WINDOW, paragraph)[0] for i in range(SCANNING_WINDOW, len(paragraph)) if paragraph[i] != vocab["<unk>"]] for paragraph in enc_text_test[:250]]
print("x -> y")
y_test = [[get_data(i, SCANNING_WINDOW, paragraph)[1] for i in range(SCANNING_WINDOW, len(paragraph)) if paragraph[i] != vocab["<unk>"]] for paragraph in enc_text_test[:250]]
len(x_test)

x -> y


250

In [155]:
EMBED_DIMENSION = 50
MIDDLE_LAYER = 64
HIDDEN_LAYER = 50
class Predict(nn.Module):
    def __init__(self, hidden_size = HIDDEN_LAYER, embed_dimension : int = EMBED_DIMENSION):
        super(Predict, self).__init__()
        self.hidden_size = hidden_size
        self.in2hidden = nn.Linear(embed_dimension*SCANNING_WINDOW + hidden_size, hidden_size)
        self.in2middle = nn.Linear(embed_dimension*SCANNING_WINDOW + hidden_size, MIDDLE_LAYER)
        self.middle2out = nn.Linear(MIDDLE_LAYER, embed_dimension)

    def forward(self, x, hidden):
        comb = torch.cat((x, hidden), 1)
        hidden = torch.sigmoid(self.in2hidden(comb))
        next = F.relu(self.in2middle(comb))
        next = F.softmax(self.middle2out(next))
        return x, hidden
    
    def predict(self, inputs):
        return self.forward(inputs)

    def init_hidden(self):
        return nn.init.kaiming_uniform_(torch.empty(1, self.hidden_size))

In [156]:
net = Predict(EMBED_DIMENSION)
params = list(net.parameters())
net.zero_grad()
criterion = nn.MSELoss()
losses = []
epoch_losses = []

In [157]:
NUM_EPOCHS = 10
optimizer = optim.Adam(net.parameters(), lr=0.025)
scheduler = optim.lr_scheduler.LinearLR(optimizer, 1.0, 0.0, total_iters=NUM_EPOCHS)

In [159]:
for epoch in range(NUM_EPOCHS):
    print("RUN", str(epoch+1)+"/"+str(NUM_EPOCHS), end=": ")
    for index in range(len(x_train[:100])):
        if index % (len(x_train)//10) == 0:
            print("•", end="")
        hidden = net.init_hidden()
        for word_index in range(len(x_train[index])):
            context, target = x_train[index][word_index], y_train[index][word_index]
            output, hidden = net(context, hidden)
        loss = criterion(output, y_train[index])
        optimizer.zero_grad()   # zero the gradient buffers
        loss.backward()
        optimizer.step()    # Does the update
    epoch_loss = 0
    for contextparagraph, targetparagraph in zip(x_test[:50], y_test[:50]):
        hidden = net.init_hidden()
        for context, target in zip(contextparagraph, targetparagraph):
            output, hidden = net(context, hidden)
        losses.append(criterion(output, targetparagraph).item())
        epoch_loss += losses[-1]
    epoch_loss /= len(x_test)
    epoch_losses.append(epoch_loss)
    
    print(f"[{epoch_loss}]")
    scheduler.step()
    print()

RUN 1/10: •

RuntimeError: Tensors must have same number of dimensions: got 3 and 2

In [None]:
plt.plot(epoch_losses)
plt.show()


In [None]:
plt.plot(losses)
plt.show()

In [None]:
sentence = "the book and the novel are the best type of <unk>"
sentence = [vocab[a.lower()] for a in sentence.split(" ")]
index = 10
last_word = "of"
for i in range(10):
    context, middle = get_data(index + i, window=SCANNING_WINDOW, data=sentence)
    print('context',[list(get_top_similar(context[i*EMBED_DIMENSION:(i+1)*EMBED_DIMENSION], topN=1).keys())[0] for i in range(SCANNING_WINDOW)])
    predicted_word = list(get_top_similar(net.predict(torch.tensor(context)).detach(), topN=10).keys())
    print(predicted_word)
    predicted_word = predicted_word[0]
    # if predicted_word[0] == last_word:
    #     predicted_word = predicted_word[1]
    # else:
        # predicted_word = predicted_word[0]
    last_word = predicted_word
    sentence.insert(index + i, vocab[predicted_word])
    print([lookup_token(a) for a in sentence])