In [21]:
import numpy as np
import torch
import torch.nn as nn
import math

### Build the input embedding layer first, where text tokens are converted into numeric vectors

In [22]:
class InputEmbedding(nn.Module):
    """Look up a learned vector for each token id and scale it by sqrt(d_model).

    Args:
        d_model: Size of each embedding vector (512 in the paper).
        vocab_size: Number of rows in the embedding table.
    """

    def __init__(self, d_model: int, vocab_size: int) -> None:
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.vocab_size = vocab_size
        self.d_model = d_model

    def forward(self, x):
        """Return the embeddings of the indices in ``x`` multiplied by sqrt(d_model)."""
        # The sqrt(d_model) scaling follows page 5 of the paper.
        scale = math.sqrt(self.d_model)
        embedded = self.embedding(x)
        return embedded * scale
        

#### Build Positional encoding and add it to Input Embedding

In [23]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to the input, then apply dropout.

    The encoding table is computed once in the constructor and stored as the
    non-trainable buffer ``pe`` of shape (1, seq_len, d_model).

    Args:
        d_model: Size of each embedding vector (512 in the paper). Odd values
            are supported; the last even column then has no cosine partner.
        seq_len: Maximum sequence length the table is built for.
        dropout: Dropout probability applied after the encoding is added.
    """

    def __init__(self, d_model: int, seq_len: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(p=dropout)

        # Encoding table of shape (seq_len, d_model), filled in below.
        pe = torch.zeros(seq_len, d_model)
        # Column vector of positions, shape (seq_len, 1).
        position = torch.arange(0, seq_len, dtype=torch.float32).unsqueeze(1)
        # The "2i" exponents from the formula on page 6: 0, 2, 4, ... < d_model.
        vector = torch.arange(0, d_model, 2, dtype=torch.float32)
        # Equivalent to 1 / 10000 ** (2i / d_model), written as exp(log(...)) as
        # in the Harvard "Annotated Transformer" article.
        denominator_harvard = torch.exp(vector * (-math.log(10000.0) / d_model))
        # Shape (seq_len, ceil(d_model / 2)).
        angles = position * denominator_harvard

        # Even columns take the sine term.
        pe[:, 0::2] = torch.sin(angles)
        # Odd columns take the cosine term. When d_model is odd there is one
        # fewer odd column than even columns, so trim the table to fit.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Leading batch dimension so the table broadcasts over batches:
        # (1, seq_len, d_model).
        pe = pe.unsqueeze(0)
        # A buffer is saved with the module state but is not a trainable parameter.
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Add the first ``x.size(1)`` rows of the table to ``x`` and apply dropout.

        Args:
            x: Tensor laid out as (batch, seq_len, d_model).
        """
        # ``pe`` is a buffer, so it never requires grad and is not trained.
        x = x + self.pe[:, : x.size(1)]
        return self.dropout(x)

In [24]:
def dummyfn2():
    """Smoke-test PositionalEncoding on a small, seeded random input.

    Returns:
        The module output for a single (1, seq_len, d_model) input.
    """
    torch.manual_seed(42)  # fixed seed so repeated runs draw the same input
    seq_len = 4
    d_model = 4
    dropout = 0.2
    # Build the input in the (batch, seq_len, d_model) layout that
    # PositionalEncoding.forward slices with x.size(1). A 2-D
    # (d_model, seq_len) tensor would only broadcast correctly when
    # seq_len == d_model.
    x = torch.randn(1, seq_len, d_model)
    obj = PositionalEncoding(d_model, seq_len, dropout)
    return obj(x)

# Run the smoke test once, then show the resulting tensor followed by its shape.
dummy_obj = dummyfn2()
print(f"dummy_obj = {dummy_obj}", f"Shape of dummy_obj ={dummy_obj.shape}", sep="\n")

dummy_obj = tensor([[[ 2.4086,  3.1091,  1.1259, -0.0000],
         [ 1.8999, -0.8678, -0.0413, -0.7559],
         [ 0.1965,  1.5407, -0.4656, -0.0000],
         [-0.0000, -1.9368, -0.9236,  2.2025]]])
Shape of dummy_obj =torch.Size([1, 4, 4])


### Build the Layer Normalisation Block ###

