# SETUP

## Constants

In [None]:
# Local path of the corpus text file (the download cell writes to it, the DATA cell reads it).
TXT_FILE = 'data/pride_and_prejudice.txt'
# Characters stripped from both ends of every token during tokenization.
PUNCTUATION = '.;,-“’”:?—‘!()_'
# Regex for a line consisting only of "CHAPTER" plus a Roman numeral (case-insensitive);
# matching lines are blanked out of the text before tokenization.
LINE_TO_EXCLUDE = r'(?i)^\s*CHAPTER\s*[IVXLCDM]+\s*$'
# Disabled hand-written sentence-splitting patterns; the notebook calls nltk's sent_tokenize instead.
# SPLITTING_TO_SENTENCES_PATTERN_DIRECT_SPEECH = r'(?<!\w\.\w)(?<![A-Z][a-z]\.)(?<=\.|\?|\!)(?=\s)(?![^"]*"|\')'
# SPLITTING_TO_SENTENCES_PATTERN = r'(?<!")(?<=[.!?…;])\s+(?=[A-Z])'

# Number of neighbouring tokens taken on each side of a target word when building skip-gram pairs.
CONTEXT_WINDOW_SIZE = 2
# Maximum number of training epochs handed to the Lightning Trainer.
EPOCHS = 10
# Learning rate passed to the Adam optimizer.
LR = 0.1
# Mini-batch size for training.
# NOTE(review): confirm the DataLoader is actually built with this constant.
BATCH_SIZE = 16
# Dimensionality of the learned word vectors.
EMBEDDING_DIM = 20

## Import

In [None]:
import re

from nltk import word_tokenize, sent_tokenize

import torch
import torch.nn as nn
from torch.distributions.uniform import Uniform
from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader
import lightning as L

from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import seaborn as sns

# DATA

In [None]:
txt_file_url = 'https://raw.githubusercontent.com/vm1828/nlp-basics/main/data/pride_and_prejudice.txt'
!mkdir data
!wget --no-cache --no-check-certificate {txt_file_url} -O {TXT_FILE}

In [None]:
# Decode explicitly as UTF-8: the text contains curly quotes and dashes (see
# PUNCTUATION), which would be mis-decoded under a non-UTF-8 locale default.
with open(TXT_FILE, encoding='utf-8') as f:
    text = f.read()

# Blank out chapter-heading lines; MULTILINE makes ^/$ anchor at every line.
# The substitution runs after the file is closed since it only needs the string.
text = re.sub(LINE_TO_EXCLUDE, '', text, flags=re.MULTILINE | re.IGNORECASE)

## Tokenized Corpus

In [None]:
# Split the text into sentences, then each sentence into word tokens.
sentences = sent_tokenize(text)

tokenized_corpus = []
for raw_sentence in sentences:
    # Trim surrounding punctuation from every token; tokens that were nothing
    # but punctuation become empty strings and are dropped.
    trimmed = (piece.strip(PUNCTUATION) for piece in word_tokenize(raw_sentence))
    kept = [piece for piece in trimmed if piece]
    # Every sentence is terminated by an explicit end-of-sentence marker.
    tokenized_corpus.append(kept + ['<EOS>'])

print(tokenized_corpus[:3])

## Indexed Corpus

In [None]:
# Build vocabulary.
# A set comprehension collects the unique tokens in one linear pass (the former
# sum(list_of_lists, []) re-copied the accumulator for every sentence), and
# sorting makes the word -> index assignment identical on every run instead of
# depending on the per-process string hash seed.
vocabulary = sorted({word for sentence in tokenized_corpus for word in sentence})
WORD2IDX = {word: idx for idx, word in enumerate(vocabulary)}
IDX2WORD = dict(enumerate(vocabulary))
VOCAB_SIZE = len(WORD2IDX)
# Convert tokenized corpus to indexed corpus (same nesting: one list of ids per sentence).
indexed_corpus = [[WORD2IDX[word] for word in sentence] for sentence in tokenized_corpus]

print(indexed_corpus[0])
print(len(indexed_corpus))
print(VOCAB_SIZE)

## Skip-Gram Pairs

