# Tradução Automática Neural com RNNs (Seq2Seq) — Notebook Colab

Este notebook implementa um pipeline mínimo de tradução automática inglês→francês inspirado no D2L (Seção 9.5):
- Download e pré-processamento do conjunto de dados Tatoeba (ManyThings)
- Tokenização e construção de vocabulário (nível de palavra; com tokens reservados)
- Preenchimento/truncamento e minibatches
- Treinamento de um modelo Seq2Seq simples (GRU)
- Avaliação rápida com decodificação gulosa
- Experimento de tamanho de vocabulário para diferentes números de exemplos e geração automática de README com respostas


In [None]:
# Configuração de bibliotecas (Colab)
import sys, os, math, time, random, re, html

# Instalar PyTorch se necessário (Colab normalmente já possui). D2L não é obrigatório.
try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    from torch.utils.data import Dataset, DataLoader
except Exception as e:
    !pip -q install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    from torch.utils.data import Dataset, DataLoader

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Usando dispositivo: {DEVICE}")

# Reprodutibilidade
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)



Usando dispositivo: cuda


In [None]:
# Download e leitura do conjunto Tatoeba Inglês–Francês (ManyThings / espelho D2L)
import pathlib, zipfile, urllib.request
from typing import Tuple

DATA_URL = "http://d2l-data.s3-accelerate.amazonaws.com/fra-eng.zip"
DATA_DIR = pathlib.Path("./data")
DATA_DIR.mkdir(parents=True, exist_ok=True)


def download_extract_fra_eng(url: str = DATA_URL, data_dir: pathlib.Path = DATA_DIR) -> pathlib.Path:
    """Baixa fra-eng.zip e extrai. Retorna o diretório extraído."""
    zip_path = data_dir / "fra-eng.zip"
    out_dir = data_dir / "fra-eng"
    if not out_dir.exists():
        if not zip_path.exists():
            print(f"Baixando {url} ...")
            urllib.request.urlretrieve(url, zip_path)
        print("Extraindo ...")
        with zipfile.ZipFile(zip_path, "r") as zf:
            zf.extractall(data_dir)
    return out_dir


def read_data_nmt() -> str:
    """Lê o 'fra.txt' bruto com linhas Inglês<TAB>Francês."""
    out_dir = download_extract_fra_eng()
    fra_path = out_dir / "fra.txt"
    with open(fra_path, "r", encoding="utf-8") as f:
        return f.read()

raw_text = read_data_nmt()
print(raw_text.splitlines()[:5])


['Go.\tVa !', 'Hi.\tSalut !', 'Run!\tCours\u202f!', 'Run!\tCourez\u202f!', 'Who?\tQui ?']


In [None]:
# Pré-processamento: normaliza espaços/caixa e insere espaço antes de pontuação (como no D2L)
from typing import List

def preprocess_nmt(text: str) -> str:
    # Substitui espaços não separáveis e converte para minúsculas
    text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
    # Insere espaço antes de ,.!? quando precedido por um não-espaço
    out_chars = []
    for i, ch in enumerate(text):
        if ch in ',.!?' and i > 0 and text[i-1] != ' ':
            out_chars.append(' ')
        out_chars.append(ch)
    return ''.join(out_chars)

text = preprocess_nmt(raw_text)
print(text.splitlines()[:5])


['go .\tva !', 'hi .\tsalut !', 'run !\tcours !', 'run !\tcourez !', 'who ?\tqui ?']


In [None]:
# Tokenização e vocabulário
from collections import Counter
from typing import List, Dict, Any


def tokenize_nmt(text: str, num_examples: int | None = None) -> tuple[list[list[str]], list[list[str]]]:
    src, tgt = [], []
    for i, line in enumerate(text.split('\n')):
        if num_examples is not None and i >= num_examples:
            break
        parts = line.split('\t')
        if len(parts) == 2:
            src.append(parts[0].split(' '))
            tgt.append(parts[1].split(' '))
    return src, tgt

