## Question
1. Word Representation using Word2Vec: Implement the skip-gram model with negative
sampling loss function for word embedding generation. Your implementation should include: [20
 Marks]
 (b) Implement the skip-gram model from scratch with negative sampling loss. [4]
 (c) Derive and implement the gradients for backpropagation. [4]
 (d) Train your model on the text8 dataset with appropriate hyperparameters (specify your choices
 and justify them). [3]
 (e) Evaluate the quality of your embeddings through: [4]
 • Visualization using SVD to project the embeddings to 2D space.
 • Word similarity analysis for semantically related words (e.g., “king”- “man” + “woman”
 ≈ “queen”).
 (f) Discuss the impact of key hyperparameters (e.g., embedding dimension, context window size,
 number of negative samples) on the quality of the learned representations. [2]

In [4]:
from torch.utils.data import Dataset
from collections import Counter
import random
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np


class Text8Dataset(Dataset):
  def __init__(self,text,window_size=2, num_negative_samples=5):
    self.window_size = window_size
    self.num_negative_samples = num_negative_samples
    self.word_counts = None
    self.vocab_to_int = None
    self.int_to_vocab = None
    self.word_indices = None
    self.skip_gram_pairs = []
    self.sampling_weights = None

    # Process the data
    trimmed_words = self.preprocess_data(text)
    self.create_lookup_tables(trimmed_words)
    self.generate_skip_gram()
    self.unigram_distribution()

  def preprocess_data(self,text):
    text = text.lower()
    text = text.replace('.', ' <PERIOD> ')
    text = text.replace(',', ' <COMMA> ')
    text = text.replace('"', ' <QUOTATION_MARK> ')
    text = text.replace(';', ' <SEMICOLON> ')
    text = text.replace('!', ' <EXCLAMATION_MARK> ')
    text = text.replace('?', ' <QUESTION_MARK> ')
    text = text.replace('(', ' <LEFT_PAREN> ')
    text = text.replace(')', ' <RIGHT_PAREN> ')
    text = text.replace('--', ' <HYPHENS> ')
    text = text.replace('?', ' <QUESTION_MARK> ')
    text = text.replace(':', ' <COLON> ')
    words = text.split()
    self.word_counts = Counter(words)
    trimmed_words = [word for word, count in self.word_counts.items() if count >= 5]
    return trimmed_words

  def create_lookup_tables(self,trimmed_words):
    #sorted_vocab = sorted(word_counts, key=word_counts.get, reverse=True)
    self.vocab = set(trimmed_words)
    self.vocab_to_int = {word: ii for ii, word in enumerate(self.vocab)}
    self.int_to_vocab = {ii: word for word, ii in self.vocab_to_int.items()}
    self.word_indices = [self.vocab_to_int[word] for word in trimmed_words]
    self.vocab_size= len(self.vocab)

  def generate_skip_gram(self):
      for i in range(len(self.word_indices)):
          center_word = self.word_indices[i]
          # Get context words within window
          for j in range(max(0, i - self.window_size),
                        min(len(self.word_indices), i + self.window_size + 1)):
              if i != j:
                  context_word = self.word_indices[j]
                  self.skip_gram_pairs.append((center_word, context_word))

  def unigram_distribution(self):
    word_freqs = np.array([count for word, count in self.word_counts.most_common() if word in self.vocab_to_int])
    word_freqs = word_freqs ** 0.75
    self.sampling_weights = word_freqs / np.sum(word_freqs)


  def subsampling_words(self,threshold=1e-5):
    total_count = sum(self.word_counts.values())
    word_freqs = {word: count/total_count for word, count in self.word_counts.items()}
    p_drop = {word: 1 - np.sqrt(threshold/word_freqs[word]) for word in self.word_counts}
    train_words = [word for word in self.word_counts if random.random() < (1 - p_drop[word])]
    return train_words

  def __len__(self):
    return len(self.skip_gram_pairs)

  def __getitem__(self,idx):
    center_word, context_word = self.skip_gram_pairs[idx]
    self.sampling_weights_tensor = torch.from_numpy(self.sampling_weights).float()
    neg_samples = torch.multinomial(
        self.sampling_weights_tensor,
        self.num_negative_samples,
        replacement=True
    ).numpy()
    return (
        torch.tensor(center_word, dtype=torch.long),
        torch.tensor(context_word, dtype=torch.long),
        torch.tensor(neg_samples, dtype=torch.long)
      )



