<img src="https://theaiengineer.dev/tae_logo_gw_flatter.png" width=35% align=right>

# Building a Large Language Model from Scratch — A Step-by-Step Guide Using Python and PyTorch
## Chapter 10 — Training the Model
**© Dr. Yves J. Hilpisch**<br>AI-Powered by GPT-5.

## How to Use This Notebook

- Set up data loaders, optimizers, and schedulers for efficient training.
- Track metrics and checkpoints so experiments are repeatable.
- Stress-test the training loop with gradient accumulation and mixed precision.

### Roadmap

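We wire the dataset, assemble a compact GPT model, define the training loop with a linear learning-rate warmup, and finish with a quick sampling preview. Evaluation hooks and logging utilities you can reuse elsewhere are left to the exercises at the end.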

### Study Tips

Treat this notebook like a lab notebook: record hyperparameters, seeds, and observations directly alongside the code.

In [None]:
# Ensure PyTorch is available (CPU or CUDA)
import subprocess
import sys

# Bootstrap: import PyTorch if it is already installed, otherwise install it
# with pip and import it afterwards.
try:
    import torch  # noqa: F401
except Exception:
    print('Installing PyTorch...')
    # Probe for an NVIDIA driver: a zero exit code from `nvidia-smi` is taken
    # as "CUDA available"; a missing binary or any other failure means CPU.
    try:
        probe = subprocess.run(
            ['nvidia-smi'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except Exception:
        has_cuda = False
    else:
        has_cuda = probe.returncode == 0
    # Choose the wheel index that matches the detected hardware.
    if has_cuda:
        index_url = 'https://download.pytorch.org/whl/cu121'
    else:
        index_url = 'https://download.pytorch.org/whl/cpu'
    pip_command = [
        sys.executable, '-m', 'pip', 'install',
        '--index-url', index_url,
        'torch', 'torchvision', 'torchaudio',
    ]
    # check_call raises CalledProcessError if pip exits with a non-zero code.
    subprocess.check_call(pip_command)
    import torch  # noqa: F401
else:
    print('PyTorch found')
print('torch', torch.__version__)


In [None]:
# Imports, style, and device helper
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
# Apply the matplotlib style sheet named 'seaborn-v0_8' to all later figures.
plt.style.use('seaborn-v0_8')
# IPython magic (not plain Python): request SVG output from the inline backend.
%config InlineBackend.figure_format = 'svg'
# Seed PyTorch's global random number generator with a fixed value.
torch.manual_seed(0)
# Select the compute device by preference: CUDA first, then Apple MPS, then
# CPU. The getattr() guard covers builds where `torch.backends` has no `mps`
# attribute, so the MPS availability check is only reached when it exists.
device = (
    'cuda' if torch.cuda.is_available() else
    'mps' if getattr(torch.backends, 'mps', None)
            and torch.backends.mps.is_available()
    else 'cpu'
)
# Bare expression as the last line of the cell (shows the chosen device name
# when run in a notebook).
device


In [None]:
# Minimal tokenizer fallback: byte-level ids
def build_ids_byte_level(text: str):
    """Turn *text* into byte-level token ids.

    The text is encoded as UTF-8 (characters that cannot be encoded are
    dropped because of ``errors='ignore'``) and every byte becomes one id.

    Returns:
        A tuple ``(ids, vocab_size)``: a 1-D ``torch.long`` tensor with one
        entry per encoded byte, and the fixed vocabulary size 256.
    """
    byte_values = list(text.encode('utf-8', errors='ignore'))
    vocab_size = 256  # one id for every possible byte value
    return torch.tensor(byte_values, dtype=torch.long), vocab_size


# Tiny repetitive toy corpus: two short lines repeated 64 times.
txt = 'Hello world.\nHello vectors.\n' * 64
ids, vocab = build_ids_byte_level(txt)
ids.shape, vocab

In [None]:
# Dataset: overlapping windows for next-token prediction
class LMSequenceDataset(Dataset):
    """Map-style dataset of overlapping windows for next-token prediction.

    Item ``i`` is the pair ``(x, y)`` with ``x = ids[i:i+T]`` and
    ``y = ids[i+1:i+T+1]``, i.e. ``y`` is ``x`` shifted left by one token.

    Args:
        ids: Token ids. Assumed to be a 1-D tensor: the length is computed
            with ``numel()`` while windows are sliced along dimension 0.
        block_size: Window length ``T``; must be at least 1.

    Raises:
        ValueError: If ``block_size`` is smaller than 1.
    """

    def __init__(self, ids: torch.Tensor, block_size: int):
        self.ids = ids
        self.T = int(block_size)
        if self.T < 1:
            raise ValueError(f'block_size must be >= 1, got {block_size!r}')

    def __len__(self):
        # Every window needs T inputs plus one extra token for the last
        # target, hence numel - T complete (x, y) pairs (never negative).
        return max(0, self.ids.numel() - self.T)

    def __getitem__(self, idx):
        """Return the ``(x, y)`` window pair at position *idx*.

        Negative indices count from the end, as for sequences.

        Raises:
            IndexError: If *idx* is outside ``[-len(self), len(self))``.
        """
        n = len(self)
        i = int(idx)
        if i < 0:
            i += n
        # Slicing never fails on its own, so without this check an
        # out-of-range index would silently yield short or mismatched
        # windows, and plain iteration over the dataset would never stop
        # (the sequence protocol relies on IndexError to terminate).
        if not 0 <= i < n:
            raise IndexError(
                f'index {idx} is out of range for dataset of length {n}'
            )
        x = self.ids[i:i + self.T]
        y = self.ids[i + 1:i + self.T + 1]
        return x, y
# Build the training dataset from the byte-level ids using windows of 64
# tokens; the bare len() expression reports how many windows it contains.
ds = LMSequenceDataset(ids, block_size=64)
len(ds)


In [None]:
# Sinusoidal positions (reuse from Ch. 8)
def sinusoidal_positions(T: int, d_model: int, device=None):
    """Build a ``(T, d_model)`` table of fixed sinusoidal position encodings.

    Channels are grouped in pairs that share one frequency: even channels
    hold the sine and odd channels the cosine of
    ``position / 10000 ** (2 * (channel // 2) / d_model)``.

    Args:
        T: Number of positions (rows).
        d_model: Number of channels (columns).
        device: Device on which the tensors are created; ``None`` uses the
            PyTorch default.

    Returns:
        The encoding table as a tensor of shape ``(T, d_model)``.
    """
    positions = torch.arange(T, device=device).float().unsqueeze(1)
    channels = torch.arange(d_model, device=device).float().unsqueeze(0)
    pair_index = channels // 2  # channels 2k and 2k+1 share a frequency
    angles = positions / (10000 ** (2 * pair_index / d_model))

    even_cols = slice(0, None, 2)
    odd_cols = slice(1, None, 2)
    table = torch.zeros(T, d_model, device=device)
    table[:, even_cols] = angles[:, even_cols].sin()
    table[:, odd_cols] = angles[:, odd_cols].cos()
    return table


# Quick shape check.
sinusoidal_positions(4, 8).shape


In [None]:
# Multi-head attention with mask normalization as in Ch. 8
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention on top of ``F.scaled_dot_product_attention``.

    Queries, keys and values come from one fused, bias-free linear layer and
    are split into ``num_heads`` heads of width ``d_model // num_heads``.

    Mask convention: a **non-zero** mask entry means "query row may attend
    to key column"; a zero entry blocks that pair. A lower-triangular matrix
    of ones is therefore a causal mask.

    Args:
        d_model: Model width; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention *output*
            (after the weighted sum over values), not to the weights.
    """

    def __init__(self, d_model: int, num_heads: int, dropout: float = 0.0):
        super().__init__()
        assert d_model % num_heads == 0
        self.h = num_heads
        self.d = d_model // num_heads
        self.qkv = nn.Linear(d_model, 3 * d_model, bias=False)
        self.out = nn.Linear(d_model, d_model, bias=False)
        self.drop = nn.Dropout(dropout)

    def _keep_mask(self, mask, B, T):
        """Normalize *mask* to a boolean ``(B, h, T, T)`` keep-mask.

        Accepts ``(T, T)``, ``(B, T, T)`` or 4-D masks whose shape can be
        expanded to ``(B, h, T, T)``.

        Raises:
            ValueError: If the mask has fewer than 2 or more than 4 dims.
        """
        # scaled_dot_product_attention interprets a boolean mask as
        # True = "take part in attention", so non-zero (allowed) entries
        # must map to True. Testing `mask == 0` here would invert the mask.
        keep = mask != 0
        if keep.dim() == 2:
            keep = keep[None, None, :, :]
        elif keep.dim() == 3:
            keep = keep.unsqueeze(1)
        elif keep.dim() != 4:
            # Refuse instead of silently running without any mask.
            raise ValueError(
                f'mask must have 2, 3 or 4 dimensions, got {mask.dim()}'
            )
        return keep.expand(B, self.h, T, T)

    def forward(self, x, mask=None):
        """Apply self-attention to *x* of shape ``(B, T, d_model)``.

        Args:
            x: Input of shape ``(B, T, d_model)``.
            mask: Optional mask following the class-level convention
                (non-zero = attend). ``None`` means full attention.

        Returns:
            Tensor of shape ``(B, T, d_model)``.
        """
        B, T, Dm = x.shape
        q, k, v = self.qkv(x).chunk(3, dim=-1)

        def split(t):
            # (B, T, d_model) -> (B, h, T, d)
            return t.view(B, T, self.h, self.d).transpose(1, 2)

        q, k, v = map(split, (q, k, v))
        sdpa_mask = None if mask is None else self._keep_mask(mask, B, T)
        attn = F.scaled_dot_product_attention(q, k, v, attn_mask=sdpa_mask)
        attn = self.drop(attn)
        # Merge the heads back: (B, h, T, d) -> (B, T, d_model).
        y = attn.transpose(1, 2).contiguous().view(B, T, Dm)
        return self.out(y)


# Quick construction check.
MultiHeadAttention(32, 4)


In [None]:
# Feed-forward, residual, block, config, and GPT (compact)
class FeedForward(nn.Module):
    """Position-wise feed-forward network.

    Layer order: Linear(d_model -> d_ff), GELU, Dropout,
    Linear(d_ff -> d_model), Dropout.

    Args:
        d_model: Input and output width.
        d_ff: Hidden width.
        dropout: Probability used by both dropout layers.
    """

    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.0):
        super().__init__()
        expand = nn.Linear(d_model, d_ff)
        project = nn.Linear(d_ff, d_model)
        self.net = nn.Sequential(
            expand,
            nn.GELU(),
            nn.Dropout(dropout),
            project,
            nn.Dropout(dropout),
        )

    def forward(self, x):
        """Run *x* through the stack; the last dimension stays ``d_model``."""
        return self.net(x)
class Residual(nn.Module):
    """Pre-norm residual wrapper: ``x + sub(LayerNorm(x), ...)``.

    Args:
        d_model: Width of the normalized (last) dimension.
    """

    def __init__(self, d_model: int):
        super().__init__()
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x, sub, *a, **k):
        """Normalize *x*, call *sub* on it and add the result back to *x*.

        Extra positional and keyword arguments are forwarded to *sub*
        unchanged.
        """
        normed = self.norm(x)
        branch = sub(normed, *a, **k)
        return x + branch
class TransformerBlock(nn.Module):
    """One transformer layer: attention then feed-forward, each wrapped in
    its own ``Residual`` (layer norm before the sub-layer, skip connection
    around it).

    Args:
        d_model: Model width.
        n_head: Number of attention heads.
        d_ff: Hidden width of the feed-forward network.
        dropout: Dropout probability handed to both sub-layers.
    """

    def __init__(self, d_model, n_head, d_ff, dropout=0.0):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, n_head, dropout)
        self.ffn = FeedForward(d_model, d_ff, dropout)
        self.res1 = Residual(d_model)
        self.res2 = Residual(d_model)

    def forward(self, x, mask=None):
        """Apply the attention sub-layer (with *mask*) and then the
        feed-forward sub-layer to *x*."""
        attended = self.res1(x, self.mha, mask)
        return self.res2(attended, self.ffn)
from dataclasses import dataclass
@dataclass
class GPTConfig:
    """Hyperparameters for the compact GPT model in this notebook."""

    # Number of distinct token ids (size of embedding table and output layer).
    vocab_size: int
    # Maximum sequence length; sizes the learned position table.
    block_size: int
    # Width of the residual stream.
    d_model: int = 128
    # Attention heads per block.
    n_head: int = 4
    # Number of transformer blocks.
    n_layer: int = 2
    # Hidden width of each feed-forward network.
    d_ff: int = 512
    # Dropout probability used throughout the model.
    dropout: float = 0.1
    # 'learned' selects a learned position embedding; the model treats any
    # other value as sinusoidal positions.
    pos_type: str = 'learned'
    # Share the token-embedding matrix with the output projection.
    tie_weights: bool = True
class GPT(nn.Module):
    """Compact GPT-style language model.

    Pipeline: token embedding + position information -> dropout -> stack of
    ``TransformerBlock`` layers -> final LayerNorm -> linear head producing
    one logit per vocabulary entry.

    Args:
        cfg: ``GPTConfig`` with the model hyperparameters.
    """

    def __init__(self, cfg: GPTConfig):
        super().__init__()
        self.cfg = cfg
        vocab_size = cfg.vocab_size
        max_len = cfg.block_size
        width = cfg.d_model

        self.tok = nn.Embedding(vocab_size, width)
        # A learned position table exists only for pos_type == 'learned';
        # otherwise positions are computed on the fly in forward().
        if cfg.pos_type == 'learned':
            self.pos = nn.Embedding(max_len, width)
        else:
            self.pos = None
        self.drop = nn.Dropout(cfg.dropout)
        layers = [
            TransformerBlock(width, cfg.n_head, cfg.d_ff, cfg.dropout)
            for _ in range(cfg.n_layer)
        ]
        self.blocks = nn.ModuleList(layers)
        self.norm = nn.LayerNorm(width)
        self.head = nn.Linear(width, vocab_size, bias=False)
        if cfg.tie_weights:
            # Output projection and token embedding share one parameter.
            self.head.weight = self.tok.weight
        self.apply(self._init)

    def _init(self, m):
        """Initialize Linear/Embedding weights from N(0, 0.02) and zero any
        Linear bias; other module types are left untouched."""
        is_linear = isinstance(m, nn.Linear)
        if is_linear or isinstance(m, nn.Embedding):
            nn.init.normal_(m.weight, 0.0, 0.02)
        if is_linear and m.bias is not None:
            nn.init.zeros_(m.bias)

    def _mask(self, ids):
        """Return a ``(T, T)`` lower-triangular matrix of ones on the device
        of *ids*, where ``T = ids.size(1)``."""
        seq_len = ids.size(1)
        return torch.ones(seq_len, seq_len, device=ids.device).tril()

    def forward(self, input_ids, targets=None):
        """Compute logits and, when *targets* is given, the mean
        cross-entropy loss.

        Args:
            input_ids: Token ids of shape ``(B, T)``.
            targets: Optional target ids, flattened to ``B * T`` entries.

        Returns:
            ``(logits, loss)`` with logits of shape ``(B, T, vocab_size)``;
            ``loss`` is ``None`` when no targets are supplied.
        """
        B, T = input_ids.size()
        h = self.tok(input_ids)
        if self.cfg.pos_type == 'learned':
            positions = torch.arange(T, device=input_ids.device).unsqueeze(0)
            h = h + self.pos(positions)
        else:
            table = sinusoidal_positions(T, self.cfg.d_model, input_ids.device)
            h = h + table.unsqueeze(0)
        h = self.drop(h)

        # The same lower-triangular mask is handed to every block.
        tri_mask = self._mask(input_ids)
        for block in self.blocks:
            h = block(h, tri_mask)

        logits = self.head(self.norm(h))
        if targets is None:
            return logits, None
        flat_logits = logits.reshape(B * T, -1)
        flat_targets = targets.reshape(B * T)
        return logits, F.cross_entropy(flat_logits, flat_targets)
# Configure the model for the byte-level vocabulary with a context length of
# 64 (the same value used as block_size for the dataset above).
cfg = GPTConfig(vocab_size=vocab, block_size=64, d_model=128, n_head=4,
                n_layer=2, d_ff=512, dropout=0.1)
# Build the model, move it to the selected device, and leave it as the last
# expression of the cell.
model = GPT(cfg).to(device); model


In [None]:
# DataLoader and one batch
# Batches of 64 windows in shuffled order; drop_last=True discards a final
# incomplete batch so every batch has the same size.
dl = DataLoader(ds, batch_size=64, shuffle=True, drop_last=True)
# Pull a single batch of inputs and targets and show their shapes.
xb, yb = next(iter(dl))
xb.shape, yb.shape


In [None]:
# One forward/backward/update; print loss
# A single optimisation step on the batch fetched above.
opt = torch.optim.AdamW(model.parameters(), lr=3e-4)
xb = xb.to(device)
yb = yb.to(device)
# Clear gradients, run forward and backward, then update the parameters.
opt.zero_grad(set_to_none=True)
_, loss = model(xb, targets=yb)
loss.backward()
opt.step()
# Last expression of the cell: the loss of this step as a Python float.
float(loss.detach().cpu().item())


In [None]:
# Short training run with simple linear warmup and loss curve
# Hyperparameters: total optimizer steps, peak learning rate, number of
# linear warmup steps, and the per-step loss history used for the plot.
steps = 200
base_lr = 3e-4
warmup = 50
hist = []
opt = torch.optim.AdamW(model.parameters(), lr=base_lr)
# Make sure dropout is active even if an earlier cell (for example a
# sampling helper) switched the model to eval mode.
model.train()

# A single pass over `dl` can hold fewer than `steps` batches (27 with the
# toy data built in this notebook), so keep starting new epochs until the
# requested number of steps has been taken. Otherwise training would stop
# early and the warmup would never reach base_lr.
i = 0
while i < steps:
    steps_before_epoch = i
    for x, y in dl:
        if i >= steps:
            break
        # Linear warmup: scale grows from 1/warmup to 1.0, then stays at 1.0.
        # max(1, warmup) avoids a division by zero when warmup is 0.
        scale = min(1.0, (i + 1) / float(max(1, warmup)))
        lr = base_lr * scale
        for g in opt.param_groups:
            g['lr'] = lr
        x, y = x.to(device), y.to(device)
        opt.zero_grad(set_to_none=True)
        _, loss = model(x, targets=y)
        loss.backward()
        opt.step()
        loss_value = float(loss.detach().cpu().item())
        if i % 20 == 0:
            print(i, lr, loss_value)
        hist.append(loss_value)
        i += 1
    if i == steps_before_epoch:
        # The loader produced no batches at all; stop instead of spinning.
        break

plt.figure(figsize=(5.2, 3.2))
plt.plot(hist)
plt.title('Training loss')
plt.xlabel('step')
plt.ylabel('CE')
plt.show()


In [None]:
# Quick sampling preview (greedy/temperature/top-k)
@torch.no_grad()
def sample(model, input_ids, max_new_tokens=50, temperature=1.0, top_k=None):
    """Autoregressively extend *input_ids* by ``max_new_tokens`` tokens.

    Args:
        model: Module that returns ``(logits, loss)`` when called with a
            ``(B, T)`` id tensor and exposes ``model.cfg.block_size``.
        input_ids: Prompt ids of shape ``(B, T0)``.
        max_new_tokens: Number of tokens to append.
        temperature: ``<= 0`` selects greedy decoding (argmax); otherwise
            logits are divided by this value before sampling.
        top_k: If a positive integer, sample only among the ``top_k`` most
            likely tokens (values above the vocabulary size are clamped).

    Returns:
        Tensor of shape ``(B, T0 + max_new_tokens)`` on the model's device.

    The model is put into eval mode while sampling and returned to its
    previous train/eval mode afterwards.
    """
    # Use the model's own device and context length rather than notebook
    # globals, so the helper works for any model instance.
    first_param = next(model.parameters(), None)
    target_device = input_ids.device if first_param is None else first_param.device
    block_size = model.cfg.block_size

    was_training = model.training
    model.eval()
    try:
        x = input_ids.to(target_device)
        for _ in range(max_new_tokens):
            # Condition only on the most recent block_size tokens.
            x_cond = x[:, -block_size:]
            logits, _ = model(x_cond)
            logits = logits[:, -1, :]
            if temperature <= 0:
                next_id = torch.argmax(logits, dim=-1, keepdim=True)
            else:
                logits = logits / temperature
                if top_k is not None and top_k > 0:
                    # topk() raises if k exceeds the vocabulary size.
                    k = min(int(top_k), logits.size(-1))
                    kth_value = torch.topk(logits, k).values[:, [-1]]
                    logits = logits.masked_fill(logits < kth_value, float('-inf'))
                probs = torch.softmax(logits, dim=-1)
                next_id = torch.multinomial(probs, num_samples=1)
            x = torch.cat([x, next_id], dim=1)
        return x
    finally:
        # Do not leave a model that was training stuck in eval mode
        # (which would silently disable dropout for later training).
        model.train(was_training)
# Prompt: the first token of the corpus repeated four times, as a batch of
# one sequence on the selected device.
seed = torch.tensor([[ids[0].item()] * 4], dtype=torch.long, device=device)
# Draw 16 new tokens with temperature 0.8, restricted to the 40 most likely
# tokens at each step; the last expression shows the resulting shape.
gen = sample(model, seed, max_new_tokens=16, temperature=0.8, top_k=40)
gen.shape


## Exercises

- Implement gradient accumulation and confirm loss curves match the non-accumulated baseline.
- Add early stopping based on validation loss and test that it triggers appropriately.
- Integrate a lightweight experiment tracker (Weights & Biases, TensorBoard, or CSV logger) and log key metrics.

<img src="https://theaiengineer.dev/tae_logo_gw_flatter.png" width=35% align=right>