#### **This notebook implements the Transformer Encoder Architecture from scratch.**

In [1]:
import math
import torch
import torch.nn as nn

### **Input and positional encoding.**

In [2]:
class InputEmbeddingBlock(nn.Module):
    def __init__(self, 
                 vocab_size, 
                 embed_dim, 
                 max_seq_len
                ):
        super().__init__()
        #this holds the embedding of all the vocab
        #this is used for the lookup in the forward stage
        self.token_embeding = nn.Embedding(vocab_size,
                                           embed_dim
                                          )
        print('Maximum vocab size of the entire system: ',
              vocab_size
             )
        print('Embedding of each token: ', 
              embed_dim
             )
        print('Maximum sequence length: ',
              max_seq_len
             )
        
        self.positional_embedding = self.get_positional_encoding(
            max_seq_len,
            embed_dim
        )

    def get_positional_encoding(self, 
                                max_seq_len, 
                                d_model
                               ):
        #max_seq_len is the maximum sequence length
        #possible
        #d_model is the dimension of the embedding
        #rows capture positional information
        #column catpures frequency information
        pe = torch.zeros(max_seq_len,
                         d_model
                        )

        #positions of words
        position = torch.arange(0, 
                                max_seq_len,
                                dtype=torch.float
                               ).unsqueeze(1)
        print('Total positions to be considered for the positional encoding: ',
              d_model
             )
        
        even_indices =torch.arange(0, 
                                   d_model, 
                                   2).float()
        
        div_term = torch.exp(even_indices * (
            -math.log(10000.0) / d_model)
                            )

        pe[:, 0::2] = torch.sin(position*div_term)
        pe[:, 1::2] = torch.cos(position*div_term)
        
        print('Positional encoding dimension (maximum_sequence_length, size_of_position_encoding): ', 
              pe.size()
             )

        
        return pe


    def forward(self, x):
        #x: (batch_size, seq_len)
        seq_len = x.size(1)

        #(seq_len, d_model)
        token_embedding = self.token_embeding(x)
        #(seq_len, d_model)
        pos_emb = self.positional_embedding[:seq_len,:]

        #output of this is (seq_len, d_model)
        return token_embedding + pos_emb, pos_emb

In [3]:
vocab_size = 200
embed_dim = 100
max_seq_len = 20
batch_size = 2
seq_len = 10
model = InputEmbeddingBlock(vocab_size, 
                            embed_dim,
                            max_seq_len
                           )
input_tokens = torch.randint(0, vocab_size,
                          (batch_size, seq_len)
                         )
#these are input tokens of the sequence

print('\n\n\n')
print('Input tokens : ', input_tokens)

Maximum vocab size of the entire system:  200
Embedding of each token:  100
Maximum sequence length:  20
Total positions to be considered for the positional encoding:  100
Positional encoding dimension (maximum_sequence_length, size_of_position_encoding):  torch.Size([20, 100])




Input tokens :  tensor([[187,  86, 183,  51,  61, 184,  11,  19,  74, 156],
        [163, 160,  18,  98, 129,  65, 159,  27, 106,  98]])


In [4]:
output, pos_emb = model(input_tokens)
print(output.size())

torch.Size([2, 10, 100])


#### **Self-Attention**

