In [1]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
import numpy as np
import re
from tokenizers import Tokenizer, models, pre_tokenizers, trainers, processors

# --- Configuration ---
VOCAB_SIZE_SUBWORD = 32000  # For BPE and Unigram
MAX_SEQ_LEN = 128
BATCH_SIZE = 64
LEARNING_RATE = 1e-4
NUM_EPOCHS = 3
EMBEDDING_DIM = 256
HIDDEN_DIM = 512
NUM_LAYERS = 2
NUM_HEADS = 8
DROPOUT = 0.1

# 1. Load the WikiText-2 Dataset
print("Loading WikiText-2 dataset...")
# Using the 'raw' version which is better for training custom tokenizers
dataset = load_dataset("wikitext", "wikitext-2-raw-v1")

# Pre-cleaning function to remove document titles and extra newlines
def clean_text(examples):
    # Remove lines starting with '= Title =' and strip excessive whitespace
    text = examples["text"]
    text = re.sub(r'= .*? =', '', text)
    text = re.sub(r'\n\s*\n', '\n', text)
    examples["text"] = text.strip()
    return examples

dataset = dataset.map(clean_text, batched=False)

train_data = dataset["train"]["text"]
valid_data = dataset["validation"]["text"]
test_data = dataset["test"]["text"]

print(f"Data loaded. Train examples: {len(train_data)}")

Loading WikiText-2 dataset...


Map:   0%|          | 0/4358 [00:00<?, ? examples/s]

Map:   0%|          | 0/36718 [00:00<?, ? examples/s]

Map:   0%|          | 0/3760 [00:00<?, ? examples/s]

Data loaded. Train examples: 36718


In [8]:
def train_and_get_tokenizers(train_data, vocab_size_subword):
    # Special tokens required for all models
    special_tokens = ["<unk>", "<pad>", "<bos>", "<eos>"]
    
    # --- 1. Word Tokenizer (Simple Split + Fixed Vocab) ---
    print("\nTraining Word Tokenizer...")
    
    word_counts = {}
    for text in train_data:
        for word in text.split():
            word_counts[word] = word_counts.get(word, 0) + 1
            
    # Keep only the top 10000 words + special tokens
    word_vocab = special_tokens + sorted(word_counts, key=word_counts.get, reverse=True)[:10000]
    
    # Create a mapping (vocab_size will be len(word_vocab))
    word_to_id = {word: i for i, word in enumerate(word_vocab)}
    
    class WordTokenizer:
        def __init__(self, word_to_id, unk_id=0):
            self.word_to_id = word_to_id
            self.unk_id = unk_id
            self.pad_token_id = self.word_to_id.get("<pad>")
            self.vocab_size = len(word_to_id)
        
        def encode(self, text):
            # Simple whitespace split and lookup
            return [self.word_to_id.get(word, self.unk_id) for word in text.split()]
        
    word_tokenizer = WordTokenizer(word_to_id)
    
    
    # --- 2. Byte Tokenizer (Character Level) ---
    print("Creating Byte Tokenizer...")
    
    # Vocabulary size is 256 (for standard ASCII/bytes) + special tokens
    byte_vocab_size = 256 + len(special_tokens)
    byte_to_id = {chr(i): i + len(special_tokens) for i in range(256)}
    # Add special tokens mapping manually
    for i, token in enumerate(special_tokens):
        byte_to_id[token] = i

    class ByteTokenizer:
        def __init__(self, byte_to_id, unk_id=0):
            self.byte_to_id = byte_to_id
            self.unk_id = unk_id
            self.pad_token_id = self.byte_to_id.get("<pad>")
            self.vocab_size = len(byte_to_id)

        def encode(self, text):
            # Map characters to their ID. Uses ord() for simple chars.
            return [self.byte_to_id.get(c, self.unk_id) for c in text]

    byte_tokenizer = ByteTokenizer(byte_to_id)


    # --- 3. BPE Tokenizer (Trained) ---
    print(f"Training BPE Tokenizer with vocab size {vocab_size_subword}...")
    bpe_tokenizer = Tokenizer(models.BPE())
    bpe_tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()

    bpe_trainer = trainers.BpeTrainer(
        vocab_size=vocab_size_subword,
        special_tokens=special_tokens
    )
    bpe_tokenizer.train_from_iterator(train_data, trainer=bpe_trainer)
    # Post-processor to handle special tokens for the model
    bpe_tokenizer.post_processor = processors.TemplateProcessing(
        single="<bos> $A <eos>",
        pair="<bos> $A <eos> $B",
        special_tokens=[("<bos>", bpe_tokenizer.token_to_id("<bos>")),
                        ("<eos>", bpe_tokenizer.token_to_id("<eos>"))]
    )
    bpe_tokenizer.pad_token_id = bpe_tokenizer.token_to_id("<pad>")
    bpe_tokenizer.vocab_size = bpe_tokenizer.get_vocab_size()


    # --- 4. Unigram Tokenizer (Trained) ---
    print(f"Training Unigram Tokenizer with vocab size {vocab_size_subword}...")
    unigram_tokenizer = Tokenizer(models.Unigram())
    unigram_tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()

    unigram_trainer = trainers.UnigramTrainer(
        vocab_size=vocab_size_subword,
        special_tokens=special_tokens,
        shrinking_factor=0.75
    )
    unigram_tokenizer.train_from_iterator(train_data, trainer=unigram_trainer)
    # Post-processor to handle special tokens for the model
    unigram_tokenizer.post_processor = processors.TemplateProcessing(
        single="<bos> $A <eos>",
        pair="<bos> $A <eos> $B",
        special_tokens=[("<bos>", unigram_tokenizer.token_to_id("<bos>")),
                        ("<eos>", unigram_tokenizer.token_to_id("<eos>"))]
    )
    unigram_tokenizer.pad_token_id = unigram_tokenizer.token_to_id("<pad>")
    unigram_tokenizer.vocab_size = unigram_tokenizer.get_vocab_size()

    
    # Store tokenizers and their vocab sizes
    tokenizers = {
        "word": word_tokenizer,
        "bpe": bpe_tokenizer,
        "byte": byte_tokenizer,
        "unigram": unigram_tokenizer
    }
    
    return tokenizers

