# Cap√≠tulo 06 ‚Äî Fine-tuning para Classifica√ß√£o (GPT vira ‚Äúproduto‚Äù)

Este notebook acompanha o Cap√≠tulo 06 da s√©rie **Fazendo um LLM do Zero**.

Neste notebook n√≥s vamos **reaproveitar o GPTMini** (que foi pr√©-treinado/treinado no Cap√≠tulo 05) e adapt√°-lo para uma tarefa supervisionada de classifica√ß√£o.

üéØ **Objetivos deste notebook:**
- Como carregar pesos do Cap√≠tulo 05 (backbone pr√©-treinado)
- Como adicionar uma **classification head**
- Como comparar **pooling**: *last-token* vs *mean pooling*
- Como comparar **estrat√©gias**: *freeze* (s√≥ head) vs *unfreeze* (fine-tuning completo)
- Como ajustar **learning rates diferentes** para cada estrat√©gia (boa pr√°tica)
- Como avaliar com m√©tricas (accuracy, precision, recall, F1 e confusion matrix)


## 1. Setup e Configura√ß√£o

In [None]:
# ============================================================
# Setup do reposit√≥rio
# ============================================================
import os

REPO_URL = "https://github.com/vongrossi/fazendo-um-llm-do-zero.git"
REPO_DIR = "fazendo-um-llm-do-zero"

if not os.path.exists(REPO_DIR):
    !git clone {REPO_URL}

os.chdir(REPO_DIR)
print("Diret√≥rio atual:", os.getcwd())


### 1.1 Depend√™ncias e Imports

In [None]:
!pip -q install -r 06-fine-tuning/requirements.txt

# Depend√™ncias e GPU opcional
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
import matplotlib.pyplot as plt
import sys

# Adiciona raiz ao path para imports locais
sys.path.append(os.getcwd())

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