In [5]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, number_of_head):
        super().__init__()
        self.embed_dim = d_model
        self.number_of_head = int(number_of_head)
        self.head_dim = int(self.embed_dim / self.number_of_head)

        print('\n Embedding size: ', self.embed_dim)
        print('Number of heads: ', self.number_of_head)
        print('Head dimension: ', self.head_dim)
        
    
        #These project to the total dimension across all head.
        self.W_q = nn.Linear(self.embed_dim, 
                             self.embed_dim, 
                             bias=False
                            )
        self.W_k = nn.Linear(self.embed_dim,
                             self.embed_dim,
                             bias=False
                            )
        self.W_v = nn.Linear(self.embed_dim,
                             self.embed_dim,
                             bias=False
                            )
    
        self.scale = 1/ math.sqrt(self.head_dim)

        self.output_proj = nn.Linear(embed_dim,
                                     embed_dim
                                    )
        

    def forward(self, x):
        batch_size, seq_len, embed_dim = x.shape

        #This results in batch_size, seq_len,
        #embed_dim
        Q_all = self.W_q(x) 
        K_all = self.W_k(x)
        V_all = self.W_v(x)

        #Split the heads
        Q = Q_all.view(batch_size, seq_len, 
                       self.number_of_head, 
                       self.head_dim
                      )
        K = K_all.view(batch_size, seq_len, 
                       self.number_of_head, 
                       self.head_dim
                      )
        V = V_all.view(batch_size, seq_len, 
                       self.number_of_head, 
                       self.head_dim
                      )

        print('Dimension of Q, K and V, (batch_size, seq_len, number of head, head_dim): ',
              V.size()
             )

        #batch_size, number of head,
        #seq length, head_dim
        Q = Q.transpose(1, 2)
        K = K.transpose(1, 2)
        V = V.transpose(1, 2)

        scores = torch.matmul(Q, 
                              #batch_size, number of head,
                              #head_dim, seq_length
                              K.transpose(-2, -1)
                             )*self.scale
        #output of scores = (batch_size, number_of_head,
        #seq_length, seq_length

        print('Dimension of scores: (batch_size, number of head, seq length, seq length), ', 
              scores.size()
             )
        
        attention_weights = torch.softmax(scores,
                                          dim=-1
                                         )
        #output of attention_weights = (batch_size,
        #number_of_head, seq_length, seq_length)
        print('Dimension of attention weights: (batch size, number of head, seq_lenght, seq_length) ',
              attention_weights.size()
             )

        output = torch.matmul(attention_weights,
                              V
                             )
        #(batch_size, number_of_head,
        #seq_length, head_dim)
        print('Dimension of output (batch_size, number of head, seq length, head_dim) :',
              output.size()
             )

        output = output.transpose(1, 2).contiguous()
        output = output.view(batch_size,
                             seq_len,
                             embed_dim
                            )
        print('Dimension of output after projection (batch_size, seq_len, embed_dim) :',
              output.size()
             )

        return self.output_proj(output)

In [6]:
print(output.size())

torch.Size([2, 10, 100])


In [7]:
vocab_size = 200
embed_dim = 100
max_seq_len = 20
batch_size = 2
seq_len = 10
number_of_head = 4

multihead = MultiHeadAttention(embed_dim,
                              number_of_head
                              )

output_multihead = multihead(output)


 Embedding size:  100
Number of heads:  4
Head dimension:  25
Dimension of Q, K and V, (batch_size, seq_len, number of head, head_dim):  torch.Size([2, 10, 4, 25])
Dimension of scores: (batch_size, number of head, seq length, seq length),  torch.Size([2, 4, 10, 10])
Dimension of attention weights: (batch size, number of head, seq_lenght, seq_length)  torch.Size([2, 4, 10, 10])
Dimension of output (batch_size, number of head, seq length, head_dim) : torch.Size([2, 4, 10, 25])
Dimension of output after projection (batch_size, seq_len, embed_dim) : torch.Size([2, 10, 100])


In [8]:
output_multihead.size()

torch.Size([2, 10, 100])

#### **Add & Norm and Feed Forward**

In [9]:
#Layer normalization is need to stop gradients from exploding or vanishing

In [10]:
output_after_multihead = output + output_multihead


In [11]:
output_after_multihead.shape

torch.Size([2, 10, 100])

In [12]:
import torch.nn.functional as F
class FeedForward(nn.Module):
    def __init__(self, embed_dim,
                 ff_dim, dropout=0.1
                ):

        super().__init__()
        self.Linear1 = nn.Linear(embed_dim, ff_dim)
        self.Linear2 = nn.Linear(ff_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)
        self.layernorm = nn.LayerNorm(embed_dim)

    def forward(self, x):
        x = F.relu(self.Linear1(x))
        x = self.dropout(x)
        x = self.Linear2(x)
        x = self.layernorm(x)
        

        return x

In [13]:
vocab_size = 200
embed_dim = 100
max_seq_len = 20
batch_size = 2
seq_len = 10
number_of_head = 4
feedforward_dim = 50
dropout = 0.1

feedforward = FeedForward(embed_dim, feedforward_dim)
feedforward_output = feedforward(output_after_multihead)

In [14]:
feedforward_output.shape

torch.Size([2, 10, 100])

## **Decoder Section**

In [15]:
residual = feedforward_output + pos_emb

