# 08 - Transformer Encoder-Decoder

Neste notebook, vamos implementar um modelo encoder-decoder baseado no **Transformer**, uma arquitetura que substituiu as RNNs em muitas tarefas de NLP e se tornou o padrão em tradução automática e modelos de linguagem modernos.

## Objetivos de Aprendizado
- Revisar a arquitetura do Transformer e seus principais componentes (Self-Attention, Encoder, Decoder)
- Implementar o Encoder e o Decoder com PyTorch
- Construir um modelo completo de tradução português-inglês usando Transformer
- Treinar o modelo em um conjunto de dados de exemplo
- Avaliar a qualidade das traduções geradas

In [None]:
import re
import math
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from collections import Counter
from torch.utils.data import TensorDataset, DataLoader, Dataset
from sklearn.model_selection import train_test_split

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Positional Encoding

Transformers não possuem mecanismos recorrentes ou convolucionais, o que significa que eles não têm uma noção implícita da ordem dos tokens em uma sequência. Para incorporar essa informação, é adicionada uma codificação posicional aos vetores de embedding. Essa codificação é determinística e baseada em funções senoidais de diferentes frequências.

A codificação posicional utilizada segue a formulação original do paper *"Attention is All You Need"*:

$$
PE_{(pos, 2i)} = \sin\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right)
$$

$$
PE_{(pos, 2i+1)} = \cos\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right)
$$

Onde:
- $pos$ representa a posição do token na sequência,
- $i$ é o índice da dimensão do embedding,
- $d_{\text{model}}$ é a dimensionalidade do embedding.

O resultado é uma matriz de codificação com forma $(1, \text{max\_len}, d_{\text{model}})$ que é somada diretamente aos embeddings de entrada.

In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        self.d_model = d_model

        # matriz (max_len, d_model)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # seno nas posições pares, cosseno nas ímpares
        pe[:, 0::2] = torch.sin(position * div_term)  # pares
        pe[:, 1::2] = torch.cos(position * div_term)  # ímpares

        pe = pe.unsqueeze(0)  # (1, max_len, d_model) -> broadcast no batch
        self.register_buffer("pe", pe)  # não é parâmetro treinável

    def forward(self, x):
        T = x.size(1)
        x = x + self.pe[:, :T, :]  # (B, T, d_model)
        return x

In [None]:
d_model = 16
num_heads = 4
B, T = 2, 5

# embeddings simulados
x = torch.randn(B, T, d_model)

# positional encoding
pos_enc = PositionalEncoding(d_model)
x = pos_enc(x)  # adiciona posições

print("Positional Encoding:", x.shape)  # (B, T, d_model)

### Multi-Head Attention

O mecanismo de **multi-head attention** é um dos blocos centrais dos Transformers. Ele permite que o modelo foque em diferentes partes da sequência em paralelo, usando múltiplas "cabeças" de atenção. Cada cabeça realiza uma atenção com projeções diferentes dos vetores de entrada, e seus resultados são combinados ao final.

A atenção é baseada no mecanismo de **Scaled Dot-Product Attention**, que recebe três vetores: $Q$ (query), $K$ (key) e $V$ (value). O cálculo da atenção segue a fórmula:

$$
\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right) V
$$

Onde:
- $Q$, $K$ e $V$ são tensores projetados a partir da entrada,
- $d_k$ é a dimensionalidade das chaves (key),
- A divisão por $\sqrt{d_k}$ serve para normalizar os scores e evitar valores muito grandes que podem saturar a softmax.

No caso de múltiplas cabeças, os vetores $Q$, $K$ e $V$ são divididos em $h$ partes (cabeças), cada uma com dimensionalidade reduzida $d_k = d_{\text{model}} / h$, aplicando a atenção de forma independente em cada cabeça. Os resultados são então concatenados e projetados novamente com uma camada linear:

