In [17]:
import math

import numpy as np
import torch
import torch as t
import torch.nn as nn

In [49]:
# import torch
# import math
# from torch import nn

# def test_multihead_attention_block():
#     batch_size = 2
#     seq_length = 3
#     d_model = 4
#     head = 2
#     dropout = 0.1

#     # Create a MultiheadAttentionBlock
#     attention_block = MultiheadAttentionBlock(d_model, head, seq_length, dropout)

#     # Create dummy inputs
#     q = torch.rand(batch_size, seq_length, d_model)
#     k = torch.rand(batch_size, seq_length, d_model)
#     v = torch.rand(batch_size, seq_length, d_model)
#     mask = torch.ones(batch_size, seq_length, seq_length)

#     # Forward pass
#     output = attention_block(q, k, v, mask)
#     print(output)

#     # Assertions
#     assert output.shape == (batch_size, seq_length, d_model), \
#         f"Expected output shape {(batch_size, seq_length, d_model)}, but got {output.shape}"

#     # Check that attention scores are calculated and have the right shape
#     assert attention_block.attention_score.shape == (batch_size, head, seq_length, seq_length), \
#         f"Expected attention scores shape {(batch_size, head, seq_length, seq_length)}, but got {attention_block.attention_score.shape}"

# # Run the test function
# test_multihead_attention_block()


tensor([[[-0.0719,  0.2177,  0.0672,  0.2151],
         [-0.0734,  0.1958,  0.0551,  0.1925],
         [-0.0750,  0.1999,  0.0574,  0.1947]],

        [[-0.0797,  0.2668,  0.0557,  0.3180],
         [-0.0613,  0.2616,  0.0731,  0.3019],
         [-0.0806,  0.2661,  0.0542,  0.3182]]], grad_fn=<UnsafeViewBackward0>)


