In [1]:

import math 
import torch
import numpy as np 
import torch.nn as nn
from torch.nn import functional as F

In [2]:
# Quick sanity check: draw a 3x3 standard-normal tensor and display its shape.
x = torch.randn(3, 3)
x.shape

torch.Size([3, 3])

In [3]:

""" 
Multi-Head-Attention + (Self attention):
"""
def selfAttention(q,k,v,mask=None): # e.g. 30 x 8 x 200 x 64
    """Scaled dot-product attention.

    Args:
        q: query tensor; its last two dimensions are (query_len, head_dim).
        k: key tensor; its last two dimensions are (key_len, head_dim).
        v: value tensor; its last two dimensions are (key_len, value_dim).
        mask: optional additive mask. It is added to the scaled scores before
            the softmax, so it must broadcast against (..., query_len, key_len).
            Use 0 for positions that may be attended to and -inf for
            positions that must be hidden.

    Returns:
        A tuple ``(attention, values)``: the softmax-normalised attention
        weights of shape (..., query_len, key_len) and the weighted sum of
        ``v`` of shape (..., query_len, value_dim).
    """
    d_k = k.size()[-1]
    # Scale by sqrt(d_k) so the logits do not grow with the head dimension.
    scaled = torch.matmul(q,k.transpose(-1,-2)) / math.sqrt(d_k) # e.g. 30 x 8 x 200 x 200
    if mask is not None:
        # The mask is ADDED to the scores; the previous `scaled =+ mask`
        # replaced the scores with the mask and threw the q.k products away.
        scaled = scaled + mask
    attention = F.softmax(input=scaled,dim=-1)  # e.g. 30 x 8 x 200 x 200
    # (30 x 8 x 200 x 200) @ (30 x 8 x 200 x 64) = (30 x 8 x 200 x 64)
    values = torch.matmul(attention,v)
    return attention,values

class MultiHeadAttention(nn.Module):
    """Multi-head self attention over a single input sequence.

    One linear layer produces queries, keys and values together; the result
    is split into ``num_heads`` heads of width ``d_model // num_heads``,
    passed through ``selfAttention`` and merged back by a final linear layer.
    """

    def __init__(self,d_model,num_heads):
        super().__init__()
        self.d_model = d_model      # e.g. 512
        self.num_heads = num_heads  # e.g. 8
        self.head_dim = d_model // num_heads  # e.g. 512 // 8 = 64
        # Fused projection: q, k and v are produced in a single matmul.
        self.qkv = nn.Linear(d_model, 3 * d_model)
        self.linear_trans = nn.Linear(d_model, d_model)

    def forward(self,x,mask):
        """Attend over ``x`` (batch, seq, d_model) using the additive ``mask``.

        Returns a tensor with the same (batch, seq, d_model) layout.
        """
        n_batch, n_tokens, _ = x.size()  # e.g. 30 x 200 x 512
        # (batch, seq, 3*d_model) -> (batch, heads, seq, 3*head_dim)
        per_head = (
            self.qkv(x)
            .view(n_batch, n_tokens, self.num_heads, 3 * self.head_dim)
            .permute(0, 2, 1, 3)
        )
        # Each chunk is (batch, heads, seq, head_dim).
        q, k, v = per_head.chunk(3, dim=-1)
        _, head_values = selfAttention(q=q,k=k,v=v,mask=mask)
        # (batch, heads, seq, head_dim) -> (batch, seq, heads*head_dim)
        merged = head_values.permute(0, 2, 1, 3).reshape(
            n_batch, n_tokens, self.num_heads * self.head_dim
        )
        return self.linear_trans(merged)

        

In [4]:

""" 
Cross Attention:
query -> From Decoder

key ->  From Encoder
value -> From Encoder
While training, we need no mask
"""
class CrossAttention(nn.Module):
    """Multi-head cross attention.

    Keys and values are projected from ``x`` and queries from ``y``; in the
    decoder of this notebook ``x`` is the encoder output and ``y`` the
    decoder state. The two sequences may have different lengths.
    """

    def __init__(self,d_model,num_heads):
        super().__init__()
        self.d_model = d_model # e.g. 512
        self.num_heads = num_heads # e.g. 8
        self.head_dim = d_model // num_heads # e.g. 64
        self.kv = nn.Linear(in_features=d_model,out_features=2*d_model)
        self.q = nn.Linear(in_features=d_model,out_features=d_model)
        self.linear_trans = nn.Linear(in_features=d_model,out_features=d_model)

    def forward(self,x,y,mask):
        """Let every position of ``y`` attend over the positions of ``x``.

        Args:
            x: key/value source, shape (batch, kv_len, d_model).
            y: query source, shape (batch, query_len, d_model).
            mask: optional additive mask forwarded to ``selfAttention``; it
                must broadcast against (batch, heads, query_len, kv_len).
                Pass ``None`` for unmasked attention.

        Returns:
            Tensor of shape (batch, query_len, d_model).
        """
        batch_size,kv_length,_ = x.size()
        # The query length comes from y, not x, so the two sequences are
        # allowed to differ in length.
        query_length = y.size(1)
        kv = self.kv(x) # e.g. 30 x 200 x 1024
        q = self.q(y) # e.g. 30 x 200 x 512
        kv = kv.view(batch_size,kv_length,self.num_heads,2*self.head_dim) # 30x200x8x128
        q = q.view(batch_size,query_length,self.num_heads,self.head_dim) # 30x200x8x64
        kv = kv.permute(0,2,1,3) # 30x8x200x128
        q = q.permute(0,2,1,3) # 30x8x200x64
        k,v = kv.chunk(2,dim=-1) # each 30 x 8 x 200 x 64
        # attention = (batch, heads, query_len, kv_len); values = (batch, heads, query_len, head_dim)
        # The caller's mask is honoured instead of being silently dropped.
        _,values = selfAttention(q=q,k=k,v=v,mask=mask)
        values = values.permute(0,2,1,3).reshape(batch_size,query_length,self.num_heads*self.head_dim)
        out = self.linear_trans(values)
        return out
    
        
        
        

In [5]:

""" 
LayerNormalization:
"""
class LayerNormalization(nn.Module):
    """Layer normalisation with a learnable scale (gamma) and shift (beta).

    Statistics are computed over the trailing ``len(parameter_shape)``
    dimensions of the input, using the biased (population) variance.
    """

    def __init__(self,parameter_shape,esp=1e-5):
        super().__init__()
        self.parameter_shape = parameter_shape  # e.g. [512]
        self.esp = esp  # small constant added to the variance before the sqrt
        self.gamma = nn.Parameter(torch.ones(size=parameter_shape))
        self.beta = nn.Parameter(torch.zeros(size=parameter_shape))

    def forward(self,x):
        """Normalise ``x`` over its trailing dimensions, then scale and shift."""
        # Trailing axes to reduce over: [-1, -2, ..., -len(parameter_shape)].
        reduce_dims = list(range(-1, -len(self.parameter_shape) - 1, -1))
        centered = x - x.mean(dim=reduce_dims, keepdim=True)
        variance = centered.pow(2).mean(dim=reduce_dims, keepdim=True)
        normalized = centered / torch.sqrt(variance + self.esp)
        return self.gamma * normalized + self.beta
    

In [6]:
""" 
Feed Forward Network
"""
class FeedForward(nn.Module):
    """Position-wise feed-forward block: Linear -> ReLU -> Dropout -> Linear."""

    def __init__(self,d_model,hidden,drop_prob):
        super().__init__()
        # Expand to the hidden width, then project back to d_model.
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.activation = nn.ReLU()
        self.dropOut = nn.Dropout(drop_prob)

    def forward(self,input):
        """Apply the block to ``input``; the last dimension must be d_model."""
        expanded = self.activation(self.linear1(input))
        return self.linear2(self.dropOut(expanded))

In [7]:

""" 
DecoderLayer
"""
class DecoderLayer(nn.Module):
    """One decoder block: masked self attention, cross attention, feed forward.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalisation.

    NOTE(review): a single ``norm`` (and ``dropOut``) instance is shared by
    all three sub-layers, so they share one gamma/beta. The reference
    Transformer uses a separate normalisation per sub-layer; this is left
    unchanged to keep the parameter layout stable.
    """

    def __init__(self,d_model,num_heads,drop_prob,ffn_hidden):
        super().__init__()
        self.attention = MultiHeadAttention(d_model=d_model,num_heads=num_heads)
        self.dropOut = nn.Dropout(p=drop_prob)
        self.norm = LayerNormalization(parameter_shape=[d_model])
        self.cross = CrossAttention(d_model=d_model,num_heads=num_heads)
        self.ffn = FeedForward(d_model=d_model,hidden=ffn_hidden,drop_prob=drop_prob)

    def forward(self,x,y,decoder_mask):
        """Run one decoder block.

        Args:
            x: encoder output used as key/value source for cross attention.
            y: decoder input, e.g. 30 x 200 x 512.
            decoder_mask: additive mask applied in the self-attention step.

        Returns:
            The updated decoder state, same shape as ``y``.
        """
        # 1) masked multi-head self attention + add & norm
        residual = y
        y = self.attention(y,decoder_mask)
        y = self.dropOut(y)
        y = self.norm(residual + y)

        # 2) cross attention over the encoder output + add & norm
        residual = y
        y = self.cross(x,y,mask=None)
        y = self.dropOut(y)
        y = self.norm(residual + y)

        # 3) feed forward + add & norm
        # The residual must be refreshed here; a misspelt `residul = y` used
        # to leave the pre-cross-attention tensor in the skip connection.
        residual = y
        y = self.ffn(y)
        y = self.dropOut(y)
        y = self.norm(residual + y)
        return y  # e.g. 30 x 200 x 512
    
        
        


In [8]:

"""
Final Decoder Layer:
"""
class SequentialDecoder(nn.Sequential):
    """``nn.Sequential`` variant whose layers take ``(x, y, mask)``.

    ``x`` and ``mask`` are handed unchanged to every layer, while ``y`` is
    replaced by each layer's output before the next layer runs.
    """

    def forward(self,*inputs):
        encoder_out, decoder_state, mask = inputs
        for layer in self:
            decoder_state = layer(encoder_out, decoder_state, mask)
        return decoder_state
        
class Decoder(nn.Module):
    """Stack of ``num_layer`` identical ``DecoderLayer`` blocks."""

    def __init__(self,d_model,num_heads,drop_prob,ffn_hidden,num_layer):
        super().__init__()
        self.layers = SequentialDecoder(*[DecoderLayer(d_model,num_heads,drop_prob,ffn_hidden)
                                          for _ in range(num_layer)])

    def forward(self,x,y,mask):
        """Run ``y`` through every decoder layer.

        Args:
            x: encoder output, passed unchanged to every layer.
            y: decoder input.
            mask: additive self-attention mask, passed to every layer.

        Returns:
            The output of the last decoder layer.
        """
        y = self.layers(x,y,mask)
        # The result has to be returned; previously forward() fell off the
        # end and every call evaluated to None.
        return y


In [9]:

# Hyperparameters taken from the decoder architecture:
d_model = 512
drop_prob = 0.1
num_heads = 8
num_layer = 6
batch_size = 30
ffn_hidden = 2048
max_sequence_length = 200

# Inputs:
x = torch.randn(size=(batch_size,max_sequence_length,d_model)) # English
y = torch.randn(size=(batch_size,max_sequence_length,d_model)) # Bangla

# Look-ahead mask: 0 on and below the diagonal, -inf strictly above it.
# Its size follows max_sequence_length instead of a hard-coded 200, so the
# mask always matches the inputs.
mask = torch.triu(
    torch.full((max_sequence_length, max_sequence_length), float("-inf")),
    diagonal=1,
)
print("mask : ",mask,flush=True)

decoder = Decoder(d_model=d_model,num_heads=num_heads,drop_prob=drop_prob,
                  ffn_hidden=ffn_hidden,num_layer=num_layer)
out = decoder(x, y, mask)


mask :  tensor([[0., -inf, -inf,  ..., -inf, -inf, -inf],
        [0., 0., -inf,  ..., -inf, -inf, -inf],
        [0., 0., 0.,  ..., -inf, -inf, -inf],
        ...,
        [0., 0., 0.,  ..., 0., -inf, -inf],
        [0., 0., 0.,  ..., 0., 0., -inf],
        [0., 0., 0.,  ..., 0., 0., 0.]])