$$
\text{MultiHead}(Q, K, V) = \text{Concat}(\text{head}_1, \dots, \text{head}_h) W^O
$$

Cada cabeça é computada como:

$$
\text{head}_i = \text{Attention}(Q W_i^Q, K W_i^K, V W_i^V)
$$

Esse paralelismo permite que diferentes aspectos contextuais da sequência sejam aprendidos simultaneamente. Essa implementação define todas as projeções lineares necessárias, faz o `split_heads`, aplica a atenção escalada, combina os resultados com `combine_heads`, e projeta de volta para o espaço original com uma camada linear $W^O$.

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model deve ser divisível por num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        # Projeções lineares
        self.q_proj = nn.Linear(d_model, d_model)
        self.k_proj = nn.Linear(d_model, d_model)
        self.v_proj = nn.Linear(d_model, d_model)

        self.out_proj = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        B, Tq, _ = q.size()
        Tk = k.size(1)
        Tv = v.size(1)

        # Projeções
        Q = self.q_proj(q)
        K = self.k_proj(k)
        V = self.v_proj(v)

        # Split heads
        Q = Q.view(B, Tq, self.num_heads, self.head_dim).transpose(1, 2)  # (B, h, Tq, d_head)
        K = K.view(B, Tk, self.num_heads, self.head_dim).transpose(1, 2)  # (B, h, Tk, d_head)
        V = V.view(B, Tv, self.num_heads, self.head_dim).transpose(1, 2)  # (B, h, Tv, d_head)

        # Atenção
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / (self.head_dim ** 0.5)  # (B, h, Tq, Tk)

        if mask is not None:
            # mask: (B, 1, 1, Tk) ou (B, 1, Tq, Tk)
            attn_scores = attn_scores.masked_fill(mask == 0, float("-inf"))

        attn_weights = F.softmax(attn_scores, dim=-1)
        attn_output = torch.matmul(attn_weights, V)  # (B, h, Tq, d_head)

        # Junta os heads
        attn_output = attn_output.transpose(1, 2).contiguous().view(B, Tq, self.d_model)

        return self.out_proj(attn_output)

In [None]:
def causal_mask(seq_len, device=None):
    """
    Cria máscara causal triangular inferior.
    shape: (1, 1, seq_len, seq_len)
    """
    mask = torch.tril(torch.ones(seq_len, seq_len, device=device))
    return mask.unsqueeze(0).unsqueeze(0)  # (1, 1, T, T)


def padding_mask(pad_tokens, device=None):
    """
    Cria máscara de padding.
    pad_tokens: tensor (B, T) com 1 onde é token válido e 0 onde é padding
    retorna shape: (B, 1, 1, T) -> broadcast em atenção
    """
    return pad_tokens.unsqueeze(1).unsqueeze(2).to(device)  # (B,1,1,T)

In [None]:
# Exemplo de uso
d_model = 16
num_heads = 4
B, T = 2, 5

x = torch.randn(B, T, d_model)
mask = causal_mask(T, device=x.device)
attn = MultiHeadAttention(d_model=d_model, num_heads=num_heads)

out = attn(x, x, x, mask=mask)
print("Self-Attention:", out.shape)  # (B, T, d_model)

### Feed-Forward Layer

Em Transformers, cada bloco contém uma **camada feed-forward totalmente conectada** que é aplicada de forma independente a cada posição da sequência. Essa camada é responsável por aprender transformações não-lineares locais após o mecanismo de atenção.

A arquitetura típica de uma feed-forward layer é composta por duas camadas lineares com uma função de ativação não-linear (geralmente ReLU) no meio:

$$
\text{FFN}(x) = W_2 \cdot \text{ReLU}(W_1 \cdot x + b_1) + b_2
$$

