---
format:
  html:
    code-fold: true
jupyter: python3
---

### **Cell 1: Setup and Tokenizer Plan**

**Data Plan**
- I will download the Tiny Shakespeare dataset with this raw URL: `https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt`
- I'll split the dataset into train and test sets and produce context-target pairs, where each target sequence holds the next token for every position of its context.

**Tokenizer Plan**
We'll compare the following tokenizers:
1. **BPE** using HuggingFace's `tokenizers` library
2. **SentencePiece** 
3. **Character-level** from scratch

I chose these tokenizers because BPE gives good subword units and works well on many datasets. SentencePiece performs well and is popular. Character-level is a simple baseline that generally works well.

**Training Plan**
- Epochs: 20 - might depend on the time
- Optimizer: `Adam`
- Learning Rate: start with 1e-3 and have a scheduler option
- Batch size: 16
- Context length: 128
- Loss: `nn.CrossEntropyLoss` with logits

In [None]:
# Cell 2: Data, Tokenizers, and Training Functions
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import random
import urllib.request
from pathlib import Path
import sentencepiece as spm
from tokenizers import Tokenizer, models, trainers, pre_tokenizers, normalizers, processors

# Seed every random number generator used in the notebook (Python, NumPy,
# PyTorch CPU and, when present, all CUDA devices) to make runs more repeatable.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# Download the corpus only when no local copy exists yet.
data_url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
data_path = Path("tiny_shakespeare_input.txt")
if not data_path.exists():
    print("Downloading Tiny Shakespeare dataset")
    urllib.request.urlretrieve(data_url, data_path)
else:
    print("Dataset already present.")

with open(data_path, "r", encoding="utf-8") as f:
    text = f.read()

print("Length (chars):", len(text))

# Train/validation split: the final `val_ratio` fraction of the characters is
# held out, the rest is used for training (no shuffling, the split is by position).
# NOTE(review): if val_size ever evaluates to 0, `text[:-0]` is empty and
# `text[-0:]` is the whole text, i.e. the split would be inverted.
val_ratio = 0.2
val_size = int(val_ratio * len(text))
train_text = text[:-val_size]
val_text = text[-val_size:]

# Character-level Tokenizers
class CharTokenizer:
    """Character-level tokenizer whose vocabulary is the set of distinct characters of a corpus.

    Attributes:
        id2token: list mapping a token id to its character (sorted, unique).
        token2id: dict mapping a character to its token id.
        vocab_size: number of distinct characters in the corpus.
    """

    def __init__(self, text):
        """Build the vocabulary from every distinct character found in `text`."""
        # Deduplicate and sort so ids are dense (0..vocab_size-1) and deterministic.
        # Using the raw text here would make every corpus position its own "token"
        # and report len(text) as the vocabulary size.
        self.id2token = sorted(set(text))
        self.token2id = {ch: i for i, ch in enumerate(self.id2token)}
        self.vocab_size = len(self.id2token)

    def encode(self, s):
        """Return the list of token ids for string `s`.

        Raises:
            KeyError: if `s` contains a character that was not in the corpus.
        """
        return [self.token2id[c] for c in s]

    def decode(self, ids):
        """Return the string obtained by concatenating the characters for `ids`."""
        return "".join(self.id2token[i] for i in ids)
    
# BPE
def train_bpe(text, vocab_size, path):
    """Train a HuggingFace `tokenizers` BPE tokenizer on `text` and persist it.

    Args:
        text: training corpus as a single string.
        vocab_size: target vocabulary size handed to the BPE trainer.
        path: file the full tokenizer (JSON) is saved to and reloaded from.

    Returns:
        The trained `Tokenizer`, reloaded from `path`.
    """
    # The trainer reads from files, so dump the corpus to a scratch file first.
    corpus_file = "bpe_text.txt"
    with open(corpus_file, "w", encoding="utf-8") as f:
        f.write(text)

    tokenizer = Tokenizer(models.BPE(unk_token="[UNK]"))
    tokenizer.normalizer = normalizers.NFKC()
    # NOTE(review): the Whitespace pre-tokenizer presumably discards spaces and
    # newlines, so decoded text will not reproduce the original layout — confirm.
    tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()
    trainer = trainers.BpeTrainer(vocab_size=vocab_size, special_tokens=["[UNK]", "[PAD]"])
    tokenizer.train([corpus_file], trainer)

    # Save the complete tokenizer (not just the model's vocab/merges files) so
    # that Tokenizer.from_file can read it back from the very same path.
    tokenizer.save(str(path))
    return Tokenizer.from_file(str(path))

