In [1]:
# Experiment-wide settings that are tuned between runs.
# (Batch size and epoch count are set where the experiments are launched.)

# Passed to torch.optim.Adam as `weight_decay` (regularization strength).
WEIGHT_DECAY = 1e-4
# Early stopping: number of consecutive epochs without a validation-loss
# improvement after which training is stopped.
PATIENCE = 3
# Text generation: sample the next token only among the TOP_K most probable ones.
TOP_K = 50
# Text generation: logits are divided by this value before the softmax.
TEMPERATURE = 1

# Root directory under which plots are written (one sub-directory per run).
# Result directories used for earlier runs are kept below for reference.
# PATH_RES = '/content/drive/MyDrive/rnn_results_1'
# PATH_RES = '/content/drive/MyDrive/test_demo_detokenize_earlystop'
# PATH_RES = 'test_demo_temp1_nodecay'
# PATH_RES = 'test_demo_weight_decay1e-4_temp1'
PATH_RES = 'layer=3/test_demo_weight_decay1e-4_temp1'
# PATH_RES = 'layer=2/test_demo_weight_decay1e-4_temp0.7'

In [2]:
# from google.colab import drive
# drive.mount('/content/drive')

# RNN (LSTM) Next-Word Prediction on tiny_shakespeare
### Features implemented:
- Load the Tiny Shakespeare corpus (plain text downloaded from the karpathy/char-rnn GitHub repository)
- Word-level tokenization (simple regex-based)
- Build vocabulary, dataset of (context -> next_word) using sliding window
- PyTorch LSTM model with nn.Embedding
- Training loop with validation split, logging and plotting (matplotlib)
- Perplexity and token-level accuracy calculation
- Text generation given a seed phrase (generates N words)
- Ablation study: run several experiments with different model configurations and compare their validation-loss curves

In [None]:
import re
import math
import random
import argparse
from collections import Counter
from pathlib import Path

import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from datasets import load_dataset
import requests
import nltk
from nltk.corpus import stopwords

  from .autonotebook import tqdm as notebook_tqdm


### Tokenize text
Tokenizes text into words while keeping punctuation as separate tokens. Uses a simple regex-based approach for educational purposes.

**Parameters**:
*   text (str): Input text to tokenize

**Returns**:
*   list: List of tokens (words and punctuation)

In [4]:

def simple_word_tokenize(text):
    """Split text into word and punctuation tokens.

    A token is either a maximal run of word characters (``\\w+``) or a single
    character that is neither whitespace nor a word character. Whitespace is
    only a separator and never becomes a token.

    Args:
        text (str): Input text to tokenize.

    Returns:
        list[str]: Tokens in the order they appear in ``text``.
    """
    # The character class must use a single-backslash `\s` inside the raw
    # string: `\\s` would exclude a literal backslash and the letter "s"
    # instead of whitespace, turning every space/newline into a token.
    return re.findall(r"\w+|[^\s\w]", text)


In [None]:
# Fetch the NLTK stop-word corpus so that `stopwords.words` can load it.
nltk.download('stopwords')
# English stop words as a set (fast membership tests); prepare_dataset drops
# these tokens from the corpus before the vocabulary is built.
STOP_WORDS = set(stopwords.words('english'))

In [5]:
def detokenize(tokens):
    """Join tokens back into a single string.

    Tokens are separated by one space, except that no space is inserted
    before a punctuation token (. , ? ! ; : ' ") and never before the first
    token.

    Args:
        tokens: Sequence of string tokens.

    Returns:
        str: The reassembled text ("" for an empty sequence).
    """
    attach_to_previous = (".", ",", "?", "!", ";", ":", "'", '"')
    pieces = []
    for position, token in enumerate(tokens):
        needs_space = position > 0 and token not in attach_to_previous
        if needs_space:
            pieces.append(" ")
        pieces.append(token)
    return "".join(pieces)


In [6]:
def compute_perplexity(loss):
    """Convert an average negative log-likelihood into perplexity.

    Args:
        loss (float): Mean cross-entropy (natural-log based).

    Returns:
        float: ``e ** loss`` as computed by ``math.exp``.
    """
    perplexity = math.exp(loss)
    return perplexity

### Build Vocab
Builds vocabulary from tokens with frequency filtering and optional size limiting.

**Parameters**:

*   tokens (list): List of tokens to build vocabulary from
    
