# Assignment 2 — Mini GPT (Decoder-only Transformer)

This notebook trains a **small transformer language model** from scratch for **next-token prediction**.

## What you need
- A 1D NumPy array of token IDs from Assignment 1, saved at `data/tokenized/corpus_ids.npy` (if yours lives elsewhere, edit `data_path` in the configuration cell below).
- Or set `use_synthetic = True` to do a quick smoke test without real data.

## Deliverables you'll get here
- Model implementation (1–2 layers, 2–4 heads, 64–256 hidden size)
- Training loop with **cross-entropy** and **perplexity** logging
- **Checkpoints** saved under `runs/mini_gpt/`
- **Loss** and **Perplexity** plots saved to the same folder

**Tip:** Start with small sizes if you're on CPU (e.g., `d_model=64`, `n_layers=1`, `block_size=32`).

In [6]:
import torch, numpy as np, pathlib

# One-off conversion: flatten the token ids stored in the tokenized-blocks file
# into the 1D int64 .npy array that the training cells load.
inp = "data/tokenized/tokenized_blocks.pt"
out = "data/tokenized/corpus_ids.npy"

# NOTE(review): torch.load unpickles the file; only run this on a file you produced yourself.
obj = torch.load(inp, map_location="cpu")
print("Top-level type:", type(obj))

if isinstance(obj, dict):
    print("Keys:", obj.keys())
    # Try common keys, first match wins.
    candidate_keys = ["ids", "input_ids", "token_ids", "data"]
    key = next((k for k in candidate_keys if k in obj), None)
    if key is None:
        # Without this check the dict itself would reach np.array(...) and fail
        # later with an unrelated-looking dtype conversion error.
        raise KeyError(
            f"None of {candidate_keys} found in {inp}; available keys: {list(obj.keys())}"
        )
    obj = obj[key]
    print("Using key:", key)

if torch.is_tensor(obj):
    ids = obj.detach().cpu().numpy()
else:
    ids = np.array(obj)

# NOTE(review): everything is flattened as-is; if the blocks were padded (the file
# may carry an attention_mask), padding ids end up in the corpus too — confirm.
ids = ids.astype(np.int64).reshape(-1)
# Create the directory that `out` actually points to, not a separately hard-coded one.
pathlib.Path(out).parent.mkdir(parents=True, exist_ok=True)
np.save(out, ids)
print("Saved:", out, "| tokens:", len(ids))


Top-level type: <class 'dict'>
Keys: dict_keys(['input_ids', 'attention_mask'])
Using key: input_ids
Saved: data/tokenized/corpus_ids.npy | tokens: 104871936


In [7]:
# Notebook-wide configuration. train_loop() reads these names as globals, so
# re-run this cell after editing and before training.

# --- Data ---
use_synthetic = False          # True: train on random token ids (smoke test); False: load data_path
data_path = 'data/tokenized/corpus_ids.npy'  # Path to the .npy array of token ids (flattened to 1D on load)
vocab_size = 50257              # Must match your tokenizer; sizes the embedding table and the output logits
min_tokens = 200_000            # Synthetic-only: requested token count; load_ids() never generates fewer than 200,000

# --- Model ---
block_size = 128                # Context length in tokens (suggested range 32–128)
d_model = 256                   # Embedding / hidden width (suggested range 64–256); must be divisible by n_heads
n_heads = 4                     # Attention heads per layer (suggested range 2–4)
n_layers = 2                    # Number of transformer blocks (suggested range 1–2)
d_ff = 0                        # Feed-forward inner width; any value <= 0 => 4 * d_model
dropout = 0.0                   # Dropout probability used by the embedding, attention and feed-forward dropout layers
tie_weights = True              # Share the output head weight with the token embedding

# --- Optimisation ---
batch_size = 64                 # Sequences per batch for both the training and validation loaders
epochs = 3                      # Full passes over the training windows
lr = 1e-3                       # AdamW learning rate
weight_decay = 0.01             # AdamW weight decay
grad_clip = 1.0                 # Max gradient norm passed to clip_grad_norm_
val_frac = 0.1                  # Fraction of tokens at the end of the corpus held out for validation
log_every = 200                 # Run validation and log every N training steps
ckpt_every = 1000               # Save a step checkpoint every N training steps (0 disables step checkpoints)
out_dir = 'runs/mini_gpt'       # Where checkpoints and plots are written