# SentencePiece
def train_sentencepiece(text, prefix, vocab_size, model_type):
    """Train a SentencePiece model on `text` and return a loaded processor.

    Args:
        text: training corpus as a single string.
        prefix: model prefix; the trained model is loaded from `{prefix}.model`.
        vocab_size: target vocabulary size.
        model_type: SentencePiece model type passed through to the trainer
            (the notebook uses "bpe").

    Returns:
        A `SentencePieceProcessor` with the trained model loaded.
    """
    # The trainer reads from a file, so write the corpus itself (not the file
    # name) to a scratch file.
    corpus_file = "spm_text.txt"
    with open(corpus_file, "w", encoding="utf-8") as f:
        f.write(text)

    spm.SentencePieceTrainer.Train(
        input=corpus_file,
        model_prefix=prefix,
        vocab_size=vocab_size,
        model_type=model_type,
        # ids 0/1 are assigned to unknown/padding; -1 is passed for bos/eos.
        unk_id=0,
        pad_id=1,
        bos_id=-1,
        eos_id=-1
    )
    sp = spm.SentencePieceProcessor()
    sp.load(f"{prefix}.model")
    return sp

# Target vocabulary sizes for the two subword tokenizers.
BPE_VOCAB = 3000
SPM_VOCAB = 3000

# Both subword tokenizers are trained on the training split only.
print("Training BPE tokenizer")
bpe_tok = train_bpe(train_text, vocab_size=BPE_VOCAB, path="bpe-tokenizer.json")

print("Training SentencePiece tokenizer")
spm_tok = train_sentencepiece(train_text, prefix="spm", vocab_size=SPM_VOCAB, model_type="bpe")

# The character tokenizer is built from the full text (train + validation).
print("Building char tokenizer")
char_tok = CharTokenizer(text)

print("Vocabulary sizes:")
# A failure while querying the BPE vocabulary is reported instead of aborting the cell.
try:
    print("BPE (tokenizers) vocab size:", bpe_tok.get_vocab_size())
except Exception as e:
    print("BPE vocab error:", e)
print("SentencePiece vocab size:", spm_tok.get_piece_size())
print("Char vocab size:", char_tok.vocab_size)

# Round-trip one example string through each tokenizer and print ids/tokens/decoded text.
example = "Hello, world!"
print("Example text:", example)

# Char
char_ids = char_tok.encode(example)
print("Char ids:", char_ids)
print("Char decoded:", char_tok.decode(char_ids))

# BPE
bpe_out = bpe_tok.encode(example)
print("BPE tokenizer ids:", bpe_out.ids)
print("BPE tokenizer tokens:", bpe_out.tokens)
print("BPE decoded string:", bpe_tok.decode(bpe_out.ids))

# SentencePiece
spm_ids = spm_tok.EncodeAsIds(example)
spm_pieces = spm_tok.EncodeAsPieces(example)
print("SentencePiece ids:", spm_ids)
print("SentencePiece pieces:", spm_pieces)
print("SentencePiece decoded string:", spm_tok.DecodeIds(spm_ids))

class CharDataset(Dataset):
    """Sliding-window next-token dataset over a flat sequence of token ids.

    Item `i` is the pair (ids[i : i + L], ids[i + 1 : i + L + 1]) with
    L = context_length, both returned as `torch.long` tensors.
    """

    def __init__(self, token_ids, context_length):
        self.context_length = context_length
        self.ids = token_ids

    def __len__(self):
        # One window per start position that still has a full target slice;
        # sequences shorter than the context yield an empty dataset.
        n_windows = len(self.ids) - self.context_length
        return n_windows if n_windows > 0 else 0

    def __getitem__(self, idx):
        stop = idx + self.context_length
        context = torch.tensor(self.ids[idx:stop], dtype=torch.long)
        # The target is the context shifted one position to the right.
        target = torch.tensor(self.ids[idx + 1:stop + 1], dtype=torch.long)
        return context, target

