In [None]:
!pip install datasets
!gdown https://drive.google.com/file/d/1-mB6idLW5Jg4aE68jOj5NDcDxRNlMXpu/view?usp=sharing --fuzzy

Downloading...
From: https://drive.google.com/uc?id=1-mB6idLW5Jg4aE68jOj5NDcDxRNlMXpu
To: /content/TestSet_sample.csv
100% 51.5k/51.5k [00:00<00:00, 90.3MB/s]


In [None]:
import random

import torch

# Seed every RNG the notebook draws from so that Restart & Run All is repeatable.
SEED = 0
random.seed(SEED)        # Python's RNG: `random.sample` picks the article subset
torch.manual_seed(SEED)  # torch's RNG: weight initialisation and DataLoader shuffling

<torch._C.Generator at 0x7a0ee8b06b10>

In [None]:
from datasets import load_dataset
from random import sample

# Load the "20220301.simple" configuration of the "wikipedia" dataset from the Hugging Face Hub.
wikitext = load_dataset("wikipedia", "20220301.simple")
# Work on a random subset of 5000 article texts from the train split.
# `sample` draws from Python's global `random` RNG, which is independent of the torch seed.
trim_dataset = sample(wikitext['train']['text'], 5000)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:
# Peek at the first sampled article to see what the raw text looks like before cleaning.
trim_dataset[0]

'Dale Crover (born October 23, 1967) is an American musician. He is best known as the drummer for the band, Melvins. Crover has also played the bands Men of Porn, Shrinebuilder, and Nirvana. He is the current vocalist and guitarist for the band Altamont.\n\nCareer\nAfter Mike Dillard left Melvins, the band brought Crover in to drum for them. He was recruited out of an Iron Maiden cover band. In late 1985, Crover formed the band Fecal Matter with Kurt Cobain and Greg Hokanson. Hokanson would later leave the band. After he left, Cobain and Crover decided to record Illiteracy Will Prevail on a 4-track in December 1985 at Cobain\'s aunt\'s home in Seattle, Washington. The band would break up in 1986.\n\nCrover would end up drumming on Nirvana\'s ten-song demo that was recorded on January 23, 1988 at Reciprocal Recording Studios in Seattle. He played a 14-song show with Nirvana in Tacoma, Washington, the night of the demo session. Three of the cuts from the show "Downer", "Floyd the Barber"

In [None]:
import nltk
# Fetch NLTK's stopword corpus; the preprocessing step reads the English list from it.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
import string
import re
from nltk.corpus import stopwords

def preprocess_data(data, stop_words=None):
    """ Method to clean text from noise and standardize text across the different classes.
        The preprocessing joins all datapoints into one string, converts it to lowercase,
        drops the standalone word "references", and removes punctuation, numbers and stopwords.
    Arguments
    ---------
    data : List of String (or a single String)
       Texts to clean. A single string is treated as a one-element list
    stop_words : Iterable of String, optional
       Lowercase words to drop. Defaults to NLTK's English stopword list
    Returns
    -------
    text : String
        Cleaned and joined text, with words separated by single spaces
    """
    if isinstance(data, str):
        # ' '.join on a bare string would put a space between every character
        data = [data]

    if stop_words is None:
        stop_words = stopwords.words('english')
    stop_words = set(stop_words)

    text = ' '.join(data)  # join all text in one single string
    text = text.lower()  # make everything lower case
    text = re.sub(r'\n', ' ', text)  # replace \n characters with spaces
    # Remove the word "references" only where it stands alone. Without the word
    # boundaries the pattern also eats the tail of words such as "preferences",
    # leaving a stray "p" token in the corpus.
    text = re.sub(r'\breferences\b', '', text)
    text = re.sub(r'[^\w\s]', '', text)  # remove any punctuation or special characters
    text = re.sub(r'\d+', '', text)  # remove all numbers
    text = ' '.join([word for word in text.split() if word not in stop_words])  # remove all stopwords

    return text

# Preprocess the data: all sampled articles become one cleaned, space-separated string
text = preprocess_data(trim_dataset)



In [None]:
def vocab_frequency(text):
    """ Counts how many times each word occurs in a text.
    Arguments
    ---------
    text : string
        Preprocessed text; its whitespace-separated pieces are the words
    Returns
    -------
    vocab_dict : dictionary
        Word counts with the format {word: frequency}, keyed in order of first appearance
    """
    counts = {}
    for token in text.split():
        if token in counts:
            counts[token] += 1
        else:
            # first time this word is seen
            counts[token] = 1
    return counts