class Vocab:
    def __init__(self, tokens: List[List[str]], min_freq: int = 1,
                 reserved_tokens: List[str] | None = None):
        if reserved_tokens is None:
            reserved_tokens = []
        # Contagem de frequências
        counter = Counter()
        for line in tokens:
            counter.update(line)
        # Ordena por frequência e depois alfabeticamente
        token_freqs = sorted(counter.items(), key=lambda x: (-x[1], x[0]))
        # Tokens especiais
        self.pad = '<pad>'
        self.bos = '<bos>'
        self.eos = '<eos>'
        self.unk = '<unk>'
        uniq_tokens = [self.pad, self.bos, self.eos, self.unk] + [t for t in reserved_tokens if t not in {self.pad, self.bos, self.eos, self.unk}]
        for token, freq in token_freqs:
            if freq >= min_freq and token not in uniq_tokens:
                uniq_tokens.append(token)
        self.idx_to_token = uniq_tokens
        self.token_to_idx = {t: i for i, t in enumerate(self.idx_to_token)}

    def __len__(self):
        return len(self.idx_to_token)

    def __getitem__(self, tokens: List[str] | List[List[str]]):
        if not tokens:
            return []
        if isinstance(tokens[0], list):
            return [self.__getitem__(line) for line in tokens]  # type: ignore
        return [self.token_to_idx.get(t, self.token_to_idx[self.unk]) for t in tokens]  # type: ignore

    def to_tokens(self, indices: List[int] | List[List[int]]):
        if not indices:
            return []
        if isinstance(indices[0], list):
            return [self.to_tokens(line) for line in indices]  # type: ignore
        return [self.idx_to_token[i] for i in indices]  # type: ignore

src_tokens, tgt_tokens = tokenize_nmt(text)
src_vocab = Vocab(src_tokens, min_freq=2)
tgt_vocab = Vocab(tgt_tokens, min_freq=2)
print('tamanho src_vocab:', len(src_vocab), 'tamanho tgt_vocab:', len(tgt_vocab))


tamanho src_vocab: 10012 tamanho tgt_vocab: 17851


In [None]:
# Experimento de tamanho de vocabulário para diferentes valores de num_examples (Exercício 1)
NUM_EXAMPLES_LIST = [300, 600, 1000, 5000, 10000, 20000]
results = []
for ne in NUM_EXAMPLES_LIST:
    s, t = tokenize_nmt(text, num_examples=ne)
    sv = Vocab(s, min_freq=2)
    tv = Vocab(t, min_freq=2)
    results.append((ne, len(sv), len(tv)))

for ne, svsz, tvsz in results:
    print(f"num_examples={ne:6d} -> vocab_origem={svsz:6d}, vocab_destino={tvsz:6d}")

# Salva resultados para o README
vocab_experiment_results = results


num_examples=   300 -> vocab_origem=   102, vocab_destino=   107
num_examples=   600 -> vocab_origem=   184, vocab_destino=   201
num_examples=  1000 -> vocab_origem=   266, vocab_destino=   321
num_examples=  5000 -> vocab_origem=   875, vocab_destino=  1231
num_examples= 10000 -> vocab_origem=  1505, vocab_destino=  2252
num_examples= 20000 -> vocab_origem=  2459, vocab_destino=  3828


In [None]:
# Truncar/preencher e DataLoader
from typing import Tuple

def truncate_pad(line: list[int], num_steps: int, pad_idx: int) -> list[int]:
    if len(line) > num_steps:
        return line[:num_steps]
    return line + [pad_idx] * (num_steps - len(line))


def build_array_nmt(lines: list[list[str]], vocab: Vocab, num_steps: int) -> Tuple[torch.Tensor, torch.Tensor]:
    lines_indices = [vocab[l] for l in lines]
    # Acrescenta <eos>
    eos_idx = vocab.token_to_idx[vocab.eos]
    pad_idx = vocab.token_to_idx[vocab.pad]
    lines_indices = [li + [eos_idx] for li in lines_indices]
    arr = torch.tensor([truncate_pad(li, num_steps, pad_idx) for li in lines_indices], dtype=torch.long)
    valid_len = (arr != pad_idx).int().sum(dim=1)
    return arr, valid_len

class NMTDataset(Dataset):
    def __init__(self, src_lines: list[list[str]], tgt_lines: list[list[str]],
                 src_vocab: Vocab, tgt_vocab: Vocab, num_steps: int):
        self.src_arr, self.src_valid = build_array_nmt(src_lines, src_vocab, num_steps)
        self.tgt_arr, self.tgt_valid = build_array_nmt(tgt_lines, tgt_vocab, num_steps)
    def __len__(self):
        return self.src_arr.size(0)
    def __getitem__(self, idx):
        return (self.src_arr[idx], self.src_valid[idx], self.tgt_arr[idx], self.tgt_valid[idx])

