# Transformers - Implementación

Implementación de la arquitectura _Transformer_ propuesta originalmente en el _paper_ seminal "_Attention is All Your Need_".

- [https://arxiv.org/abs/1706.03762](https://arxiv.org/abs/1706.03762)

La implementación se realiza capa a capa en _PyTorch_, lo que hoy en día no es práctico ni necesario, pero resulta útil para afianzar conceptos.

## 1. Setup

Para ejecutar este _notebook_ en local se requiere Python, y se recomienda crear un entorno virtual para instalar las dependencias de manera controlada.

```
uv venv --python=python3.12
source .venv/bin/activate

uv pip install torch==2.6.0

uv pip install jupyterlab ipywidgets
uv run jupyter lab
```

## 2. Modelo

La implementación se realiza siguiendo la arquitectura original propuesta en el _paper_.

- [https://github.com/jcmellado/sapiens/blob/main/notebooks/transformers/transformers.ipynb](https://github.com/jcmellado/sapiens/blob/main/notebooks/transformers/transformers.ipynb)

### 2.1. Positional Encoding

La capa de _Positional Encoding_ se implementa desde cero ya que _PyTorch_ no la proporciona de forma nativa.

```
In ─> Positional Encoding ─> Dropout ─> Out
```

Recordando la expresión de cálculo de _Positional Encoding_:

- $ \operatorname{PE}(i, j) = \begin{cases} 
\sin\left( \dfrac{i}{10000^{j/d}} \right), & \text{si } j \text{ es par} \\
\cos\left( \dfrac{i}{10000^{(j-1)/d}} \right), & \text{si } j \text{ es impar}
\end{cases} $

La implementación utilizada para calcular los denominadores es un tanto distinta de la propuesta en el _paper_ original, por cuestiones de precisión numérica, pero equivalente.

- $ e^{-a \ln(b)} = \cfrac{1}{e^{a \ln(b)}} = \cfrac{1}{e^{\ln(b^a)}} = \cfrac{1}{b^a} $

- $ e^{-\frac{j}{d} \ln(10000)} = \cfrac{1}{e^{\frac{j}{d} \ln(10000)}} = \cfrac{1}{e^{\ln(10000^{j/d})}} = \cfrac{1}{10000^{j/d}} $

La tabla con los valores calculados se registra como un _buffer_ para que durante el entrenamiento se trate como un parámetro y no se inicialice ni actualice.

La capa de _dropout_ es utilizada habitualmente para prevenir el _overfitting_ y es mencionada en el _paper_ original. Aunque en algunas arquitecturas más recientes estas capas se están eliminando de los modelos en algunos casos.

Por otra parte, en desarrollos actuales es común utilizar una técnica alternativa llamada _Rotary Positional Embeddings_ (RoPE).

In [1]:
import torch
import torch.nn as nn
import math

class PositionalEncoding(nn.Module):

  def __init__(
    self,
    dim_embeddings: int,
    max_sequence_length: int,
    dropout: float
  ):
    super().__init__()

    position = torch.arange(0, max_sequence_length, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, dim_embeddings, 2).float() * (-math.log(10000.0) / dim_embeddings))

    pe = torch.zeros(max_sequence_length, dim_embeddings)
    pe[:, 0::2] = torch.sin(position * div_term)
    pe[:, 1::2] = torch.cos(position * div_term)
    pe = pe.unsqueeze(0)

    self.register_buffer('positional_encoding_table', pe)

    self.dropout = nn.Dropout(p=dropout)

  def forward(self, embeddings: torch.Tensor) -> torch.Tensor:
    positional_encoding = self.positional_encoding_table[:, :embeddings.size(1), :]

    return self.dropout(embeddings + positional_encoding)

### 2.2. Feed Forward

La capa de _Feed Forward_ se implementa como un módulo de forma independiente para poder utilizarlo en el _encoder_ y el _decoder_.

```
In ─> Linear ─> ReLU ─> Dropout ─> Linear ─> Out
```

Se utiliza la función de activación ReLU siguiendo el _paper_ original, aunque en arquitecturas más recientes es habitual utilizar otras funciones.

Se aplica una capa de _dropout_ a la salida de la función de activación.

In [2]:
class FeedForward(nn.Module):

  def __init__(
    self,
    dim_model: int,
    dim_expand: int,
    dropout: float
  ):
    super().__init__()

    self.feed_forward = nn.Sequential(
      nn.Linear(in_features=dim_model, out_features=dim_expand),
      nn.ReLU(),
      nn.Dropout(p=dropout),
      nn.Linear(in_features=dim_expand, out_features=dim_model)
    )

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    return self.feed_forward(x)

### 2.3. Encoder

La implementación del _encoder_ se realiza siguiendo el diseño del _paper_ original.

```
In ─> MHA ─> Dropout ─> Add ─> Norm ─> FF ─> Dropout ─> Add ─> Norm ─> Out
   │                     ↑          │                    ↑
   └─────────────────────┘          └────────────────────┘
```

El parámetro `batch_first` se utiliza para que la atención se retorne con las dimensiones en el orden esperado (_batch_, _secuencia_, _embedding_).

El parámetro `need_weights` se utiliza para que _PyTorch_ no retorne la matriz de pesos al calcular la atención, lo que aumenta el rendimiento.

Se aplican capas de _dropout_ a la salida de cada capa.

Las capas de normalización se aplican después de las conexiones residuales, pero en arquitecturas más recientes es habitual aplicarlas antes de las capas de atención y _feed forward_.

Al entrenar el modelo con _batches_, cada secuencia del _batch_ puede tener una longitud distinta, por lo que se proporciona una matriz de _padding_ que enmascara los _tokens_ del final de cada secuencia para que tengan la misma longitud que la secuencia más larga.

In [3]:
class Encoder(nn.Module):

  def __init__(
    self,
    dim_embeddings: int,
    num_heads: int,
    dim_feedforward: int,
    dropout: float
  ):
    super().__init__()

    self.multi_head_attention = nn.MultiheadAttention(
        embed_dim=dim_embeddings, num_heads=num_heads, batch_first=True)
    self.dropout_1 = nn.Dropout(p=dropout)
    self.norm_1 = nn.LayerNorm(normalized_shape=dim_embeddings)

    self.feed_forward = FeedForward(dim_model=dim_embeddings, dim_expand=dim_feedforward, dropout=dropout)
    self.dropout_2 = nn.Dropout(p=dropout)
    self.norm_2 = nn.LayerNorm(normalized_shape=dim_embeddings)

  def forward(self, embeddings_pe: torch.Tensor, padding_mask: torch.Tensor) -> torch.Tensor:
    attention, _ = self.multi_head_attention(
        query=embeddings_pe, key=embeddings_pe, value=embeddings_pe, key_padding_mask=padding_mask, need_weights=False)
    dropout_1 = self.dropout_1(attention)
    residual_1 = embeddings_pe + dropout_1
    norm_1 = self.norm_1(residual_1)

    feedforward = self.feed_forward(norm_1)
    dropout_2 = self.dropout_2(feedforward)
    residual_2 = norm_1 + dropout_2
    output = self.norm_2(residual_2)

    return output

### 2.4. Encoder Stack

La pila de _encoders_ se implementa como un módulo de _PyTorch_.

La capa de _embeddings_ es compartida con el _decoder_, por lo que se pasa como parámetro en el constructor. Siguiendo las indicaciones del _paper_ original, la salida de esta capa se multiplica por la raíz cuadrada de las dimensiones de los _embeddings_.

In [4]:
class EncoderStack(nn.Module):

  def __init__(
    self,
    num_encoders: int,
    embedding: nn.Module,
    dim_embeddings: int,
    max_sequence_length: int,
    num_heads: int,
    dim_feedforward: int,
    dropout: float
  ):
    super().__init__()

    self.embedding = embedding
    self.dim_embeddings = dim_embeddings
    self.positional_encoding = PositionalEncoding(
      dim_embeddings=dim_embeddings, max_sequence_length=max_sequence_length, dropout=dropout)

    encoders = [
      Encoder(dim_embeddings=dim_embeddings, num_heads=num_heads, dim_feedforward=dim_feedforward, dropout=dropout)
      for _ in range(num_encoders)
    ]
    self.encoders = nn.ModuleList(encoders)

  def forward(self, tokens: torch.Tensor, padding_mask: torch.Tensor) -> torch.Tensor:
    embeddings = self.embedding(tokens) * math.sqrt(self.dim_embeddings)
    embeddings_pe = self.positional_encoding(embeddings)

    output = embeddings_pe

    for encoder in self.encoders:
      output = encoder(output, padding_mask)

    return output

### 2.5. Decoder

El _decoder_ se implementa siguiendo el diseño del _paper_ original.

```
In ─> MHA ─> DO ─> Add ─> Norm ─> MHA ─> DO ─> Add ─> Norm ─> FF ─> DO ─> Add ─> Norm ─> Out
   │                ↑          │                ↑          │               ↑
   └────────────────┘          └────────────────┘          └───────────────┘
```

Se crea una máscara de atención `attn_mask` para no tener en cuenta los elementos correspondientes a los _tokens_ de la secuencia que aún no se han generado.

- $ \begin{bmatrix}
\text{false} & \text{true} & \text{true} & \ldots & \text{true}
\\ \text{false} & \text{false} & \text{true} & \ldots & \text{true}
\\ \vdots & \vdots & \vdots & \ddots & \text{true}
\\ \text{false} & \text{false} & \text{false} & \ldots & \text{false}
\end{bmatrix} $

Al entrenar el modelo con _batches_, cada secuencia del _batch_ puede tener una longitud distinta, por lo que se proporcionan matrices de _padding_ que enmascaran los _tokens_ del final de cada secuencia para que tengan la misma longitud que la secuencia más larga.

El resto de parámetros son similares a los del _encoder_.

In [5]:
class Decoder(nn.Module):

  def __init__(
    self,
    dim_embeddings: int,
    num_heads: int,
    dim_feedforward: int,
    dropout: float
  ):
    super().__init__()

    self.masked_attention = nn.MultiheadAttention(
        embed_dim=dim_embeddings, num_heads=num_heads, batch_first=True)
    self.dropout_1 = nn.Dropout(p=dropout)
    self.norm_1 = nn.LayerNorm(normalized_shape=dim_embeddings)
 
    self.attention = nn.MultiheadAttention(
        embed_dim=dim_embeddings, num_heads=num_heads, batch_first=True)
    self.dropout_2 = nn.Dropout(p=dropout)
    self.norm_2 = nn.LayerNorm(normalized_shape=dim_embeddings)
 
    self.feed_forward = FeedForward(dim_model=dim_embeddings, dim_expand=dim_feedforward, dropout=dropout)
    self.dropout_3 = nn.Dropout(p=dropout)
    self.norm_3 = nn.LayerNorm(normalized_shape=dim_embeddings)

  def forward(self, embeddings_pe: torch.Tensor, decoder_padding_mask: torch.Tensor,
              context: torch.Tensor, encoder_padding_mask: torch.Tensor
  ) -> torch.Tensor:
    sequence_len = embeddings_pe.size(1)
    attn_mask = torch.triu(torch.ones(sequence_len, sequence_len, dtype=torch.bool), diagonal=1).to(embeddings_pe.device)
      
    self_attention, _ = self.masked_attention(
        query=embeddings_pe, key=embeddings_pe, value=embeddings_pe, attn_mask=attn_mask, is_causal=True,
        key_padding_mask=decoder_padding_mask, need_weights=False)
    dropout_1 = self.dropout_1(self_attention)
    residual_1 = embeddings_pe + dropout_1
    norm_1 = self.norm_1(residual_1)

    cross_attention, _ = self.attention(
        query=norm_1, key=context, value=context,
        key_padding_mask=encoder_padding_mask, need_weights=False)
    dropout_2 = self.dropout_2(cross_attention)
    residual_2 = norm_1 + dropout_2
    norm_2 = self.norm_2(residual_2)

    feedforward = self.feed_forward(norm_2)
    dropout_3 = self.dropout_3(feedforward)
    residual_3 = norm_2 + dropout_3
    output = self.norm_3(residual_3)
 
    return output

### 2.6. Decoder Stack

La pila de _decoders_ se implementa como un módulo.

La capa de _embeddings_ es compartida con el _encoder_, por lo que se pasa como parámetro en el constructor. Siguiendo las indicaciones del _paper_ original, la salida de esta capa se multiplica por la raíz cuadrada de las dimensiones de los _embeddings_.

In [6]:
class DecoderStack(nn.Module):

  def __init__(
    self,
    num_decoders: int,
    embedding: nn.Module,
    dim_embeddings: int,
    max_sequence_length: int,
    num_heads: int,
    dim_feedforward: int,
    dropout: float
  ):
    super().__init__()

    self.embedding = embedding
    self.dim_embeddings = dim_embeddings
    self.positional_encoding = PositionalEncoding(
      dim_embeddings=dim_embeddings, max_sequence_length=max_sequence_length, dropout=dropout
    )

    decoders = [
      Decoder(dim_embeddings=dim_embeddings, num_heads=num_heads, dim_feedforward=dim_feedforward, dropout=dropout)
        for _ in range(num_decoders)
    ]
    self.decoders = nn.ModuleList(decoders)

  def forward(self, tokens: torch.Tensor, decoder_padding_mask: torch.Tensor,
              context: torch.Tensor, encoder_padding_mask: torch.Tensor
  ) -> torch.Tensor:
    embeddings = self.embedding(tokens) * math.sqrt(self.dim_embeddings)
    embeddings_pe = self.positional_encoding(embeddings)

    output = embeddings_pe

    for decoder in self.decoders:
      output = decoder(output, decoder_padding_mask, context, encoder_padding_mask)

    return output

### 2.7. Transformer

El _transformer_ se implementa siguiendo el _paper_ original.

```
Input ─> Encoder x N ─> Decoder x M ─> Linear ─> Output
```

La capa de _embedding_ es compartida por el _stack_ de _encoders_, el _stack_ de _decoders_, y su matriz de pesos con la capa lineal.

Al entrenar el modelo con _batches_, cada secuencia del _batch_ puede tener una longitud distinta, por lo que se construye una matriz de _padding_ que enmascara los _tokens_ del final de cada secuencia para que tengan la misma longitud que la secuencia más larga.

La salida del _transformer_ son los habitualmente llamados "_logits_" (valores de entrada a la función _SoftMax_), en vez de una secuencia de _tokens_. Esto facilita el entrenamiento, ya que habitualmente se utiliza la entropía cruzada como función de pérdida. Esta función aplica la función _SoftMax_, por lo que recibe directamente los _logits_ de la salida del modelo, sin necesidad de llamar a _SoftMax_ dentro del mismo.

In [7]:
class Transformer(nn.Module):

  def __init__(
    self,
    vocabulary_size: int,
    dim_embeddings: int,
    pad_token_id: int,
    max_sequence_length: int,
    num_encoders: int,
    num_decoders: int,
    num_heads: int,
    dim_feedforward: int,
    dropout: float
  ):
    super().__init__()

    self.pad_token_id = pad_token_id

    self.embedding = nn.Embedding(
      num_embeddings=vocabulary_size, embedding_dim=dim_embeddings, padding_idx=pad_token_id
    )
        
    self.encoder_stack = EncoderStack(
      num_encoders=num_encoders,
      embedding=self.embedding,
      dim_embeddings=dim_embeddings,
      max_sequence_length=max_sequence_length,
      num_heads=num_heads,
      dim_feedforward=dim_feedforward,
      dropout=dropout
    )

    self.decoder_stack = DecoderStack(
      num_decoders=num_decoders,
      embedding=self.embedding,
      dim_embeddings=dim_embeddings,
      max_sequence_length=max_sequence_length,
      num_heads=num_heads,
      dim_feedforward=dim_feedforward,
      dropout=dropout
    )
      
    self.linear = nn.Linear(in_features=dim_embeddings, out_features=vocabulary_size, bias=False)
    self.linear.weight = self.embedding.weight

  def forward(self, input_sequence: torch.Tensor, output_sequence: torch.Tensor) -> torch.Tensor:
    encoder_padding_mask = (input_sequence == self.pad_token_id)
    encoder_context = self.encoder_stack(input_sequence, encoder_padding_mask)

    decoder_padding_mask = (output_sequence == self.pad_token_id)
    decoder_output = self.decoder_stack(output_sequence, decoder_padding_mask, encoder_context, encoder_padding_mask)

    logits = self.linear(decoder_output)

    return logits