In [1]:
import torch
from torch import nn
from torch.functional import F 

In [None]:
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied along the last dimension.

    Computes ``linear2(dropout(relu(linear1(x))))``.

    Args:
        d_model: Size of the last dimension of both the input and the output.
        hidden: Size of the intermediate (expanded) representation.
        drop_prob: Dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, hidden, drop_prob):
        # nn.Module.__init__ has to run before sub-modules are assigned,
        # otherwise PyTorch cannot register them.
        super().__init__()
        self.linear1 = nn.Linear(d_model, hidden)  # e.g. 512 -> 2048
        self.linear2 = nn.Linear(hidden, d_model)  # e.g. 2048 -> 512
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):  # e.g. 30 x 200 x 512
        """Return a tensor with the same shape as ``x``."""
        x = self.linear1(x)  # e.g. 30 x 200 x 2048
        x = self.relu(x)  # e.g. 30 x 200 x 2048
        x = self.dropout(x)  # e.g. 30 x 200 x 2048
        x = self.linear2(x)  # e.g. 30 x 200 x 512
        return x

In [None]:
class LayerNormalization(nn.Module):
    """Layer normalization over the trailing ``len(parameters_shape)`` dimensions.

    Each slice is shifted to zero mean and scaled to unit (biased) variance,
    then multiplied by the learnable ``gamma`` and offset by the learnable
    ``beta``.

    Args:
        parameters_shape: Shape of the trailing dimensions to normalize over,
            e.g. ``[512]``. A plain int is accepted and treated as ``[int]``.
        eps: Constant added to the variance to prevent division by zero.
    """

    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        if isinstance(parameters_shape, int):
            # len() is taken of the shape in forward(); wrap a bare int.
            parameters_shape = [parameters_shape]
        self.parameters_shape = parameters_shape  # e.g. [512]
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(parameters_shape))  # e.g. [512]
        self.beta = nn.Parameter(torch.zeros(parameters_shape))  # e.g. [512]

    def forward(self, x):  # e.g. 30 x 200 x 512
        """Return the normalized tensor, same shape as ``x``."""
        # Negative indices of the trailing dimensions, e.g. [-1].
        dims = [-(i + 1) for i in range(len(self.parameters_shape))]
        mean = torch.mean(x, dim=dims, keepdim=True)  # e.g. 30 x 200 x 1
        var = ((x - mean) ** 2).mean(dim=dims, keepdim=True)  # e.g. 30 x 200 x 1
        std = (var + self.eps).sqrt()  # e.g. 30 x 200 x 1
        y = (x - mean) / std  # e.g. 30 x 200 x 512
        out = self.gamma * y + self.beta  # e.g. 30 x 200 x 512
        return out

In [None]:
def scaled_dot_product(q, k, v, mask=None):
    """Scaled dot-product attention.

    Args:
        q: Query tensor; its last dimension is the per-head size ``dk``.
        k: Key tensor whose last two dimensions are (key_len, dk).
        v: Value tensor whose second-to-last dimension is key_len.
        mask: Optional additive mask, broadcast-added to the raw scores before
            the softmax (``-inf`` entries end up with zero attention weight).

    Returns:
        Tuple ``(out, attention)``: the attention-weighted values and the
        softmax-normalized attention weights.
    """
    dk = q.size(-1)  # e.g. 64
    # Tensor.transpose takes the two dims as separate ints, and dk is a plain
    # Python int, so the scale factor is computed with ordinary arithmetic.
    scores = torch.matmul(q, k.transpose(-1, -2)) / (dk ** 0.5)  # e.g. 30 x 8 x 200 x 200
    if mask is not None:  # e.g. 200 x 200
        # Broadcasting add: only the trailing dimensions need to match.
        scores = scores + mask
    attention = F.softmax(scores, dim=-1)  # e.g. 30 x 8 x 200 x 200
    out = torch.matmul(attention, v)  # e.g. 30 x 8 x 200 x 64
    return out, attention


