In [1]:
##################################################  Transformer Architecture  ####################################################

"""f
This notebook is a pytorch implementation of transformer architecture from scratch. This implementation is based on the
architecture proposed in the paper 'Attention is all you need' by Google.

The Transformer architecture consists of an encoder and decoder. An encoder consists of a stack of encoder layers.
Each encoder layer consists of the below components

        1. Self Attention
        2. Residual connection
        3. Layer Normalization
        4. Feed forward neuarl network

"""


#######################################################  End of Comments  ########################################################

"f\nThis notebook is a pytorch implementation of transformer architecture from scratch. This implementation is based on the\narchitecture proposed in the paper 'Attention is all you need' by Google.\n\nThe Transformer architecture consists of an encoder and decoder. An encoder consists of a stack of encoder layers.\nEach encoder layer consists of the below components\n\n        1. Self Attention\n        2. Residual connection\n        3. Layer Normalization\n        4. Feed forward neuarl network\n\n"

In [2]:
import torch
import torch.nn as nn
import math

In [3]:
batch_size = 1
vocab_size = 100
seq_len = 3
embed_dim = 512
ffn_dim = 2048
n_heads = 8
n_layers = 6
dropout_prob = 0.1

In [4]:
class Embedding(nn.Module):

  """
    defines the embedding layer of the transformer architecture

    Args:


  """
  def __init__(self,embed_dim,seq_len,vocab_size):

    super().__init__()

    self.embed=nn.Embedding(vocab_size,embed_dim)

  def forward(self,X):

    self.word_embed = self.embed(X)

    return self.word_embed

In [5]:
class PositionalEmbedding(nn.Module):

  """
    Defines the positional embedding vector for the transformer architecture

    For each token 'pos' in a given sentence of length 'seq_len', a positional embedding vector of size
    'embed_dim' equal to dimension of word embedding is created. For each position 'i' in positional embedding,
    the function to calculate the value of the vector vector varies depending on whether the position is even or odd.
    For even position sine function is used and for odd position cos function is used.

    Args :
    seq_len - maximum length of the input tokens
    embed_dim - dimension of the word embedding

  """

  def  __init__(self,seq_len,embed_dim):

    super().__init__()

    self.seq_len = seq_len
    self.embed_dim = embed_dim

    pos_embed = torch.zeros(self.seq_len,self.embed_dim)

    for pos in range(seq_len):

      for i in range(0,self.embed_dim,2):

        pos_embed[pos,i] = math.sin(pos/(10000**((2 * i)/self.embed_dim)))

        pos_embed[pos,i+1] = math.cos(pos/(10000**((2 * (i+1))/self.embed_dim)))

    print('POS embedding shape before Unsqueeze : ',pos_embed.shape)

    pos_embed=pos_embed.unsqueeze(0)

    print('POS embedding shape after Unsqueeze : ',pos_embed.shape)


    #register_buffer of pytorch is used to save the parameters that do not need gradient updates during back propogation.
    #Anyhow these parameters still need to be stored and loaded into state_dict
    #https://discuss.pytorch.org/t/what-is-the-difference-between-register-buffer-and-register-parameter-of-nn-module/32723


    self.register_buffer('pos_embed',pos_embed)

  def forward(self):

    return self.pos_embed


In [6]:
class MultiHeadAttention(nn.Module):

  def __init__(self,seq_len,embed_dim,n_heads):

    super().__init__()

    self.seq_len = seq_len
    self.embed_dim = embed_dim
    self.n_heads = n_heads
    self.single_head_dim = int(self.embed_dim/self.n_heads)

    self.W_q = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
    self.W_k = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
    self.W_v = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
    self.W_o = nn.Linear(self.embed_dim, self.embed_dim, bias=False)

  def split(self,inp_tensors):

    return inp_tensors.view(self.seq_len,self.n_heads,self.single_head_dim)

  def scaled_dot_product(self,Q,K,single_head_dim):

    product = torch.matmul(Q,K.transpose(-2,-1))

    scaled_product = product/math.sqrt(single_head_dim)

    attention_score = nn.functional.softmax(scaled_product,dim=-1)

    return attention_score

  def combine_heads(self,split_context,seq_len,n_heads,single_head_dim):

    context = split_context.view(seq_len,n_heads * single_head_dim)

    return context

  def forward(self,X):

    Q = self.split(self.W_q(X))
    K = self.split(self.W_k(X))
    V = self.split(self.W_v(X))

    attention_score = self.scaled_dot_product(Q,K,self.single_head_dim)

    #context = torch.matmul(attention_score,V)

    split_context = attention_score @ V

    context = self.combine_heads(split_context,self.seq_len,self.n_heads,self.single_head_dim)

    return context







In [7]:
#class ResidualAddition():

In [8]:
# class LayerNormalisation(nn.LayerNorm):

#   def __init__(self,embed_dim):

#     self.embed_dim = embed_dim

#   def forward(self,z_res):

#     norm_out = nn.LayerNorm(z_res)

#     print('After normalised output shape : ',norm_out.shape)

#     return norm_out