In [19]:
class MultiheadAttentionBlock(nn.Module):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are each passed through a bias-free linear
    projection, split into ``head`` slices of width ``d_model // head``,
    attended independently per head, merged back to ``d_model`` and finally
    passed through a bias-free output projection.

    Args:
        d_model: Model width; must be divisible by ``head``.
        head: Number of attention heads.
        seq_length: Stored on the instance; not used by the computation here.
        dropout: Dropout probability applied to the attention weights.

    After every ``forward`` call the attention weights of that call are kept
    in ``self.attention_score`` with shape (batch, head, seq_q, seq_k).
    """

    def __init__(self, d_model, head, seq_length, dropout):
        super().__init__()
        assert d_model % head == 0, f"choose other head value as d_model-({d_model}) not completly divisible by number of head-({head})"
        self.d_model = d_model
        self.h = head
        # Width of a single head.
        self.d_k = d_model // head
        self.seq_length = seq_length
        self.dropout = nn.Dropout(dropout)
        # Inputs are laid out as (batch, seq_length, d_model).
        self.weight_quarry = nn.Linear(d_model, d_model, bias=False)
        self.weight_key = nn.Linear(d_model, d_model, bias=False)
        self.weight_value = nn.Linear(d_model, d_model, bias=False)
        self.weight_output = nn.Linear(d_model, d_model, bias=False)

    @staticmethod
    def attention(quarry, key, value, mask, dropout: nn.Dropout):
        """Return ``(weights @ value, weights)`` for scaled dot-product attention.

        Positions where ``mask == 0`` are filled with ``-1e9`` before the
        softmax. ``mask`` and ``dropout`` may each be ``None`` to skip that step.
        """
        # The last dimension is the per-head width (d_k), not d_model.
        head_width = quarry.shape[-1]
        scores = (quarry @ key.transpose(-2, -1)) / math.sqrt(head_width)
        if mask is not None:
            scores.masked_fill_(mask == 0, -1e9)
        weights = scores.softmax(dim=-1)
        if dropout is not None:
            weights = dropout(weights)
        return weights @ value, weights

    def _split_heads(self, projected):
        """(batch, seq, d_model) -> (batch, head, seq, d_k)."""
        batch, steps = projected.shape[0], projected.shape[1]
        return projected.view(batch, steps, self.h, self.d_k).transpose(1, 2)

    def forward(self, q, k, v, mask):
        """Attend from ``q`` over ``k``/``v`` and return a (batch, seq_q, d_model) tensor."""
        q = self._split_heads(self.weight_quarry(q))
        k = self._split_heads(self.weight_key(k))
        v = self._split_heads(self.weight_value(v))

        per_head, self.attention_score = MultiheadAttentionBlock.attention(q, k, v, mask, self.dropout)

        # (batch, head, seq, d_k) -> (batch, seq, head * d_k)
        batch = per_head.shape[0]
        merged = per_head.transpose(1, 2).contiguous().view(batch, -1, self.h * self.d_k)
        return self.weight_output(merged)
            
            

In [20]:
class InputEmbeddings(nn.Module):
    """Token-id embedding table whose output is scaled by ``sqrt(d_model)``.

    Args:
        d_model: Width of each embedding vector.
        vocab_size: Number of rows in the embedding table.
    """

    def __init__(self, d_model: int, vocab_size: int) -> None:
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)

    def forward(self, x):
        """Map ids of shape (batch, seq_len) to vectors of shape (batch, seq_len, d_model)."""
        scale = math.sqrt(self.d_model)
        token_vectors = self.embedding(x)
        return token_vectors * scale
    
            
    

In [21]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a batch of embeddings.

    A table of shape (1, seq_length, d_model) is precomputed once: even feature
    indices hold ``sin(position * div_term)`` and odd feature indices hold
    ``cos(position * div_term)``. The table is stored as the non-trainable
    buffer ``pos`` so it follows the module across devices and is saved with
    the state dict.

    Args:
        d_model: Width of the embeddings the encoding is added to.
        seq_length: Maximum sequence length the table covers.
        dropout: Dropout probability applied after the addition.
    """

    def __init__(self,d_model:int,seq_length:int,dropout:float):
        super().__init__()
        self.d_model=d_model
        self.seq_length=seq_length
        self.dropout=nn.Dropout(dropout)

        # Column of positions 0..seq_length-1, shape (seq_length, 1).
        position=torch.arange(0,seq_length,dtype=torch.float).unsqueeze(1)
        # One frequency per sin/cos pair, shape (ceil(d_model / 2),).
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles=position*div_term

        posemb=torch.zeros(seq_length,d_model)
        posemb[:,0::2]=torch.sin(angles)
        # With an odd d_model there is one fewer odd slot than even slots, so
        # trim the angle columns to the number of odd feature indices.
        posemb[:,1::2]=torch.cos(angles[:,:d_model//2])

        # Register the encoding table itself (not the raw position indices),
        # with a leading batch dimension: (1, seq_length, d_model).
        self.register_buffer('pos', posemb.unsqueeze(0))

    def forward(self, x):
        """Add the first ``x.shape[1]`` rows of the table to ``x`` and apply dropout.

        ``x`` is indexed as (batch, seq_len, d_model); ``seq_len`` must not
        exceed the ``seq_length`` the table was built for.
        """
        x = x + (self.pos[:, :x.shape[1], :]).requires_grad_(False) # (batch, seq_len, d_model)
        return self.dropout(x)


In [22]:
class LayerNormalization(nn.Module):
    """Normalises the last dimension and applies a learned scale and shift.

    Args:
        feature: Size of the last dimension; ``alfa`` (scale, initialised to
            ones) and ``beta`` (shift, initialised to zeros) have this length.
        eps: Small constant added to the standard deviation (not the variance)
            to avoid division by zero.
    """

    def __init__(self, feature: int, eps: float = 10**-6):
        super().__init__()
        self.feature = feature
        self.eps = eps
        self.alfa = nn.Parameter(torch.ones(feature))
        self.beta = nn.Parameter(torch.zeros(feature))

    def forward(self, x):
        """Return ``alfa * (x - mean) / (std + eps) + beta`` computed over the last dimension."""
        centered = x - x.mean(dim=-1, keepdim=True)
        # Tensor.std uses its default estimator here, as in the original.
        spread = x.std(dim=-1, keepdim=True) + self.eps
        return self.alfa * centered / spread + self.beta
        
        

In [23]:
class FeedForwardBlock(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Input and output width.
        d_ff: Width of the hidden layer.
        dropout: Dropout probability applied to the hidden activations.
    """

    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        super().__init__()
        self.linear_1 = nn.Linear(in_features=d_model, out_features=d_ff)  # w1 and b1
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(in_features=d_ff, out_features=d_model)  # w2 and b2

    def forward(self, x):
        """Expand the last dimension to ``d_ff`` and project it back to ``d_model``."""
        hidden = self.linear_1(x)
        activated = torch.relu(hidden)
        regularised = self.dropout(activated)
        return self.linear_2(regularised)

In [24]:
class ResidualConnection(nn.Module):
    """Pre-norm residual wrapper: ``x + dropout(sublayer(norm(x)))``.

    Args:
        features: Size of the last dimension, passed to ``LayerNormalization``.
        dropout: Dropout probability applied to the sublayer output.
    """

    def __init__(self, features: int, dropout: float) -> None:
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = LayerNormalization(features)

    def forward(self, x, sublayer):
        """Normalise ``x``, run ``sublayer`` on it, apply dropout and add the result to ``x``."""
        normalised = self.norm(x)
        branch = self.dropout(sublayer(normalised))
        return x + branch

In [25]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention followed by a feed-forward network.

    Each of the two sublayers is wrapped in its own ``ResidualConnection``.

    Args:
        self_attention_block: Attention module called with the same tensor as
            query, key and value.
        self_feed_forward_block: Feed-forward module applied after attention.
        feature: Feature width handed to the residual wrappers.
        dropout: Dropout probability handed to the residual wrappers.
    """

    def __init__(self, self_attention_block: MultiheadAttentionBlock, self_feed_forward_block: FeedForwardBlock, feature: int, dropout: float):
        super().__init__()
        self.multi_attention = self_attention_block
        self.feed_forward = self_feed_forward_block
        self.residual_block = nn.ModuleList(
            [ResidualConnection(feature, dropout), ResidualConnection(feature, dropout)]
        )

    def forward(self, x, mask):
        """Run ``x`` through the attention sublayer (using ``mask``) and then the feed-forward sublayer."""
        def self_attend(normalised):
            # Query, key and value are all the (normalised) layer input.
            return self.multi_attention(normalised, normalised, normalised, mask)

        attended = self.residual_block[0](x, self_attend)
        return self.residual_block[1](attended, self.feed_forward)

In [26]:
class Encoder(nn.Module):
    """Stack of encoder layers followed by a single final layer normalisation.

    Args:
        feature: Feature width used to build the final ``LayerNormalization``.
        layers: The encoder layers, applied in order; each is called as
            ``layer(x, mask)``.
    """

    def __init__(self,feature: int,layers:nn.ModuleList):
        super().__init__()
        self.norm=LayerNormalization(feature)
        self.layers=layers

    def forward(self,x,mask):
        """Pass ``x`` through every layer with ``mask`` and normalise the result once."""
        for layer in self.layers:
            x=layer(x,mask)
        # The final norm runs once after the whole stack, matching Decoder;
        # applying this single shared norm after every layer re-normalised the
        # residual stream between layers.
        return self.norm(x)
            

In [39]:
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention, then feed-forward.

    Each of the three sublayers is wrapped in its own ``ResidualConnection``,
    which normalises its input before handing it to the sublayer.

    Args:
        feature: Feature width handed to the residual wrappers.
        multi_head_attention: Attention module used for decoder self-attention.
        cross_head_attention: Attention module used to attend over the encoder output.
        feed_forward: Feed-forward module applied last.
        dropout: Dropout probability handed to the residual wrappers.
    """

    def __init__(self,feature,multi_head_attention:MultiheadAttentionBlock,cross_head_attention:MultiheadAttentionBlock,feed_forward:FeedForwardBlock,dropout:float):
        super().__init__()
        self.multiheadattention=multi_head_attention
        self.crossheadattention=cross_head_attention
        self.feedforward=feed_forward
        self.residualblock=nn.ModuleList([ResidualConnection(feature,dropout) for _ in range(3)])

    def forward(self,decoder_x,encoder_output,encoder_mask,decoder_mask):
        """Apply the three sublayers to ``decoder_x`` and return the result.

        ``decoder_mask`` is used for self-attention and ``encoder_mask`` for
        cross-attention over ``encoder_output``.
        """
        # The sublayer must consume the tensor the residual wrapper passes in
        # (the normalised input), not the outer un-normalised ``decoder_x``.
        decoder_x=self.residualblock[0](decoder_x,lambda x:self.multiheadattention(x,x,x,decoder_mask))
        # Cross-attention: queries come from the decoder, while BOTH keys and
        # values come from the encoder output.
        decoder_x=self.residualblock[1](decoder_x,lambda x:self.crossheadattention(x,encoder_output,encoder_output,encoder_mask))
        decoder_x=self.residualblock[2](decoder_x,self.feedforward)
        return decoder_x

In [40]:
class Decoder(nn.Module):
    """Stack of decoder layers followed by a single final layer normalisation.

    Args:
        features: Feature width used to build the final ``LayerNormalization``.
        layers: The decoder layers, applied in order; each is called as
            ``layer(x, encoder_output, src_mask, tgt_mask)``.
    """

    def __init__(self, features: int, layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(features)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Thread ``x`` through every layer, then normalise the final hidden state."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, encoder_output, src_mask, tgt_mask)
        normalised = self.norm(hidden)
        return normalised

In [41]:
class ProjectionLayer(nn.Module):
    """Linear map from the model width to vocabulary-sized scores.

    Args:
        d_model: Size of the last dimension of the input.
        vocab_size: Size of the last dimension of the output.
    """

    def __init__(self, d_model, vocab_size) -> None:
        super().__init__()
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Project the last dimension of ``x`` from ``d_model`` to ``vocab_size``.

        No softmax is applied; the raw output of the linear layer is returned.
        """
        return self.proj(x)

In [42]:
class Transformer(nn.Module):
    """Encoder-decoder model assembled from prebuilt components.

    Args:
        encoder: Module called as ``encoder(x, mask)``.
        decoder: Module called as ``decoder(x, encoder_output, src_mask, tgt_mask)``.
        projection: Module applied to the decoder output in ``project``.
        enco_embed: Embedding applied to the source ids.
        deco_embed: Embedding applied to the target ids.
        enco_postion: Positional encoding applied after the source embedding.
        deco_prosition: Positional encoding applied after the target embedding.
    """

    def __init__(self, encoder: Encoder, decoder: Decoder, projection: ProjectionLayer,
                 enco_embed: InputEmbeddings, deco_embed: InputEmbeddings,
                 enco_postion: PositionalEncoding, deco_prosition: PositionalEncoding):
        super().__init__()
        self.encod = encoder
        self.decod = decoder
        self.enco_embed = enco_embed
        self.deco_embed = deco_embed
        self.enco_position = enco_postion
        self.deco_position = deco_prosition
        self.proj = projection

    def encode(self, src, mask):
        """Embed ``src``, add positional information and run the encoder with ``mask``."""
        embedded = self.enco_embed(src)
        positioned = self.enco_position(embedded)
        return self.encod(positioned, mask)

    def decode(self, encoder_output: torch.Tensor, tar: torch.Tensor, src_mask: torch.Tensor, trg_mask: torch.Tensor):
        """Embed ``tar``, add positional information and run the decoder over ``encoder_output``."""
        positioned = self.deco_position(self.deco_embed(tar))
        return self.decod(positioned, encoder_output, src_mask, trg_mask)

    def project(self, x):
        """Apply the projection layer to ``x``."""
        return self.proj(x)

    def forward(self, src, tgt, src_mask, tgt_mask):
        """Encode ``src``, decode ``tgt`` against it and return the projected decoder output."""
        memory = self.encode(src, src_mask)
        decoded = self.decode(memory, tgt, src_mask, tgt_mask)
        return self.project(decoded)

In [50]:
def build_transformer(input_vocab_size: int, output_vocab_size: int, 
                      input_seq_len: int, output_seq_len: int, d_model: int=512,
    N: int=6, h: int=8, dropout: float=0.1, d_ff: int=2048) -> Transformer:
    """Assemble a ``Transformer`` and Xavier-initialise its weight matrices.

    Args:
        input_vocab_size: Vocabulary size of the source embedding.
        output_vocab_size: Vocabulary size of the target embedding and of the
            final projection.
        input_seq_len: Length of the source positional-encoding table.
        output_seq_len: Length of the target positional-encoding table.
        d_model: Model width shared by every component.
        N: Number of encoder blocks and number of decoder blocks.
        h: Number of attention heads per attention block.
        dropout: Dropout probability used throughout.
        d_ff: Hidden width of each feed-forward block.

    Returns:
        The assembled ``Transformer``; every parameter with more than one
        dimension has been re-initialised with ``nn.init.xavier_uniform_``.
    """
    # Token embeddings: InputEmbeddings(d_model, vocab_size).
    input_embedding=InputEmbeddings(d_model,vocab_size=input_vocab_size)
    output_embedding=InputEmbeddings(d_model,vocab_size=output_vocab_size)
    # Positional encodings: PositionalEncoding(d_model, seq_length, dropout).
    input_pos_encoding=PositionalEncoding(d_model,input_seq_len,dropout=dropout)
    output_pos_encoding=PositionalEncoding(d_model,output_seq_len,dropout=dropout)

    # N independent encoder blocks, each with its own attention and feed-forward modules.
    encoding_blocks=[]
    for _ in range(N):
        encoding_self_attention=MultiheadAttentionBlock(d_model,head=h,seq_length=input_seq_len,dropout=dropout)
        encoding_feed_forward=FeedForwardBlock(d_model,d_ff,dropout)
        encoder_block=EncoderBlock(self_attention_block=encoding_self_attention,
                                   self_feed_forward_block=encoding_feed_forward,
                                   feature=d_model,dropout=dropout)
        encoding_blocks.append(encoder_block)

    # N independent decoder blocks: self-attention, cross-attention, feed-forward.
    decoding_blocks=[]
    for _ in range(N):
        decoding_self_attention=MultiheadAttentionBlock(d_model,head=h,seq_length=output_seq_len,dropout=dropout)
        # Plain assignment: the previous chained form also rebound the
        # unrelated name ``encoding_self_attention``.
        decoding_cross_attention=MultiheadAttentionBlock(d_model,head=h,seq_length=output_seq_len,dropout=dropout)
        decoding_feed_forward=FeedForwardBlock(d_model,d_ff,dropout)
        decoder_block=DecoderBlock(feature=d_model,multi_head_attention=decoding_self_attention,
                                   cross_head_attention=decoding_cross_attention,
                                   feed_forward=decoding_feed_forward,dropout=dropout)
        decoding_blocks.append(decoder_block)

    # Wrap the block lists into the Encoder / Decoder stacks.
    encoder=Encoder(feature=d_model,layers=nn.ModuleList(encoding_blocks))
    decoder=Decoder(features=d_model,layers=nn.ModuleList(decoding_blocks))

    # Final projection: ProjectionLayer(d_model, vocab_size).
    projection=ProjectionLayer(d_model=d_model,vocab_size=output_vocab_size)

    # The keyword names below (including their spelling) are the ones
    # Transformer.__init__ declares.
    transformer=Transformer(encoder=encoder,decoder=decoder,projection=projection,
                            enco_embed=input_embedding,deco_embed=output_embedding,
                            enco_postion=input_pos_encoding,
                            deco_prosition=output_pos_encoding)

    # Re-initialise weight matrices only; 1-D parameters (biases, norm
    # scale/shift) keep their constructor initialisation.
    for p in transformer.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)

    return transformer




In [52]:
def test_build_transformer():
    """Smoke test: build a model, run one forward pass in eval mode and check the output shape.

    Raises ``AssertionError`` if the output is not
    (batch_size, output_seq_len, output_vocab_size); prints ``Test passed!`` otherwise.
    """
    batch_size = 2
    input_vocab_size, output_vocab_size = 10000, 10000
    input_seq_len, output_seq_len = 10, 10

    # Build the model with the remaining hyper-parameters spelled out explicitly.
    transformer = build_transformer(
        input_vocab_size=input_vocab_size,
        output_vocab_size=output_vocab_size,
        input_seq_len=input_seq_len,
        output_seq_len=output_seq_len,
        d_model=512,
        N=6,
        h=8,
        dropout=0.1,
        d_ff=2048,
    )

    # Evaluation mode disables dropout for the forward pass.
    transformer.eval()

    # Random token ids and all-ones (nothing masked) attention masks.
    src = torch.randint(0, input_vocab_size, (batch_size, input_seq_len), dtype=torch.long)
    tgt = torch.randint(0, output_vocab_size, (batch_size, output_seq_len), dtype=torch.long)
    src_mask = torch.ones(batch_size, 1, input_seq_len, input_seq_len, dtype=torch.float)
    tgt_mask = torch.ones(batch_size, 1, output_seq_len, output_seq_len, dtype=torch.float)

    output = transformer(src, tgt, src_mask, tgt_mask)

    expected_output_shape = (batch_size, output_seq_len, output_vocab_size)
    assert output.shape == expected_output_shape, f"Expected output shape {expected_output_shape}, but got {output.shape}"

    print("Test passed!")

# Run the smoke test: it raises AssertionError on an output-shape mismatch and
# prints "Test passed!" otherwise.
test_build_transformer()


Test passed!