def seed_everything(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

seed_everything(42)


## 2. Importando o Backbone (GPTMini)

Premissa da s√©rie: **n√£o copiar c√≥digo entre notebooks**.

N√≥s reutilizamos o n√∫cleo do modelo a partir do m√≥dulo:

`lib/gptmini.py`


In [None]:
from lib.gptmini import GPTConfig, GPTMini


## 3. Carregando Pesos do Cap√≠tulo 05

A ideia √© come√ßar este cap√≠tulo com um modelo que **j√° aprendeu padr√µes gerais de linguagem** no Cap√≠tulo 05.

Como checkpoints podem ter nomes diferentes, vamos procurar automaticamente por alguns candidatos comuns no reposit√≥rio.


In [None]:
import glob, os

# Ajuste aqui se voc√™ quiser apontar para um arquivo espec√≠fico
CHECKPOINT_CANDIDATES = [
    "05-pre-treinamento/gpt_checkpoint.pt",
    "05-pre-treinamento/gpt_checkpoint_full.pt",
    "05-pre-treinamento/05_gpt_checkpoint.pt",
    "05-pre-treinamento/05_pretrain_checkpoint.pt",
    "gpt_checkpoint.pt",
]

def find_checkpoint(candidates):
    for p in candidates:
        if os.path.exists(p):
            return p
    # fallback: busca por qualquer .pt dentro do cap 05
    pts = glob.glob("05-pre-treinamento/**/*.pt", recursive=True)
    return pts[0] if pts else None

ckpt_path = find_checkpoint(CHECKPOINT_CANDIDATES)
print("Checkpoint encontrado:", ckpt_path)


### 3.1 Estrat√©gia de Vocabul√°rio

Um checkpoint de LLM geralmente depende do **tamanho do vocabul√°rio** (embedding de tokens) e do `context_size`.

- Se o checkpoint trouxer `config` e `stoi/itos`, n√≥s reaproveitamos.
- Se ele n√£o trouxer, ainda d√° para reaproveitar **parte** dos pesos (por exemplo, blocos Transformer) quando as dimens√µes baterem.

No nosso caso, vamos tentar o melhor caminho primeiro: **reusar o tokenizer do checkpoint**.


In [None]:
def load_checkpoint(path):
    if path is None:
        return None
    obj = torch.load(path, map_location="cpu")
    # pode ser state_dict puro ou dict com metadados
    if isinstance(obj, dict) and "state_dict" in obj:
        return obj
    if isinstance(obj, dict):
        # heur√≠stica: parece um state_dict
        return {"state_dict": obj}
    raise ValueError("Formato de checkpoint n√£o reconhecido")

ckpt = load_checkpoint(ckpt_path)
print("Tem checkpoint?", ckpt is not None)
print("Chaves do checkpoint:", list(ckpt.keys())[:10] if ckpt else None)


## 4. Dataset Supervisionado

Para um fine-tuning fazer sentido, precisamos de um dataset rotulado ‚Äúde verdade‚Äù.

Vamos usar o **SMS Spam Collection** (dataset cl√°ssico, leve e √≥timo para Colab):
- r√≥tulos: `spam` vs `ham`
- tamanho ‚Äúm√©dio‚Äù (alguns milhares de exemplos)
- download simples

Se o download falhar, o notebook cai para um dataset toy (bem pequeno), s√≥ para demonstrar o pipeline.


In [None]:
import urllib.request
import zipfile
import io

def load_sms_spam_dataset():
    # UCI SMS Spam Collection (zip)
    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip"
    try:
        with urllib.request.urlopen(url, timeout=30) as resp:
            data = resp.read()
        z = zipfile.ZipFile(io.BytesIO(data))
        raw = z.read("SMSSpamCollection").decode("utf-8", errors="replace")
        rows = []
        for line in raw.splitlines():
            if not line.strip():
                continue
            label, text = line.split("\t", 1)
            y = 1 if label.strip().lower() == "spam" else 0
            rows.append((text.strip(), y))
        return rows
    except Exception as e:
        print("‚ö†Ô∏è Falha ao baixar dataset SMS Spam Collection. Erro:", e)
        return None

data = load_sms_spam_dataset()

if data is None:
    # fallback toy (para n√£o quebrar o notebook)
    data = [
        ("ganhe dinheiro r√°pido clique aqui", 1),
        ("promo√ß√£o imperd√≠vel compre agora", 1),
        ("voc√™ foi selecionado para um pr√™mio", 1),
        ("clique no link e resgate seu b√¥nus", 1),
        ("oferta limitada aproveite j√°", 1),
        ("oi tudo bem vamos marcar amanh√£", 0),
        ("segue o relat√≥rio do projeto", 0),
        ("podemos alinhar a reuni√£o √†s 10h", 0),
        ("me chama quando puder", 0),
        ("confirmado, vou te enviar ainda hoje", 0),
    ]

random.shuffle(data)
print("Total exemplos:", len(data))
print("Exemplo:", data[0])


## 5. Tokeniza√ß√£o e Vocabul√°rio

Para conseguir **carregar pesos** do Cap√≠tulo 05 de forma mais fiel, vamos tentar reutilizar:

- `stoi` / `itos` do checkpoint (se existirem)
- `context_size` do checkpoint (se existir)

Caso n√£o exista, constru√≠mos um tokenizer simples por palavras.

> Observa√ß√£o did√°tica: aqui n√£o estamos usando BPE/WordPiece.  
> A ideia √© entender o *fine-tuning* e as decis√µes do pipeline, n√£o otimizar tokeniza√ß√£o.


In [None]:
PAD = "<pad>"
UNK = "<unk>"

def simple_tokenize(text: str):
    # tokeniza√ß√£o word-level did√°tica
    return text.lower().strip().split()

def build_vocab_from_texts(texts, add_pad_unk=True):
    toks = []
    for t in texts:
        toks.extend(simple_tokenize(t))
    vocab = sorted(set(toks))
    stoi = {}
    if add_pad_unk:
        stoi[PAD] = 0
        stoi[UNK] = 1
        offset = 2
    else:
        offset = 0
    for i, tok in enumerate(vocab):
        stoi[tok] = i + offset
    itos = {i:t for t,i in stoi.items()}
    return stoi, itos

texts = [t for t,_ in data]
labels = [y for _,y in data]

# tenta usar tokenizer do checkpoint
ckpt_stoi = ckpt.get("stoi") if ckpt else None
ckpt_itos = ckpt.get("itos") if ckpt else None
ckpt_ctx = ckpt.get("context_size") if ckpt else None
ckpt_cfg = ckpt.get("config") if ckpt else None

if ckpt_stoi and ckpt_itos:
    stoi = ckpt_stoi
    itos = ckpt_itos
    print("‚úÖ Reutilizando stoi/itos do checkpoint (cap 05)")
else:
    stoi, itos = build_vocab_from_texts(texts, add_pad_unk=True)
    print("‚ö†Ô∏è Criando stoi/itos novo (word-level)")

# garante PAD/UNK mesmo se vier do checkpoint
if PAD not in stoi:
    stoi = {PAD:0, **{k:(v+1) for k,v in stoi.items()}}
if UNK not in stoi:
    # coloca UNK como 1 e desloca o resto se necess√°rio
    if 1 in stoi.values():
        # se j√° tem algo em 1, remapeia tudo preservando ordem
        items = sorted(stoi.items(), key=lambda kv: kv[1])
        new = {}
        new[PAD] = 0
        new[UNK] = 1
        cur = 2
        for tok,_id in items:
            if tok in (PAD, UNK):
                continue
            new[tok] = cur
            cur += 1
        stoi = new
    else:
        stoi[UNK] = 1

itos = {i:t for t,i in stoi.items()}
vocab_size = len(stoi)

# context size: do checkpoint se houver; sen√£o padr√£o
context_size = int(ckpt_ctx) if ckpt_ctx else 64

pad_id = stoi[PAD]
unk_id = stoi[UNK]

print("vocab_size:", vocab_size, "| context_size:", context_size, "| pad_id:", pad_id, "| unk_id:", unk_id)


### 5.1 Encoding com Padding/Truncation

Transformamos textos em sequ√™ncias de tamanho fixo (`context_size`) com:

- truncation (corta excesso)
- padding (com `<pad>`)
- OOV cai em `<unk>`

Isso √© suficiente para um cap√≠tulo did√°tico de fine-tuning.


In [None]:
def encode(text: str, context_size: int):
    toks = simple_tokenize(text)
    ids = [stoi.get(tok, unk_id) for tok in toks]
    ids = ids[:context_size]
    if len(ids) < context_size:
        ids = ids + [pad_id] * (context_size - len(ids))
    return ids

X = torch.tensor([encode(t, context_size) for t in texts], dtype=torch.long)
Y = torch.tensor(labels, dtype=torch.long)

print("X shape:", X.shape, "Y shape:", Y.shape)
print("Exemplo tokens:", texts[0])
print("Exemplo ids[:20]:", X[0][:20].tolist())


### 5.2 Split Treino/Val

Usaremos uma separa√ß√£o simples 80/20.


In [None]:
n = len(X)
perm = torch.randperm(n)
train_size = int(0.8 * n)

train_idx = perm[:train_size]
val_idx = perm[train_size:]

X_train, Y_train = X[train_idx].to(device), Y[train_idx].to(device)
X_val, Y_val = X[val_idx].to(device), Y[val_idx].to(device)

print("Train:", X_train.shape[0], "Val:", X_val.shape[0])


## 6. Backbone Pr√©-treinado + Cabe√ßa de Classifica√ß√£o

Vamos instanciar o **GPTMini** e tentar carregar os pesos do Cap√≠tulo 05.

Ponto importante:

- se `vocab_size` e `context_size` n√£o baterem com o checkpoint, embeddings podem n√£o encaixar
- mesmo assim, podemos reaproveitar blocos Transformer quando as dimens√µes coincidirem

A ideia: **aproveitar o m√°ximo poss√≠vel sem quebrar o notebook**.


In [None]:
# Config: se checkpoint trouxe config, usa como base (mas ajusta vocab/context se necess√°rio)
if isinstance(ckpt_cfg, dict):
    cfg_dict = dict(ckpt_cfg)
    cfg_dict["vocab_size"] = vocab_size
    cfg_dict["context_size"] = context_size
    config = GPTConfig(**cfg_dict)
else:
    config = GPTConfig(
        vocab_size=vocab_size,
        context_size=context_size,
        d_model=128,
        n_heads=4,
        n_layers=4,
        dropout=0.1,
    )

print(config)


In [None]:
backbone = GPTMini(config).to(device)

def load_pretrained_into_backbone(backbone, ckpt):
    if not ckpt or "state_dict" not in ckpt:
        print("‚ö†Ô∏è Sem checkpoint de pesos para carregar.")
        return

    sd = ckpt["state_dict"]
    model_sd = backbone.state_dict()

    filtered = {}
    for k, v in sd.items():
        if k in model_sd and tuple(model_sd[k].shape) == tuple(v.shape):
            filtered[k] = v

    backbone.load_state_dict(filtered, strict=False)
    pct = 100.0 * len(filtered) / max(1, len(model_sd))
    print(f"‚úÖ Pesos carregados (shape-match): {len(filtered)}/{len(model_sd)} ({pct:.1f}%)")
    # dicas √∫teis
    if len(filtered) < len(model_sd):
        print("‚ÑπÔ∏è Nem todos os pesos encaixaram (normal se vocab/context mudarem).")

load_pretrained_into_backbone(backbone, ckpt)


### 6.1 Features do Backbone + Pooling

Para classifica√ß√£o, precisamos de uma representa√ß√£o ‚Äúdo texto todo‚Äù.

Vamos comparar duas estrat√©gias:

1) **Last-token pooling**: usa o vetor do √∫ltimo token da sequ√™ncia  
2) **Mean pooling**: m√©dia dos vetores dos tokens **n√£o-PAD**

