# Tiny GPT-2 implementation

In [1]:
import os
import io
import math
import pickle
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import Counter
from tokenizers.implementations import ByteLevelBPETokenizer
from tokenizers.pre_tokenizers import ByteLevel
from tokenizers.decoders import ByteLevel as ByteLevelDecoder

# Importing dataset

In [2]:
file_path = "lines.txt"

# Read the full dataset into memory as a single string.
# Open `file_path` (instead of repeating the literal) so the text loaded here is
# always the same file that the tokenizer is trained on via `files=[file_path]`.
with open(file_path, "r", encoding="utf-8") as f:
    text = f.read()

# Preview the first 200 characters as a quick sanity check
print(text[:200])

delicate savage / you'll never hold the cinder / but still you will burn $
our destination / the skyline of this city / shining horizon $
a splash and a cry /  words pulled from the riverside /  dried


# Tokenization

In [3]:
# Untrained byte-level BPE tokenizer; merges are learned in the next cell.
tokenizer = ByteLevelBPETokenizer()

# Ensure we use GPT-2-style byte-level pre-tokenizer and decoder
# NOTE(review): the round-trip output printed by the sanity-check cell starts with a
# leading space that is not in the source text; presumably this comes from the
# default prefix-space behaviour of ByteLevel() — confirm against the tokenizers docs.
tokenizer.pre_tokenizer = ByteLevel()
tokenizer.decoder = ByteLevelDecoder()

In [4]:
# Train BPE merges directly on the full corpus file
# - vocab_size: requested vocabulary size (get_vocab_size() reports 1024 after training)
# - min_frequency: presumably the minimum pair count required for a merge — confirm
# - special_tokens: "<|endoftext|>" is registered as a special token
tokenizer.train(
    files=[file_path],
    vocab_size=1024,
    min_frequency=2,
    special_tokens=["<|endoftext|>"],
)

In [5]:
# Quick sanity check on a small slice: encode the first 200 characters to token
# IDs, decode them back, and print the result for visual comparison with the
# raw text preview. Nothing is asserted; the check is by eye.
sample_text = text[:200]
sample_ids = tokenizer.encode(sample_text).ids
sample_roundtrip = tokenizer.decode(sample_ids)
# Only the first 200 characters of the decoded string are printed.
print("Roundtrip:", sample_roundtrip[:200])

Roundtrip:  delicate savage / you'll never hold the cinder / but still you will burn $
our destination / the skyline of this city / shining horizon $
a splash and a cry /  words pulled from the riverside /  drie


In [6]:
# Tokenize the whole corpus in one call and store the IDs as an int32 NumPy array
all_ids = np.array(tokenizer.encode(text).ids, dtype=np.int32)

In [7]:
# Token-level 90/10 split: the leading 90% of the IDs are used for training,
# the trailing 10% for validation (no shuffling; the split is positional).
n_tokens = len(all_ids)
split_idx = int(n_tokens * 0.9)
train_tokens, val_tokens = all_ids[:split_idx], all_ids[split_idx:]

# Sampling windows

In [8]:
block_size = 64   # context length (tokens per training example)
batch_size = 256    # number of sequences per batch


def get_batch(split):
    """
    Sample a random batch of input (x) and target (y) token windows.

    Parameters
    ----------
    split : str
        "train" draws from `train_tokens`; any other value draws from `val_tokens`.

    Returns
    -------
    (x, y) : tuple of np.ndarray
        Two arrays of shape (batch_size, block_size). `y` is `x` shifted by one
        position, i.e. y[b, t] is the token that follows x[b, t] in the data.

    Raises
    ------
    ValueError
        If the selected split holds fewer than block_size + 1 tokens, so that
        not even one full window (inputs plus the final target) can be cut.
    """
    data = np.asarray(train_tokens if split == "train" else val_tokens)

    # Each window needs block_size inputs plus one extra token for the last target.
    max_start = len(data) - block_size - 1
    if max_start < 0:
        raise ValueError(
            f"split {split!r} has {len(data)} tokens, but at least "
            f"block_size + 1 = {block_size + 1} are required"
        )

    # Random starting indices, inclusive of max_start
    starts = np.random.randint(0, max_start + 1, size=(batch_size,))

    # Gather every window with a single fancy-indexing operation:
    # window_idx[b, t] = starts[b] + t
    window_idx = starts[:, None] + np.arange(block_size)
    x = data[window_idx]
    y = data[window_idx + 1]

    return x, y