In [64]:
class FeedForward(nn.Module):

  def __init__(self,embed_dim,ffn_dim):

    super().__init__()

    self.embed_dim = embed_dim
    self.ffn_dim = ffn_dim
    self.ffn = nn.Sequential(nn.Linear(self.embed_dim,self.ffn_dim),
                             nn.ReLU(),
                             nn.Linear(self.ffn_dim,self.embed_dim))

    print(self.ffn[0])

  # ,nn.Linear(self.ffn_dim,self.embed_dim)

  def forward(self,norm_inp):

    ffn_out = self.ffn(norm_inp)

    return ffn_out






In [65]:
class EncoderLayer(nn.Module):

  def __init__(self,n_layers,n_heads,seq_len,embed_dim):

    super().__init__()


    self.attention=MultiHeadAttention(seq_len,embed_dim,n_heads)


  def forward(self,token_embedding):

    context = self.attention(token_embedding)

    return context






In [66]:
class Encoder(nn.Module):

  def __init__(self,n_layers,n_heads,embed_dim,seq_len,vocab_size,dropout_prob,ffn_dim):

    super().__init__()

    self.dropout_prob = dropout_prob
    self.ffn_dim = ffn_dim

    self.word_embedding = Embedding(embed_dim,seq_len,vocab_size)
    self.pos_embedding = PositionalEmbedding(seq_len,embed_dim)
    self.dropout = nn.Dropout(self.dropout_prob)

    self.layers = nn.ModuleList(EncoderLayer(n_layers,n_heads,seq_len,embed_dim) for layer in range(n_layers))

    self.norm = nn.LayerNorm(embed_dim)

    self.feedforward = FeedForward(embed_dim,self.ffn_dim)


  def forward(self,tokens):

    word_embedding = self.word_embedding(tokens)

    pos_embedding = self.pos_embedding()

    X = self.dropout(word_embedding + pos_embedding)


    for layer_num,layer in enumerate(self.layers):


      if layer_num == 0:

        Z = self.dropout(layer(X))

        Z_res = X + Z

        norm_Z_res= self.norm(Z_res)

        ffn_out = self.dropout(self.feedforward(norm_Z_res))

        ffn_nZres = ffn_out + norm_Z_res

        norm_ffn_nZres = self.norm(ffn_nZres)

        prev_layer_out = norm_ffn_nZres


      elif layer_num != 0:

        Z = self.dropout(layer(prev_layer_out))

        Z_res = prev_layer_out + Z

        norm_Z_res = self.norm(Z_res)

        ffn_out = self.dropout(self.feedforward(norm_Z_res))

        ffn_nZres = ffn_out + norm_Z_res

        norm_ffn_nZres = self.norm(ffn_nZres)

        prev_layer_out = norm_ffn_nZres


    return prev_layer_out


In [67]:
class Transformer(nn.Module):

  """
    Defines the transformer encoder and decoder

  """

  def __init__(self,n_layers,n_heads,embed_dim,seq_len,vocab_size,dropout_prob,ffn_dim):

    super().__init__()

    self.encoder=Encoder(n_layers,n_heads,embed_dim,seq_len,vocab_size,dropout_prob,ffn_dim)

  def forward(self,tokens):

    self.tokens = tokens

    out = self.encoder(self.tokens)

    return out



In [68]:
model=Transformer(n_layers,n_heads,embed_dim,seq_len,vocab_size,dropout_prob,ffn_dim)

POS embedding shape before Unsqueeze :  torch.Size([3, 512])
POS embedding shape after Unsqueeze :  torch.Size([1, 3, 512])
Linear(in_features=512, out_features=2048, bias=True)


In [69]:
model

Transformer(
  (encoder): Encoder(
    (word_embedding): Embedding(
      (embed): Embedding(100, 512)
    )
    (pos_embedding): PositionalEmbedding()
    (dropout): Dropout(p=0.1, inplace=False)
    (layers): ModuleList(
      (0-5): 6 x EncoderLayer(
        (attention): MultiHeadAttention(
          (W_q): Linear(in_features=512, out_features=512, bias=False)
          (W_k): Linear(in_features=512, out_features=512, bias=False)
          (W_v): Linear(in_features=512, out_features=512, bias=False)
          (W_o): Linear(in_features=512, out_features=512, bias=False)
        )
      )
    )
    (norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    (feedforward): FeedForward(
      (ffn): Sequential(
        (0): Linear(in_features=512, out_features=2048, bias=True)
        (1): ReLU()
        (2): Linear(in_features=2048, out_features=512, bias=True)
      )
    )
  )
)

In [70]:
inp = torch.randint(1,vocab_size,(batch_size,seq_len))
output = model(inp)
print(output.shape)
print(output)

torch.Size([1, 3, 512])
tensor([[[-0.5492,  0.9476,  2.1118,  ..., -1.4608,  0.7753,  1.2287],
         [-0.2166, -0.1502, -0.0842,  ..., -1.6829,  0.6073,  1.2509],
         [ 1.7533, -0.7659,  0.1086,  ..., -0.7527,  1.9787,  0.2499]]],
       grad_fn=<NativeLayerNormBackward0>)


In [36]:
#model.__doc__
#help(model)