In [21]:
import numpy as np
import math

import torch
import torch.nn as nn
import torch.nn.functional as F

In [22]:
def scaled_dot_product(q, k, v, mask=None):
  """Compute softmax(q @ k^T / sqrt(d_k)) @ v, tracing shapes to stdout.

  Args:
    q, k, v: tensors whose last two axes are multiplied as matrices; the
      size of q's last axis is used as the scaling dimension d_k.
    mask: optional tensor added in place to the scaled scores before the
      softmax (e.g. ``-inf`` entries to suppress positions).

  Returns:
    A ``(values, attention)`` tuple: the attention-weighted values and the
    softmax-normalised attention weights.
  """
  key_width = q.shape[-1]
  scores = (q @ k.transpose(-1, -2)) / math.sqrt(key_width)
  print(f"scaled.size() : {scores.size()}")

  if mask is not None:
    print(f"\n-- Adding Mask of shape {mask.size()} --\n")
    # In-place add: the mask is broadcast onto the score tensor itself.
    scores.add_(mask)

  weights = scores.softmax(dim=-1)
  return weights @ v, weights

In [23]:
class MultiHeadAttention(nn.Module):
  """Multi-head self-attention with a fused query/key/value projection.

  A single linear layer produces q, k and v for every head; the result is
  split per head, passed through ``scaled_dot_product``, and the heads are
  merged again before a final linear projection. Intermediate tensor sizes
  are printed for inspection.
  """

  def __init__(self, d_model, num_heads):
    """
    Args:
      d_model: size of the last axis of the input and of the output.
      num_heads: number of attention heads; each head works on
        ``d_model // num_heads`` features.
    """
    super().__init__()
    self.d_model = d_model
    self.num_heads = num_heads
    self.head_dim = d_model//num_heads
    self.qkv_layer = nn.Linear(d_model, 3*d_model)
    self.linear_layer = nn.Linear(d_model, d_model)


  def forward(self, x, mask=None):
    """Apply self-attention to ``x``.

    Args:
      x: tensor of size (batch_size, sequence_length, d_model).
      mask: optional tensor forwarded to ``scaled_dot_product``.

    Returns:
      Tensor of size (batch_size, sequence_length, d_model).
    """
    batch_size, sequence_length, d_model = x.size()
    print(f"x.size(): {x.size()}")
    qkv = self.qkv_layer(x)
    print(f"qkv.size(): {qkv.size()}")
    # Split the fused projection into heads: (batch, seq, heads, 3*head_dim).
    qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3*self.head_dim)
    print(f"qkv.size(): {qkv.size()}")
    # Put the head axis before the sequence axis: (batch, heads, seq, 3*head_dim).
    qkv = qkv.permute(0, 2, 1, 3)
    print(f"after permute - qkv.size(): {qkv.size()}")
    q,k,v = qkv.chunk(3, dim=-1)
    print( f"q,k,v sizes: {q.size()} " )
    values, attention = scaled_dot_product(q, k, v, mask)
    print(f"values.size(): {values.size()},\nattention.size:{ attention.size()} ")
    # Bring the sequence axis back in front of the head axis BEFORE merging
    # heads. Reshaping (batch, heads, seq, head_dim) directly would mix
    # features from different positions and heads into each output row.
    values = values.permute(0, 2, 1, 3)
    values = values.reshape(batch_size, sequence_length, self.num_heads*self.head_dim)
    print(f"values.size(): {values.size()}")
    out = self.linear_layer(values)
    print(f"out.size(): {out.size()}")

    return out

In [24]:
class PositionalEncoding(nn.Module):
  """Sinusoidal position table with sine and cosine columns interleaved."""

  def __init__(self, d_model, max_sequence_length):
    """
    Args:
      d_model: number of encoding columns (for even d_model).
      max_sequence_length: number of positions (rows) in the table.
    """
    super().__init__()
    self.d_model = d_model
    self.max_sequence_length = max_sequence_length

  def forward(self):
    """Build and return the (max_sequence_length, d_model) encoding table."""
    # One frequency per sin/cos pair: 10000^(2i / d_model).
    even_index = torch.arange(0, self.d_model, 2).float()
    frequency_divisor = torch.pow(10000, even_index / self.d_model)

    # Column vector of positions so the division broadcasts to a grid.
    positions = torch.arange(self.max_sequence_length).reshape(self.max_sequence_length, 1)
    angles_for_sin = positions / frequency_divisor
    angles_for_cos = positions / frequency_divisor

    # Stack on a trailing axis, then flatten it away so that the columns
    # alternate sin, cos, sin, cos, ...
    pairs = torch.stack([torch.sin(angles_for_sin), torch.cos(angles_for_cos)], dim=2)
    return pairs.flatten(start_dim=1, end_dim=2)