*   min\_freq (int, optional): Minimum frequency for tokens to be included. Defaults to 1
    
*   max\_size (int, optional): Maximum vocabulary size. Defaults to None (no limit)
    

**Returns**:

*   tuple: (token\_to\_idx, idx\_to\_token) - Dictionary mapping tokens to indices and list mapping indices to tokens

In [7]:

def build_vocab(tokens, min_freq=1, max_size=None):
    """Build token <-> index mappings from a token sequence.

    Tokens occurring fewer than ``min_freq`` times are dropped. The remaining
    tokens are ordered by descending frequency, ties broken alphabetically,
    and optionally truncated to the ``max_size`` most frequent ones. Index 0
    is reserved for "<pad>" and index 1 for "<unk>".

    Args:
        tokens (list): Tokens to count.
        min_freq (int): Minimum count for a token to be kept.
        max_size (int | None): Maximum number of regular tokens kept; a falsy
            value (None or 0) means no limit.

    Returns:
        tuple: ``(token_to_idx, idx_to_token)`` - a dict and the matching list.
    """
    frequencies = Counter(tokens)
    # Most frequent first; alphabetical order makes ties deterministic.
    ranked = sorted(
        (pair for pair in frequencies.items() if pair[1] >= min_freq),
        key=lambda pair: (-pair[1], pair[0]),
    )
    if max_size:
        ranked = ranked[:max_size]

    idx_to_token = ["<pad>", "<unk>"]
    idx_to_token.extend(word for word, _count in ranked)

    token_to_idx = {}
    for index, word in enumerate(idx_to_token):
        token_to_idx[word] = index
    return token_to_idx, idx_to_token


### Tokens to training samples
PyTorch Dataset for next word prediction tasks. Creates input-target pairs from token sequences.

**Parameters**:

*   token\_idxs (list): List of integer token IDs
    
*   context\_size (int): Size of the context window for prediction
    

**Methods**:

*   \_\_len\_\_(): Returns the number of samples in the dataset
    
*   \_\_getitem\_\_(idx): Returns (input\_context, target\_word) tuple for the given index

In [8]:
class NextWordDataset(Dataset):
    """Sliding-window dataset of (context, next token) pairs.

    For every start position ``s`` with a full window available, sample ``s``
    pairs the ``context_size`` token ids beginning at ``s`` with the token id
    that immediately follows them.

    Args:
        token_idxs (list): Integer token ids of the whole corpus, in order.
        context_size (int): Number of tokens in each context window.
    """

    def __init__(self, token_idxs, context_size):
        self.context_size = context_size
        # One sample per position that still has a following target token.
        n_samples = len(token_idxs) - context_size
        self.inputs = [
            torch.tensor(token_idxs[start:start + context_size], dtype=torch.long)
            for start in range(n_samples)
        ]
        self.targets = [
            torch.tensor(token_idxs[start + context_size], dtype=torch.long)
            for start in range(n_samples)
        ]

    def __len__(self):
        """Number of (context, target) samples."""
        return len(self.targets)

    def __getitem__(self, idx):
        """Return ``(context_tensor, target_tensor)`` for sample ``idx``."""
        context = self.inputs[idx]
        target = self.targets[idx]
        return context, target


### Prepare Dataset
Prepares the Tiny Shakespeare dataset for next word prediction training.

**Parameters**:

*   context\_size (int, optional): Size of context window. Defaults to 5
    
*   val\_split (float, optional): Fraction of data to use for validation. Defaults to 0.1
    

**Returns**:

*   tuple: (train\_ds, val\_ds, token\_to\_idx, idx\_to\_token) - Training dataset, validation dataset, and vocabulary mappings

In [None]:
def download_tiny_shakespeare():
    """Download the Tiny Shakespeare corpus and return it as one string.

    Returns:
        str: The decoded body of the HTTP response.

    Raises:
        requests.exceptions.RequestException: If the request fails, times out
            or the server answers with an HTTP error status.
    """
    url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
    # Without a timeout a stalled connection would hang the notebook forever.
    response = requests.get(url, timeout=30)
    # Fail loudly on 4xx/5xx instead of tokenizing an error page as the corpus.
    response.raise_for_status()
    return response.text


