# Transformer desde cero para Traducción Náhuatl (ncx) ↔ Español (es)

**Autor:** preparado para Samuel Pérez Zistecatl  
**Stack:** PyTorch puro (modelo desde cero) + baseline opcional **BERT2BERT** (mBERT, HuggingFace)  

Este notebook cubre: preprocesamiento (SentencePiece Unigram, vocab compartido), dataset, modelo Transformer *from scratch* con **Pre-Norm**, *Noam* LR, **label smoothing**, métricas **sacreBLEU**/**chrF++**, *beam search* y **UI con Gradio** para inferencia.  

> ⚠️ **Uso responsable de datos**: Este notebook asume que ya construiste el corpus `parallel_ncx_es.jsonl` desde fuentes permitidas (p. ej., JW.org) y que respetas sus términos de uso.  


## 0) Dependencias
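Copia estos comandos en una celda de código de Jupyter y ejecútala si necesitas instalar paquetes en tu entorno local (`%pip` es un comando mágico de IPython, no un comando de bash). Se asume que `torch` y `numpy` ya están instalados.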
```bash
%pip install sentencepiece sacrebleu gradio tqdm pyyaml
# Opcional (baseline BERT2BERT / mBART):
%pip install transformers accelerate datasets
# Opcional (Windows con GPU AMD via DirectML):
%pip install torch-directml
# Opcional (spaCy español para segmentar oraciones mejor):
# %pip install spacy && python -m spacy download es_core_news_sm
```


In [None]:
# 1) Rutas, semillas y splits
from pathlib import Path
import os, random, numpy as np, torch

# Working / output folder (on Windows use a raw string or forward slashes).
BASE_DIR = Path(r"C:\Users\Samuel Perez\Desktop\articulo")
BASE_DIR.mkdir(parents=True, exist_ok=True)

# Sub-folders of the working folder; all of them are created up front.
DATA_DIR = BASE_DIR.joinpath("salida")        # must contain 'parallel_ncx_es.jsonl'
CHECK_DIR = BASE_DIR.joinpath("checkpoints")  # model weights
TOK_DIR = BASE_DIR.joinpath("spm")            # SentencePiece models
LOG_DIR = BASE_DIR.joinpath("logs")
for folder in (DATA_DIR, CHECK_DIR, TOK_DIR, LOG_DIR):
    folder.mkdir(parents=True, exist_ok=True)

# Parallel corpus; the notebook cannot continue without it.
PARALLEL_JSONL = DATA_DIR.joinpath("parallel_ncx_es.jsonl")
assert PARALLEL_JSONL.exists(), f"No se encontró {PARALLEL_JSONL}. Ajusta DATA_DIR."

# Seed every random number generator used by the notebook.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Split fractions; the test share is whatever remains (1 - train - dev).
SPLIT_TRAIN = 0.8
SPLIT_DEV = 0.1
# 0 = use every example; set e.g. 20000 for a quick trial run.
MAX_SAMPLES = 0

# Maximum sequence length, in tokens.
MAX_LEN = 128


In [None]:
# 2) Dispositivo: CPU por defecto; DirectML opcional si está instalado (GPU AMD).
import torch
# Start on CPU and switch to DirectML (AMD GPU) only when the optional
# torch_directml package imports and hands back a device.
DEVICE = torch.device("cpu")
try:
    import torch_directml
    DEVICE = torch_directml.device()
    print("Usando DirectML (GPU AMD) si está disponible.")
except Exception as err:  # package missing or backend failure: stay on CPU
    print("DirectML no disponible; usando CPU.\n", err)

print(f"DEVICE = {DEVICE}")


In [None]:
# 3) Carga de datos y segmentación por oraciones
import json, re
from typing import List

# One-slot cache for the Spanish spaCy pipeline: building it is expensive and
# sent_split_es is called once per verse.
_ES_NLP_CACHE = {}


def _get_es_nlp():
    """Return the cached spaCy pipeline for Spanish, or None if spaCy is unusable.

    Tries the `es_core_news_sm` model first and falls back to a blank Spanish
    pipeline with a rule-based sentencizer. The result (including None) is
    remembered so the import/load is attempted only once.
    """
    if "nlp" not in _ES_NLP_CACHE:
        try:
            import spacy
            try:
                nlp = spacy.load("es_core_news_sm")
            except Exception:
                nlp = spacy.blank("es")
                nlp.add_pipe("sentencizer")
        except Exception:
            nlp = None
        _ES_NLP_CACHE["nlp"] = nlp
    return _ES_NLP_CACHE["nlp"]


def sent_split_es(text: str) -> List[str]:
    """Split Spanish text into sentences.

    Uses spaCy when it is installed (the pipeline is loaded once and reused);
    otherwise, or if spaCy fails on this input, falls back to a punctuation
    regex.

    Args:
        text: Raw Spanish text.

    Returns:
        The non-empty sentences, stripped of surrounding whitespace.
    """
    nlp = _get_es_nlp()
    if nlp is not None:
        try:
            return [s.text.strip() for s in nlp(text).sents if s.text.strip()]
        except Exception:
            # Deliberate best effort: any spaCy failure falls through to the regex.
            pass
    # Split only after closing punctuation. The opening marks "¡" and "¿" start
    # a sentence, so splitting after them would cut a sentence in two.
    parts = re.split(r"(?<=[\.\?\!])\s+", text.strip())
    return [p.strip() for p in parts if p.strip()]

def sent_split_ncx(text: str) -> List[str]:
    """Approximate sentence splitter for Nahuatl, based on punctuation only.

    The text is cut at whitespace that follows ".", "?" or "!"; empty pieces
    are discarded and the rest are returned stripped.
    """
    sentences = []
    for chunk in re.split(r"(?<=[\.\?\!])\s+", text.strip()):
        chunk = chunk.strip()
        if chunk:
            sentences.append(chunk)
    return sentences

# Load the verse-level pairs from the JSONL file.
pairs = []  # (src, tgt, libro, cap, ver)
with open(PARALLEL_JSONL, "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:
            # Blank lines (e.g. a trailing newline at the end of the file) are
            # not valid JSON and would make json.loads raise.
            continue
        obj = json.loads(line)
        src = obj["src"].strip()
        tgt = obj["tgt"].strip()
        if not src or not tgt:
            continue  # skip pairs with an empty side
        pairs.append((src, tgt, obj.get("libro",""), obj.get("capitulo",0), obj.get("versiculo",0)))

print(f"Total pares (verso a verso) cargados: {len(pairs):,}")

# Expand to sentence level when both sides split into the same number of
# sentences (between 2 and 9); otherwise keep the whole verse as one example.
# All text is lower-cased either way.
expanded = []
for src, tgt, libro, cap, ver in pairs:
    src_low, tgt_low = src.lower(), tgt.lower()
    s_src = sent_split_ncx(src_low)
    s_tgt = sent_split_es(tgt_low)
    if len(s_src) == len(s_tgt) and 1 < len(s_src) < 10:
        # Sentences are paired by position; the verse id gets a ".k" suffix.
        for i, (sent_src, sent_tgt) in enumerate(zip(s_src, s_tgt), start=1):
            expanded.append((sent_src, sent_tgt, libro, cap, f"{ver}.{i}"))
    else:
        expanded.append((src_low, tgt_low, libro, cap, ver))

# Optional cap for quick experiments (0 keeps everything).
if MAX_SAMPLES and MAX_SAMPLES > 0:
    expanded = expanded[:MAX_SAMPLES]

print(f"Pares después de segmentación-oración (ncx→es) potenciales: {len(expanded):,}")


In [None]:
# 4) Crear splits train/dev/test (misma partición para ambas direcciones)
from math import floor
# Shuffle the example positions once; the resulting partition is shared by
# both translation directions.
idx = [*range(len(expanded))]
random.shuffle(idx)

# Split sizes: train and dev are rounded down, test takes the remainder.
n_train = floor(len(idx) * SPLIT_TRAIN)
n_dev = floor(len(idx) * SPLIT_DEV)
n_test = len(idx) - (n_train + n_dev)


def take(idxs):
    """Return the examples of `expanded` at positions `idxs`, in that order."""
    return list(map(expanded.__getitem__, idxs))


train_pairs = take(idx[:n_train])
dev_pairs = take(idx[n_train:n_train + n_dev])
test_pairs = take(idx[n_train + n_dev:])

print(f"Train: {len(train_pairs):,} | Dev: {len(dev_pairs):,} | Test: {len(test_pairs):,}")


In [None]:
# 5) Entrenar SentencePiece (Unigram, vocab compartido)
import sentencepiece as spm

# Dump every train/dev sentence of both languages, one per line, into the
# text file the tokenizer is trained on.
raw_corpus_path = TOK_DIR / "spm_raw.txt"
with open(raw_corpus_path, "w", encoding="utf-8") as w:
    w.writelines(f"{s}\n{t}\n" for s, t, *_ in train_pairs + dev_pairs)

VOCAB_SIZE = 10000  # inside the 8k–12k range
SPM_MODEL_PREFIX = (TOK_DIR / "ncx_es_unigram").as_posix()

# Train one Unigram model shared by both languages. The padding, sentence
# boundary and language-tag tokens are registered as user-defined symbols.
spm.SentencePieceTrainer.Train(
    input=str(raw_corpus_path),
    model_prefix=SPM_MODEL_PREFIX,
    model_type="unigram",
    vocab_size=VOCAB_SIZE,
    character_coverage=1.0,
    user_defined_symbols=["<pad>","<bos>","<eos>","<lang_ncx>","<lang_es>"],
    input_sentence_size=1000000,
    shuffle_input_sentence=True,
)

# Files written by the trainer next to the prefix.
SPM_MODEL = TOK_DIR.joinpath("ncx_es_unigram.model")
SPM_VOCAB = TOK_DIR.joinpath("ncx_es_unigram.vocab")
assert SPM_MODEL.exists(), "No se generó el modelo SentencePiece."
print("SPM listo:", SPM_MODEL)


In [None]:
# 6) Tokenización utilidades
# Load the trained tokenizer and look up the ids of the special pieces.
sp = spm.SentencePieceProcessor(model_file=str(SPM_MODEL))

PAD_ID, BOS_ID, EOS_ID, LNCX_ID, LES_ID = map(
    sp.piece_to_id,
    ("<pad>", "<bos>", "<eos>", "<lang_ncx>", "<lang_es>"),
)
# Total number of pieces in the shared vocabulary.
VOCAB = sp.get_piece_size()

def encode_with_lang(text, lang_tok_id):
    """Tokenise `text` and wrap it as [<bos>, language tag, pieces..., <eos>].

    Args:
        text: Sentence to encode with the shared SentencePiece model.
        lang_tok_id: Id of the language tag placed right after <bos>.

    Returns:
        List of integer token ids.
    """
    pieces = sp.encode(text, out_type=int)
    return [BOS_ID, lang_tok_id, *pieces, EOS_ID]

def collate_batch(batch, pad_id=PAD_ID):
    """Pad a list of (src_ids, tgt_ids) pairs into two LongTensors.

    Each side is padded with `pad_id` to the longest sequence of that side in
    the batch, capped at MAX_LEN; longer sequences are cut at that width.

    Returns:
        (src, tgt) tensors of shape (batch, width) and dtype long.
    """
    import torch

    def _stack(seqs):
        # Common width for this side of the batch, never above MAX_LEN.
        width = min(max(len(s) for s in seqs), MAX_LEN)
        rows = []
        for s in seqs:
            clipped = s[:width]
            rows.append(clipped + [pad_id] * (width - len(clipped)))
        return torch.tensor(rows, dtype=torch.long)

    src = _stack([pair[0] for pair in batch])
    tgt = _stack([pair[1] for pair in batch])
    return src, tgt


In [None]:
# 7) Dataset y bucketing simple
from torch.utils.data import Dataset, DataLoader
import random

class ParallelDataset(Dataset):
    """In-memory dataset of pre-tokenised (src_ids, tgt_ids) pairs.

    `pairs` holds tuples whose first two fields are (Nahuatl text, Spanish
    text). With direction "ncx2es" Nahuatl is the source; with any other value
    the roles are swapped. Each side carries its own language tag.
    """

    def __init__(self, pairs, direction="ncx2es"):
        ncx_is_source = direction == "ncx2es"
        self.items = []
        for s, t, *_ in pairs:
            if ncx_is_source:
                example = (encode_with_lang(s, LNCX_ID), encode_with_lang(t, LES_ID))
            else:
                example = (encode_with_lang(t, LES_ID), encode_with_lang(s, LNCX_ID))
            self.items.append(example)

    def __len__(self):
        return len(self.items)

    def __getitem__(self, i):
        return self.items[i]

class _LengthBucketBatches:
    """Iterable of index batches whose members have similar source length.

    Indices are sorted by length once. Without shuffling, batches are
    consecutive slices of that order. With shuffling, every iteration (epoch)
    reshuffles the indices inside contiguous windows of `window_batches`
    batches and then reshuffles the batch order, so batch composition and
    order change each epoch while padding stays low.
    """

    def __init__(self, lengths, batch_size, shuffle, window_batches=50):
        self.order = sorted(range(len(lengths)), key=lengths.__getitem__)
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.window = batch_size * window_batches

    def __len__(self):
        # Number of batches, counting a final partial batch.
        return (len(self.order) + self.batch_size - 1) // self.batch_size

    def __iter__(self):
        order = self.order
        if self.shuffle:
            order = []
            for start in range(0, len(self.order), self.window):
                chunk = self.order[start:start + self.window]  # slice = fresh list
                random.shuffle(chunk)
                order.extend(chunk)
        batches = [order[i:i + self.batch_size]
                   for i in range(0, len(order), self.batch_size)]
        if self.shuffle:
            random.shuffle(batches)
        return iter(batches)


def make_loader(pairs, direction, batch_size=32, shuffle=True):
    """Build a DataLoader that batches examples of similar source length.

    The previous implementation took strided slices of the length-sorted order
    (`order[i::50]`), so every "bucket" spanned the full length range and the
    shuffle produced batches of arbitrary mixed lengths. The order was also
    fixed when the loader was created, so every epoch saw identical batches.
    Here batches are contiguous in length and reshuffled on every epoch.

    Args:
        pairs: Tuples whose first two fields are (Nahuatl text, Spanish text).
        direction: "ncx2es" or the reverse direction, passed to ParallelDataset.
        batch_size: Number of examples per batch.
        shuffle: If False, batches follow the length-sorted order.

    Returns:
        A DataLoader yielding (src, tgt) tensors produced by `collate_batch`.
    """
    ds = ParallelDataset(pairs, direction=direction)
    lengths = [len(src_ids) for src_ids, _ in ds.items]
    batches = _LengthBucketBatches(lengths, batch_size, shuffle)
    return DataLoader(ds, batch_sampler=batches, collate_fn=collate_batch)


In [None]:
# 8) Transformer desde cero (Pre-Norm)
import math
import torch.nn as nn
import torch

# Default model hyper-parameters.
D_MODEL = 512       # embedding / hidden width
N_HEADS = 8         # attention heads
DIM_FF = 2048       # feed-forward inner width
N_LAYERS_ENC = 6    # encoder layers
N_LAYERS_DEC = 6    # decoder layers
DROPOUT = 0.1       # dropout probability

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position signals to a (batch, length, d_model) input.

    The table is precomputed for `max_len` positions and stored as the
    non-trainable buffer `pe` of shape (1, max_len, d_model): even channels
    hold sines, odd channels hold cosines.
    """

    def __init__(self, d_model, max_len=2048):
        super().__init__()
        positions = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        inv_freq = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = positions * inv_freq  # (max_len, d_model / 2)
        table = torch.zeros(max_len, d_model)
        table[:, 0::2] = torch.sin(angles)
        table[:, 1::2] = torch.cos(angles)
        self.register_buffer('pe', table.unsqueeze(0))  # (1, L, D)

    def forward(self, x):
        # Add the first x.size(1) rows of the table; broadcasts over the batch.
        return x + self.pe[:, :x.size(1)]

class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention with `n_heads` heads.

    Inputs are (batch, length, d_model) tensors. `attn_mask` is an additive
    mask, either 2-D (Lq, Lk) or 4-D broadcastable to (B, h, Lq, Lk); other
    ranks are ignored. `key_padding_mask` is a boolean (B, Lk) tensor where
    True marks keys that must not be attended to.
    """

    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        assert d_model % n_heads == 0
        self.d_k = d_model // n_heads
        self.n = n_heads
        self.q_proj = nn.Linear(d_model, d_model)
        self.k_proj = nn.Linear(d_model, d_model)
        self.v_proj = nn.Linear(d_model, d_model)
        self.o_proj = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, attn_mask=None, key_padding_mask=None):
        _, q_len, width = q.shape
        batch, k_len, _ = k.shape

        # Project and reshape to (B, h, L, d_k).
        q = self.q_proj(q).view(batch, q_len, self.n, self.d_k).transpose(1, 2)
        k = self.k_proj(k).view(batch, k_len, self.n, self.d_k).transpose(1, 2)
        v = self.v_proj(v).view(batch, k_len, self.n, self.d_k).transpose(1, 2)

        scores = (q @ k.transpose(-2, -1)) / math.sqrt(self.d_k)  # (B, h, Lq, Lk)

        # Additive mask (e.g. causal): -inf entries remove a query/key pair.
        if attn_mask is not None and attn_mask.dim() in (2, 4):
            bias = attn_mask[None, None] if attn_mask.dim() == 2 else attn_mask
            scores = scores + bias

        # Boolean padding mask over keys, broadcast to every head and query.
        if key_padding_mask is not None:
            scores = scores.masked_fill(key_padding_mask[:, None, None, :], float('-inf'))

        weights = self.dropout(scores.softmax(dim=-1))
        mixed = weights @ v  # (B, h, Lq, d_k)
        merged = mixed.transpose(1, 2).contiguous().view(batch, q_len, width)
        return self.o_proj(merged)

class FeedForward(nn.Module):
    """Position-wise MLP: Linear(d_model→d_ff) → ReLU → Dropout → Linear(d_ff→d_model)."""

    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        stages = [
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model),
        ]
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        return self.net(x)

class EncoderLayer(nn.Module):
    """Pre-Norm encoder layer: self-attention and feed-forward, each applied to
    a layer-normalised input and added back to the residual stream."""

    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.norm1 = nn.LayerNorm(d_model)
        self.attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.drop1 = nn.Dropout(dropout)
        self.norm2 = nn.LayerNorm(d_model)
        self.ff = FeedForward(d_model, d_ff, dropout)
        self.drop2 = nn.Dropout(dropout)

    def forward(self, x, src_pad_mask):
        """Run the layer.

        Args:
            x: Input passed on to MultiHeadAttention as query, key and value
                after normalisation.
            src_pad_mask: Boolean key-padding mask forwarded to the attention.

        Returns:
            Tensor with the same shape as `x`.
        """
        # Normalise once and reuse for query, key and value; the result is the
        # same as calling norm1 three times but costs a third of the work.
        h = self.norm1(x)
        x = x + self.drop1(self.attn(h, h, h, key_padding_mask=src_pad_mask))
        x = x + self.drop2(self.ff(self.norm2(x)))
        return x

class DecoderLayer(nn.Module):
    """Pre-Norm decoder layer: masked self-attention, cross-attention over the
    encoder memory, and feed-forward, each wrapped in a residual connection."""

    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.norm1 = nn.LayerNorm(d_model)
        self.self_attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.drop1 = nn.Dropout(dropout)

        self.norm2 = nn.LayerNorm(d_model)
        self.cross_attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.drop2 = nn.Dropout(dropout)

        self.norm3 = nn.LayerNorm(d_model)
        self.ff = FeedForward(d_model, d_ff, dropout)
        self.drop3 = nn.Dropout(dropout)

    def forward(self, x, mem, tgt_pad_mask, tgt_causal_mask, mem_pad_mask):
        """Run the layer.

        Args:
            x: Decoder input.
            mem: Encoder output used as key and value of the cross-attention
                (used exactly as passed in; it is not normalised here).
            tgt_pad_mask: Boolean key-padding mask for the target.
            tgt_causal_mask: Additive mask for the self-attention.
            mem_pad_mask: Boolean key-padding mask for `mem`.

        Returns:
            Tensor with the same shape as `x`.
        """
        # Normalise once and reuse for query, key and value; the result is the
        # same as calling norm1 three times but costs a third of the work.
        h = self.norm1(x)
        y = self.self_attn(h, h, h,
                           attn_mask=tgt_causal_mask, key_padding_mask=tgt_pad_mask)
        x = x + self.drop1(y)
        y = self.cross_attn(self.norm2(x), mem, mem, key_padding_mask=mem_pad_mask)
        x = x + self.drop2(y)
        y = self.ff(self.norm3(x))
        x = x + self.drop3(y)
        return x

class TransformerModel(nn.Module):
    """Encoder-decoder Transformer with one embedding table shared by source
    and target, sinusoidal positions, and a linear projection to the vocabulary."""

    def __init__(self, vocab_size, d_model, n_heads, d_ff, n_enc, n_dec, dropout=0.1, pad_id=0):
        super().__init__()
        self.pad_id = pad_id
        self.emb = nn.Embedding(vocab_size, d_model, padding_idx=pad_id)
        self.pos = PositionalEncoding(d_model)
        enc_layers = [EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_enc)]
        dec_layers = [DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_dec)]
        self.encoder = nn.ModuleList(enc_layers)
        self.decoder = nn.ModuleList(dec_layers)
        self.proj = nn.Linear(d_model, vocab_size)

    def make_pad_mask(self, seq):
        """Boolean (B, L) mask, True where `seq` holds the padding id."""
        return seq == self.pad_id

    def make_causal_mask(self, L):
        """Additive (L, L) mask: -inf strictly above the diagonal, 0 elsewhere."""
        upper = torch.triu(torch.ones(L, L, device=self.emb.weight.device), diagonal=1)
        return upper.masked_fill(upper == 1, float('-inf'))

    def encode(self, src):
        """Encode source ids; returns (memory, source padding mask)."""
        pad_mask = self.make_pad_mask(src)
        hidden = self.pos(self.emb(src))
        for block in self.encoder:
            hidden = block(hidden, pad_mask)
        return hidden, pad_mask

    def decode(self, tgt, mem, mem_pad):
        """Decode target ids against the encoder memory; returns vocabulary logits."""
        tgt_pad = self.make_pad_mask(tgt)
        hidden = self.pos(self.emb(tgt))
        # 4-D so it broadcasts over batch and heads inside the attention.
        causal = self.make_causal_mask(tgt.size(1))[None, None]
        for block in self.decoder:
            hidden = block(hidden, mem, tgt_pad, causal, mem_pad)
        return self.proj(hidden)

    def forward(self, src, tgt_in):
        memory, src_pad = self.encode(src)
        return self.decode(tgt_in, memory, src_pad)


In [None]:
# 9) Criterio (Label Smoothing) y Noam Scheduler
import torch.nn as nn, torch
class LabelSmoothingLoss(nn.Module):
    """Cross-entropy with label smoothing, averaged over non-ignored targets.

    Per token the loss is (1 - smoothing) * NLL(target) plus smoothing times
    the mean negative log-probability over all classes. Positions whose target
    equals `ignore_index` contribute nothing.
    """

    def __init__(self, classes, smoothing=0.1, ignore_index=0):
        super().__init__()
        self.ignore_index = ignore_index
        self.confidence = 1.0 - smoothing
        self.smoothing = smoothing
        self.cls = classes

    def forward(self, pred, target):
        flat_logits = pred.view(-1, pred.size(-1))
        flat_target = target.reshape(-1)
        logp = torch.log_softmax(flat_logits, dim=-1)

        picked = logp.gather(dim=-1, index=flat_target[:, None])[:, 0]
        nll = -picked
        uniform = -logp.mean(dim=-1)
        per_token = self.confidence * nll + self.smoothing * uniform

        ignored = flat_target == self.ignore_index
        summed = per_token.masked_fill(ignored, 0).sum()
        # Clamp so an all-ignored batch yields 0 instead of dividing by zero.
        n_real = torch.clamp((~ignored).sum(), min=1)
        return summed / n_real

class NoamWrapper:
    """Optimizer wrapper that sets the learning rate before every step:

        lr = d_model^-0.5 * min(step^-0.5, step * warmup^-1.5)
    """

    def __init__(self, optimizer, d_model, warmup=4000):
        self.opt = optimizer
        self.d_model = d_model
        self.warm = warmup
        self.step_num = 0

    def step(self):
        """Advance the step counter, write the new rate into every parameter
        group and run the wrapped optimizer's step."""
        self.step_num += 1
        decay = self.step_num ** -0.5
        ramp = self.step_num * (self.warm ** -1.5)
        lr = (self.d_model ** -0.5) * min(decay, ramp)
        for group in self.opt.param_groups:
            group['lr'] = lr
        self.opt.step()

    def zero_grad(self):
        self.opt.zero_grad()

    @property
    def lr(self):
        """Learning rate currently stored in the first parameter group."""
        return self.opt.param_groups[0]['lr']

# Metrics (sacreBLEU / chrF++). When sacrebleu cannot be imported or set up,
# both scorers are left as None.
try:
    from sacrebleu.metrics import BLEU, CHRF
    bleu, chrf = BLEU(force=True), CHRF(word_order=2)
except Exception as err:
    print("sacrebleu no disponible:", err)
    bleu = chrf = None


In [None]:
# 10) Entrenamiento
import torch

def batch_to_device(b, device):
    """Move the first two elements of batch `b` to `device`; returns them as a pair."""
    src, tgt = b[0], b[1]
    return src.to(device), tgt.to(device)

def evaluate(model, loader, device):
    """Return the mean per-batch label-smoothed loss of `model` over `loader`.

    The model is switched to eval mode and no gradients are tracked. Each
    batch is scored with teacher forcing (input tgt[:, :-1], labels tgt[:, 1:]).

    Returns:
        Average of the batch losses, or 0.0 for an empty loader.
    """
    model.eval()
    # Built once: the criterion has no per-batch state.
    crit = LabelSmoothingLoss(VOCAB, 0.1, PAD_ID)
    total = 0.0
    n = 0
    with torch.no_grad():
        for src, tgt in loader:
            src, tgt = batch_to_device((src, tgt), device)
            logits = model(src, tgt[:, :-1])
            total += crit(logits, tgt[:, 1:]).item()
            n += 1
    return total / max(n, 1)

def ids_to_text(ids):
    """Decode token ids to plain text with every control token removed.

    The sequence is cut at the first <eos>, then <pad>, <bos>, <eos> and both
    language tags are dropped before decoding. The language tags are
    registered as user-defined SentencePiece symbols, so leaving them in would
    make them appear literally in translations and in the strings scored by
    BLEU/chrF.
    """
    ids = list(ids)
    if EOS_ID in ids:
        ids = ids[:ids.index(EOS_ID)]
    specials = {PAD_ID, BOS_ID, EOS_ID, LNCX_ID, LES_ID}
    return sp.decode([i for i in ids if i not in specials])

def translate_greedy(model, src_ids, max_len=MAX_LEN, tgt_lang_id=None):
    """Greedy decoding of one source sentence.

    Args:
        model: Model exposing `encode`, `decode` and `emb`.
        src_ids: Source token ids as produced by `encode_with_lang`
            ([<bos>, source language tag, ..., <eos>]).
        max_len: Maximum number of tokens to generate.
        tgt_lang_id: Language tag that primes the decoder. When None it is
            inferred from the source tag: Spanish input translates to Nahuatl,
            anything else to Spanish. Previously the tag was always <lang_es>,
            so es→ncx models were decoded with the wrong tag.

    Returns:
        Generated ids, starting with [<bos>, target tag] and ending with <eos>
        when one was produced within `max_len` steps.
    """
    model.eval()
    if tgt_lang_id is None:
        src_is_spanish = len(src_ids) > 1 and src_ids[1] == LES_ID
        tgt_lang_id = LNCX_ID if src_is_spanish else LES_ID

    device = model.emb.weight.device
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        src = torch.tensor([src_ids], dtype=torch.long, device=device)
        mem, src_pad = model.encode(src)
        ys = torch.tensor([[BOS_ID, tgt_lang_id]], dtype=torch.long, device=device)
        for _ in range(max_len):
            logits = model.decode(ys, mem, src_pad)
            nxt = logits[:, -1, :].argmax(dim=-1, keepdim=True)
            ys = torch.cat([ys, nxt], dim=1)
            if nxt.item() == EOS_ID:
                break
    return ys[0].tolist()

def train_direction(direction="ncx2es",
                    epochs=3, batch_size=32, grad_accum=1,
                    d_model=512, n_heads=8, d_ff=2048, n_enc=6, n_dec=6,
                    warmup=4000, save_prefix="scratch_ncx2es"):
    """Train a Transformer for one translation direction.

    After every epoch the dev loss is computed and the model is saved to
    CHECK_DIR/<save_prefix>_best.pt whenever it improves.

    Args:
        direction: "ncx2es" or the reverse direction.
        epochs: Number of passes over the training data.
        batch_size: Examples per micro-batch.
        grad_accum: Micro-batches accumulated per optimizer step (minimum 1).
        d_model, n_heads, d_ff, n_enc, n_dec: Model size.
        warmup: Warm-up steps of the Noam schedule.
        save_prefix: File-name prefix of the checkpoint.

    Returns:
        Path of the best checkpoint, or None if none was saved.
    """
    print(f"\n=== Entrenando dirección: {direction} ===")
    grad_accum = max(1, int(grad_accum))  # guards the division below
    train_loader = make_loader(train_pairs, direction, batch_size=batch_size, shuffle=True)
    dev_loader   = make_loader(dev_pairs,   direction, batch_size=batch_size, shuffle=False)

    model = TransformerModel(VOCAB, d_model, n_heads, d_ff, n_enc, n_dec, DROPOUT, PAD_ID)
    model.to(DEVICE)

    # The learning rate is overwritten by the Noam wrapper before every step.
    opt = torch.optim.Adam(model.parameters(), betas=(0.9,0.98), eps=1e-9)
    noam = NoamWrapper(opt, d_model, warmup=warmup)
    crit = LabelSmoothingLoss(VOCAB, 0.1, PAD_ID)

    def _optimizer_step():
        # Clip the fully accumulated gradient once, right before the update.
        # Clipping after every micro-batch would rescale partial sums.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        noam.step()
        noam.zero_grad()

    best_dev = 1e9; best_path = None
    for ep in range(1, epochs+1):
        model.train()
        total = 0.0; n = 0
        pending = False  # True while accumulated gradients await a step
        opt.zero_grad()
        for i, (src, tgt) in enumerate(train_loader, 1):
            src, tgt = batch_to_device((src,tgt), DEVICE)
            logits = model(src, tgt[:, :-1])
            loss = crit(logits, tgt[:, 1:]) / grad_accum
            loss.backward()
            pending = True
            if i % grad_accum == 0:
                _optimizer_step()
                pending = False
            total += loss.item()*grad_accum; n += 1
            if i % 100 == 0:
                print(f"ep{ep} step{i} lr={noam.lr:.6f} loss={total/max(n,1):.4f}")
        if pending:
            # Apply the gradients of the final, incomplete accumulation group
            # instead of discarding them at the start of the next epoch.
            _optimizer_step()

        dev_loss = evaluate(model, dev_loader, DEVICE)
        print(f"[Ep {ep}] dev_loss={dev_loss:.4f}")
        if dev_loss < best_dev:
            best_dev = dev_loss
            best_path = (CHECK_DIR / f"{save_prefix}_best.pt").as_posix()
            # The config is stored next to the weights so the model can be rebuilt.
            torch.save({"model":model.state_dict(),
                        "cfg":{"d_model":d_model,"n_heads":n_heads,"d_ff":d_ff,
                               "n_enc":n_enc,"n_dec":n_dec,"pad_id":PAD_ID,"vocab":VOCAB}},
                       best_path)
            print("Guardado mejor modelo en", best_path)

    return best_path

# Ready-made model sizes, passed to train_direction as keyword arguments.
CFG_SMALL = {"d_model": 512, "n_heads": 8, "d_ff": 2048, "n_enc": 6, "n_dec": 6}
CFG_LIGHT = {"d_model": 256, "n_heads": 4, "d_ff": 1024, "n_enc": 4, "n_dec": 4}


In [None]:
# 11) Entrenar ambos sentidos (ajusta epochs según tu tiempo)
# Training budget. 2 epochs is the suggested starting point on CPU/DirectML;
# raise to 6–10 when time allows.
EPOCHS = 2
BATCH = 32
ACCUM = 1
WARMUP = 4000

# One model per direction, same size and schedule for both.
best_ncx2es = train_direction(
    "ncx2es",
    save_prefix="scratch_ncx2es",
    epochs=EPOCHS, batch_size=BATCH, grad_accum=ACCUM, warmup=WARMUP,
    **CFG_SMALL,
)
best_es2ncx = train_direction(
    "es2ncx",
    save_prefix="scratch_es2ncx",
    epochs=EPOCHS, batch_size=BATCH, grad_accum=ACCUM, warmup=WARMUP,
    **CFG_SMALL,
)

print("Mejores checkpoints:")
print(f" ncx→es: {best_ncx2es}")
print(f" es→ncx: {best_es2ncx}")


In [None]:
# 12) Evaluación en test con greedy (rápido) y métricas sacreBLEU/chrF
from pathlib import Path

def load_model(path):
    """Rebuild a TransformerModel from a checkpoint written by train_direction.

    The checkpoint is read onto the CPU, the model is built from the stored
    "cfg" entry, the weights are loaded, and the model is moved to DEVICE and
    put in eval mode.
    """
    import torch
    ckpt = torch.load(path, map_location="cpu")
    cfg = ckpt["cfg"]
    model = TransformerModel(
        cfg["vocab"],
        cfg["d_model"],
        cfg["n_heads"],
        cfg["d_ff"],
        cfg["n_enc"],
        cfg["n_dec"],
        pad_id=cfg["pad_id"],
    )
    model.load_state_dict(ckpt["model"])
    model.to(DEVICE)
    model.eval()
    return model

def ids_to_text(ids):
    """Decode token ids to plain text with every control token removed.

    The sequence is cut at the first <eos>, then <pad>, <bos>, <eos> and both
    language tags are dropped before decoding. The language tags are
    registered as user-defined SentencePiece symbols, so leaving them in would
    make them appear literally in translations and in the strings scored by
    BLEU/chrF.
    """
    ids = list(ids)
    if EOS_ID in ids:
        ids = ids[:ids.index(EOS_ID)]
    specials = {PAD_ID, BOS_ID, EOS_ID, LNCX_ID, LES_ID}
    return sp.decode([i for i in ids if i not in specials])

def eval_direction(best_path, direction="ncx2es", max_samples=200):
    """Translate the first `max_samples` test examples greedily and print
    corpus BLEU and chrF++.

    If scoring fails (for example because sacrebleu is not installed) the
    error is reported and up to five source/hypothesis/reference triples are
    printed instead.

    Args:
        best_path: Checkpoint path; nothing is evaluated when it is missing.
        direction: "ncx2es" or the reverse direction.
        max_samples: Upper bound on the number of test examples used.
    """
    if best_path is None or not Path(best_path).exists():
        print("Checkpoint no encontrado:", best_path); return
    model = load_model(best_path)
    ds = ParallelDataset(test_pairs, direction=direction)
    refs = []; hyps = []
    for i in range(min(len(ds), max_samples)):
        src_ids, tgt_ids = ds[i]
        out_ids = translate_greedy(model, src_ids, max_len=MAX_LEN)
        refs.append(ids_to_text(tgt_ids))
        hyps.append(ids_to_text(out_ids))
    try:
        from sacrebleu.metrics import BLEU, CHRF
        # sacrebleu expects a list of reference streams; there is exactly one.
        print(direction, "BLEU:", BLEU(force=True).corpus_score(hyps, [refs]))
        print(direction, "chrF++:", CHRF(word_order=2).corpus_score(hyps, [refs]))
    except Exception as e:
        print("sacrebleu no disponible:", e)
        for k in range(min(5, len(hyps))):
            print("SRC:", ids_to_text(ds[k][0]))
            print("HYP:", hyps[k])
            # Print the whole reference (it used to print only its first character).
            print("REF:", refs[k]); print()

# Score both directions on the test split.
eval_direction(best_path=best_ncx2es, direction="ncx2es")
eval_direction(best_path=best_es2ncx, direction="es2ncx")


In [None]:
# 13) Beam search y UI con Gradio
import gradio as gr
import torch

def translate_beam(model, src_ids, beam=5, lp=0.7, max_len=MAX_LEN, tgt_lang_id=LES_ID):
    """Beam-search decoding of one source sentence.

    Args:
        model: Model exposing `encode`, `decode` and `emb`.
        src_ids: Source token ids.
        beam: Beam width; coerced to an int of at least 1 (UI sliders may
            deliver floats) and capped at the vocabulary size.
        lp: Length-penalty exponent; hypotheses are ranked by
            score / len(sequence) ** lp.
        max_len: Maximum number of decoding steps.
        tgt_lang_id: Language tag that primes the decoder.

    Returns:
        Token ids of the best hypothesis, including the [<bos>, tag] prefix.
        Finished hypotheses (ending in <eos>) are preferred; if none finished,
        the best unfinished one is returned.
    """
    model.eval()
    beam = max(1, int(beam))
    device = model.emb.weight.device

    def _ranked(item):
        seq, score = item
        return score / (len(seq) ** lp)

    beams = [([BOS_ID, tgt_lang_id], 0.0)]
    finished = []
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        src = torch.tensor([src_ids], dtype=torch.long, device=device)
        mem, src_pad = model.encode(src)
        for _ in range(max_len):
            candidates = []
            for seq, score in beams:
                ys = torch.tensor([seq], dtype=torch.long, device=device)
                logits = model.decode(ys, mem, src_pad)[:, -1, :].squeeze(0)
                logp = torch.log_softmax(logits, dim=-1).detach().cpu()
                top = torch.topk(logp, min(beam, logp.numel()))
                for tok, tok_logp in zip(top.indices.tolist(), top.values.tolist()):
                    candidates.append((seq + [tok], score + tok_logp))
            candidates.sort(key=_ranked, reverse=True)
            # Keep the top `beam` candidates. Those ending in <eos> are retired
            # right away, so hypotheses that finish on the very last step are
            # not lost.
            beams = []
            for cand in candidates[:beam]:
                (finished if cand[0][-1] == EOS_ID else beams).append(cand)
            if not beams:
                break
    pool = finished or beams
    return max(pool, key=_ranked)[0]

# Pick up the checkpoint paths left by the training cell, if it was run in
# this session; otherwise both stay None.
BEST_NCX2ES = globals().get('best_ncx2es')
BEST_ES2NCX = globals().get('best_es2ncx')

# Loaded models keyed by (checkpoint path, modification time), so the UI does
# not re-read the checkpoint from disk on every request.
_SCRATCH_MODELS = {}


def load_scratch(direction):
    """Return (model, status message) for the given direction.

    The model is loaded from BEST_NCX2ES or BEST_ES2NCX and cached; it is
    reloaded only when the checkpoint file changes on disk.

    Returns:
        (model, "Cargado: ...") on success, or (None, "Checkpoint no
        encontrado: ...") when the path is unset or the file does not exist.
    """
    path = BEST_NCX2ES if direction=="ncx2es" else BEST_ES2NCX
    if path is None or not Path(path).exists():
        return None, f"Checkpoint no encontrado: {path}"
    key = (str(path), Path(path).stat().st_mtime)
    model = _SCRATCH_MODELS.get(key)
    if model is None:
        # Drop older versions of the same checkpoint before caching the new one.
        for stale in [k for k in _SCRATCH_MODELS if k[0] == key[0]]:
            del _SCRATCH_MODELS[stale]
        model = load_model(path)
        _SCRATCH_MODELS[key] = model
    return model, f"Cargado: {path}"

def infer_scratch(text, direction="ncx2es", beam=5):
    """Translate `text` with the from-scratch model of the given direction.

    Args:
        text: Input sentence; it is lower-cased before tokenisation.
        direction: "ncx2es" or the reverse direction.
        beam: Beam width (converted to int).

    Returns:
        The translation; "" for empty input; or the loader's status message
        when no checkpoint is available (instead of failing on a missing model).
    """
    if not text or not text.strip():
        return ""
    model, msg = load_scratch(direction)
    if model is None:
        return msg
    lang_id = LES_ID if direction=="ncx2es" else LNCX_ID
    src_lang = LNCX_ID if direction=="ncx2es" else LES_ID
    src_ids = encode_with_lang(text.lower(), src_lang)
    out_ids = translate_beam(model, src_ids, beam=int(beam), tgt_lang_id=lang_id)
    return ids_to_text(out_ids)

# Gradio UI: a direction selector, a beam-size slider, input and output text
# boxes, and a button wired to `infer_scratch`.
with gr.Blocks() as demo:
    gr.Markdown("## Traductor (Transformer desde cero)")
    direction = gr.Radio(choices=["ncx2es","es2ncx"], value="ncx2es", label="Dirección")
    beam = gr.Slider(1,10, step=1, value=5, label="Beam size")
    inp = gr.Textbox(lines=3, label="Texto de entrada")
    out = gr.Textbox(lines=3, label="Traducción")
    btn = gr.Button("Traducir")
    # On click: infer_scratch(text, direction, beam) -> output box.
    btn.click(fn=infer_scratch, inputs=[inp, direction, beam], outputs=[out])

# The UI is only built here; launching it is left to the user.
print("Para lanzar la UI en local: demo.launch()")


## 14) Baseline opcional: **BERT2BERT** (mBERT) con HuggingFace
Esta sección usa `transformers` para crear un `EncoderDecoderModel` con `bert-base-multilingual-cased` como encoder y decoder. En CPU será lento; se sugiere **freezing** parcial.


In [None]:
# 14.1) Preparación de dataset para HuggingFace
from datasets import Dataset as HFDataset
from transformers import BertTokenizerFast

# Output folder for the Hugging Face baseline (created if missing).
HF_DIR = BASE_DIR.joinpath("hf_bert2bert")
HF_DIR.mkdir(parents=True, exist_ok=True)

# Multilingual BERT tokenizer, used for both source and target text.
tok_hf = BertTokenizerFast.from_pretrained("bert-base-multilingual-cased")

def build_hf_split(pairs, direction="ncx2es"):
    """Build a Hugging Face dataset with "src" and "tgt" text columns.

    `pairs` holds tuples whose first two fields are (Nahuatl, Spanish). With
    direction "ncx2es" Nahuatl is the source; otherwise the columns are swapped.
    """
    ncx_is_source = direction=="ncx2es"
    rows = [(s, t) if ncx_is_source else (t, s) for s, t, *_ in pairs]
    return HFDataset.from_dict({
        "src": [row[0] for row in rows],
        "tgt": [row[1] for row in rows],
    })

# The baseline is trained in the Nahuatl → Spanish direction only.
hf_train, hf_dev = (build_hf_split(split, "ncx2es") for split in (train_pairs, dev_pairs))

def tok_map(batch):
    """Tokenise a batch of {"src", "tgt"} texts for seq2seq training.

    Source and target are both encoded with `tok_hf`, truncated to MAX_LEN
    tokens; the target ids are stored under "labels".

    The deprecated `as_target_tokenizer()` context manager is not used: for
    the BERT tokenizer it changes nothing, so the target is tokenised with a
    plain call.
    """
    model_inputs = tok_hf(batch["src"], truncation=True, max_length=MAX_LEN)
    labels = tok_hf(batch["tgt"], truncation=True, max_length=MAX_LEN)
    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

# Tokenise both splits in batches and drop the raw text columns.
hf_train_tok, hf_dev_tok = (
    split.map(tok_map, batched=True, remove_columns=["src","tgt"])
    for split in (hf_train, hf_dev)
)

print(hf_train_tok)


In [None]:
# 14.2) BERT2BERT EncoderDecoderModel (entrenamiento ligero)
from transformers import EncoderDecoderModel, TrainingArguments, Trainer, DataCollatorForSeq2Seq

enc_dec = EncoderDecoderModel.from_encoder_decoder_pretrained(
    "bert-base-multilingual-cased", "bert-base-multilingual-cased"
)

# An EncoderDecoderModel built from two BERT checkpoints has no decoder start
# or padding token configured; both are needed to derive decoder inputs from
# the labels during training and to run `generate`.
enc_dec.config.decoder_start_token_id = tok_hf.cls_token_id
enc_dec.config.pad_token_id = tok_hf.pad_token_id
enc_dec.config.eos_token_id = tok_hf.sep_token_id

# Freeze the lowest layers to make CPU training cheaper. Decoder parameters
# live under "decoder.bert.", so the original "decoder.embeddings" pattern
# never matched anything; it is kept and the working pattern is added.
# NOTE(review): if the decoder's output projection shares weights with its
# word embeddings, freezing the embeddings freezes that projection too —
# confirm this is intended.
_FROZEN_PATTERNS = (
    "encoder.embeddings",
    "encoder.encoder.layer.0",
    "decoder.embeddings",
    "decoder.bert.embeddings",
)
for name, param in enc_dec.named_parameters():
    if any(pattern in name for pattern in _FROZEN_PATTERNS):
        param.requires_grad = False

data_collator = DataCollatorForSeq2Seq(tokenizer=tok_hf, model=enc_dec)

# Newer transformers releases renamed `evaluation_strategy` to `eval_strategy`
# and Trainer's `tokenizer` to `processing_class`; pick whichever keyword the
# installed version accepts.
import inspect
_eval_kw = ("eval_strategy"
            if "eval_strategy" in inspect.signature(TrainingArguments.__init__).parameters
            else "evaluation_strategy")
_tok_kw = ("processing_class"
           if "processing_class" in inspect.signature(Trainer.__init__).parameters
           else "tokenizer")

args = TrainingArguments(
    output_dir=str(HF_DIR),
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    learning_rate=5e-5,
    num_train_epochs=1,
    save_total_limit=1,
    logging_steps=50,
    report_to="none",
    **{_eval_kw: "epoch"},
)
trainer = Trainer(
    model=enc_dec,
    args=args,
    data_collator=data_collator,
    train_dataset=hf_train_tok,
    eval_dataset=hf_dev_tok,
    **{_tok_kw: tok_hf},
)
# trainer.train()  # Uncomment to train (may be slow on CPU)


In [None]:
# 14.3) Inferencia con BERT2BERT (opcional)
def infer_hf(text, max_new_tokens=64):
    """Translate `text` with the BERT2BERT baseline.

    Args:
        text: Input sentence.
        max_new_tokens: Maximum number of tokens to generate.

    Returns:
        The decoded output without special tokens, or "" for empty input.
    """
    if not text or not text.strip():
        return ""
    enc_dec.eval()
    # Truncate to the tokenizer's maximum length and place the inputs on the
    # model's device (the model may have been moved off the CPU).
    inputs = tok_hf(text, return_tensors="pt", truncation=True).to(enc_dec.device)
    outputs = enc_dec.generate(**inputs, max_new_tokens=max_new_tokens)
    return tok_hf.decode(outputs[0], skip_special_tokens=True)
