# 4. Hopefully, a not too cumbersome way to implement transformers

### About this notebook

This notebook was used in the 50.039 Deep Learning course at the Singapore University of Technology and Design.

**Author:** Matthieu DE MARI (matthieu_demari@sutd.edu.sg)

**Version:** 1.1 (22/03/2022)

This notebook demonstrates how we could technically implement attention layers and transformers, as described in Vaswani et al., “Attention Is All You Need”, 2017;

https://arxiv.org/abs/1706.03762

and Devlin et al., “BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding”, 2019.

https://arxiv.org/abs/1810.04805

This code is meant to be read rather than run, and training code (dataset, loss, trainer functions, etc.) is not provided.

The notebook is for illustrative purposes only: we do not expect students to train transformers, which is costly in time and resources. It also shows how several nn.Module classes can be designed and combined into a sophisticated architecture.

More importantly, we no longer have to implement this difficult architecture manually, as PyTorch now includes an nn.Transformer module. (See https://pytorch.org/docs/stable/generated/torch.nn.Transformer.html and https://pytorch.org/tutorials/beginner/transformer_tutorial.html for details.)
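
As a rough illustration of the built-in module, here is a minimal sketch of a forward pass through nn.Transformer. The sizes (d_model = 32, 4 heads, 2 layers on each side) are arbitrary small values chosen for this example, not the paper's defaults, and the batch_first argument assumes PyTorch 1.9 or later.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# Small, arbitrary sizes chosen for illustration only
model = nn.Transformer(d_model=32, nhead=4,
                       num_encoder_layers=2, num_decoder_layers=2,
                       dim_feedforward=64, dropout=0.1,
                       batch_first=True)
model.eval()  # disable dropout for this forward pass

src = torch.randn(2, 7, 32)  # (batch, source length, d_model)
tgt = torch.randn(2, 5, 32)  # (batch, target length, d_model)

# Triangular mask so that each target position only sees earlier positions
tgt_mask = model.generate_square_subsequent_mask(tgt.size(1))

with torch.no_grad():
    out = model(src, tgt, tgt_mask=tgt_mask)

print(out.shape)  # torch.Size([2, 5, 32])
```

The output has the same shape as the target input, which is also what the hand-written Transformer class in this notebook is meant to produce.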

**Requirements:**
- Python 3 (tested on v3.9.6)
- Matplotlib (tested on v3.5.1)
- Numpy (tested on v1.22.1)
- Torch (tested on v1.10.1)
- Torchvision (tested on v0.11.2)
- We also strongly recommend setting up CUDA on your machine!

### Imports

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import functools
import numpy as np
import matplotlib.pyplot as plt
# Use the GPU when CUDA is available, otherwise fall back to the CPU
CUDA = torch.cuda.is_available()
device = torch.device("cuda" if CUDA else "cpu")

### 1. The Self-attention mechanism and the MultiHeadAttentionLayer layers

Let us start with the attention mechanism. As seen in class, it consists of a rather simple mathematical formula involving three matrices Q, K and V.
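
For reference, the formula from Vaswani et al. is Attention(Q, K, V) = softmax(Q K^T / sqrt(d_k)) V, where d_k is the dimension of the keys. The snippet below is a minimal functional sketch of that formula on random tensors, without the learned Linear projections used in the module. The function name and the tensor sizes are assumptions made for this example.

```python
import torch
import torch.nn.functional as F

def scaled_dot_product_attention(Q, K, V):
    """Return (output, weights) for Q, K, V of shape (batch, length, dim)."""
    d_k = K.size(-1)
    # Similarity between every query and every key, scaled by sqrt(d_k)
    scores = torch.bmm(Q, K.transpose(1, 2)) / d_k ** 0.5
    # Each row of weights is a probability distribution over the keys
    weights = F.softmax(scores, dim=-1)
    return torch.bmm(weights, V), weights

torch.manual_seed(0)
Q = torch.randn(2, 3, 4)  # 2 sequences, 3 query positions, dimension 4
K = torch.randn(2, 5, 4)  # 5 key positions
V = torch.randn(2, 5, 4)  # one value per key position

out, weights = scaled_dot_product_attention(Q, K, V)
print(out.shape)      # torch.Size([2, 3, 4])
print(weights.shape)  # torch.Size([2, 3, 5])
```

Each query position receives a weighted average of the value vectors, with weights that sum to 1 over the key positions.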

We implement it below as the SelfAttention module.

In [None]:
class SelfAttention(nn.Module):
    """
    Description: SelfAttention layer Class, describing the attention layers to
    be used in the transformer.
    Attributes list:
    - d_model: An integer, defining the dimension of the attention layer
    - output_size: An integer, defining the dimension of the output
    for the attention layer.
    - dropout_rate: A float value between 0 and 1, corresponding to the Dropout rate
    applied to the attention weights.
    - mask: None (no masking), True (a triangular mask is applied), or a tensor
    of shape (batch, seq_len) whose zero entries mark the key positions to hide.
    """
    
    def __init__(self, d_model, output_size, dropout_rate = 0.3, mask = None):
        """
        Init Method for attention layer, mostly defining attributes.
        """
        super().__init__()
        self.query = nn.Linear(d_model, output_size)
        self.key = nn.Linear(d_model, output_size)
        self.value = nn.Linear(d_model, output_size)
        self.dropout = nn.Dropout(dropout_rate)
        self.mask = mask
        
    def forward(self, q, k, v):
        """
        Forward pass for the attention layer.
        """
        
        # Project the inputs into query, key and value representations
        batch_size = q.shape[0]
        y_len = q.shape[1]
        seq_len = k.shape[1]
        query_out = self.query(q)
        key_out = self.key(k)
        value_out = self.value(v)
        key_dim = key_out.size(-1)
        transposed_key_out = key_out.transpose(1, 2)
        
        # Calculate attention scores, scaled by sqrt(key_dim)
        scores = torch.bmm(query_out, transposed_key_out)/(key_dim**0.5)
        
        # Apply masks if needed
        if self.mask is True:
            # Triangular mask: position i may only attend to positions <= i
            temp = torch.ones((y_len, seq_len), \
                              device = scores.device, \
                              dtype = torch.uint8)
            subsequent_mask = 1 - torch.triu(temp, diagonal = 1)
            subsequent_mask = subsequent_mask[None, :, :].expand(batch_size, y_len, seq_len)
            scores = scores.masked_fill(subsequent_mask == 0, -float("Inf"))
        elif torch.is_tensor(self.mask):
            # Tensor mask of shape (batch, seq_len): hide keys where mask == 0
            expanded_mask = self.mask[:, None, :].expand(batch_size, y_len, seq_len)
            scores = scores.masked_fill(expanded_mask == 0, -float("Inf"))
        
        # Final touches: normalize the scores, apply dropout, weight the values
        weights = F.softmax(scores, dim = -1)
        weights = self.dropout(weights)
        out = torch.bmm(weights, value_out)
        return out

We can then run several attention heads in parallel on the same inputs, as suggested in the original paper, concatenate their outputs, and pass the result through a final Linear layer.
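
The shape arithmetic behind this can be sketched as follows, assuming d_model = 512 and 8 heads (the defaults of the Transformer class at the end of this notebook). Random tensors stand in for the outputs of the individual heads, so this only illustrates the dimensions, not the attention computation itself.

```python
import torch

d_model, num_heads = 512, 8
# Integer division, so that the result can be used as a layer size
head_dim = d_model // num_heads
batch, length = 2, 10

# Stand-ins for the outputs of the individual heads (random values here)
head_outputs = [torch.randn(batch, length, head_dim) for _ in range(num_heads)]

# Concatenating along the last dimension restores the model dimension
concatenated = torch.cat(head_outputs, dim=-1)

print(head_dim)            # 64
print(concatenated.shape)  # torch.Size([2, 10, 512])
```

This only works out when d_model is divisible by num_heads. Otherwise the concatenated size no longer matches the input size of the final Linear layer.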

In [None]:
class MultiHeadAttentionLayer(nn.Module):
    """
    Description: MultiHeadAttention layer Class, describing the attention layers to
    be used in the transformer.
    Attributes list:
    - d_model: An integer, defining the dimension of the attention layer
    - num_heads: An integer, defining the number of attention heads
    to be used in both the encoder and decoder layers.
    - dropout_rate: A float value between 0 and 1, corresponding to the Dropout rate
    passed on to each SelfAttention head.
    - mask: Passed on to each SelfAttention head. If set to True, a triangular
    mask will be applied.
    """
    
    def __init__(self, d_model, num_heads, dropout_rate, mask = None):
        """
        Init method, mostly defining attributes.
        """
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.dropout_rate = dropout_rate
        # Integer division, as nn.Linear needs an integer output size
        # (d_model is expected to be divisible by num_heads)
        self.attention_output_size = d_model//num_heads
        layers = [SelfAttention(d_model, self.attention_output_size, dropout_rate, mask) \
                  for i in range(num_heads)]
        # nn.ModuleList registers the heads, so that their parameters are tracked
        self.attention_layers = nn.ModuleList(layers)
        self.final_layer = nn.Linear(d_model, d_model)
        
    def forward(self, q, k, v):
        """
        Forward pass: runs each attention head on the same inputs,
        concatenates their outputs and applies one final Linear layer.
        """
        x = torch.cat([layer(q, k, v) for layer in self.attention_layers], dim = -1)
        x = self.final_layer(x)
        return x

### 2. The EncoderLayer and Encoder architecture

In this part, we start off by defining the typical layers used in the Encoder part of the transformer.
Each one consists of an attention layer and a feed-forward block (two Linear layers with a ReLU and Dropout in between), each wrapped in a skip connection and followed by layer normalization.
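
The skip connection followed by normalization can be sketched on its own as below. The Linear layer is only a hypothetical stand-in for the attention or feed-forward block, and d_model = 16 is an arbitrary size chosen for this example.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
d_model = 16
x = torch.randn(2, 5, d_model)  # (batch, length, d_model)

# Hypothetical stand-in for the attention or feed-forward block
sublayer = nn.Linear(d_model, d_model)
norm = nn.LayerNorm(d_model)

# Skip connection first, then layer normalization
out = norm(x + sublayer(x))

# LayerNorm normalizes over the d_model features of each position
per_position_mean = out.mean(dim=-1)
per_position_std = out.std(dim=-1, unbiased=False)

print(out.shape)  # torch.Size([2, 5, 16])
```

With the default initialization of nn.LayerNorm, every position ends up with a mean close to 0 and a standard deviation close to 1 across its features, independently of the other elements in the batch.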

We implement it in the EncoderLayer module below, which reuses our previous modules accordingly.

In [None]:
class EncoderLayer(nn.Module):
    """
    Description: EncoderLayer Class, describing the encoder layers to
    be used in the Encoder part of the transformer.
    Attributes list:
    - d_model: An integer, defining the dimension of the encoder layer.
    - num_heads: An integer, defining the number of attention heads
    to be used in both the encoder and decoder layers.
    - d_inner: An integer, defining the number of neurons in the Feed Forward layers.
    - dropout_rate: A float value between 0 and 1, corresponding to the Dropout rate
    passed to the attention layer and used in the Dropout layers of the Feed Forward layers.
    """
    
    def __init__(self, d_model, num_heads, d_inner = 2048, dropout_rate = 0.3):
        """
        Init Method, which adds up
        - one MultiHeadAttentionLayer, with no mask,
        - one Feed Forward layer,
        - and Normalization Layers after each one of them.
        """
        super().__init__()
        # First Multi Head Attention Layer
        self.attention = MultiHeadAttentionLayer(d_model, num_heads, dropout_rate)
        # Feed Forward Network Layer
        self.ffn = nn.Sequential(nn.Linear(d_model, d_inner),
                                 nn.ReLU(inplace = True),
                                 nn.Dropout(dropout_rate),
                                 nn.Linear(d_inner, d_model),
                                 nn.Dropout(dropout_rate),)
        # Normalization Layers
        # (LayerNorm normalizes over the d_model features of each position,
        # whereas BatchNorm normalizes each feature over the batch)
        self.attention_normalization = nn.LayerNorm(d_model)
        self.ffn_normalization = nn.LayerNorm(d_model)
        
    def forward(self, x):
        """
        Forward pass for the encoder layer.
        """
        # Attention Layer forward pass and normalization
        out = x + self.attention(q = x,
                               k = x,
                               v = x)
        out = self.attention_normalization(out)
        # Feed Forward Network Layer forward pass and normalization
        out = out + self.ffn(out)
        out = self.ffn_normalization(out)
        return out

We then stack several EncoderLayer modules, stored in an nn.ModuleList, to obtain the full Encoder part of the transformer architecture.
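
The reason for using nn.ModuleList rather than a plain Python list can be sketched as follows. The two class names are hypothetical and only serve this comparison, with small Linear layers standing in for the encoder layers.

```python
import torch.nn as nn

class PlainListStack(nn.Module):
    def __init__(self):
        super().__init__()
        # A plain Python list: the layers are NOT registered as submodules
        self.layers = [nn.Linear(4, 4) for _ in range(3)]

class ModuleListStack(nn.Module):
    def __init__(self):
        super().__init__()
        # nn.ModuleList registers every layer as a submodule
        self.layers = nn.ModuleList([nn.Linear(4, 4) for _ in range(3)])

n_plain = len(list(PlainListStack().parameters()))
n_registered = len(list(ModuleListStack().parameters()))

print(n_plain)       # 0
print(n_registered)  # 6 (a weight and a bias for each of the 3 layers)
```

Parameters that are not registered are invisible to the optimizer and are not moved by calls such as .to(device), so layers kept in a plain list would silently stay untrained.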

In [None]:
class Encoder(nn.Module):
    """
    Description: Encoder Class, describing the Encoding part of the transformer.
    Attributes list:
    - d_model: An integer, defining the dimension of the encoder layers.
    - num_heads: An integer, defining the number of attention heads
    to be used in both the encoder and decoder layers.
    - num_encoders: An integer, defining the number of encoder layers
    to be used in the transformer.
    """
    
    def __init__(self, d_model, num_heads, num_encoders):
        """
        Init Method, which adds up num_encoders EncoderLayers in a row. 
        """
        super().__init__()
        layers_list = [EncoderLayer(d_model, num_heads) for i in range(num_encoders)]
        self.enc_layers = nn.ModuleList(layers_list, )
        
    def forward(self, x):
        """
        Forward pass for the Encoder, simply repeating the EncoderLayers in a row.
        """
        out = x
        for layer in self.enc_layers:
            out = layer(out)
        return out

### 3. The DecoderLayer and Decoder architecture

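The Decoder follows roughly the same logic as the Encoder, with two differences. First, the first attention layer of each DecoderLayer uses a triangular mask, so that a position can only attend to itself and to earlier positions. Second, a further attention layer takes its queries from the Decoder and its keys and values from the output of the Encoder.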
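
The effect of the triangular mask can be sketched on a toy example. The sequence length of 4 and the all-zero scores are assumptions chosen so that the resulting weights are easy to read.

```python
import torch
import torch.nn.functional as F

length = 4
# Lower-triangular matrix of ones: row i has ones in columns 0..i
allowed = torch.tril(torch.ones(length, length))

# Uniform scores for illustration, with a batch dimension of 1
scores = torch.zeros(1, length, length)

# Forbidden (future) positions get -inf, hence a weight of 0 after softmax
masked_scores = scores.masked_fill(allowed == 0, -float("inf"))
weights = F.softmax(masked_scores, dim=-1)

print(weights[0])
# Row 0 puts all its weight on position 0, row 1 splits it evenly over
# positions 0 and 1, and so on, with zeros above the diagonal.
```

Because the diagonal is kept, every row has at least one allowed position, so the softmax never receives a row made only of -inf values.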

In [None]:
class DecoderLayer(nn.Module):
    """
    Description: DecoderLayer Class, describing the decoder layers to
    be used in the Decoder part of the transformer.
    Attributes list:
    - d_model: An integer, defining the dimension of the decoder layer.
    - num_heads: An integer, defining the number of attention heads
    to be used in the decoder layer.
    - d_inner: An integer, defining the number of neurons in the Feed Forward layers.
    - dropout_rate: A float value between 0 and 1, corresponding to the Dropout rate
    passed to the attention layers and used in the Dropout layers of the Feed Forward layers.
    """
    
    def __init__(self, d_model, num_heads, d_inner = 2048, dropout_rate = 0.3):
        """
        Init Method, which adds up
        - one MultiHeadAttentionLayer, with a triangular mask,
        - one MultiHeadAttentionLayer, with no mask,
        - one Feed Forward layer,
        - and Normalization Layers after each one of them.
        """
        super().__init__()
        # Masked First Attention Layer
        self.masked_attention = MultiHeadAttentionLayer(d_model, \
                                                        num_heads, \
                                                        dropout_rate, \
                                                        mask = True)
        # Subsequent Attention Layer
        self.subsequent_attention = MultiHeadAttentionLayer(d_model, \
                                                            num_heads, \
                                                            dropout_rate)
        # Feed Forward Network Layer
        self.ffn = nn.Sequential(nn.Linear(d_model, d_inner),
                                 nn.ReLU(inplace = True),
                                 nn.Dropout(dropout_rate),
                                 nn.Linear(d_inner, d_model),
                                 nn.Dropout(dropout_rate),)
        # Normalization Layers
        # (LayerNorm normalizes over the d_model features of each position,
        # whereas BatchNorm normalizes each feature over the batch)
        self.masked_attention_normalization = nn.LayerNorm(d_model)
        self.subsequent_attention_normalization = nn.LayerNorm(d_model)
        self.ffn_normalization = nn.LayerNorm(d_model)
        
    def forward(self, y, enc_out):
        """
        Forward pass for the decoder layer.
        """
        # Masked First Attention Layer
        out = y
        out = out + self.masked_attention(q = out, k = out, v = out)
        out = self.masked_attention_normalization(out)
        # Subsequent Attention Layer
        out = out + self.subsequent_attention(q = out, k = enc_out, v = enc_out)
        out = self.subsequent_attention_normalization(out)
        # Feed Forward Network Layer
        out = out + self.ffn(out)
        out = self.ffn_normalization(out)
        return out

In [None]:
class Decoder(nn.Module):
    """
    Description: Decoder Class, describing the Decoding part of the transformer.
    Attributes list:
    - d_model: An integer, defining the dimension of the decoder layers.
    - num_heads: An integer, defining the number of attention heads
    to be used in the decoder layers.
    - num_decoders: An integer, defining the number of decoder layers
    to be used in the transformer.
    """
    
    def __init__(self, d_model, num_heads, num_decoders):
        """
        Init Method, which adds up num_decoders DecoderLayers in a row. 
        """
        super().__init__()
        layers_list = [DecoderLayer(d_model, num_heads) for i in range(num_decoders)]
        self.dec_layers = nn.ModuleList(layers_list, )
        
    def forward(self, y, enc_out):
        """
        Forward pass for the Decoder, simply repeating the DecoderLayers in a row.
        """
        out = y
        for layer in self.dec_layers:
            out = layer(out, enc_out)
        return out

### 4. Finally assemble everything into your whole Transformer architecture...

And voila!

In [None]:
class Transformer(nn.Module):
    """
    Description: Full Transformer Class, combining the Encoder and Decoder parts.
    Attributes list:
    - d_model: An integer, defining the dimension of the encoder
    and decoder layers.
    - num_heads: An integer, defining the number of attention heads
    to be used in both the encoder and decoder layers.
    - num_encoders: An integer, defining the number of encoder layers
    to be used in the transformer.
    - num_decoders: An integer, defining the number of decoder layers
    to be used in the transformer.
    """
    
    def __init__(self, d_model = 512, num_heads = 8, num_encoders = 6, num_decoders = 6):
        """
        Init Method, establishing the Encoder and Decoder parts as
        attributes of the transformer.
        Later on, we will extract the encoder part for our Embeddings.
        """
        super().__init__()
        self.encoder = Encoder(d_model, num_heads, num_encoders)
        self.decoder = Decoder(d_model, num_heads, num_decoders)
        
    def forward(self, x, y):
        """
        Forward pass for transformer.
        - Encodes the input with the Encoder part first.
        - Then runs the Decoder on the target and the encoded input; the
        triangular mask is applied inside the masked attention layers.
        """
        enc_out = self.encoder(x)
        dec_out = self.decoder(y, enc_out)
        return dec_out