# Create the vocabulary: {word: frequency} over the whole preprocessed text
vocabulary = vocab_frequency(text)

In [None]:
# Number of distinct words in the vocabulary
len(vocabulary)

59951

In [None]:
import torch
import torch.nn as nn

def build_word_to_index(vocabulary):
    """ Method to create vocabulary to index mapping.
    Arguments
    ---------
    vocabulary : Dictionary
       Dictionary of format {word:frequency}
    Returns
    -------
    mapping : Dictionary
        Dictionary mapping words to index with format {word:index}. Index 0 is
        reserved for 'OOV'; the words follow in the dictionary's iteration order
    """
    mapping = {'OOV': 0}  # Initialize with 'OOV' at index 0

    # Assign indices to words in the vocabulary
    for index, word in enumerate(vocabulary.keys(), start=1):  # Start from 1 since 0 is reserved for 'OOV'
        mapping[word] = index

    return mapping

# Create the word_to_index mapping.
# The builder deliberately has a different name from the variable: rebinding the
# function's own name to the resulting dict would make this cell raise
# "TypeError: 'dict' object is not callable" the second time it is run.
word_to_index = build_word_to_index(vocabulary)

In [None]:
def generate_dataset(data, window_size, word_to_index):
    """ Method to generate training dataset for CBOW.
    Arguments
    ---------
    data : String
       Training dataset; words are its whitespace-separated pieces
    window_size : int
       Number of context words taken on EACH side of the target word
    word_to_index : Dictionary
       Dictionary mapping words to index with format {word:index}; must contain 'OOV'
    Returns
    -------
    surroundings : N x 2W Tensor
        Tensor with index of surrounding words, with N being the number of samples and
        W being the window size (W words to the left followed by W words to the right)
    targets : Tensor
        Tensor of length N with index of target word
    Raises
    ------
    ValueError
        If window_size is negative
    """
    if window_size < 0:
        raise ValueError("window_size must be non-negative")

    # Convert every word to its index once, using 'OOV' for unknown words,
    # instead of repeating the lookup for each window the word falls into
    oov_index = word_to_index['OOV']
    indices = [word_to_index.get(token, oov_index) for token in data.split()]

    surroundings = []
    targets = []
    # Only positions with a full window on both sides become samples
    for center in range(window_size, len(indices) - window_size):
        left = indices[center - window_size:center]
        right = indices[center + 1:center + window_size + 1]
        surroundings.append(left + right)
        targets.append(indices[center])

    # Convert lists to tensors. The explicit reshape keeps the N x 2W layout even
    # when there are no samples (an empty list alone would give shape (0,))
    surroundings = torch.tensor(surroundings, dtype=torch.long).reshape(len(targets), 2 * window_size)
    targets = torch.tensor(targets, dtype=torch.long)

    return surroundings, targets

# Generate the dataset, taking 2 context words on each side of every target word
t_surroundings, t_targets = generate_dataset(text, 2, word_to_index)

In [None]:
class CBOW(nn.Module):
    """ Continuous bag-of-words model: averages the embeddings of the context words
    and produces one score per vocabulary entry from that average.
    """

    def __init__(self, vocab_size, embed_dim=300):
        """ Builds the embedding table and the output layer
        Attributes
        ---------
        vocab_size : int
            Size of the vocabulary (rows of the embedding table, number of output scores)
        embed_dim : int
            Size of the embedding layer
        """
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_dim)
        self.linear = nn.Linear(in_features=embed_dim, out_features=vocab_size)

    def forward(self, x):
        """ Embeds the context indices in x, averages them over dim 1 and
        returns the linear layer's scores for that average.
        """
        context_vectors = self.embedding(x)
        # For a (batch_size, context) index tensor the mean is batch_size x embed_dim
        pooled = context_vectors.mean(dim=1)
        return self.linear(pooled)

In [None]:
from torch.utils.data import DataLoader, TensorDataset
#creation of dataloader for training
# TensorDataset pairs row i of the contexts with target i directly, without first
# building a Python list that holds one tuple of tensors per training sample.
train_dataloader=DataLoader(TensorDataset(t_surroundings,t_targets),batch_size=64,shuffle=True) #Here please change batch size depending of your GPU capacities (if GPU runs out of memory lower batch_size)

