# Word2Vec (Negative Sampling)

**Import Libraries**

In [1]:
import time
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import Counter
import nltk
from nltk.corpus import reuters, stopwords

nltk.download('reuters')
nltk.download('stopwords')

# to ensure to produce same random number to debug and model comparison
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)


[nltk_data] Downloading package reuters to
[nltk_data]     C:\Users\acer\AppData\Roaming\nltk_data...
[nltk_data]   Package reuters is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\acer\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


<torch._C.Generator at 0x1abff2b6d30>

**Prepare the dataset**

In [2]:
stop_words = set(stopwords.words('english'))
sentences = []

for fileid in reuters.fileids():
    words = [
        w.lower()
        for w in reuters.words(fileid)
        # clean the dataset document by removing stopwords
        if w.isalpha() and w.lower() not in stop_words
    ]
    sentences.append(words)

print("Total sentences:", len(sentences))

Total sentences: 10788


In [3]:
# set vocab limit to avoid training the model fneeds GPU for londer period
VOCAB_LIMIT = 10000
UNK_TOKEN = "<UNK>"

all_words = [w for sentence in sentences for w in sentence]
word_counts = Counter(all_words)

# build the vocabulary
vocab = [UNK_TOKEN] + [
    word for word, _ in word_counts.most_common(VOCAB_LIMIT - 1)
]

word2index = {word: idx for idx, word in enumerate(vocab)}
index2word = {idx: word for word, idx in word2index.items()}

vocab_size = len(vocab)

print("Vocabulary size:", vocab_size)
print("UNK index:", word2index[UNK_TOKEN])


Vocabulary size: 10000
UNK index: 0


In [4]:
corpus = []
for sentence in sentences:
    indexed_sentence = [
        # each word is replaced by its index from word2index
        # this is to make dataset ready for model training
        word2index.get(word, word2index[UNK_TOKEN])
        for word in sentence
    ]
    corpus.append(indexed_sentence)


**Prepare train Data**

In [5]:
# dynamic windows where default is 2
def random_batch(batch_size, corpus, window_size=2):
    input_batch = []
    label_batch = []

    while len(input_batch) < batch_size:

        # pick a random sentence index
        sentence_idx = random.randint(0, len(corpus) - 1)
        sentence = corpus[sentence_idx]

        # ensure sentence is long enough to pick a center and context word
        if len(sentence) < 2 * window_size + 1:
            continue

        # pick a random center word position within the sentence
        center_word_pos = random.randint(window_size, len(sentence) - 1 - window_size)
        center_word_index = sentence[center_word_pos]

        # pick a random context word position within the window around the center word
        # and ensure it's not the center word itself
        context_word_pos = random.choice(
            list(range(center_word_pos - window_size, center_word_pos + window_size + 1))
        )
        if context_word_pos == center_word_pos:
            continue
        context_word_index = sentence[context_word_pos]

        input_batch.append(center_word_index)
        label_batch.append(context_word_index)

    return input_batch, label_batch


**Unigram Table**

$$P(w)=U(w)^{3/4}/Z$$

In [None]:
num_total_words = sum(word_counts.values())

z = sum([count ** 0.75 for count in word_counts.values()])

unigram_table = []

for v in vocab:
    if v == "<UNK>":
        continue
    uw = word_counts[v] / num_total_words
    uw_alpha = int((uw ** 0.75) / z * 1e9)  # Increased scale factor
    unigram_table.extend([v] * uw_alpha)

print("Unigram table size:", len(unigram_table))

Unigram table size: 25225


**Model**



$$\mathbf{J}_{\text{neg-sample}}(\mathbf{v}_c,o,\mathbf{U})=-\log(\sigma(\mathbf{u}_o^T\mathbf{v}_c))-\sum_{k=1}^K\log(\sigma(-\mathbf{u}_k^T\mathbf{v}_c))$$

In [None]:
def prepare_sequence(seq, word2index):
    return torch.LongTensor([word2index[w] for w in seq])

In [None]:
import random

def negative_sampling(targets, unigram_table, k):
    batch_size = targets.shape[0]
    neg_samples = []
    for i in range(batch_size):  #(1, k)
        target_index = targets[i].item()
        nsample      = []
        while (len(nsample) < k):
            neg = random.choice(unigram_table)
            if word2index[neg] == target_index:
                continue
            nsample.append(neg)
        neg_samples.append(prepare_sequence(nsample, word2index).reshape(1, -1))

    return torch.cat(neg_samples)