In [30]:
class MaskedMultiHead(nn.Module):
    def __init__(self, seq_length,
                 embed_size, batch_size,
                 number_of_head,
                 max_seq_length,
                 dropout = 0.1,
                ):
        super().__init__()

        assert embed_dim % number_of_head == 0

        self.embed_dim = embed_dim
        self.num_heads = number_of_head
        self.head_dim = embed_dim // number_of_head
        self.scale = 1.0 / math.sqrt(self.head_dim)

        self.W_q = nn.Linear(self.embed_dim, 
                             self.embed_dim, 
                             bias=False
                            )
        self.W_k = nn.Linear(self.embed_dim,
                             self.embed_dim,
                             bias=False
                            )
        self.W_v = nn.Linear(self.embed_dim,
                             self.embed_dim,
                             bias=False
                            )

        self.dropout = nn.Dropout(dropout)
        self.out_proj = nn.Linear(embed_dim,
                                  embed_dim
                                 )
        self.register_buffer('causal_mask',
                             torch.tril(
                                 torch.ones(max_seq_length,
                                            max_seq_length
                                           )
                             ))
        
        


    def forward(self, x, padding=False):
        batch_size, seq_len, embed_dim = x.shape

        Q = self.W_q(x)
        #x = batch_size, seq_len, embed_dim and W = embed_dim x embed_dim
        K = self.W_k(x)
        V = self.W_v(x)

        Q = Q.reshape(batch_size, seq_len,
                    self.num_heads, self.head_dim
                     )
        K = K.reshape(batch_size, seq_len,
                    self.num_heads, self.head_dim
                     )
        V = V.reshape(batch_size, seq_len,
                    self.num_heads, self.head_dim
                     )
        
        Q = Q.transpose(1, 2)
        K = K.transpose(1, 2)
        V = V.transpose(1, 2)

        scores = torch.matmul(Q, 
                              #batch_size, number of head,
                              #head_dim, seq_length
                              K.transpose(-2, -1)
                             )*self.scale
        causal_mask = self.causal_mask[:seq_len, :seq_len]
        scores = scores.masked_fill(causal_mask == 0,
                                    float('-inf')
                                   )
        #output of scores = (batch_size, number_of_head,
        #seq_length, seq_length

        print('Dimension of scores: (batch_size, number of head, seq length, seq length), ', 
              scores.size()
             )
        
        attention_weights = F.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)
        
        #output of attention_weights = (batch_size,
        #number_of_head, seq_length, seq_length)
        print('Dimension of attention weights: (batch size, number of head, seq_lenght, seq_length) ',
              attention_weights.size()
             )

        output = torch.matmul(attention_weights,
                              V
                             )
        #(batch_size, number_of_head,
        #seq_length, head_dim)
        print('Dimension of output (batch_size, number of head, seq length, head_dim) :',
              output.size()
             )

        output = output.transpose(1, 2).contiguous()
        output = output.view(batch_size,
                             seq_len,
                             embed_dim
                            )
        print('Dimension of output after projection (batch_size, seq_len, embed_dim) :',
              output.size()
             )
        
        return self.out_proj(output), attention_weights
        
        

In [34]:
vocab_size = 200
embed_dim = 100
max_seq_len = 20
batch_size = 2
seq_len = 10
number_of_head = 4
feedforward_dim = 50
dropout = 0.1

masked_attention = MaskedMultiHead(seq_length=seq_len,
                                   embed_size=embed_dim, 
                                   batch_size=batch_size,
                                   number_of_head=number_of_head,
                                   max_seq_length=max_seq_len,
                                   dropout = dropout
                                  )
masked_output, attention_weights = masked_attention(residual)

Dimension of scores: (batch_size, number of head, seq length, seq length),  torch.Size([2, 4, 10, 10])
Dimension of attention weights: (batch size, number of head, seq_lenght, seq_length)  torch.Size([2, 4, 10, 10])
Dimension of output (batch_size, number of head, seq length, head_dim) : torch.Size([2, 4, 10, 25])
Dimension of output after projection (batch_size, seq_len, embed_dim) : torch.Size([2, 10, 100])


In [35]:
embed_dim = 100
layernorm = nn.LayerNorm(embed_dim)
layernorm_output = layernorm(residual + masked_output)

#### **Decoder Multihead Attention**

In [36]:
# Query from decoder, key and value from encoder