# Model Configuration

In [9]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

In [10]:
# Pick the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Device: {device}")

Device: cuda


In [11]:
# Vocabulary size as reported by the trained tokenizer
# (the tokenizer was trained with vocab_size=1024, and this prints 1024 — not 256).
vocab_size = tokenizer.get_vocab_size()
print(vocab_size)

1024


In [12]:
# Reproducibility: seed every RNG the notebook draws from, so that a
# Restart & Run All produces the same initial weights, batches and samples.
seed = 1337
np.random.seed(seed)      # get_batch() samples window starts with np.random
torch.manual_seed(seed)   # weight init, dropout and torch.multinomial sampling

# Tiny GPT-2 Config
n_layer = 8      # number of Transformer blocks
n_head  = 8      # attention heads per block
n_embd  = 512    # embedding (model) dimension
dropout = 0.10   # mild regularization

# Optimization knobs
learning_rate   = 3e-4   # peak learning rate reached at the end of warmup
weight_decay    = 0.01   # AdamW weight decay
grad_clip       = 1.0    # max gradient norm passed to clip_grad_norm_
warmup_steps    = 300    # steps of linear learning-rate warmup
train_steps     = 10000  # total optimizer steps in the training loop

# Architecture

In [13]:
# GELU activation
class GELU(nn.Module):
    """Parameter-free module that applies `F.gelu` element-wise to its input."""

    def forward(self, x):
        activated = F.gelu(x)
        return activated

In [14]:
# Masked multi-head self-attention
class CausalSelfAttention(nn.Module):
    """
    Multi-head self-attention where each position attends only to itself and
    to earlier positions.

    Args:
        n_embd: model width C; must be divisible by `n_head`.
        n_head: number of attention heads.
        block_size: largest sequence length the causal mask is built for.
        dropout: dropout probability applied to the attention weights.

    Input and output of `forward` are both (B, T, C) with T <= block_size.
    """

    def __init__(self, n_embd, n_head, block_size, dropout):
        super().__init__()
        assert n_embd % n_head == 0  # heads must tile the embedding exactly
        self.n_head = n_head
        self.head_dim = n_embd // n_head

        # One fused projection produces Q, K and V; a second one mixes the heads.
        self.qkv = nn.Linear(n_embd, 3 * n_embd, bias=False)
        self.out_proj = nn.Linear(n_embd, n_embd)

        # Boolean buffer that is True strictly above the diagonal,
        # i.e. at (query, key) pairs where the key lies in the future.
        future = torch.ones(block_size, block_size).triu(diagonal=1).bool()
        self.register_buffer("mask", future)

        self.dropout = nn.Dropout(dropout)

    def _to_heads(self, t):
        """Reshape (B, T, C) into per-head layout (B, n_head, T, head_dim)."""
        batch, steps, _ = t.shape
        return t.view(batch, steps, self.n_head, self.head_dim).transpose(1, 2)

    def forward(self, x):
        B, T, C = x.shape  # (batch, time, channels)

        # Fused projection -> three (B, T, C) chunks -> (B, n_head, T, head_dim) each
        q, k, v = (self._to_heads(part) for part in self.qkv(x).split(C, dim=2))

        # Scaled dot-product scores, (B, n_head, T, T)
        scores = (q @ k.transpose(-2, -1)) / (self.head_dim ** 0.5)

        # Future positions get -inf so softmax assigns them zero weight
        scores = scores.masked_fill(self.mask[:T, :T], float("-inf"))

        # Normalise over keys, then apply dropout to the attention weights
        weights = self.dropout(F.softmax(scores, dim=-1))

        # Weighted sum of values, heads merged back into (B, T, C)
        merged = (weights @ v).transpose(1, 2).contiguous().view(B, T, C)

        return self.out_proj(merged)