Mean pooling costuma ser mais robusto quando a sequ√™ncia tem muitos PADs.


In [None]:
class GPTMiniFeatures(nn.Module):
    def __init__(self, gptmini: GPTMini):
        super().__init__()
        self.gpt = gptmini

    def forward(self, idx):
        x = self.gpt.emb(idx)
        x = self.gpt.blocks(x)
        x = self.gpt.ln_f(x)
        return x  # (B, T, C)

def mean_pool(feats, idx, pad_id):
    # feats: (B,T,C), idx: (B,T)
    mask = (idx != pad_id).float().unsqueeze(-1)  # (B,T,1)
    summed = (feats * mask).sum(dim=1)            # (B,C)
    denom = mask.sum(dim=1).clamp(min=1.0)        # (B,1)
    return summed / denom

class GPTClassifier(nn.Module):
    def __init__(self, gpt_features: GPTMiniFeatures, d_model: int, num_classes=2, pooling="last"):
        super().__init__()
        assert pooling in ("last", "mean")
        self.gpt_features = gpt_features
        self.classifier = nn.Linear(d_model, num_classes)
        self.pooling = pooling

    def forward(self, idx, labels=None):
        feats = self.gpt_features(idx)  # (B,T,C)

        if self.pooling == "last":
            pooled = feats[:, -1, :]
        else:
            pooled = mean_pool(feats, idx, pad_id)

        logits = self.classifier(pooled)
        loss = None
        if labels is not None:
            loss = F.cross_entropy(logits, labels)
        return logits, loss