Onde:
- $x$ é o vetor de entrada de dimensão $d_{\text{model}}$,
- $W_1 \in \mathbb{R}^{d_{\text{ff}} \times d_{\text{model}}}$ e $W_2 \in \mathbb{R}^{d_{\text{model}} \times d_{\text{ff}}}$ são pesos aprendidos,
- $d_{\text{ff}}$ é a dimensionalidade intermediária (maior que $d_{\text{model}}$ para aumentar a capacidade do modelo),
- $\text{ReLU}(x) = \max(0, x)$ é a função de ativação não-linear.

Essa camada é aplicada posição a posição (de forma independente em cada token), e introduz não-linearidades e capacidade de transformação mais complexa ao modelo, além de expandir e comprimir a dimensionalidade, o que funciona como um "bottleneck" informativo.

In [None]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.fc2(self.dropout(self.relu(self.fc1(x))))

In [None]:
# Exemplo
d_model = 512
d_ff = 2048
B, T = 2, 5

feed_forward = FeedForward(d_model, d_ff)

query = torch.randn(B, T, d_model)  # (B, T, d_model)
output = feed_forward(query)

print(f'Input shape: {query.shape}')  # Input shape: (B, T, d_model)
print(f'Output shape: {output.shape}')  # Output shape: (B, T, d_model)

### EncoderLayer e Encoder

Cada `EncoderLayer` é composta por dois blocos: atenção multi-cabeça seguida de normalização, e uma feed-forward seguida de outra normalização. Em ambos os casos, há conexões residuais e dropout:

$$
x_1 = \text{LayerNorm}(x + \text{Dropout}(\text{MultiHead}(x, x, x)))
$$

$$
\text{Output} = \text{LayerNorm}(x_1 + \text{Dropout}(\text{FFN}(x_1)))
$$

A classe `Encoder` empilha múltiplas `EncoderLayer`s após converter os tokens com `Embedding` e adicionar codificações posicionais. O fluxo é:

$$
x = \text{Embedding}(x) + \text{PositionalEncoding}
$$

$$
x = \text{EncoderLayer}_N \circ \cdots \circ \text{EncoderLayer}_1 (x)
$$

Esse processo transforma a sequência de entrada em uma representação contextualizada, onde cada posição é influenciada pelas demais.

In [None]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward  = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, src_mask=None):
        # Self-Attention
        attn_out = self.self_attn(x, x, x, mask=src_mask)
        x = x + self.dropout(attn_out)
        x = self.norm1(x)

        # FeedForward
        ff_out = self.feed_forward(x)
        x = x + self.dropout(ff_out)
        x = self.norm2(x)
        return x

In [None]:
# Modelo
d_model = 32
num_heads = 4
d_ff = 64

encoder_layer = EncoderLayer(d_model, num_heads, d_ff)

# Exemplo
batch_size = 4
seq_len = 10

x = torch.randn(batch_size, seq_len, d_model)
out = encoder_layer(x)

print(f"Input shape: {x.shape}")    # (N, T, d_model)
print(f"Output shape: {out.shape}") # (N, T, d_model)

In [None]:
class Encoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers, dropout=0.1):
        super().__init__()
        self.layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x, src_mask=None):
        for layer in self.layers:
            x = layer(x, src_mask)
        return self.norm(x)

In [None]:
# Modelo
d_model = 32
num_heads = 4
d_ff = 64
num_layers = 3

encoder = Encoder(d_model, num_heads, d_ff, num_layers)

# Exemplo
batch_size = 4
seq_len = 12

x = torch.randn(batch_size, seq_len, d_model)
out = encoder(x)

print(f"Input shape: {x.shape}")     # (N, T, d_model)
print(f"Encoder output: {out.shape}") # (N, T, d_model)

### DecoderLayer e Decoder

Cada `DecoderLayer` possui três blocos com conexões residuais e normalização:

1. **Self-attention mascarada**: impede que o token atual veja os futuros.
2. **Cross-attention**: permite que o decoder atenda à saída do encoder.
3. **Feed-forward**: transformação não linear local.

As operações são:

$$
x_1 = \text{LayerNorm}(x + \text{Dropout}(\text{SelfAttn}(x)))
$$

$$
x_2 = \text{LayerNorm}(x_1 + \text{Dropout}(\text{CrossAttn}(x_1, \text{enc\_out})))
$$

$$
\text{Output} = \text{LayerNorm}(x_2 + \text{Dropout}(\text{FFN}(x_2)))
$$

O `Decoder` empilha múltiplas `DecoderLayer`s após aplicar embedding e codificação posicional, e gera uma distribuição sobre o vocabulário via uma camada linear final.

In [None]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1, cross_attention=True):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attention = cross_attention
        if cross_attention:
            self.cross_attn = MultiHeadAttention(d_model, num_heads)

        self.feed_forward = FeedForward(d_model, d_ff, dropout)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model) if cross_attention else None
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out=None, tgt_mask=None, memory_mask=None):
        # Masked Self-Attention
        attn_out = self.self_attn(x, x, x, mask=tgt_mask)
        x = x + self.dropout(attn_out)
        x = self.norm1(x)

        # Cross-Attention (se habilitado)
        if self.cross_attention and enc_out is not None:
            attn_out = self.cross_attn(x, enc_out, enc_out, mask=memory_mask)
            x = x + self.dropout(attn_out)
            x = self.norm2(x)

        # FeedForward
        ff_out = self.feed_forward(x)
        x = x + self.dropout(ff_out)
        x = self.norm3(x)

        return x

In [None]:
# Modelo
d_model = 32
num_heads = 4
d_ff = 64

decoder_layer = DecoderLayer(d_model, num_heads, d_ff)

# Exemplo
batch_size = 4
src_len = 15
tgt_len = 7

enc_out = torch.randn(batch_size, src_len, d_model)  # saída do encoder
tgt = torch.randn(batch_size, tgt_len, d_model)      # entrada do decoder

tgt_mask = causal_mask(tgt_len)

out = decoder_layer(tgt, enc_out, tgt_mask=tgt_mask)

print(f"Target input: {tgt.shape}")        # (N, T_tgt, d_model)
print(f"DecoderLayer output: {out.shape}") # (N, T_tgt, d_model)

In [None]:
class Decoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers, dropout=0.1, cross_attention=True):
        super().__init__()
        self.layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, d_ff, dropout, cross_attention)
            for _ in range(num_layers)
        ])
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x, enc_out=None, tgt_mask=None, memory_mask=None):
        for layer in self.layers:
            x = layer(x, enc_out, tgt_mask, memory_mask)
        return self.norm(x)

In [None]:
# Modelo
d_model = 32
num_heads = 4
d_ff = 64
num_layers = 2

decoder = Decoder(d_model, num_heads, d_ff, num_layers)

# Exemplo
batch_size = 4
src_len = 15
tgt_len = 7

enc_out = torch.randn(batch_size, src_len, d_model)
tgt = torch.randn(batch_size, tgt_len, d_model)

tgt_mask = causal_mask(tgt_len)

out = decoder(tgt, enc_out, tgt_mask=tgt_mask)

print(f"Decoder input: {tgt.shape}")     # (N, T_tgt, d_model)
print(f"Decoder output: {out.shape}")    # (N, T_tgt, d_model)

### Transformer

A classe `Transformer` combina o encoder e o decoder em uma arquitetura completa de tradução seq2seq. Ela segue o formato proposto por Vaswani et al. (2017), onde:

- O **encoder** processa a sequência de entrada e gera representações contextuais.
- O **decoder** gera a saída passo a passo, utilizando essas representações.

#### Máscaras

Durante o `forward`, são geradas duas máscaras:
- **Máscara de padding**: impede atenção a tokens vazios (`src == 0` ou `trg == 0`).
- **Máscara causal (no-peak)**: impede que a atenção no decoder veja posições futuras, garantindo autoregressividade. Ela é definida por:

$$
\text{nopeak}_{i,j} = \begin{cases}
1, & \text{se } j \leq i \\
0, & \text{caso contrário}
\end{cases}
$$

O fluxo geral é:

$$
\text{EncoderOutput} = \text{Encoder}(src, src\_mask)
$$

$$
\text{Output} = \text{Decoder}(trg, \text{EncoderOutput}, src\_mask, trg\_mask)
$$

In [None]:
class Transformer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_encoder_layers, num_decoder_layers,
                 src_vocab_size, tgt_vocab_size, max_len=5000, dropout=0.1):
        super().__init__()

        # embeddings separados
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)

        # positional encoding
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        # encoder e decoder
        self.encoder = Encoder(d_model, num_heads, d_ff, num_encoder_layers, dropout)
        self.decoder = Decoder(d_model, num_heads, d_ff, num_decoder_layers, dropout)

        # projeção final para o vocabulário de saída
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None):
        """
        src: (N, T_src) índices dos tokens da entrada
        tgt: (N, T_tgt) índices dos tokens da saída
        """
        # embeddings + posições
        src_emb = self.src_embedding(src) * (self.src_embedding.embedding_dim ** 0.5)
        src_emb = self.pos_encoding(src_emb)

        tgt_emb = self.tgt_embedding(tgt) * (self.tgt_embedding.embedding_dim ** 0.5)
        tgt_emb = self.pos_encoding(tgt_emb)

        # encoder
        memory = self.encoder(src_emb, src_mask)

        # decoder
        out = self.decoder(tgt_emb, memory, tgt_mask, memory_mask)

        # projeção final para vocabulário alvo
        logits = self.fc_out(out)  # (N, T_tgt, tgt_vocab_size)

        return logits

In [None]:
# Modelo
d_model = 32
num_heads = 4
d_ff = 64
num_encoder_layers = 2
num_decoder_layers = 2
src_vocab_size = 120   # ex: português
tgt_vocab_size = 150   # ex: inglês
max_len = 50

model = Transformer(d_model, num_heads, d_ff, num_encoder_layers, num_decoder_layers, src_vocab_size, tgt_vocab_size, max_len)

# Exemplo
batch_size = 4
src_len = 12
tgt_len = 8

src = torch.randint(0, src_vocab_size, (batch_size, src_len))  # tokens de entrada
tgt = torch.randint(0, tgt_vocab_size, (batch_size, tgt_len))  # tokens de saída

tgt_mask = causal_mask(tgt_len)

out = model(src, tgt, tgt_mask=tgt_mask)

print(f"Source input shape: {src.shape}")   # (N, T_src)
print(f"Target input shape: {tgt.shape}")   # (N, T_tgt)
print(f"Output shape: {out.shape}")         # (N, T_tgt, tgt_vocab_size)

## Tradução

In [None]:
pairs = [
    ("olá", "hello"),
    ("bom dia", "good morning"),
    ("boa noite", "good night"),
    ("como vai?", "how are you?"),
    ("estou bem", "i am fine"),
    ("obrigado", "thank you"),
    ("até logo", "see you later"),
    ("sim", "yes"),
    ("não", "no"),
    ("eu gosto de café", "i like coffee"),
    ("ela gosta de música", "she likes music"),
    ("nós vamos para a escola", "we go to school"),
    ("ele está em casa", "he is at home"),
    ("onde você está?", "where are you?"),
    ("o gato está na cadeira", "the cat is on the chair"),
]

In [None]:
def build_vocab(sentences):
    tokens = set()
    for s in sentences:
        tokens.update(s.lower().split())
    stoi = {tok: i+4 for i, tok in enumerate(sorted(tokens))}
    stoi["<pad>"] = 0
    stoi["<sos>"] = 1
    stoi["<eos>"] = 2
    stoi["<unk>"] = 3
    itos = {i: t for t, i in stoi.items()}
    return stoi, itos

