<a href="https://colab.research.google.com/github/isa-pinheiro/transformers-implementation/blob/main/transformers_implementation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [14]:
import torch
import torch.nn as nn
import numpy as np

to implement
1. encoder
2. decoder

details
1. word embedding
2. positional encoding
3. multiheaded attention
4. feed foward
5. cross attention

block
1. multihead attention + add & norm
2. masked multihead attention + add & norm
3. feedfoward + add & norm

dropout
- aplicado no final de cada sublayer (antes de ser adicionada ao input e normalizado)
- adicionado ao somatório do embedding e do positional encoding

In [15]:
class InputEmbedding(nn.Module):
    def __init__(self, d_model, vocab_size):
        # d_model tamanho do vetor para cada palavra após o embedding
        # vocab_size tokens com o dataset de sentence pairs
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(self.vocab_size, self.d_model)

    def forward(self, x):
        x = self.embedding(x) # !!não aceita diretamente texto, precisa de que seja transfromado em valores numéricos
        x = x *  np.sqrt(self.d_model)
        return x


In [16]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, seq_len, dropout = 0.1):
        # d_model é o tamanho do vetor para cada ser somado a palavra (tamanho embedding)
            # attention is all you need = 512
        # seq_len é o tamanho da sentença do input
            # attention is all you need usou sentence pairs de aproximadamente 25k tokens
        # dropout é a regularização usada
            # p = 0.1 no attention is all you need
        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)

        pe = torch.empty(seq_len, d_model)

        # pos = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1) # (seq_len, 1)
        # pos_exp = pos * torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))

        pos = torch.arange(seq_len).unsqueeze(1)
        pos_exp = pos / torch.pow(10000, 2 * torch.arange(0, d_model, 2) / d_model) # (seq_len, d_model)

        pe[:, 0::2] = torch.sin(pos_exp) # pega somente
        pe[:, 1::2] = torch.cos(pos_exp)

        pe = pe.unsqueeze(0) # (1, seq_len, d_model)

        self.register_buffer('pe', pe)

    def forward(self,x):
        x = x + (self.pe[:, :x.shape[1]])
        x = self.dropout(x)
        return x




In [17]:
# separa as informações em heads de forma que cada parte seja processada individualmente e aprenda informações diferentes
# precisa -> dimensão do modelo (quantas tem no embedding)
#         -> quantidade de heads (deve ser um dos divisores da dimensão do modelo)
#         -> matrizes para encontrar o query, key e value (precisa de uma camada linear)
#         -> saída é uma camada linear també m
class MultiHeadedAttention(nn.Module):
    def __init__(self, d_model, heads, dropout = 0.1):
        super().__init__()
        self.d_model = d_model
        self.heads = heads

        if self.d_model // self.heads == self.d_model / self.heads:
            self.d_head = self.d_model // self.heads
        else:
            print('A dimensão do modelo deve ser divisível pela quantidade de heads!')

        # pesos para definir query, key e value
        self.wquery = nn.Linear(d_model, d_model, bias=False)
        self.wkey = nn.Linear(d_model, d_model, bias=False)
        self.wvalue = nn.Linear(d_model, d_model, bias=False)

        self.dropout = nn.Dropout(dropout)

        self.woutput = nn.Linear(d_model, d_model, bias=False)

    @staticmethod
    def self_attention(query, key, value, mask, dropout):
        d_head = query.shape[-1]

        score_matrix = (query @ key.transpose(-2, -1)) / np.sqrt(d_head) # define os score de atenção, pelas interações de query e key
                                                                        # relação de palavra por palavra (batch, head, seq_len, seq_len)
        if mask:
            score_matrix.masked_fill_(mask == 0, -torch.inf)
        score_matrix = score_matrix.softmax(dim=-1) # (batch, head, seq_len, seq_len)

        if dropout:
            score_matrix = dropout(score_matrix)

        final_score_matrix = score_matrix @ value

        return final_score_matrix, score_matrix


    def forward(self, q, k, v, mask):
        # q é a entrada de dimensão (batch, seq_len, d_model)
        #
        query = self.wquery(q) # (batch, seq_len, d_model)
        key = self.wkey(k)
        value = self.wvalue(v)

        query = query.view(query.shape[0], query.shape[1], self.heads, self.d_head) # mantem as duas dimensões iniciais e divide a última
        key = key.view(key.shape[0], key.shape[1], self.heads, self.d_head)         # com a quantidade de heads e da dimensão das heads
        value = value.view(value.shape[0], value.shape[1], self.heads, self.d_head) # (batch, seq_len, heads, d_head)

        query = query.transpose(1,2)    # troca o heads com o seq_len
        key = key.transpose(1,2)        # (batch, heads, seq_len, d_head)
        value = value.transpose(1,2)    # tem toda a frase, mas apenas parte do embedding. só aprende com parte do embedding

        x, self.score_matrix = MultiHeadedAttention.self_attention(query, key, value, mask, self.dropout)

        x = x.transpose(1, 2) # retorna heads e o seq_len para as posições iniciais (batch, seq_len, heads, d_head)
        x = x.contiguous()
        x = x.view(x.shape[0], -1, self.heads * self.d_head)

        x = self.woutput(x)

        return x

