In [1]:
import os
import time
import math
import copy
import spacy
import GPUtil
import pandas as pd
from typing import *
from itertools import chain

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.optim.lr_scheduler import LambdaLR
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, Dataset

import altair as alt
from altair import Chart

alt.data_transformers.disable_max_rows()

DataTransformerRegistry.enable('default')

## Positional Encoding

The positional encoding module is added, for the transformer to understand relative word positions, this is, absolute positions within the text but also in relation to each other. Periodical functions (sine and cosine) are used, as their orthogonality allows for unique encodings to be described through combinations of them (trigonometric identities). In addition, a dropout layer is added after the PE to avoid overfitting during training, as it prevents over-dependence on exact token positions.

In [2]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000)/d_model))  #Exp for (math) convenience
        pe[:,0::2] = torch.sin(position * div_term)
        pe[:,1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  #Add batch dimension for input
        self.register_buffer("pe", pe)   #Register positional encoding as non-updatable tensor (not parameter)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)].detach()  #Adjust to input size, stop gradient flowing through PE
        return self.dropout(x)

## Multi-Head Attention

The Multi-Head Attention module uses several sets of Query (Q), Key (K) and Value (V) matrices, where each set of matrices(belonging to a head) will capture information about the text in a different regard. This is called an Attention Module.

For instance, Head 1 with matrices {K_1, Q_1, V_1} will extract the semantic information, while Head 2 with matrices {K_2, Q_2, V_2} will extract syntactic information. Each of these modules will compute a weighted sum of the attention probabilities.

In the end, the weighted sums are concatenated and projected through a final layer.

In [3]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        assert self.head_dim * num_heads == d_model      #Ensure integer dimensions

        #Linear (affine) transformations. Single-layer NNs acting as matrices
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        scale = math.sqrt(self.head_dim)
        K_t = K.transpose(-2, -1)                     #Transpose to match dimensions and get right similarity scores
        attn_scores = torch.matmul(Q, K_t) / scale

        #We want to remove the similarity scores of zero from attn_scores, but softmax will turn them to 1 because exp(0)=1. 
        #The mask transforms those logits so exp(-1e9)=0 and they don't receive attention after softmax is applied
        if mask != None:
            attn_scores = attn_scores.masked_fill(mask==0, -1e9)

        attn_probs = F.softmax(attn_scores, dim=-1)
        return torch.matmul(attn_probs, V)

    def _project(self, x, linear):
        #Project and reshape, as output of projection has shape [batch_size, sequence_length, d_model]
        batch_size = x.size(0)
        return linear(x).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, x_q, x_k, x_v, mask=None): 
        #Extract batch size (0th dimension)
        batch_size = x_q.size(0)

        #Project input to learnable spaces
        Q_proj = self._project(x_q, self.W_q)       
        K_proj = self._project(x_k, self.W_k)
        V_proj = self._project(x_v, self.W_v)

        #Attention for each head
        attn_output = self.scaled_dot_product_attention(Q_proj, K_proj, V_proj, mask)

        #Concatenate heads and reshape vector to size=d_model
        attn_output = attn_output.transpose(1,2).contiguous().view(batch_size, -1, self.d_model)

        #Project concatenated heads
        return self.W_o(attn_output)

## Feed Forward

The Feed Forward network is placed after the Multi-Head Attention layer. 

It is used to individually process each token in the sequence, independently of the other tokens, and it simply consists of a two-layer neural network, with an activation function (ReLU in this case) in the middle to introduce non-linearity. 

Dropout is added after the activation function to again avoid overfitting. The dimensionality of the inner layer must be larger than *d_model*, but the exact ratio can vary from model to model.

In [4]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=2048):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        x_layer1 = self.dropout(F.relu(self.linear1(x)))    # Input goes through NN, mapped through ReLU and used dropout
        return self.linear2(x_layer1)                       # Second NN, restored to original dimension

## Encoder Layer

The Encoder Layer (which is repeated n times, one after the other) extracts the input sequence's most relevant features. The input is processed through the Multi-Head Self Attention module, whose output (with dropout) is added to the input to form a residual connection layer for a smooth gradient flow without it disappearing, and then normalized through Layer Normalization (common for NLP). After normalization, the output goes through the Feed Forward network, and forms a residual connection which is again normalized.

In [5]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x, mask=None):
        attn_output = self.self_attn(x, x, x)            # Multi-Head Attention (all same x here)
        res1 = x + self.dropout(attn_output)             # Residual connection
        norm_layer1 = self.norm1(res1)                   # Layer Normalization
        ffn_output = self.ffn(norm_layer1)               # Feed Forward
        res2 = norm_layer1 + self.dropout(ffn_output)    # Residual connection
        return self.norm2(res2)                          # Layer Normalization

## Decoder Layer

The Decoder Layer, which processes the model's output, is similar to the Encoder layer, as it also consist of n consecutive repetitions of itself, a Multi-Head Self Attention module and a Feed Forward network.

The main difference lies in that, between the Self Attention and the Feed Forward network, there is a Cross Attention Module, which takes the output from the Encoder Layer as its Key and Value inputs, and the output from the (normalized) Self Attention as the Query input. This is done so the decoder can also process the relevant features about the original input sequence, and link it to the output sequence.