tokenizers = train_and_get_tokenizers(train_data, VOCAB_SIZE_SUBWORD)


Training Word Tokenizer...
Creating Byte Tokenizer...
Training BPE Tokenizer with vocab size 32000...



Training Unigram Tokenizer with vocab size 32000...




In [9]:
class LanguageModelingDataset(Dataset):
    def __init__(self, data, tokenizer, max_seq_len):
        self.tokenizer = tokenizer
        self.max_seq_len = max_seq_len
        # --- CLEANUP: Directly use the standardized attribute ---
        self.pad_id = tokenizer.pad_token_id 
        # --------------------------------------------------------
        self.all_ids = self._tokenize_and_chunk(data)

        
        

        # Handle different tokenizer types
        if hasattr(tokenizer, 'pad_token_id') and tokenizer.pad_token_id is not None:
            return tokenizer.pad_token_id
        elif hasattr(tokenizer, 'word_to_id'): # WordTokenizer
            return tokenizer.word_to_id.get("<pad>")
        elif hasattr(tokenizer, 'byte_to_id'): # ByteTokenizer
            return tokenizer.byte_to_id.get("<pad>")
        return 1 # Fallback, assumes <pad> is at index 1

    def _tokenize_and_chunk(self, data):
        print(f"Tokenizing and chunking data (PAD ID: {self.pad_id}, Vocab Size: {self.tokenizer.vocab_size})...")
        full_token_list = []
        for text in data:
            if text.strip():
                # Get the IDs list from the tokenizer
                if hasattr(self.tokenizer, 'encode'): # Custom Word/Byte
                    ids = self.tokenizer.encode(text)
                else: # HF tokenizers
                    ids = self.tokenizer.encode(text).ids
                full_token_list.extend(ids)

        # Chunk the large list of IDs into sequences of max_seq_len
        all_ids = []
        for i in range(0, len(full_token_list) - self.max_seq_len, self.max_seq_len):
            all_ids.append(full_token_list[i : i + self.max_seq_len])
        
        return all_ids

    def __len__(self):
        return len(self.all_ids)

    def __getitem__(self, idx):
        # Input sequence (x): first max_seq_len-1 tokens
        # Target sequence (y): last max_seq_len-1 tokens (shifted by 1)
        sequence = torch.tensor(self.all_ids[idx], dtype=torch.long)
        
        # The target sequence is the input sequence shifted by one token
        # sequence[:-1] is the input (context)
        # sequence[1:] is the target (next tokens)
        return sequence[:-1], sequence[1:]