def build_token_ids(tokenizer_type, tokenizer, text):
    """Encode `text` into a list of token ids with the given tokenizer.

    Args:
        tokenizer_type: one of "char", "bpe" or "spm"; selects which encoding
            API is called on `tokenizer`.
        tokenizer: the tokenizer object to use. It is always the object passed
            in here, never a module-level global.
        text: string to encode.

    Returns:
        The token ids, or None (after printing a message) when
        `tokenizer_type` is not recognised.
    """
    if tokenizer_type == "char":
        return tokenizer.encode(text)
    if tokenizer_type == "bpe":
        # HuggingFace `tokenizers` returns an Encoding object; the ids live on `.ids`.
        return tokenizer.encode(text).ids
    if tokenizer_type == "spm":
        return tokenizer.EncodeAsIds(text)

    print("Invalid tokenizer type:", tokenizer_type)
    return None

def train_model(model, train_loader, val_loader, optimizer, criterion, device, epochs, print_every=100):
    """Train `model` for `epochs` epochs, validating after each one.

    Args:
        model: module mapping a (batch, seq) id tensor to (batch, seq, vocab) logits.
        train_loader: iterable of (inputs, targets) training batches.
        val_loader: iterable of (inputs, targets) validation batches.
        optimizer: optimizer over the model's parameters.
        criterion: loss taking (N, vocab) logits and (N,) targets.
        device: device the model and batches are moved to.
        epochs: number of passes over `train_loader`.
        print_every: print the running mean training loss every this many
            steps; a falsy value disables the step logging.

    Returns:
        dict with "train_loss" and "val_loss", each a list of
        (epoch_index, loss) tuples with one entry per epoch.
    """
    model.to(device)
    history = {"train_loss": [], "val_loss": []}

    for epoch in range(epochs):
        # Validation runs in eval mode, so re-enable training mode (dropout)
        # at the start of every epoch instead of only once before the loop.
        model.train()
        total_loss = 0.0
        num_batches = 0
        for i, (xb, yb) in enumerate(train_loader):
            xb = xb.to(device)
            yb = yb.to(device)
            optimizer.zero_grad()
            logits = model(xb)
            B, S, V = logits.shape
            # Flatten batch and sequence so every position is one classification sample.
            loss = criterion(logits.reshape(B * S, V), yb.reshape(B * S))
            loss.backward()
            # Cap the global gradient norm at 1.0 before the update.
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1
            if print_every and (i + 1) % print_every == 0:
                print(f"Epoch {epoch+1} step {i+1} train loss: {total_loss / num_batches:.4f}")

        # Mean per-batch training loss; NaN marks an epoch without any batch.
        train_loss = total_loss / num_batches if num_batches else float("nan")
        history["train_loss"].append((epoch, train_loss))

        val_loss = evaluate_model(model, val_loader, criterion, device)
        history["val_loss"].append((epoch, val_loss))
        print(f"Epoch {epoch+1} validation loss: {val_loss:.4f}")

    return history

def evaluate_model(model, dataloader, criterion, device):
    """Return the mean loss of `model` over `dataloader`, computed without gradients.

    Each batch loss is weighted by the batch size so a smaller final batch
    does not skew the average.

    Args:
        model: module mapping a (batch, seq) id tensor to (batch, seq, vocab) logits.
        dataloader: iterable of (inputs, targets) batches.
        criterion: loss taking (N, vocab) logits and (N,) targets.
        device: device the batches are moved to.

    Returns:
        The sample-weighted mean loss as a float, or NaN when `dataloader`
        yields no batches.
    """
    # Remember the caller's mode so evaluation does not silently leave the
    # model in eval mode (which would disable dropout for later training).
    was_training = model.training
    model.eval()
    total = 0.0
    count = 0
    try:
        with torch.no_grad():
            for xb, yb in dataloader:
                xb = xb.to(device)
                yb = yb.to(device)
                logits = model(xb)
                B, S, V = logits.shape
                loss = criterion(logits.reshape(B * S, V), yb.reshape(B * S))
                total += loss.item() * xb.size(0)
                count += xb.size(0)
    finally:
        model.train(was_training)

    if count == 0:
        # An empty loader has no defined mean; report NaN instead of dividing by zero.
        return float("nan")
    return total / count

In [None]:
# Cell 3: Positional Encoding (From Scratch)
import torch
import torch.nn as nn
import math

class SinusoidalPositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding table.

    The table `pe` has shape (max_seq_len, d_model): even columns hold
    sin(pos * w_i) and odd columns cos(pos * w_i), with
    w_i = exp(-2i * ln(10000) / d_model). It is registered as a buffer, so it
    follows the module across devices and is not a trainable parameter.
    """

    def __init__(self, d_model, max_seq_len):
        """Precompute the table for positions 0..max_seq_len-1.

        Args:
            d_model: embedding width (even or odd).
            max_seq_len: number of positions to precompute.
        """
        super().__init__()
        self.d_model = d_model
        self.max_seq_len = max_seq_len

        pe = torch.zeros(max_seq_len, d_model, dtype=torch.float32)
        position = torch.arange(0, max_seq_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float32) * -(math.log(10000.0) / d_model))
        angles = position * div_term  # (max_seq_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)  # even columns
        # With an odd d_model there is one odd column fewer than even columns,
        # so drop the surplus frequency instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])  # odd columns
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return the encodings for the first `x.size(1)` positions.

        Only the sequence length and device of `x` are used; its values are
        ignored. The result has shape (1, seq_len, d_model) so it broadcasts
        over the batch dimension.
        """
        seq_len = x.size(1)
        return self.pe[:seq_len, :].unsqueeze(0).to(x.device)
    
# Build a small encoding table and print a few individual entries as a spot check.
d_model = 64
max_seq_len = 256
pos_enc = SinusoidalPositionalEncoding(d_model, max_seq_len)

# The precomputed (max_seq_len, d_model) table stored on the module.
pe_mat = pos_enc.pe
def print_pe_value(pos, dim):
    """Print the entry of the module-level `pe_mat` at row `pos`, column `dim` with 10 decimals."""
    v = float(pe_mat[pos, dim].item())
    print(f"pos={pos}, dim={dim} => {v:.10f}")

# Two positions, each probed at one even (sine) and one odd (cosine) column.
print_pe_value(5, 10)
print_pe_value(5, 15)
print_pe_value(100, 20)
print_pe_value(100, 25)

In [None]:
# Cell 4: Transformer Building Blocks (From Scratch)
import torch
import torch.nn as nn
import torch.nn.functional as F

class FeedForward(nn.Module):
    """Pre-norm position-wise feed-forward sub-layer with a residual connection.

    Computes x + Dropout(fc2(Dropout(act(fc1(LayerNorm(x)))))).

    Args:
        d_model: input and output width.
        d_ff: hidden width of the inner projection.
        dropout: dropout probability used after the activation and after fc2.
        activation: zero-argument factory for the activation module.
    """

    def __init__(self, d_model, d_ff, dropout=0.1, activation=nn.GELU):
        super().__init__()
        # Sub-modules are created in a fixed order so parameter initialisation
        # consumes the random stream the same way on every construction.
        self.ln = nn.LayerNorm(d_model)
        self.fc1 = nn.Linear(d_model, d_ff)
        self.act = activation()
        self.dropout1 = nn.Dropout(dropout)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x):
        # Expand, activate and regularise the normalised input ...
        hidden = self.dropout1(self.act(self.fc1(self.ln(x))))
        # ... then project back and add the untouched input (residual path).
        return x + self.dropout2(self.fc2(hidden))
    
class MultiHeadAttention(nn.Module):
    """Pre-norm causal multi-head self-attention with a residual connection.

    Args:
        d_model: model width; must be divisible by `n_heads`.
        n_heads: number of attention heads.
        dropout: dropout probability applied to the attention weights and to
            the projected output.
    """

    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        assert d_model % n_heads == 0, "d_model must be divisible by n_heads"
        self.d_model, self.n_heads = d_model, n_heads
        self.d_k = d_model // n_heads  # width of a single head

        self.q_proj = nn.Linear(d_model, d_model)
        self.k_proj = nn.Linear(d_model, d_model)
        self.v_proj = nn.Linear(d_model, d_model)
        self.out_proj = nn.Linear(d_model, d_model)

        self.ln = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def causal_mask(self, seq_len, device):
        """Return a (1, 1, seq_len, seq_len) lower-triangular mask of ones on `device`."""
        lower = torch.ones((seq_len, seq_len), device=device).tril()
        return lower[None, None]

    def forward(self, x, attn_mask=None):
        """Apply causal self-attention to `x` of shape (batch, seq, d_model).

        `attn_mask`, when given, is added to the raw attention scores after
        the causal mask has been applied.
        """
        batch, seq_len, _ = x.size()
        normed = self.ln(x)

        def split_heads(t):
            # (batch, seq, d_model) -> (batch, heads, seq, d_k)
            return t.view(batch, seq_len, self.n_heads, self.d_k).transpose(1, 2)

        q = split_heads(self.q_proj(normed))
        k = split_heads(self.k_proj(normed))
        v = split_heads(self.v_proj(normed))

        # Scaled dot-product scores; future positions are set to -inf so the
        # softmax assigns them zero weight.
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        blocked = self.causal_mask(seq_len, x.device) == 0
        scores = scores.masked_fill(blocked, float("-inf"))
        if attn_mask is not None:
            scores = scores + attn_mask

        weights = self.dropout(torch.softmax(scores, dim=-1))

        # Merge the heads back into a single (batch, seq, d_model) tensor.
        merged = torch.matmul(weights, v).transpose(1, 2).contiguous()
        merged = merged.view(batch, seq_len, self.d_model)

        return x + self.dropout(self.out_proj(merged))
    