# Subconjunto pequeno para treinamento mais rápido no Colab
NUM_STEPS = 12
BATCH_SIZE = 128
NUM_EXAMPLES_TRAIN = 20000
src_small, tgt_small = tokenize_nmt(text, num_examples=NUM_EXAMPLES_TRAIN)
src_vocab = Vocab(src_small, min_freq=2)
tgt_vocab = Vocab(tgt_small, min_freq=2)
train_ds = NMTDataset(src_small, tgt_small, src_vocab, tgt_vocab, NUM_STEPS)
train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)

len(src_vocab), len(tgt_vocab), len(train_ds)


(2459, 3828, 20000)

In [None]:
# Seq2Seq simples com GRU (codificador-decodificador) e decodificação gulosa

class Encoder(nn.Module):
    def __init__(self, vocab_size: int, embed_size: int, hidden_size: int, num_layers: int = 1, dropout: float = 0.1):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size, padding_idx=0)
        self.gru = nn.GRU(embed_size, hidden_size, num_layers=num_layers, batch_first=True, dropout=dropout if num_layers>1 else 0.0)
    def forward(self, x, valid_len):
        emb = self.embed(x)
        # Opcional: poderíamos empacotar sequências com padding, mas manteremos simples
        outputs, hidden = self.gru(emb)
        return outputs, hidden

class Decoder(nn.Module):
    def __init__(self, vocab_size: int, embed_size: int, hidden_size: int, num_layers: int = 1, dropout: float = 0.1):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size, padding_idx=0)
        self.gru = nn.GRU(embed_size, hidden_size, num_layers=num_layers, batch_first=True, dropout=dropout if num_layers>1 else 0.0)
        self.fc = nn.Linear(hidden_size, vocab_size)
    def forward(self, x, hidden):
        emb = self.embed(x)
        out, hidden = self.gru(emb, hidden)
        logits = self.fc(out)
        return logits, hidden

class Seq2Seq(nn.Module):
    def __init__(self, enc: Encoder, dec: Decoder, tgt_pad_idx: int):
        super().__init__()
        self.enc = enc
        self.dec = dec
        self.tgt_pad_idx = tgt_pad_idx
    def forward(self, src, src_valid_len, tgt):
        _, hidden = self.enc(src, src_valid_len)
        # Teacher forcing: alimenta os tokens verdadeiros deslocados (preprende <bos>)
        logits, _ = self.dec(tgt[:, :-1], hidden)
        return logits

# Construir modelo
EMBED_SIZE = 128
HIDDEN_SIZE = 256
enc = Encoder(len(src_vocab), EMBED_SIZE, HIDDEN_SIZE)
dec = Decoder(len(tgt_vocab), EMBED_SIZE, HIDDEN_SIZE)
model = Seq2Seq(enc, dec, tgt_pad_idx=tgt_vocab.token_to_idx[tgt_vocab.pad]).to(DEVICE)

# Função de perda e otimizador
criterion = nn.CrossEntropyLoss(ignore_index=tgt_vocab.token_to_idx[tgt_vocab.pad])
optimizer = torch.optim.Adam(model.parameters(), lr=3e-3)

# Preparar função de criação de entradas/saídas do decodificador
BOS_IDX = tgt_vocab.token_to_idx[tgt_vocab.bos]
EOS_IDX = tgt_vocab.token_to_idx[tgt_vocab.eos]
PAD_IDX = tgt_vocab.token_to_idx[tgt_vocab.pad]

