In [None]:
import logging
import sys


# Timestamp layout used for every log record: year-month-day hour:minute:second.
# The day slot must be %d (day of month); %y would repeat the year as two digits.
date_strftime_format = "%Y-%m-%d %H:%M:%S"

# Route INFO-and-above records to stdout so they appear in the cell output.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt=date_strftime_format,
)

# Data

### Training data
- Source: http://mattmahoney.net/dc/text8.zip

### Analogies data
- Source: https://raw.githubusercontent.com/nicholas-leonard/word2vec/refs/heads/master/questions-words.txt
- Stored in: `data/analogies.txt` (the code below opens it through the relative path `../data/analogies.txt`)

# Defining model

## Model wrapper

In [None]:
from gensim.models import Word2Vec
from gensim.models.callbacks import CallbackAny2Vec

In [None]:
class StoreLossCurveCallback(CallbackAny2Vec):
    """Callback that records one loss value per training epoch.

    The value returned by ``model.get_latest_training_loss()`` is treated as a
    running total, so the loss of an epoch is the difference between the total
    read at the end of that epoch and the total stored at the end of the
    previous one.
    """

    def __init__(self):
        # Per-epoch losses, in the order the epochs finished.
        self.loss_curve = []
        # Running total observed at the end of the previous epoch.
        self.last_logged_loss = 0
        # Zero-based index of the epoch that will be reported next.
        self.epoch = 0

    def on_epoch_end(self, model):
        """Store and print the loss accumulated during the epoch that just ended."""
        epoch_loss = model.get_latest_training_loss() - self.last_logged_loss
        # Remember the new running total for the next epoch's subtraction.
        self.last_logged_loss = model.get_latest_training_loss()

        self.loss_curve.append(epoch_loss)
        print(f"Loss for epoch #{self.epoch}: {epoch_loss}")

        self.epoch += 1

In [None]:
class Word2VecModel:
    """Small wrapper around gensim's ``Word2Vec`` that also keeps the loss curve.

    Args:
        model_type: ``"skipgram"`` selects ``sg=1``; any other value selects
            ``sg=0`` (gensim's CBOW mode).
        window_size: Passed to ``Word2Vec`` as ``window``.
        embedding_size: Passed to ``Word2Vec`` as ``vector_size``.
        min_word_count: Passed to ``Word2Vec`` as ``min_count``.
        workers: Passed to ``Word2Vec`` as ``workers``.

    Attributes:
        model: The trained ``Word2Vec`` instance, or ``None`` before ``train``.
        loss_curve: Per-epoch losses of the most recent ``train`` call.
    """

    def __init__(
        self,
        model_type: str,
        window_size: int,
        embedding_size: int,
        min_word_count: int = 0,
        workers: int = 8,
    ):
        self.model_type = model_type
        self._sg = 1 if model_type == "skipgram" else 0
        self.window_size = window_size
        self.embedding_size = embedding_size
        self.min_word_count = min_word_count
        self.workers = workers
        # Loss tracking is always on; the loss curve depends on it.
        self.compute_loss = True

        self._loss_container = StoreLossCurveCallback()
        self.loss_curve = []

        self.model = None

    @property
    def wv(self):
        """Word vectors of the trained model.

        Raises:
            AttributeError: If accessed before ``train`` (``self.model`` is ``None``).
        """
        return self.model.wv

    def train(
        self,
        dataset,
        epochs: int,
    ):
        """Train a fresh ``Word2Vec`` model and record its per-epoch loss.

        Args:
            dataset: Corpus handed to ``Word2Vec`` as ``sentences``.
            epochs: Number of training epochs.
        """
        # Every call builds a brand-new model, so the loss tracker must start
        # from scratch too. Reusing the old callback would append to the
        # previous curve and subtract a stale running total from the new
        # model's first epoch.
        self._loss_container = StoreLossCurveCallback()

        self.model = Word2Vec(
            sentences=dataset,
            sg=self._sg,
            window=self.window_size,
            vector_size=self.embedding_size,
            min_count=self.min_word_count,
            epochs=epochs,
            compute_loss=self.compute_loss,
            callbacks=[self._loss_container],
            workers=self.workers,
        )

        self.loss_curve = self._loss_container.loss_curve


## Searching for the best hyper-parameter configuration

### Preparing test data

In [None]:
analogies_file_name = "../data/analogies.txt"

