# Encoder Block 
For a better understanding and a visual walkthrough, see this video by Umar Jamil:
"Coding a Transformer from scratch on PyTorch, with full explanation, training and inference"
https://www.youtube.com/watch?v=ISNdQcPhsts&t=3118s

In [1]:
import torch
import torch.nn as nn
import math

In [2]:
class InputEmbeddings(nn.Module):
    """Token-embedding lookup whose output is scaled by ``sqrt(d_model)``."""

    def __init__(self, d_model: int, vocab_size: int):
        """Build the embedding table.

        Args:
            d_model: Width of each embedding vector.
            vocab_size: Number of rows (distinct token ids) in the table.
        """
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        # One learnable row of width d_model per token id.
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Return the embeddings of the token ids in ``x`` times ``sqrt(d_model)``."""
        scale = math.sqrt(self.d_model)
        looked_up = self.embedding(x)
        return looked_up * scale
    
    
    
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to the input, then apply dropout.

    The encoding table is computed once in the constructor and stored as the
    non-trainable buffer ``pe`` of shape ``(1, seq_len, d_model)``:

        pe[pos, 2i]   = sin(pos / 10000 ** (2i / d_model))
        pe[pos, 2i+1] = cos(pos / 10000 ** (2i / d_model))
    """

    def __init__(self, d_model: int, seq_len: int, dropout: float):
        """Precompute the encoding table.

        Args:
            d_model: Width of each encoding vector.
            seq_len: Number of positions to precompute (the longest
                sequence ``forward`` can handle).
            dropout: Dropout probability applied to the summed output.
        """
        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)

        # Table of shape (seq_len, d_model), filled column-wise below.
        pe = torch.zeros(seq_len, d_model)

        # Column vector (seq_len, 1) holding the position index of each token.
        position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
        # 1 / 10000 ** (2i / d_model), computed in log space.
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        angles = position * div_term  # (seq_len, ceil(d_model / 2))

        # Sine on even columns, cosine on odd columns. When d_model is odd
        # there is one fewer odd column, so the cosine input is truncated.
        pe[:, 0::2] = torch.sin(angles)
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        pe = pe.unsqueeze(0)  # (1, seq_len, d_model)

        # A buffer is saved and moved with the module but is not a parameter,
        # so it is never updated by the optimizer.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe)`` using the first ``x.shape[1]`` positions.

        ``x`` is expected to be ``(batch, seq, d_model)`` with
        ``seq <= seq_len`` so that it broadcasts against the table.
        """
        x = x + self.pe[:, : x.shape[1], :]
        return self.dropout(x)
        
  
        
class LayerNormalization(nn.Module):
    """Normalize the last dimension to zero mean and unit std, then scale and shift.

    Computes ``alpha * (x - mean) / (std + eps) + bias`` where ``mean`` and
    ``std`` are taken over the last dimension, and ``alpha`` / ``bias`` are
    single learnable scalars.
    """

    def __init__(self, eps: float = 10**-6) -> None:
        """Create the learnable scale and shift.

        Args:
            eps: Small constant added to the standard deviation so the
                division stays finite when the std is 0.
        """
        super().__init__()
        self.eps = eps
        self.alpha = nn.Parameter(torch.ones(1))   # multiplicative scale
        # Additive shift; starts at 0 so the layer is a plain normalization
        # until training moves it.
        self.bias = nn.Parameter(torch.zeros(1))

    def forward(self, x):
        """Return the normalized, scaled and shifted tensor (same shape as ``x``)."""
        mean = x.mean(dim=-1, keepdim=True)
        # torch's default std is the unbiased (n - 1) estimator.
        std = x.std(dim=-1, keepdim=True)
        return self.alpha * (x - mean) / (std + self.eps) + self.bias
    
        
        

In [3]:
class FeedForwardBlock(nn.Module):
    """Two-layer MLP: linear (d_model -> d_ff), ReLU, dropout, linear (d_ff -> d_model)."""

    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        """Create the two linear layers and the dropout between them.

        Args:
            d_model: Input and output feature width.
            d_ff: Hidden feature width.
            dropout: Dropout probability applied after the ReLU.
        """
        super().__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)    # expand
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)    # project back

    def forward(self, x):
        """Apply the MLP to the last dimension of ``x``."""
        hidden = self.linear_1(x)
        activated = torch.relu(hidden)
        regularized = self.dropout(activated)
        return self.linear_2(regularized)
        

In [4]:
class MultiHeadAttentionBlock(nn.Module):
    """Multi-head scaled dot-product attention.

    The inputs are projected with ``w_q`` / ``w_k`` / ``w_v``, split into
    ``h`` heads of width ``d_k = d_model // h``, attended per head, merged
    back to width ``d_model`` and passed through ``w_o``. The attention
    weights of the most recent call are kept in ``self.attention_scores``.
    """

    def __init__(self, d_model: int, h: int, dropout: float) -> None:
        """Create the four projections and the dropout layer.

        Args:
            d_model: Feature width of the inputs and the output.
            h: Number of attention heads; must divide ``d_model``.
            dropout: Dropout probability applied to the attention weights.
        """
        super().__init__()
        self.d_model = d_model
        self.h = h

        assert d_model % h == 0, "d_model must be divisible by h"
        self.d_k = d_model // h
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)

        self.w_o = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def attention(query, key, value, mask, dropout: nn.Dropout):
        """Compute ``softmax(Q K^T / sqrt(d_k)) V``.

        Args:
            query, key, value: Tensors whose last dimension is ``d_k``;
                ``forward`` passes them as ``(batch, h, seq, d_k)``.
            mask: Optional tensor broadcastable to the score matrix;
                positions where ``mask == 0`` receive no attention.
            dropout: Optional dropout module applied to the attention weights.

        Returns:
            A tuple ``(output, attention_weights)``.
        """
        d_k = query.shape[-1]

        # (batch, h, seq_q, d_k) @ (batch, h, d_k, seq_k) -> (batch, h, seq_q, seq_k)
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)

        if mask is not None:
            # masked_fill is out-of-place, so the result must be kept. The large
            # negative value makes the softmax weight of masked positions 0.
            attention_scores = attention_scores.masked_fill(mask == 0, -1e9)

        attention_scores = attention_scores.softmax(dim=-1)

        if dropout is not None:
            attention_scores = dropout(attention_scores)

        return (attention_scores @ value), attention_scores

    def forward(self, q, k, v, mask):
        """Attend from ``q`` over ``k`` / ``v`` and return a ``(batch, seq_q, d_model)`` tensor.

        Args:
            q, k, v: Tensors of shape ``(batch, seq, d_model)``; ``k`` and
                ``v`` must share their sequence length.
            mask: Optional mask forwarded to ``attention``.
        """
        query = self.w_q(q)   # (batch, seq_q, d_model)
        key = self.w_k(k)     # (batch, seq_k, d_model)
        value = self.w_v(v)   # (batch, seq_k, d_model)

        # (batch, seq, d_model) -> (batch, seq, h, d_k) -> (batch, h, seq, d_k)
        query = query.view(query.shape[0], query.shape[1], self.h, self.d_k).transpose(1, 2)
        key = key.view(key.shape[0], key.shape[1], self.h, self.d_k).transpose(1, 2)
        value = value.view(value.shape[0], value.shape[1], self.h, self.d_k).transpose(1, 2)

        x, self.attention_scores = MultiHeadAttentionBlock.attention(
            query, key, value, mask, self.dropout
        )

        # (batch, h, seq_q, d_k) -> (batch, seq_q, h, d_k) -> (batch, seq_q, d_model)
        # contiguous() is needed because view cannot operate on the transposed layout.
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.h * self.d_k)

        # (batch, seq_q, d_model) -> (batch, seq_q, d_model)
        return self.w_o(x)
        
        

        
        
        

In [5]:
class ResidualConnection(nn.Module):
    """Skip connection around a sublayer: ``x + dropout(sublayer(norm(x)))``."""

    def __init__(self, dropout: float) -> None:
        """Create the dropout and normalization layers.

        Args:
            dropout: Dropout probability applied to the sublayer output.
        """
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = LayerNormalization()

    def forward(self, x, sublayer):
        """Normalize ``x``, run ``sublayer`` on it, apply dropout and add ``x`` back.

        Args:
            x: Input tensor.
            sublayer: Callable taking the normalized tensor and returning a
                tensor that can be added to ``x``.
        """
        normalized = self.norm(x)
        branch = self.dropout(sublayer(normalized))
        return x + branch
        

In [6]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention then feed-forward, each inside a residual connection."""

    def __init__(self, self_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
        """Store the sublayers and create their two residual wrappers.

        Args:
            self_attention_block: Attention module called as ``(x, x, x, src_mask)``.
            feed_forward_block: Feed-forward module called as ``(x)``.
            dropout: Dropout probability passed to each residual connection.
        """
        super().__init__()
        self.self_attention_block = self_attention_block
        self.feed_forward_block = feed_forward_block
        # Index 0 wraps self-attention, index 1 wraps the feed-forward block.
        self.residual_connections = nn.ModuleList([ResidualConnection(dropout) for _ in range(2)])

    def forward(self, x, src_mask):
        """Run ``x`` through self-attention and then the feed-forward block.

        Args:
            x: Input tensor.
            src_mask: Mask forwarded to the self-attention block.
        """
        # Query, key and value all come from the (normalized) input.
        x = self.residual_connections[0](x, lambda x: self.self_attention_block(x, x, x, src_mask))

        x = self.residual_connections[1](x, self.feed_forward_block)
        return x
        

In [7]:
class Encoder(nn.Module):
    """Run the input through every layer in order, then apply a final normalization."""

    def __init__(self, layers: nn.ModuleList) -> None:
        """Store the layer stack and create the closing normalization.

        Args:
            layers: Modules applied one after another; each is called as
                ``layer(x, mask)``.
        """
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization()

    def forward(self, x, mask):
        """Return the normalized output of the last layer."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, mask)
        normalized = self.norm(hidden)
        return normalized
        
        

# Decoder Block

In [30]:
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention, feed-forward, each in a residual connection."""

    def __init__(self, self_attention_block: MultiHeadAttentionBlock, cross_attention_block: MultiHeadAttentionBlock, feed_forward: FeedForwardBlock, drop_out: float) -> None:
        """Store the sublayers and create their three residual wrappers.

        Args:
            self_attention_block: Attention over the decoder input,
                called as ``(x, x, x, tgt_mask)``.
            cross_attention_block: Attention from the decoder state over the
                encoder output, called as
                ``(x, encoder_output, encoder_output, src_mask)``.
            feed_forward: Feed-forward module called as ``(x)``.
            drop_out: Dropout probability passed to each residual connection.
        """
        # Must run before any submodule is assigned on an nn.Module.
        super().__init__()
        self.self_attention_block = self_attention_block
        self.cross_attention_block = cross_attention_block
        self.feed_forward = feed_forward

        # Three residual connections here (self-attention, cross-attention,
        # feed-forward); the encoder block has two.
        self.residual_connections = nn.ModuleList([ResidualConnection(drop_out) for _ in range(3)])

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Run ``x`` through the three sublayers in order.

        Args:
            x: Decoder input tensor.
            encoder_output: Encoder result used as key and value in cross-attention.
            src_mask: Mask applied in cross-attention.
            tgt_mask: Mask applied in self-attention.
        """
        # Self-attention: query, key and value all come from the decoder input.
        x = self.residual_connections[0](x, lambda x: self.self_attention_block(x, x, x, tgt_mask))
        # Cross-attention: query from the decoder, key and value from the encoder.
        x = self.residual_connections[1](x, lambda x: self.cross_attention_block(x, encoder_output, encoder_output, src_mask))
        x = self.residual_connections[2](x, self.feed_forward)
        return x

class Decoder(nn.Module):
    """Run the input through every decoder layer in order, then apply a final normalization."""

    def __init__(self, layers: nn.Module) -> None:
        """Store the layer stack and create the closing normalization.

        Args:
            layers: Iterable module of layers; each is called as
                ``layer(x, encoder_output, src_mask, tgt_mask)``.
        """
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization()

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Return the normalized output of the last layer."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, encoder_output, src_mask, tgt_mask)
        normalized = self.norm(hidden)
        return normalized
    
    
        
        

        
                                    
        

In [None]:
class ProjectionLayer(nn.Module):
    def __init__(self, d_model: int, vocab_size:int)-> None:
        super().__init__()
        self.proj=nn.Linear(d_model,vocab_size)
        
    def forward (self,x):
        
        #(Batch, se_len,d_model)-->(batch,seq_len,Vocab_size)
        
        return torch.log_softmax(self.projec
    