In [None]:
from tqdm import tqdm

# Train on the GPU when one is available, otherwise on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# One embedding row and one output score per entry of word_to_index (this includes 'OOV')
model = CBOW(len(word_to_index)).to(device)
# The loss compares the model's raw scores with the index of the target word
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
epochs = 10

# Training loop
for epoch in range(epochs):
    total_loss = 0  # sum of the per-batch losses of this epoch
    for surr, tar in tqdm(train_dataloader):
        # Move the batch to the same device as the model
        surr, tar = surr.to(device), tar.to(device)
        output = model(surr)
        loss = loss_function(output, tar)
        optimizer.zero_grad()  # clear the gradients left over from the previous batch
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Print the mean batch loss of the epoch
    print(f"Epoch {epoch + 1} loss: {total_loss / len(train_dataloader)}")

100%|██████████| 7611/7611 [00:26<00:00, 282.66it/s]


Epoch 1 loss: 8.845444558406374


100%|██████████| 7611/7611 [00:25<00:00, 295.54it/s]


Epoch 2 loss: 6.683306341367989


100%|██████████| 7611/7611 [00:25<00:00, 293.33it/s]


Epoch 3 loss: 5.471606830278155


100%|██████████| 7611/7611 [00:25<00:00, 293.54it/s]


Epoch 4 loss: 4.566382996498121


100%|██████████| 7611/7611 [00:25<00:00, 294.78it/s]


Epoch 5 loss: 3.842703691842662


100%|██████████| 7611/7611 [00:25<00:00, 294.39it/s]


Epoch 6 loss: 3.2657571008399318


100%|██████████| 7611/7611 [00:25<00:00, 294.77it/s]


Epoch 7 loss: 2.8229455292232717


100%|██████████| 7611/7611 [00:25<00:00, 294.06it/s]


Epoch 8 loss: 2.475040219738862


100%|██████████| 7611/7611 [00:25<00:00, 292.93it/s]


Epoch 9 loss: 2.185140357696028


100%|██████████| 7611/7611 [00:25<00:00, 294.41it/s]

Epoch 10 loss: 1.9363827782835663





In [None]:
def get_embedding(word, model, word_to_index):
    """ Method to get the embedding vector for a given word.
    The model is only read: its parameters keep their requires_grad setting.
    Arguments
    ---------
    word : String
       Word given
    model : nn.Module
       CBOW model (its `embedding` layer is read)
    word_to_index : Dictionary
       Dictionary mapping words to index with format {word:index}; must contain 'OOV'
    Returns
    -------
    word_embedding : Tensor
        Embedding vector for the given word, detached from the autograd graph.
        Words missing from word_to_index get the 'OOV' embedding
    """
    # Get word index, falling back to 'OOV' for unknown words
    index = word_to_index.get(word, word_to_index['OOV'])

    # detach() gives a gradient-free view of the weights without modifying the
    # parameter. Setting requires_grad = False on model.embedding.weight here
    # would silently freeze the embedding layer for any training done afterwards.
    word_embedding = model.embedding.weight.detach()[index]
    return word_embedding

# Test the function by getting embedding of the word "shot"
# (if "shot" is not in word_to_index, get_embedding returns the 'OOV' vector instead)
shot_embedding = get_embedding("shot", model, word_to_index)
print("Embedding for 'shot':")
print(shot_embedding)
# The vector is one row of the model's embedding table
print("\nEmbedding shape:", shot_embedding.shape)

