In [2]:
!pip install -q tiktoken tqdm


In [3]:
# We always start with a dataset to train on. Let's download the tiny shakespeare dataset
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

--2025-06-20 11:53:27--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt.1’


2025-06-20 11:53:27 (22.6 MB/s) - ‘input.txt.1’ saved [1115394/1115394]



# Karpathy's model

In [4]:
import torch
import torch.nn as nn
from torch.nn import functional as F

# hyperparameters
batch_size = 64 # how many independent sequences will we process in parallel?
block_size = 256 # what is the maximum context length for predictions?
max_iters = 5000
eval_interval = 500
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
n_embd = 384
n_head = 6
n_layer = 6
dropout = 0.2
# ------------

torch.manual_seed(1337)

# wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
with open('input.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# here are all the unique characters that occur in this text
chars = sorted(list(set(text)))
vocab_size = len(chars)
# create a mapping from characters to integers
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s] # encoder: take a string, output a list of integers
decode = lambda l: ''.join([itos[i] for i in l]) # decoder: take a list of integers, output a string

# Train and test splits
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]

# data loading
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

class Head(nn.Module):
    """ one head of self-attention """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # input of size (batch, time-step, channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out

class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

class GPTLanguageModel(nn.Module):

    def __init__(self):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # better init, not covered in the original GPT video, but important, will cover in followup video
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

model = GPTLanguageModel()
m = model.to(device)
# print the number of parameters in the model
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for iter in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# generate from the model
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(decode(m.generate(context, max_new_tokens=500)[0].tolist()))
#open('more.txt', 'w').write(decode(m.generate(context, max_new_tokens=10000)[0].tolist()))

10.788929 M parameters
step 0: train loss 4.2221, val loss 4.2306
step 500: train loss 1.7600, val loss 1.9146
step 1000: train loss 1.3903, val loss 1.5987
step 1500: train loss 1.2644, val loss 1.5271
step 2000: train loss 1.1835, val loss 1.4978
step 2500: train loss 1.1233, val loss 1.4910
step 3000: train loss 1.0718, val loss 1.4804
step 3500: train loss 1.0179, val loss 1.5127
step 4000: train loss 0.9604, val loss 1.5102
step 4500: train loss 0.9125, val loss 1.5351
step 4999: train loss 0.8589, val loss 1.5565

But with prison, I will steal for the fimker.

KING HENRY VI:
To prevent it, as I love this country's cause.

HENRY BOLINGBROKE:
I thank bhop my follow. Walk ye were so?

NORTHUMBERLAND:
My lord, I hearison! Who may love me accurse
Some chold or flights then men shows to great the cur
Ye cause who fled the trick that did princely action?
Take my captiving sound, althoughts thy crown.

RICHMOND NE:
God neit will he not make it wise this!

DUKE VINCENTIO:
Worthy Prince fo

# My model

In [7]:
from __future__ import annotations
import math, time
from dataclasses import dataclass
from pathlib import Path

import torch
import torch.nn as nn
import torch.nn.functional as F
import tiktoken

# ──────────────────────────────────────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────────────────────────────────────
@dataclass
class Config:
    batch_size: int = 16
    block_size: int = 256             # contexte plus long
    max_iters: int = 1_600
    eval_interval: int = 200
    eval_iters: int = 200
    learning_rate: float = 1.5e-4      # LR de base un peu plus bas
    weight_decay: float = 0.25         # régularisation L2 plus forte
    warmup_steps: int = 800            # warm‑up + long
    dropout: float = 0.35
    token_dropout: float = 0.08        # data augmentation légère
    n_embd: int = 256
    n_head: int = 4
    n_layer: int = 3
    clip_norm: float = 1.0
    label_smoothing: float = 0.10
    patience: int = 6                 # early‑stop sur 6 evals

C = Config()

# global RNG + device/dtype
torch.manual_seed(1337)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DTYPE  = torch.bfloat16 if DEVICE == "cuda" else torch.float32

# ──────────────────────────────────────────────────────────────────────────────
# Dataset & tokenisation (GPT‑2 BPE via tiktoken)
# ──────────────────────────────────────────────────────────────────────────────
text = Path("input.txt").read_text(encoding="utf-8")
enc  = tiktoken.get_encoding("gpt2")
ids  = torch.tensor(enc.encode(text), dtype=torch.long)
train_data = ids[: int(0.9 * len(ids))]
val_data   = ids[int(0.9 * len(ids)) :]
vocab_size = enc.n_vocab

# ──────────────────────────────────────────────────────────────────────────────
# Batch helpers with token‑dropout
# ──────────────────────────────────────────────────────────────────────────────

def apply_token_dropout(x: torch.Tensor):
    if C.token_dropout <= 0:
        return x
    mask = torch.rand_like(x.float()) < C.token_dropout
    random_tokens = torch.randint_like(x, vocab_size)
    return torch.where(mask, random_tokens, x)