def make_decoder_inputs_targets(tgt_batch: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
    # tgt_batch forma: (B, T). Entrada: [<bos>, y_0, ..., y_{T-2}] e alvo: [y_0, ..., y_{T-2}, y_{T-1}]
    B, T = tgt_batch.size()
    bos_col = torch.full((B, 1), BOS_IDX, dtype=tgt_batch.dtype, device=tgt_batch.device)
    dec_in = torch.cat([bos_col, tgt_batch[:, :-1]], dim=1)
    dec_out = tgt_batch
    return dec_in, dec_out

# Laço de treinamento
EPOCHS = 150
model.train()
for epoch in range(1, EPOCHS+1):
    total_loss = 0.0
    total_tokens = 0
    for src_b, src_len_b, tgt_b, tgt_len_b in train_dl:
        src_b = src_b.to(DEVICE)
        tgt_b = tgt_b.to(DEVICE)
        dec_in, dec_out = make_decoder_inputs_targets(tgt_b)
        optimizer.zero_grad()
        logits = model(src_b, src_len_b.to(DEVICE), dec_in)
        # Alinha formas para a perda: logits (B, T-1, V) vs alvos (B, T-1)
        loss = criterion(logits.reshape(-1, logits.size(-1)), dec_out[:, 1:].reshape(-1))
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        # Acumula
        ntokens = (dec_out[:, 1:] != PAD_IDX).sum().item()
        total_loss += loss.item() * ntokens
        total_tokens += ntokens
    ppl = math.exp(total_loss / max(1, total_tokens))
    print(f"Época {epoch}: perda_por_token={(total_loss/total_tokens):.4f}, ppl={ppl:.2f}")


Época 1: perda_por_token=3.8674, ppl=47.82
Época 2: perda_por_token=2.5507, ppl=12.82
Época 3: perda_por_token=1.7803, ppl=5.93
Época 4: perda_por_token=1.2786, ppl=3.59
Época 5: perda_por_token=0.9709, ppl=2.64
Época 6: perda_por_token=0.7842, ppl=2.19
Época 7: perda_por_token=0.6649, ppl=1.94
Época 8: perda_por_token=0.5879, ppl=1.80
Época 9: perda_por_token=0.5341, ppl=1.71
Época 10: perda_por_token=0.4989, ppl=1.65
Época 11: perda_por_token=0.4742, ppl=1.61
Época 12: perda_por_token=0.4535, ppl=1.57
Época 13: perda_por_token=0.4434, ppl=1.56
Época 14: perda_por_token=0.4311, ppl=1.54
Época 15: perda_por_token=0.4221, ppl=1.53
Época 16: perda_por_token=0.4141, ppl=1.51
Época 17: perda_por_token=0.4108, ppl=1.51
Época 18: perda_por_token=0.4068, ppl=1.50
Época 19: perda_por_token=0.3997, ppl=1.49
Época 20: perda_por_token=0.3966, ppl=1.49
Época 21: perda_por_token=0.3939, ppl=1.48
Época 22: perda_por_token=0.3958, ppl=1.49
Época 23: perda_por_token=0.3880, ppl=1.47
Época 24: perda_po

In [None]:
# Decodificação gulosa para avaliação qualitativa rápida
@torch.no_grad()
def translate_greedy(src_sentence: str, max_len: int = 20) -> str:
    # Pré-processa e tokeniza a frase de entrada (apenas Inglês)
    s = preprocess_nmt(src_sentence)
    s_tokens = s.strip().split(' ')
    src_idx = torch.tensor([truncate_pad(src_vocab[s_tokens] + [src_vocab.token_to_idx[src_vocab.eos]], NUM_STEPS, src_vocab.token_to_idx[src_vocab.pad])], dtype=torch.long).to(DEVICE)
    _, hidden = model.enc(src_idx, torch.tensor([len(s_tokens)+1]))
    # Inicia com <bos>
    cur = torch.tensor([[BOS_IDX]], dtype=torch.long, device=DEVICE)
    out_tokens: list[int] = []
    for _ in range(max_len):
        logits, hidden = model.dec(cur, hidden)
        next_token = logits[:, -1].argmax(dim=-1)
        token_id = next_token.item()
        if token_id == EOS_IDX or token_id == PAD_IDX:
            break
        out_tokens.append(token_id)
        cur = torch.tensor([[token_id]], dtype=torch.long, device=DEVICE)
    return ' '.join(tgt_vocab.to_tokens(out_tokens))

# Testa algumas amostras curtas do conjunto de dados
for i in [0, 5, 10, 20, 30]:
    en = ' '.join(src_small[i])
    fr = ' '.join(tgt_small[i])
    pred = translate_greedy(en)
    print(f"EN: {en}\nFR(verdadeiro): {fr}\nFR(previsto): {pred}\n---")


EN: go .
FR(verdadeiro): va !
FR(previsto): ! y . suis .
---
EN: wow !
FR(verdadeiro): ça alors !
FR(previsto): alors ! ! de ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! !
---
EN: stop !
FR(verdadeiro): stop !
FR(previsto): ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! !
---
EN: i try .
FR(verdadeiro): j'essaye .
FR(previsto): . .
---
EN: cheers !
FR(verdadeiro): tchin-tchin !
FR(previsto): ! ! votre ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! !
---


In [None]:
# Auto-generate README.md with documentation and exercise answers
import textwrap, json

EXERCISE_1_TEXT = """
Pergunta: Tente valores diferentes do argumento num_examples na função load_data_nmt.
Como isso afeta os tamanhos do vocabulário do idioma de origem e do idioma de destino?

Resposta (baseada nos resultados deste notebook):
"""

lines = [f"- num_examples={ne}: src_vocab={sv}, tgt_vocab={tv}" for (ne, sv, tv) in vocab_experiment_results]
ex1_answer = EXERCISE_1_TEXT + "\n" + "\n".join(lines) + "\n\n" + \
    "Conclusão: conforme aumentamos num_examples, ambos os vocabulários crescem monotonicamente (ou quase),\n" \
    "pois mais sentenças expõem mais tipos de palavras. As taxas de crescimento diferem por idioma\n" \
    "devido a diferenças morfológicas e distribuição de tokens no corpus."

EXERCISE_2_TEXT = """
Pergunta: O texto em alguns idiomas, como chinês e japonês, não tem indicadores de limite de palavras.
A tokenização em nível de palavra ainda é uma boa ideia para esses casos? Por que ou por que não?

Resposta:
Não. Em línguas sem separadores explícitos de palavras (p.ex., chinês, japonês), a tokenização em nível de palavra
exige uma etapa externa de segmentação que é ruidosa e dependente de dicionário. Isso pode introduzir erros sistêmicos
no treinamento. Em vez disso, tokenização em nível de subpalavra (BPE/WordPiece/Unigram, como em Sennrich et al., 2016; Kudo, 2018)
ou mesmo nível de caractere pode ser preferível, pois lida melhor com morfologia rica e OOV.

Referências:
- D2L 9.5 destaca que o vocabulário em nível de palavra cresce muito, e sugere técnicas de tokenização mais avançadas.
- Sennrich, Haddow, Birch (2016): Neural Machine Translation of Rare Words with Subword Units.
- Kudo (2018): Subword Regularization: Improving Neural Network Translation Models with Multiple Subword Candidates.
- Devlin et al. (2019): BERT uses WordPiece (subword) tokenization, amplamente adotado em PLN.
"""

readme = f"""
# Neural Machine Translation with RNNs (Seq2Seq)

Este projeto implementa um pipeline simples de tradução automática (inglês→francês) inspirado em D2L Seção 9.5.

## Conteúdo
- `NMT_Seq2Seq_Colab.ipynb`: Notebook Colab com:
  - Download e pré-processamento (ManyThings/Tatoeba, fra-eng.zip)
  - Tokenização e vocabulários com tokens especiais (<pad>, <bos>, <eos>, <unk>)
  - Truncamento/preenchimento e minibatches
  - Modelo Seq2Seq (Encoder/Decoder GRU) com treinamento rápido
  - Decodificação gulosa para avaliação qualitativa
  - Experimento de tamanho de vocabulário variando `num_examples`

## Como executar (Colab)
1. Abra o notebook no Google Colab e selecione GPU (opcional).
2. Execute todas as células em ordem. O último bloco gera este README automaticamente.

## Resultados (resumo)
- Perplexidade por época exibida no treinamento (ver notebook).
- Traduções de exemplo impressas (qualitativas).

## Exercícios (D2L 9.5.7)

### 1) Variação de `num_examples` e tamanhos de vocabulário
{ex1_answer}

### 2) Tokenização em idiomas sem separadores de palavras (chinês/japonês)
{EXERCISE_2_TEXT}
"""

with open("README.md", "w", encoding="utf-8") as f:
    f.write(textwrap.dedent(readme))

print("README.md written.")


README.md written.