In [25]:
class LayerNormalization(nn.Module):
  """Layer normalisation over the trailing axes, with learnable scale and shift.

  Statistics are taken over the last ``len(parameters_shape)`` axes of the
  input. Intermediate sizes are printed for inspection.
  """

  def __init__(self, parameters_shape, eps=1e-5):
    """
    Args:
      parameters_shape: shape of the learnable scale (``weights``) and
        shift (``bias``); its length decides how many trailing axes are
        normalised.
      eps: constant added to the variance before the square root.
    """
    super().__init__()
    self.parameters_shape = parameters_shape
    self.eps = eps
    self.weights = nn.Parameter(torch.ones(parameters_shape))
    self.bias = nn.Parameter(torch.zeros(parameters_shape))

  def forward(self, inputs):
    """Normalise ``inputs`` and apply the learnable scale and shift."""
    # -1, -2, ... one entry per axis of parameters_shape.
    trailing_axes = list(range(-1, -len(self.parameters_shape) - 1, -1))

    mean = inputs.mean(dim=trailing_axes, keepdim=True)
    print(f"Mean ({mean.size()})")

    # Population variance (mean of squared deviations), then eps for stability.
    variance = (inputs - mean).pow(2).mean(dim=trailing_axes, keepdim=True)
    std = torch.sqrt(variance + self.eps)
    print(f"Standard Deviation  ({std.size()})")

    y = (inputs - mean) / std
    print(f"y: {y.size()}")

    out = self.weights * y + self.bias
    print(f"self.gamma: {self.weights.size()}, self.beta: {self.bias.size()}")
    print(f"out: {out.size()}")
    return out

In [26]:
class PositionwiseFeedForward(nn.Module):
  """Two linear layers with ReLU and dropout in between.

  The block maps d_model -> hidden -> d_model and prints the tensor size
  after every stage.
  """

  def __init__(self, d_model, hidden, drop_prob=0.1):
    """
    Args:
      d_model: input and output feature size.
      hidden: feature size between the two linear layers.
      drop_prob: dropout probability applied after the activation.
    """
    super().__init__()
    self.linear1 = nn.Linear(d_model, hidden)
    self.linear2 = nn.Linear(hidden, d_model)
    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(p=drop_prob)

  def forward(self, x):
    """Run ``x`` through linear1 -> ReLU -> dropout -> linear2."""
    expanded = self.linear1(x)
    print(f"x after first linear layer: {expanded.size()}")

    activated = self.relu(expanded)
    print(f"x after activation: {activated.size()}")

    regularised = self.dropout(activated)
    print(f"x after dropout: {regularised.size()}")

    projected = self.linear2(regularised)
    print(f"x after 2nd linear layer: {projected.size()}")
    return projected

In [27]:
class EncoderLayer(nn.Module):
    """One encoder layer: self-attention and feed-forward, each followed by
    dropout, a residual addition and layer normalisation.

    NOTE: the next cell defines ``EncoderLayer`` again (same computation,
    with progress banners printed); once that cell runs, this definition is
    shadowed.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        """
        Args:
            d_model: feature size of the layer's input and output.
            ffn_hidden: hidden size of the feed-forward sub-layer.
            num_heads: number of attention heads.
            drop_prob: dropout probability used throughout the layer.
        """
        super().__init__()
        self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Return the layer output for ``x``; no attention mask is used."""
        # Sub-layer 1: attention, dropout, then add the input back and normalise.
        attended = self.dropout1(self.attention(x, mask=None))
        after_attention = self.norm1(attended + x)

        # Sub-layer 2: feed-forward, dropout, then add and normalise again.
        transformed = self.dropout2(self.ffn(after_attention))
        return self.norm2(transformed + after_attention)