def prepare_dataset(context_size=5, val_split=0.1, seed=42):
    """Download, tokenize and split Tiny Shakespeare for next-word prediction.

    Pipeline: download text -> regex tokenization -> lowercasing -> stop-word
    removal -> vocabulary -> token ids -> sliding-window dataset -> random
    train/validation split.

    Args:
        context_size (int): Number of tokens in each context window.
        val_split (float): Fraction of the samples used for validation.
        seed (int | None): Seed for the train/validation split so that
            separate calls (e.g. the runs of an ablation study) are evaluated
            on the same split. Pass None for an unseeded split.

    Returns:
        tuple: ``(train_ds, val_ds, token_to_idx, idx_to_token)``.
    """
    text = download_tiny_shakespeare()

    # Tokenize, lowercase, then drop stop words.
    tokens = [t.lower() for t in simple_word_tokenize(text)]
    tokens = [t for t in tokens if t not in STOP_WORDS]

    # Mappings of token to id and back.
    token_to_idx, idx_to_token = build_vocab(tokens, min_freq=1)
    print(f"Vocab size: {len(idx_to_token)}")
    unk_idx = token_to_idx['<unk>']
    token_idxs = [token_to_idx.get(t, unk_idx) for t in tokens]

    # Sliding windows of `context_size` tokens, each paired with the next token.
    dataset = NextWordDataset(token_idxs, context_size=context_size)

    # Train / validation split sizes.
    n_val = int(val_split * len(dataset))
    n_train = len(dataset) - n_val

    # NOTE(review): consecutive windows overlap, so a random split places
    # near-identical contexts in both subsets and validation metrics are
    # optimistic. A contiguous split of the token sequence would avoid this.
    if seed is None:
        train_ds, val_ds = random_split(dataset, [n_train, n_val])
    else:
        # A dedicated generator keeps the split reproducible without touching
        # the global RNG state.
        split_rng = torch.Generator().manual_seed(seed)
        train_ds, val_ds = random_split(dataset, [n_train, n_val], generator=split_rng)

    return train_ds, val_ds, token_to_idx, idx_to_token

### Model Construction
LSTM-based neural network model for next word prediction tasks.

**Parameters**:

*   vocab\_size (int): Size of the vocabulary
    
*   emb\_size (int): Dimensionality of word embeddings
    
*   hidden\_size (int): Number of features in the hidden state of LSTM
    
*   num\_layers (int, optional): Number of recurrent layers. Defaults to 1
    
*   dropout (float, optional): Dropout probability. Defaults to 0.0
    

**Forward Pass**:

*   Input: x (Tensor) - Batch of token sequences with shape (batch\_size, context\_size)
    
*   Output: logits (Tensor) - Unnormalized scores for each vocabulary word with shape (batch\_size, vocab\_size)

In [10]:
class LSTMNextWordModel(nn.Module):
    """Embedding -> LSTM -> linear classifier over the vocabulary.

    Args:
        vocab_size (int): Number of entries in the vocabulary.
        emb_size (int): Dimensionality of the token embeddings.
        hidden_size (int): Size of the LSTM hidden state.
        num_layers (int): Number of stacked LSTM layers.
        dropout (float): Dropout applied between stacked LSTM layers. It is
            only used when ``num_layers > 1``.
    """

    def __init__(self, vocab_size, emb_size, hidden_size, num_layers=1, dropout=0.0):
        super().__init__()
        # Token id -> dense vector; index 0 is the padding token.
        self.embedding = nn.Embedding(vocab_size, emb_size, padding_idx=0)
        # nn.LSTM applies dropout only between layers and warns when a
        # non-zero value is given for a single layer, so pass it only when it
        # can take effect.
        lstm_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(emb_size, hidden_size, num_layers=num_layers,
                            batch_first=True, dropout=lstm_dropout)
        # Hidden state -> one unnormalized score per vocabulary word.
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x):
        """Return next-token logits.

        Args:
            x (Tensor): Token ids of shape (batch, context_size).

        Returns:
            Tensor: Unnormalized scores of shape (batch, vocab_size).
        """
        emb = self.embedding(x)  # (batch, context, emb)
        out, _ = self.lstm(emb)  # (batch, context, hidden)
        last = out[:, -1, :]     # output at the last timestep of the window
        logits = self.fc(last)   # (batch, vocab), not normalized
        return logits


### Train 1 epoch (batch)

Trains the model for one epoch on the provided dataloader.

