# Setup


In [1]:
import nltk
from nltk.corpus import reuters

import numpy as np
import random
import re
import time

import torch
import torch.nn as nn
import torch.optim as optim

from scipy.stats import spearmanr
from collections import Counter

In [None]:
# Pick the compute backend in order of preference: Apple Metal (mps), then CUDA, then CPU
if torch.backends.mps.is_available():
    _backend_name = "mps"
elif torch.cuda.is_available():
    _backend_name = "cuda"
else:
    _backend_name = "cpu"

device = torch.device(_backend_name)
print(f"Using device: {device}")

In [None]:
# Make sure the NLTK resources requested by this notebook are downloaded
for _resource in ("reuters", "punkt"):
    nltk.download(_resource)

# Skip-gram Modeling


In [None]:
sample_size = 500
# sample_size = len(reuters.fileids())


def build_corpus():
    """
    Build the training corpus from the first ``sample_size`` Reuters documents.

    Each document becomes one list of lower-cased tokens; tokens that are not
    purely alphabetic are dropped.
    """
    documents = []

    for file_id in reuters.fileids()[:sample_size]:
        tokens = [token.lower() for token in reuters.words(file_id) if token.isalpha()]
        documents.append(tokens)

    return documents


corpus = build_corpus()
total_words = sum(len(sentence) for sentence in corpus)
print(f"Sentences in the corpus: {len(corpus)}")
print(f"Words in the corpus: {total_words}")

In [None]:
def build_vocab(corpus):
    """
    Build the vocabulary and lookup tables for a tokenised corpus.

    Args:
        corpus: iterable of sentences, each an iterable of word strings.

    Returns:
        A tuple ``(vocab, vocab_size, word2index, word_counts)`` where
        ``vocab`` is the list of words with ``"<UNKNOWN>"`` at position 0,
        ``vocab_size`` is ``len(vocab)``, ``word2index`` maps every entry of
        ``vocab`` to its position in that list, and ``word_counts`` is a
        ``Counter`` of word frequencies in the corpus.
    """
    word_counts = Counter(word for sentence in corpus for word in sentence)

    # Index 0 is reserved for the unknown token so it never shares an index
    # with a real word, and every index in range(len(vocab)) maps back to
    # exactly one word. Sorting makes the index assignment reproducible
    # (iteration order of a set of strings varies between runs).
    vocab = ["<UNKNOWN>"] + sorted(word for word in word_counts if word != "<UNKNOWN>")
    word2index = {word: index for index, word in enumerate(vocab)}

    return vocab, len(vocab), word2index, word_counts


# Build the vocabulary once; these module-level names are read by later cells.
# The reported size includes the "<UNKNOWN>" entry added by build_vocab.
all_vocabs, vocab_size, word2index, word_counts = build_vocab(corpus)
print(f"Unique words in the corpus: {vocab_size}")

In [None]:
def build_skipgrams(corpus, word2index, window_size=2):
    """
    Produce (center, context) training pairs for every word in every sentence.

    For each position, the context is every other position at most
    ``window_size`` steps to the left or right, clipped to the sentence
    boundaries. Returns two parallel lists: the pairs as vocabulary indices
    and the same pairs as words.
    """
    index_pairs = []
    word_pairs = []

    for sentence in corpus:
        sentence_length = len(sentence)
        for position, center_word in enumerate(sentence):
            center_index = word2index[center_word]
            # Window bounds, clipped to the start and end of the sentence
            window_start = max(position - window_size, 0)
            window_end = min(position + window_size + 1, sentence_length)

            for neighbor in range(window_start, window_end):
                if neighbor == position:
                    continue  # a word is not its own context
                context_word = sentence[neighbor]
                index_pairs.append((center_index, word2index[context_word]))
                word_pairs.append((center_word, context_word))

    return index_pairs, word_pairs


# Generate the training pairs with a window of 2 words on each side;
# skip_grams holds index pairs, skip_grams_words the same pairs as words.
skip_grams, skip_grams_words = build_skipgrams(corpus, word2index, window_size=2)
print(f"Skip-gram pairs from corpus: {len(skip_grams)}")