In [25]:
class LayerNormalization(nn.Module):
    """Normalise the last dimension to zero mean and unit standard deviation.

    The normalised values are multiplied by a learnable scalar ``alpha`` and
    shifted by a learnable scalar ``bias``.

    Args:
        eps: Small constant added to the standard deviation so the division
            stays finite when the standard deviation is zero.
    """

    def __init__(self, eps: float = 1e-6) -> None:
        super().__init__()
        self.eps = eps
        # nn.Parameter (rather than a plain tensor) makes these values appear in
        # the state dict and be updated by the optimiser.
        self.alpha = nn.Parameter(torch.ones(1))  # multiplicative gain
        self.bias = nn.Parameter(torch.zeros(1))  # additive shift

    def forward(self, x):
        """Return ``alpha * (x - mean) / (std + eps) + bias`` over the last dim."""
        # keepdim=True keeps the reduced dimension so the statistics broadcast
        # back against x.
        mean = x.mean(dim=-1, keepdim=True)
        std = x.std(dim=-1, keepdim=True)
        # Divide by the standard deviation, not the variance (std ** 2):
        # dividing by the variance leaves the output scale dependent on the
        # input scale instead of normalising it.
        return self.alpha * (x - mean) / (std + self.eps) + self.bias

### Build the FeedForward Block ###