In [28]:
class EncoderLayer(nn.Module):
    """One encoder layer that announces every stage on stdout.

    Stages: self-attention -> dropout -> add & normalise -> feed-forward ->
    dropout -> add & normalise.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        """
        Args:
            d_model: feature size of the layer's input and output.
            ffn_hidden: hidden size of the feed-forward sub-layer.
            num_heads: number of attention heads.
            drop_prob: dropout probability used throughout the layer.
        """
        super().__init__()
        self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Return the layer output for ``x``; no attention mask is used."""
        skip = x

        # --- self-attention sub-layer ---
        print("------- Encoder Multi-Head-Attention ------", '\n')
        attended = self.attention(x, mask=None)
        print('\n')
        print("------- Applying Dropout ------")
        print('\n')
        attended = self.dropout1(attended)
        print("------- Addition & Normalization Layer-1 ------", '\n')
        normed = self.norm1(attended + skip)

        # --- feed-forward sub-layer; its residual is the normalised tensor ---
        skip = normed
        print('\n')
        print("------- Encoder Feed Forward Neural Network ------", '\n')
        transformed = self.ffn(normed)
        print('\n')
        print("------- Applying Dropout ------")
        transformed = self.dropout2(transformed)
        print('\n')
        print("------- Addition & Normalization Layer-2 ------", '\n')
        return self.norm2(transformed + skip)

