In [0]:
import os
import numpy as np
from typing import List, Tuple, Union

import tensorflow as tf

from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Embedding, Layer, Flatten

# CBOW

A Continuous Bag of Words (CBOW) implementation: a model that learns word embeddings by predicting a word from the words surrounding it.

## Preprocessing

Read in the Alice text and then tokenize it. The cell below expects `raw_corpus`, a list of sentence strings, to be loaded already.

In [0]:
# Keep only sentences containing at least two spaces (roughly: three or more words)
corpus = [text for text in raw_corpus if text.count(" ") >= 2]

# Strip punctuation, tabs, newlines and apostrophes, then build the
# word index from the whole filtered corpus
tokenizer = Tokenizer(filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'+"'")
tokenizer.fit_on_texts(corpus)

# Replace every sentence by its list of integer word indices
corpus = tokenizer.texts_to_sequences(corpus)
corpus_size = sum(map(len, corpus)) # total number of tokens in the corpus
voc_size = len(tokenizer.word_index) + 1 # number of distinct words, plus one (word indices start at 1; 0 is left for padding)

## Dataset generation

Our training task for CBOW is to guess a word based on the `window_size` words surrounding it on either side.
Let's generate a dataset of `(window_words, word)` pairs, where `window_words` has a fixed length.

In [0]:
def generate_dataset(
    corpus: List[List[int]],
    window_size: int
) -> Tuple[np.ndarray, np.ndarray]:
    """Build CBOW training pairs ``(context_words, target_word)`` from a tokenized corpus.

    For every word of every sentence, the context is made of up to
    ``window_size`` words before it and up to ``window_size`` words after it,
    never crossing the sentence boundary.

    Args:
        corpus: Sentences, each given as a list of integer word indices.
        window_size: Maximum number of context words taken on each side of
            the target word. Must be non-negative.

    Returns:
        A tuple ``(x, y)``. ``x`` holds one context per row, padded by
        ``pad_sequences`` (zeros in front, by its defaults) to exactly
        ``2 * window_size`` columns. ``y`` holds the matching target word
        indices.

    Raises:
        ValueError: If ``window_size`` is negative.
    """
    if window_size < 0:
        raise ValueError(f"window_size must be non-negative, got {window_size}")

    context_width = 2 * window_size
    raw_x = []
    raw_y = []
    for line in corpus:
        for word_idx, word in enumerate(line):
            min_idx = max(0, word_idx - window_size)
            # Slicing past the end of the sentence is safe, so no upper clamp is needed
            raw_x.append(
                line[min_idx : word_idx] + line[word_idx + 1 : word_idx + 1 + window_size]
            )
            raw_y.append(word)
    # Pad to an explicit width: without ``maxlen`` the width would be the longest
    # context actually seen, which is narrower than 2 * window_size for a corpus
    # of short sentences and cannot be computed at all for an empty corpus.
    x = pad_sequences(raw_x, maxlen=context_width)
    y = np.array(raw_y, dtype=int)
    return x, y

In [0]:
# Number of context words taken on each side of the target word
window_size = 2
x, y = generate_dataset(corpus=corpus, window_size=window_size)

## Model definition and training

We then define the CBOW model and train it

We need a custom `AverageLayer` to average out the embeddings of the window words output from the embedding layer.

In [0]:
class AverageLayer(Layer):
    """Averages its input over axis 1, skipping masked (padded) positions.

    In this notebook the input is the output of the embedding layer, of shape
    ``(batch, context_length, embedding_size)``, and the result has shape
    ``(batch, embedding_size)``.

    When the previous layer provides a mask (e.g. ``Embedding(mask_zero=True)``),
    only unmasked positions contribute to the mean, so padding does not dilute
    the context vector. A row whose positions are all masked yields zeros.
    Without a mask this is a plain mean over axis 1.
    """

    def __init__(self, **kwargs):
        # Forward standard Layer arguments (name, dtype, ...) to the base class
        super(AverageLayer, self).__init__(**kwargs)
        # Declare mask support so Keras hands the upstream mask to ``call``
        self.supports_masking = True

    def call(self, inputs, mask=None):
        if mask is None:
            return tf.math.reduce_mean(inputs, axis=1)
        # (batch, steps) boolean mask -> (batch, steps, 1) weights of 0.0 / 1.0
        weights = tf.expand_dims(tf.cast(mask, inputs.dtype), axis=-1)
        summed = tf.math.reduce_sum(inputs * weights, axis=1)
        counts = tf.math.reduce_sum(weights, axis=1)
        # divide_no_nan returns 0 where the count is 0 (fully padded context)
        return tf.math.divide_no_nan(summed, counts)

    def compute_mask(self, inputs, mask=None):
        # The sequence axis is reduced away, so there is no mask to pass downstream
        return None

Then we can construct the model itself. We can vary the size of the embedding, optimizer and loss function

In [20]:
# Dimensionality of the learned word vectors
embedding_size = 100

# Stack: context word indices -> embeddings -> mean over the context -> softmax over the vocabulary
cbow = Sequential()
cbow.add(
    Embedding(
        voc_size,
        embedding_size,
        mask_zero=True,
        input_length=2 * window_size,
    )
)
cbow.add(AverageLayer())
cbow.add(
    Dense(
        voc_size,
        activation="softmax",
        use_bias=False,
        kernel_regularizer="l2",
    )
)

# Targets are integer word indices, hence the sparse categorical cross-entropy
cbow.compile(
    loss="sparse_categorical_crossentropy",
    optimizer="adam",
    metrics=["accuracy"],
)
cbow.summary()

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding_1 (Embedding)      (None, 4, 100)            255700    
_________________________________________________________________
average_layer_1 (AverageLaye (None, 100)               0         
_________________________________________________________________
dense_1 (Dense)              (None, 2557)              255700    
Total params: 511,400
Trainable params: 511,400
Non-trainable params: 0
_________________________________________________________________


In [21]:
# Early-stopping callback monitoring the validation loss.
# NOTE: it is not passed to fit() below (the validation_split / callbacks
# arguments are commented out), so it currently has no effect on training.
early_stopping = EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True)
_ = cbow.fit(x, y, batch_size=16, epochs=3) #, validation_split=0.1, callbacks=[early_stopping])

Epoch 1/3
Epoch 2/3
Epoch 3/3


## Evaluation of embeddings

We first get the embedding layer from the model

In [0]:
# First layer of the Sequential model, i.e. the trained Embedding layer
embedding_layer = cbow.layers[0]

We then define a function to retrieve the embedding of a word. A custom `tokenizer` or `embedding_layer` can be passed; otherwise the ones defined
above are used.

In [0]:
def embed(
    word: str,
    tokenizer: Tokenizer = tokenizer,
    embedding_layer: Embedding = embedding_layer
) -> tf.Tensor:
    """Return the embedding of a single word as a column vector.

    Args:
        word: The word to embed. It must map to exactly one token known to
            ``tokenizer``.
        tokenizer: Tokenizer used to turn ``word`` into its integer index.
            Defaults to the tokenizer fitted earlier in the notebook.
        embedding_layer: Embedding layer used for the lookup. Defaults to the
            embedding layer of the trained model.

    Returns:
        A tensor of shape ``(embedding_size, 1)``.

    Raises:
        ValueError: If ``word`` is unknown to the tokenizer or splits into
            several tokens. Either case would otherwise silently produce a
            tensor of the wrong size.
    """
    word_index = tokenizer.texts_to_sequences([word])
    n_tokens = len(word_index[0])
    if n_tokens != 1:
        raise ValueError(
            f"expected {word!r} to map to exactly one known token, got {n_tokens}"
        )
    # Shape (1, 1) index array -> (1, 1, embedding_size) lookup result
    output_tensor = embedding_layer(np.array(word_index))
    return tf.reshape(output_tensor, (-1, 1))

We also define a cosine-similarity helper and a function that retrieves the closest words to a word vector

In [0]:
def cosine_similarity(
    v: Union[np.ndarray, tf.Tensor],
    w: Union[np.ndarray, tf.Tensor]) -> float:
    """Cosine similarity between two vectors.

    Args:
        v: First vector, as a ``tf.Tensor`` or ``np.ndarray``. Inputs with more
            than one dimension are flattened.
        w: Second vector, same conventions as ``v``. It must contain as many
            elements as ``v``.

    Returns:
        The cosine of the angle between ``v`` and ``w`` as a Python float.
        If either vector has zero norm the similarity is undefined and ``0.0``
        is returned instead of NaN.
    """

    assert isinstance(v, (tf.Tensor, np.ndarray)), "v must be tf.Tensor or np.ndarray"
    assert isinstance(w, (tf.Tensor, np.ndarray)), "w must be tf.Tensor or np.ndarray"

    v = v.numpy() if isinstance(v, tf.Tensor) else v
    w = w.numpy() if isinstance(w, tf.Tensor) else w

    # Column vectors such as (d, 1) are treated as plain 1-D vectors
    v = v.ravel()
    w = w.ravel()

    denominator = np.linalg.norm(v) * np.linalg.norm(w)
    if denominator == 0:
        # Zero vector: avoid a 0/0 division that would yield NaN
        return 0.0
    return float(v @ w / denominator)
    
def closest_n_words(
    embedding: tf.Tensor,
    n: int = 1,
    tokenizer: Tokenizer = tokenizer,
    embedding_layer: Embedding = embedding_layer
) -> List[str]:
    """Return the ``n`` vocabulary words whose embeddings are closest to ``embedding``.

    Closeness is measured by cosine similarity.

    Args:
        embedding: Query vector (tensor or array) with ``embedding_size``
            elements; any shape is accepted and flattened.
        n: Number of words to return. Values ``<= 0`` give an empty list.
        tokenizer: Tokenizer providing the vocabulary (``index_word``).
            Defaults to the tokenizer fitted earlier in the notebook.
        embedding_layer: Embedding layer used to look up the word vectors.
            Defaults to the embedding layer of the trained model.

    Returns:
        Up to ``n`` words, most similar first.
    """
    # ``[-n:]`` with n == 0 would select the whole vocabulary, so bail out early
    if n <= 0 or not tokenizer.index_word:
        return []

    indices = np.array(list(tokenizer.index_word.keys()))
    all_words = np.array(list(tokenizer.index_word.values()))

    # One batched lookup through the layer that was passed in, instead of one
    # layer call per word through the notebook-level defaults
    looked_up = np.asarray(embedding_layer(indices.reshape(-1, 1)), dtype=np.float64)
    all_embeddings = looked_up.reshape(len(indices), -1)
    query = np.asarray(embedding, dtype=np.float64).reshape(-1)

    dots = all_embeddings @ query
    norms = np.linalg.norm(all_embeddings, axis=1) * np.linalg.norm(query)
    # Zero-norm vectors get similarity 0 rather than NaN
    similarities = np.divide(dots, norms, out=np.zeros_like(dots), where=norms > 0)

    # Ascending argsort read backwards: most similar first
    top_idx = np.argsort(similarities)[::-1][:n]
    return all_words[top_idx].tolist()

Ideally, we might expect that $e_\text{king} - e_\text{man} \approx e_\text{queen} - e_\text{woman}$. So if we try to find the closest word to
$e_\text{king} - e_\text{queen} + e_\text{woman}$, it should be $e_\text{man}$ (or something close to it).

In [0]:
# Analogy vector king - queen + woman, which we hope lands near "man"
analogy = embed("king") - embed("queen") + embed("woman")

In [26]:
# Ten vocabulary words whose embeddings are most cosine-similar to the analogy vector
closest_n_words(analogy, 10)

['woman',
 'king',
 'prisoner',
 'english',
 'jurymen',
 'sea',
 'temper',
 'number',
 'calling',
 'box']

In [27]:
# Cosine similarity between the analogy vector and the embedding of "man"
cosine_similarity(analogy, embed("man"))

0.94519687