def create_dataloaders(tokenizer, train_data, valid_data, test_data, max_seq_len, batch_size):
    # Use max_seq_len + 1 tokens to get an input sequence of length max_seq_len
    train_dataset = LanguageModelingDataset(train_data, tokenizer, max_seq_len + 1)
    valid_dataset = LanguageModelingDataset(valid_data, tokenizer, max_seq_len + 1)
    test_dataset = LanguageModelingDataset(test_data, tokenizer, max_seq_len + 1)
    
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    
    return train_loader, valid_loader, test_loader

In [10]:
# Check for GPU
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

# --- 1. LSTM Model ---
class LSTMLanguageModel(nn.Module):
    def __init__(self, vocab_size, emb_dim, hidden_dim, num_layers, dropout):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        
        self.embedding = nn.Embedding(vocab_size, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hidden_dim, num_layers, dropout=dropout, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim, vocab_size)
        
    def forward(self, text, hidden_state=None):
        # text shape: [batch size, seq len]
        embedded = self.dropout(self.embedding(text))
        # embedded shape: [batch size, seq len, emb dim]
        
        # Initial hidden state will be created if not passed
        output, (hidden, cell) = self.lstm(embedded, hidden_state)
        # output shape: [batch size, seq len, hidden dim]
        
        prediction = self.fc(self.dropout(output))
        # prediction shape: [batch size, seq len, vocab size]
        return prediction, (hidden, cell)

# --- 2. Transformer Model (Decoder Only) ---
class TransformerLanguageModel(nn.Module):
    def __init__(self, vocab_size, emb_dim, n_heads, n_layers, max_seq_len, dropout):
        super().__init__()
        
        self.embedding = nn.Embedding(vocab_size, emb_dim)
        
        # Positional Encoding
        self.pos_encoder = nn.Parameter(torch.zeros(1, max_seq_len, emb_dim))
        nn.init.uniform_(self.pos_encoder, -0.01, 0.01)

        # Transformer Decoder Layers
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=emb_dim, 
            nhead=n_heads, 
            dim_feedforward=4*emb_dim, 
            dropout=dropout, 
            batch_first=True
        )
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=n_layers)
        
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(emb_dim, vocab_size)
        
        self.max_seq_len = max_seq_len

    def forward(self, src):
        # src shape: [batch size, seq len]
        seq_len = src.shape[1]
        
        # 1. Embedding + Positional Encoding
        embedded = self.dropout(self.embedding(src))
        embedded = embedded + self.pos_encoder[:, :seq_len, :]
        # embedded shape: [batch size, seq len, emb dim]

        # 2. Causal Mask (Look-ahead mask)
        # Allows tokens to only attend to previous tokens.
        mask = torch.triu(torch.ones(seq_len, seq_len) * float('-inf'), diagonal=1).to(DEVICE)
        
        # 3. Transformer Decoder
        output = self.transformer_decoder(
            embedded, 
            tgt_mask=mask, 
            memory=embedded # Using self-attention for decoder-only LM
        )
        # output shape: [batch size, seq len, emb dim]
        
        # 4. Final Prediction
        prediction = self.fc(self.dropout(output))
        # prediction shape: [batch size, seq len, vocab size]
        return prediction

Using device: cuda


In [11]:
def evaluate(model, data_loader, criterion):
    model.eval()
    total_loss = 0.0
    total_tokens = 0
    
    with torch.no_grad():
        for i, (src, trg) in enumerate(data_loader):
            src, trg = src.to(DEVICE), trg.to(DEVICE)
            
            if isinstance(model, LSTMLanguageModel):
                # Pass None for initial hidden state
                output, _ = model(src)
            else: # TransformerLanguageModel
                output = model(src)

            # Reshape for loss calculation: 
            # Output: [batch size * seq len, vocab size]
            # Target: [batch size * seq len]
            output_dim = output.shape[-1]
            output = output.reshape(-1, output_dim)
            trg = trg.reshape(-1)
            
            loss = criterion(output, trg)
            total_loss += loss.item() * trg.numel()
            total_tokens += trg.numel()

    # NLL is the average cross-entropy loss
    nll = total_loss / total_tokens
    # PPL is the exponential of NLL
    ppl = torch.exp(torch.tensor(nll)).item() 
    
    return nll, ppl