## 7. M√©tricas (Sem Sklearn)

Vamos implementar m√©tricas b√°sicas de classifica√ß√£o bin√°ria:

- accuracy
- precision
- recall
- F1
- confusion matrix

Isso ajuda a ‚Äúler o modelo‚Äù para al√©m de simplesmente ‚Äúacertou/errou‚Äù.


In [None]:
@torch.no_grad()
def confusion_matrix_binary(y_true, y_pred):
    tp = int(((y_true == 1) & (y_pred == 1)).sum().item())
    tn = int(((y_true == 0) & (y_pred == 0)).sum().item())
    fp = int(((y_true == 0) & (y_pred == 1)).sum().item())
    fn = int(((y_true == 1) & (y_pred == 0)).sum().item())
    return tp, fp, fn, tn

@torch.no_grad()
def metrics_from_confusion(tp, fp, fn, tn):
    acc = (tp + tn) / max(1, (tp + tn + fp + fn))
    prec = tp / max(1, (tp + fp))
    rec = tp / max(1, (tp + fn))
    f1 = (2 * prec * rec) / max(1e-12, (prec + rec))
    return acc, prec, rec, f1

@torch.no_grad()
def evaluate(model, X, Y):
    model.eval()
    logits, _ = model(X, labels=None)
    preds = torch.argmax(logits, dim=-1)

    tp, fp, fn, tn = confusion_matrix_binary(Y, preds)
    acc, prec, rec, f1 = metrics_from_confusion(tp, fp, fn, tn)

    return {"acc": acc, "prec": prec, "rec": rec, "f1": f1, "tp": tp, "fp": fp, "fn": fn, "tn": tn}


