<a href="https://colab.research.google.com/github/pavanreddy565/Transformers-Learning/blob/main/Decoder_in_Transfomrer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import math
from torch import nn
import torch.nn.functional as F

In [None]:
def scaled_dot_product(q,k,v,mask=None):
    d_k = q.size()[-1]
    scaled = torch.matmul(q, k.transpose(-1,-2)) / math.sqrt(d_k)
    print(f'scaled.size() : {scaled.size()}')
    if mask is not None:
        print(f'-- ADDING MASK of shape {mask.size()} --')
        # Broadcasting add. So just the last N dimensions need to match
        scaled += mask
    attention = F.softmax(scaled, dim=-1)
    values = torch.matmul(attention,v)
    return values, attention

In [None]:
class MultiHeadAttention(nn.Module):

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = nn.Linear(d_model, 3*d_model)
        self.linear_layer = nn.Linear(d_model, d_model)
    def forward(self, x, mask=None):
        batch_size, max_sequence_length, d_model = x.size()
        print(f'x.size(): {x.size()}')
        qkv = self.qkv_layer(x)
        print(f'qkv.size(): {qkv.size()}')
        qkv = qkv.reshape(batch_size, max_sequence_length, self.num_heads, 3 * self.head_dim)
        print(f'qkv.size(): {qkv.size()}')
        qkv = qkv.permute(0, 2, 1, 3)
        print(f'qkv.size(): {qkv.size()}')
        q, k, v = qkv.chunk(3, dim=-1)
        print(f'q.size(): {q.size()}, k.size(): {k.size()}, v.size(): {v.size()}')
        values, attention = scaled_dot_product(q, k, v, mask)
        print(f'values.size(): {values.size()}, attention.size(): {attention.size()}')
        values = values.reshape(batch_size, max_sequence_length, self.num_heads * self.head_dim)
        print(f"values.size(): {values.size()}")
        out = self.linear_layer(values)
        print(f"out.size(): {out.size()}")
        return out


In [None]:
class LayerNormalization(nn.Module):
    def __init__(self, parameters_shape, eps= 1e-5):
        super().__init__()
        self.parameter_shape = parameters_shape
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        self.beta = nn.Parameter(torch.zeros(parameters_shape))
    def forward(self, inputs):
        dims = [-(i+1) for i in range(len(self.parameter_shape))]
        mean = inputs.mean(dim=dims, keepdim= True)
        print(f'mean ({mean.size()})')
        var=  ((inputs - mean) ** 2).mean(dim=dims, keepdim= True)
        std = (var + self.eps).sqrt()
        print(f'Standard Deviation ({std.size()})')
        y = (inputs - mean) / std
        print(f'y: {y.size()}')
        out = self.gamma * y + self.beta
        print(f'self.gamma: {self.gamma.size()}, self.beta: {self.beta.size()}')
        print(f'out: {out.size()}')
        return out