In [7]:
def build_unigram_table(word_counts, power=0.75):
    """
    Build the sampling table used to draw negative words.

    Each word is repeated ``int((count / total) ** power / Z)`` times with
    ``Z = 0.001``, so words whose scaled frequency rounds down to zero do not
    appear in the table at all.
    """
    total_count = sum(word_counts.values())
    Z = 0.001
    table = []

    for word, count in word_counts.items():
        repetitions = int(((count / total_count) ** power) / Z)
        table += [word] * repetitions

    return table


# Sampling table for negative sampling; read as a global by train_skipgrams
unigram_table = build_unigram_table(word_counts)

In [8]:
def to_number_sequence(all_vocabs, word2index):
    """
    Translate a sequence of words into a LongTensor of indices on ``device``.

    Words without an entry in ``word2index`` are mapped to the index stored
    under ``"<UNKNOWN>"``.
    """

    def index_of(word):
        found = word2index.get(word)
        return word2index["<UNKNOWN>"] if found is None else found

    indices = [index_of(word) for word in all_vocabs]
    return torch.LongTensor(indices).to(device)

# Co-occurrence + Weighting


In [9]:
# X_ik_skipgram = Counter(skip_grams_words)
# X_ik_skipgram

In [10]:
# def weighting(word_i, word_j, X_ik):
#     try:
#         x_ij = X_ik[(word_i, word_j)]
#     except:
#         x_ij = 1

#     x_max = 100
#     alpha = 0.75

#     if x_ij < x_max:
#         result = (x_ij / x_max) ** alpha
#     else:
#         result = 1

#     return result


# from itertools import combinations_with_replacement

# X_ik = {}
# weighting_dict = {}

# for bi_gram in combinations_with_replacement(all_vocabs, 2):
#     if X_ik_skipgram.get(bi_gram) is not None:
#         co_occurrence = X_ik_skipgram[bi_gram]
#         X_ik[bi_gram] = co_occurrence + 1
#         X_ik[(bi_gram[1], bi_gram[0])] = co_occurrence + 1  # Opposite
#     else:
#         pass

#     weighting_dict[bi_gram] = weighting(bi_gram[0], bi_gram[1], X_ik)
#     weighting_dict[(bi_gram[1], bi_gram[0])] = weighting(bi_gram[1], bi_gram[0], X_ik)  # Opposite

# print(f"{X_ik=}")
# print(f"{weighting_dict=}")

# Class