def train_model(model, train_loader, valid_loader, optimizer, criterion, num_epochs):
    best_valid_ppl = float('inf')
    
    for epoch in range(1, num_epochs + 1):
        model.train()
        epoch_loss = 0
        for i, (src, trg) in enumerate(train_loader):
            src, trg = src.to(DEVICE), trg.to(DEVICE)
            
            optimizer.zero_grad()
            
            if isinstance(model, LSTMLanguageModel):
                # Detach hidden state for sequence modeling
                output, _ = model(src, None)
            else:
                output = model(src)

            output_dim = output.shape[-1]
            output = output.reshape(-1, output_dim)
            trg = trg.reshape(-1)
            
            loss = criterion(output, trg)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) # Gradient clipping
            optimizer.step()
            
            epoch_loss += loss.item()

        valid_nll, valid_ppl = evaluate(model, valid_loader, criterion)
        
        print(f"  Epoch: {epoch}, Train Loss: {epoch_loss/len(train_loader):.4f}, Valid PPL: {valid_ppl:.2f}")

        if valid_ppl < best_valid_ppl:
            best_valid_ppl = valid_ppl
            # Save the best model state
            # torch.save(model.state_dict(), 'best_model.pt') 
            
    print(f"Training finished. Best Valid PPL: {best_valid_ppl:.2f}")


# --- Full Experiment Runner ---
criterion = nn.CrossEntropyLoss(ignore_index=tokenizers['word'].word_to_id.get("<pad>")) # Using WordTokenizer PAD for initial criterion, will update
results = {}

tokenizer_names = ["word", "bpe", "byte", "unigram"]
model_names = ["lstm", "transformer"]

for t_name in tokenizer_names:
    tokenizer = tokenizers[t_name]
    vocab_size = tokenizer.vocab_size
    pad_id = tokenizer.pad_token_id if hasattr(tokenizer, 'pad_token_id') else tokenizer._get_pad_id(tokenizer)

    # 1. Update the criterion with the current tokenizer's PAD ID
    criterion = nn.CrossEntropyLoss(ignore_index=pad_id)
    
    # 2. Create DataLoaders
    train_loader, valid_loader, test_loader = create_dataloaders(
        tokenizer, train_data, valid_data, test_data, MAX_SEQ_LEN, BATCH_SIZE
    )

    print(f"\n========================================================")
    print(f"Starting Experiments for Tokenizer: **{t_name.upper()}** (Vocab: {vocab_size})")
    print(f"========================================================")

    for m_name in model_names:
        print(f"\n--- Running **{m_name.upper()}** Model ---")
        
        if m_name == "lstm":
            model = LSTMLanguageModel(
                vocab_size, EMBEDDING_DIM, HIDDEN_DIM, NUM_LAYERS, DROPOUT
            ).to(DEVICE)
        else: # transformer
            model = TransformerLanguageModel(
                vocab_size, EMBEDDING_DIM, NUM_HEADS, NUM_LAYERS, MAX_SEQ_LEN, DROPOUT
            ).to(DEVICE)

        optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
        
        # Train the model (Using a small number of epochs for demonstration)
        train_model(model, train_loader, valid_loader, optimizer, criterion, NUM_EPOCHS)
        
        # Evaluate on the Test Set
        test_nll, test_ppl = evaluate(model, test_loader, criterion)
        results[(t_name, m_name)] = {"NLL": test_nll, "PPL": test_ppl}
        
        print(f"\nâœ… **{t_name.upper()}** + **{m_name.upper()}** TEST RESULTS:")
        print(f"  Test NLL: {test_nll:.4f}")
        print(f"  Test PPL: {test_ppl:.2f}")


# --- Final Results Table ---
print("\n" + "="*50)
print("FINAL EXPERIMENT RESULTS")
print("="*50)

print("| Tokenizer | Model | Test NLL | Test PPL |")
print("| :---: | :---: | :---: | :---: |")
for (t_name, m_name), metrics in results.items():
    print(f"| {t_name.capitalize()} | {m_name.capitalize()} | {metrics['NLL']:.4f} | {metrics['PPL']:.2f} |")

Tokenizing and chunking data (PAD ID: 1, Vocab Size: 10004)...


TypeError: __init__() should return None, not 'int'