# Read the whole file up front. The encoding is explicit so parsing does not
# depend on the platform default.
with open(analogies_file_name, encoding="utf-8") as file:
    file_content = file.read().splitlines()

# Maps each section name (a line starting with ":") to the list of lower-cased
# word lists that follow it. Downstream code indexes four words per entry.
all_test_analogies = {}
last_key_added = None
for line in file_content:
    line = line.strip()
    if not line:
        # Blank lines (e.g. a trailing newline) carry no data.
        continue

    if line.startswith(":"):
        last_key_added = line[1:].strip()
        all_test_analogies[last_key_added] = []

    elif last_key_added is None:
        raise ValueError(
            f"{analogies_file_name}: found an analogy line before any ': <section>' header"
        )

    else:
        # split() without arguments tolerates repeated spaces and tabs.
        all_test_analogies[last_key_added].append(line.lower().split())

### Defining evaluation

In [None]:
import numpy as np

In [None]:
def evaluate_analogy(model, analogy):
    """Score one analogy by comparing the directions of its two word pairs.

    The first four entries of ``analogy`` are looked up in ``model.wv``. The
    result is the cosine similarity between the offset of the first pair
    (``w0 - w1``) and the offset of the second pair (``w2 - w3``).
    """
    first, second, third, fourth = (
        model.wv[analogy[position]] for position in range(4)
    )

    left_offset = first - second
    right_offset = third - fourth

    magnitude = np.linalg.norm(left_offset) * np.linalg.norm(right_offset)
    return np.dot(left_offset, right_offset) / magnitude

In [None]:
def build_report(model, test_analogies):
    """Average the analogy score per category and across categories.

    Args:
        model: Object exposing ``wv`` with membership tests and item lookup.
        test_analogies: Mapping of category name to a list of analogies, each
            a list of words.

    Returns:
        A dict with one entry per category (mean of ``evaluate_analogy`` over
        the analogies whose words are all in ``model.wv``) plus
        ``"overall_average"``, the unweighted mean of the category scores.
        A category with no usable analogy is reported as NaN and left out of
        ``"overall_average"``. If no category has a score, the overall value
        is NaN as well.
    """
    total_ignored_analogies = 0

    report = {}
    for sub_category, samples in test_analogies.items():
        similarities = []
        for curr_sample in samples:
            if all(word in model.wv for word in curr_sample):
                similarities.append(evaluate_analogy(model, curr_sample))
            else:
                total_ignored_analogies += 1

        # Averaging an empty list would emit a RuntimeWarning and yield NaN
        # implicitly, so the "nothing to score" case is made explicit.
        report[sub_category] = np.average(similarities) if similarities else np.nan

    if total_ignored_analogies:
        print(
            f"[WARNING] A total of {total_ignored_analogies} samples were ignored because they contained "
            "words out of the model's vocabulary."
        )

    # A single NaN category must not turn the overall score into NaN, because
    # callers rank models by this value.
    scored = [score for score in report.values() if not np.isnan(score)]
    report["overall_average"] = np.average(scored) if scored else np.nan

    return report

### Grid search routine

In [None]:
from itertools import product
import gensim.downloader as gensim_downloader

In [None]:
# Load the "text8" training corpus through gensim's dataset downloader.
# NOTE(review): presumably the first call downloads and caches the corpus, and
# the returned object is an iterable of tokenised sentences (it is later passed
# as `sentences=` to Word2Vec) — confirm against the gensim downloader docs.
train_dataset = gensim_downloader.load("text8")

