In [5]:
import torch
import torch.nn as nn
import math

In [6]:
# Hyperparameters shared by the example cells below.
d_model = 512        # width of every token representation
num_heads = 8        # number of attention heads; must divide d_model evenly
d_ff = 4 * d_model   # hidden width of the feed-forward block (2048)
batch_size = 32      # batch size of the random demo tensors

In [7]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal positional encodings to a batch of embeddings.

    Even feature columns hold ``sin(pos * w_i)`` and odd columns hold
    ``cos(pos * w_i)`` with ``w_i = 10000 ** (-2i / d_model)``.

    Args:
        d_model: Size of the feature (last) dimension. Odd sizes are supported.
        max_len: Largest sequence length that can be encoded.
        batch_first: When False (default) inputs are ``(seq_len, batch, d_model)``;
            when True inputs are ``(batch, seq_len, d_model)``.
    """

    def __init__(self, d_model, max_len=5000, batch_first=False):
        super().__init__()
        self.batch_first = batch_first

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer cosine column than sine column.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Stored as (max_len, 1, d_model) so it broadcasts over the batch axis.
        # A buffer follows the module across devices but is not a trainable parameter.
        self.register_buffer('pe', pe.unsqueeze(1))

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``seq_len`` positions.

        Raises:
            ValueError: If the sequence is longer than ``max_len``.
        """
        seq_len = x.size(1) if self.batch_first else x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(0)}"
            )

        pe = self.pe[:seq_len]
        if self.batch_first:
            pe = pe.transpose(0, 1)  # (1, seq_len, d_model)
        return x + pe

# Example: add positional encodings to a random sequence-first batch.
max_len = 100
pos_encoding = PositionalEncoding(d_model, max_len)

# (sequence_length, batch_size, d_model)
input_tensor = torch.randn(50, batch_size, d_model)
output_tensor = pos_encoding(input_tensor)

print(output_tensor.shape)  # Output shape: (sequence_length, batch_size, d_model)

torch.Size([50, 32, 512])


In [8]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention over batch-first inputs.

    Args:
        d_model: Feature size of the inputs and of the output.
        num_heads: Number of attention heads; must divide ``d_model`` evenly.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        # Each head works on an equal slice of the model dimension.
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        # Number of feature dimensions per head.
        self.d_k = d_model // num_heads
        self.num_heads = num_heads

        # Linear projections for Q, K, V and the final output projection.
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Return ``softmax(Q K^T / sqrt(d_k)) V``.

        Args:
            Q, K, V: Tensors of shape ``(N, num_heads, seq_len, d_k)``.
            mask: Optional tensor broadcastable to the score shape
                ``(N, num_heads, query_len, key_len)``; positions where
                ``mask == 0`` are excluded from attention.
        """
        # Dot product between queries and keys, scaled by sqrt(d_k).
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            # Use the most negative finite value instead of -inf: a row whose keys
            # are all masked would otherwise become NaN after softmax and poison
            # every later layer. Masked entries still get exactly zero weight
            # whenever at least one key in the row is visible.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

        # Attention weights over the key axis.
        attention = torch.softmax(scores, dim=-1)

        # Weighted sum of the values.
        return torch.matmul(attention, V)

    def split_heads(self, x):
        """Reshape ``(N, seq_len, d_model)`` to ``(N, num_heads, seq_len, d_k)``."""
        N, seq_len, _ = x.size()
        return x.view(N, seq_len, self.num_heads, self.d_k).transpose(1, 2)

    def combine_heads(self, x):
        """Inverse of ``split_heads``: back to ``(N, seq_len, num_heads * d_k)``."""
        N, _, seq_len, _ = x.size()
        return x.transpose(1, 2).contiguous().view(N, seq_len, self.num_heads * self.d_k)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` to ``key``/``value``.

        Args:
            query: ``(N, query_len, d_model)``.
            key, value: ``(N, key_len, d_model)``.
            mask: Optional mask forwarded to ``scaled_dot_product_attention``.

        Returns:
            Tensor of shape ``(N, query_len, d_model)``.
        """
        # Project the inputs and split them into heads.
        Q = self.split_heads(self.W_q(query))
        K = self.split_heads(self.W_k(key))
        V = self.split_heads(self.W_v(value))

        # Per-head attention.
        attention = self.scaled_dot_product_attention(Q, K, V, mask)

        # Merge the heads and apply the output projection.
        return self.W_o(self.combine_heads(attention))