In [11]:
class Skipgram(nn.Module):
    """
    Skip-gram word-embedding model with a full-softmax or negative-sampling loss.

    ``embedding_v`` holds the center-word vectors and ``embedding_u`` the
    context-word vectors. ``mode`` selects the loss computed by ``forward``:
    ``"softmax"`` or ``"negative_sampling"``.
    """

    def __init__(self, vocab_size, embed_size, mode="softmax"):
        super().__init__()
        self.mode = mode
        self.embedding_v = nn.Embedding(vocab_size, embed_size)
        self.embedding_u = nn.Embedding(vocab_size, embed_size)

    def forward(self, center_words, target_words, all_vocabs, negative_words=None):
        """
        Return the mean negative log-likelihood of the batch.

        Args:
            center_words: LongTensor of shape (batch, 1) with center-word indices.
            target_words: LongTensor of shape (batch, 1) with context-word indices.
            all_vocabs: LongTensor of shape (batch, vocab) with every vocabulary
                index; only read in ``"softmax"`` mode.
            negative_words: LongTensor of shape (batch, k) with sampled noise
                indices; required in ``"negative_sampling"`` mode.

        Raises:
            ValueError: if ``mode`` is not a supported value, or if
                ``negative_words`` is missing in negative-sampling mode.
        """
        center_embeds = self.embedding_v(center_words)  # (batch, 1, dim)
        target_embeds = self.embedding_u(target_words)  # (batch, 1, dim)
        center_columns = center_embeds.transpose(1, 2)  # (batch, dim, 1)

        # Dot product between each center word and its observed context word
        positive_score = target_embeds.bmm(center_columns).squeeze(2)  # (batch, 1)

        if self.mode == "softmax":
            # Scores of the center word against the whole vocabulary form the
            # softmax denominator. The (batch, vocab, dim) lookup is only done
            # in this branch because it dominates memory use.
            all_embeds = self.embedding_u(all_vocabs)
            norm_scores = all_embeds.bmm(center_columns).squeeze(2)  # (batch, vocab)

            # log softmax = score - logsumexp(all scores). Working in log space
            # avoids the overflow of exp() that turns the loss into NaN/inf.
            log_probability = positive_score - torch.logsumexp(norm_scores, dim=1, keepdim=True)
            return -torch.mean(log_probability)

        if self.mode == "negative_sampling":
            if negative_words is None:
                raise ValueError("negative_words is required when mode is 'negative_sampling'.")

            negative_embeds = self.embedding_u(negative_words)  # (batch, k, dim)
            negative_score = negative_embeds.bmm(center_columns)  # (batch, k, 1)

            # logsigmoid is the numerically stable form of log(sigmoid(x))
            positive_loss = nn.functional.logsigmoid(positive_score)
            negative_loss = torch.sum(nn.functional.logsigmoid(-negative_score), dim=1)

            return -torch.mean(positive_loss + negative_loss)

        raise ValueError(f"Unknown mode: {self.mode!r}")

    def get_embedding(self, word_index):
        """Return the center-word vector of ``word_index`` as a (1, dim) NumPy array."""
        # Build the index on the same device as the weights so the lookup does
        # not depend on a module-level device variable.
        index = torch.tensor(
            [word_index], dtype=torch.long, device=self.embedding_v.weight.device
        )
        return self.embedding_v(index).detach().cpu().numpy()

In [12]:
class GloVe(nn.Module):
    """
    GloVe embedding model: two embedding tables plus one scalar bias per word
    for the center and the context role.
    """

    def __init__(self, vocab_size, embed_size):
        super().__init__()
        self.embedding_v = nn.Embedding(vocab_size, embed_size)
        self.embedding_u = nn.Embedding(vocab_size, embed_size)
        self.v_bias = nn.Embedding(vocab_size, 1)
        self.u_bias = nn.Embedding(vocab_size, 1)

    def forward(self, center_words, target_words, co_occurrences, weighting):
        """
        Return the summed weighted squared error of the batch.

        Args:
            center_words: LongTensor of shape (batch, 1) with center-word indices.
            target_words: LongTensor of shape (batch, 1) with context-word indices.
            co_occurrences: tensor broadcastable to (batch, 1) holding the
                regression target for each pair.
                NOTE(review): the GloVe objective regresses onto log X_ij, so
                this is assumed to already be the log co-occurrence count;
                confirm against the caller once train_glove is implemented.
            weighting: tensor broadcastable to (batch, 1) with the per-pair weight.
        """
        center_embeds = self.embedding_v(center_words)  # (batch, 1, dim)
        target_embeds = self.embedding_u(target_words)  # (batch, 1, dim)

        center_bias = self.v_bias(center_words).squeeze(1)  # (batch, 1)
        target_bias = self.u_bias(target_words).squeeze(1)  # (batch, 1)

        inner_product = target_embeds.bmm(center_embeds.transpose(1, 2)).squeeze(2)

        # Residual between the model's prediction and the co-occurrence target;
        # without subtracting the target the loss would ignore the data.
        residual = inner_product + center_bias + target_bias - co_occurrences
        loss = weighting * torch.pow(residual, 2)

        return torch.sum(loss)

In [13]:
# Hyperparameters
batch_size = 128
negative_size = 5
embed_size = 100
epochs = 10