## 8. Experimentos

Vamos rodar **4 experimentos** para comparar:

- Pooling: `last` vs `mean`
- Estrat√©gia: `freeze` vs `unfreeze`

E vamos usar **learning rates diferentes** (boa pr√°tica):

- Freeze (s√≥ head): LR maior (ex.: 2e-3)  
- Unfreeze (modelo todo): LR menor (ex.: 5e-4)  

Motivo: quando muitos par√¢metros est√£o trein√°veis, LR alto pode destruir rapidamente o conhecimento pr√©vio.


In [None]:
def set_trainable(module: nn.Module, trainable: bool):
    for p in module.parameters():
        p.requires_grad = trainable

def train_classifier(model, X_train, Y_train, X_val, Y_val, steps=400, batch_size=32, lr=1e-3, eval_every=100):
    optimizer = torch.optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=lr)

    train_loss_hist = []
    val_hist = []

    model.train()
    for step in range(steps):
        idx = torch.randint(0, X_train.size(0), (batch_size,), device=device)
        xb = X_train[idx]
        yb = Y_train[idx]

        _, loss = model(xb, labels=yb)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss_hist.append(loss.item())

        if step % eval_every == 0:
            stats = evaluate(model, X_val, Y_val)
            val_hist.append((step, stats))
            print(f"step {step:04d} | loss {loss.item():.4f} | val_acc {stats['acc']:.3f} | f1 {stats['f1']:.3f}")

    return train_loss_hist, val_hist