# Smoke test: run MultiHeadAttention on random query/key/value tensors (no mask).
multi_head_attn = MultiHeadAttention(d_model, num_heads)

# (batch_size, sequence_length, d_model)
query = torch.randn(batch_size, 50, d_model)
key = torch.randn(batch_size, 50, d_model)
value = torch.randn(batch_size, 50, d_model)

output = multi_head_attn(query, key, value)

print(output.shape)  # Output shape: (batch_size, sequence_length, d_model)

torch.Size([32, 50, 512])


In [9]:
class FeedForward(nn.Module):
    """Two-layer MLP applied to the last dimension: Linear -> ReLU -> Linear.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size between the two linear layers.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)   # expand to the hidden width
        self.fc2 = nn.Linear(d_ff, d_model)   # project back to the model width
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return a tensor with the same shape as ``x``."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)


# Smoke test: the feed-forward block maps d_model features back to d_model features.
d_model = 512
d_ff = 2048
ffn = FeedForward(d_model, d_ff)

# (batch_size, sequence_length, d_model)
input_tensor = torch.randn(32, 50, d_model)

output = ffn(input_tensor)

print(output.shape)  # Output shape: (batch_size, sequence_length, d_model)

torch.Size([32, 50, 512])


In [10]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward, each followed by
    dropout, a residual connection and layer normalization.

    Args:
        d_model: Feature size of the inputs and outputs.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Apply the block to ``x``; ``mask`` is forwarded to the attention."""
        # Sub-layer 1: self-attention (query, key and value are all x).
        attended = self.attention(x, x, x, mask)
        x = self.norm1(x + self.dropout(attended))

        # Sub-layer 2: position-wise feed-forward.
        transformed = self.ffn(x)
        return self.norm2(x + self.dropout(transformed))


# Smoke test: a single encoder layer on a random batch (no mask).
encoder_layer = EncoderLayer(d_model, num_heads, d_ff)

# (batch_size, sequence_length, d_model)
input_tensor = torch.randn(32, 50, d_model)

output = encoder_layer(input_tensor)

print(output.shape)  # Output shape: (batch_size, sequence_length, d_model)

torch.Size([32, 50, 512])