In [None]:
def generate_skipgrams(indexed_corpus, window_size=CONTEXT_WINDOW_SIZE):
    """Build (target, context) index pairs for skip-gram training.

    Each position in each sentence is paired with every other position that
    lies at most ``window_size`` steps to its left or right; windows are
    clipped at the sentence boundaries and never cross into another sentence.

    Args:
        indexed_corpus: Iterable of sentences, each a sequence of token ids.
        window_size: Maximum distance between a target and a context token.

    Returns:
        A list of ``(target_id, context_id)`` tuples, in corpus order.
    """
    pairs = []
    for token_ids in indexed_corpus:
        for center_pos, center_id in enumerate(token_ids):
            first = max(0, center_pos - window_size)
            stop = min(len(token_ids), center_pos + window_size + 1)
            # Pair the centre token with each neighbour, skipping itself.
            pairs.extend(
                (center_id, token_ids[pos])
                for pos in range(first, stop)
                if pos != center_pos
            )
    return pairs

# Generate all (target, context) pairs with the default window, then show the
# first pair and the total pair count on separate lines.
skipgram_pairs = generate_skipgrams(indexed_corpus)
print(skipgram_pairs[0], len(skipgram_pairs), sep='\n')

## Train Data

In [None]:
# nn.Embedding and nn.CrossEntropyLoss require torch.int64 inputs for indexing
# and categorical targets, hence dtype=torch.long.
X_train = torch.tensor([target for target, _ in skipgram_pairs], dtype=torch.long)   # target words
y_train = torch.tensor([context for _, context in skipgram_pairs], dtype=torch.long)  # context words

dataset = TensorDataset(X_train, y_train)
# Use the shared BATCH_SIZE hyperparameter instead of a hard-coded literal so
# the constants section stays the single place to tune training.
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

In [None]:
# Display the number of (target, context) training examples in the dataset.
len(dataset)

# MODEL

## From Scratch

In [None]:

# Width of a one-hot encoded word: one slot per vocabulary entry.
INPUT_DIM = VOCAB_SIZE