def train_skipgrams(mode, skip_grams, all_vocabs, vocab_size):
    """
    Train a Skipgram model on (center, context) index pairs.

    Reads the module-level hyperparameters ``batch_size``, ``negative_size``,
    ``embed_size`` and ``epochs``, plus ``device``, ``word2index`` and, in
    negative-sampling mode, ``unigram_table``.

    Args:
        mode: ``"softmax"`` or ``"negative_sampling"``.
        skip_grams: list of ``(center_index, context_index)`` tuples. The list
            is copied, so the caller's ordering is left untouched.
        all_vocabs: list of vocabulary words.
        vocab_size: number of rows in the embedding tables.

    Returns:
        ``(model, mean_loss)`` where ``mean_loss`` is the mean of the
        per-epoch mean losses.

    Raises:
        ValueError: if ``mode`` is unsupported, ``skip_grams`` is empty, or the
            unigram table is empty in negative-sampling mode.
    """
    if mode not in ["softmax", "negative_sampling"]:
        raise ValueError("Invalid mode.")
    if not skip_grams:
        raise ValueError("skip_grams is empty; there is nothing to train on.")

    # Model and Optimizer
    model = Skipgram(vocab_size=vocab_size, embed_size=embed_size, mode=mode).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Vocabulary indices as a (1, vocab) tensor; it is expanded to the actual
    # size of every batch below, so the last (shorter) batch needs no padding
    # with artificial (0, 0) training pairs.
    vocab_indices = to_number_sequence(all_vocabs, word2index).unsqueeze(0)

    # Shuffle a private copy instead of mutating the caller's list
    pairs = list(skip_grams)

    noise_indices = []
    if mode == "negative_sampling":
        # Resolve the table to indices once instead of on every draw
        noise_indices = [word2index[word] for word in unigram_table]
        if not noise_indices:
            raise ValueError("unigram_table is empty; cannot draw negative samples.")

    all_losses = []

    # Training loop
    for epoch in range(epochs):
        epoch_loss = []  # Loss of every batch in this epoch
        start_time = time.time()

        random.shuffle(pairs)

        for i in range(0, len(pairs), batch_size):
            current_batch = pairs[i : i + batch_size]

            # Split the pairs into all centers and all contexts
            centers, targets = zip(*current_batch)

            # Column tensors of shape (current_batch_size, 1)
            center_words = torch.tensor(centers, dtype=torch.long, device=device).unsqueeze(1)
            target_words = torch.tensor(targets, dtype=torch.long, device=device).unsqueeze(1)
            batch_vocabs = vocab_indices.expand(center_words.size(0), -1)

            # Reset the gradients
            optimizer.zero_grad()

            if mode == "softmax":
                loss = model(center_words, target_words, batch_vocabs)
            else:
                # Draw negative_size noise words per pair, rejecting the pair's
                # own context word. The comparison uses plain Python ints, so
                # no device round-trip happens per draw.
                negative_words = []
                for target in targets:
                    negative_samples = []
                    while len(negative_samples) < negative_size:
                        candidate = random.choice(noise_indices)
                        if candidate != target:
                            negative_samples.append(candidate)
                    negative_words.append(negative_samples)
                negative_words = torch.tensor(negative_words, dtype=torch.long, device=device)

                loss = model(center_words, target_words, batch_vocabs, negative_words)

            # Backpropagate the loss and update the parameters
            loss.backward()
            optimizer.step()

            epoch_loss.append(loss.item())

        elapsed_time = time.time() - start_time
        mean_epoch_loss = np.mean(epoch_loss)

        print(f"Epoch: {epoch+1:3d} | Loss: {mean_epoch_loss:5.5f} | Time: {elapsed_time:5.5f}")
        all_losses.append(mean_epoch_loss)

    return model, np.mean(all_losses)


def train_glove():
    """Placeholder for GloVe training; not implemented yet and currently does nothing."""
    pass

In [None]:
# Train the full-softmax variant; avg_loss_sm is the mean of the per-epoch losses
model_sm, avg_loss_sm = train_skipgrams(
    mode="softmax", skip_grams=skip_grams, all_vocabs=all_vocabs, vocab_size=len(all_vocabs)
)

In [15]:
# Train the negative-sampling variant on the same pairs and vocabulary
model_ng, avg_loss_ng = train_skipgrams(
    mode="negative_sampling",
    skip_grams=skip_grams,
    all_vocabs=all_vocabs,
    vocab_size=len(all_vocabs),
)