In [57]:
class DecoderCrossAttention(nn.Module):
    def __init__(self, embed_dim,
                 num_heads, 
                 dropout=0.1
                ):
        super().__init__()
        print("Embedding dimension: ", embed_dim)
        print("Number of heads: ", num_heads)
        
        assert embed_dim % num_heads == 0

        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.scale = 1.0/ math.sqrt(self.head_dim)

        self.W_q = nn.Linear(self.embed_dim, 
                             self.embed_dim, 
                             bias=False
                            )
        self.W_k = nn.Linear(self.embed_dim,
                             self.embed_dim,
                             bias=False
                            )
        self.W_v = nn.Linear(self.embed_dim,
                             self.embed_dim,
                             bias=False
                            )

        self.dropout = nn.Dropout(dropout)
        self.out_proj = nn.Linear(embed_dim,
                                  embed_dim
                                 )
        self.feedforward = nn.Linear(embed_dim,
                                    embed_dim
                                    )
        self.layerNorm = nn.LayerNorm(embed_dim)

    def forward(self, query, key, value,
                key_padding_mask=None
               ):
        """
        Args:
            query: 
            (batch_size, seq_len, embed_size)
            - from decoder
            key: (batch_size, seq_len, embed_size) 
            - from encoder
            value: (batch_size, seq_len, embed_size)
            - from encoder
            
        Returns:
            output: (batch_size,seq_len, embed_size)
            attention_weights: (batch_size, seq_len, seq_len)
        """

        batch_size, query_len, embed_dim = query.shape
        key_len = key.size(1)

        Q = self.W_q(query)
        #x = batch_size, seq_len, embed_dim and W = embed_dim x embed_dim
        K = self.W_k(key)
        V = self.W_v(value)

        Q = Q.reshape(batch_size, query_len,
                    self.num_heads, self.head_dim
                     )
        K = K.reshape(batch_size, key_len,
                    self.num_heads, self.head_dim
                     )
        V = V.reshape(batch_size, key_len,
                    self.num_heads, self.head_dim
                     )
        
        Q = Q.transpose(1, 2)
        # batch_size, self.num_heads, query_len, self.head_dim
        K = K.transpose(1, 2)
        V = V.transpose(1, 2)

        scores = torch.matmul(Q, 
                              K.transpose(-2, -1)
                             )*self.scale
        #batch_size, number of head,
        #query_len, key_len

        print('Dimension of scores: (batch_size, number of head, seq length, seq length), ', 
              scores.size()
             )
        
        attention_weights = F.softmax(scores, 
                                      dim=-1
                                     )
        attention_weights = self.dropout(attention_weights)
        
        #output of attention_weights = (batch_size,
        #number_of_head, query_len, seq_length)
        print('Dimension of attention weights: (batch size, number of head, seq_lenght, seq_length) ',
              attention_weights.size()
             )

        output = torch.matmul(attention_weights,
                              V
                             )
        #(batch_size, number_of_head,
        #seq_length, head_dim)
        print('Dimension of output (batch_size, number of head, seq length, head_dim) :',
              output.size()
             )

        output = output.transpose(1, 2).contiguous()
        output = output.view(batch_size,
                             seq_len,
                             embed_dim
                            )
        print('Dimension of output after projection (batch_size, seq_len, embed_dim) :',
              output.size()
             )

        return self.layerNorm(self.feedforward(self.out_proj(output) + query)), attention_weights
        
        

In [58]:
vocab_size = 200
embed_dim = 100
max_seq_len = 20
batch_size = 2
seq_len = 10
number_of_head = 4
feedforward_dim = 50
dropout = 0.1

decoder_cross_attn = DecoderCrossAttention(
    embed_dim=embed_dim,
    num_heads=number_of_head,
    dropout=0.1
)
decoder_cross_attn_output, attn_cross = decoder_cross_attn(
    query=layernorm_output, 
    key=feedforward_output, 
    value=feedforward_output,
)

Embedding dimension:  100
Number of heads:  4
Dimension of scores: (batch_size, number of head, seq length, seq length),  torch.Size([2, 4, 10, 10])
Dimension of attention weights: (batch size, number of head, seq_lenght, seq_length)  torch.Size([2, 4, 10, 10])
Dimension of output (batch_size, number of head, seq length, head_dim) : torch.Size([2, 4, 10, 25])
Dimension of output after projection (batch_size, seq_len, embed_dim) : torch.Size([2, 10, 100])


In [61]:
# decoder_cross_attn_output[0]

In [62]:
vocab_size = 200
embed_dim = 100
max_seq_len = 20
batch_size = 2
seq_len = 10
number_of_head = 4
feedforward_dim = 50
dropout = 0.1

ff = FeedForward(embed_dim, feedforward_dim)
ff_output = ff(decoder_cross_attn_output) #batch_size, seq_len, embed_dim

In [68]:
class LinearSoftmax(nn.Module):
    def __init__(self, embed_dim, vocab_size):
        self.embed_dim = embed_dim
        self.vocab_size = vocab_size
        self.linear = nn.Linear(embed_dim,
                                vocab_size,
                                bias=True
                               )

    def forward(self, x):
        x = self.linear(x)
        self.softmax = F.softmax(x, dim=-1)

        return self.softmax

In [69]:
vocab_size = 200
embed_dim = 100
max_seq_len = 20
batch_size = 2
seq_len = 10
number_of_head = 4
feedforward_dim = 50
dropout = 0.1

linear = LinearSoftmax(embed_dim=embed_dim,
                       vocab_size=vocab_size
                      )
logits = linear(ff_output)

AttributeError: cannot assign module before Module.__init__() call