print('Config loaded. Edit as needed and run the next cells ↓')

Config loaded. Edit as needed and run the next cells ↓


In [9]:
!pip install torch torchvision torchaudio





In [8]:
import math
import numpy as np
from pathlib import Path
from dataclasses import dataclass
from typing import Optional

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

'cpu'

In [9]:
class TokenBlockDataset(Dataset):
    """Sliding-window next-token dataset over a flat array of token ids.

    Item ``i`` is the pair ``(ids[i:i + block_size], ids[i + 1:i + 1 + block_size])``,
    i.e. the target is the input shifted one token to the right. Windows start at
    every position, so consecutive items overlap.

    Args:
        ids: 1D array of token ids; it is copied and stored as an int64 tensor.
        block_size: Number of tokens in each input (and target) window.

    Raises:
        AssertionError: if ``ids`` is not 1D or holds fewer than ``block_size + 1`` tokens.
    """

    def __init__(self, ids: np.ndarray, block_size: int):
        assert ids.ndim == 1, 'Expected 1D array of token ids'
        self.ids = torch.from_numpy(ids.astype(np.int64))
        self.block_size = block_size
        # Largest valid window start: the target slice needs block_size + 1 tokens
        # beginning at that position.
        self.max_start = len(self.ids) - (block_size + 1)
        # Start 0 is already a valid window, so exactly block_size + 1 tokens suffice.
        assert self.max_start >= 0, 'Not enough tokens for this block_size'

    def __len__(self):
        # Valid starts are 0..max_start inclusive.
        return self.max_start + 1

    def __getitem__(self, idx):
        n = len(self)
        if idx < 0:
            # Support sequence-style negative indexing instead of silently
            # slicing from the wrong end.
            idx += n
        if not 0 <= idx < n:
            raise IndexError(f'index {idx} out of range for dataset of length {n}')
        x = self.ids[idx:idx + self.block_size]
        y = self.ids[idx + 1:idx + 1 + self.block_size]
        return x, y

In [5]:
@dataclass
class GPTConfig:
    """Hyperparameters consumed by ``MiniGPT`` when it builds its layers."""
    # Size of the token embedding table and of the output logits' last dimension.
    vocab_size: int
    # Maximum sequence length: sizes the positional embedding and the causal mask.
    block_size: int = 128
    # Embedding / hidden width; must be divisible by n_heads.
    d_model: int = 256
    # Number of attention heads in every transformer block.
    n_heads: int = 4
    # Number of stacked transformer blocks.
    n_layers: int = 2
    # Feed-forward inner width; None makes FeedForward use 4 * d_model.
    d_ff: Optional[int] = None
    # Dropout probability passed to every dropout layer in the model.
    dropout: float = 0.0
    # When True, the output head shares its weight with the token embedding.
    tie_weights: bool = True

class CausalSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention with a lower-triangular mask.

    Each position attends only to itself and earlier positions. Inputs and
    outputs are ``(batch, seq_len, d_model)`` with ``seq_len <= block_size``.
    """

    def __init__(self, d_model, n_heads, dropout, block_size):
        super().__init__()
        assert d_model % n_heads == 0, 'd_model must be divisible by n_heads'
        self.n_heads = n_heads
        self.head_dim = d_model // n_heads
        self.scale = self.head_dim ** -0.5
        # One fused projection yields queries, keys and values in a single matmul.
        self.qkv = nn.Linear(d_model, 3 * d_model, bias=False)
        self.out = nn.Linear(d_model, d_model, bias=False)
        self.attn_drop = nn.Dropout(dropout)
        self.resid_drop = nn.Dropout(dropout)
        # Ones on and below the diagonal mark the positions a query may look at.
        lower_tri = torch.ones(block_size, block_size).tril()
        self.register_buffer('mask', lower_tri.view(1, 1, block_size, block_size))

    def _split_heads(self, t):
        """Reshape ``(batch, seq, d_model)`` into ``(batch, heads, seq, head_dim)``."""
        batch, seq_len, _ = t.size()
        return t.view(batch, seq_len, self.n_heads, self.head_dim).transpose(1, 2)

    def forward(self, x):
        batch, seq_len, width = x.size()
        q, k, v = (self._split_heads(part) for part in self.qkv(x).chunk(3, dim=-1))
        scores = (q @ k.transpose(-2, -1)) * self.scale
        # Future positions get -inf so softmax assigns them zero weight.
        hidden_positions = self.mask[:, :, :seq_len, :seq_len] == 0
        scores = scores.masked_fill(hidden_positions, float('-inf'))
        weights = self.attn_drop(torch.softmax(scores, dim=-1))
        # Merge the heads back into a single d_model-wide vector per position.
        merged = (weights @ v).transpose(1, 2).contiguous().view(batch, seq_len, width)
        return self.resid_drop(self.out(merged))

In [5]:
class FeedForward(nn.Module):
    """Position-wise MLP: Linear -> GELU -> Linear -> Dropout.

    ``d_ff`` is the hidden width; passing ``None`` selects ``4 * d_model``.
    """

    def __init__(self, d_model, d_ff, dropout):
        super().__init__()
        hidden = 4 * d_model if d_ff is None else d_ff
        layers = [
            nn.Linear(d_model, hidden),
            nn.GELU(),
            nn.Linear(hidden, d_model),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x)

class TransformerBlock(nn.Module):
    """Pre-LayerNorm transformer block: attention and feed-forward, each with a residual."""

    def __init__(self, d_model, n_heads, dropout, block_size, d_ff):
        super().__init__()
        # Attention sub-layer and its input normalisation.
        self.ln1 = nn.LayerNorm(d_model)
        self.attn = CausalSelfAttention(d_model, n_heads, dropout, block_size)
        # Feed-forward sub-layer and its input normalisation.
        self.ln2 = nn.LayerNorm(d_model)
        self.ff = FeedForward(d_model, d_ff, dropout)

    def forward(self, x):
        # Each sub-layer sees the normalised input and is added back onto the residual path.
        after_attn = x + self.attn(self.ln1(x))
        return after_attn + self.ff(self.ln2(after_attn))

class MiniGPT(nn.Module):
    """Decoder-only transformer language model.

    ``forward`` takes a ``(batch, seq_len)`` tensor of token ids with
    ``seq_len <= cfg.block_size`` and returns logits of shape
    ``(batch, seq_len, cfg.vocab_size)``.
    """

    def __init__(self, cfg: GPTConfig):
        super().__init__()
        self.cfg = cfg
        self.tok_emb = nn.Embedding(cfg.vocab_size, cfg.d_model)
        self.pos_emb = nn.Embedding(cfg.block_size, cfg.d_model)
        self.drop = nn.Dropout(cfg.dropout)
        layers = []
        for _ in range(cfg.n_layers):
            layers.append(
                TransformerBlock(
                    d_model=cfg.d_model,
                    n_heads=cfg.n_heads,
                    dropout=cfg.dropout,
                    block_size=cfg.block_size,
                    d_ff=cfg.d_ff,
                )
            )
        self.blocks = nn.ModuleList(layers)
        self.ln_f = nn.LayerNorm(cfg.d_model)
        self.head = nn.Linear(cfg.d_model, cfg.vocab_size, bias=False)
        if cfg.tie_weights:
            # The output projection reuses the token embedding matrix.
            self.head.weight = self.tok_emb.weight
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Normal(0, 0.02) weights for Linear/Embedding layers; zero Linear biases."""
        if not isinstance(module, (nn.Linear, nn.Embedding)):
            return
        nn.init.normal_(module.weight, mean=0.0, std=0.02)
        if isinstance(module, nn.Linear) and module.bias is not None:
            nn.init.zeros_(module.bias)

    def forward(self, idx):
        _, seq_len = idx.size()
        assert seq_len <= self.cfg.block_size, 'Sequence length exceeds block_size'
        positions = torch.arange(seq_len, device=idx.device)
        # Positional embeddings broadcast across the batch dimension.
        hidden = self.drop(self.tok_emb(idx) + self.pos_emb(positions))
        for layer in self.blocks:
            hidden = layer(hidden)
        return self.head(self.ln_f(hidden))

