<a href="https://colab.research.google.com/github/nitin649/Transformer-from-Scratch/blob/main/Transformer_for_translation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import torch
import math
from torch import  nn
import torch.nn.functional as F

In [None]:
def get_device():
    """
    Pick the torch device to run on.

    Returns the CUDA device when torch reports that CUDA is available,
    otherwise the CPU device.
    """
    device_type = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_type)
get_device()

device(type='cpu')

In [None]:
# Broadcasting check: a (10, 10) tensor is added to a (2, 1, 10, 10) tensor.
ones = torch.ones((10, 10))
mat = torch.rand((2, 1, 10, 10))

final = torch.add(mat, ones)  # the two trailing dims line up, the leading dims come from `mat`
final.shape

torch.Size([2, 1, 10, 10])

In [None]:
# @title UtilityFunctions
def scaled_dot_product(q, k, v, mask=None):
    """
    Scaled dot-product attention: softmax(q @ k^T / sqrt(head_dim)) @ v.

    q - query tensor, head dimension last; callers in this file pass (batch, num_heads, seq_len, head_dim)
    k - key tensor, same layout as q
    v - value tensor, same layout as q
    mask - optional additive mask (0 keeps a position, -inf removes it). It is added to the
           scores while they are laid out as (num_heads, batch, seq_len, seq_len), so both a
           (seq_len, seq_len) mask and a (batch, seq_len, seq_len) mask broadcast.
           None means no masking.

    returns (values, attention): the attention-weighted values and the softmax weights.
    """
    head_dim = q.size(-1)
    # (batch, num_heads, seq_len, head_dim) @ (batch, num_heads, head_dim, seq_len)
    #   -> (batch, num_heads, seq_len, seq_len)
    scores = (q @ k.transpose(-2, -1)) / math.sqrt(head_dim)
    if mask is not None:
        # Swap batch and head axes so a per-batch mask lines up with the trailing dims,
        # add it, then swap the axes back.
        heads_first = scores.permute(1, 0, 2, 3) + mask
        scores = heads_first.permute(1, 0, 2, 3)
    attention = F.softmax(scores, dim=-1)
    return attention @ v, attention


class PositionalEncoding(nn.Module):
    """
    Generates the sinusoidal positional encoding table for a sequence.

    d_model - embedding dimension (number of columns of the table)
    max_sequence_length - number of positions (number of rows of the table)
    """
    def __init__(self, d_model, max_sequence_length):
        super().__init__()
        self.max_sequence_length = max_sequence_length
        self.d_model = d_model

    def forward(self):
        """
        Build the table from scratch on every call.

        returns a float tensor of shape (max_sequence_length, d_model), created on the CPU;
        the caller is responsible for moving it to another device.
        """
        even_i = torch.arange(0, self.d_model, 2).float()  # even embedding indices 0, 2, 4, ...
        denominator = torch.pow(10000, even_i / self.d_model)
        position = (torch.arange(self.max_sequence_length)
                          .reshape(self.max_sequence_length, 1))

        # Even column j uses sin(pos / 10000**(j / d_model)); the odd column j + 1 uses cos with
        # the SAME exponent j / d_model, so one table of angles serves both curves.
        angles = position / denominator  # (max_sequence_length, ceil(d_model / 2))

        # Pair every sin column with its cos column, then flatten so the columns interleave
        # as sin, cos, sin, cos, ...
        stacked = torch.stack([torch.sin(angles), torch.cos(angles)], dim=2)
        PE = torch.flatten(stacked, start_dim=1, end_dim=2)

        # For an odd d_model the interleaving yields d_model + 1 columns; drop the surplus cos
        # column so the table always matches the embedding width. No-op for an even d_model.
        return PE[:, :self.d_model]