In [11]:
class Encoder(nn.Module):
    """Token embedding, positional encoding and a stack of ``EncoderLayer`` blocks.

    Args:
        src_vocab_size: Number of rows in the embedding table.
        d_model: Embedding / model feature size.
        num_heads: Attention heads per layer.
        num_layers: Number of stacked encoder layers.
        d_ff: Hidden size of each feed-forward sub-layer.
        max_len: Maximum sequence length supported by the positional encoding.
        dropout: Dropout probability.
    """

    def __init__(self, src_vocab_size, d_model, num_heads, num_layers, d_ff, max_len, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(src_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Encode token ids of shape ``(batch_size, seq_len)``.

        Returns:
            Tensor of shape ``(batch_size, seq_len, d_model)``.
        """
        # Embedding: (batch_size, seq_len) -> (batch_size, seq_len, d_model)
        x = self.embedding(x)

        # PositionalEncoding indexes positions along dim 0 (sequence-first), while
        # this model is batch-first. Without swapping the axes the encoding would
        # be selected by batch index and every token in a sample would receive the
        # same vector.
        x = self.positional_encoding(x.transpose(0, 1)).transpose(0, 1)
        x = self.dropout(x)

        # Run the input through every encoder layer.
        for layer in self.layers:
            x = layer(x, mask)

        return x


# Smoke test: full encoder stack on random token ids (sequence length equals max_len).
src_vocab_size = 1000
num_layers = 6
max_len = 100

encoder = Encoder(src_vocab_size, d_model, num_heads, num_layers, d_ff, max_len)

# (batch_size, sequence_length)
input_seq = torch.randint(0, src_vocab_size, (32, 100))

output = encoder(input_seq)

print(output.shape)  # Output shape: (batch_size, sequence_length, d_model)

torch.Size([32, 100, 512])


In [12]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention over the encoder output
    and a feed-forward network, each followed by dropout, a residual connection
    and layer normalization.

    Args:
        d_model: Feature size of the inputs and outputs.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.cross_attention = MultiHeadAttention(d_model, num_heads)
        self.norm2 = nn.LayerNorm(d_model)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, src_mask=None, trg_mask=None):
        """Apply the block to the target representation ``x``.

        ``trg_mask`` is used by the self-attention and ``src_mask`` by the
        cross-attention, whose keys and values are ``enc_out``.
        """
        # Self-attention over the target sequence.
        attended_self = self.self_attention(x, x, x, trg_mask)
        x = self.norm1(x + self.dropout(attended_self))

        # Cross-attention: queries from the target, keys/values from the encoder.
        attended_cross = self.cross_attention(x, enc_out, enc_out, src_mask)
        x = self.norm2(x + self.dropout(attended_cross))

        # Position-wise feed-forward.
        transformed = self.ffn(x)
        return self.norm3(x + self.dropout(transformed))


# Smoke test: a single decoder layer with a random stand-in for the encoder output.
decoder_layer = DecoderLayer(d_model, num_heads, d_ff)

# (batch_size, sequence_length, d_model)
input_tensor = torch.randn(32, 50, d_model)
enc_out = torch.randn(32, 50, d_model)

output = decoder_layer(input_tensor, enc_out)

print(output.shape)  # Output shape: (batch_size, sequence_length, d_model)

torch.Size([32, 50, 512])


In [13]:
# Decoder
class Decoder(nn.Module):
    """Token embedding, positional encoding, a stack of ``DecoderLayer`` blocks
    and a final projection to vocabulary logits.

    Args:
        trg_vocab_size: Size of the target vocabulary (embedding rows and logits).
        d_model: Embedding / model feature size.
        num_heads: Attention heads per layer.
        num_layers: Number of stacked decoder layers.
        d_ff: Hidden size of each feed-forward sub-layer.
        max_len: Maximum sequence length supported by the positional encoding.
        dropout: Dropout probability.
    """

    def __init__(self, trg_vocab_size, d_model, num_heads, num_layers, d_ff, max_len, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(trg_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.fc_out = nn.Linear(d_model, trg_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, src_mask=None, trg_mask=None):
        """Decode target token ids of shape ``(batch_size, seq_len)``.

        Returns:
            Logits of shape ``(batch_size, seq_len, trg_vocab_size)``.
        """
        # Embedding: (batch_size, seq_len) -> (batch_size, seq_len, d_model)
        x = self.embedding(x)

        # PositionalEncoding indexes positions along dim 0 (sequence-first), while
        # this model is batch-first. Without swapping the axes the encoding would
        # be selected by batch index instead of token position.
        x = self.positional_encoding(x.transpose(0, 1)).transpose(0, 1)
        x = self.dropout(x)

        # Run the input through every decoder layer.
        for layer in self.layers:
            x = layer(x, enc_out, src_mask, trg_mask)

        return self.fc_out(x)


# Smoke test: full decoder stack on random token ids with a random stand-in
# for the encoder output (no masks).
trg_vocab_size = 1000
num_layers = 6
max_len = 100

decoder = Decoder(trg_vocab_size, d_model, num_heads, num_layers, d_ff, max_len)

# (batch_size, sequence_length)
trg_seq = torch.randint(0, trg_vocab_size, (32, 100))
enc_out = torch.randn(32, 100, d_model)

output = decoder(trg_seq, enc_out)

print(output.shape)  # Output shape: (batch_size, sequence_length, trg_vocab_size)

torch.Size([32, 100, 1000])


In [14]:
# Transformer Completo
class Transformer(nn.Module):
    """Encoder-decoder Transformer operating on batch-first token ids.

    Token id 0 is treated as padding when masks are generated automatically.

    Args:
        src_vocab_size: Source vocabulary size.
        trg_vocab_size: Target vocabulary size.
        d_model: Embedding / model feature size.
        num_heads: Attention heads per layer.
        num_encoder_layers: Number of encoder layers.
        num_decoder_layers: Number of decoder layers.
        d_ff: Hidden size of each feed-forward sub-layer.
        max_len: Maximum sequence length supported by the positional encodings.
        dropout: Dropout probability.
    """

    def __init__(self, src_vocab_size, trg_vocab_size, d_model, num_heads, num_encoder_layers, num_decoder_layers, d_ff, max_len, dropout=0.1):
        super().__init__()
        self.encoder = Encoder(src_vocab_size, d_model, num_heads, num_encoder_layers, d_ff, max_len, dropout)
        self.decoder = Decoder(trg_vocab_size, d_model, num_heads, num_decoder_layers, d_ff, max_len, dropout)

    def generate_mask(self, src, trg):
        """Build the default boolean masks from token ids (0 = padding).

        Returns:
            src_mask: ``(N, 1, 1, src_len)``, True where the source token is not padding.
            trg_mask: ``(N, 1, trg_len, trg_len)``, True where the query token is not
                padding and the key position is not after the query position.
        """
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        trg_mask = (trg != 0).unsqueeze(1).unsqueeze(3)
        seq_length = trg.size(1)
        # Create the causal mask on the same device as the tokens; a CPU mask
        # cannot be combined with a mask derived from GPU inputs.
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length, device=trg.device), diagonal=1)).bool()
        trg_mask = trg_mask & nopeak_mask
        return src_mask, trg_mask

    def forward(self, src, trg, src_mask=None, trg_mask=None):
        """Return target logits of shape ``(N, trg_len, trg_vocab_size)``.

        Masks passed by the caller are used as given; any mask left as ``None``
        falls back to the one produced by ``generate_mask``.
        """
        if src_mask is None or trg_mask is None:
            default_src_mask, default_trg_mask = self.generate_mask(src, trg)
            if src_mask is None:
                src_mask = default_src_mask
            if trg_mask is None:
                trg_mask = default_trg_mask

        enc_out = self.encoder(src, src_mask)
        return self.decoder(trg, enc_out, src_mask, trg_mask)


# End-to-end smoke test of the full Transformer on random token ids.
src_vocab_size = 1000
trg_vocab_size = 1000
d_model = 512
num_heads = 8
num_encoder_layers = 6
num_decoder_layers = 6
d_ff = 2048
max_len = 100

transformer = Transformer(src_vocab_size, trg_vocab_size, d_model, num_heads, num_encoder_layers, num_decoder_layers, d_ff, max_len)

# (batch_size, sequence_length)
# Transformer.generate_mask treats token id 0 as padding and masks the whole
# attention row of a padded target position, so sample ids from [1, vocab_size)
# to keep accidental padding (and the resulting NaN rows) out of the random batch.
src_seq = torch.randint(1, src_vocab_size, (batch_size, 100))
trg_seq = torch.randint(1, trg_vocab_size, (batch_size, 100))

output = transformer(src_seq, trg_seq)

print(output.shape)  # Output shape: (batch_size, target_sequence_length, trg_vocab_size)

torch.Size([32, 100, 1000])


In [15]:
# Gerando as máscaras de forma separada

def create_padding_mask(seq):
    """Return a uint8 mask that is 1 for non-zero tokens and 0 for padding (id 0).

    Two singleton axes are inserted after the first dimension, so a
    ``(batch, seq_len)`` input yields a ``(batch, 1, 1, seq_len)`` mask.
    """
    is_token = seq.ne(0)
    return is_token[:, None, None].to(torch.uint8)

def create_look_ahead_mask(size):
    """Return a ``(size, size)`` uint8 lower-triangular mask.

    Entry ``[i, j]`` is 1 when ``j <= i`` and 0 otherwise, which blocks
    attention to positions after the current one.
    """
    lower_triangle = torch.tril(torch.ones(size, size))
    return lower_triangle.type(torch.uint8)

In [16]:
# Example: zeros in the sequence are padding and appear as 0 in the mask.
seq = torch.tensor([[1, 2, 0, 4, 0]])
padding_mask = create_padding_mask(seq)
print(padding_mask)

tensor([[[[1, 1, 0, 1, 0]]]], dtype=torch.uint8)


In [17]:
# Example: lower-triangular look-ahead mask for a sequence of length 5.
look_ahead_mask = create_look_ahead_mask(5)
print(look_ahead_mask)

tensor([[1, 0, 0, 0, 0],
        [1, 1, 0, 0, 0],
        [1, 1, 1, 0, 0],
        [1, 1, 1, 1, 0],
        [1, 1, 1, 1, 1]], dtype=torch.uint8)


## Exercícios

### Exercício 1
Implemente um módulo que utilize apenas o módulo Encoder para a classificação de texto em `num_classes` classes. Para a obtenção do vetor de embedding de toda a sequência que será enviado para a cabeça de classificação, faça um pooling de média através da dimensão de sequência.

In [19]:
class TextClassifier(nn.Module):
    """Encoder-only text classifier with mean pooling over the sequence axis.

    Args:
        src_vocab_size: Number of rows in the embedding table.
        d_model: Embedding / model feature size.
        num_heads: Attention heads per layer.
        num_layers: Number of stacked encoder layers.
        d_ff: Hidden size of each feed-forward sub-layer.
        max_len: Maximum sequence length supported by the positional encoding.
        num_classes: Number of output classes.
        dropout: Dropout probability.
    """

    def __init__(self, src_vocab_size, d_model, num_heads, num_layers, d_ff, max_len, num_classes, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(src_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.fc = nn.Linear(d_model, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Classify token ids of shape ``(batch_size, seq_len)``.

        Returns:
            Logits of shape ``(batch_size, num_classes)``.
        """
        # Embedding: (batch_size, seq_len) -> (batch_size, seq_len, d_model)
        x = self.embedding(x)

        # PositionalEncoding indexes positions along dim 0 (sequence-first), while
        # this model is batch-first. Without swapping the axes the encoding would
        # be selected by batch index instead of token position.
        x = self.positional_encoding(x.transpose(0, 1)).transpose(0, 1)
        x = self.dropout(x)

        # Run the input through every encoder layer.
        for layer in self.layers:
            x = layer(x, mask)

        # Mean pooling over the sequence axis gives one vector per sample.
        # NOTE(review): the average includes every position, padded ones too.
        x = x.mean(dim=1)
        out = self.fc(x)

        return out

# Smoke test: classify a random batch of token ids into 5 classes.
src_vocab_size = 1000
num_layers = 6
max_len = 100

classifier = TextClassifier(src_vocab_size, d_model, num_heads, num_layers, d_ff, max_len, num_classes=5)

# (batch_size, sequence_length)
input_seq = torch.randint(0, src_vocab_size, (32, 100))

output = classifier(input_seq)

print(output.shape)  # Output shape: (batch_size, num_classes)

torch.Size([32, 5])


### Exercício 2
Vamos implementar um modelo baseado em stack de decoders. Uma vez que não é necessário cross-attention, pois não há encoders, utilize o módulo `EncoderLayer`. O tamanho do vocabulário deverá ser de 50257, o tamanho dos embeddings de 768, 12 cabeças de atenção, 12 camadas, dimensão da camada feedforward de 3072 e tamanho máximo de sequência 1024. Em seguida, teste com valores aleatórios simulando uma sequência de tokens.

In [20]:
class TextGenerator(nn.Module):
    """Stack of ``EncoderLayer`` blocks with a projection to vocabulary logits.

    Args:
        src_vocab_size: Vocabulary size (embedding rows and output logits).
        d_model: Embedding / model feature size.
        num_heads: Attention heads per layer.
        num_layers: Number of stacked layers.
        d_ff: Hidden size of each feed-forward sub-layer.
        max_len: Maximum sequence length supported by the positional encoding.
        dropout: Dropout probability.
    """

    def __init__(self, src_vocab_size, d_model, num_heads, num_layers, d_ff, max_len, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(src_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.fc = nn.Linear(d_model, src_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Map token ids of shape ``(batch_size, seq_len)`` to vocabulary logits.

        Returns:
            Logits of shape ``(batch_size, src_vocab_size)``.
        """
        # Embedding: (batch_size, seq_len) -> (batch_size, seq_len, d_model)
        x = self.embedding(x)

        # PositionalEncoding indexes positions along dim 0 (sequence-first), while
        # this model is batch-first. Without swapping the axes the encoding would
        # be selected by batch index instead of token position.
        x = self.positional_encoding(x.transpose(0, 1)).transpose(0, 1)
        x = self.dropout(x)

        # Run the input through every layer.
        # NOTE(review): no look-ahead mask is applied unless the caller passes
        # one; a decoder-style language model normally needs a causal mask.
        for layer in self.layers:
            x = layer(x, mask)

        # NOTE(review): averaging over the sequence axis yields one distribution
        # per sample rather than per-token logits of shape
        # (batch_size, seq_len, src_vocab_size). Kept as is so the output shape
        # does not change; confirm this is intended.
        x = x.mean(dim=1)
        out = self.fc(x)

        return out

# Hyperparameters specified by the exercise statement.
src_vocab_size = 50257
d_model = 768
num_heads = 12
num_layers = 12
d_ff = 3072
max_len = 1024

generator = TextGenerator(src_vocab_size, d_model, num_heads, num_layers, d_ff, max_len)

# (batch_size, sequence_length)
input_seq = torch.randint(0, src_vocab_size, (32, 100))

output = generator(input_seq)

print(output.shape)  # Output shape: (batch_size, src_vocab_size)

torch.Size([32, 50257])