In [18]:
# pointwise feedfoward
# rede feed foward completamente conectada
    # duas transformações lineares com uma relu no meio
# aplica dropout - 0.1
# FFN(x) = max(0, xW1 + b1)W2 + b2
class PointwiseFeedFoward(nn.Module):
    def __init__(self, d_model, dff, dropout = 0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.linear1 = nn.Linear(d_model, dff) # W1, b1
        self.linear2 = nn.Linear(dff, d_model) # W2, b2

    def forward(self, x):
        x = self.linear1(x)
        x = torch.relu(x)
        x = self.dropout(x)
        x = self.linear2(x)
        return x

In [19]:
# norm_xj = (xj  - mean_j / (sqrt(sqrd(std) + epsilon))) * gamma + beta ; j varia com os batches
# epsilon é usado para estabilizar a normalização, valores não explodirem se o desvio padrão for muito grande
# gamma e beta são valores aprendidos com a backpropagation

class LayerNorm(nn.Module):
    def __init__(self, epsilon = 1e-05):
        super().__init__()
        self.epsilon = epsilon
        self.gamma = nn.Parameter(torch.ones(1))
        self.beta = nn.Parameter(torch.zeros(1))

    def forward(self, x):
        mean = x.mean(dim = -1, keepdim=True)
        std = x.std(dim = -1, keepdim=True)

        x = ((x - mean) / (std + self.epsilon)) * self.gamma + self.beta
        return x


In [22]:
dff = 64
heads = 4
d_model = 16
seq_len = 10
vocab_size = 100

x = torch.randint(0, vocab_size, (1, seq_len))
print("input:", x[0, 9])

embedding_layer = InputEmbedding(d_model, vocab_size)
x_input_embedded = embedding_layer(x)
print("Saída input embeding: \n ", x_input_embedded[0, 9])

pos_encoding = PositionalEncoding(d_model, seq_len, 0.1)
x_pos_encoded = pos_encoding(x_input_embedded)
print("Saída positional encoding: \n", x_pos_encoded[0, 9])

multiheads = MultiHeadedAttention(d_model, heads)
x_multiheads = multiheads(x_pos_encoded, x_pos_encoded, x_pos_encoded, mask=None)
print("Saída MultiHeadedAttention: \n", x_multiheads[0, 9])

layernorm = LayerNorm()
x_norm = layernorm(x_multiheads)
print("Saída layernorm: \n", x_norm[0, 9])

feedforward = PointwiseFeedFoward(d_model, dff)
x_feedforward = feedforward(x_norm)
print("Saída feedforward: \n", x_feedforward[0, 9])


input: tensor(34)
Saída input embeding: 
  tensor([-5.5695, -4.6235,  0.2193, -2.8782, -1.5296,  1.8163, -3.3187, -0.2757,
        -1.9703, -3.2695, -5.4197, -1.3704, -0.9696, -3.6081, -0.0599, -5.7804],
       grad_fn=<SelectBackward0>)
Saída positional encoding: 
 tensor([-5.7305, -6.1496,  1.1141, -2.5073, -1.5997,  3.1248, -3.6774,  0.8047,
        -2.1883, -0.0000, -6.0218, -0.4116, -1.0773, -0.0000, -0.0666, -5.3115],
       grad_fn=<SelectBackward0>)
Saída MultiHeadedAttention: 
 tensor([ 1.1758,  0.6210,  0.8832,  0.7498, -0.6326, -0.5076,  0.2836,  0.7983,
        -1.2233, -0.6680, -0.2716, -0.0599,  0.4851,  0.5932,  0.1017, -0.3571],
       grad_fn=<SelectBackward0>)
Saída layernorm: 
 tensor([ 1.5422,  0.7293,  1.1134,  0.9179, -1.1073, -0.9242,  0.2350,  0.9890,
        -1.9728, -1.1592, -0.5785, -0.2683,  0.5302,  0.6886, -0.0316, -0.7037],
       grad_fn=<SelectBackward0>)
Saída feedforward: 
 tensor([-0.0976,  0.0638,  0.1878, -0.3013,  0.2010, -0.1381,  0.7919,  0.1216

In [21]:
embedding = nn.Embedding(vocab_size, d_model)
print(x.shape)
print(x[0,0])
print(embedding.weight[x[0, 0]])
print(embedding(x)[0,0])

torch.Size([1, 10])
tensor(17)
tensor([ 0.1304,  0.6675, -0.4013, -1.3805, -0.3103,  0.3276, -1.5100, -1.6990,
        -1.0452, -0.9526,  1.2100, -0.7109,  0.6958, -0.5963,  0.2851,  0.5929],
       grad_fn=<SelectBackward0>)
tensor([ 0.1304,  0.6675, -0.4013, -1.3805, -0.3103,  0.3276, -1.5100, -1.6990,
        -1.0452, -0.9526,  1.2100, -0.7109,  0.6958, -0.5963,  0.2851,  0.5929],
       grad_fn=<SelectBackward0>)