In [26]:
class PositionWiseFeedForward(nn.Module):
    """Two-layer feed-forward network, equation (2) on page 5 of the paper.

    Computes ``linear2(dropout(relu(linear1(x))))``.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size between the two linear layers.
        dropout: Dropout probability applied to the hidden activations.
    """

    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        super().__init__()
        # W1/B1 expand d_model -> d_ff; W2/B2 project d_ff -> d_model.
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(p=dropout)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply the network to the last dimension of ``x``.

        For an input of shape (batch, seq_len, d_model) the hidden activations
        have shape (batch, seq_len, d_ff) and the result is back at
        (batch, seq_len, d_model).
        """
        hidden = self.linear1(x)
        hidden = self.relu(hidden)
        hidden = self.dropout(hidden)
        return self.linear2(hidden)

### Build the MultiHead attention module ###

In [27]:
class MultiHeadAttentionBlock(nn.Module):
    """Multi-head scaled dot-product attention.

    Projects the inputs with Wq/Wk/Wv, splits the feature dimension into ``h``
    heads of size ``d_k = d_model // h``, runs attention per head, concatenates
    the heads and applies the output projection Wo.

    Args:
        d_model: Embedding vector size; must be divisible by ``h``.
        h: Number of attention heads.
        dropout: Dropout probability applied to the attention probabilities.
    """

    def __init__(self, d_model: int, h: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model  # embedding vector size
        self.h = h  # number of heads
        assert d_model % h == 0, "d_model is not divisible by h"
        # d_v is assumed to equal d_k.
        self.d_k = d_model // h  # feature size seen by each head
        self.wq = nn.Linear(d_model, d_model, bias=False)  # Wq
        self.wk = nn.Linear(d_model, d_model, bias=False)  # Wk
        self.wv = nn.Linear(d_model, d_model, bias=False)  # Wv
        self.wo = nn.Linear(d_model, d_model, bias=False)  # Wo
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def attention(query, key, value, mask=None, dropout=None):
        """Scaled dot-product attention, equation (1) of the paper.

        Args:
            query, key, value: Per-head tensors; ``forward`` passes them as
                (batch, h, seq_len, d_k).
            mask: Optional tensor broadcastable to the score matrix; positions
                where ``mask == 0`` are excluded from attention.
            dropout: Optional callable applied to the attention probabilities.

        Returns:
            A tuple ``(output, attention_probabilities)``; with the shapes used
            by ``forward`` these are (batch, h, seq_len, d_k) and
            (batch, h, seq_len, seq_len).
        """
        d_k = query.size(-1)
        # query @ key^T over the last two axes, scaled by sqrt(d_k).
        attention_scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

        if mask is not None:
            # A very large negative score makes softmax assign ~0 probability
            # to the masked positions.
            attention_scores = attention_scores.masked_fill(mask == 0, -1e9)

        # Turn scores into probabilities along the key axis.
        attention_scores = attention_scores.softmax(dim=-1)
        if dropout is not None:
            attention_scores = dropout(attention_scores)

        return torch.matmul(attention_scores, value), attention_scores

    def forward(self, q, k, v, mask=None):
        """Run multi-head attention.

        Args:
            q, k, v: Tensors laid out as (batch, seq_len, d_model).
            mask: Optional mask forwarded to ``attention``.

        Returns:
            Tensor of shape (batch, seq_len, d_model). The attention
            probabilities of the call are stored on ``self.attention_scores``.
        """
        # The projections keep the shape: (batch, seq_len, d_model).
        query = self.wq(q)
        key = self.wk(k)
        value = self.wv(v)

        # (batch, seq_len, d_model) -> (batch, seq_len, h, d_k) -> (batch, h, seq_len, d_k)
        # so that each head owns its own (seq_len, d_k) matrix.
        query = query.view(query.shape[0], query.shape[1], self.h, self.d_k).transpose(1, 2)
        key = key.view(key.shape[0], key.shape[1], self.h, self.d_k).transpose(1, 2)
        value = value.view(value.shape[0], value.shape[1], self.h, self.d_k).transpose(1, 2)

        # Pass mask and dropout by keyword: passing self.dropout positionally
        # would land in the ``mask`` slot, dropping the caller's mask and
        # breaking masked_fill.
        x, self.attention_scores = MultiHeadAttentionBlock.attention(
            query, key, value, mask=mask, dropout=self.dropout
        )

        # Concatenate the heads:
        # (batch, h, seq_len, d_k) -> (batch, seq_len, h, d_k) -> (batch, seq_len, d_model)
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.h * self.d_k)

        # Final projection with Wo (top of page 5 of the paper).
        return self.wo(x)
        


### Build the residual Connection module ###

It is the 'add' part of the 'add & norm' module.

In [28]:
class ResidualConnection(nn.Module):
    """Skip connection around a sublayer: ``x + dropout(sublayer(norm(x)))``.

    Args:
        dropout: Dropout probability applied to the sublayer output.
    """

    def __init__(self, dropout: float) -> None:
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.norm = LayerNormalization()

    def forward(self, x, sublayer):
        """Apply ``sublayer`` to the normalised input and add the result to ``x``.

        Args:
            x: Input tensor.
            sublayer: Callable wrapped by this connection (e.g. multi-head
                attention or the feed-forward network).

        Returns:
            ``x`` plus the dropped-out sublayer output.
        """
        # Normalisation is applied before the sublayer, as most implementations
        # do. The paper instead normalises the sublayer output:
        #     x + self.dropout(self.norm(sublayer(x)))
        normalised = self.norm(x)
        sublayer_out = sublayer(normalised)
        return x + self.dropout(sublayer_out)

### Build the entire Encoder block  ###

In [29]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention then feed-forward, each in a residual connection.

    Args:
        features: Accepted for interface compatibility; not used, because
            ResidualConnection as defined in this file takes only a dropout rate.
        selfattn_block: Multi-head attention module used for self-attention.
        feedforward_block: Position-wise feed-forward module.
        dropout: Dropout probability for both residual connections.
    """

    def __init__(self, features: int, selfattn_block: MultiHeadAttentionBlock,
                 feedforward_block: PositionWiseFeedForward, dropout: float) -> None:
        super().__init__()
        self.selfattn_block = selfattn_block
        self.feedforward_block = feedforward_block
        # Two residual connections: one around self-attention and one around
        # the feed-forward network (figure 1 of the paper).
        # ResidualConnection.__init__ accepts only ``dropout``; also passing
        # ``features`` raises TypeError.
        self.res_con = nn.ModuleList([ResidualConnection(dropout)
                                      for _ in range(2)])

    def forward(self, x, src_mask):
        """Run the block on ``x``.

        Args:
            x: Input tensor.
            src_mask: Mask forwarded to self-attention so padding positions do
                not interact with other positions.
        """
        # The lambda receives the normalised input from the residual wrapper
        # and uses it as query, key and value.
        x = self.res_con[0](
            x, lambda normed: self.selfattn_block(normed, normed, normed, src_mask)
        )
        x = self.res_con[1](x, self.feedforward_block)

        return x


In [31]:
class Encoder(nn.Module):
    """Pass the input through every layer in turn, then normalise the result.

    Args:
        layers: Modules called in order as ``layer(x, mask)``.
    """

    def __init__(self, layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization()

    def forward(self, x, mask):
        """Return the normalised output of the last layer."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, mask)
        return self.norm(hidden)