class Encoder(nn.Module):
    """A stack of ``num_layers`` identically configured ``EncoderLayer`` modules."""

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
        """
        Args:
            d_model, ffn_hidden, num_heads, drop_prob: forwarded unchanged
                to every ``EncoderLayer``.
            num_layers: how many layers to chain.
        """
        super().__init__()
        stack = [
            EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
            for _ in range(num_layers)
        ]
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Feed ``x`` through every layer in order and return the result."""
        return self.layers(x)



In [29]:
class MultiHeadCrossAttention(nn.Module):
    """Multi-head attention whose keys/values come from ``x`` and whose
    queries come from ``y``.

    ``x`` and ``y`` may have different sequence lengths: the output has one
    row per position of ``y``. Intermediate sizes are printed for inspection.
    """

    def __init__(self, d_model, num_heads):
        """
        Args:
            d_model: feature size of both inputs and of the output.
            num_heads: number of attention heads; each head works on
                ``d_model // num_heads`` features.
        """
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model//num_heads
        self.kv_layer = nn.Linear(d_model, 2*d_model)
        self.q_layer = nn.Linear(d_model, d_model)
        self.linear_layer = nn.Linear(d_model, d_model)


    def forward(self, x, y, mask=None):
        """Attend from ``y`` (queries) over ``x`` (keys and values).

        Args:
            x: tensor of size (batch_size, kv_length, d_model).
            y: tensor of size (batch_size, q_length, d_model).
            mask: optional tensor forwarded to ``scaled_dot_product``.

        Returns:
            Tensor of size (batch_size, q_length, d_model).
        """
        batch_size, kv_length, d_model = x.size()
        # The query side follows y's own length, so x and y need not match.
        q_length = y.size(1)
        print(f"x.size(): {x.size()}")

        kv = self.kv_layer(x)
        q = self.q_layer(y)
        print(f"kv.size(): {kv.size()}")
        print(f"q.size(): {q.size()}")

        # Split into heads: (batch, length, heads, features-per-head).
        kv = kv.reshape(batch_size, kv_length, self.num_heads, 2*self.head_dim)
        q = q.reshape(batch_size, q_length, self.num_heads, self.head_dim)
        print(f"kv.size(): {kv.size()}")
        print(f"q.size(): {q.size()}")

        # Move the head axis in front of the sequence axis.
        kv = kv.permute(0, 2, 1, 3)
        q = q.permute(0, 2, 1, 3)
        print(f"after permute - kv.size(): {kv.size()}")
        print(f"after permute - q.size(): {q.size()}")

        k,v = kv.chunk(2, dim=-1)
        print( f"k,v sizes: {k.size()} " )

        values, attention = scaled_dot_product(q, k, v, mask)
        print(f"values.size(): {values.size()}" )
        print( f"attention.size:{ attention.size()}" )

        # Back to (batch, q_length, heads, head_dim), then merge the heads.
        values =  values.permute(0, 2, 1, 3)
        values = values.reshape(batch_size, q_length, d_model)
        out = self.linear_layer(values)
        print(f"values.size(): {values.size()}")
        print(f"out.size(): {out.size()}")

        return out

In [30]:
class DecoderLayer(nn.Module):
  """One decoder layer that announces every stage on stdout.

  Stages: masked self-attention on ``y``, cross-attention between ``x`` and
  ``y``, and a feed-forward block. Each stage is followed by dropout, a
  residual addition and layer normalisation.
  """

  def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
    """
    Args:
      d_model: feature size of the layer's inputs and output.
      ffn_hidden: hidden size of the feed-forward sub-layer.
      num_heads: number of attention heads in both attention sub-layers.
      drop_prob: dropout probability used throughout the layer.
    """
    super().__init__()
    self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
    self.norm1 = LayerNormalization(parameters_shape=[d_model])
    self.dropout1 = nn.Dropout(p=drop_prob)
    self.encoder_decoder_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads)
    self.norm2 = LayerNormalization(parameters_shape=[d_model])
    self.dropout2 = nn.Dropout(p=drop_prob)
    self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
    self.norm3 = LayerNormalization(parameters_shape=[d_model])
    self.dropout3 = nn.Dropout(p=drop_prob)

  def forward(self, x, y, decoder_mask):
    """Return the updated ``y``.

    Args:
      x: tensor passed as the first input of the cross-attention sub-layer.
      y: tensor refined by the three sub-layers.
      decoder_mask: mask handed to the self-attention sub-layer only; the
        cross-attention sub-layer is called with ``mask=None``.
    """
    skip = y

    # --- stage 1: masked self-attention over y ---
    print('\n')
    print("----- Decoder Masked-Multi-Head-Attention -----", '\n')
    attended = self.self_attention(y, mask=decoder_mask)
    print('\n')
    print('----- Applying Dropout ------')
    print('\n')
    attended = self.dropout1(attended)
    print('----- Addition & Normmalization Layer-1 -----', '\n')
    state = self.norm1(attended + skip)

    # --- stage 2: cross-attention between x and the current state ---
    skip = state
    print('\n')
    print('------ Decoder Cross-Multi-Head-Atention ------', '\n')
    crossed = self.encoder_decoder_attention(x, state, mask=None)
    print('\n')
    print('------ Applying Dropout ------')
    print('\n')
    crossed = self.dropout2(crossed)
    print('----- Addition & Normmalization Layer-2 -----', '\n')
    state = self.norm2(crossed + skip)

    # --- stage 3: position-wise feed-forward ---
    skip = state
    print('\n')
    print('------ Feed Forward Neural Neywork ------', '\n')
    transformed = self.ffn(state)
    print('\n')
    print('----- Applying Dropout ------')
    print('\n')
    transformed = self.dropout3(transformed)
    print('----- Addition & Normmalization Layer-3 -----', '\n')
    return self.norm3(transformed + skip)


In [31]:
class SequentialDecoder(nn.Sequential):
  """``nn.Sequential`` variant for modules called as ``module(x, y, mask)``.

  ``x`` and ``mask`` are handed unchanged to every contained module; only
  ``y`` is threaded from one module to the next.
  """

  def forward(self, *inputs):
    """Expect exactly ``(x, y, mask)`` and return the final ``y``."""
    x, y, mask = inputs
    for layer in self._modules.values():
      y = layer(x, y, mask)
    return y

In [32]:
class Decoder(nn.Module):
  """A stack of ``num_layers`` ``DecoderLayer`` modules run via ``SequentialDecoder``."""

  def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers=1):
    """
    Args:
      d_model, ffn_hidden, num_heads, drop_prob: forwarded unchanged to
        every ``DecoderLayer``.
      num_layers: how many layers to chain (default 1).
    """
    super().__init__()
    stack = [
      DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
      for _ in range(num_layers)
    ]
    self.layers = SequentialDecoder(*stack)

  def forward(self, x, y, mask):
    """Pass ``(x, y, mask)`` through the stack and return the resulting ``y``."""
    return self.layers(x, y, mask)

In [33]:
# x = torch.randn( (batch_size, max_sequence_length, d_model) ) # Positional Encoding of input label
# y = torch.randn( (batch_size, max_sequence_length, d_model) ) # Positional Encoding of output label
# mask = torch.full([max_sequence_length, max_sequence_length] , float('-inf'))
# mask = torch.triu(mask, diagonal=1)
# encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)
# out = encoder(x)
# decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)
# out = decoder(x, y, mask)

In [34]:
class Transformer(nn.Module):
    """Encoder-decoder model: positional encoding, encoder, decoder, then a
    linear projection to ``vocab_size`` followed by a softmax.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_encoder_layers, num_decoder_layers, max_sequence_length, vocab_size):
        """
        Args:
            d_model: feature size used throughout the model.
            ffn_hidden: hidden size of the feed-forward sub-layers.
            num_heads: number of attention heads.
            drop_prob: dropout probability.
            num_encoder_layers: number of layers in the encoder.
            num_decoder_layers: number of layers in the decoder.
            max_sequence_length: number of rows in the positional table;
                inputs may be this long or shorter.
            vocab_size: size of the last axis of the final projection.
        """
        super(Transformer, self).__init__()
        self.encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_encoder_layers)
        self.decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob, num_decoder_layers)
        self.positional_encoding = PositionalEncoding(d_model, max_sequence_length)
        self.final_layer = nn.Linear(d_model, vocab_size)
        self.softmax = nn.Softmax(dim=-1)

    def _positions_for(self, tokens):
        """Return the positional rows matching ``tokens``' length and device."""
        table = self.positional_encoding()
        # Keep only as many rows as the sequence has (axis 1), so inputs
        # shorter than max_sequence_length still broadcast, and move the
        # table next to the data so the addition works off-CPU as well.
        return table[:tokens.size(1)].to(tokens.device)

    def forward(self, x, y, mask):
        """Run the full model.

        Args:
            x: encoder input of size (batch, length, d_model).
            y: decoder input of size (batch, length, d_model).
            mask: mask forwarded to the decoder.

        Returns:
            ``(decoder_output, final_output)``: the raw decoder output and
            the softmax over the projected logits.
        """
        x = x + self._positions_for(x)
        y = y + self._positions_for(y)
        encoder_output = self.encoder(x)
        decoder_output = self.decoder(encoder_output, y, mask)
        logits = self.final_layer(decoder_output)
        final_output = self.softmax(logits)
        return decoder_output, final_output