In [15]:
# Feed-forward MLP
class FeedForward(nn.Module):
    """
    Position-wise MLP: Linear(n_embd -> 4*n_embd), GELU, Linear(4*n_embd -> n_embd),
    followed by dropout. The output has the same shape as the input.
    """

    def __init__(self, n_embd, dropout):
        super().__init__()
        hidden = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden),   # expand
            GELU(),
            nn.Linear(hidden, n_embd),   # project back to model width
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        out = self.net(x)
        return out

In [16]:
# Transformer block
class TransformerBlock(nn.Module):
    """
    Pre-norm Transformer block: LayerNorm -> causal self-attention -> residual add,
    then LayerNorm -> feed-forward MLP -> residual add.
    """

    def __init__(self, n_embd, n_head, block_size, dropout):
        super().__init__()
        # Attention sub-layer with its own LayerNorm
        self.ln1 = nn.LayerNorm(n_embd)
        self.attn = CausalSelfAttention(n_embd, n_head, block_size, dropout)
        # MLP sub-layer with its own LayerNorm
        self.ln2 = nn.LayerNorm(n_embd)
        self.mlp = FeedForward(n_embd, dropout)

    def forward(self, x):
        # Each sub-layer sees the normalised stream; its output is added back
        # onto the un-normalised residual stream.
        attn_out = self.attn(self.ln1(x))
        x = x + attn_out
        mlp_out = self.mlp(self.ln2(x))
        return x + mlp_out

In [17]:
# Full GPT-2 model
class GPT2Tiny(nn.Module):
    """
    Decoder-only Transformer language model: token + learned positional
    embeddings, a stack of `n_layer` TransformerBlocks, a final LayerNorm and
    a bias-free linear head that maps to vocabulary logits.

    `forward(idx, targets=None)` returns `(logits, loss)`:
      * without targets: logits are (B, T, vocab_size) and loss is None;
      * with targets: logits are returned flattened to (B*T, vocab_size) and
        loss is the cross-entropy against the flattened targets.
    """

    def __init__(self, vocab_size, n_layer, n_head, n_embd, block_size, dropout):
        super().__init__()
        self.block_size = block_size

        # Token and positional embeddings
        self.tok_emb = nn.Embedding(vocab_size, n_embd)
        self.pos_emb = nn.Embedding(block_size, n_embd)

        layers = [
            TransformerBlock(n_embd, n_head, block_size, dropout)
            for _ in range(n_layer)
        ]
        self.blocks = nn.ModuleList(layers)
        self.ln_f = nn.LayerNorm(n_embd)  # Final normalization
        self.head = nn.Linear(n_embd, vocab_size, bias=False)  # LM head

    def forward(self, idx, targets=None):
        B, T = idx.shape
        assert T <= self.block_size, "Input sequence too long!"

        # Sum of token embeddings (B, T, n_embd) and position embeddings (1, T, n_embd)
        positions = torch.arange(T, device=idx.device)[None, :]
        x = self.tok_emb(idx) + self.pos_emb(positions)

        # Transformer stack
        for block in self.blocks:
            x = block(x)

        # Final layernorm + logits, (B, T, vocab_size)
        logits = self.head(self.ln_f(x))

        # Inference path: no targets, no loss
        if targets is None:
            return logits, None

        # Training path: flatten batch and time so cross_entropy sees (N, vocab_size) vs (N,)
        flat_logits = logits.view(-1, logits.size(-1))
        loss = F.cross_entropy(flat_logits, targets.view(-1))
        return flat_logits, loss



In [18]:
# Instantiate model from the hyperparameters defined in the config cell and the
# tokenizer's vocabulary size, then move all parameters and buffers to `device`.
# `block_size` is the same context length that get_batch() uses for its windows.
model = GPT2Tiny(
    vocab_size=vocab_size,
    n_layer=n_layer,
    n_head=n_head,
    n_embd=n_embd,
    block_size=block_size,
    dropout=dropout
).to(device)