def get_batch(split: str):
    src = train_data if split == "train" else val_data
    ix  = torch.randint(0, len(src) - C.block_size - 1, (C.batch_size,))
    idx = ix.unsqueeze(1) + torch.arange(C.block_size + 1).unsqueeze(0)
    chunk = src[idx]
    x = chunk[:, :-1].to(DEVICE)
    y = chunk[:, 1:].to(DEVICE)
    if split == "train":
        x = apply_token_dropout(x)
    return x, y

# ──────────────────────────────────────────────────────────────────────────────
# Transformer building blocks (RMSNorm, RoPE, SwiGLU, etc.)
# ──────────────────────────────────────────────────────────────────────────────
class RMSNorm(nn.Module):
    def __init__(self, dim: int, eps: float = 1e-8):
        super().__init__(); self.eps = eps; self.scale = nn.Parameter(torch.ones(dim))
    def forward(self, x):
        return self.scale * x * torch.rsqrt((x * x).mean(-1, keepdim=True) + self.eps)

rot_freq = torch.exp(
    -torch.arange(0, C.n_embd // C.n_head, 2, dtype=torch.float32)
    * (math.log(10000.0) / (C.n_embd // C.n_head))
)

def apply_rope(q: torch.Tensor, k: torch.Tensor):
    t = torch.arange(q.size(1), device=q.device).float().unsqueeze(-1)
    angles = t * rot_freq.to(q.device)
    sin, cos = angles.sin(), angles.cos()
    def rotate(x):
        x_even, x_odd = x[..., 0::2], x[..., 1::2]
        out = torch.stack((x_even * cos - x_odd * sin,
                           x_even * sin + x_odd * cos), dim=-1)
        return out.flatten(-2)
    return rotate(q), rotate(k)

class Head(nn.Module):
    def __init__(self, head_size: int):
        super().__init__()
        self.key   = nn.Linear(C.n_embd, head_size, bias=False)
        self.query = nn.Linear(C.n_embd, head_size, bias=False)
        self.value = nn.Linear(C.n_embd, head_size, bias=False)
        self.register_buffer("tril", torch.tril(torch.ones(C.block_size, C.block_size)))
        self.dropout = nn.Dropout(C.dropout)
    def forward(self, x):
        B, T, _ = x.shape
        k = self.key(x); q = self.query(x)
        q, k = apply_rope(q, k)
        wei = q @ k.transpose(-2, -1) * (k.size(-1) ** -0.5)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float("-inf"))
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        v = self.value(x)
        return wei @ v

class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads: int, head_size: int):
        super().__init__()
        self.heads   = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj    = nn.Linear(head_size * num_heads, C.n_embd)
        self.dropout = nn.Dropout(C.dropout)
    def forward(self, x):
        x = torch.cat([h(x) for h in self.heads], dim=-1)
        return self.dropout(self.proj(x))

class FeedForward(nn.Module):
    def __init__(self):
        super().__init__()
        hidden = 4 * C.n_embd
        self.w1 = nn.Linear(C.n_embd, hidden, bias=False)
        self.w2 = nn.Linear(hidden // 2, C.n_embd, bias=False)
        self.dropout = nn.Dropout(C.dropout)
    def forward(self, x):
        a, b = self.w1(x).chunk(2, dim=-1)
        return self.dropout(self.w2(F.silu(a) * b))

class Block(nn.Module):
    def __init__(self):
        super().__init__()
        head_size = C.n_embd // C.n_head
        self.attn  = MultiHeadAttention(C.n_head, head_size)
        self.ffwd  = FeedForward()
        self.norm1 = RMSNorm(C.n_embd)
        self.norm2 = RMSNorm(C.n_embd)
    def forward(self, x):
        x = x + self.attn(self.norm1(x))
        return x + self.ffwd(self.norm2(x))

class GPTLanguageModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.tok_emb = nn.Embedding(vocab_size, C.n_embd)
        self.blocks  = nn.Sequential(*[Block() for _ in range(C.n_layer)])
        self.norm_f  = RMSNorm(C.n_embd)
        self.lm_head = nn.Linear(C.n_embd, vocab_size, bias=False)
        self.apply(self._init_weights)
        self.lm_head.weight = self.tok_emb.weight
    @staticmethod
    def _init_weights(m):
        if isinstance(m, nn.Linear):
            nn.init.normal_(m.weight, 0.0, 0.02 / math.sqrt(C.n_layer))
            if m.bias is not None:
                nn.init.zeros_(m.bias)
        elif isinstance(m, nn.Embedding):
            nn.init.normal_(m.weight, 0.0, 0.02)
    def forward(self, idx, targets=None):
        x = self.tok_emb(idx)
        x = self.blocks(x)
        x = self.norm_f(x)
        logits = self.lm_head(x)
        loss = None
        if targets is not None:
            loss = F.cross_entropy(
                logits.view(-1, vocab_size),
                targets.view(-1),
                label_smoothing=C.label_smoothing,
            )
        return logits, loss
    @torch.no_grad()
    def generate(self, idx, max_new_tokens: int):
        self.eval()
        for _ in range(max_new_tokens):
            logits, _ = self(idx[:, -C.block_size:])
            probs = F.softmax(logits[:, -1, :], dim=-1)
            idx = torch.cat([idx, torch.multinomial(probs, 1)], dim=1)
        return idx

# ──────────────────────────────────────────────────────────────────────────────
# Evaluation helper
# ──────────────────────────────────────────────────────────────────────────────

def evaluate(model: nn.Module):
    model.eval()
    losses = {}
    with torch.no_grad(), torch.amp.autocast(device_type=DEVICE, dtype=DTYPE, enabled=(DEVICE == "cuda")):
        for split in ("train", "val"):
            losses[split] = torch.stack([
                model(*get_batch(split))[1] for _ in range(C.eval_iters)
            ]).mean().item()
    model.train()
    return losses

# ──────────────────────────────────────────────────────────────────────────────
# Training loop
# ──────────────────────────────────────────────────────────────────────────────
model = GPTLanguageModel().to(DEVICE)
print(f"Model size: {sum(p.numel() for p in model.parameters())/1e6:.2f} M params")

optimizer = torch.optim.AdamW(
    model.parameters(), lr=C.learning_rate, weight_decay=C.weight_decay
)

# Cosine with warm‑up
cosine = torch.optim.lr_scheduler.LambdaLR(
    optimizer,
    lr_lambda=lambda step: step / C.warmup_steps if step < C.warmup_steps else 0.5 * (
        1 + math.cos(math.pi * (step - C.warmup_steps) / max(1, C.max_iters - C.warmup_steps))
    ),
)
# LR on plateau (validation)
plateau = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", factor=0.5, patience=4, verbose=True
)

scaler = torch.amp.GradScaler(enabled=(DEVICE == "cuda"))

best_val = float("inf")
patience_counter = 0
start = time.time()

for step in range(C.max_iters):
    # ─── evaluation ──────────────────────────────────────────────
    if step % C.eval_interval == 0 or step == C.max_iters - 1:
        losses = evaluate(model)
        val_loss = losses["val"]
        print(f"step {step:5d}: train {losses['train']:.4f} | val {val_loss:.4f}")

        # plateau scheduler & early stop
        plateau.step(val_loss)
        if val_loss < best_val:
            best_val = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), "best_model.pt")
        else:
            patience_counter += 1
            if patience_counter >= C.patience:
                print("Early stopping triggered.")
                break

    # ─── forward / backward ─────────────────────────────────────
    xb, yb = get_batch("train")
    with torch.amp.autocast(device_type=DEVICE, dtype=DTYPE, enabled=(DEVICE == "cuda")):
        logits, loss = model(xb, yb)

    optimizer.zero_grad(set_to_none=True)
    scaler.scale(loss).backward()
    scaler.unscale_(optimizer)
    nn.utils.clip_grad_norm_(model.parameters(), C.clip_norm)
    scaler.step(optimizer)
    scaler.update()
    cosine.step()

    if DEVICE == "cuda":
        torch.cuda.empty_cache()