In [None]:
# Checkpoint file names encode the hyperparameters used for training
model_skipgram_sm_path = f"model_skipgram_sm_b{batch_size}_em{embed_size}_ep{epochs}.pth"
model_skipgram_ng_path = (
    f"model_skipgram_ng_b{batch_size}_em{embed_size}_ep{epochs}_neg{negative_size}.pth"
)

# Save only the parameters (state_dict), not the pickled module objects:
# the loading cell restores them with load_state_dict, which expects exactly
# this format.
torch.save(model_sm.state_dict(), model_skipgram_sm_path)
torch.save(model_ng.state_dict(), model_skipgram_ng_path)

print("Models saved")

In [None]:
model_skipgram_sm_path = f"model_skipgram_sm_b{batch_size}_em{embed_size}_ep{epochs}.pth"
model_skipgram_ng_path = (
    f"model_skipgram_ng_b{batch_size}_em{embed_size}_ep{epochs}_neg{negative_size}.pth"
)


def _restore_skipgram(checkpoint_path, mode):
    """Create a Skipgram on ``device`` and fill it with the state dict stored at ``checkpoint_path``."""
    restored = Skipgram(vocab_size=len(all_vocabs), embed_size=embed_size, mode=mode).to(device)
    restored.load_state_dict(torch.load(checkpoint_path, map_location=device))
    return restored


loaded_model_sm = _restore_skipgram(model_skipgram_sm_path, "softmax")
loaded_model_ng = _restore_skipgram(model_skipgram_ng_path, "negative_sampling")

print("Models loaded")

# Skip-gram Plotting


In [None]:
def get_embed(model, word):
    """
    Return the first two components of a word's embedding as an ``(x, y)`` pair.

    The embedding is the element-wise mean of the word's rows in
    ``model.embedding_v`` and ``model.embedding_u``.
    """
    lookup = torch.LongTensor([word2index[word]]).to(device)
    # Mean of the center-role and context-role vectors
    averaged = (model.embedding_v(lookup) + model.embedding_u(lookup)) / 2
    vector = averaged[0]

    return vector[0].item(), vector[1].item()


import matplotlib.pyplot as plt

# One scatter plot per model (softmax first, then negative sampling),
# each showing vocabulary entries 30 to 49
for plotted_model in (loaded_model_sm, loaded_model_ng):
    plt.figure(figsize=(6, 3))
    for word in all_vocabs[30:50]:
        x, y = get_embed(plotted_model, word)
        plt.scatter(x, y)
        plt.annotate(word, xy=(x, y), xytext=(5, 2), textcoords="offset points")
    plt.show()

# Skip-gram Analogies