# Shape smoke test: runs when the module is executed as a script (and in a
# notebook kernel, where __name__ is "__main__"). Note that it rebinds the
# module-level names B, S, d_model, heads, x, att, y, ff and z.
if __name__ == "__main__": 
    B, S, d_model, heads = 2, 8, 64, 8
    x = torch.randn(B, S, d_model)
    att = MultiHeadAttention(d_model=d_model, n_heads=heads)
    y = att(x)
    print("Attention output shape:", y.shape)
    # The feed-forward block uses a hidden width of 4 * d_model.
    ff = FeedForward(d_model=d_model, d_ff=4*d_model)
    z = ff(y)
    print("FF output shape:", z.shape)

In [None]:
# Cell 5: Transformer Implementation and Training
import torch.nn as nn
import torch.optim as optim

class Decoder(nn.Module):
    """One decoder block: self-attention followed by a feed-forward sub-layer.

    Args:
        d_model: model width.
        n_heads: number of attention heads.
        d_ff: hidden width of the feed-forward sub-layer.
        dropout: dropout probability forwarded to both sub-layers.
    """

    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, n_heads, dropout=dropout)
        self.ff = FeedForward(d_model, d_ff, dropout=dropout)

    def forward(self, x):
        """Run `x` through the attention sub-layer and then the feed-forward sub-layer."""
        # The attribute is registered as `attention`; calling `self.attn` raised
        # AttributeError on the first forward pass.
        x = self.attention(x)
        x = self.ff(x)
        return x
    
class TransformerDecoder(nn.Module):
    """Decoder-only Transformer language model.

    Token embeddings plus positional encodings go through `n_layers` Decoder
    blocks, a final LayerNorm and a bias-free linear head that produces one
    logit per vocabulary entry.

    Args:
        vocab_size: number of token ids.
        d_model: model width.
        n_layers: number of Decoder blocks.
        n_heads: attention heads per block.
        d_ff: feed-forward hidden width; defaults to 4 * d_model when None.
        context_length: maximum accepted sequence length.
        dropout: dropout probability.
        pos_enc_module: positional-encoding module; a
            SinusoidalPositionalEncoding is created when None.
    """

    def __init__(self, vocab_size, d_model=256, n_layers=4, n_heads=8, d_ff=None, context_length=128, dropout=0.1, pos_enc_module=None):
        super().__init__()
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.context_length = context_length

        self.token_embedding = nn.Embedding(vocab_size, d_model)
        if pos_enc_module is None:
            pos_enc_module = SinusoidalPositionalEncoding(d_model, max_seq_len=context_length)
        self.pos_enc = pos_enc_module
        self.dropout = nn.Dropout(dropout)

        ff_width = 4 * d_model if d_ff is None else d_ff
        self.blocks = nn.ModuleList(
            [Decoder(d_model, n_heads, ff_width, dropout=dropout) for _ in range(n_layers)]
        )

        self.ln_f = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, vocab_size, bias=False)

    def forward(self, idx):
        """Map a (batch, seq) tensor of token ids to (batch, seq, vocab_size) logits."""
        B, S = idx.shape
        assert S <= self.context_length, f"Sequence length too long: {S} > {self.context_length}"

        embedded = self.token_embedding(idx)
        # The positional module receives a zero tensor of the activation shape;
        # its output is added to the token embeddings.
        shape_probe = torch.zeros(B, S, self.d_model, device=embedded.device)
        hidden = self.dropout(embedded + self.pos_enc(shape_probe))

        for block in self.blocks:
            hidden = block(hidden)

        return self.head(self.ln_f(hidden))
    