In [None]:
batch_size = 128
x, y = random_batch(batch_size, corpus)
x_tensor = torch.LongTensor(x)
y_tensor = torch.LongTensor(y)

In [None]:
k = 5
neg_samples = negative_sampling(y_tensor, unigram_table, k)

In [None]:
y_tensor[1]

tensor(3)

In [None]:
neg_samples[1]

tensor([ 438, 2393, 1114,  590, 3016])

$$\mathbf{J}_{\text{neg-sample}}(\mathbf{v}_c,o,\mathbf{U})=-\log(\sigma(\mathbf{u}_o^T\mathbf{v}_c))-\sum_{k=1}^K\log(\sigma(-\mathbf{u}_k^T\mathbf{v}_c))$$

In [None]:
class SkipgramNeg(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super().__init__()
        self.embedding_center = nn.Embedding(vocab_size, embedding_dim)
        self.embedding_outside = nn.Embedding(vocab_size, embedding_dim)

    def forward(self, center, outside, negative):
        center_emb = self.embedding_center(center).squeeze(1)
        outside_emb = self.embedding_outside(outside).squeeze(1)
        neg_emb = self.embedding_outside(negative)

        pos_score = torch.sum(center_emb * outside_emb, dim=1)
        pos_loss = torch.log(torch.sigmoid(pos_score))

        neg_score = torch.bmm(
            neg_emb, center_emb.unsqueeze(2)
        ).squeeze(2)
        neg_loss = torch.sum(torch.log(torch.sigmoid(-neg_score)), dim=1)

        return -torch.mean(pos_loss + neg_loss)

In [None]:
#test your model
embedding_dim = 100
batch_size = 128
num_epochs = 5000
learning_rate = 0.001
k = 5
WINDOW_SIZE = 2  # default (dynamic)


In [None]:
model = SkipgramNeg(vocab_size, embedding_dim)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


In [None]:
x_batch, y_batch = random_batch(batch_size, corpus, window_size=WINDOW_SIZE)

input_tensor = torch.LongTensor(x_batch)
label_tensor = torch.LongTensor(y_batch)

neg_samples = negative_sampling(label_tensor, unigram_table, k)
test_loss = model(input_tensor, label_tensor, neg_samples)

print("Initial test loss (before training):", test_loss.item())

Initial test loss (before training): 21.834829330444336


**Training Model**

In [None]:
start_time = time.time()

for epoch in range(num_epochs):

    #get batch
    input_batch, label_batch = random_batch(batch_size, corpus)
    input_tensor = torch.LongTensor(input_batch)
    label_tensor = torch.LongTensor(label_batch)

    #predict
    neg_samples = negative_sampling(label_tensor, unigram_table, k)
    loss = model(input_tensor, label_tensor, neg_samples)

    #backprogate
    optimizer.zero_grad()
    loss.backward()

    #update alpha
    optimizer.step()

    #print the loss
    if (epoch + 1) % 500 == 0:
        print(f"Epoch {epoch+1:6.0f} | Loss: {loss:2.6f}")

end_time = time.time()

print(f"\nTraining completed in {end_time - start_time:.2f} seconds")

Epoch    500 | Loss: 22.048492
Epoch   1000 | Loss: 18.929867
Epoch   1500 | Loss: 20.520149
Epoch   2000 | Loss: 16.697109
Epoch   2500 | Loss: 16.945498
Epoch   3000 | Loss: 15.586607
Epoch   3500 | Loss: 13.788391
Epoch   4000 | Loss: 14.125816
Epoch   4500 | Loss: 11.145334
Epoch   5000 | Loss: 12.844395

Training completed in 113.68 seconds


**Save the trained Word2Vec model**

In [None]:
MODEL_PATH = "word2vec_neg_sampling.pth"

torch.save({
    "model_state_dict": model.state_dict(),
    "word2index": word2index,
    "index2word": index2word,
    "embedding_dim": embedding_dim
}, MODEL_PATH)

print("Model saved to", MODEL_PATH)


Model saved to word2vec_neg_sampling.pth


**Load the model**

In [None]:
checkpoint = torch.load("word2vec_neg_sampling.pth", map_location="cpu")

word2index = checkpoint["word2index"]
index2word = checkpoint["index2word"]
embedding_dim = checkpoint["embedding_dim"]

vocab_size = len(word2index)

model = SkipgramNeg(vocab_size, embedding_dim)
model.load_state_dict(checkpoint["model_state_dict"])
model.eval()


SkipgramNeg(
  (embedding_center): Embedding(10000, 100)
  (embedding_outside): Embedding(10000, 100)
)

In [None]:
import torch.nn.functional as F

embeddings = model.embedding_center.weight.data
embeddings = F.normalize(embeddings, dim=1)


**Evaluate semantic & syntactic accuracy**

In [None]:
# load word analogies dataset and returns two lists for semantic and synthetic evaluation
def load_analogy_dataset(filepath):
    semantic = []
    syntactic = []

    current_section = None

    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()

            if line.startswith(":"):
                if "capital-common-countries" in line:
                    current_section = "semantic"
                elif "past-tense" in line:
                    current_section = "syntactic"
                else:
                    current_section = None
                continue

            if current_section is None:
                continue

            words = line.lower().split()
            if len(words) != 4:
                continue

            if current_section == "semantic":
                semantic.append(words)
            else:
                syntactic.append(words)

    return semantic, syntactic


In [None]:
import torch
import torch.nn.functional as F

def analogy_accuracy(analogies, embeddings, word2index, index2word):
    correct = 0
    total = 0

    vocab_size = embeddings.size(0)

    for a, b, c, d in analogies:
        if a not in word2index or b not in word2index \
           or c not in word2index or d not in word2index:
            continue

        va = embeddings[word2index[a]]
        vb = embeddings[word2index[b]]
        vc = embeddings[word2index[c]]

        # Word2Vec analogy: b - a + c
        target_vec = vb - va + vc
        target_vec = F.normalize(target_vec.unsqueeze(0), dim=1)

        # Cosine similarity with all words
        similarities = torch.matmul(target_vec, embeddings.T).squeeze()

        # Exclude query words
        similarities[word2index[a]] = -1e9
        similarities[word2index[b]] = -1e9
        similarities[word2index[c]] = -1e9

        predicted_index = torch.argmax(similarities).item()
        predicted_word = index2word[predicted_index]

        if predicted_word == d:
            correct += 1

        total += 1

    return correct / total if total > 0 else 0


In [None]:
semantic, syntactic = load_analogy_dataset("/content/sample_data/word_analogies_dataset.txt")

semantic_acc = analogy_accuracy(
    semantic, embeddings, word2index, index2word
)

syntactic_acc = analogy_accuracy(
    syntactic, embeddings, word2index, index2word
)

print(f"Semantic accuracy: {semantic_acc:.4f}")
print(f"Syntactic accuracy: {syntactic_acc:.4f}")


Semantic accuracy: 0.0000
Syntactic accuracy: 0.0000


**Load similarity dataset**

In [None]:
import pandas as pd

# Load similarity dataset
sim_df = pd.read_csv("/content/sample_data/wordsim353crowd.csv")

sim_df.head()


Unnamed: 0,Word 1,Word 2,Human (Mean)
0,admission,ticket,5.536
1,alcohol,chemistry,4.125
2,aluminum,metal,6.625
3,announcement,effort,2.0625
4,announcement,news,7.1875


**Compute dot-product similarities**

In [None]:
model_sims = []
human_sims = []

UNK_INDEX = word2index.get("<UNK>")

for _, row in sim_df.iterrows():
    w1 = str(row[0]).lower()
    w2 = str(row[1]).lower()
    human_score = float(row[2])

    idx1 = word2index.get(w1, UNK_INDEX)
    idx2 = word2index.get(w2, UNK_INDEX)

    v1 = embeddings[idx1]
    v2 = embeddings[idx2]

    dot_sim = torch.dot(v1, v2).item()

    model_sims.append(dot_sim)
    human_sims.append(human_score)


  w1 = str(row[0]).lower()
  w2 = str(row[1]).lower()
  human_score = float(row[2])


**Spearman correlation**

In [None]:
from scipy.stats import spearmanr

correlation, p_value = spearmanr(model_sims, human_sims)

print(f"Spearman Correlation: {correlation:.4f}")
print(f"P-value: {p_value:.4e}")


Spearman Correlation: 0.1065
P-value: 4.5555e-02
