In [2]:
import torch
import torch.nn as nn
import math

In [3]:
class PositionalEncoder(nn.Module):
    """Add fixed sinusoidal positional encodings to embeddings, then dropout.

    The encoding table follows the sinusoidal scheme

        PE(pos, 2i)     = sin(pos / 10000^(2i / d_model))
        PE(pos, 2i + 1) = cos(pos / 10000^(2i / d_model))

    and is stored as a non-trainable buffer named ``pe`` of shape
    ``(1, seq_length, d_model)``.

    Args:
        drop_out: Dropout probability applied after adding the encoding.
        seq_length: Maximum sequence length the table is built for.
        d_model: Size of the embedding (last) dimension.
    """

    def __init__(self,drop_out:float,seq_length:int,d_model:int)-> None:
        super().__init__()
        self.dropout = nn.Dropout(drop_out)
        self.seq_length = seq_length
        self.d_model = d_model

        # Encoding table, one row per position: (seq_length, d_model).
        pe = torch.zeros(seq_length, d_model)
        # Column vector of positions 0 .. seq_length - 1: (seq_length, 1).
        position = torch.arange(0, seq_length, dtype=torch.float).unsqueeze(1)
        # Inverse frequencies 10000^(-2i / d_model), computed in log space
        # for numerical stability: (ceil(d_model / 2),).
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        # div_term is already the *inverse* frequency, so it is multiplied
        # (not divided) to obtain pos / 10000^(2i / d_model).
        angles = position * div_term
        # Even feature indices take the sine ...
        pe[:, 0::2] = torch.sin(angles)
        # ... odd feature indices take the cosine. For an odd d_model there is
        # one fewer odd column than even columns, hence the slice.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Leading batch dimension so the table broadcasts over the batch.
        # A buffer is saved in the state dict and moved with .to()/.cuda(),
        # but is not a trainable parameter.
        self.register_buffer("pe", pe.unsqueeze(0))

    def forward(self, x):
        """Return ``dropout(x + pe[:, :x.shape[1], :])``.

        Args:
            x: Embeddings whose dimension 1 is the sequence axis and whose
                last dimension is ``d_model``.

        Raises:
            ValueError: If the sequence axis of ``x`` is longer than the
                ``seq_length`` the table was built for.
        """
        length = x.shape[1]
        if length > self.pe.shape[1]:
            raise ValueError(
                f"sequence length {length} exceeds the maximum "
                f"{self.pe.shape[1]} supported by this encoder"
            )
        # Buffers never require grad, so no explicit detaching is needed.
        x = x + self.pe[:, :length, :]
        return self.dropout(x)

    def feed_forward(self, x):
        """Backward-compatible alias for :meth:`forward`."""
        return self.forward(x)

In [4]:
class LayerNormalization(nn.Module):
    """Normalize the last dimension, then apply a learnable scale and shift.

    Args:
        features: Size of the last dimension of the inputs; one scale and one
            shift value is learned per feature.
        eps: Small constant added to the standard deviation so the division
            never hits zero.
    """

    def __init__(self,features:int,eps:float = 10**-4)-> None:
        super().__init__()
        # Guards the division in forward() against a zero standard deviation.
        self.eps = eps
        # Learnable multiplicative gain, initialised to the identity (ones).
        self.alpha = nn.Parameter(torch.ones(features))
        # Learnable additive offset, initialised to zero.
        self.bias = nn.Parameter(torch.zeros(features))

    def forward(self,x):
        """Return ``alpha * (x - mean) / (std + eps) + bias`` over the last dim."""
        # Statistics are taken over the last dimension; keepdim lets them
        # broadcast back against x.
        mu = torch.mean(x, dim=-1, keepdim=True)
        sigma = torch.std(x, dim=-1, keepdim=True)
        scaled = self.alpha * (x - mu)
        return scaled / (sigma + self.eps) + self.bias


In [5]:
class ResidualConnection(nn.Module):
    """Pre-norm residual wrapper: ``x + dropout(sublayer(norm(x)))``.

    Args:
        features: Size of the last dimension, forwarded to the normalization
            layer.
        droput: Dropout probability applied to the sublayer output. (The
            spelling is kept so existing keyword callers continue to work.)
    """

    def __init__(self,features:int,droput:float)-> None:
        super().__init__()
        self.dropout = nn.Dropout(droput)
        self.norm  = LayerNormalization(features)

    # NOTE: this method must live at class level. Defined inside __init__ it
    # would be a throw-away local function and the module could not be called.
    def forward(self, sublayer, x):
        """Apply ``sublayer`` to the normalized input and add the skip path.

        Args:
            sublayer: Callable taking the normalized tensor and returning a
                tensor that can be added to ``x``.
            x: Input tensor.

        Returns:
            ``x + dropout(sublayer(norm(x)))``.
        """
        return x + self.dropout(sublayer(self.norm(x)))

    def feed_forward(self, sublayer, x):
        """Backward-compatible alias for :meth:`forward`."""
        return self.forward(sublayer, x)