# Hyperparameters of the model trained in this run.
context_length = 128
d_model = 256
n_layers = 4
n_heads = 8
dropout = 0.1

# The embedding table and output head are sized from the BPE vocabulary.
vocab_for_model = bpe_tok.get_vocab_size()
print("Using vocab size for model:", vocab_for_model)

# Positional table sized to exactly the context length used for training.
pos_module = SinusoidalPositionalEncoding(d_model=d_model, max_seq_len=context_length)

model = TransformerDecoder(
    vocab_size=vocab_for_model,
    d_model=d_model,
    n_layers=n_layers,
    n_heads=n_heads,
    d_ff=4*d_model,
    context_length=context_length,
    dropout=dropout,
    pos_enc_module=pos_module
).to(device)

# Prepare training data tokens (use BPE tokens in this run)
ids_train = build_token_ids("bpe", tokenizer=bpe_tok, text=train_text)
ids_val = build_token_ids("bpe", tokenizer=bpe_tok, text=val_text)

# Sliding windows of `context_length` tokens with next-token targets.
train_dataset = CharDataset(ids_train, context_length=context_length)
val_dataset = CharDataset(ids_val, context_length=context_length)

# Training batches are shuffled and the last partial batch is dropped;
# validation keeps its order and every sample.
batch_size = 16
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, drop_last=False)

# Adam at lr=1e-3 with weight_decay=0.01; cross-entropy is applied to raw logits.
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=0.01)
criterion = nn.CrossEntropyLoss()

# NOTE(review): this run trains for 3 epochs, while the plan in Cell 1 mentions 20.
history = train_model(model, train_loader, val_loader, optimizer, criterion, device, epochs=3, print_every=50)

# Re-evaluate once after training to report the final validation loss.
final_val = evaluate_model(model, val_loader, criterion, device)
print("Final validation loss:", final_val)

### **Cell 6: Generation and Sampling Plan**

**Prompt**: I'll use `O Romeo, Romeo!` as my prompt

**Parameters to test**
- **Temperature:** T = 0.2 and T = 1.0
- **Top-k:** k = 5 and k = 50
- **Top-p:** p = 0.6 and p = 0.9

**Hypothesis**
- Temperature 0.2: the generations will be repetitive and conservative
- Temperature 1.0: more varied
- Top-k small (5): might get stuck in loops
- Top-k 50: more variety, more creative
- Top-p 0.6: conservative, coherent but safe
- Top-p 0.9: more creative and human-like, but may output interesting tokens

In [None]:
# Cell 7: Generation and Sampling Implementation
import torch.nn.functional as F
import numpy as np

def sample_temperature(logits, T):
    """Sample one token id from 1-D `logits` after dividing them by temperature `T`.

    A tiny epsilon is added to `T` so that T == 0 does not divide by zero.
    Returns the sampled index as a Python int.
    """
    scaled = logits / (T + 1e-12)
    distribution = torch.softmax(scaled, dim=-1)
    return torch.multinomial(distribution, num_samples=1).item()

def sample_top_k(logits, k):
    """Sample one token id from the `k` highest-scoring entries of 1-D `logits`.

    Args:
        logits: 1-D tensor of unnormalised scores.
        k: number of candidates to keep. Values <= 0 disable the filter and
            fall back to plain temperature-1.0 sampling; values larger than
            the vocabulary are clamped to the vocabulary size.

    Returns:
        The sampled index into `logits` as a Python int.
    """
    if k <= 0:
        return sample_temperature(logits, 1.0)

    # torch.topk raises when k exceeds the number of entries, so clamp it.
    k = min(k, logits.size(-1))
    values, indices = torch.topk(logits, k)
    # Renormalise over the survivors, sample among them, then map the
    # position within the top-k back to the original vocabulary index.
    choice = torch.multinomial(F.softmax(values, dim=-1), num_samples=1)
    return int(indices[choice].item())

def sample_top_p(logits, p):
    """Nucleus sampling: draw a token id from the smallest high-probability set covering mass `p`.

    The entries of 1-D `logits` are ranked by probability; the shortest prefix
    whose cumulative probability reaches `p` is kept (always at least one
    entry), renormalised and sampled. Returns an index into `logits` as a
    Python int.
    """
    ranked_logits, ranked_ids = torch.sort(logits, descending=True)
    ranked_probs = torch.softmax(ranked_logits, dim=-1)
    running_mass = torch.cumsum(ranked_probs, dim=-1)

    # Position of the first entry whose cumulative mass reaches p; +1 turns it
    # into a prefix length, capped at the vocabulary size.
    boundary = torch.searchsorted(running_mass, p).item() + 1
    keep = min(boundary, logits.size(0))

    nucleus = ranked_probs[:keep]
    nucleus = nucleus / nucleus.sum()
    pick = torch.multinomial(nucleus, num_samples=1).item()

    return int(ranked_ids[:keep][pick].item())

