# Attention is all you need

### Rasul Alakbarli, Mahammad Nuriyev, Petko Petkov

## Required libraries

In [11]:
import torch
from torch import nn
import math
import spacy

General `Module` class so we always have access to the device used:

In [2]:
class Module(nn.Module):
    """Common base class for the layers in this file; exposes a shared `device`."""

    # Resolved once, when the class body executes: CUDA if PyTorch reports it
    # as available, otherwise the CPU. Subclasses read it as `self.device`.
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

Implementation of the embeddings used by the transformer. The same class serves both the `input` (source) and the `output` (target) side, and adds the positional encodings to the token embeddings:

In [3]:
class Embedding(Module):
    """Token embedding plus sinusoidal positional encoding, followed by dropout.

    Args:
        d_model: Size of each embedding vector.
        vocab_len: Number of entries in the vocabulary.
        pad_index: Index of the padding token, passed to `nn.Embedding` as
            `padding_idx`.
        dropout_rate: Dropout probability applied to the summed embeddings.
    """

    def __init__(self, d_model, vocab_len, pad_index, dropout_rate=0.1):
        super().__init__()

        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_len, self.d_model, padding_idx=pad_index)
        self.dropout = nn.Dropout(p=dropout_rate)

    def forward(self, x):
        """Embed token indices `x` of shape (batch, sequence_len).

        Returns:
            Tensor of shape (batch, sequence_len, d_model).
        """
        # Embedding shape: (batch, sequence_len, d_model)
        # Positional encoding shape: (sequence_len, d_model), broadcast over batch
        return self.dropout(self.embedding(x) + self.positional_encoding(x))

    def positional_encoding(self, x):
        """Build the sinusoidal position table for the sequence length of `x`.

        Only `x.size(1)` and `x.device` are used. The table is created on the
        same device as `x`, so it can always be added to the embedding of `x`
        regardless of where the module currently lives.

        Returns:
            Float tensor of shape (sequence_len, d_model) that does not
            require gradients.
        """
        seq_len = x.size(1)
        device = x.device

        # pos.shape = (seq_len, 1)
        pos = torch.arange(seq_len, dtype=torch.float, device=device).unsqueeze(1)

        # dim.shape = (ceil(d_model / 2),) -- one frequency per sin/cos pair
        dim = torch.arange(0, self.d_model, step=2, dtype=torch.float, device=device)

        # angles.shape = (seq_len, ceil(d_model / 2))
        angles = pos / (10_000 ** (dim / self.d_model))

        # result.shape = (seq_len, d_model)
        result = torch.zeros((seq_len, self.d_model), dtype=torch.float, device=device)

        # Sine for even dimensions, cosine for odd dimensions. When d_model is
        # odd there is one fewer odd column than even columns, so the cosine
        # part only uses the first d_model // 2 frequencies.
        result[:, 0::2] = torch.sin(angles)
        result[:, 1::2] = torch.cos(angles[:, : self.d_model // 2])
        return result

Implementation of the feed-forward network, which is used in both the encoder and the decoder of the transformer:

In [4]:
class FeedForwardNetwork(Module):
    """Two-layer feed-forward block: Linear -> ReLU -> Linear.

    Args:
        d_model: Size of the last dimension of the input and of the output.
        d_ff: Width of the hidden layer between the two linear maps.
    """

    def __init__(self, d_model, d_ff=2048):
        super().__init__()

        # Expand to the hidden width, apply the non-linearity, project back.
        self.linear1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(in_features=d_ff, out_features=d_model)

    def forward(self, x):
        """Apply the block to `x`; the last dimension stays `d_model`."""
        hidden = self.linear1(x)
        activated = self.relu(hidden)
        return self.linear2(activated)

The `Multi-Head Attention` module of the transformer:

In [5]:
class MultiHeadAttention(Module):
    """Multi-head scaled dot-product attention.

    Args:
        d_model: Size of the model dimension; must be divisible by `num_heads`.
        num_heads: Number of parallel attention heads.
        use_mask: If True, apply a causal mask so that position `i` can only
            attend to positions `<= i`.
    """

    def __init__(self, d_model, num_heads=8, use_mask=False):
        super().__init__()

        self.d_model = d_model
        self.num_heads = num_heads
        self.use_mask = use_mask
        assert d_model % num_heads == 0, 'D_MODEL must be divisible by NUM_HEADS'

        # w_q_i projects D_MODEL to D_MODEL / NUM_HEADS. However, there are
        # NUM_HEADS parallel attention layers that are concatenated, so in the
        # end output dim is still D_MODEL / NUM_HEADS * NUM_HEADS = D_MODEL
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)

        self.softmax = nn.Softmax(dim=-1)

    def forward(self, queries, keys, values):
        """Attend from `queries` over `keys`/`values`.

        Each argument has shape (batch, seq, d_model).

        Returns:
            Tensor of shape (batch, query_seq, d_model).
        """
        # Project, then split: (batch, seq, d_model)
        #   -> (batch, num_heads, seq, d_model/num_heads)
        q = self.split_heads(self.w_q(queries))
        k = self.split_heads(self.w_k(keys))
        v = self.split_heads(self.w_v(values))

        # Perform NUM_HEADS parallel single-head attention
        attention = self.scaled_dot_product_attention(q, k, v)

        # Concatenate the heads again:
        # (batch, num_heads, seq, d_model/num_heads) -> (batch, seq, d_model)
        merged = self.merge_heads(attention)

        # Apply final projection matrix
        return self.w_o(merged)

    def split_heads(self, x):
        """Reshape (batch, seq, d_model) to (batch, num_heads, seq, d_model/num_heads)."""
        batch_size, seq_len, _ = x.size()

        # Split D_MODEL into NUM_HEADS channels of D_MODEL // NUM_HEADS each
        # Now shape is (batch, seq, num_heads, d_model/num_heads)
        heads = x.reshape(batch_size, seq_len, self.num_heads, self.d_model // self.num_heads)

        # However, we want (batch, num_heads, seq, d_model/num_heads) because each tensor
        # of size (seq, d_model/num_heads) represents a single-head attention
        return heads.transpose(2, 1)

    def merge_heads(self, x):
        """Inverse of `split_heads`: back to shape (batch, seq, d_model)."""
        batch_size, _, seq_len, _ = x.size()

        # Switch back to shape (batch, seq, num_heads, d_model/num_heads)
        transposed = x.transpose(1, 2)

        # Merge last two dimensions
        return transposed.reshape(batch_size, seq_len, self.d_model)

    def scaled_dot_product_attention(self, q, k, v):
        """Compute softmax(q @ k^T / sqrt(d_k)) @ v for every head.

        Inputs have shape (batch, num_heads, seq, d_model/num_heads). When
        `use_mask` is set, scores above the main diagonal are replaced with
        -inf before the softmax.
        """
        d_k = self.d_model // self.num_heads
        compatibility = torch.matmul(q, k.transpose(2, 3)) / math.sqrt(d_k)

        # Use a lower-triangular pattern to prevent leftward information flow:
        # the upper triangle is filled with negative infinity so that those
        # weights become zero after the softmax.
        #
        # seq     weights      values          output
        # 0       [1 0 0]   [ --- a --- ]   [ a + 0 + 0 ]
        # 1       [1 1 0] * [ --- b --- ] = [ a + b + 0 ]
        # 2       [1 1 1]   [ --- c --- ]   [ a + b + c ]
        #
        # At seq=0, can only attend to seq=0
        # At seq=1, can attend to both seq=0 and seq=1
        # And so on...
        if self.use_mask:
            seq_len = compatibility.size(-1)
            # Build the mask directly on the device of the scores so the fill
            # works wherever the module and its inputs currently live.
            mask = torch.ones(
                seq_len, seq_len, dtype=torch.bool, device=compatibility.device
            ).triu(diagonal=1)
            compatibility = compatibility.masked_fill(mask, float('-inf'))

        # Apply softmax along the last dimension
        value_weights = self.softmax(compatibility)

        # Weight values by softmax results
        return torch.matmul(value_weights, v)

We now have all of the building blocks needed to assemble the two halves of the architecture: the `encoder` and the `decoder`.

Implementation of the `Encoder`:

In [6]:
class EncoderLayer(Module):
    """One encoder layer: self-attention, then a feed-forward network.

    Each of the two sub-layers is followed by dropout, a residual connection
    and layer normalisation.

    Args:
        d_model: Size of the model dimension.
        num_heads: Number of attention heads.
        dropout_rate: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads=8, dropout_rate=0.1):
        super().__init__()

        # Sub-layer 1: (unmasked) multi-head self-attention
        self.self_attention = MultiHeadAttention(d_model, num_heads=num_heads)
        self.dropout1 = nn.Dropout(p=dropout_rate)
        self.layer_norm1 = nn.LayerNorm(d_model)

        # Sub-layer 2: feed-forward network
        self.ffn = FeedForwardNetwork(d_model)
        self.dropout2 = nn.Dropout(p=dropout_rate)
        self.layer_norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        """Run the layer on `x`; the output has the same shape as `x`."""
        # Dropout hits the sub-layer output first; the residual is added and
        # the sum is normalised afterwards.
        attended = self.dropout1(self.self_attention(queries=x, keys=x, values=x))
        hidden = self.layer_norm1(x + attended)

        # Same pattern for the feed-forward sub-layer.
        transformed = self.dropout2(self.ffn(hidden))
        return self.layer_norm2(hidden + transformed)

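The `Decoder` is very similar to the `Encoder`. The main differences are that its self-attention is masked, so that a position cannot attend to later positions, and that each layer contains a second attention sub-layer whose keys and values are the output of the `Encoder`. The final `Linear` layer that makes the prediction is part of the `Transformer` class further below; no `Softmax` is applied there, because the loss function is expected to take care of it.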

Implementation of the `Decoder`:

In [7]:
class DecoderLayer(Module):
    """One decoder layer: masked self-attention, encoder attention, feed-forward.

    Each of the three sub-layers is followed by dropout, a residual connection
    and layer normalisation.

    Args:
        d_model: Size of the model dimension.
        num_heads: Number of attention heads.
        dropout_rate: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads=8, dropout_rate=0.1):
        super().__init__()

        # Sub-layer 1: self-attention with the causal mask switched on
        self.self_attention = MultiHeadAttention(d_model, num_heads=num_heads, use_mask=True)
        self.dropout1 = nn.Dropout(p=dropout_rate)
        self.layer_norm1 = nn.LayerNorm(d_model)

        # Sub-layer 2: attention over the encoder output (no mask)
        self.enc_attention = MultiHeadAttention(d_model, num_heads=num_heads)
        self.dropout2 = nn.Dropout(p=dropout_rate)
        self.layer_norm2 = nn.LayerNorm(d_model)

        # Sub-layer 3: feed-forward network
        self.ffn = FeedForwardNetwork(d_model)
        self.dropout3 = nn.Dropout(p=dropout_rate)
        self.layer_norm3 = nn.LayerNorm(d_model)

    def forward(self, x, enc_out):
        """Run the layer on decoder input `x`, attending over `enc_out`.

        Returns:
            Tensor with the same shape as `x`.
        """
        # Masked self-attention over the decoder input, then residual + norm
        self_attended = self.dropout1(self.self_attention(queries=x, keys=x, values=x))
        hidden = self.layer_norm1(x + self_attended)

        # Queries come from the decoder state; keys and values are ENC_OUT.
        # No mask is applied in this attention.
        enc_attended = self.dropout2(
            self.enc_attention(queries=hidden, keys=enc_out, values=enc_out)
        )
        hidden = self.layer_norm2(hidden + enc_attended)

        # Feed-forward network and the last residual + layer norm
        transformed = self.dropout3(self.ffn(hidden))
        return self.layer_norm3(hidden + transformed)


Now we have all of the modules required and we can combine them into a final class to get the full transformer architecture:

In [8]:
class Transformer(Module):
    """Full encoder-decoder transformer.

    Args:
        d_model: Size of the model dimension.
        src_vocab_len: Size of the source vocabulary.
        trg_vocab_len: Size of the target vocabulary.
        src_pad_index: Padding index of the source vocabulary.
        trg_pad_index: Padding index of the target vocabulary.
        num_heads: Number of attention heads in every attention module.
        num_layers: Number of encoder layers and of decoder layers.
        dropout_rate: Dropout probability used throughout the model.
        seed: Seed set on the global torch RNG while the parameters are
            created.
    """

    def __init__(self,
                 d_model,
                 src_vocab_len,
                 trg_vocab_len,
                 src_pad_index,
                 trg_pad_index,
                 num_heads=8,
                 num_layers=6,
                 dropout_rate=0.1,
                 seed=20230815):
        super().__init__()

        self.src_pad_index = src_pad_index
        self.trg_pad_index = trg_pad_index

        # Fix the global seed so that the initial weights are the same every
        # time the model is built with the same arguments.
        torch.manual_seed(seed)

        # The pad indices are handed to the embeddings so that <pad> does not
        # contribute to the gradient.
        self.src_embedding = Embedding(
            d_model, src_vocab_len, src_pad_index, dropout_rate=dropout_rate
        )
        self.trg_embedding = Embedding(
            d_model, trg_vocab_len, trg_pad_index, dropout_rate=dropout_rate
        )

        # Encoder and decoder stacks share the same per-layer settings.
        layer_kwargs = {'num_heads': num_heads, 'dropout_rate': dropout_rate}

        encoder_layers = [EncoderLayer(d_model, **layer_kwargs) for _ in range(num_layers)]
        self.encoder_stack = nn.ModuleList(encoder_layers)

        decoder_layers = [DecoderLayer(d_model, **layer_kwargs) for _ in range(num_layers)]
        self.decoder_stack = nn.ModuleList(decoder_layers)

        # Projects each decoder output vector to one score per target word.
        self.linear = nn.Linear(d_model, trg_vocab_len)

        # Move to GPU if possible
        self.to(self.device)

        # Draw a fresh, non-deterministic seed so later random operations
        # (e.g. data shuffling) are not tied to SEED.
        torch.seed()

    def forward(self, source, target):
        """Compute target-vocabulary scores for every target position.

        Args:
            source: Source token indices, shape (batch, source_len).
            target: Target token indices, shape (batch, target_len).

        Returns:
            Unnormalised scores of shape (batch, target_len, trg_vocab_len).
        """
        # Encoder stack
        memory = self.src_embedding(source)
        for encoder_layer in self.encoder_stack:
            memory = encoder_layer(memory)

        # Decoder stack; every layer attends over the final encoder output
        decoded = self.trg_embedding(target)
        for decoder_layer in self.decoder_stack:
            decoded = decoder_layer(decoded, memory)

        # Softmax is deliberately NOT applied here, as CrossEntropyLoss
        # already does the softmax itself.
        return self.linear(decoded)