Embedding for 'shot':
tensor([-0.5104,  0.6486, -1.4537, -1.9213, -1.3372, -0.2679,  0.4715, -2.7087,
         1.7029,  0.4720,  0.9303,  0.0696,  0.1501, -1.3730, -1.3949, -1.4177,
        -1.7958,  1.8108, -1.1981, -0.8408, -0.4461, -0.3431, -0.5385,  0.3779,
         1.6149, -1.3431, -1.7388,  0.3018,  0.4861,  1.3020,  1.7395,  3.3552,
         1.6584,  0.3735,  0.6640,  2.8699, -1.4258,  0.2059, -0.3016, -0.1784,
        -0.4232, -1.9736,  0.8814,  0.0944,  0.0386, -1.2488, -0.5565,  3.4759,
        -1.6080,  0.8086, -0.2412,  1.3416, -0.9223,  0.8879,  1.4856,  0.8504,
         0.8220, -1.2019,  0.9576,  1.7681, -1.6925, -0.1288,  1.2276,  1.4811,
         1.4687, -0.0197,  0.8070,  0.5399,  2.4207,  1.7647,  2.2903,  0.0362,
        -2.7118, -1.0168,  0.7816, -0.5663,  1.1888,  0.7267,  0.9805,  0.4547,
        -0.5105, -1.2837,  1.3420, -0.0076, -1.0942, -1.5712,  1.2057,  0.0062,
        -3.1115,  2.4083, -2.8160, -0.1335, -1.0055,  0.2373,  1.0636,  0.5883,
        -0.7589, -

In [None]:
def cosine_similarity(v1, v2):
    """ Cosine of the angle between two 1-D vectors.
    Arguments
    ---------
    v1 : Tensor
       First vector
    v2 : Tensor
       Second vector
    Returns
    -------
    cosine_similarity : 0-dim Tensor, or the float 0.0
        dot(v1, v2) / (|v1| * |v2|). When either vector has zero norm the plain
        float 0.0 is returned instead of a tensor
    """
    numerator = torch.dot(v1, v2)
    norm_one, norm_two = torch.norm(v1), torch.norm(v2)
    # A zero-length vector has no direction; avoid dividing by zero
    if 0 in (norm_one.item(), norm_two.item()):
        return 0.0
    return numerator / (norm_one * norm_two)

In [None]:
def get_k_nearest_words(k, word, vocabulary, model, word_to_index):
    """ Method to find the k nearest words of a given word or vector,
    ranked by cosine similarity between embeddings.
    Arguments
    ---------
    k : int
       Number of nearest words to return (capped at the vocabulary size)
    word : str or torch.Tensor
       Word or embedding vector for the given word
    vocabulary : Dictionary
       Dictionary mapping words to frequency with format {word:frequency}; only its
       keys are used, as the candidate words
    model : nn.Module
       CBOW model (its `embedding` layer is read, never modified)
    word_to_index : Dictionary
       Dictionary mapping words to index with format {word:index}; must contain 'OOV'
    Returns
    -------
    similar : List of Strings
        The nearest words, most similar first. Empty when k <= 0 or the
        vocabulary is empty
    """
    words = list(vocabulary.keys())
    # torch.topk raises when asked for more entries than exist, so cap k
    k = min(k, len(words))
    if k <= 0:
        return []

    weights = model.embedding.weight.detach()
    oov_index = word_to_index['OOV']

    # Check if the input is a word (string) or an embedding (tensor)
    if isinstance(word, torch.Tensor):
        # Bring a caller-supplied vector onto the device/dtype of the embedding table
        query = word.detach().to(device=weights.device, dtype=weights.dtype)
    else:
        query = weights[word_to_index.get(word, oov_index)]

    # Embedding rows of all candidate words (unknown words use the 'OOV' row)
    row_ids = torch.tensor(
        [word_to_index.get(w, oov_index) for w in words],
        dtype=torch.long,
        device=weights.device,
    )
    candidates = weights[row_ids]

    # Cosine similarity of the query against every candidate in one shot, rather than
    # one Python-level call (and one device sync) per vocabulary word
    dots = candidates @ query
    denominators = candidates.norm(dim=1) * query.norm()
    # Zero-norm vectors have no direction: score them 0 instead of dividing by zero
    similarity_scores = torch.where(denominators > 0, dots / denominators, torch.zeros_like(dots))

    # Get the k highest similarity scores
    k_first = torch.topk(similarity_scores, k)

    # Create a list of the k nearest words
    similar = [words[i] for i in k_first.indices.tolist()]

    return similar

In [None]:
import pandas as pd

def test_analogy(model, word_to_index, analogy_file, vocabulary=None, k=10):
    """ Method to test accuracy of CBOW embeddings on analogy tasks.
    For each row "word_one : word_two :: word_three : word_four" of the
    'capital-common-countries' category, the analogy counts as correct when
    word_four is among the k words nearest to
    embedding(word_two) - embedding(word_one) + embedding(word_three).
    Arguments
    ---------
    model : nn.Module
       CBOW model
    word_to_index : Dictionary
       Dictionary mapping words to index with format {word:index}
    analogy_file : String
       CSV file containing analogy tasks (columns category, word_one, word_two,
       word_three, word_four)
    vocabulary : Dictionary, optional
       Candidate answer words as keys. Defaults to every word of word_to_index
       except 'OOV'
    k : int, optional
       Number of nearest words inspected for each analogy (default 10)
    Returns
    -------
    accuracy : float
        accuracy of the model on the analogy tasks, or the string
        'No word was found in the embeddings' when no analogy could be scored
    """
    if vocabulary is None:
        # Every indexed word except the 'OOV' placeholder is a candidate answer
        vocabulary = {word: None for word in word_to_index if word != 'OOV'}

    df = pd.read_csv(analogy_file)
    df = df[df.category=='capital-common-countries']  # using capital cities subset of test set
    correct = 0
    total = 0

    for index, row in df.iterrows():
        # Extract words and standardize to lowercase
        word_one = row['word_one'].lower()
        word_two = row['word_two'].lower()
        word_three = row['word_three'].lower()
        word_four = row['word_four'].lower()

        # Skip this analogy if any word is not in the vocabulary. The check must be
        # explicit: get_embedding does not raise for unknown words, it returns the
        # 'OOV' vector, which would otherwise be scored as if it were the real word.
        if any(w not in word_to_index for w in (word_one, word_two, word_three, word_four)):
            continue

        # Get embeddings of the three words that define the analogy
        embedding_one = get_embedding(word_one, model, word_to_index)
        embedding_two = get_embedding(word_two, model, word_to_index)
        embedding_three = get_embedding(word_three, model, word_to_index)

        result = embedding_two - embedding_one + embedding_three
        prediction = get_k_nearest_words(k, result, vocabulary, model, word_to_index)
        is_correct = word_four in prediction
        print(f"Test Analogy: {word_one} => {word_two} || {word_three} => {word_four} || Prediction: {prediction}, Ground Truth: {is_correct}")
        # Check if word_four is in prediction
        if is_correct:
            correct += 1
        total += 1

    if total != 0:
        accuracy = correct / total
    else:
        return 'No word was found in the embeddings'
    return accuracy

# Test the analogy function on the sample test set file
accuracy = test_analogy(model, word_to_index, 'TestSet_sample.csv')
# `accuracy` is correct/total, or a message string when no analogy could be scored
print(f"Accuracy on analogy tasks: {accuracy}")

In [None]:
import numpy as np
import pandas as pd
import torch
import sys

from sklearn.manifold import TSNE
import plotly.graph_objects as go

# Assuming the model is already defined and trained
model.eval()

# Get embeddings for the first 1000 words
# Entry i of word_to_index is paired with row i of the embedding table; this assumes the
# mapping assigns consecutive indices starting at 0 in insertion order — TODO confirm
words = list(word_to_index.keys())[:1000]
embeddings = model.embedding.weight.detach().cpu().numpy()[:1000]

# Create TSNE: project the embeddings down to 2 dimensions (fixed random_state for repeatability)
tsne = TSNE(n_components=2, random_state=0)
tsne_results = tsne.fit_transform(embeddings)

# Create a DataFrame for easier plotting: one row per word with its 2-D coordinates
df = pd.DataFrame({
    'x': tsne_results[:, 0],
    'y': tsne_results[:, 1],
    'word': words
})

# Create the plot
fig = go.Figure()

# One marker per word, labelled with the word and coloured by its x coordinate
fig.add_trace(go.Scatter(
    x=df['x'],
    y=df['y'],
    mode='markers+text',
    text=df['word'],
    textposition="top center",
    hoverinfo='text',
    marker=dict(size=5, color=df['x'], colorscale='Viridis', showscale=True)
))

fig.update_layout(
    title='t-SNE visualization of word embeddings',
    xaxis_title='t-SNE dimension 1',
    yaxis_title='t-SNE dimension 2',
    width=1000,
    height=800
)

# The figure is written to a PNG file instead of being displayed interactively.
# NOTE(review): plotly's static image export needs an extra export engine installed — confirm
# fig.show()
fig.write_image("t-SNE_word_embeddings_10epc_5k.png")