In [6]:
def perplexity(loss_value: float) -> float:
    """Convert a mean cross-entropy loss (in nats) to perplexity, ``exp(loss)``.

    Returns ``inf`` when the loss is too large for ``math.exp`` to represent,
    so a diverged run is logged rather than aborted by an ``OverflowError``.
    """
    try:
        return float(math.exp(loss_value))
    except OverflowError:
        return float('inf')

def load_ids(path: Path, synthetic: bool, vocab_size: int, min_tokens: int = 200_000) -> np.ndarray:
    """Return the training corpus as a 1D int64 array of token ids.

    Args:
        path: Location of a ``.npy`` file of token ids; ignored when ``synthetic`` is True.
        synthetic: When True, generate random ids with a fixed seed (0) instead of loading.
        vocab_size: Exclusive upper bound for token ids.
        min_tokens: Requested synthetic token count; at least 200,000 are always generated.

    Raises:
        AssertionError: if ``path`` does not exist (non-synthetic mode).
        ValueError: if a loaded id is negative or not smaller than ``vocab_size``.
    """
    if synthetic:
        rng = np.random.default_rng(0)
        size = max(min_tokens, 200_000)
        print(f'[synthetic] Generating {size:,} random tokens with vocab_size={vocab_size} ...')
        return rng.integers(0, vocab_size, size=size, dtype=np.int64)

    assert path.exists(), f'Token id file not found: {path}'
    print(f'Loading token ids from: {path}')
    arr = np.load(path)
    if arr.ndim > 1:
        arr = arr.reshape(-1)
    # copy=False avoids duplicating a large corpus that is already int64.
    ids = arr.astype(np.int64, copy=False)
    if ids.size:
        lowest, highest = int(ids.min()), int(ids.max())
        # An id outside [0, vocab_size) would otherwise only fail later, inside
        # the embedding lookup, with a far less helpful error.
        if lowest < 0 or highest >= vocab_size:
            raise ValueError(
                f'Token ids in {path} span [{lowest}, {highest}] but vocab_size={vocab_size}; '
                'vocab_size must match the tokenizer that produced the file'
            )
    return ids