In [19]:
def evaluate_analogies(model, analogy_file, word2index):
    """
    Print the analogy accuracy of ``model`` on one semantic and one syntactic section.

    The file contains section headers starting with ``:`` followed by lines
    of four words ``a b c expected``. For each usable line, the predicted word
    is the vocabulary entry whose ``embedding_v`` row has the largest dot
    product with ``b - a + c``. Only the ``capital-common-countries`` section
    (semantic) and sections whose name ends in ``past-tense`` (syntactic, e.g.
    ``past-tense`` or ``gram7-past-tense``) are scored.

    Args:
        model: object exposing ``mode``, ``get_embedding(index)`` and
            ``embedding_v.weight``.
        analogy_file: path of the analogy text file.
        word2index: mapping from word to vocabulary index.
    """
    semantic_correct = 0
    semantic_total = 0
    syntactic_correct = 0
    syntactic_total = 0

    # Reverse mapping from index to word
    index2word = {index: word for word, index in word2index.items()}

    # The weights do not change during evaluation, so convert them once
    embedding_matrix = model.embedding_v.weight.detach().cpu().numpy()

    section = None

    with open(analogy_file, "r", encoding="utf-8") as f:
        for line in f:
            if line.startswith(":"):
                # Drop the colon first, then the surrounding whitespace, so a
                # header written as ": name" yields "name" rather than " name"
                section = line[1:].strip().lower()
                continue

            is_semantic = section == "capital-common-countries"
            is_syntactic = section is not None and section.endswith("past-tense")
            if not (is_semantic or is_syntactic):
                continue  # sections that are not scored need no prediction

            words_in_line = line.strip().lower().split()
            # Skip blank or malformed lines and lines with out-of-vocabulary words
            if len(words_in_line) != 4:
                continue
            if not all(word in word2index for word in words_in_line):
                continue

            first_word, second_word, third_word, expected_word = words_in_line
            first_vec = model.get_embedding(word2index[first_word])
            second_vec = model.get_embedding(word2index[second_word])
            third_vec = model.get_embedding(word2index[third_word])

            predicted_idx = np.argmax(
                np.dot(embedding_matrix, (second_vec - first_vec + third_vec).T)
            )
            # .get: an index without a word counts as a wrong prediction
            predicted_word = index2word.get(int(predicted_idx))
            is_correct = predicted_word == expected_word

            if is_semantic:
                semantic_total += 1
                semantic_correct += is_correct
            else:
                syntactic_total += 1
                syntactic_correct += is_correct

    # Accuracy ratios (0 when a section had no usable lines)
    semantic_accuracy = semantic_correct / semantic_total if semantic_total > 0 else 0
    syntactic_accuracy = syntactic_correct / syntactic_total if syntactic_total > 0 else 0

    print(f"Model: {model.mode}")
    print(f"  Semantic Accuracy: {semantic_accuracy:.4f}")
    print(f"  Syntactic Accuracy: {syntactic_accuracy:.4f}")

In [None]:
# Analogy test file, expected in the working directory
analogy_file = "word-test.v1.txt"

# Score both restored models on the same analogy file
evaluate_analogies(loaded_model_sm, analogy_file, word2index)
evaluate_analogies(loaded_model_ng, analogy_file, word2index)

In [None]:
def find_analogy(model, word_a, word_b, word_c, word2index, index2word):
    """
    Answer "word_a is to word_b as word_c is to ?".

    Returns the vocabulary word whose ``embedding_v`` row has the largest dot
    product with ``word_b - word_a + word_c``, or an explanatory message when
    any of the three words is missing from ``word2index``.
    """
    query_words = (word_a, word_b, word_c)
    if any(word not in word2index for word in query_words):
        return "One or more words are not in the vocabulary."

    # Embeddings of the three query words, looked up in order
    vec_a, vec_b, vec_c = (model.get_embedding(word2index[word]) for word in query_words)
    target_vector = vec_b - vec_a + vec_c

    # Score every vocabulary entry against the target and keep the best one
    all_embeddings = model.embedding_v.weight.detach().cpu().numpy()
    scores = np.dot(all_embeddings, target_vector.T)
    best_index = np.argmax(scores)

    return index2word[best_index]


# Create a reverse mapping from index to word
index2word = {val: key for key, val in word2index.items()}

# Test samples from Reuters corpus
# Each tuple is (word_a, word_b, word_c) for "word_a is to word_b as word_c is to ?"
samples = [
    ("harry", "potter", "making"),
]

# Print the answer of both models for every sample; find_analogy returns a
# message instead of a word when a sample word is not in the vocabulary
for word_a, word_b, word_c in samples:
    result_sm = find_analogy(loaded_model_sm, word_a, word_b, word_c, word2index, index2word)
    result_ng = find_analogy(loaded_model_ng, word_a, word_b, word_c, word2index, index2word)
    print(f"If {word_a} is to {word_b}, then {word_c} is to {result_sm} (softmax)")
    print(f"If {word_a} is to {word_b}, then {word_c} is to {result_ng} (negative sampling)")

# Skip-gram Similarities