**Parameters**:

*   model (nn.Module): The neural network model to train
    
*   dataloader (DataLoader): DataLoader providing training batches
    
*   criterion (nn.Module): Loss function (e.g., CrossEntropyLoss)
    
*   optimizer (torch.optim.Optimizer): Optimization algorithm
    
*   device (str): Device to run training on ('cpu' or 'cuda')
    

**Returns**:

*   tuple: (avg\_loss, accuracy) - Average loss and accuracy for the epoch

In [11]:

def train_one_epoch(model, dataloader, criterion, optimizer, device):
    """Run one optimization pass over ``dataloader``.

    Args:
        model (nn.Module): Model to train; switched to training mode.
        dataloader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss function called as ``criterion(logits, targets)``.
        optimizer (torch.optim.Optimizer): Optimizer stepping the model.
        device: Device the batches are moved to.

    Returns:
        tuple: ``(avg_loss, accuracy)`` averaged over all samples seen, where
        the per-batch loss is weighted by the batch size.
    """
    model.train()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    for inputs, targets in dataloader:
        inputs, targets = inputs.to(device), targets.to(device)
        batch_n = inputs.size(0)

        # Gradients accumulate by default, so reset them for every batch.
        optimizer.zero_grad()
        scores = model(inputs)
        batch_loss = criterion(scores, targets)
        batch_loss.backward()
        optimizer.step()

        # The criterion returns a batch mean; re-weight it by the batch size.
        loss_sum += batch_loss.item() * batch_n
        # The prediction is the highest-scoring vocabulary entry.
        n_correct += (scores.argmax(dim=-1) == targets).sum().item()
        n_seen += batch_n

    return loss_sum / n_seen, n_correct / n_seen


### Evaluate
 Evaluates the model on the provided dataloader without training.

**Parameters**:

*   model (nn.Module): The neural network model to evaluate
    
*   dataloader (DataLoader): DataLoader providing evaluation batches
    
*   criterion (nn.Module): Loss function
    
*   device (str): Device to run evaluation on ('cpu' or 'cuda')
    

**Returns**:

*   tuple: (avg\_loss, accuracy) - Average loss and accuracy for the evaluation

In [12]:

def evaluate(model, dataloader, criterion, device):
    """Measure loss and accuracy on ``dataloader`` without updating the model.

    Args:
        model (nn.Module): Model to evaluate; switched to evaluation mode.
        dataloader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss function called as ``criterion(logits, targets)``.
        device: Device the batches are moved to.

    Returns:
        tuple: ``(avg_loss, accuracy)`` averaged over all samples seen, where
        the per-batch loss is weighted by the batch size.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            batch_n = inputs.size(0)

            scores = model(inputs)
            batch_loss = criterion(scores, targets)

            loss_sum += batch_loss.item() * batch_n
            # The prediction is the highest-scoring vocabulary entry.
            n_correct += (scores.argmax(dim=-1) == targets).sum().item()
            n_seen += batch_n

    return loss_sum / n_seen, n_correct / n_seen

### Generate Text - use model

 Generates text using the trained model with various sampling strategies.

**Parameters**:

*   model (nn.Module): Trained language model
    
*   token\_to\_idx (dict): Vocabulary mapping from tokens to indices
    
*   idx\_to\_token (list): Vocabulary mapping from indices to tokens
    
*   seed\_text (str): Initial text to start generation from
    
*   gen\_len (int, optional): Number of tokens to generate. Defaults to 10
    
*   context\_size (int, optional): Context window size. Defaults to 5
    
*   device (str, optional): Device to run generation on. Defaults to 'cpu'
    
*   temperature (float, optional): Controls randomness (higher = more random). Defaults to 1.0
    
*   top\_k (int, optional): Limits sampling to top-k most likely tokens. Defaults to None
    

**Returns**:

*   str: Generated text sequence
    

**Sampling Strategies**:

*   temperature: Adjusts probability distribution (higher values increase diversity)
    
*   top\_k: Restricts sampling to the k most likely tokens for more coherent output

In [13]:
def generate_text(model, token_to_idx, idx_to_token, seed_text, gen_len=10, context_size=5, device='cpu', temperature=1.0, top_k=None):
    """Sample ``gen_len`` tokens from the model, starting from ``seed_text``.

    The seed is tokenized and lowercased, mapped to ids (unknown tokens become
    "<unk>"), left-padded with "<pad>" and truncated to the last
    ``context_size`` ids. Each sampled token is appended to the context while
    the oldest one is dropped.

    Args:
        model (nn.Module): Trained model mapping (1, context_size) ids to
            (1, vocab) logits; switched to evaluation mode.
        token_to_idx (dict): Token -> index mapping.
        idx_to_token (list): Index -> token mapping.
        seed_text (str): Text the generation starts from.
        gen_len (int): Number of tokens to generate.
        context_size (int): Context window length fed to the model.
        device: Device the input tensor is created on.
        temperature (float): Logits are divided by this value (floored at
            1e-8) before the softmax; higher values give more random output.
        top_k (int | None): If given, sample only among the ``top_k`` most
            probable tokens. Values larger than the vocabulary are clamped.

    Returns:
        str: The generated tokens joined by ``detokenize`` (seed excluded).
    """
    model.eval()

    # NOTE(review): prepare_dataset removes stop words before building the
    # vocabulary, but the seed is not filtered here, so stop words in the seed
    # are mapped to <unk>.
    seed_tokens = [t.lower() for t in simple_word_tokenize(seed_text)]
    unk_idx = token_to_idx.get('<unk>')
    pad_idx = token_to_idx.get('<pad>')
    ids = [token_to_idx.get(t, unk_idx) for t in seed_tokens]

    # Left-pad short seeds, then keep only the most recent `context_size` ids.
    ids = [pad_idx] * max(0, context_size - len(ids)) + ids
    ids = ids[-context_size:]

    generated = []
    with torch.no_grad():
        for _ in range(gen_len):
            x = torch.tensor([ids], dtype=torch.long, device=device)
            logits = model(x).squeeze(0) / max(1e-8, temperature)  # (vocab,)
            probs = torch.softmax(logits, dim=-1)

            if top_k is not None:
                # torch.topk raises when k exceeds the number of entries.
                k = min(top_k, probs.size(-1))
                topk_vals, topk_idx = torch.topk(probs, k=k)
                # Renormalize so the kept probabilities sum to one.
                topk_probs = topk_vals / torch.sum(topk_vals)
                chosen = np.random.choice(topk_idx.cpu().numpy(), p=topk_probs.cpu().numpy())
            else:
                full_probs = probs.cpu().numpy()
                chosen = np.random.choice(len(full_probs), p=full_probs)

            # Plain int so the context stays a list of Python integers.
            chosen = int(chosen)
            generated.append(idx_to_token[chosen])
            # Slide the window: drop the oldest id, append the new one.
            ids = ids[1:] + [chosen]
    return detokenize(generated)

### Run Single Experiment
Comprehensive training and evaluation pipeline for the LSTM next-word prediction model. This function orchestrates the entire machine learning workflow from data preparation to model training, evaluation, and result analysis.

**Workflow**:

1.  **Data Preparation**: Loads and preprocesses the Tiny Shakespeare dataset, builds vocabulary, and creates DataLoader objects for training and validation
    
2.  **Model Setup**: Initializes the LSTM model with specified architecture parameters and sets up loss function and optimizer
    
3.  **Training Loop**: Executes training for multiple epochs with batch processing, gradient updates, and performance tracking
    
4.  **Early Stopping**: Monitors validation loss and stops training if no improvement is detected for several consecutive epochs
    
5.  **Evaluation**: Computes final validation perplexity as a language modeling quality metric
    
6.  **Visualization**: Generates and saves loss and accuracy curves to monitor training progress
    
7.  **Text Generation**: Demonstrates model capabilities by generating text from a seed phrase
    
8.  **Results Packaging**: Returns a comprehensive summary dictionary with all experiment results

In [14]:

def _save_metric_curves(outdir, filename, train_values, val_values,
                        train_label, val_label, ylabel, title):
    """Plot a train/validation metric pair against the epoch number and save it as a PNG."""
    plt.figure()
    epoch_axis = range(1, len(train_values) + 1)
    plt.plot(epoch_axis, train_values, label=train_label)
    plt.plot(epoch_axis, val_values, label=val_label)
    plt.xlabel('Epoch')
    plt.ylabel(ylabel)
    plt.legend()
    plt.title(title)
    plt.savefig(outdir / filename)
    plt.close()


def run_experiment(hidden_size=256, num_layers=1, dropout=0.1, emb_size=128, context_size=5,
                   batch_size=128, epochs=10, lr=1e-3, device='cpu', run_name='exp'):
    """Train, evaluate and sample from one LSTM configuration.

    Prepares the data, trains with Adam and early stopping on the validation
    loss, restores the best weights, writes loss/accuracy plots to
    ``PATH_RES/run_name`` and generates a text sample. Reads the module-level
    settings WEIGHT_DECAY, PATIENCE, TOP_K, TEMPERATURE and PATH_RES.

    Args:
        hidden_size (int): LSTM hidden state size.
        num_layers (int): Number of stacked LSTM layers.
        dropout (float): Dropout passed to the model.
        emb_size (int): Embedding dimensionality.
        context_size (int): Context window length.
        batch_size (int): Batch size of both data loaders.
        epochs (int): Maximum number of training epochs.
        lr (float): Adam learning rate.
        device: Device used for training and generation.
        run_name (str): Sub-directory name and plot title suffix.

    Returns:
        dict: Model, vocabulary mappings, per-epoch metric lists,
        ``val_perplexity`` (of the best/restored epoch), ``results_dir`` and
        the ``generated`` text.
    """
    print(f"Running experiment: {run_name} | hidden={hidden_size} layers={num_layers} dropout={dropout}")

    train_ds, val_ds, token_to_idx, idx_to_token = prepare_dataset(context_size=context_size)

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size)

    model = LSTMNextWordModel(len(idx_to_token), emb_size, hidden_size, num_layers=num_layers, dropout=dropout)
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=WEIGHT_DECAY)

    train_losses = []
    train_accs = []
    val_losses = []
    val_accs = []
    best_val_loss = float('inf')
    best_model_state = None
    # Consecutive epochs without improvement; training stops at PATIENCE.
    patience_counter = 0

    for ep in range(1, epochs + 1):
        train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)

        # Record and report the epoch before the early-stopping check so the
        # final epoch is not missing from the curves.
        train_losses.append(train_loss)
        train_accs.append(train_acc)
        val_losses.append(val_loss)
        val_accs.append(val_acc)
        print(f"Epoch {ep}/{epochs}  train_loss={train_loss:.4f} train_acc={train_acc:.4f}  val_loss={val_loss:.4f} val_acc={val_acc:.4f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # state_dict() returns references to the live parameters, which
            # later optimizer steps overwrite; clone to keep a real snapshot.
            best_model_state = {name: tensor.detach().clone()
                                for name, tensor in model.state_dict().items()}
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= PATIENCE:
                print("Early stopping triggered.")
                break

    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    # Perplexity of the restored (best) weights. Using the best loss also
    # avoids indexing an empty list when no epoch was run.
    val_perplexity = compute_perplexity(best_val_loss)
    print(f"Validation perplexity: {val_perplexity:.2f}")

    # Save plots
    outdir = Path(PATH_RES) / run_name
    outdir.mkdir(parents=True, exist_ok=True)

    _save_metric_curves(outdir, 'loss.png', train_losses, val_losses,
                        'train_loss', 'val_loss', 'Cross-Entropy Loss',
                        f'Loss curves ({run_name})')
    _save_metric_curves(outdir, 'acc.png', train_accs, val_accs,
                        'train_acc', 'val_acc', 'Accuracy',
                        f'Accuracy curves ({run_name})')

    # Generate text
    seed = "To be or not to"
    generated = generate_text(model, token_to_idx, idx_to_token, seed, gen_len=200, context_size=context_size, device=device, temperature=TEMPERATURE, top_k=TOP_K)
    print(f"Seed: '{seed}'")
    print("Generated:", generated)

    # Return summary
    return {
        'model': model,
        'token_to_idx': token_to_idx,
        'idx_to_token': idx_to_token,
        'train_losses': train_losses,
        'val_losses': val_losses,
        'train_accs': train_accs,
        'val_accs': val_accs,
        'val_perplexity': val_perplexity,
        'results_dir': str(outdir),
        'generated': generated,
    }

### Ablation Study System
Systematic ablation study that runs multiple experiments with different model configurations to analyze the impact of various architectural choices on performance. This function executes a series of controlled experiments where specific model parameters are varied while keeping others constant, allowing for comparative analysis of how different design decisions affect model behavior, training dynamics, and final performance.

**Experimental Design**: The function runs multiple experiments with carefully controlled variations:

1.  **Hidden Size Comparison**: Tests different model capacities (64 vs 256 units)
    
2.  **Layer Depth Analysis**: Compares single vs multi-layer architectures
    
3.  **Dropout Impact**: Evaluates different regularization strengths (0.1, 0.2, 0.3)

In [15]:
def run_ablation(epochs=20, batch_size=128, device=None):
    """Run every configuration in the experiment list and compare validation loss.

    Each configuration is trained with ``run_experiment`` using the same
    embedding size, context size and learning rate. Afterwards the
    validation-loss curves of all runs are drawn in one figure saved to
    ``PATH_RES/ablation/ablation_val_loss.png``.

    Args:
        epochs (int): Maximum number of epochs per experiment.
        batch_size (int): Batch size per experiment.
        device: Device for all experiments; when None, 'cuda' is used if
            available, otherwise 'cpu'.
    """
    if device is None:
        # Resolved once; it is the same for every experiment.
        device = 'cuda' if torch.cuda.is_available() else 'cpu'

    experiments = [
        # # Hidden size variations
        # {'hidden_size': 256, 'num_layers': 1, 'dropout': 0.0, 'run_name': 'hidden_256'},
        # {'hidden_size': 64, 'num_layers': 1, 'dropout': 0.0, 'run_name': 'hidden_64'},

        # # Number of layers variations
        # {'hidden_size': 256, 'num_layers': 1, 'dropout': 0.1, 'run_name': 'layers_1'},
        # {'hidden_size': 256, 'num_layers': 2, 'dropout': 0.1, 'run_name': 'layers_2'},

        # Dropout variations (2 layers)
        # {'hidden_size': 256, 'num_layers': 2, 'dropout': 0.1, 'run_name': 'dropout_0.1'},
        # {'hidden_size': 256, 'num_layers': 2, 'dropout': 0.2, 'run_name': 'dropout_0.2'},
        # {'hidden_size': 256, 'num_layers': 2, 'dropout': 0.3, 'run_name': 'dropout_0.3'},

        # Dropout variations (3 layers) - currently active
        {'hidden_size': 256, 'num_layers': 3, 'dropout': 0.1, 'run_name': 'dropout_0.1'},
        {'hidden_size': 256, 'num_layers': 3, 'dropout': 0.2, 'run_name': 'dropout_0.2'},
        {'hidden_size': 256, 'num_layers': 3, 'dropout': 0.3, 'run_name': 'dropout_0.3'},
    ]

    # Summaries keyed by run name; a repeated run name overwrites the earlier one.
    summaries = {}
    for cfg in experiments:
        summaries[cfg['run_name']] = run_experiment(
            hidden_size=cfg['hidden_size'],
            num_layers=cfg['num_layers'],
            dropout=cfg['dropout'],
            emb_size=128,
            context_size=5,
            batch_size=batch_size,
            epochs=epochs,
            lr=1e-3,
            device=device,
            run_name=cfg['run_name']
        )

    # Plot comparison of validation loss curves
    plt.figure()
    for name, s in summaries.items():
        plt.plot(range(1, len(s['val_losses']) + 1), s['val_losses'], label=f'{name}')
    plt.xlabel('Epoch')
    plt.ylabel('Val Loss')
    plt.legend()
    plt.title('Ablation: val loss by config')
    comp_dir = Path(PATH_RES) / 'ablation'
    comp_dir.mkdir(parents=True, exist_ok=True)
    plt.savefig(comp_dir / 'ablation_val_loss.png')
    plt.close()

    print('Ablation results saved to', comp_dir)


In [None]:
# Settings for the single-experiment branch below.
epochs = 20
batch_size = 128
ablation = True   # set to True if you want to run the ablation study
device = 'cuda' if torch.cuda.is_available() else 'cpu'

if ablation:
    run_ablation()
else:
    run_experiment(
        hidden_size=256,
        num_layers=1,
        dropout=0.1,
        emb_size=128,
        context_size=5,
        batch_size=batch_size,
        epochs=epochs,
        lr=1e-3,
        device=device,
        # run_experiment writes to PATH_RES / run_name and shows run_name in
        # the plot titles, so pass a short label rather than PATH_RES itself
        # (which would nest the results directory inside itself).
        run_name='single_run'
    )