class SkipGramModelFromScratch(L.LightningModule):
    """Skip-gram model built from two bias-free linear layers.

    The first layer maps a one-hot word vector (``INPUT_DIM`` wide) to an
    ``EMBEDDING_DIM``-wide hidden vector; its weight matrix therefore holds the
    word embeddings. The second layer maps the hidden vector back to one score
    per vocabulary word, which ``nn.CrossEntropyLoss`` compares with the index
    of the observed context word.
    """

    def __init__(self):
        super().__init__()
        # Fix the RNG seed so the weight initialisation is reproducible.
        L.seed_everything(seed=42)
        # Layer sizes must follow the vocabulary and the embedding width; they
        # are unrelated to the number of training pairs.
        self.input_to_hidden = nn.Linear(in_features=INPUT_DIM, out_features=EMBEDDING_DIM, bias=False)
        self.hidden_to_output = nn.Linear(in_features=EMBEDDING_DIM, out_features=INPUT_DIM, bias=False)
        self.loss = nn.CrossEntropyLoss()

    def forward(self, input):
        """Return one score per vocabulary word for each input word.

        Args:
            input: Either floating-point one-hot vectors whose last dimension
                is ``INPUT_DIM``, or an integer tensor of word indices (the
                form produced by the training DataLoader), which is one-hot
                encoded here.

        Returns:
            Tensor of unnormalised scores whose last dimension is ``INPUT_DIM``.
        """
        if not torch.is_floating_point(input):
            # nn.Linear cannot consume integer indices; expand them to one-hot
            # rows in the same dtype as the layer weights.
            input = nn.functional.one_hot(input.long(), num_classes=INPUT_DIM)
            input = input.to(self.input_to_hidden.weight.dtype)
        hidden = self.input_to_hidden(input)
        # Then we pass "hidden" to the weights between the hidden layer and the output.
        output_values = self.hidden_to_output(hidden)
        return output_values

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters with learning rate ``LR``."""
        return Adam(self.parameters(), lr=LR)

    def training_step(self, batch, batch_idx):
        """Compute and log the cross-entropy loss for one ``(input, label)`` batch."""
        input_i, label_i = batch
        output_i = self.forward(input_i)
        loss = self.loss(output_i, label_i)

        self.log("train_loss", loss)

        return loss

## nn.Embedding

In [None]:
class SkipGramModelEmbedding(L.LightningModule):
    """Skip-gram model: an embedding lookup followed by a linear projection.

    Args:
        vocabulary_size: Number of rows in the embedding table and number of
            output scores produced per input word.
        embedding_size: Width of each embedding vector.
    """

    def __init__(self, vocabulary_size, embedding_size):
        super().__init__()
        # Layers are created in the same order as before (embedding table,
        # then projection) so seeded initialisation is unaffected.
        self.embeddings = nn.Embedding(
            num_embeddings=vocabulary_size,
            embedding_dim=embedding_size,
        )
        self.linear = nn.Linear(
            in_features=embedding_size,
            out_features=vocabulary_size,
        )
        self.loss = nn.CrossEntropyLoss()

    def forward(self, context_word):
        """Look up the embedding of each index and project it to vocabulary scores.

        NOTE(review): the parameter is called ``context_word``, but the training
        data in this notebook feeds the *target* word indices here — confirm
        the intended naming.
        """
        return self.linear(self.embeddings(context_word))

    def configure_optimizers(self):
        """Create the Adam optimizer over all parameters with learning rate ``LR``."""
        optimizer = Adam(self.parameters(), lr=LR)
        return optimizer

    def training_step(self, batch, batch_idx):
        """Run one batch through the model, log ``train_loss`` and return it."""
        word_ids, neighbour_ids = batch
        scores = self.forward(word_ids)
        batch_loss = self.loss(scores, neighbour_ids)

        self.log("train_loss", batch_loss)

        return batch_loss

In [None]:
# Instantiate the embedding-based model and train it for EPOCHS epochs on the
# skip-gram DataLoader.
modelEmbedding = SkipGramModelEmbedding(
    vocabulary_size=VOCAB_SIZE,
    embedding_size=EMBEDDING_DIM,
)
trainer = L.Trainer(max_epochs=EPOCHS)
trainer.fit(model=modelEmbedding, train_dataloaders=dataloader)

# DEMO

In [None]:
def visualize_words_embedding(model):
    """Plot a 2-D t-SNE projection of the embeddings of a fixed word list.

    Words from the list that are not in ``WORD2IDX`` are skipped.

    Args:
        model: Object exposing ``embeddings.weight``, a matrix whose rows are
            indexed by the ids in ``WORD2IDX``.

    Raises:
        ValueError: If fewer than two of the words are in the vocabulary, since
            t-SNE cannot be fitted on fewer than two samples.
    """
    words = [
        'Elizabeth', 'Darcy', 'Bingley', 'Lydia', 'Jane', 'Collins', 'Pemberley',
        'Meryton', 'Lady', 'Wickham', 'marriage', 'love', 'prejudice', 'fortune',
        'accomplished', 'pride', 'sense', 'character', 'family', 'society',
        'husband', 'wife', 'man', 'woman', 'summer', 'spring', 'winter'
    ]

    # Skip words the corpus never produced instead of failing with a KeyError.
    words = [word for word in words if word in WORD2IDX]
    if len(words) < 2:
        raise ValueError('need at least two in-vocabulary words to run t-SNE')

    indices = [WORD2IDX[word] for word in words]
    # scikit-learn expects a CPU NumPy array, not a (possibly GPU) torch tensor.
    embeddings = model.embeddings.weight.detach()[indices].cpu().numpy()

    # 2d t-SNE; perplexity must stay below the number of samples.
    tsne = TSNE(n_components=2, perplexity=min(20, len(words) - 1), random_state=42)
    embeddings_2d = tsne.fit_transform(embeddings)

    colors = sns.husl_palette(n_colors=len(words))
    plt.figure(figsize=(10, 10))
    # Draw the whole point cloud once; only the labels need a per-word loop.
    plt.scatter(embeddings_2d[:, 0], embeddings_2d[:, 1], c=colors)
    for word, (x, y) in zip(words, embeddings_2d):
        plt.annotate(word, xy=(x, y))

    plt.show()

In [None]:
# Plot the t-SNE projection of the trained embedding model's word vectors.
visualize_words_embedding(modelEmbedding)