In [None]:
import torch
import torch.nn as nn
import math

# Encoder

In [None]:
class InputEmbeddings(nn.Module):
    """Token-embedding lookup whose output is scaled by sqrt(embed_dim).

    Attributes:
        vocab_size: number of rows in the embedding table.
        embed_dim: width of each embedding vector.
        embeddings: the underlying ``nn.Embedding`` table.
    """

    def __init__(self, vocab_size, embed_dim):
        """Create an embedding table of shape (vocab_size, embed_dim)."""
        super().__init__()
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.embeddings = nn.Embedding(vocab_size, embed_dim)

    def forward(self, x):
        """Look up the embeddings for the indices in ``x`` and scale them.

        The original author's note: the paper multiplies the embedding
        weights by the square root of d_model.
        """
        scale = math.sqrt(self.embed_dim)
        token_vectors = self.embeddings(x)
        return token_vectors * scale


In [None]:
class PositionalEncodings(nn.Module):
    """Adds fixed sinusoidal positional encodings to its input, then applies dropout.

    A table of shape (1, seq_len, embed_dim) is computed once in ``__init__``
    and stored as the buffer ``pe``: it is saved with the module but is not a
    trainable parameter.

    Args:
        embed_dim: width of each encoding vector (odd values are supported).
        seq_len: number of positions for which encodings are precomputed.
        dropout: dropout probability applied to the summed output.
    """

    def __init__(self, embed_dim, seq_len, dropout):
        super().__init__()
        self.embed_dim = embed_dim
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)

        # Table of shape (seq_len, embed_dim), filled column-wise below.
        pe = torch.zeros(seq_len, embed_dim)
        # Column vector of positions, shape (seq_len, 1): the "pos" term of the formula.
        position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
        # 1 / 10000^(2i / embed_dim), computed in log space; one entry per even column.
        div_term = torch.exp(torch.arange(0, embed_dim, 2).float() * (-math.log(10000.0) / embed_dim))

        # Sine goes into the even columns (0, 2, 4, ...), cosine into the odd ones (1, 3, 5, ...).
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd embed_dim there is one fewer odd column than even columns,
        # so trim div_term to the number of odd columns.
        pe[:, 1::2] = torch.cos(position * div_term[: embed_dim // 2])

        # Add a leading axis of size one so the table broadcasts over the batch: (1, seq_len, embed_dim).
        pe = pe.unsqueeze(0)

        # A buffer is stored in the state_dict but is not returned by parameters().
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe)`` using the first ``x.shape[1]`` positions of the table.

        ``x`` is indexed as (batch, sequence, embedding); axis 1 selects how
        many precomputed positions are added.
        """
        # The buffer never requires grad, so no explicit requires_grad_(False) call is needed.
        x = x + self.pe[:, :x.shape[1], :]
        return self.dropout(x)

  # x.shape[1] is the sequence length of the current input x. For example, if x has a shape of
  # (batch_size, sequence_length, embedding_dim), then x.shape[1] equals sequence_length.
  # self.pe[:, :x.shape[1], :] slices the positional-encoding matrix to match the sequence length of x,
  # which ensures the positional encodings align correctly with the input embeddings.





In [None]:
#multi head attention

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    The query, key and value inputs are each projected by a linear layer,
    split into ``num_heads`` heads of width ``d_k = embed_dim // num_heads``,
    attended per head, concatenated back and projected by ``w_o``.

    Args:
        embed_dim: model width; must be divisible by ``num_heads``.
        num_heads: number of attention heads.
        dropout: dropout probability applied to the attention weights.

    Raises:
        AssertionError: if ``embed_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, embed_dim, num_heads, dropout):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads

        # The embedding must split evenly across the heads.
        assert embed_dim % num_heads == 0, "Embed dim must be divisible by num heads"

        # Per-head width (d_k and d_v in the paper's notation).
        self.d_k = embed_dim // num_heads

        # Projection matrices for query, key, value and the final output.
        self.w_q = nn.Linear(embed_dim, embed_dim)
        self.w_k = nn.Linear(embed_dim, embed_dim)
        self.w_v = nn.Linear(embed_dim, embed_dim)

        self.w_o = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def selfAttention(query, key, value, mask, dropout):
        """Compute scaled dot-product attention.

        Static, so it can be used without an instance and therefore takes no
        ``self`` argument.

        Args:
            query, key, value: tensors whose last two axes are (seq_len, d_k).
            mask: optional tensor broadcastable to the score matrix; positions
                where ``mask == 0`` are excluded from attention.
            dropout: optional callable applied to the attention weights.

        Returns:
            A tuple ``(weights @ value, weights)``. The weights are returned as
            well so they can be inspected or visualised.
        """
        d_k = query.shape[-1]
        # Swap the last two axes of key so that (..., seq_len, d_k) @ (..., d_k, seq_len) gives the scores.
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)

        # Mask before softmax: a very negative score becomes a weight of ~0.
        if mask is not None:
            attention_scores.masked_fill_(mask == 0, -1e9)

        attention_scores = attention_scores.softmax(dim=-1)

        if dropout is not None:
            attention_scores = dropout(attention_scores)

        return (attention_scores @ value), attention_scores

    def forward(self, q, k, v, mask):
        """Apply multi-head attention.

        Args:
            q, k, v: inputs indexed as (batch, seq_len, embed_dim).
            mask: optional attention mask passed through to ``selfAttention``.

        Returns:
            Tensor of shape (batch, seq_len, embed_dim). The attention weights
            of the last call are kept on ``self.attentionscores``.
        """
        query = self.w_q(q)
        key = self.w_k(k)
        value = self.w_v(v)

        # Split only the embedding axis into heads, keeping batch and sequence intact:
        # (batch, seq_len, embed_dim) -> (batch, seq_len, h, d_k) -> (batch, h, seq_len, d_k)
        query = query.view(query.shape[0], query.shape[1], self.num_heads, self.d_k).transpose(1, 2)
        key = key.view(key.shape[0], key.shape[1], self.num_heads, self.d_k).transpose(1, 2)
        value = value.view(value.shape[0], value.shape[1], self.num_heads, self.d_k).transpose(1, 2)

        x, self.attentionscores = MultiHeadAttention.selfAttention(query, key, value, mask, self.dropout)

        # Concatenate the heads: (batch, h, seq_len, d_k) -> (batch, seq_len, h * d_k).
        # contiguous() is required before view() because transpose() only changes strides.
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.num_heads * self.d_k)

        # Final output projection.
        return self.w_o(x)




In [None]:
class LayerNormalization(nn.Module):
    """Normalises the last axis of the input to zero mean and unit variance.

    The result is multiplied by the learnable scalar ``alpha`` and shifted by
    the learnable scalar ``bias``.

    Args:
        eps: small constant added to the variance so the denominator is never zero.
    """

    def __init__(self, eps=1e-5):
        super().__init__()
        self.eps = eps
        # nn.Parameter makes these learnable; each has shape (1,).
        self.alpha = nn.Parameter(torch.ones(1))   # multiplicative gain, starts at 1
        self.bias = nn.Parameter(torch.zeros(1))   # additive shift, starts at 0

    def forward(self, x):
        """Return ``alpha * (x - mean) / sqrt(var + eps) + bias`` over the last axis."""
        mean = x.mean(-1, keepdim=True)
        # Population (biased) variance, as torch.nn.LayerNorm uses. The default
        # unbiased estimator divides by (n - 1) and yields NaN when the last axis has size 1.
        var = x.var(-1, unbiased=False, keepdim=True)
        return self.alpha * (x - mean) / torch.sqrt(var + self.eps) + self.bias


In [None]:
class FeedForward(nn.Module):
    """Position-wise feed-forward network: FFN(x) = max(0, x W1 + b1) W2 + b2.

    Dropout is applied between the ReLU and the second linear layer.

    Args:
        d_model: input and output width.
        d_ff: width of the hidden layer.
        dropout: dropout probability.
    """

    def __init__(self, d_model, d_ff, dropout):
        super().__init__()
        # W1 and b1 (nn.Linear includes a bias by default).
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        # W2 and b2.
        self.linear_2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Expand the last axis from d_model to d_ff, apply ReLU and dropout, project back to d_model."""
        return self.linear_2(self.dropout(torch.relu(self.linear_1(x))))

In [None]:
#now all ingredients are complete and only residual connection is left

class ResidualConnection(nn.Module):
    """Skip connection around a sublayer: ``x + dropout(sublayer(norm(x)))``.

    The input is normalised before it is handed to the sublayer, and the
    sublayer's output goes through dropout before being added back to the
    un-normalised input.

    Args:
        dropout: dropout probability applied to the sublayer output.
    """

    def __init__(self, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = LayerNormalization()

    def forward(self, x, sublayer):
        """Apply ``sublayer`` (any callable taking one tensor) inside the skip connection."""
        normalized = self.norm(x)
        transformed = sublayer(normalized)
        return x + self.dropout(transformed)



In [None]:
#Nx, each of the small blocks are joined together by the bigger block that is the encoder
#and there are n encoder blocks, and the output of each layer/block is sent to the next

class EncoderBlock(nn.Module):
    """One encoder layer: self-attention then feed-forward, each inside a residual connection.

    Args:
        self_attention: module called as ``self_attention(q, k, v, mask)``.
        feed_forward: module called with a single tensor.
        dropout: dropout probability, used for both residual connections.
    """

    def __init__(self, self_attention, feed_forward, dropout):
        super().__init__()
        self.self_attention_block = self_attention
        self.feed_forward = feed_forward
        self.dropout = nn.Dropout(dropout)

        # Index 0 wraps the attention sublayer, index 1 the feed-forward sublayer.
        self.residualconnection = nn.ModuleList(
            [ResidualConnection(dropout), ResidualConnection(dropout)]
        )

    def forward(self, x, src_mask):
        """Run the block on ``x``; ``src_mask`` is the mask applied to the encoder input."""
        attention_residual = self.residualconnection[0]
        feed_forward_residual = self.residualconnection[1]

        def attend(normed):
            # Self-attention: the same tensor serves as query, key and value.
            return self.self_attention_block(normed, normed, normed, src_mask)

        x = attention_residual(x, attend)
        return feed_forward_residual(x, self.feed_forward)




In [None]:
class Encoder(nn.Module):
    """Applies a sequence of encoder blocks followed by a final layer normalisation.

    Args:
        num_layers: despite its name, this must be an iterable of encoder
            blocks (``forward`` iterates over it and calls each element as
            ``block(x, mask)``), not an integer count.
    """

    def __init__(self, num_layers):
        super().__init__()
        self.num_layers = num_layers
        self.norm = LayerNormalization()

    def forward(self, x, mask):
        """Feed ``x`` through every block in order, then normalise the result."""
        hidden = x
        for block in self.num_layers:
            hidden = block(hidden, mask)
        return self.norm(hidden)



**Decoder**

In [None]:
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention and feed-forward.

    Each of the three sublayers is wrapped in its own residual connection.

    Args:
        self_attention: module called as ``self_attention(q, k, v, mask)``.
        cross_attention: module with the same call signature; its keys and
            values come from the encoder output.
        feed_forward: module called with a single tensor.
        dropout: dropout probability, used for all three residual connections.
    """

    def __init__(self, self_attention, cross_attention, feed_forward, dropout):
        super().__init__()
        self.self_attention = self_attention
        self.cross_attention = cross_attention
        # The parameter is spelled feed_forward; the attribute keeps the name feedforward.
        self.feedforward = feed_forward
        self.dropout = nn.Dropout(dropout)
        # Index 0: self-attention, 1: cross-attention, 2: feed-forward.
        self.residualconnection = nn.ModuleList([ResidualConnection(dropout) for _ in range(3)])

    def forward(self, x, encoder_output, src_mask, tar_mask):
        """Run the block.

        Args:
            x: decoder input.
            encoder_output: output of the encoder, used as key and value in cross-attention.
            src_mask: mask for the encoder side, applied in cross-attention.
            tar_mask: mask for the decoder side, applied in self-attention.
        """
        x = self.residualconnection[0](x, lambda x: self.self_attention(x, x, x, tar_mask))
        # Query from the decoder; keys and values from the encoder.
        x = self.residualconnection[1](x, lambda x: self.cross_attention(x, encoder_output, encoder_output, src_mask))
        x = self.residualconnection[2](x, self.feedforward)

        return x

In [None]:
class Decoder(nn.Module):
    """Applies a sequence of decoder blocks followed by a final layer normalisation.

    Args:
        layers: iterable of decoder blocks, each called as
            ``layer(x, encoder_output, src_mask, tar_mask)``.
    """

    def __init__(self, layers):
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization()

    def forward(self, x, encoder_output, src_mask, tar_mask):
        """Feed ``x`` through every block in order and return the normalised tensor."""
        for layer in self.layers:
            x = layer(x, encoder_output, src_mask, tar_mask)

        # Apply the normalisation to x; returning the bare module would hand back a layer, not a tensor.
        return self.norm(x)

In [None]:
#coding the linear layer before applying the softmax
class LinearLayer(nn.Module):
    """Projects the last axis from embed_dim to vocab_size and returns log-probabilities.

    Args:
        embed_dim: width of the incoming features.
        vocab_size: number of output classes.
    """

    def __init__(self, embed_dim, vocab_size):
        super().__init__()
        self.linear = nn.Linear(embed_dim, vocab_size)

    def forward(self, x):
        """Return ``log_softmax(linear(x))`` over the last axis.

        (batch, seq_len, embed_dim) -> (batch, seq_len, vocab_size)
        """
        return torch.log_softmax(self.linear(x), dim=-1)

In [None]:
class Transformer(nn.Module):
    """Container that wires the encoder, decoder, embeddings and output projection together.

    It defines no ``forward``; callers use ``encode``, ``decode`` and
    ``project`` as three separate steps.

    Args:
        encoder: called as ``encoder(src, src_mask)``.
        decoder: called as ``decoder(tar, encoder_output, src_mask, tar_mask)``.
        src_embed, tar_embed: embedding modules for source and target inputs.
        src_pos, tar_pos: positional-encoding modules for source and target.
        linear_layer: final projection applied by ``project``.
    """

    def __init__(self, encoder, decoder, src_embed, tar_embed, src_pos, tar_pos, linear_layer):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tar_embed = tar_embed
        self.src_pos = src_pos
        self.tar_pos = tar_pos
        self.linear_layer = linear_layer

    def encode(self, src, src_mask):
        """Embed the source, add positional encodings, and run the encoder."""
        embedded = self.src_pos(self.src_embed(src))
        return self.encoder(embedded, src_mask)

    def decode(self, tar, encoder_output, src_mask, tar_mask):
        """Embed the target, add positional encodings, and run the decoder."""
        embedded = self.tar_pos(self.tar_embed(tar))
        return self.decoder(embedded, encoder_output, src_mask, tar_mask)

    def project(self, x):
        """Apply the final linear layer to ``x``."""
        return self.linear_layer(x)


In [None]:
# Now we define a factory function that builds a transformer for us from the given hyperparameters.
# The default values follow the paper.
def build_transformer(src_vocab_size, tar_vocab_size, src_seq_len, tar_seq_len, embed_dim = 512, n= 6, h = 8,dropout=0.1, d_ff = 2048):
    """Assemble a complete Transformer from hyperparameters.

    Args:
        src_vocab_size: size of the source vocabulary.
        tar_vocab_size: size of the target vocabulary.
        src_seq_len: number of positions precomputed for source positional encodings.
        tar_seq_len: number of positions precomputed for target positional encodings.
        embed_dim: model width.
        n: number of encoder blocks and number of decoder blocks.
        h: number of attention heads.
        dropout: dropout probability used throughout the model.
        d_ff: hidden width of the feed-forward sublayers.

    Returns:
        A ``Transformer`` whose parameters with more than one dimension have
        been initialised with Xavier-uniform.
    """
    # Embedding and positional-encoding layers for both sides.
    src_embeddings = InputEmbeddings(vocab_size=src_vocab_size, embed_dim=embed_dim)
    src_pos = PositionalEncodings(embed_dim=embed_dim, seq_len=src_seq_len, dropout=dropout)

    tar_embeddings = InputEmbeddings(vocab_size=tar_vocab_size, embed_dim=embed_dim)
    tar_pos = PositionalEncodings(embed_dim=embed_dim, seq_len=tar_seq_len, dropout=dropout)

    # Create the encoder blocks.
    encoder_blocks = []
    for _ in range(n):
        encoder_self_attention_block = MultiHeadAttention(embed_dim, h, dropout)
        feed_forward_block = FeedForward(embed_dim, d_ff, dropout)
        encoder_block = EncoderBlock(encoder_self_attention_block, feed_forward_block, dropout)
        encoder_blocks.append(encoder_block)

    # Create the decoder blocks.
    decoder_blocks = []
    for _ in range(n):
        decoder_self_attention_block = MultiHeadAttention(embed_dim, h, dropout)
        decoder_cross_attention_block = MultiHeadAttention(embed_dim, h, dropout)
        feed_forward_block = FeedForward(embed_dim, d_ff, dropout)
        decoder_block = DecoderBlock(decoder_self_attention_block, decoder_cross_attention_block, feed_forward_block, dropout)
        decoder_blocks.append(decoder_block)

    # Everything below runs once, after both loops have finished.
    # nn.ModuleList registers the blocks so their parameters are tracked.
    encoder = Encoder(nn.ModuleList(encoder_blocks))
    decoder = Decoder(nn.ModuleList(decoder_blocks))

    # Create the projection layer.
    projection_layer = LinearLayer(embed_dim, tar_vocab_size)

    # Create the transformer.
    transformer = Transformer(encoder, decoder, src_embeddings, tar_embeddings, src_pos, tar_pos, projection_layer)

    # Initialise weight matrices; 1-D parameters (biases, norm scalars) keep their defaults.
    for p in transformer.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)

    return transformer



In [None]:
!pip install datasets

Collecting datasets
  Downloading datasets-3.2.0-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.2.0-py3-none-any.whl (480 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m8.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.9.0-py3-none-any.whl (1

In [None]:
from datasets import load_dataset
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from tokenizers.pre_tokenizers import Whitespace

from pathlib import Path

def get_all_sentences(ds, lang):
    """Lazily yield ``item['translation'][lang]`` for every item in ``ds``.

    Only the text of the one requested language is produced.
    """
    yield from (record['translation'][lang] for record in ds)

def get_tokenizer(config, ds, lang):
    """Load the word-level tokenizer for ``lang`` from disk, or train and save it.

    Args:
        config: mapping whose ``"Tokenizer_file"`` entry is a path template;
            it is formatted with ``lang`` to get the tokenizer file path.
        ds: dataset whose items are passed to ``get_all_sentences`` for training.
        lang: language key used both in the path and to pick the sentences.

    Returns:
        The loaded or newly trained ``Tokenizer``.
    """
    tokenizer_path = Path(config["Tokenizer_file"].format(lang))

    if tokenizer_path.exists():
        # from_file is called on the Tokenizer class; no local instance exists yet on this path.
        return Tokenizer.from_file(str(tokenizer_path))

    # Words that are not in the vocabulary are mapped to the [UNK] token.
    tokenizer = Tokenizer(WordLevel(unk_token="[UNK]"))
    tokenizer.pre_tokenizer = Whitespace()
    trainer = WordLevelTrainer(special_tokens = ["[UNK]", "[PAD]", "[SOS]", "[EOS]"], min_frequency = 2)
    # train_from_iterator takes the iterator first and the trainer second.
    tokenizer.train_from_iterator(get_all_sentences(ds, lang), trainer=trainer)

    # Make sure the target directory exists before saving.
    tokenizer_path.parent.mkdir(parents=True, exist_ok=True)
    tokenizer.save(str(tokenizer_path))

    return tokenizer


In [None]:
from torch.utils.data import DataLoader,Dataset,random_split

def get_ds(config):
  ds_raw = load_dataset("opus_books",f'{config["lang_src"]}{config["lang_tgt"]}',split="train")

  #build a tokenzier
  tokenizer_src = get_tokenizer(config,ds_raw,config["lang_src"])
  tokenizer_tgt = get_tokenizer(config,ds_raw,config["lang_tgt"])

  #train/val split  #in the hugging face dataset we only have training set, so we will split for validation as well
  train_ds_size = int(0.9 * len(ds_raw))
  val_ds_size = len(ds_raw) - train_ds_size

  train_ds_size,val_ds_size = random_split(ds_raw,[train_ds_size,val_ds_size])