print(f"Training finished in {(time.time() - start)/60:.1f} min. Best val: {best_val:.4f}")

# ──────────────────────────────────────────────────────────────────────────────
# Sampling from best checkpoint
# ──────────────────────────────────────────────────────────────────────────────
model.load_state_dict(torch.load("best_model.pt", map_location=DEVICE))
model.eval()
context = torch.zeros((1, 1), dtype=torch.long, device=DEVICE)
print(enc.decode(model.generate(context, 500)[0].tolist()))


Model size: 14.83 M params
step     0: train 10.8455 | val 10.8385
step   200: train 8.8850 | val 8.8754
step   400: train 6.8578 | val 6.8964
step   600: train 6.1480 | val 6.2802
step   800: train 5.7008 | val 5.9050
step  1000: train 5.4592 | val 5.7329
step  1200: train 5.3275 | val 5.6145
step  1400: train 5.2653 | val 5.5965
step  1599: train 5.2423 | val 5.5816
Training finished in 11.2 min. Best val: 5.5816
!
Ay, sir, as gentlemen, Edward's Harm;;"
It is taken,ONTo France you good in this rudeons,
Yet shall publications alone looks to see him to commend him
 Nicola to the? O's Wil Cube what she dardverts?

CORIOLANUS:
So, doth pure heart: And shall I are.

ogleULIET:
scill shall have── my p subduits, entertain your would you to,
That came your't.

CLARURENCE:
 assaulting is little doubtful Chomsky such a gentleman to a train,
 [ sons take fertility accident and in det fencedll,
So I do I man can have next:
я, I will Icar and read of our nose 'tis.
Love where sing rate his light