def generate_autoregressive(model, tokenizer_type, tokenizer, prompt, max_new_tokens, method="temperature", method_param=1.0, device=None):
    """Extend `prompt` by `max_new_tokens` sampled tokens and return the decoded text.

    Args:
        model: language model mapping a (1, seq) id tensor to (1, seq, vocab)
            logits; must expose `context_length`.
        tokenizer_type: "char", "bpe" or "spm"; selects the encode/decode API
            called on `tokenizer`.
        tokenizer: tokenizer used for both encoding the prompt and decoding
            the result. It is always the object passed in, never a global.
        prompt: text to continue.
        max_new_tokens: number of tokens to sample.
        method: "temperature", "top_k" or "top_p".
        method_param: temperature, k or p, depending on `method`.
        device: device for the input tensors. When None, the device of the
            model's first parameter is used (CPU if it has no parameters).

    Returns:
        The decoded prompt plus continuation.

    Raises:
        ValueError: for an unknown `tokenizer_type` or `method`, or when
            tokens are requested but the prompt encodes to no tokens.
    """
    # Convert prompt to token ids using the tokenizer that was passed in.
    if tokenizer_type == "char":
        ids = tokenizer.encode(prompt)
    elif tokenizer_type == "bpe":
        ids = tokenizer.encode(prompt).ids
    elif tokenizer_type == "spm":
        ids = tokenizer.EncodeAsIds(prompt)
    else:
        raise ValueError("Unknown tokenizer_type")

    cur_ids = list(ids)
    if max_new_tokens > 0 and not cur_ids:
        # Without any context there is no "last position" to read logits from.
        raise ValueError("prompt must encode to at least one token")

    if device is None:
        # Resolve the device at call time from the model itself rather than
        # from a module-level global captured when the function was defined.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    # Inference only: skip building the autograd graph on every step.
    with torch.no_grad():
        for _ in range(max_new_tokens):
            # Keep only the most recent tokens that fit into the model's context window.
            input_ids = cur_ids[-model.context_length:]
            x = torch.tensor([input_ids], dtype=torch.long, device=device)
            logits = model(x)
            last_logits = logits[0, -1, :].detach().cpu()

            if method == "temperature":
                next_id = sample_temperature(last_logits, method_param)
            elif method == "top_k":
                next_id = sample_top_k(last_logits, int(method_param))
            elif method == "top_p":
                next_id = sample_top_p(last_logits, float(method_param))
            else:
                raise ValueError("Unknown sampling method")
            cur_ids.append(next_id)

    if tokenizer_type == "spm":
        return tokenizer.DecodeIds(cur_ids)
    # "char" and "bpe" tokenizers both expose decode(ids).
    return tokenizer.decode(cur_ids)

# Generate one sample per sampling method / parameter pair, all from the same prompt.
seed_prompt = "O Romeo, Romeo!"
print("Seed prompt:", seed_prompt)

# (banner printed before the sample, sampling method, method parameter)
sampling_runs = [
    ("\n--- Temperature T=0.2 ---", "temperature", 0.2),
    ("\n--- Temperature T=1.0 ---", "temperature", 1.0),
    ("\n--- Top-k k=5 ---", "top_k", 5),
    ("\n--- Top-k k=50 ---", "top_k", 50),
    ("\n--- Top-p p=0.6 ---", "top_p", 0.6),
    ("\n--- Top-p p=0.9 ---", "top_p", 0.9),
]

for banner, sampling_method, sampling_param in sampling_runs:
    print(banner)
    print(generate_autoregressive(model, "bpe", bpe_tok, seed_prompt, max_new_tokens=120, method=sampling_method, method_param=sampling_param))

### **Cell 8: Analysis and Discussion**

**Tokenizer Comparison**
- **Char-level**: lowest implementation complexity and fewest requirements. The vocabulary size equals the number of unique characters. Tokenization is trivial, but sequences are long, which can make modeling slower.
- **BPE**: 