In [5]:
def data_preprocessing(file_path):
    with open(file_path,'r') as file:
        data=file.read()
        print("read data")
    return data

file_path = "C:/users/pantm/Downloads/text8/text8.txt"
data = data_preprocessing(file_path)
dataset = Text8Dataset(data)
print("Vocab size is:", len(dataset))
# datasetiterator = iter(dataset)
# center_word, context_word, neg_samples = next(datasetiterator)

train_words = dataset.subsampling_words()
print("subsample vocab size",len(train_words))

read data
Vocab size is: 285154
subsample vocab size 250775


In [None]:
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt

class SkipGram:
    def __init__(self, vocab_size, embedding_dim, num_negative_samples=5, momentum=0.9,eta=0.01):
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.num_negative_samples = num_negative_samples
        self.momentum = momentum
        self.train_losses=[]
        self.test_losses=[]

        # Initialize embeddings
        self.W1 = torch.randn(vocab_size, embedding_dim) * 0.1
        self.W2 = torch.randn(vocab_size, embedding_dim) * 0.1
        
        # init_range = np.sqrt(6.0 / (vocab_size + embedding_dim))
        # self.W1= torch.FloatTensor(vocab_size, embedding_dim).uniform_(-init_range, init_range)
        # self.W2 = torch.FloatTensor(vocab_size, embedding_dim).uniform_(-init_range, init_range)


        self.eta = eta

        # Initialize velocity terms for momentum
        self.v_W1 = torch.zeros_like(self.W1)
        self.v_W2 = torch.zeros_like(self.W2)

    def forward(self, target_word, context_word, negative_samples):
        target_emb = self.W1[target_word]
        context_emb = self.W2[context_word]
        neg_emb = self.W2[negative_samples]

        pos_score = torch.sum(target_emb * context_emb, dim=1)
        pos_loss = -F.logsigmoid(pos_score)

        neg_score = torch.bmm(neg_emb, target_emb.unsqueeze(2)).squeeze()
        neg_loss = -torch.sum(F.logsigmoid(-neg_score), dim=1)

        return pos_loss + neg_loss, target_emb, context_emb, neg_emb

    def compute_test_loss(self, test_dataloader):
        total_loss = 0
        with torch.no_grad():
            for target, context, neg_samples in test_dataloader:
                loss, _, _, _ = self.forward(target, context, neg_samples)
                total_loss += loss.mean().item()
        return total_loss / len(test_dataloader)


    def gradient_descent(self, train_dataloader,test_dataloader=None, n_epoch=10,early_stopping=5):
        best_loss = float('inf')
        no_improve = 0
        for epoch in range(n_epoch):
            train_loss=0
            for target_word, context_word, negative_samples in train_dataloader:
                loss, target_emb, context_emb, neg_emb = self.forward(target_word, context_word, negative_samples)
                train_loss += loss.mean().item()

                # Gradients for positive samples
                d_pos = -torch.sigmoid(-torch.sum(target_emb * context_emb, dim=1)).unsqueeze(1) * context_emb
                d_context = -torch.sigmoid(-torch.sum(target_emb * context_emb, dim=1)).unsqueeze(1) * target_emb

                # Gradients for negative samples
                d_neg = torch.sigmoid(torch.bmm(neg_emb, target_emb.unsqueeze(2)).squeeze()).unsqueeze(2) * neg_emb
                d_target_neg = torch.sum(torch.sigmoid(torch.bmm(neg_emb, target_emb.unsqueeze(2)).squeeze()).unsqueeze(2) * neg_emb, dim=1)

                # Compute momentum updates
                self.v_W1[target_word] = self.momentum * self.v_W1[target_word] - self.eta * (d_pos + d_target_neg)
                self.v_W2[context_word] = self.momentum * self.v_W2[context_word] - self.eta * d_context
                self.v_W2[negative_samples] = self.momentum * self.v_W2[negative_samples] - self.eta * d_neg

                # Apply updates
                self.W1[target_word] += self.v_W1[target_word]
                self.W2[context_word] += self.v_W2[context_word]
                self.W2[negative_samples] += self.v_W2[negative_samples]
            
            avg_train_loss = train_loss / len(train_dataloader)
            self.train_losses.append(avg_train_loss)
            
            if test_dataloader:
                avg_test_loss = self.compute_test_loss(test_dataloader)
                self.test_losses.append(avg_test_loss)
                print(f"Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}, Test Loss: {avg_test_loss:.4f}")
                
                # Early stopping
                if avg_test_loss < best_loss:
                    best_loss = avg_test_loss
                    no_improve = 0
                else:
                    no_improve += 1
                    if no_improve >= early_stopping:
                        print(f"Early stopping at epoch {epoch+1}")
                        break
            else:
                print(f"Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}")

    def get_word_embedding(self, word_idx):
        return self.W1[word_idx].detach().numpy()
    
    def plot_losses(self):
        """Plot training and test loss curves"""
        plt.figure(figsize=(10, 6))
        plt.plot(self.train_losses, label='Training Loss')
        if self.test_losses:
            plt.plot(self.test_losses, label='Test Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Training and Test Loss Over Epochs')
        plt.legend()
        plt.grid(True)
        plt.show()

