Reference:

Ozeren, E. (n.d.). Word2Vec from scratch with Python. Medium.
https://medium.com/@enozeren/word2vec-from-scratch-with-python-1bba88d9f221

Mikolov, T., Chen, K., Corrado, G., & Dean, J. (2013). Efficient estimation of
word representations in vector space. arXiv preprint arXiv:1301.3781.

In [62]:
import random
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

nltk.download('punkt_tab')
nltk.download('stopwords')

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [63]:
#step1 : Generate corpus
# We define 20 words in ourcorpus
dog_cat_words = ['dog', 'cat', 'pet', 'house', 'animal', 'sleep', 'play']
family_words = ['girl', 'boy', 'father', 'mother', 'family', 'house', 'marriage']
king_queen_words = ['crown', 'queen', 'king', 'empire', 'country', 'rule', 'castle']

#We will shuffle these words and generate random long sequences to create  out dataset
dog_cat_text = ''
family_text =''
king_queen_text=''
for _ in range (10000):
  random.shuffle(dog_cat_words)
  dog_cat_text = dog_cat_text + ' ' + ' '.join(dog_cat_words)
  random.shuffle(family_words)
  family_text = family_text + ' ' + ' '.join(family_words)
  random.shuffle(king_queen_words)
  king_queen_text = king_queen_text + ' ' + ' '.join(king_queen_words)



small_corpus = dog_cat_text + family_text + king_queen_text

In [65]:
# Write corpus to file
file_name = "small_corpus.txt"
with open(file_name, 'w') as f:
    f.write(small_corpus)
# Read corpus from file
file_path = "small_corpus.txt"
with open(file_path, "r") as f:
    text = f.read()


In [66]:
# ========== step 2： Generate a bunch of CBOW training pairs  ==========
def generate_cbows(text, window_size):
    text = text.lower()
    words = word_tokenize(text)
    words = [word for word in words if word.isalpha()]
    unique_words = list(set(words))

    cbows = []
    for i in range(window_size, len(words) - window_size):
        target_word = words[i]
        context_words = []
        for j in range(i - window_size, i + window_size + 1):
            if j != i:
                context_words.append(words[j])
        cbows.append((context_words, target_word))

    return cbows, unique_words

window_size = 2
cbows, unique_words = generate_cbows(text, window_size)

In [67]:
# ========== step 3：One-Hot encoding ==========
def one_hot_encoding(word, unique_words):
    encoding = []
    for w in unique_words:
        if word == w:
            encoding.append(1)
        else:
            encoding.append(0)
    return torch.tensor(encoding, dtype=torch.float32)

one_hot_encodings = {}
for word in unique_words:
    one_hot_encodings[word] = one_hot_encoding(word, unique_words)

In [69]:
# ========== step4 converted to vector pair ==========
cbow_vector_pairs = []
for context_words, target_word in cbows:
    context_vectors = [one_hot_encodings[word] for word in context_words]
    target_vector = one_hot_encodings[target_word]
    cbow_vector_pairs.append((context_vectors, target_vector))

cbow_vector_pairs_summed = []
for context_vectors, target_vector in cbow_vector_pairs:
    context_sum = torch.stack(context_vectors).sum(dim=0)
    cbow_vector_pairs_summed.append((context_sum, target_vector))

In [70]:
# ========== step5：Create Dataset and DataLoader ==========
class Word2VecDataset(Dataset):
  def __init__(self, data):
      self.data = data

  def __len__(self):
      return len(self.data)

  def __getitem__(self, idx):
      return self.data[idx]

split_idx = int(0.8 * len(cbow_vector_pairs_summed))
train_data = cbow_vector_pairs_summed[:split_idx]
val_data = cbow_vector_pairs_summed[split_idx:]

train_dataset = Word2VecDataset(train_data)
val_dataset = Word2VecDataset(val_data)

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
validation_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)

In [71]:
# ========== step6 Define model ==========
class NaiveWord2Vec(nn.Module):
    def __init__(self, VOCAB_SIZE, VECTOR_DIM):
        super().__init__()
        self.vocab_size = VOCAB_SIZE
        self.vector_dim = VECTOR_DIM
        self.W1 = nn.Parameter(torch.randn(VOCAB_SIZE, VECTOR_DIM, requires_grad=True))
        self.W2 = nn.Parameter(torch.randn(VECTOR_DIM, VOCAB_SIZE, requires_grad=True))

    def forward(self, x):
        hidden = torch.matmul(x, self.W1)
        output = torch.matmul(hidden, self.W2)
        return output

VOCAB_SIZE = len(unique_words)
VECTOR_DIM = 10

model = NaiveWord2Vec(VOCAB_SIZE, VECTOR_DIM)

