In [13]:
import torch
import numpy as np
import pandas as pd
from datasets import load_dataset
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
import torch.nn.functional as F
import torch.optim as optim
from collections import Counter
from io import BytesIO
import random
import re
import zipfile

In [8]:
class PreprocessText8:
    """
    Text preprocessing for skip-gram training: batch loading from a zipped
    Parquet file, tokenisation, vocabulary construction with a minimum-count
    filter, subsampling of frequent words and token-to-index conversion.
    """

    # Compiled once so tokenising many batches does not repeat the pattern lookup.
    _TOKEN_PATTERN = re.compile(r"\b[a-z]+\b")

    def __init__(self, min_count=5, batch_size=1000, subsample_threshold=1e-5):
        """
        Initializes the PreprocessText8 class.
        Args:
        - min_count: Minimum word frequency for vocabulary inclusion.
        - batch_size: Number of rows to process in each batch.
        - subsample_threshold: Threshold for subsampling frequent words.
        """
        self.min_count = min_count
        self.batch_size = batch_size
        self.subsample_threshold = subsample_threshold
        self.vocab = None  # Word-to-index mapping, set by build_vocab
        self.word_counts = None  # Frequencies of retained words, set by build_vocab

    def load_dataset(self, zip_filepath, parquet_filename, text_column):
        """
        Generator to load the dataset from a zipped Parquet file in batches.
        Args:
        - zip_filepath: Path to the ZIP file containing Parquet files.
        - parquet_filename: The specific Parquet file within the ZIP archive.
        - text_column: The column in the Parquet file containing the text data.
        Yields:
        - One string per batch: up to batch_size rows of text_column joined by single spaces.
        """
        with zipfile.ZipFile(zip_filepath, "r") as z:
            with z.open(parquet_filename) as f:
                # The whole Parquet member is read into memory before batching.
                df = pd.read_parquet(BytesIO(f.read()), engine="pyarrow")

                rows = df[text_column].astype(str).values
                for i in range(0, len(rows), self.batch_size):
                    yield " ".join(rows[i : i + self.batch_size])

    def preprocess_text(self, text):
        """
        Tokenize text by lower-casing it and extracting runs of the letters a-z.
        Args:
        - text: Raw text data.
        Returns:
        - tokens: List of tokens.
        """
        return self._TOKEN_PATTERN.findall(text.lower())

    def build_vocab(self, tokens):
        """
        Build the vocabulary by counting word frequencies and filtering rare words.
        The result is also stored on self.vocab and self.word_counts.
        Args:
        - tokens: List of tokenized words.
        Returns:
        - vocab: Word-to-index mapping; "<UNK>" is index 0, retained words start at 1.
        - word_counts: Frequencies of the words that occur at least min_count times.
        """
        word_counts = {
            word: count
            for word, count in Counter(tokens).items()
            if count >= self.min_count
        }

        # Indices follow first-occurrence order of the retained words.
        vocab = {word: i for i, word in enumerate(word_counts, start=1)}
        vocab["<UNK>"] = 0  # Unknown words get a default index of 0

        self.vocab = vocab
        self.word_counts = word_counts
        return vocab, word_counts

    def subsample_frequent_words(self, tokens):
        """
        Subsample frequent words based on the subsample_threshold to reduce their frequency.
        A word with relative frequency f (among retained words) is discarded with
        probability 1 - sqrt(subsample_threshold / f). Tokens that are not in
        word_counts (for example "<UNK>") are always kept.
        Args:
        - tokens: List of tokenized words.
        Returns:
        - subsampled_tokens: List of tokens after subsampling.
        Raises:
        - ValueError: If build_vocab has not been called yet.
        """
        if self.word_counts is None:
            raise ValueError("Vocabulary is not set. Please build the vocabulary before subsampling.")

        total_count = sum(self.word_counts.values())

        # Discard probability per word; values <= 0 mean the word is (almost surely) kept.
        discard_probs = {
            word: 1 - np.sqrt(self.subsample_threshold / (count / total_count))
            for word, count in self.word_counts.items()
        }

        subsampled_tokens = []
        for word in tokens:
            discard_prob = discard_probs.get(word)
            # A random number is drawn only for words that have a probability,
            # one draw per such token, in token order.
            if discard_prob is None or np.random.rand() > discard_prob:
                subsampled_tokens.append(word)

        return subsampled_tokens

    def filter_and_subsample(self, tokens):
        """
        Combines filtering of rare words and subsampling of frequent words.
        Args:
        - tokens: List of tokenized words.
        Returns:
        - processed_tokens: List of tokens after filtering and subsampling.
        Raises:
        - ValueError: If build_vocab has not been called yet.
        """
        if self.vocab is None:
            raise ValueError("Vocabulary is not set. Please build the vocabulary before filtering.")

        # Replace words outside the vocabulary with <UNK>
        filtered_tokens = [word if word in self.vocab else "<UNK>" for word in tokens]

        return self.subsample_frequent_words(filtered_tokens)

    def text_to_indices(self, tokens):
        """
        Convert tokenized words to their corresponding indices from the vocabulary.
        Words missing from the vocabulary map to the "<UNK>" index.
        Args:
        - tokens: List of tokens.
        Returns:
        - indices: List of indices corresponding to tokens.
        Raises:
        - ValueError: If build_vocab has not been called yet.
        """
        if self.vocab is None:
            raise ValueError("Vocabulary is not set. Please build the vocabulary before converting.")

        return [self.vocab.get(word, self.vocab["<UNK>"]) for word in tokens]

In [9]:
class SkipGramDataGenerator:
    """Produces (input, context) index pairs for skip-gram training."""

    def __init__(self, vocab, window_size=2):
        self.window_size = window_size  # Number of neighbours taken on each side
        self.vocab = vocab

    def generate_training_pairs(self, text_indices_batch):
        """
        Pair every position with its neighbours inside the context window.
        Args:
        - text_indices_batch: List of word indices (batch of text).
        Returns:
        - pairs: List of (input_word, context_word) pairs; for each input word
          the left-hand neighbours come first, then the right-hand ones.
        """
        n_tokens = len(text_indices_batch)
        radius = self.window_size
        pairs = []

        for centre_pos, centre in enumerate(text_indices_batch):
            # Clamp the window to the bounds of the sequence
            lo = max(centre_pos - radius, 0)
            hi = min(centre_pos + radius + 1, n_tokens)

            left_side = text_indices_batch[lo:centre_pos]
            right_side = text_indices_batch[centre_pos + 1 : hi]
            for side in (left_side, right_side):
                pairs.extend((centre, neighbour) for neighbour in side)

        return pairs

In [16]:
class DatasetProcessor:
    """
    Thin orchestration layer that ties a text preprocessor to a training-pair
    generator.

    Args:
    - preprocessor: Object providing load_dataset, preprocess_text, build_vocab,
      filter_and_subsample and text_to_indices.
    - data_generator: Object providing generate_training_pairs.
    """

    def __init__(self, preprocessor, data_generator):
        self.preprocessor = preprocessor
        self.data_generator = data_generator

    def process_dataset(self, zip_filepath, parquet_filename, text_column):
        """
        Load and tokenize the entire dataset.

        Every batch yielded by the preprocessor's load_dataset generator is
        consumed; taking only the first batch would silently drop all rows
        beyond the preprocessor's batch size.

        Args:
        - zip_filepath: Path to the ZIP file containing the Parquet file.
        - parquet_filename: Name of the Parquet file inside the archive.
        - text_column: Column holding the text.
        Returns:
        - tokens: List of tokens from all batches, in order (empty if no batch is yielded).
        """
        tokens = []
        batches = self.preprocessor.load_dataset(zip_filepath, parquet_filename, text_column)
        for batch_text in batches:
            # Tokenise batch by batch so the full raw text is never held as one string.
            tokens.extend(self.preprocessor.preprocess_text(batch_text))
        return tokens

    def build_vocab(self, zip_filepath, parquet_filename, text_column):
        """
        Tokenize the dataset and build the vocabulary from it.
        Returns:
        - vocab, word_counts: As returned by the preprocessor's build_vocab.
        """
        tokens = self.process_dataset(zip_filepath, parquet_filename, text_column)
        vocab, word_counts = self.preprocessor.build_vocab(tokens)
        return vocab, word_counts

    def filter_and_subsample(self, zip_filepath, parquet_filename, text_column):
        """
        Tokenize the dataset, then apply the preprocessor's filter_and_subsample.
        Returns:
        - The token list returned by the preprocessor.
        """
        tokens = self.process_dataset(zip_filepath, parquet_filename, text_column)
        filtered_subsampled_tokens = self.preprocessor.filter_and_subsample(tokens)
        return filtered_subsampled_tokens

    def convert_to_indices(self, tokens):
        """Map tokens to vocabulary indices via the preprocessor."""
        return self.preprocessor.text_to_indices(tokens)

    def generate_training_pairs(self, indices):
        """Build (input, context) pairs from indices via the data generator."""
        return self.data_generator.generate_training_pairs(indices)


In [17]:
# Wire the preprocessing pipeline together
preprocessor = PreprocessText8(min_count=5, subsample_threshold=1e-5)
data_generator = SkipGramDataGenerator(vocab=None, window_size=2)
processor = DatasetProcessor(preprocessor, data_generator)

# (ZIP archive, Parquet member, text column) for each split
train_source = ("data/train.zip", "train-00000-of-00001.parquet", "text")
test_source = ("data/test.zip", "test-00000-of-00001.parquet", "text")
validation_source = ("data/validation.zip", "validation-00000-of-00001.parquet", "text")

# The vocabulary comes from the training split only
vocab, word_counts = processor.build_vocab(*train_source)

# Map out-of-vocabulary words to <UNK> and subsample frequent words, split by split
train_filtered_tokens = processor.filter_and_subsample(*train_source)
test_filtered_tokens = processor.filter_and_subsample(*test_source)
validation_filtered_tokens = processor.filter_and_subsample(*validation_source)

# Tokens -> vocabulary indices
train_indices, test_indices, validation_indices = (
    processor.convert_to_indices(split_tokens)
    for split_tokens in (train_filtered_tokens, test_filtered_tokens, validation_filtered_tokens)
)

# Indices -> (input, context) pairs for Skip-gram
training_pairs, test_pairs, validation_pairs = (
    processor.generate_training_pairs(split_indices)
    for split_indices in (train_indices, test_indices, validation_indices)
)

In [15]:
# Report how many (input, context) pairs were generated for the training split.
print(f"Total training pairs generated: {len(training_pairs)}")

Total training pairs generated: 17827246


#### 3. Building skip-gram model

Architecture Overview:

- Input: One-hot vector of size equal to the vocabulary size.

- Hidden Layer: Produces the embedding vector (size embedding_dim).

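- Output: Conceptually, a softmax over the vocabulary to predict the context word. The implementation below approximates this with negative sampling: it scores (input, context) pairs by the dot product of their embeddings, so the full softmax is never computed.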


In [23]:
class SkipGramModel(nn.Module):
    """
    Skip-gram model that scores (input word, context word) pairs.

    Input embedding layer: maps input (target) word indices to vectors of size
    embedding_dim. Output embedding layer: maps context word indices to vectors
    of the same size. Forward pass: the score of a pair is the dot product of
    its two embedding vectors.

    Args:
    - vocab_size: Number of rows in each embedding table.
    - embedding_dim: Size of every embedding vector.
    """

    def __init__(self, vocab_size, embedding_dim):
        super().__init__()
        # Embedding layer for input (target) words
        self.input_embeddings = nn.Embedding(vocab_size, embedding_dim)
        # Embedding layer for output (context) words
        self.output_embeddings = nn.Embedding(vocab_size, embedding_dim)

    def forward(self, input_word, context_word):
        """
        Compute the dot-product score of each (input, context) pair.
        Args:
        - input_word: Tensor of input word indices.
        - context_word: Tensor of context word indices, broadcast-compatible
          with input_word (e.g. (B,) with (B,), or (B, 1) with (B, K)).
        Returns:
        - Tensor of scores with the broadcast shape of the two index tensors.
        """
        input_embedding = self.input_embeddings(input_word)
        context_embedding = self.output_embeddings(context_word)

        # Reduce over the embedding axis, which is always the last one. Using
        # dim=1 would sum over the K axis for (B, 1) x (B, K) index tensors.
        return torch.sum(input_embedding * context_embedding, dim=-1)

In [24]:
def negative_sampling_loss(model, input_word, context_word, negative_samples, device):
    """
    Compute the Negative Sampling loss.

    For every input word the loss is
    -log(sigmoid(score(input, context))) - sum_k log(sigmoid(-score(input, negative_k))),
    averaged over the batch.

    Args:
    - model: Callable returning one score per (input index, context index) pair
      for two 1-D index tensors of equal length (e.g. a SkipGramModel instance).
    - input_word: Tensor of input word indices, shape (batch_size,).
    - context_word: Tensor of positive context word indices, shape (batch_size,).
    - negative_samples: Tensor of negative word indices, shape (batch_size, num_neg_samples).
    - device: Device (CPU/GPU) for computation. Not read here; kept so existing
      callers do not have to change.
    Returns:
    - Scalar tensor holding the mean loss over the batch.
    """
    # Positive samples: log(sigmoid(dot product)). logsigmoid is used because
    # log(sigmoid(x)) underflows to -inf for large negative x.
    pos_score = model(input_word, context_word)
    pos_loss = -F.logsigmoid(pos_score)

    # Negative samples: repeat each input word once per negative and score the
    # flattened pairs, so the model only ever sees 1-D index tensors and the
    # result is unambiguously one score per (input, negative) pair.
    flat_inputs = input_word.unsqueeze(1).expand_as(negative_samples).reshape(-1)
    flat_negatives = negative_samples.reshape(-1)
    neg_score = model(flat_inputs, flat_negatives).reshape(negative_samples.shape)
    neg_loss = -F.logsigmoid(-neg_score).sum(1)  # Sum over negative samples

    # Total loss is the sum of positive and negative losses, averaged over the batch
    return (pos_loss + neg_loss).mean()


#### 4. Training the model

- Input preparation: Convert the training pairs (input word, context word) into tensors.
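- Forward pass: For each input word, score its true context word and k randomly drawn negative words by the dot product of their embeddings.
- Loss calculation: Use the negative-sampling loss (a logistic loss that pushes the true pair's score up and the negative pairs' scores down) as an inexpensive stand-in for cross-entropy over a full softmax.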
- Backpropagation: Compute gradients and update model weights using an optimizer.
- Repeat for several epochs: Go through the entire dataset multiple times to improve the model.


In [25]:
class SkipGramDataset(Dataset):
    """Dataset view over a sequence of (input_word, context_word) index pairs."""

    def __init__(self, training_pairs):
        # The pairs are kept as given; tensors are created lazily per item.
        self.training_pairs = training_pairs

    def __len__(self):
        """Number of pairs."""
        return len(self.training_pairs)

    def __getitem__(self, idx):
        """Return the idx-th pair as two scalar tensors (input, context)."""
        centre, neighbour = self.training_pairs[idx]
        return tuple(map(torch.tensor, (centre, neighbour)))

In [26]:
# Training hyper-parameters
num_epochs = 5
embedding_dim = 200
# NOTE(review): the optimizer cell below passes this value to Adam, whose
# default learning rate is 1e-3; 0.1 may be a leftover from the commented-out
# SGD setup — confirm.
learning_rate = 0.1
batch_size = 1024

# k: number of negative samples drawn per (input, context) pair
k = 5
# Includes the <UNK> entry added by build_vocab
vocab_size = len(preprocessor.vocab)

# Initialize the model and move it to the GPU when one is available
# (the optimizer and data loaders are created in a later cell)
model = SkipGramModel(vocab_size, embedding_dim)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
model.to(device)

Using device: cuda


SkipGramModel(
  (input_embeddings): Embedding(67428, 200)
  (output_embeddings): Embedding(67428, 200)
)

In [28]:
# Alternative optimizer, kept for reference:
# optimizer = optim.SGD(model.parameters(), lr=learning_rate)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# NOTE(review): the training loop below calls negative_sampling_loss directly and
# passes it to evaluate_model; this CrossEntropyLoss instance is not used there.
loss_function = nn.CrossEntropyLoss()

# DataLoaders over the (input, context) pairs built earlier. Training batches are
# reshuffled every epoch; validation batches keep their order.
train_dataset = SkipGramDataset(training_pairs)
train_loader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, num_workers=6
)

validate_dataset = SkipGramDataset(validation_pairs)
validate_loader = DataLoader(
    validate_dataset, batch_size=batch_size, shuffle=False, num_workers=6
)

In [29]:
def evaluate_model(model, validate_loader, loss_function, device, k):
    """
    Evaluate the model using Negative Sampling on the validation set.

    The model is switched to evaluation mode and left in it; callers that go
    on training must call model.train() again.

    Args:
    - model: Model exposing an `output_embeddings` embedding table; its number
      of rows bounds the range negative word indices are drawn from.
    - validate_loader: Iterable of (input_words, context_words) tensor batches.
    - loss_function: Callable invoked as
      loss_function(model, input_words, context_words, negative_samples, device)
      that returns a scalar tensor.
    - device: Device (CPU/GPU) for computation.
    - k: Number of negative samples per input word.
    Returns:
    - Average validation loss per batch, or NaN if the loader yields no batches.
    """
    model.eval()  # Set the model to evaluation mode

    # Take the sampling range from the model itself instead of a module-level
    # global, so the negatives are always valid rows of its embedding table.
    num_words = model.output_embeddings.num_embeddings

    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():  # Disable gradient computation
        for input_words, context_words in validate_loader:
            input_words, context_words = input_words.to(device), context_words.to(device)

            # Uniform negative samples, created directly on the target device
            negative_samples = torch.randint(
                0, num_words, (input_words.size(0), k), device=device
            )

            loss = loss_function(model, input_words, context_words, negative_samples, device)
            total_loss += loss.item()
            num_batches += 1

    if num_batches == 0:
        # Nothing to average; avoid a ZeroDivisionError on an empty loader.
        return float("nan")

    return total_loss / num_batches


In [None]:
# Training loop with Negative Sampling
# Training loop with Negative Sampling: one pass over train_loader per epoch,
# followed by an evaluation pass over validate_loader.
for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")
    model.train()  # Back to training mode (evaluate_model switches to eval mode)
    total_loss = 0

    for input_words, context_words in train_loader:
        input_words, context_words = input_words.to(device), context_words.to(device)

        # Zero the gradients accumulated by the previous step
        optimizer.zero_grad()

        # Draw k negative word indices per input word, uniformly from [0, vocab_size).
        # The draw is unconditional, so a negative may coincide with the true
        # context word or with the <UNK> index.
        negative_samples = torch.randint(0, vocab_size, (input_words.size(0), k)).to(device)

        # Calculate the negative sampling loss
        loss = negative_sampling_loss(model, input_words, context_words, negative_samples, device)

        # Backward pass: Compute gradients and update weights
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Average training loss per batch for this epoch
    avg_training_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {avg_training_loss}")

    # Evaluate on the validation dataset after every epoch
    avg_validation_loss = evaluate_model(model, validate_loader, negative_sampling_loss, device, k)
    print(f"Epoch {epoch+1}/{num_epochs}, Validation Loss: {avg_validation_loss}")