# Notebook 11 — Pipeline com **token positions** (token embeddings + positional embeddings)

No *Notebook 10*, vimos que uma representação de frase baseada em **média dos embeddings** (mean pooling) vira **bag-of-words**:  
frases com os mesmos tokens em ordens diferentes ficam praticamente indistinguíveis.

Aqui vamos **estender o pipeline** para incluir **positional embeddings** (como em GPT):

1. texto → tokens (`tiktoken`)
2. tokens → IDs
3. IDs → **token embeddings** (`nn.Embedding`)
4. posições → **positional embeddings** (`pos_embedding_layer`)
5. **soma**: `token_emb + pos_emb` → `input_embeddings`

E, por fim, vamos usar um agregador simples que **realmente explora** essas posições para diferenciar:

- **A**: "O gato sobe no tapete."
- **B**: "O tapete sobe no gato."


## 0) Setup

In [1]:
from __future__ import annotations

import random
import sys
import subprocess
from typing import Dict, List, Tuple

import numpy as np
import torch
import torch.nn as nn

import tiktoken

## Verificação da presença de GPU / device

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("device:", device)


device: cpu


## Fixando seed para reproducibilidade

In [3]:
def seed_all(seed: int = 42) -> None:
    """Define seeds para resultados reprodutíveis (CPU e GPU)."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

seed_all(42)


## 1) Frases de exemplo + tokenização (`tiktoken`)

In [4]:
tokenizer = tiktoken.get_encoding("gpt2")

frase_A = "O gato sobe no tapete."
frase_B = "O tapete sobe no gato."

def tokenize_with_tiktoken(text: str) -> Tuple[List[int], List[str]]:
    """Tokeniza via tiktoken retornando (raw_ids, tokens_str)."""
    raw_ids = tokenizer.encode(text)
    tokens_str = [tokenizer.decode([i]) for i in raw_ids]
    return raw_ids, tokens_str

raw_ids_A, tokens_A = tokenize_with_tiktoken(frase_A)
raw_ids_B, tokens_B = tokenize_with_tiktoken(frase_B)

print("A tokens:", tokens_A)
print("A raw_ids:", raw_ids_A)
print()
print("B tokens:", tokens_B)
print("B raw_ids:", raw_ids_B)
print()
print("Mesmos raw_ids (multiconjunto)?", sorted(raw_ids_A) == sorted(raw_ids_B))


A tokens: ['O', ' g', 'ato', ' so', 'be', ' no', ' tap', 'ete', '.']
A raw_ids: [46, 308, 5549, 523, 1350, 645, 9814, 14471, 13]

B tokens: ['O', ' tap', 'ete', ' so', 'be', ' no', ' g', 'ato', '.']
B raw_ids: [46, 9814, 14471, 523, 1350, 645, 308, 5549, 13]

Mesmos raw_ids (multiconjunto)? True


## 2) Vocabulário compacto + batch com padding

In [5]:
PAD_TOKEN = "<PAD>"

def build_compact_vocab(raw_id_lists: List[List[int]]) -> Tuple[Dict[int, int], Dict[int, int]]:
    """Cria um mapeamento raw_id -> new_id (vocabulário compacto) com um token <PAD>."""
    uniq_raw_ids = sorted({rid for lst in raw_id_lists for rid in lst})
    # new_id 0 reservado para PAD
    raw_id_to_new: Dict[int, int] = {rid: idx + 1 for idx, rid in enumerate(uniq_raw_ids)}
    new_to_raw_id: Dict[int, int] = {v: k for k, v in raw_id_to_new.items()}
    return raw_id_to_new, new_to_raw_id

raw_id_to_new, new_to_raw_id = build_compact_vocab([raw_ids_A, raw_ids_B])
pad_new_id = 0

def remap_and_pad(raw_ids: List[int], *, raw_id_to_new: Dict[int, int], max_len: int, pad_id: int = 0) -> List[int]:
    """Remapeia raw_ids -> new_ids e aplica padding até max_len."""
    new_ids = [raw_id_to_new[rid] for rid in raw_ids]
    if len(new_ids) > max_len:
        raise ValueError(f"Sequência maior que max_len ({len(new_ids)} > {max_len}).")
    return new_ids + [pad_id] * (max_len - len(new_ids))

max_len = max(len(raw_ids_A), len(raw_ids_B))

new_ids_A = remap_and_pad(raw_ids_A, raw_id_to_new=raw_id_to_new, max_len=max_len, pad_id=pad_new_id)
new_ids_B = remap_and_pad(raw_ids_B, raw_id_to_new=raw_id_to_new, max_len=max_len, pad_id=pad_new_id)

batch_ids = torch.tensor([new_ids_A, new_ids_B], dtype=torch.long, device=device)  # [B, T]
print("batch_ids.shape:", batch_ids.shape)
print(batch_ids)
print()
print("vocab_size_compacto (inclui PAD):", max(raw_id_to_new.values()) + 1)


batch_ids.shape: torch.Size([2, 9])
tensor([[2, 3, 7, 4, 6, 5, 8, 9, 1],
        [2, 8, 9, 4, 6, 5, 3, 7, 1]])

vocab_size_compacto (inclui PAD): 10


## 3) Token embeddings (`nn.Embedding`)

In [6]:
vocab_size = int(max(raw_id_to_new.values()) + 1)  # + PAD(0)
emb_dim = 32

token_embedding_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=emb_dim).to(device)

token_embeddings = token_embedding_layer(batch_ids)  # [B, T, D]
print("token_embeddings.shape:", token_embeddings.shape)


token_embeddings.shape: torch.Size([2, 9, 32])


## 4) Positional embeddings (`pos_embedding_layer`) + input embeddings

In [7]:
context_length = max_len  # neste notebook, o "contexto" é o tamanho do batch após padding

pos_embedding_layer = nn.Embedding(num_embeddings=context_length, embedding_dim=emb_dim).to(device)

# IDs de posição: 0..T-1 para cada item do batch
B, T = batch_ids.shape
pos_ids = torch.arange(T, device=device).unsqueeze(0).expand(B, T)  # [B, T]

pos_embeddings = pos_embedding_layer(pos_ids)  # [B, T, D]
print("pos_embeddings.shape:", pos_embeddings.shape)

# Input embeddings no estilo GPT: soma token_emb + pos_emb
input_embeddings = token_embeddings + pos_embeddings
print("input_embeddings.shape:", input_embeddings.shape)


pos_embeddings.shape: torch.Size([2, 9, 32])
input_embeddings.shape: torch.Size([2, 9, 32])


## 5) Importante: mean pooling continua **sem ordem** (mesmo com pos embeddings)

Se você fizer `mean(input_embeddings, dim=1)`, você terá:

\\[ \\text{mean}(\\,tok_i + pos_i\\,) = \\text{mean}(tok_i) + \\text{mean}(pos_i) \\]\n\nComo o termo `mean(pos_i)` depende só do comprimento **T** (e não da ordem dos tokens),
ele será igual para A e B quando ambas tiverem o mesmo tamanho.

Abaixo confirmamos isso numericamente.

In [8]:
@torch.no_grad()
def cosine_similarity(u: torch.Tensor, v: torch.Tensor, eps: float = 1e-12) -> float:
    """Similaridade cosseno entre vetores 1D (torch)."""
    u = u.flatten()
    v = v.flatten()
    num = torch.dot(u, v)
    den = (u.norm() * v.norm()).clamp_min(eps)
    return float((num / den).cpu().item())

@torch.no_grad()
def mean_pool(x: torch.Tensor) -> torch.Tensor:
    """Mean pooling em [B, T, D] -> [B, D]."""
    return x.mean(dim=1)

sent_tok_mean = mean_pool(token_embeddings)          # [B, D]
sent_inp_mean = mean_pool(input_embeddings)          # [B, D]

sim_tok_mean = cosine_similarity(sent_tok_mean[0], sent_tok_mean[1])
sim_inp_mean = cosine_similarity(sent_inp_mean[0], sent_inp_mean[1])

print("cosine(token_mean(A), token_mean(B)) :", sim_tok_mean)
print("cosine(input_mean(A), input_mean(B)) :", sim_inp_mean)
print("(esperado: ~1.0 em ambos os casos)")


cosine(token_mean(A), token_mean(B)) : 1.0
cosine(input_mean(A), input_mean(B)) : 1.0000001192092896
(esperado: ~1.0 em ambos os casos)


## 6) Um agregador simples que *usa* posições (para diferenciar A vs B)

A correção não é "só somar pos embeddings" — você também precisa de um módulo que
**não colapse** a sequência de forma puramente linear/permutação-invariante.

A seguir implementamos um *pooling com pesos* (softmax) onde o score de cada posição
depende do vetor `token_emb + pos_emb`. Como os *pos embeddings* mudam de acordo com a posição,
a associação *token↔posição* afeta o resultado final.

In [9]:
class SimplePositionalSentenceEncoder(nn.Module):
    """Encoder mínimo para demonstrar o papel das posições.

    Pipeline (estilo GPT):
        input_ids -> token_emb -> pos_emb -> soma -> input_embeddings

    Em seguida, produz um vetor de frase via pooling ponderado:
        weights = softmax(score(input_embeddings))
        sent_vec = sum(weights * input_embeddings)
    """

    def __init__(self, vocab_size: int, context_length: int, emb_dim: int) -> None:
        super().__init__()
        self.tok_emb = nn.Embedding(vocab_size, emb_dim)
        self.pos_emb = nn.Embedding(context_length, emb_dim)

        # Um scorer pequeno e não-linear (ordem importa pois pos_emb altera o input)
        self.score = nn.Sequential(
            nn.Linear(emb_dim, emb_dim),
            nn.Tanh(),
            nn.Linear(emb_dim, 1, bias=False),
        )

    def forward(self, input_ids: torch.Tensor, *, use_positional_embeddings: bool = True) -> dict:
        if input_ids.ndim != 2:
            raise ValueError("input_ids deve ter shape [B, T].")

        B, T = input_ids.shape

        tok = self.tok_emb(input_ids)  # [B, T, D]

        if use_positional_embeddings:
            pos_ids = torch.arange(T, device=input_ids.device).unsqueeze(0).expand(B, T)
            pos = self.pos_emb(pos_ids)  # [B, T, D]
        else:
            pos = torch.zeros_like(tok)

        x = tok + pos  # [B, T, D]

        scores = self.score(x).squeeze(-1)      # [B, T]
        weights = torch.softmax(scores, dim=-1) # [B, T]
        sent = torch.sum(weights.unsqueeze(-1) * x, dim=1)  # [B, D]

        return {
            "tok": tok,
            "pos": pos,
            "x": x,
            "weights": weights,
            "sent": sent,
        }

seed_all(123)
encoder = SimplePositionalSentenceEncoder(vocab_size=vocab_size, context_length=context_length, emb_dim=emb_dim).to(device)
encoder.eval()

with torch.no_grad():
    out_no_pos = encoder(batch_ids, use_positional_embeddings=False)
    out_with_pos = encoder(batch_ids, use_positional_embeddings=True)

sim_no_pos = cosine_similarity(out_no_pos["sent"][0], out_no_pos["sent"][1])
sim_with_pos = cosine_similarity(out_with_pos["sent"][0], out_with_pos["sent"][1])

print("cosine(sent_sem_pos(A), sent_sem_pos(B))  :", sim_no_pos)
print("cosine(sent_com_pos(A), sent_com_pos(B))  :", sim_with_pos)

# Um indicador simples de diferença componente-a-componente
delta_no_pos = float((out_no_pos["sent"][0] - out_no_pos["sent"][1]).abs().max().cpu().item())
delta_with_pos = float((out_with_pos["sent"][0] - out_with_pos["sent"][1]).abs().max().cpu().item())

print("\nmax |A - B| (sem pos):", delta_no_pos)
print("max |A - B| (com pos):", delta_with_pos)


cosine(sent_sem_pos(A), sent_sem_pos(B))  : 1.0
cosine(sent_com_pos(A), sent_com_pos(B))  : 0.9894437193870544

max |A - B| (sem pos): 5.960464477539063e-08
max |A - B| (com pos): 0.23108315467834473


## 7) Inspecionando os pesos por posição (intuído)

Os pesos abaixo mostram como o agregador distribui atenção ao longo das posições.
Como `pos_emb` muda o vetor em cada posição, a permutação entre **gato** e **tapete**
altera os *scores* e, consequentemente, o vetor de frase.

In [10]:
def format_weights(weights: torch.Tensor, tokens: List[str]) -> List[Tuple[str, float]]:
    weights = weights.detach().cpu().numpy().tolist()
    return [(t, float(w)) for t, w in zip(tokens, weights)]

# tokens (para exibir) - lembrando que estamos mostrando tokens do tiktoken,
# mas o encoder está recebendo o vocabulário compacto.
print("A:", frase_A)
print(format_weights(out_with_pos["weights"][0], tokens_A))
print()
print("B:", frase_B)
print(format_weights(out_with_pos["weights"][1], tokens_B))


A: O gato sobe no tapete.
[('O', 0.08797570317983627), (' g', 0.13124318420886993), ('ato', 0.10881079733371735), (' so', 0.088450588285923), ('be', 0.1148715615272522), (' no', 0.10215062648057938), (' tap', 0.07443861663341522), ('ete', 0.14828598499298096), ('.', 0.1437729448080063)]

B: O tapete sobe no gato.
[('O', 0.08625047653913498), (' tap', 0.10123278200626373), ('ete', 0.1306297481060028), (' so', 0.08671604841947556), ('be', 0.11261890828609467), (' no', 0.10014741867780685), (' g', 0.11311985552310944), ('ato', 0.1283312290906906), ('.', 0.14095352590084076)]


## Conclusão

- **Token embeddings** sozinhos não carregam ordem.
- **Positional embeddings** permitem injetar informação de posição.
- Porém, para a ordem impactar a representação final, é preciso um módulo que
  *use* essas posições (por exemplo: atenção, convoluções, MLP sobre sequência etc.).