In [1]:
import torch
import numpy as np
import torch.nn as nn

In [2]:
## parameters to define first

# X  : input matrix, shape (seq_len, dmodel)
# Wq : trainable query projection, shape (dmodel, dk)
# Wk : trainable key projection, shape (dmodel, dk)
# Wv : trainable value projection, shape (dmodel, dv), with dv = dk
# Wo : trainable output projection, shape (dk*h, dmodel), where h is the number of heads
# Q = X @ Wq, shape (seq_len, dk)
# K = X @ Wk, shape (seq_len, dk)
# V = X @ Wv, shape (seq_len, dk), since dv = dk
# A = Attention(Q, K.t()), shape (seq_len, seq_len), where Attention(Q, K.t()) = (Q @ K.t()) / sqrt(dk)
# Masking: a (seq_len, seq_len) mask is applied to A so a token cannot attend to later tokens
# Then Softmax(A), shape (seq_len, seq_len)
# Second to last: Softmax(A) @ V, shape (seq_len, dk)
# Lastly: (Softmax(A) @ V) @ Wo, shape (seq_len, dmodel); with h heads, the head outputs are concatenated to (seq_len, dk*h) first

In [3]:
# Toy input: one 3-dimensional embedding row per token of
# "Dream big and work for it", i.e. a (6, 3) matrix.
X = torch.tensor(
    [
        [0.72, 0.45, 0.31],  # Dream
        [0.75, 0.20, 0.55],  # big
        [0.30, 0.80, 0.40],  # and
        [0.85, 0.35, 0.60],  # work
        [0.55, 0.15, 0.75],  # for
        [0.25, 0.20, 0.85],  # it
    ]
)

In [4]:
## implementation of Scaled Dot Product Attention using Class