# constrói vocabulários
src_sentences = [pt for pt, en in pairs]
tgt_sentences = [en for pt, en in pairs]

src_stoi, src_itos = build_vocab(src_sentences)
tgt_stoi, tgt_itos = build_vocab(tgt_sentences)

In [None]:
def encode_sentence(sentence, stoi, max_len=10):
    tokens = sentence.lower().split()
    ids = [stoi.get(tok, stoi["<unk>"]) for tok in tokens]
    ids = [stoi["<sos>"]] + ids + [stoi["<eos>"]]
    if len(ids) < max_len:
        ids += [stoi["<pad>"]] * (max_len - len(ids))
    return ids[:max_len]

max_len = 10
data = [
    (encode_sentence(pt, src_stoi, max_len), encode_sentence(en, tgt_stoi, max_len))
    for pt, en in pairs
]

src_data = torch.tensor([pt for pt, en in data])
tgt_data = torch.tensor([en for pt, en in data])

print("src_data:", src_data.shape)  # (N, max_len)
print("tgt_data:", tgt_data.shape)  # (N, max_len)

In [None]:
src_vocab_size = len(src_stoi)
tgt_vocab_size = len(tgt_stoi)

d_model = 32
num_heads = 4
d_ff = 64
num_encoder_layers = 2
num_decoder_layers = 2

model = Transformer(
    d_model, num_heads, d_ff,
    num_encoder_layers, num_decoder_layers,
    src_vocab_size, tgt_vocab_size, max_len
)

In [None]:
criterion = nn.CrossEntropyLoss(ignore_index=tgt_stoi["<pad>"])
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 50
batch_size = 4

for epoch in range(epochs):
    total_loss = 0
    for i in range(0, len(src_data), batch_size):
        src_batch = src_data[i:i+batch_size]
        tgt_batch = tgt_data[i:i+batch_size]

        # entrada do decoder é sem o último token
        tgt_in = tgt_batch[:, :-1]
        # alvo é sem o primeiro token
        tgt_out = tgt_batch[:, 1:]

        tgt_mask = causal_mask(tgt_in.size(1))

        logits = model(src_batch, tgt_in, tgt_mask=tgt_mask)

        loss = criterion(
            logits.reshape(-1, tgt_vocab_size),
            tgt_out.reshape(-1)
        )

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    if (epoch+1) % 10 == 0:
        print(f"Epoch {epoch+1}, Loss: {total_loss:.4f}")

In [None]:
def greedy_translate(model, src_sentence, max_len=10):
    model.eval()
    src_ids = torch.tensor([encode_sentence(src_sentence, src_stoi, max_len)])
    tgt_ids = torch.tensor([[tgt_stoi["<sos>"]]])

    for _ in range(max_len-1):
        tgt_mask = causal_mask(tgt_ids.size(1))
        logits = model(src_ids, tgt_ids, tgt_mask=tgt_mask)
        next_token = logits[:, -1, :].argmax(-1).unsqueeze(0)
        tgt_ids = torch.cat([tgt_ids, next_token], dim=1)
        if next_token.item() == tgt_stoi["<eos>"]:
            break

    return " ".join([tgt_itos[i.item()] for i in tgt_ids[0]])

print(greedy_translate(model, "eu gosto de cafe"))

## Exercícios

### Exercício 1: Classificação de Notícias com Transformer Encoder

Implemente um módulo de classificação de texto utilizando **apenas o Encoder do Transformer**.  

O modelo deve:  
- Receber uma sequência de índices de tokens como entrada.  
- Passar os embeddings pela pilha de camadas do Encoder.  
- Agregar a informação da sequência por meio de um **pooling de média na dimensão temporal** (`seq_len`).  
- Passar o vetor resultante por uma camada linear para prever a classe.  

O dataset a ser utilizado é o **20 Newsgroups**, filtrado em algumas categorias de notícias.  
Seu objetivo é treinar o classificador para prever a qual categoria pertence cada texto.  