In [None]:
class PositionWiseFeedForward(nn.Module):

    def __init__(self, d_model, hidden, drop_prob=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model,hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        x = self.linear1(x)
        print(f'x after first linear layer : {x.size()}')
        x = self.relu(x)
        print(f'x after activation: {x.size()}')
        x = self.dropout(x)
        print(f'x after dropout: {x.size()}')
        x = self.linear2(x)
        print(f'x after 2nd linear layer: {x.size()}')
        return x


In [None]:
class MulitHeadCrossAttention(nn.Module):

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.kv_layer = nn.Linear(d_model, 2 * d_model)
        self.q_layer = nn.Linear(d_model, d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, y , mask=None):
        batch_size, sequence_length, d_model = x.size()
        print(f'x.size(): {x.size()}')
        kv = self.kv_layer(x)
        print(f'kv.size(): {kv.size()}')
        q = self.q_layer(y)
        print(f'q.size(): {q.size()}')
        kv = kv.reshape(batch_size, sequence_length, self.num_heads, 2* self.head_dim)  # 30 x 200 x 8 x 128
        q = q.reshape(batch_size, sequence_length, self.num_heads, self.head_dim)  # 30 x 200 x 8 x 64
        kv = kv.permute(0, 2, 1, 3) # 30 x 8 x 200 x 128
        q = q.permute(0, 2, 1, 3) # 30 x 8 x 200 x 64
        k,v = kv.chunk(2, dim=-1)
        values, attention = scaled_dot_product(q, k, v, mask)
        print(f'values: {values.size()}, attention: {attention.size()}')
        values = values.reshape(batch_size, sequence_length, d_model)
        out = self.linear_layer(values)
        print(f'out after passing through linear layer: {out.size()}')
        return out


In [None]:
class DecoderLayer(nn.Module):

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads )
        self.norm1 = LayerNormalization([d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.encoder_decoder_attention = MulitHeadCrossAttention(d_model, num_heads)
        self.norm2 = LayerNormalization([d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)
        self.ffn = PositionWiseFeedForward(d_model, ffn_hidden, drop_prob)
        self.norm3 = LayerNormalization([d_model])
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self,x, y, decoder_mask):
        _y = y
        print('masked self attention')
        y = self.self_attention(y, mask = decoder_mask)
        print('DROP OUT 1')
        y = self.dropout1(y)
        print('Add + Layer Normalization 1')
        y = self.norm1(y + _y)
        _y = y

        print('CROSS ATTENTION')
        y = self.encoder_decoder_attention(x, y, mask=None)
        print('DROP OUT 2')
        y = self.dropout2(y)
        print('ADD + LAYER NORMALIZATION 2')
        y = self.norm2(y + _y)

        _y = y
        print('FEED FORWARD 1')
        y = self.ffn(y)
        print('DROP OUT 3')
        y = self.dropout3(y)
        print('ADD + LAYER NORMALIZATION 3')
        y = self.norm3(y + _y)
        return y

In [None]:
class SequentialDecoder(nn.Sequential):
    def forward(self, *inputs):
        x, y, mask = inputs
        for module in self._modules.values():
            y = module(x,y, mask)
        return y

class Decoder(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers = 1):
        super().__init__()
        self.layers = SequentialDecoder(*[DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob) for _ in range(num_layers)])

    def forward(self, x, y, mask):
        y = self.layers(x, y, mask)
        return y


In [None]:
d_model = 512
num_heads = 8
drop_prob = 0.1
batch_size = 30
max_sequence_length = 200
ffn_hidden = 2048
num_layers = 5

x = torch.randn( (batch_size, max_sequence_length, d_model) ) # English sentence positional encoded
y = torch.randn( (batch_size, max_sequence_length, d_model) ) # Telugu sentence positional encoded
mask = torch.full([max_sequence_length, max_sequence_length] , float('-inf'))
mask = torch.triu(mask, diagonal=1)
decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)
out = decoder(x, y, mask)

masked self attention
x.size(): torch.Size([30, 200, 512])
qkv.size(): torch.Size([30, 200, 1536])
qkv.size(): torch.Size([30, 200, 8, 192])
qkv.size(): torch.Size([30, 8, 200, 192])
q.size(): torch.Size([30, 8, 200, 64]), k.size(): torch.Size([30, 8, 200, 64]), v.size(): torch.Size([30, 8, 200, 64])
scaled.size() : torch.Size([30, 8, 200, 200])
-- ADDING MASK of shape torch.Size([200, 200]) --
values.size(): torch.Size([30, 8, 200, 64]), attention.size(): torch.Size([30, 8, 200, 200])
values.size(): torch.Size([30, 200, 512])
out.size(): torch.Size([30, 200, 512])
DROP OUT 1
Add + Layer Normalization 1
mean (torch.Size([30, 200, 1]))
Standard Deviation (torch.Size([30, 200, 1]))
y: torch.Size([30, 200, 512])
self.gamma: torch.Size([512]), self.beta: torch.Size([512])
out: torch.Size([30, 200, 512])
CROSS ATTENTION
x.size(): torch.Size([30, 200, 512])
kv.size(): torch.Size([30, 200, 1024])
q.size(): torch.Size([30, 200, 512])
scaled.size() : torch.Size([30, 8, 200, 200])
values: torch.