In [None]:
class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Size of the input and output feature dimension.
        d_ff: Size of the hidden (inner) layer.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(self, d_model:int,d_ff:int,dropout:float):
        super().__init__()
        self.linear_1 = nn.Linear(d_model,d_ff)
        self.linear_2 = nn.Linear(d_ff,d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return ``linear_2(dropout(relu(linear_1(x))))``.

        Defined as ``forward`` so the module can be invoked as ``module(x)``;
        nn.Module.__call__ dispatches to ``forward``.
        """
        hidden = torch.relu(self.linear_1(x))
        return self.linear_2(self.dropout(hidden))

    def feed_forward(self,x):
        """Backward-compatible alias for :meth:`forward`."""
        return self.forward(x)
        

In [26]:
class MultiHeadAttentionBlock(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        dropout: Dropout probability applied to the attention weights.
        d_model: Size of the model (feature) dimension.
        head: Number of attention heads; must divide ``d_model`` evenly.
    """

    def __init__(self,dropout:float,d_model:int,head:int):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.head = head
        # Each head works on a d_model / head slice, so the split must be
        # exact; the parity of the head count is irrelevant.
        assert d_model % head == 0, "d_model should be divisible by number of heads"
        # Per-head feature size.
        self.d_k = d_model // head
        # Query, key, value and output projections.
        self.w_q = nn.Linear(d_model, d_model, bias=False)
        self.w_k = nn.Linear(d_model, d_model, bias=False)
        self.w_v = nn.Linear(d_model, d_model, bias=False)
        self.w_o = nn.Linear(d_model, d_model, bias=False)

    @staticmethod
    def attention(query,key,value,mask,dropout:nn.Dropout):
        """Compute ``softmax(Q K^T / sqrt(d_k)) V``.

        This is a static method and therefore takes no ``self``.

        Args:
            query, key, value: Tensors whose last two dimensions are
                (sequence, d_k); ``forward`` passes (batch, h, seq_len, d_k).
            mask: Optional tensor broadcastable to the score matrix;
                positions where ``mask == 0`` are excluded from attention.
            dropout: Optional dropout module applied to the attention weights.

        Returns:
            Tuple ``(context, attention_weights)``; the weights are returned
            so callers can visualize them.
        """
        d_k = query.shape[-1]
        # (batch, h, seq_len, d_k) --> (batch, h, seq_len, seq_len)
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            # -inf becomes exactly zero weight after the softmax.
            # NOTE(review): a row that is masked everywhere yields NaN.
            attention_scores = attention_scores.masked_fill(mask == 0, float('-inf'))
        attention_scores = attention_scores.softmax(dim=-1)
        if dropout is not None:
            attention_scores = dropout(attention_scores)
        # (batch, h, seq_len, seq_len) --> (batch, h, seq_len, d_k)
        return (attention_scores @ value), attention_scores

    def forward(self,q,k,v,mask=None):
        """Project the inputs, attend per head, and merge the heads.

        Args:
            q, k, v: Tensors of shape (batch, seq_len, d_model).
            mask: Optional mask forwarded to :meth:`attention`.

        Returns:
            Tensor of shape (batch, seq_len, d_model).
        """
        query = self.w_q(q)  # (batch, seq_len, d_model)
        key = self.w_k(k)    # (batch, seq_len, d_model)
        value = self.w_v(v)  # (batch, seq_len, d_model)

        # (batch, seq_len, d_model) --> (batch, seq_len, h, d_k)
        #                           --> (batch, h, seq_len, d_k)
        query = query.view(query.shape[0], query.shape[1], self.head, self.d_k).transpose(1, 2)
        key = key.view(key.shape[0], key.shape[1], self.head, self.d_k).transpose(1, 2)
        value = value.view(value.shape[0], value.shape[1], self.head, self.d_k).transpose(1, 2)

        x, _ = MultiHeadAttentionBlock.attention(query, key, value, mask, self.dropout)

        # (batch, h, seq_len, d_k) --> (batch, seq_len, h, d_k)
        #                          --> (batch, seq_len, d_model)
        # transpose() leaves the data non-contiguous, and view() needs
        # contiguous memory, hence the explicit contiguous().
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.head * self.d_k)

        # (batch, seq_len, d_model) --> (batch, seq_len, d_model)
        return self.w_o(x)