class SentenceEmbedding(nn.Module):
    """
    For a given batch of sentences, create the embedding (token embedding + positional encoding).

    max_sequence_length - every sentence is padded up to this many tokens
    d_model - embedding dimension
    language_to_index - mapping from token to integer id; its length is the vocabulary size
    START_TOKEN, END_TOKEN, PADDING_TOKEN - the special tokens; each must be a key of language_to_index
    """
    def __init__(self, max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN):
        super().__init__()
        self.vocab_size = len(language_to_index)
        self.max_sequence_length = max_sequence_length
        self.embedding = nn.Embedding(self.vocab_size, d_model)
        self.language_to_index = language_to_index
        self.position_encoder = PositionalEncoding(d_model, max_sequence_length)
        self.dropout = nn.Dropout(p=0.1)
        self.START_TOKEN = START_TOKEN
        self.END_TOKEN = END_TOKEN
        self.PADDING_TOKEN = PADDING_TOKEN

    def batch_tokenize(self, batch, start_token, end_token):
        """
        Turn a batch of sentences into a (batch, max_sequence_length) tensor of token ids.

        batch - iterable of sentences; each sentence is iterated token by token (a string is
                therefore split into characters) and every token is looked up in language_to_index
        start_token - truthy to prepend the START token id
        end_token - truthy to append the END token id

        Sentences shorter than max_sequence_length are right-padded with the PADDING token id.
        Longer sentences are NOT truncated, so the caller must filter them out beforehand.
        Raises KeyError for a token that is missing from language_to_index.
        """
        def tokenize(sentence, start_token, end_token):
            token_ids = [self.language_to_index[token] for token in sentence]
            if start_token:  # prepend the start token
                token_ids.insert(0, self.language_to_index[self.START_TOKEN])
            if end_token:  # append the end token
                token_ids.append(self.language_to_index[self.END_TOKEN])
            missing = self.max_sequence_length - len(token_ids)
            if missing > 0:  # right-pad up to the fixed length
                token_ids.extend([self.language_to_index[self.PADDING_TOKEN]] * missing)
            return torch.tensor(token_ids, dtype=torch.long)

        tokenized = torch.stack([tokenize(sentence, start_token, end_token) for sentence in batch])
        # Follow the device the embedding weights live on rather than global CUDA availability,
        # so the ids always match the table they index (e.g. a CPU model on a CUDA machine).
        return tokenized.to(self.embedding.weight.device)

    def forward(self, x, start_token, end_token): # sentence
        """
        x - batch of raw sentences (see batch_tokenize)
        returns the embedded batch, shape (batch, max_sequence_length, d_model)
        """
        x = self.batch_tokenize(x, start_token, end_token)
        x = self.embedding(x)
        pos = self.position_encoder().to(x.device)  # the table is built on the CPU; move it next to the embeddings
        x = self.dropout(x + pos)
        return x

class MultiHeadAttention(nn.Module):
    """
    Multi-head self-attention.

    d_model -- embedding dimension
    num_heads -- number of attention heads; each head works on d_model // num_heads features
    """
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        # A single projection produces query, key and value at once (hence 3 * d_model outputs).
        self.qkv_layer = nn.Linear(d_model, 3 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, mask):
        """
        x - (batch, seq_len, d_model)
        mask - additive attention mask handed to scaled_dot_product, or None
        returns (batch, seq_len, d_model)
        """
        batch_size, sequence_length, _ = x.size()
        projected = self.qkv_layer(x)  # (batch, seq_len, 3 * d_model)
        # Give every head its own slice of 3 * head_dim features and move heads before positions:
        # (batch, num_heads, seq_len, 3 * head_dim)
        per_head = projected.reshape(
            batch_size, sequence_length, self.num_heads, 3 * self.head_dim
        ).permute(0, 2, 1, 3)
        q, k, v = torch.chunk(per_head, 3, dim=-1)  # each (batch, num_heads, seq_len, head_dim)
        values, _ = scaled_dot_product(q, k, v, mask)
        # Put positions back before heads and concatenate the heads: (batch, seq_len, num_heads * head_dim)
        merged = values.permute(0, 2, 1, 3).reshape(
            batch_size, sequence_length, self.num_heads * self.head_dim
        )
        return self.linear_layer(merged)