In [None]:
from sklearn.datasets import fetch_20newsgroups

categories = ['sci.electronics', 'comp.graphics', 'sci.med', 'rec.motorcycles']
max_len = 100
batch_size = 32


# === Carregamento dos dados ===
newsgroups_data = fetch_20newsgroups(subset='all', categories=categories)
texts = newsgroups_data.data[:5000]
labels = newsgroups_data.target[:5000]

train_texts, val_texts, train_labels, val_labels = train_test_split(
    texts, labels, test_size=0.2, random_state=42
)


# === Pré-processamento ===
def preprocess_text(text):
    text = text.lower()
    text = re.sub(r'[^a-z\s]', '', text)
    tokens = text.split()
    return tokens


def build_vocab(texts, min_freq=1):
    word_freq = {}
    for text in texts:
        tokens = preprocess_text(text)
        for token in tokens:
            word_freq[token] = word_freq.get(token, 0) + 1

    vocab = {'<pad>': 0, '<unk>': 1}
    index = 2
    for word, freq in word_freq.items():
        if freq >= min_freq:
            vocab[word] = index
            index += 1
    return vocab


# === Vocabulário ===
vocab = build_vocab(train_texts)
vocab_size = len(vocab)


# === Dataset ===
class NewsGroupsDataset(Dataset):
    def __init__(self, texts, labels, vocab, max_len):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.labels)

    def encode_text(self, text):
        tokens = preprocess_text(text)
        token_ids = [self.vocab.get(token, self.vocab['<unk>']) for token in tokens]
        if len(token_ids) > self.max_len:
            token_ids = token_ids[:self.max_len]
        else:
            token_ids += [self.vocab['<pad>']] * (self.max_len - len(token_ids))
        return torch.tensor(token_ids, dtype=torch.long)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        token_ids = self.encode_text(text)
        return token_ids, label


# === DataLoaders ===
train_dataset = NewsGroupsDataset(train_texts, train_labels, vocab, max_len)
val_dataset = NewsGroupsDataset(val_texts, val_labels, vocab, max_len)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size)

In [None]:
# Sua classe aqui

In [None]:
# # Loss e otimizador
# criterion = nn.CrossEntropyLoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=lr)

# # Loop de treino
# for epoch in range(num_epochs):
#     model.train()
#     total_loss, correct, total = 0, 0, 0
    
#     for inputs, labels in train_dataloader:
#         inputs, labels = inputs.to(device), labels.to(device)

#         # Forward
#         outputs = model(inputs)
#         loss = criterion(outputs, labels)

#         # Backward
#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()

#         # Métricas
#         total_loss += loss.item()
#         preds = outputs.argmax(dim=1)
#         correct += (preds == labels).sum().item()
#         total += labels.size(0)

#     train_acc = correct / total

#     # Validação
#     model.eval()
#     val_loss, val_correct, val_total = 0, 0, 0
#     with torch.no_grad():
#         for inputs, labels in val_dataloader:
#             inputs, labels = inputs.to(device), labels.to(device)

#             outputs = model(inputs)
#             loss = criterion(outputs, labels)

#             val_loss += loss.item()
#             preds = outputs.argmax(dim=1)
#             val_correct += (preds == labels).sum().item()
#             val_total += labels.size(0)

#     val_acc = val_correct / val_total

#     print(f"Epoch {epoch+1}/{num_epochs} | "
#           f"Train Loss: {total_loss:.3f}, Train Acc: {train_acc:.3f} | "
#           f"Val Loss: {val_loss:.3f}, Val Acc: {val_acc:.3f}")

### Exercício 2: Modelo de Linguagem com Transformer Decoder

Implemente um **modelo de linguagem baseado apenas no Decoder do Transformer** (estilo *decoder-only*).  