# Optimizer and scheduler

In [19]:
# AdamW over all model parameters in a single parameter group, so the same
# weight_decay value is applied to every parameter tensor.
# The `lr` given here is only the initial value: the training loop overwrites
# each param group's "lr" with get_lr(step) on every step.
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=learning_rate,
    weight_decay=weight_decay,
)

In [20]:
# Simple linear warmup (optional)
def get_lr(step):
    """
    Learning rate for optimizer step `step` (0-based).

    During the first `warmup_steps` steps the rate grows linearly, reaching
    `learning_rate` on the last warmup step; afterwards it stays constant.
    """
    in_warmup = step < warmup_steps
    return learning_rate * (step + 1) / warmup_steps if in_warmup else learning_rate


# Training loop

In [21]:
@torch.no_grad()
def estimate_loss(num_batches=50):
    """
    Estimate the mean loss on the train and validation splits.

    For each split, draws `num_batches` random batches with get_batch(), runs
    the model in eval mode (dropout disabled) without gradient tracking, and
    averages the per-batch losses.

    Parameters
    ----------
    num_batches : int
        Number of random batches to average over per split.

    Returns
    -------
    dict
        {"train": mean_train_loss, "val": mean_val_loss} as Python floats.

    Notes
    -----
    The model's train/eval mode is restored to whatever it was on entry (even
    if evaluation raises), instead of being unconditionally forced into train
    mode. Calling this from a training loop therefore behaves as before, while
    calling it on a model that is in eval mode no longer re-enables dropout.
    """
    was_training = model.training
    model.eval()
    try:
        out = {}
        for split in ["train", "val"]:
            losses = []
            for _ in range(num_batches):
                x, y = get_batch(split)
                x = torch.from_numpy(x).long().to(device)
                y = torch.from_numpy(y).long().to(device)
                _, loss = model(x, y)
                losses.append(loss.item())
            out[split] = sum(losses) / len(losses)
    finally:
        # Put the model back into the mode the caller had it in
        model.train(was_training)
    return out

In [22]:
eval_interval = 200

model.train()
for step in range(train_steps):
    # Warmup schedule: write this step's learning rate into every param group
    lr = get_lr(step)
    for param_group in optimizer.param_groups:
        param_group["lr"] = lr

    # Draw a training batch and turn both arrays into int64 tensors on `device`
    x, y = get_batch("train")
    x, y = (torch.from_numpy(arr).long().to(device) for arr in (x, y))

    # Clear stale gradients, then forward, backward, clip and update
    optimizer.zero_grad(set_to_none=True)
    logits, loss = model(x, y)
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
    optimizer.step()

    # Periodic evaluation: every `eval_interval` steps and on the final step
    if step == train_steps - 1 or step % eval_interval == 0:
        losses = estimate_loss(num_batches=20)
        print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}, curr lr {lr:.2e}")

step 0: train loss 7.1528, val loss 7.1543, curr lr 1.00e-06
step 200: train loss 4.1681, val loss 4.2587, curr lr 2.01e-04
step 400: train loss 3.5274, val loss 3.8028, curr lr 3.00e-04
step 600: train loss 2.8819, val loss 3.6060, curr lr 3.00e-04
step 800: train loss 1.9406, val loss 3.7879, curr lr 3.00e-04
step 1000: train loss 0.7963, val loss 4.3519, curr lr 3.00e-04
step 1200: train loss 0.3186, val loss 4.9522, curr lr 3.00e-04
step 1400: train loss 0.2382, val loss 5.3016, curr lr 3.00e-04
step 1600: train loss 0.2159, val loss 5.5756, curr lr 3.00e-04
step 1800: train loss 0.2032, val loss 5.7263, curr lr 3.00e-04
step 2000: train loss 0.1917, val loss 5.8078, curr lr 3.00e-04
step 2200: train loss 0.1866, val loss 5.9545, curr lr 3.00e-04
step 2400: train loss 0.1810, val loss 6.0555, curr lr 3.00e-04
step 2600: train loss 0.1758, val loss 6.1495, curr lr 3.00e-04
step 2800: train loss 0.1722, val loss 6.2120, curr lr 3.00e-04
step 3000: train loss 0.1701, val loss 6.2945, 