class CausalAttentionSingleHead(nn.Module):
    """Single-head causal (masked) scaled dot-product self-attention.

    Args:
        dk: Width of the query, key and value projections.
        dmodel: Width of each input token embedding.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, dk, dmodel, dropout):
        super().__init__()
        self.dk = dk
        self.dmodel = dmodel
        self.weight_query = nn.Linear(self.dmodel, self.dk)
        self.weight_value = nn.Linear(self.dmodel, self.dk)
        self.weight_key = nn.Linear(self.dmodel, self.dk)
        # Softmax over the key axis so every query row is normalised.
        self.softmax = nn.Softmax(dim=-1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, X):
        """Apply causal self-attention to ``X``.

        Args:
            X: Tensor of shape ``(seq_len, dmodel)``; any leading batch
                dimensions, e.g. ``(batch, seq_len, dmodel)``, are also accepted.

        Returns:
            A tuple ``(output, A)`` where ``output`` has shape
            ``(..., seq_len, dk)`` and ``A`` holds the attention weights after
            dropout, shape ``(..., seq_len, seq_len)``.
        """
        seq_len = X.shape[-2]

        # Query, key and value projections.
        Q = self.weight_query(X)
        K = self.weight_key(X)
        V = self.weight_value(X)

        # Scaled dot-product scores; transposing only the last two axes keeps
        # any leading batch dimensions intact.
        attn_score = (Q @ K.transpose(-2, -1)) / (self.dk ** 0.5)

        # Strictly-upper-triangular mask marks "future" positions. It is built
        # on X's device so the module also works once moved to an accelerator.
        mask = torch.ones(seq_len, seq_len, device=X.device).triu(diagonal=1).bool()
        masked_attn_score = attn_score.masked_fill(mask, -torch.inf)

        # Softmax turns the -inf entries into exact zeros; dropout is applied
        # to the resulting weights.
        A = self.dropout(self.softmax(masked_attn_score))
        return A @ V, A
    





In [5]:
# Single attention head with dk=2; dmodel is taken from X's embedding width.
obj = CausalAttentionSingleHead(
    dk=2,
    dmodel=X.size(1),
    dropout=0.2,
)

In [6]:
# Call the module itself rather than .forward() so that any registered
# nn.Module hooks are run.
obj(X)

(tensor([[0.2055, 1.1526],
         [0.2465, 1.2458],
         [0.2133, 1.1259],
         [0.1262, 0.6373],
         [0.1801, 0.9241],
         [0.1382, 0.6207]], grad_fn=<MmBackward0>),
 tensor([[1.2500, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000],
         [0.6336, 0.6164, 0.0000, 0.0000, 0.0000, 0.0000],
         [0.4398, 0.4349, 0.3753, 0.0000, 0.0000, 0.0000],
         [0.3227, 0.3166, 0.0000, 0.0000, 0.0000, 0.0000],
         [0.2613, 0.2496, 0.2490, 0.0000, 0.2349, 0.0000],
         [0.2226, 0.0000, 0.0000, 0.2145, 0.0000, 0.1886]],
        grad_fn=<MulBackward0>))

In [7]:
import torch
# Toy input: one 3-dimensional embedding row per token of
# "Dream big and work for it", i.e. a (6, 3) matrix.
X = torch.tensor(
    [
        [0.72, 0.45, 0.31],  # Dream
        [0.75, 0.20, 0.55],  # big
        [0.30, 0.80, 0.40],  # and
        [0.85, 0.35, 0.60],  # work
        [0.55, 0.15, 0.75],  # for
        [0.25, 0.20, 0.85],  # it
    ]
)

In [8]:
from model.attention import CausalAttentionSingleHead

In [9]:
# Attention head imported from model.attention, projecting 3-d inputs to dk=512.
causal_attn = CausalAttentionSingleHead(
    dk=512,
    dmodel=3,
    dropout=0.2,
)

In [10]:
# Call the module itself rather than .forward() so that any registered
# nn.Module hooks are run (assumes the imported class is an nn.Module).
causal_attn(X)

tensor([[-0.2411, -0.9794,  1.2128,  ...,  0.5255, -0.1772, -0.1950],
        [-0.1152, -0.4679,  0.5794,  ...,  0.2511, -0.0847, -0.0931],
        [-0.3026, -0.9404,  1.1810,  ...,  0.6016, -0.1390, -0.2028],
        [-0.1690, -0.4631,  0.6101,  ...,  0.3045, -0.0395, -0.1343],
        [-0.2308, -1.0082,  1.1772,  ...,  0.6957, -0.0901, -0.1754],
        [-0.2192, -0.7978,  0.9430,  ...,  0.5812, -0.0740, -0.1420]],
       grad_fn=<MmBackward0>)

## Masked MultiHead Attention Implementation 

In [11]:
import torch
import torch.nn as nn
import numpy as np
import math


In [12]:
class MultiHeadAttention(nn.Module):
    """Masked (causal) multi-head self-attention.

    Args:
        h: Number of attention heads; must divide ``dmodel``.
        dmodel: Width of the input and output embeddings.
        dropout: Dropout probability applied to the attention weights.

    Raises:
        AssertionError: If ``dmodel`` is not divisible by ``h``.
    """

    def __init__(self, h, dmodel, dropout):
        super().__init__()
        self.h = h
        self.dmodel = dmodel
        assert self.dmodel % self.h == 0, "head and dmodel configuration failed"
        # Per-head width.
        self.dk = self.dmodel // self.h
        self.weight_query = nn.Linear(dmodel, dmodel)
        self.weight_key = nn.Linear(dmodel, dmodel)
        self.weight_value = nn.Linear(dmodel, dmodel)
        self.w_o = nn.Linear(dmodel, dmodel)
        # Softmax over the key axis.
        self.softmax = nn.Softmax(dim=-1)
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, T):
        """Reshape ``(..., seq_len, dmodel)`` into ``(..., h, seq_len, dk)``."""
        return T.reshape(*T.shape[:-1], self.h, self.dk).transpose(-3, -2)

    def forward(self, X):
        """Apply masked multi-head self-attention.

        Args:
            X: Tensor of shape ``(seq_len, dmodel)``; any leading batch
                dimensions, e.g. ``(batch, seq_len, dmodel)``, are also accepted.

        Returns:
            Tensor with the same shape as ``X``.
        """
        seq_len = X.shape[-2]

        Q_head = self._split_heads(self.weight_query(X))
        K_head = self._split_heads(self.weight_key(X))
        V_head = self._split_heads(self.weight_value(X))

        # Scaled dot-product scores per head: (..., h, seq_len, seq_len).
        attn_score = torch.matmul(Q_head, K_head.transpose(-1, -2)) / math.sqrt(self.dk)

        # Strictly-upper-triangular mask hides future positions. It is created
        # on X's device so the module also works once moved to an accelerator,
        # and it broadcasts over the head (and batch) axes.
        mask = torch.ones(seq_len, seq_len, device=X.device).triu(diagonal=1).bool()
        attn_score_masked = attn_score.masked_fill(mask, -torch.inf)

        attn_weights = self.softmax(attn_score_masked)
        attn_weights_with_dropout = self.dropout(attn_weights)

        # Weighted sum of the values: (..., h, seq_len, dk).
        A = torch.matmul(attn_weights_with_dropout, V_head)

        # Put the head axis back next to dk and merge them to concatenate heads.
        concate_heads = A.transpose(-3, -2).reshape(*X.shape[:-1], self.dmodel)
        return self.w_o(concate_heads)
        







In [13]:
# Four heads over a 512-wide model, i.e. 128 features per head.
MHA = MultiHeadAttention(
    h=4,
    dmodel=512,
    dropout=0.2,
)

In [14]:
# Random stand-in input with `sentences` rows and `features` columns.
sentences, features = 10, 512
X = torch.randn((sentences, features))


In [15]:
X.shape

torch.Size([10, 512])

In [16]:
# Call the module itself rather than .forward() so that any registered
# nn.Module hooks are run.
MHA(X).shape

torch.Size([10, 512])

## Transformer Block Complete


In [1]:
import torch
import torch.nn as nn
import numpy as np
import math

In [None]:
class TransformerBlock(nn.Module):
    """Pre-layer-norm transformer block: causal multi-head self-attention
    followed by a position-wise feed-forward network, each wrapped in a
    residual connection.

    Args:
        heads: Number of attention heads; must divide ``d_model``.
        d_model: Width of the input and output embeddings.
        dropout: Dropout probability applied to the attention weights.

    Raises:
        AssertionError: If ``d_model`` is not divisible by ``heads``.
    """

    def __init__(self, heads, d_model, dropout):
        super().__init__()
        self.heads = heads
        self.d_model = d_model
        assert self.d_model % self.heads == 0, "Choose Correct Head Number"
        # Per-head width.
        self.dk = self.d_model // self.heads
        self.dropout = nn.Dropout(dropout)
        # Softmax over the key axis.
        self.softmax = nn.Softmax(dim=-1)
        # Query, key, value and output projections.
        self.weight_query = nn.Linear(self.d_model, self.d_model)
        self.weight_key = nn.Linear(self.d_model, self.d_model)
        self.weight_value = nn.Linear(self.d_model, self.d_model)
        self.w_o = nn.Linear(self.d_model, self.d_model)
        # Feed-forward network with a 4x hidden expansion.
        self.ffn1 = nn.Linear(self.d_model, self.d_model * 4)
        self.ffn2 = nn.Linear(self.d_model * 4, self.d_model)
        self.layer_norm_1 = nn.LayerNorm(self.d_model)
        self.layer_norm_2 = nn.LayerNorm(self.d_model)

    def _split_heads(self, T, batch_size, seq_len):
        """Reshape ``(B, seq_len, d_model)`` into ``(B, heads, seq_len, dk)``."""
        return T.reshape(batch_size, seq_len, self.heads, self.dk).permute(0, 2, 1, 3)

    def forward(self, X):
        """Run the block on a batch of sequences.

        Args:
            X: Tensor of shape ``(batch_size, seq_len, d_model)``.

        Returns:
            Tensor of shape ``(batch_size, seq_len, d_model)``.
        """
        batch_size, seq_len = X.shape[0], X.shape[1]

        # --- attention sub-layer (pre-norm) ---
        layer_norm1_output = self.layer_norm_1(X)

        # Project (B, seq_len, d_model) -> (B, seq_len, d_model), then split
        # the last axis into heads.
        Q_heads = self._split_heads(self.weight_query(layer_norm1_output), batch_size, seq_len)
        K_heads = self._split_heads(self.weight_key(layer_norm1_output), batch_size, seq_len)
        V_heads = self._split_heads(self.weight_value(layer_norm1_output), batch_size, seq_len)

        # Scaled dot-product scores: (B, heads, seq_len, seq_len).
        attn_score = torch.matmul(Q_heads, K_heads.transpose(-1, -2)) / math.sqrt(self.dk)

        # Strictly-upper-triangular mask hides future positions. It is built
        # on X's device so the block also works once moved to an accelerator.
        mask = torch.ones(seq_len, seq_len, device=X.device).triu(diagonal=1).bool()
        attn_score_with_mask = attn_score.masked_fill(mask, -torch.inf)
        attn_weight_with_mask = self.softmax(attn_score_with_mask)
        attn_wgts_mask_drpt = self.dropout(attn_weight_with_mask)

        # Weighted sum of values, then concatenate the heads back to d_model.
        A = torch.matmul(attn_wgts_mask_drpt, V_heads)
        concat_A = A.permute(0, 2, 1, 3).reshape(batch_size, seq_len, self.d_model)
        output_masked_attn = self.w_o(concat_A)

        # Residual connection 1: skip around the attention sub-layer.
        residual_connection_1 = X + output_masked_attn

        # --- feed-forward sub-layer (pre-norm) ---
        layer_norm_2_output = self.layer_norm_2(residual_connection_1)
        ffn1 = self.ffn1(layer_norm_2_output)
        # Apply ReLU functionally. `nn.ReLU(ffn1)` would only construct a
        # module (with the tensor as its `inplace` flag), not activate ffn1.
        activation = torch.relu(ffn1)
        ffn2 = self.ffn2(activation)

        # Residual connection 2: add the un-normalised residual stream, the
        # same way residual connection 1 adds the un-normalised X.
        final_output = residual_connection_1 + ffn2

        return final_output




        



In [3]:
# Eight heads over a 512-wide model, i.e. 64 features per head.
block = TransformerBlock(
    heads=8,
    d_model=512,
    dropout=0.2,
)

In [4]:
# Random batch; TransformerBlock.forward reads axis 0 as the batch size and
# axis 1 as the sequence length.
X = torch.randn((32, 20, 512))

In [6]:
# Call the module itself rather than .forward() so that any registered
# nn.Module hooks are run.
block(X)

tensor([[[-0.7894,  1.9269, -1.8423,  ...,  0.3329, -0.7462, -1.6045],
         [ 0.4369,  0.0260, -0.4353,  ...,  1.3975,  0.0652, -0.9650],
         [ 0.3732,  0.3503,  0.5452,  ..., -1.3787, -1.0930,  0.3708],
         ...,
         [-0.7461, -0.4691, -0.3380,  ...,  0.7276, -0.5263, -0.4840],
         [ 1.5050,  0.4982,  0.6430,  ...,  0.9810,  0.0398, -0.5948],
         [ 1.8314,  1.3456,  0.1464,  ...,  0.9975, -1.4653,  0.9642]],

        [[-0.6528,  1.3612, -0.4787,  ...,  0.8576,  0.5415, -1.0688],
         [ 1.9451, -0.7653,  0.4269,  ...,  1.1427, -0.3564,  1.8415],
         [ 0.3862,  0.2472,  0.3377,  ...,  1.1954, -1.1006,  0.4049],
         ...,
         [ 0.5666,  0.6158, -0.5619,  ...,  0.4627,  0.8919,  0.9075],
         [-0.6789, -0.1943, -1.0082,  ..., -0.0751,  1.0986, -0.3521],
         [ 0.8874, -0.3992,  0.5513,  ..., -1.4385,  0.1640, -0.3817]],

        [[ 1.5878,  0.7143,  1.7367,  ...,  0.8599, -0.2548, -0.3510],
         [ 0.4049, -0.4799,  0.0255,  ..., -1

In [None]:
## class for ffnn

import torch
import torch.nn as nn
import numpy as np


class PointWiseFeedForward(nn.Module):
    """Implements Point Wise Feedforward Neural Network.

    Computes ``dropout(w_2(relu(w_1(X))))``, applied to the last axis of ``X``.

    Args:
        d_model: Width of the input and output features.
        dffn: Width of the hidden layer.
        dropout: Dropout probability applied to the output.
    """

    def __init__(self, d_model, dffn, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        # Expansion and contraction layers.
        self.w_1 = nn.Linear(d_model, dffn)
        self.w_2 = nn.Linear(dffn, d_model)

    # `forward` must be defined at class level (not nested inside __init__)
    # for nn.Module to pick it up.
    def forward(self, X):
        """Apply the feed-forward network.

        Args:
            X: Tensor whose last dimension is ``d_model``.

        Returns:
            Tensor with the same shape as ``X``.
        """
        # ReLU is applied functionally; `nn.ReLU(tensor)` would only construct
        # a module rather than activate the tensor.
        return self.dropout(self.w_2(torch.relu(self.w_1(X))))