# Word2Vec

> **Instructions -**
> This notebook contains holes. The holes have to be filled by the student foloowing the instructons provided as functions doc string. A correction will be provided at the end of the lecture. However it is strongly advised to fill the holes by yourself for educational purposes.

Text data is different from image data. It is not representend as a tensor. Most of the time text is digitaly represented as a sequence of bytes and encoded using a format such as `ascii` or `utf-8`. One naive way of representing words using vector representation is to keep a vocabulary dictrionnary and assign each word a position. This position can then be used as a one-hot vector, a vector with zeros except at the position of the word index where it is set to one. This the kind of vector we previously built for the target in multi-class classification.

While such approach works in some cases, we are limitting the amount of information carried by a word, its meanings, its genre, type, into a single number. To avoid this limitation, one can build a dictionaary of words that are represented by a random vector of a certain size. This is called an embedding. This embedding can then be trained using standard gradient descent by being used in a model.

In this chapter, we explore such an embedding called [Word2Vec](https://arxiv.org/abs/1301.3781).

In [None]:
from __future__ import annotations

from collections import Counter, OrderedDict
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import sent_tokenize, word_tokenize
from sklearn.manifold import TSNE
from torch import Tensor
from torch.nn import Embedding, Linear, Module, PairwiseDistance, ReLU, Sequential
from torch.optim import AdamW, Optimizer
from torch.utils.data import DataLoader, Dataset
from tqdm.notebook import tqdm
from typing import Iterator

import matplotlib.pyplot as plt
import os
import torch
import torch.nn.functional as F
import requests

%matplotlib inline

## Harry Potter

For the sake of this exploration let us use the Harry Potter Books by J.K. Rowling. The links used for downlaoding the book content are being made available for educational purposes. Please do not use them for other usage.

The following class handles the dataset downloading, merging into a single text file, and its reading.

In [None]:
class HarryPotterBooks:
    base = "https://raw.githubusercontent.com/formcept/whiteboard/master/nbviewer/notebooks/data/harrypotter"
    urls = [
        os.path.join(base, "Book%201%20-%20The%20Philosopher's%20Stone.txt"),
        os.path.join(base, "Book%202%20-%20The%20Chamber%20of%20Secrets.txt"),
        os.path.join(base, "Book%203%20-%20The%20Prisoner%20of%20Azkaban.txt"),
        os.path.join(base, "Book%204%20-%20The%20Goblet%20of%20Fire.txt"),
        os.path.join(base, "Book%205%20-%20The%20Order%20of%20the%20Phoenix.txt"),
        os.path.join(base, "Book%206%20-%20The%20Half%20Blood%20Prince.txt"),
        os.path.join(base, "Book%207%20-%20The%20Deathly%20Hallows.txt"),
    ]

    def __init__(self, path: str, download: bool = True) -> None:
        self.path = path
        if download: self._download()

    def _download(self) -> None:
        """download

        Creates a file at self.path location if does not already exisis.
        Donwloads every HP books and appends to the file at self.path.
        """
        ...

    def read_lines(self) -> list[str]:
        """read_lines

        Reads the raw content line by line.
        """
        return []

Here we can read the first 10 lines of Harry Potter. As we can observe, we may need to clean the content before using it.

In [None]:
hp_lines = HarryPotterBooks(os.path.join(".datasets", f"hp.txt"), download=True).read_lines()
hp_lines[:10]

## Preprocessing

Text is quite a versatile data type. A same word can be used with different meanings, some can be expressed using shortened formats such as abreviations, and more. To make our work easier, let us remove some of this complexity.

The following helper class provides methods for removing, cleaning but also joining all the lines into a single welle defined string.

In [None]:
class HarryPotterLinePreprocessor:
    @staticmethod
    def remove_empty_lines(lines: Iterator[str]) -> Iterator[str]:
        """remove empty lines
        
        Removes every empty line from the given set of lines.
        Use the filter operation.
        """
        return lines

    @staticmethod
    def remove_chapter_names(lines: Iterator[str]) -> Iterator[str]:
        """remove chapter names
        
        Removes every chapter headers.
        Use the filter operation.
        """
        return lines

    @staticmethod
    def remove_page_declaration(lines: Iterator[str]) -> Iterator[str]:
        """remove page declaration
        
        Removes every page footer.
        Use the filter operation.
        """
        return lines

    @staticmethod
    def replace_abbreviations(lines: Iterator[str]) -> Iterator[str]:
        """replace abbreviations
        
        Replace abbreviations by their full version (Dr. -> Doctor).
        Use the map operation.
        """
        return lines

    def remove_end_of_line(lines: Iterator[str]) -> Iterator[str]:
        """remove end of line
        
        Remove all end of line symbols.
        Use the map operation.
        """
        return lines

    @staticmethod
    def preprocess(lines: Iterator[str], join: bool = True) -> Iterator[str] | str:
        lines = HarryPotterLinePreprocessor.remove_empty_lines(lines)
        lines = HarryPotterLinePreprocessor.remove_chapter_names(lines)
        lines = HarryPotterLinePreprocessor.remove_page_declaration(lines)
        lines = HarryPotterLinePreprocessor.replace_abbreviations(lines)
        lines = HarryPotterLinePreprocessor.remove_end_of_line(lines)
        return " ".join(lines) if join else list(lines)

Here we can observe the first thousand characters of the joined and cleaned string.

In [None]:
hp_raw = HarryPotterLinePreprocessor.preprocess(hp_lines, join=True)
hp_raw[:1_000]

## Tokens

In order to build a dictionnary for our embedding, we need to split the text in single word units called tokens. The text is first split into sentences. The reason is explained later in the document.

THe following processor class provides functions to split the text into sentences, then into tokens, but also removes the english punctuation symbols, makes each token lower case to avoid repetition, and keeps the sentences that have a minimum given length (number of tokens).

In [None]:
class HarryPotterProcessor:
    @staticmethod
    def extract_sentences(raw: str) -> list[str]:
        """extract sentences
        
        Split raw text by sentences.
        """
        return []

    @staticmethod
    def extract_sentences_tokens(sentences: list[str], min_seq: int) -> list[list[str]]:
        """extract sentences tokens
        
        For each sentence, split the sentence by tokens.
        """
        return []

    @staticmethod
    def to_lower(sentences_tokens: list[list[str]]) -> list[list[str]]:
        """to lower
        
        Put every token in its lower case version.
        """
        return []

    def lemmatize(sentences_tokens: list[list[str]]) -> list[list[str]]:
        """lemmatize
        
        Keep only the root of the tokens using a lemmatizer. (eating -> eat)
        """
        return []

    def remove_stop_words(sentences_tokens: list[list[str]]) -> list[list[str]]:
        """remove stop words
        
        Remove every english stop words from the tokens.
        """
        stop_words = set(stopwords.words('english'))
        stop_words.update([
            ".", ",", ":", ";", "?", "!",
            "(", ")", "#", "—", "-", "_", "--", "...",
            "“", "”", "\"", "'", "`", "~", "’", "|",
        ])
        return []

    @staticmethod
    def extract(raw: str, min_seq: int = 9) -> list[list[str]]:
        sentences = HarryPotterProcessor.extract_sentences(raw)
        sentences_tokens = HarryPotterProcessor.extract_sentences_tokens(sentences, min_seq)
        sentences_tokens = HarryPotterProcessor.to_lower(sentences_tokens)
        sentences_tokens = HarryPotterProcessor.remove_stop_words(sentences_tokens)
        sentences_tokens = HarryPotterProcessor.lemmatize(sentences_tokens)
        return sentences_tokens

## Vocabulary

The vocabulary is an augmented dictionary containing the index of every token in a given token list. It provides methods to transform a token, a string, into its index in the dictionary, and the inverse operation to transform an index to its corresponding token. Here the vocabulary only keeps the token that appears at least some given number of time in the text. It let us avoid creating an enormous dataset that woul be impracticable later.

In [None]:
class Vocab:
    def __init__(self, tokens: list[str], min_freq: int) -> None:
        self.t2f = OrderedDict(Counter(tokens).most_common())
        self._stoi = {s: i for i, (s, count) in enumerate([("<unk>", min_freq)] + list(self.t2f.items())) if count >= min_freq}
        self._itos = {i: s for s, i in self._stoi.items()}

    def __len__(self) -> int: return len(self._stoi)
    def stoi(self, s: str) -> int: return self._stoi[s] if s in self._stoi else 0
    def itos(self, i: int) -> int: return self._itos[i] if i in self._itos else "<unk>"

## Continous Bag of Words (CBOW)

A word can be explained by its surrounding, the context. Gvien a sentence, if one word is removes, it is likely you are able to find the missing one. This is due to the word surrounding the one masked. It gives us information that allows to narrow down the posibilities to a few. This is called a prior. Continuous Bag of Word (CBOW) is an embedding training method making use of this phenomena. Instead of encoding the word embedding into a latent variable and decoding it to find the input embedding back, it uses the context word as inputs that are aggregated, encoded into a latent variable that is then decoded into the target word.

The Harry Potter CBOW Dataset class is reponsible for building a dataset of context annd target word pairs. It first builds a vocabulary dictionary out of the Harry Potter Books content and uses it to output tokens as interger indices.

In [None]:
class HarryPotterCBOWDataset(Dataset):
    def __init__(self, raw: str, window_size: int, min_word_freq: int) -> None:
        super().__init__()
        assert window_size // 2 != 0
        self.raw = raw
        self.window_size = window_size
        self.min_word_freq = min_word_freq
        self.vocab: Vocab() = None
        self.pairs: list[tuple[list[str], str]] = []
        self._build()

    def _build(self) -> None:
        """build

        Extract the sentences tokens, build the vocabulary and the context / target
        word pairs. The pairs are obtained using a sliding window with a hop size of
        1 for sentences that have a minimum length at least equal to the window size + 1.
        """
        ...

    def __len__(self) -> int:
        """len
        
        Returns the dataset length.
        """
        return 0

    def __getitem__(self, idx: int) -> tuple[Tensor, int]:
        """getitem
        
        Retrieve the context / target token pairs.
        Return the context as a Tensor of indices from the vocabulary corresponding to
        the context tokens and a single index corresponding to the target word index.
        """
        return None, 0

Here we display the Dataset length, the vocabulary size, and the shape of the context to verify that it feats with respect to our arguments.

In [None]:
hp_cbow = HarryPotterCBOWDataset(hp_raw, window_size=9, min_word_freq=10)
ctx, word = hp_cbow[0]
print("Dataset Length:", len(hp_cbow))
print("Vocab   Length:", len(hp_cbow.vocab))
print("Context Size  :", tuple(ctx.size()))

 ## Word2Vec

 Word2Vec is a simple AutoEncoder model. It is composed of an embedding that is responsible for storing the embedding vector for each and every word of the dictionary, and a multi layer perceptron that is responsible for the decoding portion of the network. The model takes a context vector as input $(n_{\text{context}}, \text{embedding}_{\text{size}})$, and outputs a probability vector that is optimized for outputing the one hot representation of the target word.

In [None]:
class Word2Vec(Module):
    def __init__(
        self,
        vocab_size: int,
        emb_dim: int,
        h_dim: int,
        context_size: int,
        emb_max_norm: float = 1.0,
    ) -> None:
        super().__init__()
        self.embedding = Embedding(num_embeddings=vocab_size, embedding_dim=emb_dim, max_norm=emb_max_norm)
        self.classifier = Sequential(
            Linear(emb_dim * context_size, h_dim), ReLU(),
            Linear(h_dim, vocab_size, bias=False)
        )

    def forward(self, x: Tensor) -> Tensor:
        """forward
        
        Forward pass for the Word2Vec model in CBOW mode.
            ctx_1 \
            ctx_2 -> z -> tgt
            ctx_3 /
        """
        return x

## Training

### History

In [None]:
class History:
    def __init__(self) -> None:
        self.loss = []
        self.acc = []

    def __len__(self) -> int:
        assert len(self.loss) == len(self.acc)
        return len(self.loss)

    def plot(self) -> None:
        fig = plt.figure(figsize=(2 * 4, 4), dpi=400)
        ax = fig.add_subplot(2, 1, 1)
        ax.plot(list(range(len(self))), self.loss, label="train", color="r")
        ax.set_title("Loss History")
        ax.set_xlabel("epoch")
        ax.set_ylabel("loss")
        ax.grid(linestyle="--", linewidth=0.5)
        ax.spines["top"].set_visible(False)
        ax.spines["right"].set_visible(False)
        ax.spines["bottom"].set_visible(False)
        ax.spines["left"].set_visible(False)
        ax = fig.add_subplot(2, 1, 2)
        ax.plot(list(range(len(self))), self.acc, label="train", color="b")
        ax.set_title("Accuracy History")
        ax.set_xlabel("epoch")
        ax.set_ylabel("accuracy in %")
        ax.set_ylim(0.0, 100.0)
        ax.grid(linestyle="--", linewidth=0.5)
        ax.spines["top"].set_visible(False)
        ax.spines["right"].set_visible(False)
        ax.spines["bottom"].set_visible(False)
        ax.spines["left"].set_visible(False)
        fig.canvas.draw()
        fig.tight_layout()
        plt.show()

### Fit

In [None]:
def fit(model: Module, optim: Optimizer, loader: DataLoader, epochs: int, device: str) -> History:
    """fit
    
    Fit the given embedding model to the given dataset.
    Should use tqdm to display a progress bar and fill the history for each
    epoch (for the average loss and accuracy).
    """
    history = History()
    return history

### Hyperparameters

In [None]:
device = "cuda"
epochs = 50
batch_size = 512
lr = 1e-3

vocab_size = len(hp_cbow.vocab)
emb_dim = 512
h_dim = 128
context_size = hp_cbow.window_size - 1

### Loader, Model, and Optimizer

In [None]:
loader = DataLoader(hp_cbow, batch_size=batch_size, num_workers=4, pin_memory=True, shuffle=True)
model = Word2Vec(vocab_size, emb_dim, h_dim, context_size).to(device)
optim = AdamW(model.parameters(), lr=lr)

### Fitting

In [None]:
fit(model, optim, loader, epochs, device).plot()

## Embedding Visualization

Here we attempt to visualize the resulting embedding space. As the embedding dim is often bigger thant 3 dimension it is hard to visualize. In this type of scenario data scientists use a set of tools referred to as dimensionality reduction techniques. One of such as method is [t-SNE](https://jmlr.org/papers/volume9/vandermaaten08a/vandermaaten08a.pdf).

In the following cell, we apply the t-SNE method to every token of the vocabulary and reduce their embedding dimension to 2. This process will allow us to project the embeddings into a 2 dimensional scatter plot.

In [None]:
embedding = model.embedding.to("cpu")

vocab_emb = torch.empty((len(hp_cbow.vocab), emb_dim), dtype=torch.float32)
with torch.inference_mode():
    for word_id in range(len(hp_cbow.vocab)):
        # Fill the vocabulary embedding list for every word in the volcabulary
        ...

reduced_vocab_emb = TSNE(n_components=2).fit_transform(vocab_emb.numpy())

In [None]:
fig = plt.figure(figsize=(8, 8), dpi=400)
ax = fig.add_subplot(1, 1, 1)
ax.scatter(x=reduced_vocab_emb[1:, 0], y=reduced_vocab_emb[1:, 1], color="b", marker="+")
for i, word in hp_cbow.vocab._itos.items():
    if word == "<unk>": continue
    ax.annotate(word, reduced_vocab_emb[i])
ax.set_title("Harry Potter Word Embedding")
ax.set_xlabel("t-sne axis 1")
ax.set_ylabel("t-sne axis 2")
ax.grid(linestyle="--", linewidth=0.5)
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
ax.spines["left"].set_visible(False)
fig.canvas.draw()
plt.show()

## Closest Words

The trained embedding makes word with similar contexts close to each other and pushes others far away. As such we can mesure the closest relationship between every word of the dictionnary by mesure their distance. Many distances can be used such as the euclidean distance (used here), the cosine distance, and more. They all have downsides and advantages that will not be discussed here.

In [None]:
def closest_words(ref_word: str, top_k: int = 10) -> None:
    ref_idx = torch.tensor(hp_cbow.vocab.stoi(ref_word), dtype=torch.long)
    ref_embedding = model.embedding(ref_idx)

    word_distances = []
    pdist = PairwiseDistance()
    for word in hp_cbow.vocab._stoi.keys():
        if word in [ref_word, "<unk>"]: continue
        idx = torch.tensor(hp_cbow.vocab.stoi(word), dtype=torch.long)
        embedding = model.embedding(idx)
        word_distances.append((word, pdist(ref_embedding, embedding).item()))
    word_distances.sort(key=lambda wd: wd[1])

    print("------------------------------")
    print(f"Closest words for {ref_word}:")
    for i, (word, dist) in enumerate(word_distances[:top_k]):
        print(f"{i + 1:>2} - {word:<12} | {dist:.2e}")
    print("------------------------------")

In [None]:
closest_words("harry")

In [None]:
closest_words("wizzard")

In [None]:
closest_words("wand")

In [None]:
closest_words("griffondor")

In [None]:
closest_words("slytherin")

In [None]:
closest_words("death")

In [None]:
closest_words("car")

## Word Arithmetic

By representing words as vectors, it naturally benefits from vector space operations. Thus, we can apply arithmetics operations to word and combine them in a meaningful manner.

In [None]:
def plus_minus(ref_word: str, plus_word: str, minus_word: str, top_k: int = 10) -> None:
    ref_idx = torch.tensor(hp_cbow.vocab.stoi(ref_word), dtype=torch.long)
    ref_embedding = model.embedding(ref_idx)

    plus_idx = torch.tensor(hp_cbow.vocab.stoi(plus_word), dtype=torch.long)
    plus_embedding = model.embedding(plus_idx)

    minus_idx = torch.tensor(hp_cbow.vocab.stoi(minus_word), dtype=torch.long)
    minus_embedding = model.embedding(minus_idx)

    op_embedding = ref_embedding + plus_embedding - minus_embedding

    word_distances = []
    pdist = PairwiseDistance()
    for word in hp_cbow.vocab._stoi.keys():
        if word in [ref_word, plus_word, minus_word, "<unk>"]: continue
        idx = torch.tensor(hp_cbow.vocab.stoi(word), dtype=torch.long)
        embedding = model.embedding(idx)
        word_distances.append((word, pdist(op_embedding, embedding).item()))
    word_distances.sort(key=lambda wd: wd[1])

    print("------------------------------")
    print(f"Closest words for {ref_word} + {plus_word} - {minus_word}:")
    for i, (word, dist) in enumerate(word_distances[:top_k]):
        print(f"{i + 1:>2} - {word:<12} | {dist:.2e}")
    print("------------------------------")

In [None]:
plus_minus("harry", "ginny", "hermione")

In [None]:
plus_minus("james", "lily", "child")

## Relationship Exploration

In [None]:
fig = plt.figure(figsize=(8, 8), dpi=400)
ax = fig.add_subplot(1, 1, 1)

friends   = ["harry", "ron", "hermione", "hagrid"]
voldemort = ["voldemort", "tom", "riddle", "lord"]
magic     = ["wand", "expelliarmus", "avada", "patronum", "leviosa"]
houses    = ["gryffindor", "ravenclaw", "hufflepuff", "slytherin"]
quidditch = ["quidditch", "quaffle", "chase", "broomstick", "gold", "snitch"]

c = []
for i, word in hp_cbow.vocab._itos.items():
    found = False
    if word == "<unk>"  : continue
    if word in friends  : found = True; c.append((1, 0, 0, 1))
    if word in voldemort: found = True; c.append((0, 1, 0, 1))
    if word in magic    : found = True; c.append((0, 0, 1, 1))
    if word in houses   : found = True; c.append((1, 0, 1, 1))
    if word in quidditch: found = True; c.append((1, 1, 0, 1))
    if found: ax.annotate(word, reduced_vocab_emb[i])
    else: c.append((0, 0, 0, 0.1))

ax.scatter(x=reduced_vocab_emb[1:, 0], y=reduced_vocab_emb[1:, 1], c=c, marker="+")
ax.set_title("Harry Potter Colored Word Embedding")
ax.set_xlabel("t-sne axis 1")
ax.set_ylabel("t-sne axis 2")
ax.grid(linestyle="--", linewidth=0.5)
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
ax.spines["left"].set_visible(False)
fig.canvas.draw()
plt.show()