In [None]:
def run_experiment(pooling: str, strategy: str, lr: float, steps=400):
    seed_everything(42)

    # Novo backbone para cada experimento (mesmo ponto de partida)
    bb = GPTMini(config).to(device)
    load_pretrained_into_backbone(bb, ckpt)

    feats = GPTMiniFeatures(bb).to(device)
    clf = GPTClassifier(feats, d_model=config.d_model, num_classes=2, pooling=pooling).to(device)

    if strategy == "freeze":
        set_trainable(clf.gpt_features, False)
        set_trainable(clf.classifier, True)
    elif strategy == "unfreeze":
        set_trainable(clf.gpt_features, True)
        set_trainable(clf.classifier, True)
    else:
        raise ValueError("strategy deve ser freeze ou unfreeze")

    trainable = sum(p.requires_grad for p in clf.parameters())
    total = sum(1 for _ in clf.parameters())
    print(f"\n=== Experimento | pooling={pooling} | strategy={strategy} | lr={lr} ===")
    print(f"Par√¢metros trein√°veis: {trainable}/{total}")

    loss_hist, val_hist = train_classifier(
        clf, X_train, Y_train, X_val, Y_val,
        steps=steps,
        batch_size=64 if X_train.size(0) > 256 else 16,
        lr=lr,
        eval_every=max(50, steps//4),
    )

    final_stats = evaluate(clf, X_val, Y_val)
    return clf, loss_hist, val_hist, final_stats


In [None]:
# LRs recomendados (did√°ticos)
LR_FREEZE = 2e-3
LR_UNFREEZE = 5e-4

STEPS = 400 if X_train.size(0) > 200 else 300

results = {}

for pooling in ["last", "mean"]:
    # freeze
    clf_f, loss_f, val_f, stats_f = run_experiment(pooling, "freeze", lr=LR_FREEZE, steps=STEPS)
    results[(pooling, "freeze")] = {"model": clf_f, "loss": loss_f, "val": val_f, "stats": stats_f}

    # unfreeze
    clf_u, loss_u, val_u, stats_u = run_experiment(pooling, "unfreeze", lr=LR_UNFREEZE, steps=STEPS)
    results[(pooling, "unfreeze")] = {"model": clf_u, "loss": loss_u, "val": val_u, "stats": stats_u}


### 8.1 Comparando Resultados

Vamos ver as m√©tricas finais de cada experimento e plotar as losses.

Em datasets pequenos, resultados podem variar, mas a compara√ß√£o conceitual √© o mais importante:
- *mean pooling* tende a ser melhor quando h√° muito padding
- *freeze* tende a treinar r√°pido mas tem teto de performance
- *unfreeze* costuma melhorar mais, mas precisa LR menor


In [None]:
def print_confusion(stats):
    tp, fp, fn, tn = stats["tp"], stats["fp"], stats["fn"], stats["tn"]
    print("Confusion Matrix (bin√°ria)")
    print(f"         Pred 0   Pred 1")
    print(f"True 0 |   {tn:4d}   {fp:4d}")
    print(f"True 1 |   {fn:4d}   {tp:4d}")

for key, obj in results.items():
    pooling, strategy = key
    stats = obj["stats"]
    print(f"\n=== {pooling.upper()} + {strategy.upper()} ===")
    print({k: round(v, 4) if isinstance(v, float) else v for k,v in stats.items()})
    print_confusion(stats)


In [None]:
# Plot das losses
plt.figure(figsize=(10,5))
for key, obj in results.items():
    pooling, strategy = key
    plt.plot(obj["loss"], label=f"{pooling}-{strategy}")
plt.title("Training Loss ‚Äî Compara√ß√£o")
plt.xlabel("steps")
plt.ylabel("loss")
plt.legend()
plt.show()


## 9. Infer√™ncia (Testar Textos Novos)

Vamos pegar o melhor modelo (por F1 na valida√ß√£o) e testar com exemplos.


In [None]:
best_key = max(results.keys(), key=lambda k: results[k]["stats"]["f1"])
best_model = results[best_key]["model"]
print("Melhor modelo (val F1):", best_key, "->", results[best_key]["stats"])


In [None]:
@torch.no_grad()
def predict(text, model):
    model.eval()
    x = torch.tensor([encode(text, context_size)], dtype=torch.long, device=device)
    logits, _ = model(x, labels=None)
    probs = F.softmax(logits, dim=-1).squeeze(0)
    pred = int(torch.argmax(probs).item())
    return pred, probs.detach().cpu().numpy()

tests = [
    "Congratulations! You won a prize, click now",
    "Please review the report before the meeting",
    "FREE entry in a weekly draw, claim your reward",
    "Can you call me tomorrow morning?",
]

for t in tests:
    pred, probs = predict(t, best_model)
    label = "SPAM" if pred == 1 else "HAM"
    print(f"\nTexto: {t}\nPred: {label} | probs={probs}")


## 10. Salvando Checkpoint do Classificador

Vamos salvar:
- pesos do classificador (backbone + head)
- config do modelo
- stoi/itos e context_size

Isso permite abrir o notebook depois e reproduzir infer√™ncia.


In [None]:
import time

ckpt_out = {
    "config": config.__dict__,
    "state_dict": best_model.state_dict(),
    "stoi": stoi,
    "itos": itos,
    "context_size": context_size,
    "pooling": best_key[0],
    "strategy": best_key[1],
    "created_at": time.strftime("%Y-%m-%d %H:%M:%S"),
}

out_path = "06-fine-tuning-classificacao/06_gpt_classifier.pt"
os.makedirs(os.path.dirname(out_path), exist_ok=True)
torch.save(ckpt_out, out_path)
print("Salvo em:", out_path)


## 11. Conclus√£o

Voc√™ acabou de:

- carregar pesos do Cap√≠tulo 05 (transfer√™ncia de conhecimento)
- transformar um GPT em classificador (classification head)
- comparar pooling (last vs mean)
- comparar estrat√©gias (freeze vs unfreeze) com LRs diferentes
- avaliar com m√©tricas e confusion matrix
- salvar um checkpoint reproduz√≠vel

Isso √© a base pr√°tica de como LLMs viram ‚Äúfeatures‚Äù para resolver problemas reais.