In [72]:
# ========== step7 Define training function ==========
def train_model(model, train_dataloader, validation_dataloader, epochs, learning_rate, verbose=False):
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)
    train_set_loss_log = []
    validation_set_loss_log = []

    for epoch in range(epochs):
        if verbose:
            print(f"Epoch: {epoch + 1}/{epochs}")

        #Training phase
        model.train()
        total_train_loss = 0.0
        num_train_batches = 0

        for inputs_batch, outputs_batch in train_dataloader:
            y_train_logits = model(inputs_batch)


            target_indices = torch.argmax(outputs_batch, dim=1)
            train_loss = loss_fn(y_train_logits, target_indices)

            optimizer.zero_grad()
            train_loss.backward()
            optimizer.step()

            total_train_loss += train_loss.item()
            num_train_batches += 1

        average_train_loss = total_train_loss / num_train_batches
        train_set_loss_log.append(average_train_loss)

        if verbose:
            print(f"  Training loss: {average_train_loss:.4f}")

        # Validation phase
        model.eval()
        total_validation_loss = 0.0
        num_validation_batches = 0

        with torch.inference_mode():
            for inputs_batch, outputs_batch in validation_dataloader:
                y_val_logits = model(inputs_batch)


                target_indices = torch.argmax(outputs_batch, dim=1)
                validation_loss = loss_fn(y_val_logits, target_indices)

                total_validation_loss += validation_loss.item()
                num_validation_batches += 1

        average_validation_loss = total_validation_loss / num_validation_batches
        validation_set_loss_log.append(average_validation_loss)

        if verbose:
            print(f"  Validation loss: {average_validation_loss:.4f}")

    return model, train_set_loss_log, validation_set_loss_log

In [73]:
# ========== step8 Training model ==========
model, train_losses, val_losses = train_model(
    model,
    train_dataloader,
    validation_dataloader,
    epochs=10,
    learning_rate=0.001,
    verbose=True
)

Epoch: 1/10
  Training loss: 2.3932
  Validation loss: 1.5851
Epoch: 2/10
  Training loss: 1.5704
  Validation loss: 1.5689
Epoch: 3/10
  Training loss: 1.5671
  Validation loss: 1.5719
Epoch: 4/10
  Training loss: 1.5664
  Validation loss: 1.5719
Epoch: 5/10
  Training loss: 1.5659
  Validation loss: 1.5681
Epoch: 6/10
  Training loss: 1.5660
  Validation loss: 1.5663
Epoch: 7/10
  Training loss: 1.5658
  Validation loss: 1.5707
Epoch: 8/10
  Training loss: 1.5657
  Validation loss: 1.5713
Epoch: 9/10
  Training loss: 1.5658
  Validation loss: 1.5682
Epoch: 10/10
  Training loss: 1.5656
  Validation loss: 1.5676


In [74]:
# ========== step9 Extract word vector ==========
params = list(model.parameters())
word_vectors = params[0].detach()
word_dict = {word: vector for word, vector in zip(unique_words, word_vectors)}

print("\nTraining completed!")
print(f"vocabulary size: {len(unique_words)}")
print(f"word vector dimension: {VECTOR_DIM}")

#  Examples of word vector
print("\n word vector（first 5 dimension）:")
for i, word in enumerate(list(unique_words)[:5]):
    print(f"  {word}: {word_dict[word][:5].numpy()}")

# cosine similarity
def cosine_similarity(v1, v2):
    return torch.dot(v1, v2) / (torch.norm(v1) * torch.norm(v2))


Training completed!
vocabulary size: 20
word vector dimension: 10

 word vector（first 5 dimension）:
  mother: [ 1.0194747 -1.3262519 -0.9400389 -0.6814113 -1.1793137]
  father: [ 0.05292026 -0.59184104 -1.726279   -1.2563657  -0.48930538]
  girl: [-0.05198452  0.9495627  -0.33856043 -0.50958073  0.15935855]
  dog: [-0.08309487 -0.73545176 -0.20419116 -1.3450007   0.20620972]
  sleep: [ 1.6995863  -0.28693202  0.13986307  0.51986074 -0.24724486]


In [75]:
# ========== Step10: What the model learned in training ==========
# Check similarity between words
print("\nWord Similarity Analysis:")
print("="*50)

# Words from the same topic should be more similar
if 'dog' in word_dict and 'cat' in word_dict:
    sim = cosine_similarity(word_dict['dog'], word_dict['cat'])
    print(f"dog <-> cat (both animals): {sim:.4f}")

if 'king' in word_dict and 'queen' in word_dict:
    sim = cosine_similarity(word_dict['king'], word_dict['queen'])
    print(f"king <-> queen (both royalty): {sim:.4f}")

if 'father' in word_dict and 'mother' in word_dict:
    sim = cosine_similarity(word_dict['father'], word_dict['mother'])
    print(f"father <-> mother (both family): {sim:.4f}")

# Words from different topics should be less similar
if 'dog' in word_dict and 'king' in word_dict:
    sim = cosine_similarity(word_dict['dog'], word_dict['king'])
    print(f"dog <-> king (different topics): {sim:.4f}")

if 'cat' in word_dict and 'crown' in word_dict:
    sim = cosine_similarity(word_dict['cat'], word_dict['crown'])
    print(f"cat <-> crown (different topics): {sim:.4f}")


Word Similarity Analysis:
dog <-> cat (both animals): 0.1536
king <-> queen (both royalty): 0.5359
father <-> mother (both family): 0.6979
dog <-> king (different topics): -0.5061
cat <-> crown (different topics): -0.4150