# Text Generation

In [40]:
@torch.no_grad()
def generate(model, idx, max_new_tokens, temperature=1.0, top_k=None):
    """
    Autoregressive generation with optional temperature and top-k sampling.

    Parameters
    ----------
    model : nn.Module
        Called as `model(idx_cond)`; must return `(logits, _)` with logits of
        shape (B, T, vocab_size) and expose a `block_size` attribute.
    idx : torch.LongTensor
        Prompt token IDs of shape (B, T0).
    max_new_tokens : int
        Number of tokens to sample and append to each sequence.
    temperature : float
        Logits are divided by this value before sampling.
    top_k : int or None
        If given, only the `top_k` highest-scoring tokens can be sampled at
        each step. Values larger than the vocabulary size are clamped to it
        (previously `torch.topk` raised in that case).

    Returns
    -------
    torch.LongTensor
        Shape (B, T0 + max_new_tokens): the prompt followed by the samples.

    Notes
    -----
    The model is switched to eval mode and left in eval mode on return.
    """
    model.eval()
    for _ in range(max_new_tokens):
        # Crop context to block size
        idx_cond = idx[:, -model.block_size:]

        # Forward pass
        logits, _ = model(idx_cond)   # (B, T, vocab_size)
        logits = logits[:, -1, :]    # (B, vocab_size) - last time step

        # Temperature
        logits = logits / temperature

        # Optional top-k filtering
        if top_k is not None:
            # Never ask topk for more entries than the vocabulary has
            k = min(top_k, logits.size(-1))
            top_values, _ = torch.topk(logits, k)
            threshold = top_values[:, [-1]]  # k-th largest logit per row, shape (B, 1)
            logits = logits.masked_fill(logits < threshold, float("-inf"))

        # Sample next token
        probs = F.softmax(logits, dim=-1)
        next_id = torch.multinomial(probs, num_samples=1)  # (B, 1)

        # Append
        idx = torch.cat((idx, next_id), dim=1)

    return idx

In [41]:
def generate_text(prompt, max_new_tokens=50, temperature=1.0, top_k=None):
    """
    Text-in, text-out convenience wrapper around `generate`.

    Encodes `prompt` with the notebook's `tokenizer`, samples `max_new_tokens`
    continuation tokens from `model` using the given `temperature` / `top_k`,
    and returns the decoded prompt plus continuation as a single string.
    """
    # Prompt -> list of token IDs -> (1, T) int64 tensor on the model's device
    prompt_ids = tokenizer.encode(prompt).ids
    context = torch.tensor([prompt_ids], dtype=torch.long, device=device)

    # Sample the continuation
    sampled = generate(
        model,
        context,
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        top_k=top_k,
    )

    # The single batch row holds prompt + generated tokens; decode all of it
    return tokenizer.decode(sampled[0].tolist())

In [42]:
# Example usage after training: generate 60 new tokens after the prompt, with
# temperature 0.7 and sampling restricted to the 50 highest-scoring tokens per step.
sample_prompt = "Once upon a time"
print("Generated text:\n")
print(generate_text(sample_prompt, max_new_tokens=60, temperature=0.7, top_k=50))

Generated text:

 Once upon a time $
breathe in the fresh air / crisp and cool spring is now here / bright rosy red cheeks $
in my room the walls / breathe and the windows blink but / the door remains


In [45]:
# Second example: 60 new tokens using generate_text's default sampling settings
# (temperature=1.0, no top-k filtering).
sample_prompt = "round and round we go"
print("Generated text:\n")
print(generate_text(sample_prompt, max_new_tokens=60))

Generated text:

 round and round we go / of all it takes is time $
we are one family / knight errants of the divine / forever ever more $
stop falling in love / it is not good for the soul / loneliness is god $