In [22]:
def evaluate_similarities(model, similarity_file, word2index):
    """
    Print how well the model's word similarities agree with human judgements.

    Every usable line of ``similarity_file`` has the form ``word1 word2 score``.
    For pairs whose words are both in ``word2index``, the model score is the
    dot product of the two ``get_embedding`` vectors. The Spearman correlation
    and the mean squared error between human and model scores are printed;
    the correlation is NaN when fewer than two pairs were scored, and the MSE
    is NaN when none were.

    Args:
        model: object exposing ``mode`` and ``get_embedding(index)``.
        similarity_file: path of the whitespace-separated similarity file.
        word2index: mapping from word to vocabulary index.
    """
    human_scores = []
    model_scores = []

    with open(similarity_file, "r", encoding="utf-8") as f:
        for line in f:
            fields = line.split()
            if len(fields) != 3:
                continue  # blank or malformed line
            word1, word2, human_score = fields

            if word1 in word2index and word2 in word2index:
                word1_vec = model.get_embedding(word2index[word1]).squeeze()
                word2_vec = model.get_embedding(word2index[word2]).squeeze()

                # Similarity is the raw dot product of the two embeddings
                model_scores.append(np.dot(word1_vec, word2_vec))
                human_scores.append(float(human_score))

    # A rank correlation needs at least two observations
    if len(model_scores) >= 2:
        correlation, _ = spearmanr(human_scores, model_scores)
    else:
        correlation = float("nan")

    if model_scores:
        mse = np.mean((np.array(human_scores) - np.array(model_scores)) ** 2)
    else:
        mse = float("nan")

    print(f"Model: {model.mode}")
    print(f"  Spearman Correlation: {correlation:.4f}")
    print(f"  MSE: {mse}")

In [None]:
# Human similarity judgements, expected in the working directory
similarity_file = "wordsim_similarity_goldstandard.txt"

# Score both restored models against the same human judgements
evaluate_similarities(loaded_model_sm, similarity_file, word2index)
evaluate_similarities(loaded_model_ng, similarity_file, word2index)

In [None]:
def compare_similarity(model, word1, word2, similarity_file, word2index):
    """
    Print the model's similarity for one word pair next to the human score.

    The model score is the dot product of the two ``get_embedding`` vectors.
    The human score is taken from the first line of ``similarity_file`` that
    lists the pair in either order.

    Returns:
        ``None`` after printing the comparison, or a message string when the
        file holds no score for the pair.

    Raises:
        KeyError: if ``word1`` or ``word2`` is not in ``word2index``.
    """
    word1_vec = model.get_embedding(word2index[word1]).squeeze()
    word2_vec = model.get_embedding(word2index[word2]).squeeze()

    # Similarity is the raw dot product of the two embeddings
    model_score = np.dot(word1_vec, word2_vec)

    human_score = None
    wanted = {(word1, word2), (word2, word1)}
    with open(similarity_file, "r", encoding="utf-8") as f:
        for line in f:
            fields = line.split()
            if len(fields) != 3:
                continue  # blank or malformed line
            w1, w2, score = fields
            if (w1, w2) in wanted:
                human_score = float(score)
                break

    if human_score is None:
        return f"No human score found for words: {word1}, {word2}"

    print(f"Model: {model.mode}")
    print(f"  Words: {word1}, {word2}")
    print(f"  Model Similarity Score: {model_score:.4f}")
    print(f"  Human Similarity Score: {human_score:.4f}")
    print(f"  Difference: {abs(model_score - human_score):.4f}")


# Example usage
# Read the word pairs from the similarity file, ignoring blank or short lines
with open(similarity_file, "r", encoding="utf-8") as f:
    word_pairs = [fields[:2] for fields in (line.split() for line in f) if len(fields) >= 2]

# Keep only the pairs whose two words are in the vocabulary. Filtering first
# (instead of redrawing until a match is found) cannot loop forever when no
# pair qualifies.
known_pairs = [
    pair for pair in word_pairs if pair[0] in word2index and pair[1] in word2index
]

if known_pairs:
    # Compare both models on one randomly chosen pair
    word1, word2 = random.choice(known_pairs)
    compare_similarity(loaded_model_sm, word1, word2, similarity_file, word2index)
    compare_similarity(loaded_model_ng, word1, word2, similarity_file, word2index)
else:
    print("No word pair from the similarity file is fully covered by the vocabulary.")