class MultiHeadAttention(nn.Module):
    """Multi-head self-attention.

    A single linear layer projects the input to queries, keys and values for
    all heads at once; the per-head attention results are concatenated and
    passed through a final linear layer.

    Args:
        d_model: Size of the last dimension of the input and output.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model  # e.g. 512
        self.num_heads = num_heads  # e.g. 8
        self.head_dim = d_model // num_heads  # e.g. 64
        self.qkv_layer = nn.Linear(d_model, 3 * d_model)  # e.g. 512 -> 1536
        self.linear_layer = nn.Linear(d_model, d_model)  # e.g. 512 -> 512

    def forward(self, x, mask=None):
        """Apply self-attention to ``x`` of shape (batch, seq, d_model).

        ``mask`` is forwarded unchanged to ``scaled_dot_product``.
        Returns a tensor of shape (batch, seq, d_model).
        """
        batch_size, sequence_length, _ = x.size()  # e.g. 30 x 200 x 512
        qkv = self.qkv_layer(x)  # e.g. 30 x 200 x 1536
        qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3 * self.head_dim)  # e.g. 30 x 200 x 8 x 192
        qkv = qkv.permute(0, 2, 1, 3)  # e.g. 30 x 8 x 200 x 192
        q, k, v = qkv.chunk(3, dim=-1)  # each e.g. 30 x 8 x 200 x 64
        out, attention = scaled_dot_product(q, k, v, mask)  # e.g. 30 x 8 x 200 x 64 , 30 x 8 x 200 x 200
        # Move the head axis back next to head_dim before merging; reshaping
        # the (batch, heads, seq, head_dim) tensor directly would interleave
        # values from different positions and heads.
        out = out.permute(0, 2, 1, 3)  # e.g. 30 x 200 x 8 x 64
        out = out.reshape(batch_size, sequence_length, self.num_heads * self.head_dim)  # e.g. 30 x 200 x 512
        out = self.linear_layer(out)  # e.g. 30 x 200 x 512
        return out



In [None]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input (residual connection) and is then layer-normalized.

    Args:
        d_model: Size of the last dimension of the input and output.
        ffn_hidden: Hidden size of the position-wise feed-forward network.
        num_heads: Number of attention heads.
        drop_prob: Dropout probability used after each sub-layer and inside
            the feed-forward network.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Return the encoded tensor, same shape as ``x``."""
        # Sub-layer 1: unmasked self-attention + residual + norm.
        residual_x = x  # e.g. 30 x 200 x 512
        x = self.attention(x, mask=None)  # e.g. 30 x 200 x 512
        x = self.dropout1(x)  # e.g. 30 x 200 x 512
        x = self.norm1(x + residual_x)  # e.g. 30 x 200 x 512

        # Sub-layer 2: feed-forward + residual + norm.
        residual_x = x  # e.g. 30 x 200 x 512
        x = self.ffn(x)  # e.g. 30 x 200 x 512
        x = self.dropout2(x)  # e.g. 30 x 200 x 512
        x = self.norm2(x + residual_x)  # e.g. 30 x 200 x 512
        return x

In [None]:
class Encoder(nn.Module):
    """Stack of ``num_layers`` EncoderLayer modules applied one after another.

    All layers are built with the same ``d_model``, ``ffn_hidden``,
    ``num_heads`` and ``drop_prob``.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
        super().__init__()
        stack = []
        for _ in range(num_layers):
            stack.append(EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob))
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Feed ``x`` through every encoder layer in order."""
        return self.layers(x)

In [2]:
# Model dimensions
d_model = 512             # size of the last dimension of every token vector
num_heads = 8             # attention heads (d_model // num_heads = 64 per head)
ffn_hidden = 4 * d_model  # 2048, hidden size of the feed-forward network
num_layers = 5            # number of stacked layers

# Regularization
drop_prob = 0.1           # dropout probability

# Example input dimensions
batch_size = 30
max_sequence_length = 200


# Decoder

