Creating Embeddings

In [None]:
pip install torch



In [None]:
import torch
import torch.nn as nn
import math
class Embeddings(nn.Module):
  """Token-embedding lookup whose output is scaled by sqrt(d_model).

  Args:
    seq_length: stored on the instance only; it is not used by the lookup.
    d_model: size of each embedding vector.
    vocab_size: number of rows in the embedding table (valid ids are 0..vocab_size-1).
  """
  def __init__(self,seq_length,d_model,vocab_size):
    super().__init__()
    self.vocab_size=vocab_size
    self.seq_length=seq_length
    self.d_model=d_model
    ## the lookup-table layer is nn.Embedding (singular); nn.Embeddings does not exist in torch.nn
    self.embeddings=nn.Embedding(self.vocab_size,self.d_model)

  def forward(self,x):
    """Look up integer token ids `x` (any shape) and return vectors of shape (*x.shape, d_model)."""
    ## they multiplied the embeddings with the square root of d_model in the original paper
    return self.embeddings(x) * (math.sqrt(self.d_model))

class PosEmbeddings(nn.Module):
  """Fixed sinusoidal positional encoding that is added to the input, followed by dropout.

  Args:
    seq_length: number of positions pre-computed in the table (maximum usable sequence length).
    d_model: size of the last dimension of the inputs.
    vocab_size: stored on the instance only; it is not used by the encoding.
    dropout: dropout probability applied to the sum of input and positional encoding.
  """
  def __init__(self,seq_length,d_model,vocab_size,dropout):
    super().__init__()
    self.vocab_size=vocab_size
    self.seq_length=seq_length
    self.d_model=d_model
    self.dropout=nn.Dropout(dropout)

    ## creating a matrix to store pos embeddings: pe=(seq_length x d_model)
    pe=torch.zeros(self.seq_length,self.d_model)
    ## torch.arange(0,y) has shape (y,), so unsqueeze(1) turns it into (y,1); float dtype so it can be multiplied by the frequencies
    pos=torch.arange(0,self.seq_length,dtype=torch.float).unsqueeze(1)
    ## 1/10000^(2i/d_model), computed in log space; the paper uses base 10000 (not 1000)
    div_term=torch.exp(torch.arange(0,self.d_model,2,dtype=torch.float)*(-math.log(10000.0)/self.d_model))

    ##pe(pos,2i)=sin(pos/10000^(2i/d_model))
    ##pe(pos,2i+1)=cos(pos/10000^(2i/d_model))
    ## torch.sin/torch.cos work element-wise on tensors; math.sin/math.cos only accept scalars
    pe[:,0::2]=torch.sin(pos*div_term)
    ## for odd d_model there is one fewer odd column than even columns, so trim the frequencies
    pe[:,1::2]=torch.cos(pos*div_term[:self.d_model//2])

    pe=pe.unsqueeze(0)   ##(1,seq_length,d_model) so that it broadcasts over the batch dimension
    ## a buffer is saved with the module and moved with .to(device) but is never trained
    self.register_buffer('pe',pe)


  def forward(self,x):
    """Add the positional encoding for the first x.shape[1] positions to `x` and apply dropout.

    `x` is indexed as (batch, seq_len, d_model). Raises ValueError when seq_len exceeds
    the number of pre-computed positions.
    """
    length=x.shape[1]
    if length>self.pe.shape[1]:
      raise ValueError(f"sequence length {length} exceeds the {self.pe.shape[1]} pre-computed positions")
    ## self.pe is a buffer created without gradients, so the positional part is not trained
    x=x+self.pe[:,:length,:]
    ## dropout is applied to the sum of the embeddings and the positional encoding as a regularizer
    return self.dropout(x)


Layer Norm

In [None]:
class layer_norm(nn.Module):
  """Normalizes the last dimension of the input, then applies one learnable scale and one learnable shift.

  Args:
    eps: small constant added to the standard deviation to avoid division by zero.
  """
  def __init__(self,eps=1e-6):
    super().__init__()
    self.eps=eps
    ## alpha (scale) starts at 1 and beta (shift) starts at 0; each is a single shared value
    self.alpha=nn.Parameter(torch.ones(1))
    self.beta=nn.Parameter(torch.zeros(1))

  def forward(self,x):
    """Return alpha * (x - mean) / (std + eps) + beta, with statistics taken over dim=-1."""
    ## keepdim=True keeps the reduced axis so the statistics broadcast back against x
    centre=x.mean(dim=-1,keepdim=True)
    spread=x.std(dim=-1,keepdim=True)
    normalised=(x-centre)/(spread+self.eps)
    return self.alpha*normalised+self.beta

Feed Forward layer

In [None]:
class ffnn(nn.Module):
  """Two-layer feed-forward network: Linear(d_model,d_ff) -> ReLU -> Dropout -> Linear(d_ff,d_model).

  Args:
    d_model: input and output feature size.
    d_ff: hidden feature size.
    dropout: dropout probability (0-1) applied after the ReLU.
  """
  def __init__(self,d_model,d_ff,dropout):
    super().__init__()
    self.d_model,self.d_ff=d_model,d_ff
    self.Linear_1=nn.Linear(d_model,d_ff)
    self.Linear_2=nn.Linear(d_ff,d_model)
    self.dropout=nn.Dropout(dropout)

  def forward(self,x):
    """Apply the network to the last dimension of `x`; the output has the same shape as `x`."""
    hidden=torch.relu(self.Linear_1(x))
    hidden=self.dropout(hidden)
    return self.Linear_2(hidden)

Multihead Attention

In [None]:
class MultiheadAttention(nn.Module):
  """Multi-head scaled dot-product attention.

  Args:
    d_model: feature size of the inputs and of the output.
    h: number of heads; must divide d_model.
    dropout: dropout probability applied to the attention weights.
  """
  def __init__(self,d_model,h : int,dropout):
    super().__init__()
    assert d_model%h==0, "d_model must be divisible by h"
    self.d_model=d_model
    self.h=h
    ## integer division: d_k is passed to .view(), which needs an int rather than a float
    self.d_k=d_model//h
    self.dropout=nn.Dropout(dropout)
    self.wq=nn.Linear(self.d_model,self.d_model)
    self.wk=nn.Linear(self.d_model,self.d_model)
    self.wv=nn.Linear(self.d_model,self.d_model)
    self.wo=nn.Linear(self.d_model,self.d_model)

  @staticmethod
  def selfattention(query,key,value,mask,dropout):
    """Scaled dot-product attention.

    Args:
      query, key, value: tensors whose last dimension is the per-head size d_k.
      mask: optional tensor broadcastable to the score matrix; positions where mask==0 are hidden.
      dropout: optional callable applied to the attention weights.

    Returns:
      (weights @ value, weights)
    """
    d_k=query.shape[-1]
    ##(batch,h,seq_len,d_k)-->(batch,h,seq_len,seq_len)(dimension of attention scores),where d_k is the embedding dimension for each head
    attention_scores=(query @ key.transpose(-2,-1))/math.sqrt(d_k)
    ## the mask must be applied to the raw scores BEFORE softmax so hidden positions get ~0 probability
    if mask is not None:
      attention_scores=attention_scores.masked_fill(mask==0,-1e9)
    attention_scores=attention_scores.softmax(dim=-1)

    if dropout is not None:
      attention_scores=dropout(attention_scores)

    ## attention_scores@value: (batch,h,seq,seq) @ (batch,h,seq,d_k) --> (batch,h,seq,d_k)
    return (attention_scores@value),attention_scores

  def forward(self,q,k,v,mask=None):
    """Project q/k/v, attend per head, merge the heads and apply the output projection.

    Returns a tensor of shape (batch, q_seq_len, d_model).
    """
    query=self.wq(q)   ##(batch,seq_len,d_model)-->(batch,seq_len,d_model)
    key=self.wk(k)   ## each of q, k and v has its own projection
    value=self.wv(v)

    ##(batch,seq_len,d_model) -->(batch,seq_len,h,d_k) -->(batch,h,seq_len,d_k)
    query=query.view(query.shape[0],query.shape[1],self.h,self.d_k).transpose(1,2)
    key=key.view(key.shape[0],key.shape[1],self.h,self.d_k).transpose(1,2)
    value=value.view(value.shape[0],value.shape[1],self.h,self.d_k).transpose(1,2)

    x,attention_scores=MultiheadAttention.selfattention(query,key,value,mask,self.dropout)

    ## (batch, h, seq_len, d_k) --> (batch, seq_len, h, d_k) --> (batch, seq_len, d_model)
    ##.contiguous() is needed because transpose leaves the memory non-contiguous and .view() requires a contiguous layout
    x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.h * self.d_k)
    return self.wo(x)


Residual Connection

In [None]:
class residual(nn.Module):
  """Residual (skip) connection around a sublayer: x + dropout(sublayer(norm(x))).

  Args:
    dropout: dropout probability applied to the sublayer output.
  """
  def __init__(self,dropout):
    ## nn.Module must be initialised before any submodule is assigned to self
    super().__init__()
    self.dropout=nn.Dropout(dropout)
    ## use this notebook's layer_norm: nn.LayerNorm() cannot be built without a normalized_shape
    self.norm=layer_norm()

  def forward(self,x,sublayer):
    """`sublayer` is any callable taking the normalized input (attention or feed-forward)."""
    return x+self.dropout(sublayer(self.norm(x)))

Encoder block

In [None]:
class EncoderBlock(nn.Module):
  """One encoder layer: self-attention followed by feed-forward, each wrapped in a residual connection.

  Args:
    self_attention_block: attention module called as attention(q, k, v, mask).
    feed_forward_block: feed-forward module called as feed_forward(x).
    dropout: dropout probability used by the two residual wrappers.
  """
  ## the stray leading `x` constructor argument was removed: it was never used and the builder does not pass it
  def __init__(self,self_attention_block : MultiheadAttention,feed_forward_block:ffnn, dropout:float):
    super().__init__()
    self.attention_block=self_attention_block
    self.feed_forward=feed_forward_block
    self.dropout=nn.Dropout(dropout)
    ## two residual connections are needed (attention + feed-forward); nn.ModuleList registers both
    self.residual=nn.ModuleList([residual(dropout) for _ in range(2)])

  def forward(self,x,src_mask):
    """Run `x` through self-attention and feed-forward; src_mask hides the padding positions."""
    x=self.residual[0](x,lambda x: self.attention_block(x,x,x,src_mask))
    x=self.residual[1](x,lambda x: self.feed_forward(x))
    return x

class Encoder(nn.Module):  ## since we have multiple attention blocks
  """Stack of encoder layers followed by a final layer normalization.

  Args:
    layers: nn.ModuleList of layers, each called as layer(x, mask).
  """
  def __init__(self,layers: nn.ModuleList):
    super().__init__()
    self.layers=layers
    self.norm=layer_norm()

  def forward(self,x,mask):
    """Pass `x` through every layer in order with the same mask, then normalize the result."""
    for layer in self.layers:
      x=layer(x,mask)
    return self.norm(x)




Decoder

In [None]:
class DecoderBlock(nn.Module):
  """One decoder layer: self-attention, cross-attention over the encoder output, then feed-forward.

  Each of the three sublayers is wrapped in its own residual connection.

  Args:
    self_attention: attention module applied to the decoder input itself.
    cross_attention: attention module whose keys and values come from the encoder output.
    feed_forward: feed-forward module called as feed_forward(x).
    dropout: dropout probability used by the three residual wrappers.
  """
  def __init__(self,self_attention: MultiheadAttention, cross_attention: MultiheadAttention,feed_forward: ffnn, dropout:float):
    super().__init__()
    self.self_attention=self_attention
    self.cross_attention=cross_attention
    self.feed_forward=feed_forward
    self.dropout=nn.Dropout(dropout)
    self.residual=nn.ModuleList([residual(dropout) for _ in range(3)])

  def forward(self,x,encoder_output,src_mask,tgt_mask,dropout=None):
    """Apply the three sublayers to the decoder input `x`.

    src_mask is the encoder-side mask used in cross-attention; tgt_mask is the decoder-side
    mask used in self-attention. `dropout` is unused and kept only so existing calls still work.
    """
    ## the attribute set in __init__ is self.self_attention (there is no self.attention_block here)
    x=self.residual[0](x,lambda x: self.self_attention(x,x,x,tgt_mask))
    x=self.residual[1](x, lambda x: self.cross_attention(x,encoder_output,encoder_output,src_mask))
    x=self.residual[2](x, lambda x: self.feed_forward(x))
    return x

class Decoder(nn.Module):
  """Stack of decoder layers followed by a final layer normalization.

  Args:
    layers: nn.ModuleList of layers, each called as layer(x, encoder_output, src_mask, tgt_mask).
  """
  def __init__(self,layers : nn.ModuleList):
    super().__init__()
    self.layers=layers
    ## instantiate the norm: assigning the bare class would register no parameters
    self.norm=layer_norm()

  def forward(self,x,encoder_output,src_mask,tgt_mask):
    """Pass `x` through every layer, giving each the encoder output and both masks, then normalize."""
    for layer in self.layers:
      x=layer(x,encoder_output,src_mask,tgt_mask)
    return self.norm(x)  ##the output is (seq x d_model) if we don't use batch_size


In [None]:
##The decoder output now has to be projected onto the vocabulary; the projection layer below does this.

Final Projection layer

In [None]:
class proj_layer(nn.Module):
  """Projects d_model-sized features onto the vocabulary and returns log-probabilities.

  Args:
    d_model: size of the last dimension of the input.
    vocab_size: number of output classes (vocabulary entries).
  """
  def __init__(self,d_model:int,vocab_size:int):
    super().__init__()
    self.proj=nn.Linear(d_model,vocab_size)

  def forward(self,x):
    """(batch,seq_length,d_model)---->(batch,seq_length,vocab_size), log-softmax over the last dimension."""
    return torch.log_softmax(self.proj(x),dim=-1)

  def softmax(self,x):
    """Alias of forward, kept so code that calls .softmax(x) keeps working."""
    return self.forward(x)

##The decoder output has shape (seq_length,d_model) (ignoring the batch dimension) and is projected to (seq_length,vocab_size),
##so every position gets one score per vocabulary token; applying (log-)softmax over the last dimension turns those scores into a (log-)probability for each word of the vocabulary at that position

Transformer

In [None]:
class Transformer(nn.Module):   ##(this is the general blueprint of the transformer architecture; no parameters are created here)
  """Wires ready-made embedding, positional, encoder, decoder and projection modules together.

  Args:
    encoder: module called as encoder(x, src_mask).
    decoder: module called as decoder(x, encoder_output, src_mask, tgt_mask).
    src_emb, tgt_emb: embedding modules for the source and target tokens.
    src_pos, tgt_pos: positional-encoding modules for the source and target sequences.
    feedforward: unused; accepted only to keep the constructor signature unchanged.
    proj_layer: module that maps decoder features to vocabulary log-probabilities.
  """
  def __init__(self,encoder: Encoder, decoder:Decoder, src_emb:Embeddings, tgt_emb: Embeddings,src_pos:  PosEmbeddings, tgt_pos:  PosEmbeddings, feedforward:ffnn, proj_layer:proj_layer  ):
    super().__init__()
    self.src_embed=src_emb
    self.tgt_emb=tgt_emb
    self.src_pos=src_pos
    self.tgt_pos=tgt_pos
    self.encoder=encoder
    self.decoder=decoder
    ## stored under a different name than the proj_layer method below: with the same name the
    ## method would be found first and call itself forever
    self.projection_layer=proj_layer
  ##the residual layers are already built into the encoder and decoder blocks

  def encode(self,src,src_mask):
    """Embed the source tokens, add positions and run the encoder."""
    src=self.src_embed(src)
    src=self.src_pos(src)
    return self.encoder(src,src_mask)

  def decode(self,tgt,encoder_output,src_mask,tgt_mask):
    """Embed the target tokens, add positions and run the decoder against the encoder output."""
    tgt=self.tgt_emb(tgt)
    tgt=self.tgt_pos(tgt)
    return self.decoder(tgt,encoder_output,src_mask,tgt_mask)

  def proj_layer(self,x):
    """Apply the projection module to `x`."""
    return self.projection_layer(x)

Building a transformer now with parameters initialized

In [None]:
def build_transformer(src,src_vocab_size:int,tgt_vocab_size: int,src_seq_length: int,tgt_seq_length: int,d_model: int=512,N: int=6, h: int=8, dropout:float=0.1,d_ff:int=2048):
  """Create a Transformer with freshly initialised parameters.

  Args:
    src: unused; kept so existing positional calls keep working.
    src_vocab_size, tgt_vocab_size: vocabulary sizes of the source and target languages.
    src_seq_length, tgt_seq_length: maximum source and target sequence lengths.
    d_model: feature size used throughout the model.
    N: number of encoder blocks and of decoder blocks stacked up.
    h: number of attention heads.
    dropout: dropout probability used in every sublayer.
    d_ff: hidden size of the feed-forward networks.

  Returns:
    The assembled Transformer; parameters with more than one dimension are Xavier-uniform initialised.
  """
  ##creating the embedding layers (signature: seq_length, d_model, vocab_size)
  src_embed=Embeddings(src_seq_length,d_model,src_vocab_size)
  tgt_embed=Embeddings(tgt_seq_length,d_model,tgt_vocab_size)

  ##creating pos embeddings (signature: seq_length, d_model, vocab_size, dropout)
  src_pos=PosEmbeddings(src_seq_length,d_model,src_vocab_size,dropout)
  tgt_pos=PosEmbeddings(tgt_seq_length,d_model,tgt_vocab_size,dropout)

  ##everything that depends on the hyper-parameters (d_model, h, dropout, d_ff) is created here
  encoder_blocks=[]
  for _ in range(N):
    encoder_self_attention=MultiheadAttention(d_model,h,dropout)
    ## the feed-forward network maps d_model -> d_ff -> d_model
    encoder_feed_forward=ffnn(d_model,d_ff,dropout)
    encoder_block=EncoderBlock(encoder_self_attention,encoder_feed_forward,dropout)
    ## append the block itself, not the loop counter
    encoder_blocks.append(encoder_block)

  decoder_blocks=[]
  for _ in range(N):
    ## every decoder block gets its own modules; reusing the encoder's would share weights
    decoder_self_attention=MultiheadAttention(d_model,h,dropout)
    decoder_cross_attention=MultiheadAttention(d_model,h,dropout)
    decoder_feed_forward=ffnn(d_model,d_ff,dropout)
    decoder_block=DecoderBlock(decoder_self_attention,decoder_cross_attention,decoder_feed_forward,dropout)
    decoder_blocks.append(decoder_block)

  encoder=Encoder(nn.ModuleList(encoder_blocks))
  decoder=Decoder(nn.ModuleList(decoder_blocks))
  projection_layer=proj_layer(d_model,tgt_vocab_size)

  ## the `feedforward` argument of Transformer is not used by it, so None is passed in that slot
  transformer=Transformer(encoder,decoder,src_embed,tgt_embed,src_pos,tgt_pos,None,projection_layer)

  ## initializing the parameters using Xavier initialization to get faster training
  ## parameters() is a method and has to be called; biases and other 1-D parameters are skipped
  for p in transformer.parameters():
    if p.dim()>1:
      nn.init.xavier_uniform_(p)
  return transformer

Next, we will train this model on a translation task using a Hugging Face dataset.

In [None]:
import torch
import torch.nn as nn