: 

In [17]:
def train_skip_gram_with_negative_sampling(text, embedding_dim=100, window_size=2,
                                          num_negative_samples=5, batch_size=32, epochs=5,eta=0.01):
    dataset = Text8Dataset(text, window_size, num_negative_samples)
    print("Vocab size is:", dataset.vocab_size)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

    model = SkipGram(dataset.vocab_size, embedding_dim, num_negative_samples,eta)
    model.gradient_descent(dataloader, n_epoch=epochs)

    return model, dataset.vocab_to_int, dataset.int_to_vocab

# def find_similar_words(word, model, vocab_to_int, idx_to_word, top_k=5):
#     if word not in vocab:
#         return []

#     word_idx = vocab_to_int[word]
#     word_vector = model.get_word_embedding(word_idx)

#     similarities = []
#     for idx in range(len(vocab)):
#         if idx != word_idx:
#             vector = model.get_word_embedding(idx)
#             similarity = np.dot(word_vector, vector) / (np.linalg.norm(word_vector) * np.linalg.norm(vector))
#             similarities.append((idx_to_word[idx], similarity))

#     similarities.sort(key=lambda x: x[1], reverse=True)
#     return similarities[:top_k]


def word_similarity_analysis(embeddings, vocab, idx_to_word, test_words=None):

    def cosine_similarity(vec1, vec2):
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
    
    def find_most_similar(target_word, top_k=5):
        """Find top_k most similar words to target_word"""
        if target_word not in vocab:
            return []
            
        target_vec = embeddings[vocab[target_word]]
        similarities = []
        
        for word, idx in vocab.items():
            if word != target_word:
                sim = cosine_similarity(target_vec, embeddings[idx])
                similarities.append((word, sim))
        
        return sorted(similarities, key=lambda x: -x[1])[:top_k]
    
    def solve_analogy(a, b, c, top_k=5):
        if not all(w in vocab for w in [a, b, c]):
            return []
            
        vec = embeddings[vocab[b]] - embeddings[vocab[a]] + embeddings[vocab[c]]
        similarities = []
        
        for word, idx in vocab.items():
            if word not in [a, b, c]:
                sim = cosine_similarity(vec, embeddings[idx])
                similarities.append((word, sim))
        
        return sorted(similarities, key=lambda x: -x[1])[:top_k]
    

    if test_words is None:
        test_words = ['king', 'queen', 'man', 'woman', 'paris', 'france', 'london', 'england']
    
    print("\nWord Similarity Analysis:")
    print("="*50)
    
    # 1. Show similar words
    print("\nMost similar words:")
    for word in test_words[:4]:  
        similar = find_most_similar(word)
        print(f"{word}: {[w for w, _ in similar]}")
    
    # 2. Test analogies
    print("\nWord analogies:")
    analogies = [
        ('king', 'man', 'woman'),
        ('france', 'paris', 'london')
    ]
    
    for a, b, c in analogies:
        result = solve_analogy(a, b, c)
        print(f"{a} - {b} + {c} ≈ {result[0][0]} (similarity: {result[0][1]:.2f})")
        print(f"Other candidates: {[w for w, _ in result[1:3]]}\n")