In [None]:
class MultiHeadCrossAttention(nn.Module):
    """Multi-head attention whose queries and keys/values come from different inputs.

    Keys and values are projected from ``x``; queries are projected from ``y``.

    Args:
        d_model: Size of the last dimension of both inputs and of the output.
        num_heads: Number of attention heads.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.kv_layer = nn.Linear(d_model, 2 * d_model)  # e.g. 512 -> 1024
        self.q_layer = nn.Linear(d_model, d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, y, mask=None):
        """Attend from ``y`` (queries) over ``x`` (keys and values).

        Args:
            x: Tensor of shape (batch, kv_len, d_model).
            y: Tensor of shape (batch, q_len, d_model).
            mask: Optional mask forwarded unchanged to ``scaled_dot_product``.

        Returns:
            Tensor of shape (batch, q_len, d_model).
        """
        batch_size, kv_length, d_model = x.size()  # e.g. 30 x 200 x 512
        # Queries follow y's length, which need not equal x's.
        q_length = y.size(1)
        kv = self.kv_layer(x)  # e.g. 30 x 200 x 1024
        q = self.q_layer(y)  # e.g. 30 x 200 x 512
        kv = kv.reshape(batch_size, kv_length, self.num_heads, 2 * self.head_dim)  # e.g. 30 x 200 x 8 x 128
        q = q.reshape(batch_size, q_length, self.num_heads, self.head_dim)  # e.g. 30 x 200 x 8 x 64
        kv = kv.permute(0, 2, 1, 3)  # e.g. 30 x 8 x 200 x 128
        q = q.permute(0, 2, 1, 3)  # e.g. 30 x 8 x 200 x 64
        k, v = kv.chunk(2, dim=-1)  # each e.g. 30 x 8 x 200 x 64
        values, attention = scaled_dot_product(q, k, v, mask)  # e.g. 30 x 8 x 200 x 64
        # Bring the head axis back next to head_dim before merging; reshaping
        # the (batch, heads, seq, head_dim) tensor directly would interleave
        # values from different positions and heads.
        values = values.permute(0, 2, 1, 3)  # e.g. 30 x 200 x 8 x 64
        values = values.reshape(batch_size, q_length, d_model)  # e.g. 30 x 200 x 512
        out = self.linear_layer(values)  # e.g. 30 x 200 x 512
        return out

class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, cross-attention, feed-forward.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input (residual connection) and is then layer-normalized.

    Args:
        d_model: Size of the last dimension of the inputs and output.
        ffn_hidden: Hidden size of the position-wise feed-forward network.
        num_heads: Number of attention heads in both attention sub-layers.
        drop_prob: Dropout probability used after each sub-layer and inside
            the feed-forward network.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.encoder_decoder_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm3 = LayerNormalization(parameters_shape=[d_model])
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, x, y, decoder_mask):
        """Run one decoder block.

        Args:
            x: Tensor supplying keys and values to the cross-attention
                (presumably the encoder output; confirm against the caller).
            y: Tensor being decoded; supplies the self-attention input and the
                cross-attention queries.
            decoder_mask: Mask passed to the self-attention sub-layer.

        Returns:
            Tensor with the same shape as ``y``.
        """
        # Sub-layer 1: masked self-attention + residual + norm.
        _y = y  # e.g. 30 x 200 x 512
        y = self.self_attention(y, mask=decoder_mask)  # e.g. 30 x 200 x 512
        y = self.dropout1(y)  # e.g. 30 x 200 x 512
        y = self.norm1(y + _y)  # e.g. 30 x 200 x 512

        # Sub-layer 2: unmasked cross-attention over x + residual + norm.
        _y = y  # e.g. 30 x 200 x 512
        y = self.encoder_decoder_attention(x, y, mask=None)  # e.g. 30 x 200 x 512
        y = self.dropout2(y)  # e.g. 30 x 200 x 512
        y = self.norm2(y + _y)  # e.g. 30 x 200 x 512

        # Sub-layer 3: feed-forward + residual + norm.
        _y = y  # e.g. 30 x 200 x 512
        y = self.ffn(y)  # e.g. 30 x 200 x 512
        y = self.dropout3(y)  # e.g. 30 x 200 x 512
        y = self.norm3(y + _y)  # e.g. 30 x 200 x 512
        return y

# nn.Sequential.forward accepts a single input, so this subclass overrides
# forward to thread three values (x, y, mask) through the contained modules.
class SequentialDecoder(nn.Sequential):
    """Sequential container whose modules are called as ``module(x, y, mask)``.

    ``x`` and ``mask`` are passed unchanged to every module; ``y`` is replaced
    by each module's return value and the final ``y`` is returned.
    """

    def forward(self, *inputs):
        # Exactly three positional inputs are expected; unpacking raises otherwise.
        x, y, mask = inputs
        for layer in self:
            y = layer(x, y, mask)  # e.g. 30 x 200 x 512
        return y

class Decoder(nn.Module):
    """Stack of ``num_layers`` DecoderLayer modules held in a SequentialDecoder.

    All layers are built with the same ``d_model``, ``ffn_hidden``,
    ``num_heads`` and ``drop_prob``.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
        super().__init__()
        decoder_layers = []
        for _ in range(num_layers):
            decoder_layers.append(DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob))
        self.layers = SequentialDecoder(*decoder_layers)

    def forward(self, x, y, mask):
        """Pass ``(x, y, mask)`` through the layer stack and return the final ``y``.

        Example shapes used in this notebook: x and y are 30 x 200 x 512,
        mask is 200 x 200.
        """
        return self.layers(x, y, mask)

In [None]:
# Random stand-ins for the positionally encoded source (English) and target (Kannada) sentences.
x = torch.randn((batch_size, max_sequence_length, d_model))
y = torch.randn((batch_size, max_sequence_length, d_model))

# Look-ahead mask: -inf above the diagonal, 0 elsewhere. It is added to the
# (seq x seq) attention scores, so it must be square in the sequence length.
mask = torch.full([max_sequence_length, max_sequence_length], float('-inf'))
mask = torch.triu(mask, diagonal=1)

decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)
out = decoder(x, y, mask)  # invokes Decoder.forward
assert out.shape == (batch_size, max_sequence_length, d_model)

