In [1]:
"""A tiny word2vec model for learning."""

'A tiny word2vec model for learning.'

In [19]:
import collections 
import math 
import random 
import torch
from nlp import Vocab

In [3]:
with open("../data/ptb/ptb.train.txt") as f:
    raw_text = f.read()
sentences = [line.split() for line in raw_text.split('\n')]

In [4]:
vocab = Vocab(sentences, min_freq=10)
len(vocab)

6719

## Subsampling

In [5]:
T = 1e-4

In [6]:
def discard_probability(t: float, freq: int, num_tokens: int) -> float:
    """
    Calculates the probability for this word to be discarded. 

    Parameters
    ----------
    t : float
        Hyperparameter to adjust for subsampling.
    freq : int
        Frequency of the word in the corpus.
    num_tokens : int
        Total number of tokens in the corpus.

    Returns
    -------
    float
        The probability for discarding this word.
    """
    return max(1 - math.sqrt(t / (freq / num_tokens)), 0)

def keep(prob: float) -> bool:
    """
    Returns True if this word is kept under the roll of a imaginary dice.

    Parameters
    ----------
    prob : float
        Probability for keeping

    Returns
    -------
    bool
        To keep or not to keep :)
    """
    rand = random.uniform(0, 1)
    return rand < prob

def subsample(sentences: list[list[str]], unk: str) -> tuple[list[list[str]], collections.Counter]:
    """
    Subsamples the words in the vocabulary according to their frequencies.

    Parameters
    ----------
    words : list[list[str]]
        All the words in the corpus
    unk : str
        The <unk> token in this case

    Returns
    -------
    tuple[list[list[str]], collections.Counter]
        The subsampled words and the counter
    """
    sentences = [[token for token in line if token != unk] for line in sentences]
    counter = collections.Counter([token for line in sentences for token in line])
    num_tokens = sum(counter.values())
    subsampled_sentences = [[token for token in line if keep(discard_probability(T, counter[token], num_tokens))] for line in sentences]
    return subsampled_sentences, counter

In [7]:
subsampled, counter = subsample(sentences, '<unk>')

## Extracting center words and context words 

In [8]:
def get_centers_and_contexts(corpus, max_window_size):
    """Return center words and context words in skip-gram."""
    centers, contexts = [], []
    for line in corpus:
        if len(line) < 2:
            continue
        centers += line
        # adding the context for each word in that line
        # such that each element in center corresponds to a sublist of contexts
        for i in range(len(line)): 
            window_size = random.randint(1, max_window_size)
            indices = list(range(max(0, i - window_size),
                                 min(len(line), i + 1 + window_size)))
            indices.remove(i)
            contexts.append([line[idx] for idx in indices])
    return centers, contexts

In [9]:
# the total number of center context pairs would thus be the length of the flattened context list, each each word in that flattened list can be paired with a center word
all_centers, all_contexts = get_centers_and_contexts(subsampled, 5)

## Negative sampling

The softmax objective is too expensive to compute, negative sampling modifies the learning objective to make the learning problem much easier to approach. 

The problem is now, given two words, predict if they are context target pairs. We just extracted positive context target pair, and now we want negative examples - words that are not context target pairs. 

We define a logistic model, where c is the context word, t is the target word, and y is the label.

$ P(y = 1 | c, t) = \sigma (\theta_t^T e_c) $

In [10]:
K = 3 # number of negative examples per positive example

This is a much more simplified version of the algorithm that draws negative samples from the one described in the paper. Instead of drawing negative samples from a specific distribution, we are just going to randomly draw K of them from outside the context window. 

In [11]:
def get_negative_samples(all_centers, all_contexts, sentences, K):
    all_negatives = []
    for center, context, sentence in zip(all_centers, all_contexts, sentences):
        difference = [word for word in sentence if word not in context and word != center]
        all_negatives.append(difference[:K])
    return all_negatives

In [12]:
all_negatives = get_negative_samples(all_centers, all_contexts, subsampled, K)

### Looking at some of the negatives and contexts just to make sure they look right

In [13]:
all_negatives[:5]

[[],
 ['as', 'a', 'nov.'],
 ['mr.', 'is', 'chairman'],
 ['old', 'and', 'former'],
 ['of', 'once', 'used']]

In [14]:
all_contexts[:5]

[['years', 'will'],
 ['N', 'will', 'the'],
 ['N', 'years', 'the', 'as'],
 ['N', 'years', 'will', 'as', 'a', 'nov.', 'N'],
 ['N', 'years', 'will', 'the', 'a', 'nov.', 'N']]

## Constructing the dataset with negatives and contexts

In [15]:
# converting the raw words into their indices in the vocabulary 
centers = vocab[all_centers]
contexts = vocab[all_contexts]
negatives = vocab[all_negatives]

In [35]:
len(centers), len(contexts), len(negatives)

(472608, 472608, 42069)

In [22]:
def batchify(data):
    """Return a minibatch of examples for skip-gram with negative sampling."""
    max_len = max(len(c) + len(n) for _, c, n in data)
    centers, contexts_negatives, masks, labels = [], [], [], []
    for center, context, negative in data:
        cur_len = len(context) + len(negative)
        centers += [center]
        contexts_negatives += [context + negative + [0] * (max_len - cur_len)]
        masks += [[1] * cur_len + [0] * (max_len - cur_len)]
        labels += [[1] * len(context) + [0] * (max_len - len(context))]
    return (torch.tensor(centers).reshape((-1, 1)), torch.tensor(
        contexts_negatives), torch.tensor(masks), torch.tensor(labels))

In [34]:
class PTBDataset(torch.utils.data.Dataset):
    def __init__(self, centers, contexts, negatives):
        assert len(centers) == len(contexts) == len(negatives)
        self.centers = centers
        self.contexts = contexts
        self.negatives = negatives
        
    def __getitem__(self, index):
        return (self.centers[index], self.contexts[index],
                    self.negatives[index])
        
    def __len__(self):
        return len(self.centers)

NUM_WORKERS = 1
BATCH_SIZE = 50
dataset = PTBDataset(centers, contexts, negatives)
data_iter = torch.utils.data.DataLoader(dataset, BATCH_SIZE, shuffle=True, collate_fn=batchify, num_workers=NUM_WORKERS)

AssertionError: 

In [24]:
batch

(tensor([[1],
         [1]]),
 tensor([[2, 2, 3, 3, 3, 3],
         [2, 2, 2, 3, 3, 0]]),
 tensor([[1, 1, 1, 1, 1, 1],
         [1, 1, 1, 1, 1, 0]]),
 tensor([[1, 1, 0, 0, 0, 0],
         [1, 1, 1, 0, 0, 0]]))

In [26]:
centers[0]

27

In [33]:
contexts[0]

[6697, 6612]

In [32]:
negatives[0]

[]