O modelo deve:  
- Receber uma sequência de tokens como entrada.  
- Utilizar máscara **causal** na auto-atenção, garantindo que cada posição só acesse os tokens anteriores e o próprio token.  
- Prever o **próximo token em cada posição** (treinamento por *next token prediction*).  

O corpus de treino será o fornecido na variável `corpus`.  
Seu objetivo é treinar o modelo para gerar frases coerentes em português a partir de um prompt inicial.  

In [None]:
import requests

def load_corpus_from_url(url):
    response = requests.get(url)
    response.raise_for_status()  # dispara erro se a requisição falhar
    text = response.text
    return text

# Exemplo de uso
url = "https://raw.githubusercontent.com/wess/iotr/master/lotr.txt"
corpus = load_corpus_from_url(url)[:30000]

print("Tamanho do corpus:", len(corpus))

# Tokenize o texto
tokens = re.findall(r'\b\w+\b', corpus.lower())

# Constrói o vocabulário
word_counts = Counter(tokens)
vocab = sorted(word_counts.keys())

special_tokens = ["<pad>", "<unk>", "<sos>", "<eos>"]
word2idx = {tok: idx for idx, tok in enumerate(special_tokens, start=0)}

for word in vocab:
    if word not in word2idx:  # evita colisão
        word2idx[word] = len(word2idx)

idx2word = {idx: word for word, idx in word2idx.items()}
vocab_size = len(word2idx)

# Converte tokens para índices
indices = [word2idx.get(w, word2idx["<unk>"]) for w in tokens]

# Gera as sequências
sequence_length = 10
inputs, targets = [], []

for i in range(len(indices) - seq_len):
    seq = indices[i:i+seq_len]
    tgt = indices[i+1:i+seq_len+1]

    # insere <sos> no início do input, <eos> no fim do target
    seq = [word2idx["<sos>"]] + seq
    tgt = tgt + [word2idx["<eos>"]]

    inputs.append(seq)
    targets.append(tgt)

inputs = torch.tensor(inputs, dtype=torch.long)
targets = torch.tensor(targets, dtype=torch.long)

# Cria o dataset e o dataloader
batch_size = 32
dataset = TensorDataset(inputs, targets)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [None]:
# Seu modelo de linguagem aqui

In [None]:
# criterion = nn.CrossEntropyLoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# epochs = 20
# for epoch in range(epochs):
#     total_loss = 0
#     for x, y in dataloader:
#         x, y = x.to(device), y.to(device)
#         mask = causal_mask(x.size(1)).to(device)
#         logits = model(x, mask=mask)
#         loss = criterion(logits.view(-1, vocab_size), y.view(-1))

#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()

#         total_loss += loss.item()
#     print(f"Epoch {epoch+1}, Loss: {total_loss:.3f}")

In [None]:
# def generate_text(model, prompt, word2idx, idx2word, max_new_tokens=20, device="cpu"):
#     model.eval()
    
#     # Converte prompt em índices
#     tokens = re.findall(r'\b\w+\b', prompt.lower())
#     ids = torch.tensor([[word2idx.get(tok, word2idx["<unk>"]) for tok in tokens]], device=device)

#     for _ in range(max_new_tokens):
#         # Máscara causal
#         mask = causal_mask(ids.size(1)).to(device)

#         # Forward
#         with torch.no_grad():
#             logits = model(ids, mask=mask)  # (1, T, vocab_size)

#         # Pega último token previsto (greedy)
#         next_id = logits[:, -1, :].argmax(-1).unsqueeze(0)

#         # Concatena ao input
#         ids = torch.cat([ids, next_id], dim=1)

#     # Decodifica para palavras
#     out_tokens = [idx2word[i.item()] for i in ids[0]]
#     return " ".join(out_tokens)

In [None]:
# prompt = "the"
# generated = generate_text(model, prompt, word2idx, idx2word, max_new_tokens=10, device=device)
# print("Generated:", generated)