class LayerNormalization(nn.Module):
    """
    Layer normalization with a learnable scale (gamma) and shift (beta).

    parameters_shape - shape of the trailing dimensions to normalize over, e.g. [embed_size]
    eps - small constant added to the variance before the square root
    """
    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.parameters_shape = parameters_shape
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(parameters_shape))  # learnable scale, starts at 1
        self.beta = nn.Parameter(torch.zeros(parameters_shape))  # learnable shift, starts at 0

    def forward(self, inputs):
        """
        inputs - tensor whose trailing dimensions match parameters_shape,
                 e.g. (batch, seq_len, embed_size)
        returns a tensor of the same shape as inputs
        """
        # Normalize over as many trailing dims as parameters_shape has: [-1], [-1, -2], ...
        trailing = len(self.parameters_shape)
        dims = list(range(-1, -trailing - 1, -1))

        mean = inputs.mean(dim=dims, keepdim=True)
        centered = inputs - mean
        variance = centered.pow(2).mean(dim=dims, keepdim=True)  # population variance
        std = (variance + self.eps).sqrt()

        normalized = centered / std
        return self.gamma * normalized + self.beta

class PositionwiseFeedForward(nn.Module):
    """
    Two-layer feed-forward network: linear -> ReLU -> dropout -> linear.

    d_model - input and output feature size
    hidden - width of the inner layer (2048 in the original paper)
    drop_prob - dropout probability applied after the ReLU
    """
    def __init__(self, d_model, hidden, drop_prob=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, hidden)  # expand: d_model -> hidden
        self.linear2 = nn.Linear(hidden, d_model)  # project back: hidden -> d_model
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Apply the network to the last dimension of x; the output has the same shape as x."""
        hidden = self.dropout(self.relu(self.linear1(x)))
        return self.linear2(hidden)

class EncoderLayer(nn.Module):
    """
    One encoder block: self-attention and a feed-forward network, each followed by
    dropout, a residual connection and layer normalization.

    d_model - embedding dimension
    ffn_hidden - inner width of the feed-forward network
    num_heads - number of attention heads
    drop_prob - dropout probability used throughout the block
    """
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x, self_attention_mask):
        """
        x - (batch, seq_len, d_model)
        self_attention_mask - additive mask forwarded to the attention layer, or None
        returns (batch, seq_len, d_model)
        """
        # The sub-layers return new tensors and never modify their input in place, so the
        # input itself serves as the residual branch; no defensive copy is needed.
        attended = self.dropout1(self.attention(x, mask=self_attention_mask))
        x = self.norm1(attended + x)  # residual connection around attention

        transformed = self.dropout2(self.ffn(x))
        return self.norm2(transformed + x)  # residual connection around the feed-forward network

class SequentialEncoder(nn.Sequential):
    """
    nn.Sequential variant whose layers take (x, self_attention_mask) instead of a single input.
    """
    def forward(self, *inputs):
        """
        inputs - exactly two positional values: (x, self_attention_mask)
        returns the output of the last layer
        """
        x, self_attention_mask = inputs
        # Run the stacked blocks in registration order: the output of one block is the input
        # of the next, while the same mask is handed to every block.
        for layer in self:
            x = layer(x, self_attention_mask)
        return x

class Encoder(nn.Module):
    """
    Full encoder: sentence embedding followed by a stack of num_layers EncoderLayer blocks.

    d_model, ffn_hidden, num_heads, drop_prob - forwarded to every EncoderLayer
    num_layers - number of stacked encoder blocks
    max_sequence_length, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN -
        forwarded to SentenceEmbedding
    """
    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 language_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        super().__init__()
        self.sentence_embedding = SentenceEmbedding(
            max_sequence_length, d_model, language_to_index,
            START_TOKEN, END_TOKEN, PADDING_TOKEN,
        )
        # Stack the encoder blocks; each one gets its own parameters.
        blocks = [
            EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
            for _ in range(num_layers)
        ]
        self.layers = SequentialEncoder(*blocks)

    def forward(self, x, self_attention_mask, start_token=None, end_token=None):
        """
        x - batch of sentences, handed to the sentence embedding
        self_attention_mask - mask handed to every encoder block
        start_token, end_token - forwarded to the sentence embedding
        returns the output of the last encoder block
        """
        embedded = self.sentence_embedding(x, start_token, end_token)
        return self.layers(embedded, self_attention_mask)

class MultiHeadCrossAttention(nn.Module):
    """
    Multi-head cross attention between the encoder output and the decoder state.

    d_model - embedding dimension
    num_heads - number of attention heads; each head works on d_model // num_heads features
    """
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.kv_layer = nn.Linear(d_model, 2 * d_model)  # key and value come from the encoder output
        self.q_layer = nn.Linear(d_model, d_model)  # query comes from the decoder state
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, y, mask):
        """
        x - encoder output, (batch, source_len, d_model)
        y - decoder state, (batch, target_len, d_model)
        mask - additive mask handed to scaled_dot_product, or None; it is added to scores
               whose last two dimensions are (target_len, source_len)
        returns (batch, target_len, d_model)

        source_len and target_len may differ: keys/values are shaped from x and the query
        (and therefore the output) from y.
        """
        batch_size, source_length, d_model = x.size()
        target_length = y.size(1)

        kv = self.kv_layer(x)  # (batch, source_len, 2 * d_model)
        q = self.q_layer(y)  # (batch, target_len, d_model)

        # Split the features across heads and move heads before positions.
        kv = kv.reshape(batch_size, source_length, self.num_heads, 2 * self.head_dim).permute(0, 2, 1, 3)
        q = q.reshape(batch_size, target_length, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        k, v = kv.chunk(2, dim=-1)  # each (batch, num_heads, source_len, head_dim)

        values, _ = scaled_dot_product(q, k, v, mask)  # (batch, num_heads, target_len, head_dim)
        # Put positions back before heads and concatenate the heads.
        values = values.permute(0, 2, 1, 3).reshape(batch_size, target_length, d_model)
        return self.linear_layer(values)


class DecoderLayer(nn.Module):
    """
    One decoder block: self-attention, encoder-decoder cross attention and a feed-forward
    network, each followed by dropout, a residual connection and layer normalization.

    d_model - embedding dimension
    ffn_hidden - inner width of the feed-forward network
    num_heads - number of attention heads
    drop_prob - dropout probability used throughout the block
    """
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.layer_norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)

        self.encoder_decoder_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads)
        self.layer_norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.layer_norm3 = LayerNormalization(parameters_shape=[d_model])
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, x, y, self_attention_mask, cross_attention_mask):
        """
        x - encoder output, used as key/value source of the cross attention
        y - decoder state, (batch, seq_len, d_model)
        self_attention_mask - additive mask for the decoder self-attention, or None
        cross_attention_mask - additive mask for the cross attention, or None
        returns the updated decoder state, same shape as y
        """
        # The sub-layers return new tensors and never modify their input in place, so the
        # current state itself serves as the residual branch; no defensive copy is needed.
        self_attended = self.dropout1(self.self_attention(y, mask=self_attention_mask))
        y = self.layer_norm1(self_attended + y)  # residual connection around self-attention

        cross_attended = self.dropout2(self.encoder_decoder_attention(x, y, mask=cross_attention_mask))
        y = self.layer_norm2(cross_attended + y)  # residual connection around cross attention

        transformed = self.dropout3(self.ffn(y))
        return self.layer_norm3(transformed + y)  # residual connection around the feed-forward network


class SequentialDecoder(nn.Sequential):
    """
    nn.Sequential variant whose layers take (x, y, self_attention_mask, cross_attention_mask).
    """
    def forward(self, *inputs):
        """
        inputs - exactly four positional values: (x, y, self_attention_mask, cross_attention_mask)
        returns y after it has passed through every layer
        """
        x, y, self_attention_mask, cross_attention_mask = inputs
        # Only y is threaded from layer to layer; x and both masks are the same for every layer.
        for layer in self:
            y = layer(x, y, self_attention_mask, cross_attention_mask)
        return y

class Decoder(nn.Module):
    """
    Full decoder: sentence embedding of the target batch followed by a stack of
    num_layers DecoderLayer blocks.

    d_model, ffn_hidden, num_heads, drop_prob - forwarded to every DecoderLayer
    num_layers - number of stacked decoder blocks
    max_sequence_length, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN -
        forwarded to SentenceEmbedding
    """
    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 language_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        super().__init__()
        self.sentence_embedding = SentenceEmbedding(
            max_sequence_length, d_model, language_to_index,
            START_TOKEN, END_TOKEN, PADDING_TOKEN,
        )
        blocks = [
            DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
            for _ in range(num_layers)
        ]
        self.layers = SequentialDecoder(*blocks)

    def forward(self, x, y, self_attention_mask, cross_attention_mask, start_token, end_token):
        """
        x - encoder output, handed unchanged to every decoder block
        y - batch of target sentences, handed to the sentence embedding
        self_attention_mask, cross_attention_mask - masks handed to every decoder block
        start_token, end_token - forwarded to the sentence embedding
        returns the output of the last decoder block
        """
        embedded = self.sentence_embedding(y, start_token, end_token)
        return self.layers(x, embedded, self_attention_mask, cross_attention_mask)


class Transformer(nn.Module):
    """
    Encoder-decoder transformer followed by a linear projection onto the target vocabulary.

    d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_sequence_length -
        shared by the encoder and the decoder
    hindi_vocab_size - output size of the final linear layer
    english_to_index - token-to-id mapping used by the encoder
    hindi_to_index - token-to-id mapping used by the decoder
    START_TOKEN, END_TOKEN, PADDING_TOKEN - special tokens shared by both sides
    """
    def __init__(self,
                d_model,
                ffn_hidden,
                num_heads,
                drop_prob,
                num_layers,
                max_sequence_length,
                hindi_vocab_size,
                english_to_index,
                hindi_to_index,
                START_TOKEN,
                END_TOKEN,
                PADDING_TOKEN
                ):
        super().__init__()
        # Everything except the vocabulary mapping is identical for both halves.
        shared_head = (d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_sequence_length)
        special_tokens = (START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.encoder = Encoder(*shared_head, english_to_index, *special_tokens)
        self.decoder = Decoder(*shared_head, hindi_to_index, *special_tokens)
        self.linear = nn.Linear(d_model, hindi_vocab_size)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def forward(self,
                x,
                y,
                encoder_self_attention_mask=None, #encoding padding mask
                decoder_self_attention_mask=None, #decoder attention mask
                decoder_cross_attention_mask=None, #decoder padding mask
                enc_start_token=False,
                enc_end_token=False,
                dec_start_token=False, # We should make this true
                dec_end_token=False): # x, y are batch of sentences
        """
        x - batch of source sentences, handed to the encoder
        y - batch of target sentences, handed to the decoder
        *_mask - masks forwarded to the corresponding attention layers (None disables a mask)
        enc_*/dec_* token flags - whether the encoder / decoder embedding adds the START / END token
        returns the decoder output projected by the final linear layer; the last dimension
        has size hindi_vocab_size
        """
        encoded = self.encoder(
            x,
            encoder_self_attention_mask,
            start_token=enc_start_token,
            end_token=enc_end_token,
        )
        decoded = self.decoder(
            encoded,
            y,
            decoder_self_attention_mask,
            decoder_cross_attention_mask,
            start_token=dec_start_token,
            end_token=dec_end_token,
        )
        return self.linear(decoded)

In [None]:
# batch_size = 3
# sentence_length = 6
# embedding_dim = 24
# inputs = torch.randn(batch_size,sentence_length,embedding_dim)
# encoder = Encoder(embedding_dim,64,2,0.1,4,6)
# x = encoder.forward(inputs,None)
# print(x.shape)

In [None]:
# pe = PositionalEncoding(d_model=6, max_sequence_length=10)
# pe.forward()

#/////////////////////
# x=torch.rand(1,6,24)
# n=MultiHeadAttention(24,4)
# n.forward(x,None)

#////////////////
# batch_size = 3
# sentence_length = 6
# embedding_dim = 24
# inputs = torch.randn(batch_size,sentence_length,embedding_dim)
# print(inputs.shape , inputs[-1:].shape)
# layer_norm = LayerNormalization(inputs.size()[-1:]) # parameters_shape is [embed_dim]; its length (always 1 here) decides how many trailing dims are normalized,
# # so the mean is computed over the last (-1) dimension only
# out = layer_norm.forward(inputs)

# print(out[0].std())
# print(inputs[0].std())
# print(out[0].mean())
# print(inputs[0].mean())

#///////////
# batch_size = 3
# sentence_length = 6
# embedding_dim = 24
# inputs = torch.randn(sentence_length, batch_size, embedding_dim)


torch.Size([3, 6, 24]) torch.Size([1, 6, 24])
parameter shape torch.Size([24])
dimension is this  [-1]
std shape torch.Size([3, 6, 1])


1.3245e-08