def plot_curves(out_dir: Path, history):
    """Save loss and perplexity curves as PNG files in ``out_dir``.

    ``history`` is a sequence of dicts with ``'step'``, ``'loss'`` and
    ``'perplexity'`` keys. Writes ``loss_curve.png`` and
    ``perplexity_curve.png``, creating ``out_dir`` if needed.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    steps = [entry['step'] for entry in history]
    # (values, y-axis label, title, output file name) for each figure.
    curves = (
        ([entry['loss'] for entry in history], 'Loss', 'Training Loss', 'loss_curve.png'),
        ([entry['perplexity'] for entry in history], 'Perplexity', 'Training Perplexity', 'perplexity_curve.png'),
    )
    for values, y_label, title, filename in curves:
        plt.figure()
        plt.plot(steps, values)
        plt.xlabel('Step')
        plt.ylabel(y_label)
        plt.title(title)
        plt.tight_layout()
        plt.savefig(out_dir / filename, dpi=150)

In [None]:
def train_loop():
    """Train a MiniGPT with the notebook's global configuration.

    Reads the config-cell globals (model sizes, optimiser settings, paths),
    splits the token ids into a leading training part and a trailing validation
    part, trains with AdamW and gradient clipping, and:

    - every ``log_every`` steps and at the end of each epoch, runs validation
      and records loss / perplexity in the history;
    - every ``ckpt_every`` steps (if > 0) and at the end of each epoch, saves a checkpoint;
    - finally saves ``mini_gpt_checkpoint.pt`` and the two curve plots to ``out_dir``.
    """
    cfg = GPTConfig(
        vocab_size=vocab_size, block_size=block_size, d_model=d_model,
        n_heads=n_heads, n_layers=n_layers, d_ff=(None if d_ff<=0 else d_ff),
        dropout=dropout, tie_weights=tie_weights
    )
    print('Using device:', device)
    ids = load_ids(Path(data_path), synthetic=use_synthetic, vocab_size=vocab_size, min_tokens=min_tokens)
    n = len(ids)
    split = int(n * (1.0 - val_frac))
    train_ids = ids[:split]
    # With val_frac == 0 there is no tail to hold out; fall back to the first 10% of tokens.
    val_ids = ids[split:] if split < n else ids[:int(0.1*n)]
    train_ds = TokenBlockDataset(train_ids, block_size)
    val_ds = TokenBlockDataset(val_ids, block_size)
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, drop_last=True)
    # Keep the final partial batch: run_eval weights each batch by its token count,
    # and dropping it could leave a small validation set with no batches at all
    # (which would be reported as loss 0 / perplexity 1).
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False, drop_last=False)
    model = MiniGPT(cfg).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, betas=(0.9, 0.95), weight_decay=weight_decay)
    criterion = nn.CrossEntropyLoss()
    history = []
    step = 0
    ckpt_dir = Path(out_dir)
    ckpt_dir.mkdir(parents=True, exist_ok=True)

    def save_checkpoint(filename):
        """Write the current model weights to ``ckpt_dir / filename`` and return the path."""
        target = ckpt_dir / filename
        torch.save({'model_state': model.state_dict()}, target)
        return target

    def run_eval():
        """Return (token-weighted mean loss, perplexity) over the whole validation loader."""
        # NOTE(review): this iterates the entire validation loader on every call,
        # which can dominate runtime on a large corpus.
        was_training = model.training
        model.eval()
        total_loss, total_tokens = 0.0, 0
        try:
            with torch.no_grad():
                for x, y in val_loader:
                    x = x.to(device); y = y.to(device)
                    logits = model(x)
                    loss = criterion(logits.view(-1, logits.size(-1)), y.view(-1))
                    total_loss += loss.item() * x.numel()
                    total_tokens += x.numel()
        finally:
            # Restore the caller's mode; otherwise training would continue in
            # eval mode (dropout disabled) after the first mid-epoch evaluation.
            model.train(was_training)
        avg = total_loss / max(total_tokens, 1)
        return avg, perplexity(avg)

    for epoch in range(epochs):
        model.train()
        for x, y in train_loader:
            x = x.to(device); y = y.to(device)
            logits = model(x)
            loss = criterion(logits.view(-1, logits.size(-1)), y.view(-1))
            optimizer.zero_grad(set_to_none=True)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
            optimizer.step()
            if step % log_every == 0:
                val_loss, val_ppl = run_eval()
                history.append({'step': step, 'loss': val_loss, 'perplexity': val_ppl})
                print(f'[step {step:6d}] val loss={val_loss:.4f} | val ppl={val_ppl:.2f}')
            if ckpt_every > 0 and step % ckpt_every == 0 and step > 0:
                save_checkpoint(f'mini_gpt_step_{step}.pt')
            step += 1
        val_loss, val_ppl = run_eval()
        history.append({'step': step, 'loss': val_loss, 'perplexity': val_ppl})
        print(f'[EPOCH {epoch+1}/{epochs}] val loss={val_loss:.4f} | val ppl={val_ppl:.2f}')
        save_checkpoint(f'mini_gpt_epoch_{epoch+1}.pt')
    final_path = save_checkpoint('mini_gpt_checkpoint.pt')
    print('Saved final checkpoint to', final_path)
    plot_curves(ckpt_dir, history)
    print('Saved loss_curve.png and perplexity_curve.png to', out_dir)
train_loop()

Using device: cpu
[synthetic] Generating 200,000 random tokens with vocab_size=50257 ...


### Notes
- Re-run with different hyperparameters and compare perplexities.
- Include the loss/perplexity plots in your report.
- Submit `runs/mini_gpt/mini_gpt_checkpoint.pt` as your final checkpoint.