In [35]:
# --- Configuration for the demo run ---
SEED = 0                   # fixes random inputs and weight initialisation
d_model = 512              # feature size of every token vector
num_heads = 8              # attention heads (d_model // num_heads features each)
drop_prob = 0.1            # dropout probability
batch_size = 30
max_sequence_length = 200
ffn_hidden = 2048          # hidden size of the feed-forward sub-layers
num_encoder_layers = 1
num_decoder_layers = 1
vocab_size = 10000

# Seed before drawing any random numbers so that Restart & Run All
# reproduces the same tensors and the same initial weights.
torch.manual_seed(SEED)

x = torch.randn((batch_size, max_sequence_length, d_model))  # input sequence
y = torch.randn((batch_size, max_sequence_length, d_model))  # target sequence

# Mask with -inf strictly above the diagonal and 0 elsewhere.
mask = torch.full([max_sequence_length, max_sequence_length], float('-inf'))
mask = torch.triu(mask, diagonal=1)

transformer = Transformer(d_model, ffn_hidden, num_heads, drop_prob, num_encoder_layers, num_decoder_layers, max_sequence_length, vocab_size)
decoder_output, final_output = transformer(x, y, mask)

------- Encoder Multi-Head-Attention ------ 

x.size(): torch.Size([30, 200, 512])
qkv.size(): torch.Size([30, 200, 1536])
qkv.size(): torch.Size([30, 200, 8, 192])
after permute - qkv.size(): torch.Size([30, 8, 200, 192])
q,k,v sizes: torch.Size([30, 8, 200, 64]) 
scaled.size() : torch.Size([30, 8, 200, 200])
values.size(): torch.Size([30, 8, 200, 64]),
attention.size:torch.Size([30, 8, 200, 200]) 
values.size(): torch.Size([30, 200, 512])
out.size(): torch.Size([30, 200, 512])


------- Applying Dropout ------


------- Addition & Normalization Layer-1 ------ 

Mean (torch.Size([30, 200, 1]))
Standard Deviation  (torch.Size([30, 200, 1]))
y: torch.Size([30, 200, 512])
self.gamma: torch.Size([512]), self.beta: torch.Size([512])
out: torch.Size([30, 200, 512])


------- Encoder Feed Forward Neural Network ------ 

x after first linear layer: torch.Size([30, 200, 2048])
x after activation: torch.Size([30, 200, 2048])
x after dropout: torch.Size([30, 200, 2048])
x after 2nd linear layer:

In [36]:
# Report the sizes of the two tensors returned by the transformer call:
# the raw decoder output and the softmax over the projected logits.
print('Decoder Output Shape:- ', decoder_output.shape)
print('Final Output Shape:- ', final_output.shape)

Decoder Output Shape:-  torch.Size([30, 200, 512])
Final Output Shape:-  torch.Size([30, 200, 10000])
