# Transformers

### Ideia do artigo "Attention is all you need"


In [4]:
import numpy as np
import torch
from torch import nn
from torch import Tensor

### Implementando o Position Encoding

In [20]:
# Copiado do artigo: https://medium.com/the-dl/transformers-from-scratch-in-pytorch-8777e346ca51
# Créditos: Frank Odom

def position_encoding(
    seq_len: int, dim_model: int, device: torch.device = torch.device("cpu"),
) -> Tensor:
    pos = torch.arange(seq_len, dtype=torch.float, device=device).reshape(1, -1, 1)
    dim = torch.arange(dim_model, dtype=torch.float, device=device).reshape(1, 1, -1)
    phase = pos / (1e4 ** (dim / dim_model))

    return torch.where(dim.long() % 2 == 0, torch.sin(phase), torch.cos(phase))

### Implementando o mecanismo de Attention

![Attention Mechanism](attention.png)

In [6]:
class AttentionHead(nn.Module):
    def __init__(self, q_dim: int, k_dim: int, input_dim: int):
        super().__init__()
        self.q_linear_layer = nn.Linear(input_dim, q_dim)
        self.k_linear_layer = nn.Linear(input_dim, k_dim)
        self.v_linear_layer = nn.Linear(input_dim, k_dim)

    def forward(self, q: Tensor, k: Tensor, v: Tensor) -> Tensor:
        return self.__scaled_dot_product_attention(
            self.q_linear_layer(q),
            self.k_linear_layer(k),
            self.v_linear_layer(v)
        )
    
    def __scaled_dot_product_attention(self, q: Tensor, k: Tensor, v: Tensor) -> Tensor:
        # Obtendo produto escalar
        product_q_k = torch.bmm(q, k.transpose(1, 2))
    
        # Scaling do produto
        scaled_params = product_q_k / (k.size(-1) ** 0.5)
    
        # Função softmax com os parâmetros obtidos e obtendo o produto escalar com V
        logits = nn.functional.softmax(scaled_params)
        result = torch.bmm(logits, v)
    
        return result
    

In [7]:
class MultiAttentionHead(nn.Module):
    def __init__(self, num_heads: int, q_dim: int, k_dim: int, input_dim: int):
        super().__init__()
        self.num_heads = num_heads
    
        # Criando uma lista de módulo com 'N' AttentionHeads
        self.attention_heads = nn.ModuleList(
            np.full(num_heads, AttentionHead(q_dim, k_dim, input_dim))
        )
    
        self.linear_layer = nn.Linear(num_heads * k_dim, input_dim)

    def forward(self, q: Tensor, k: Tensor, v: Tensor) -> Tensor:
        return self.linear_layer(
            torch.cat([attention_head(q, k, v) for attention_head in self.attention_heads], dim = -1)
        )

### Implementando o módulo residual

In [8]:
class Residual(nn.Module):
    def __init__(self, sublayer: nn.Module, dimension: int, dropout: float = 0.1):
        super().__init__()
        self.sublayer = sublayer
        self.norm = nn.LayerNorm(dimension)
        self.dropout = nn.Dropout(dropout)

    def forward(self, *tensors: Tensor) -> Tensor:
        return self.norm(tensors[0] + self.dropout(self.sublayer(*tensors)))

### Implementando o encoder e decoder

In [9]:
class Encoder(nn.Module):
    def __init__(
        self, 
        model_dim: int = 512,
        ff_dim: int = 2048, 
        num_heads: int = 4,
        dropout: float = 0.1
    ):
        super().__init__()
        q_dim = max(model_dim // num_heads, 1)
        k_dim = max(model_dim // num_heads, 1)

        self.attention_layer = Residual(
            MultiAttentionHead(num_heads, q_dim, k_dim, input_dim = model_dim),
            model_dim,
            dropout
        )

        self.ff_layer = Residual(
            self.__instantiate_feed_forward(model_dim, ff_dim),
            model_dim,
            dropout
        )

    def forward(self, x: Tensor) -> Tensor:
        attention_output = self.attention_layer(x, x, x)
        ff_output = self.ff_layer(attention_output)
        return ff_output
    
    def __instantiate_feed_forward(self, dim_input: int = 512, dim_feedforward: int = 2048) -> nn.Module:
        return nn.Sequential(
            nn.Linear(dim_input, dim_feedforward),
            nn.ReLU(),
            nn.Linear(dim_feedforward, dim_input),
        )


In [10]:
class Decoder(nn.Module):
    def __init__(
        self, 
        model_dim: int = 512,
        ff_dim: int = 2048, 
        num_heads: int = 4,
        dropout: float = 0.1
    ):
        super().__init__()
        q_dim = max(model_dim // num_heads, 1)
        k_dim = max(model_dim // num_heads, 1)

        self.attention_layer_1 = Residual(
            MultiAttentionHead(num_heads, q_dim, k_dim, input_dim = model_dim),
            model_dim,
            dropout
        )

        self.attention_layer_2 = Residual(
            MultiAttentionHead(num_heads, q_dim, k_dim, input_dim = model_dim),
            model_dim,
            dropout
        )

        self.ff_layer = Residual(
            self.__instantiate_feed_forward(model_dim, ff_dim),
            model_dim,
            dropout
        )

    def forward(self, x: Tensor, memory: Tensor) -> Tensor:
        layer_1_output = self.attention_layer_1(x, x, x)
        layer_2_output = self.attention_layer_2(layer_1_output, memory, memory)
        ff_layer_output = self.ff_layer(layer_2_output)
        return ff_layer_output

    def __instantiate_feed_forward(self, dim_input: int = 512, dim_feedforward: int = 2048) -> nn.Module:
        return nn.Sequential(
            nn.Linear(dim_input, dim_feedforward),
            nn.ReLU(),
            nn.Linear(dim_feedforward, dim_input),
        )

### Implementando o Transformer

![Transformer Architecture](transformer.png)

In [11]:
class Transformer(nn.Module):
    def __init__(
        self,
        num_encoder_layers = 4,
        num_decoder_layers = 4,
        model_dim: int = 512,
        ff_dim: int = 2048, 
        num_heads: int = 4,
        dropout: float = 0.1
    ):
        super().__init__()

        self.encoders = nn.ModuleList(
            np.full(
                num_encoder_layers,
                Encoder(model_dim, ff_dim, num_heads, dropout)
            )
        )

        self.decoders = nn.ModuleList( 
            np.full(
                num_decoder_layers,
                Decoder(model_dim, ff_dim, num_heads, dropout)
            )
        )

        self.external_ff = nn.Linear(model_dim, model_dim)

    def forward(self, x_1: Tensor, x_2: Tensor) -> Tensor:

        seq_len_x1, dim_len_x1 = x_1.size(1),  x_1.size(2)
        x_1 += position_encoding(seq_len_x1, dim_len_x1)

        seq_len_x2, dim_len_x2 = x_2.size(1),  x_2.size(2)
        x_2 += position_encoding(seq_len_x2, dim_len_x2)

        encoder_output = x_1

        for encoder in self.encoders:
            encoder_output = encoder(encoder_output)

        decoder_output = x_2

        for decoder in self.decoders:
            decoder_output = decoder(decoder_output, encoder_output)

        y = torch.softmax(self.external_ff(decoder_output), dim = -1)
        
        return y


In [19]:
x_1 = torch.rand(16, 16, 512)
x_2 = torch.rand(16, 16, 512)
y = Transformer()(x_1, x_2)
y.shape

  logits = nn.functional.softmax(scaled_params)


torch.Size([16, 16, 512])