if __name__ == "__main__":
    text = data[:4000000]
    model, vocab, idx_to_word = train_skip_gram_with_negative_sampling(
        text, embedding_dim=300, epochs=300, batch_size=128,eta=0.01
    )

    # test_words = ['king', 'queen', 'man', 'woman', 'violent', 'word', 'term', 'describe', 'philosophy', 'chief', 'anarchists']
    # # for each_word in test_words:
    # #     similar_words = find_similar_words(each_word, model, vocab, idx_to_word)
    # #     print(f"Words similar to '{each_word}': {similar_words}")
    
    word_similarity_analysis(model.W2, vocab, idx_to_word)



Vocab size is: 10713
Epoch 1, Train Loss: 4.1753
Epoch 2, Train Loss: 4.1597
Epoch 3, Train Loss: 4.1460
Epoch 4, Train Loss: 4.1335
Epoch 5, Train Loss: 4.1253
Epoch 6, Train Loss: 4.1161
Epoch 7, Train Loss: 4.1079
Epoch 8, Train Loss: 4.1015
Epoch 9, Train Loss: 4.0953
Epoch 10, Train Loss: 4.0893
Epoch 11, Train Loss: 4.0846
Epoch 12, Train Loss: 4.0801
Epoch 13, Train Loss: 4.0758
Epoch 14, Train Loss: 4.0725
Epoch 15, Train Loss: 4.0692
Epoch 16, Train Loss: 4.0651
Epoch 17, Train Loss: 4.0623
Epoch 18, Train Loss: 4.0602
Epoch 19, Train Loss: 4.0576
Epoch 20, Train Loss: 4.0544
Epoch 21, Train Loss: 4.0525
Epoch 22, Train Loss: 4.0505
Epoch 23, Train Loss: 4.0487
Epoch 24, Train Loss: 4.0467
Epoch 25, Train Loss: 4.0449
Epoch 26, Train Loss: 4.0434
Epoch 27, Train Loss: 4.0418
Epoch 28, Train Loss: 4.0401
Epoch 29, Train Loss: 4.0388
Epoch 30, Train Loss: 4.0370
Epoch 31, Train Loss: 4.0358
Epoch 32, Train Loss: 4.0346
Epoch 33, Train Loss: 4.0333
Epoch 34, Train Loss: 4.0319
Ep

KeyboardInterrupt: 

In [19]:
if __name__ == "__main__":
    text = data[:4000000]
    model, vocab, idx_to_word = train_skip_gram_with_negative_sampling(
        text, embedding_dim=300, epochs=300, batch_size=128,eta=0.01
    )

    # test_words = ['king', 'queen', 'man', 'woman', 'violent', 'word', 'term', 'describe', 'philosophy', 'chief', 'anarchists']
    # # for each_word in test_words:
    # #     similar_words = find_similar_words(each_word, model, vocab, idx_to_word)
    # #     print(f"Words similar to '{each_word}': {similar_words}")
    
    word_similarity_analysis(model.W2, vocab, idx_to_word)


Vocab size is: 10713
Epoch 1, Train Loss: 4.1589
Epoch 2, Train Loss: 4.1587
Epoch 3, Train Loss: 4.1584
Epoch 4, Train Loss: 4.1583
Epoch 5, Train Loss: 4.1581
Epoch 6, Train Loss: 4.1579


KeyboardInterrupt: 

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
viz_words = 280
tsne = TSNE()
embed_tsne = tsne.fit_transform(model.W1[:viz_words, :])
fig, ax = plt.subplots(figsize=(16, 16))
for idx in range(viz_words):
    plt.scatter(*embed_tsne[idx, :], color='steelblue')
    plt.annotate(dataset.int_to_vocab[idx], (embed_tsne[idx, 0], embed_tsne[idx, 1]), alpha=0.7)