In [None]:
def run_grid_search(
    train_dataset,
    param_grid: dict,
    test_analogies: dict,
    param_conditions_callback: callable = None,
    return_best: bool = False,
    min_word_count: int = 0,
    workers: int = 12,
):
    """Train and score one ``Word2VecModel`` per accepted parameter combination.

    Args:
        train_dataset: Corpus forwarded to ``Word2VecModel.train``.
        param_grid: Mapping of parameter name to candidate values. It must
            provide ``"model_type"``, ``"window_size"``, ``"embedding_size"``
            and ``"epochs"``.
        test_analogies: Analogy benchmark forwarded to ``build_report``.
        param_conditions_callback: Optional predicate that receives one
            combination as a dict; combinations for which it returns a falsy
            value are skipped.
        return_best: When true, return only the highest-scoring result.
        min_word_count: Forwarded to every ``Word2VecModel``.
        workers: Forwarded to every ``Word2VecModel``.

    Returns:
        A list of result dicts with keys ``"params"``, ``"score"``,
        ``"full_report"`` and ``"model"``, or the single best one when
        ``return_best`` is true.

    Raises:
        ValueError: If ``return_best`` is true but no combination was evaluated.
    """
    param_keys = list(param_grid.keys())

    results = []
    for params in product(*param_grid.values()):
        param_dict = dict(zip(param_keys, params))

        if param_conditions_callback and not param_conditions_callback(param_dict):
            continue

        model = Word2VecModel(
            model_type=param_dict["model_type"],
            window_size=param_dict["window_size"],
            embedding_size=param_dict["embedding_size"],
            min_word_count=min_word_count,
            workers=workers,
        )

        # len(results) is the number of models trained so far, which is the
        # zero-based index of this run.
        print(
            f"Starting training model {len(results)}."
        )

        model.train(
            dataset=train_dataset,
            epochs=param_dict["epochs"],
        )

        curr_model_report = build_report(model, test_analogies)
        score = curr_model_report["overall_average"]

        print(
            f"model_type: {model.model_type}, window_size: {model.window_size}, embedding_size: {model.embedding_size}"
        )
        print(f"Final score: {score}\n")

        results.append(
            {"params": param_dict, "score": score, "full_report": curr_model_report, "model": model}
        )

    if not return_best:
        return results

    if not results:
        # max() on an empty list would raise a message that says nothing about
        # the grid, so the real cause is stated here.
        raise ValueError(
            "No parameter combination was evaluated: the grid is empty or "
            "every combination was rejected by param_conditions_callback."
        )

    return max(results, key=lambda x: x["score"])

In [None]:
def is_valid_param_combination(selected_params: dict):
    """Return whether the context window is no larger than the embedding size.

    Reads ``"window_size"`` and ``"embedding_size"`` from ``selected_params``.
    """
    window = selected_params["window_size"]
    dimensions = selected_params["embedding_size"]
    return window <= dimensions

In [None]:
%%time

# Search space: the grid search enumerates every combination of these values.
param_grid = {
    "model_type": ["skipgram"],
    "embedding_size": [10, 50, 100],
    "window_size": [5, 15, 25],
    "epochs": [20],
}

# Train and score each candidate. Combinations rejected by
# is_valid_param_combination (window_size larger than embedding_size, i.e.
# windows 15 and 25 with embedding size 10) are skipped.
# return_best=False keeps the full list of results so the best and the worst
# configuration can both be picked from it afterwards.
grid_search_results = run_grid_search(
    train_dataset=train_dataset,
    param_grid=param_grid,
    test_analogies=all_test_analogies,
    param_conditions_callback=is_valid_param_combination,
    return_best=False
)

# Show the list of result dicts as the cell output.
grid_search_results

In [None]:
import matplotlib.pyplot as plt

In [None]:
# Result entry (params, score, report, model) with the highest analogy score.
best_model = max(grid_search_results, key=lambda x: x["score"])
best_report = best_model["full_report"]
# Run gensim's own analogy evaluation on the same benchmark file and keep the
# first element of its return value.
best_score, _ = best_model["model"].wv.evaluate_word_analogies(analogies_file_name)

# Per-epoch training loss of the best configuration, labelled so the figure
# can be read on its own.
fig, ax = plt.subplots()
ax.plot(best_model["model"].loss_curve)
ax.set(
    title=f"Loss curve - best configuration: {best_model['params']}",
    xlabel="Epoch",
    ylabel="Training loss",
)
plt.show()

In [None]:
# Result entry (params, score, report, model) with the lowest analogy score.
worst_model = min(grid_search_results, key=lambda x: x["score"])
worst_report = worst_model["full_report"]
# Run gensim's own analogy evaluation on the same benchmark file and keep the
# first element of its return value.
worst_score, _ = worst_model["model"].wv.evaluate_word_analogies(analogies_file_name)

# Per-epoch training loss of the worst configuration, labelled so the figure
# can be read on its own.
fig, ax = plt.subplots()
ax.plot(worst_model["model"].loss_curve)
ax.set(
    title=f"Loss curve - worst configuration: {worst_model['params']}",
    xlabel="Epoch",
    ylabel="Training loss",
)
plt.show()