In [14]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x, encoder_output, self_mask, cross_mask):
        attn_output = self.self_attn(x, x, x, self_mask)                                                # Multi-Head Attention
        res1 = x + self.dropout(attn_output)                                                            # Residual connection
        norm_layer1 = self.norm1(res1)                                                                  # Layer Normalization

        cross_attn_output = self.cross_attn(norm_layer1, encoder_output, encoder_output, cross_mask)    # Cross Attention
        res2 = norm_layer1 + self.dropout(cross_attn_output)                                            # Residual connection
        norm_layer2 = self.norm2(res2)                                                                  # Layer Normalization

        ffn_output = self.ffn(norm_layer2)                                                              # Feed Forward
        res3 = norm_layer2 + self.dropout(ffn_output)                                                   # Residual Connection
        return self.norm3(res3)                                                                         # Layer Normalization

## Transformer

In [7]:
class Transformer(nn.Module):
    def __init__(self, source_vocab_size, target_vocab_size, d_model=512, num_heads=8, num_layers=6):
        super().__init__()
        self.encoder_embedding = nn.Embedding(source_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(target_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, dropout=0.1)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads) for _ in range(num_layers)])

        self.fc_out = nn.Linear(d_model, target_vocab_size)

    def forward(self, inputs, outputs, enc_mask=None, dec_mask=None):
        inputs_embedding = self.positional_encoding(self.encoder_embedding(inputs))     # Embed input and add positional encoding
        encoder_input = inputs_embedding
        for layer in self.encoder_layers:
            encoder_output = layer(encoder_input, enc_mask)                             # Perform n layers of encoder

        outputs_embedding = self.positional_encoding(self.decoder_embedding(outputs))   # Embed output and add positional encoding
        decoder_input = outputs_embedding
        for layer in self.decoder_layers:
            decoder_output = layer(decoder_input, encoder_output, enc_mask, dec_mask)   #Perform n layers of decoder

        return self.fc_out(decoder_output)                                              # Affine transform to reduce dimension

## Transformer test

In [26]:
def test_transformer():
    torch.manual_seed(24)
    batch_size = 2
    seq_len = 10
    d_model = 512
    num_heads = 8
    source_vocab_size = 100
    target_vocab_size = 100
    num_layers = 2

    source = torch.randint(0, source_vocab_size, (batch_size, seq_len))
    target = torch.randint(0, target_vocab_size, (batch_size, seq_len))
    
    enc_mask = torch.ones(batch_size, 1, 1, seq_len)
    dec_mask = torch.tril(torch.ones(seq_len, seq_len)).expand(batch_size, 1, seq_len, seq_len)

    transformer = Transformer(source_vocab_size=source_vocab_size,
                              target_vocab_size=target_vocab_size,
                              d_model=d_model,
                              num_heads=num_heads,
                              num_layers=num_layers)
    print("="*50)
    print("1. Positional Encoding Test")
    pe = PositionalEncoding(d_model, dropout=0.1)
    x = torch.randn(1, seq_len, d_model)
    print(f"Before PE: mean={x.mean():.4f}, std={x.std():.4f}")
    x_pe = pe(x)
    print(f"After PE: mean={x_pe.mean():.4f}, std={x.std():.4f}")
    print(f"PE shape: {x_pe.shape}. (should be [1, {seq_len}, {d_model}])")

    print("\n2. Multi-Head Attention Test")
    att = MultiHeadAttention(d_model, num_heads)
    q = k = v = torch.randn(batch_size, seq_len, d_model)
    att_output = att(q,k,v)
    print(f"Attention output shape: {att_output.shape}. (should be {q.shape})")
    print(f"Max value: {att_output.max():.4f}")
    print(f"Min value: {att_output.min():.4f}")

    print("\n3. Encoder test")
    encoder_layer = EncoderLayer(d_model, num_heads)
    encoder_input = torch.randn(batch_size, seq_len, d_model)
    encoder_output = encoder_layer(encoder_input)
    print(f"Encoder output shape: {encoder_output.shape}. (should be {encoder_input.shape})")
    print(f"Data changed: {torch.allclose(encoder_input, encoder_output, atol=1e-4)}. (should be False)")

    print("\n4. Decoder test")
    decoder_layer = DecoderLayer(d_model, num_heads)
    decoder_input = torch.randn(batch_size, seq_len, d_model)
    decoder_output = decoder_layer(decoder_input, encoder_output, enc_mask, dec_mask)
    print(f"Decoder output shape: {decoder_output.shape}. (should be {decoder_input.shape})")
    print(f"Output norm = {decoder_output.norm():.4f}")

test_transformer()

1. Positional Encoding Test
Before PE: mean=-0.0059, std=0.9872
After PE: mean=0.4742, std=0.9872
PE shape: torch.Size([1, 10, 512]). (should be [1, 10, 512])

2. Multi-Head Attention Test
Attention output shape: torch.Size([2, 10, 512]). (should be torch.Size([2, 10, 512]))
Max value: 0.3594
Min value: -0.3447

3. Encoder test
Encoder output shape: torch.Size([2, 10, 512]). (should be torch.Size([2, 10, 512]))
Data changed: False. (should be False)

4. Decoder test
Decoder output shape: torch.Size([2, 10, 512]). (should be torch.Size([2, 10, 512]))
Output norm = 101.1924
