In [12]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

In [None]:
class Embedding(nn.Module):
    """Token-id to vector lookup backed by a single ``nn.Embedding`` table.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_size: Length of each embedding vector.
        pad_mask: Index forwarded to ``nn.Embedding`` as ``padding_idx``
            (despite the name, this is an index, not a mask tensor).
    """

    def __init__(self, vocab_size, embedding_size, pad_mask):
        super().__init__()
        # The attribute name `emb` is part of the contract: other modules read
        # `.emb.weight` directly.
        lookup_table = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_size,
            padding_idx=pad_mask,
        )
        self.emb = lookup_table

    def forward(self, x):
        """Return the embedding vectors for the integer ids in ``x``."""
        vectors = self.emb(x)
        return vectors

In [14]:
class EncoderLayer(nn.Module):
    """One Transformer encoder layer.

    Multi-head self-attention followed by a position-wise feed-forward
    network; each sub-layer is wrapped in a residual connection and LayerNorm.

    Args:
        d_model: Size of the last dimension of the input and the output.
        d_ff: Hidden width of the feed-forward sub-layer.
        d_h: Number of attention heads. Must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``d_h`` (the
            concatenated heads would not be ``d_model`` wide).
    """

    def __init__(self, d_model=512, d_ff=2048, d_h=8):
        super(EncoderLayer, self).__init__()

        if d_model % d_h != 0:
            raise ValueError(
                "d_model ({}) must be divisible by d_h ({})".format(d_model, d_h)
            )

        self.d_model = d_model
        self.d_h = d_h
        self.d_ff = d_ff
        self.d_k = self.d_v = d_model // d_h  # per-head width

        self.concatLinear = nn.Linear(d_model, d_model, bias=False) # Linear Layer for the concatenated head

        # NOTE(review): one LayerNorm instance (one set of affine parameters)
        # is used after both sub-layers; the reference architecture uses a
        # separate one per sub-layer. Kept as-is to preserve state_dict keys.
        self.normalize = nn.LayerNorm(d_model) # Normalizing Layer

        self.feed_forward = nn.Sequential( # Feed Forward Layer
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )

        # Layout: [Q_0, K_0, V_0, Q_1, K_1, V_1, ...] -- three projections per head.
        self.linears = nn.ModuleList([nn.Linear(d_model, self.d_k, bias=False) for _ in range(d_h*3)])

    def forward(self, x, pad_mask=None):
        """Run self-attention and the feed-forward network over ``x``.

        Args:
            x: Tensor whose last two dimensions are ``(seq_len, d_model)``.
            pad_mask: Optional boolean tensor of shape ``(..., seq_len)`` with
                ``True`` at padding positions. Those positions are excluded as
                attention *keys*. If every key of a sequence is masked the
                softmax row is all ``-inf`` and the result is NaN.

        Returns:
            Tensor with the same shape as ``x``.
        """
        key_mask = None
        if pad_mask is not None:
            # (..., seq_len) -> (..., 1, seq_len): broadcast over the query axis.
            key_mask = pad_mask.to(torch.bool).unsqueeze(-2)

        scale = math.sqrt(self.d_k)
        heads = []
        for i in range(self.d_h):
            Q = self.linears[3*i](x) # Query Matrix
            K = self.linears[3*i + 1](x) # Key Matrix
            V = self.linears[3*i + 2](x) # Value Matrix

            scores = torch.matmul(Q, K.transpose(-1, -2)) / scale # MatMul of Q and K -> Scale

            if key_mask is not None:
                # Out-of-place so autograd never sees an in-place edit.
                scores = scores.masked_fill(key_mask, float("-inf"))

            # dim=-1 normalises over keys. Without an explicit dim, a 3-D
            # input is normalised over dim 0 (the batch axis).
            weights = F.softmax(scores, dim=-1)

            heads.append(torch.matmul(weights, V)) # A Single Head

        Z = self.concatLinear(torch.cat(heads, dim=-1)) # Concatenated heads -> Linear Layer

        AddNorm = self.normalize(x + Z) # Output of the First Add&Norm Layer

        # 1st Add&Norm -> Feed Forward -> 2nd Add&Norm
        return self.normalize(self.feed_forward(AddNorm) + AddNorm)

In [None]:
class EncoderStack(nn.Module):
    """A chain of ``N`` independently parameterised ``EncoderLayer`` modules.

    Args:
        d_model: Forwarded to every ``EncoderLayer``.
        d_ff: Forwarded to every ``EncoderLayer``.
        d_h: Forwarded to every ``EncoderLayer``.
        N: Number of layers in the stack.
    """

    def __init__(self, d_model, d_ff, d_h, N):
        super().__init__()
        layers = []
        for _ in range(N):
            layers.append(EncoderLayer(d_model, d_ff, d_h))
        # Registered under `encoders` so parameter names stay stable.
        self.encoders = nn.ModuleList(layers)

    def forward(self, src):
        """Pass ``src`` through every layer in order and return the result."""
        hidden = src
        for layer in self.encoders:
            hidden = layer(hidden)
        return hidden

In [None]:
class DecoderLayer(nn.Module):
    """One Transformer decoder layer.

    Three sub-layers, each followed by a residual connection and LayerNorm:

    1. causally masked multi-head self-attention over ``tgt``;
    2. multi-head cross-attention, with queries from the decoder stream and
       keys/values from ``src`` (the encoder output);
    3. a position-wise feed-forward network.

    Args:
        d_model: Size of the last dimension of ``src``, ``tgt`` and the output.
        d_ff: Hidden width of the feed-forward sub-layer.
        d_h: Number of attention heads. Must divide ``d_model``.
        batch_size: Accepted and stored for backward compatibility only; the
            causal mask is now built per call from the actual target length,
            so it no longer depends on the batch size.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``d_h``.
    """

    def __init__(self, d_model=512, d_ff=2048, d_h=8, batch_size=16): # parameters={}
        super(DecoderLayer, self).__init__()

        if d_model % d_h != 0:
            raise ValueError(
                "d_model ({}) must be divisible by d_h ({})".format(d_model, d_h)
            )

        self.d_model = d_model
        self.d_h = d_h
        self.d_k = self.d_v = d_model // d_h  # per-head width
        self.batch_size = batch_size

        # Self-attention projections, laid out as [Q_0, K_0, V_0, Q_1, ...].
        self.linears = nn.ModuleList([nn.Linear(d_model, self.d_k, bias=False) for _ in range(d_h*3)])

        # Cross-attention projections, same layout.
        self.crossLinears = nn.ModuleList([nn.Linear(d_model, self.d_k, bias=False) for _ in range(d_h*3)])

        self.firstLinear = nn.Linear(d_h * self.d_v, d_model, bias=False) # Output projection of the self-attention heads

        self.secondLinear = nn.Linear(d_h * self.d_v, d_model, bias=False) # Output projection of the cross-attention heads

        # NOTE(review): a single LayerNorm instance is used after all three
        # sub-layers; the reference architecture uses one per sub-layer.
        self.normalize = nn.LayerNorm(d_model)

        self.feed_forward = nn.Sequential( # Feed Forward Layer
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )

    def _multi_head(self, projections, query_input, memory, mask=None):
        """Scaled dot-product attention for every head, concatenated on the last dim.

        ``projections`` holds the Q/K/V linears per head. Queries are projected
        from ``query_input``, keys and values from ``memory``. Positions where
        ``mask`` is True are excluded from the softmax.
        """
        scale = math.sqrt(self.d_k)
        heads = []
        for i in range(self.d_h):
            Q = projections[3*i](query_input)
            K = projections[3*i + 1](memory)
            V = projections[3*i + 2](memory)

            scores = torch.matmul(Q, K.transpose(-1, -2)) / scale
            if mask is not None:
                scores = scores.masked_fill(mask, float("-inf"))

            # dim=-1 normalises over keys. Without an explicit dim, a 3-D
            # input is normalised over dim 0 (the batch axis).
            heads.append(torch.matmul(F.softmax(scores, dim=-1), V))

        return torch.cat(heads, dim=-1)

    def forward(self, src, tgt):
        """Decode ``tgt`` while attending to the encoder output ``src``.

        Args:
            src: Encoder output; last two dims are ``(src_len, d_model)``.
            tgt: Decoder input; last two dims are ``(tgt_len, d_model)``.
                ``src_len`` and ``tgt_len`` may differ.

        Returns:
            Tensor with the same shape as ``tgt``.
        """
        # True strictly above the diagonal: position i may not attend to j > i.
        # Each row keeps its diagonal entry, so no softmax row is fully masked.
        tgt_len = tgt.size(-2)
        causal_mask = torch.triu(
            torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=tgt.device),
            diagonal=1,
        )

        # FIRST ATTENTION LAYER OF THE DECODER: masked self-attention over the target.
        Z1 = self.firstLinear(self._multi_head(self.linears, tgt, tgt, causal_mask))
        AddNorm1 = self.normalize(tgt + Z1) # Residual is taken around the decoder stream

        # SECOND ATTENTION LAYER OF THE DECODER: queries from the decoder, keys/values from the encoder.
        Z2 = self.secondLinear(self._multi_head(self.crossLinears, AddNorm1, src))
        AddNorm2 = self.normalize(AddNorm1 + Z2)

        # Feed Forward -> Normalize
        return self.normalize(AddNorm2 + self.feed_forward(AddNorm2))

In [None]:
class DecoderStack(nn.Module):
    """A chain of ``N`` independently parameterised ``DecoderLayer`` modules.

    Args:
        d_model: Forwarded to every ``DecoderLayer``.
        d_ff: Forwarded to every ``DecoderLayer``.
        d_h: Forwarded to every ``DecoderLayer``.
        batch_size: Forwarded to every ``DecoderLayer``.
        N: Number of layers in the stack.
    """

    def __init__(self, d_model, d_ff, d_h, batch_size, N):
        super(DecoderStack, self).__init__()
        self.decoders = nn.ModuleList([DecoderLayer(d_model, d_ff, d_h, batch_size) for _ in range(N)]) # Stacking Decoder Layer N Times

    def forward(self, src, tgt):
        """Run the target stream through every layer.

        Args:
            src: Encoder output. Passed unchanged to every layer.
            tgt: Decoder input. Replaced by each layer's output before the
                next layer runs.

        Returns:
            The output of the last layer (``tgt`` itself when ``N == 0``).
        """
        # Every layer must see the same encoder output as `src`; only the
        # decoder stream is threaded from one layer to the next.
        for decoder in self.decoders:
            tgt = decoder(src, tgt)
        return tgt

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer with a weight-tied output projection.

    The same embedding table embeds both source and target ids, and its
    transpose is used as the final projection onto the vocabulary.

    Args:
        batch_size: Forwarded to ``DecoderStack``.
        embedding_size: Width of the embedding vectors. Must equal ``d_model``.
        d_model: Model width used by the encoder and decoder stacks.
        d_h: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layers.
        vocab_size: Number of rows in the embedding table.
        num_coder_layers: Number of layers in each of the two stacks.

    Raises:
        ValueError: If ``embedding_size != d_model``.
    """

    def __init__(self, batch_size=16, embedding_size=512, d_model=512, d_h = 8, d_ff=2048, vocab_size=32768, num_coder_layers=6):
        super(Transformer, self).__init__()

        # The embeddings are fed straight into layers that expect d_model
        # features, and the tied output projection multiplies d_model-wide
        # activations by the embedding table, so the two widths must match.
        if embedding_size != d_model:
            raise ValueError(
                "embedding_size ({}) must equal d_model ({})".format(embedding_size, d_model)
            )

        # NOTE(review): this layer is never used in forward(). Instead of a
        # separate final linear layer, the output projection reuses the
        # embedding weights, as the paper suggests. The attribute is kept so
        # previously saved state_dicts still load.
        self.linear = nn.Linear(d_model, d_model, bias=False)

        self.softmax = F.softmax
        self.embed = Embedding(vocab_size, embedding_size, pad_mask=1) # index 1 is the padding id
        self.encoderStack = EncoderStack(d_model, d_ff, d_h, num_coder_layers)
        self.decoderStack = DecoderStack(d_model, d_ff, d_h, batch_size, num_coder_layers)

    def forward(self, src, tgt):
        """Return a probability distribution over the vocabulary.

        Args:
            src: Integer source token ids -- assumed ``(batch, src_len)``.
            tgt: Integer target token ids -- assumed ``(batch, tgt_len)``.

        Returns:
            Probabilities over the vocabulary along the last dimension;
            ``(batch, vocab_size)`` for the assumed input shapes.
        """
        srcEmbedded = self.embed(src)
        tgtEmbedded = self.embed(tgt)

        encoderOutput = self.encoderStack(srcEmbedded)
        decoderOutput = self.decoderStack(encoderOutput, tgtEmbedded)

        # NOTE(review): summing over dim=1 collapses the sequence axis, so a
        # single distribution per batch item is produced rather than one per
        # target position. Kept as-is to preserve the return shape.
        pooled = torch.sum(decoderOutput, dim=1)

        # Weight tying: project with the transposed embedding table.
        logits = torch.matmul(pooled, self.embed.emb.weight.t())

        # Explicit dim: normalise over the vocabulary axis.
        return self.softmax(logits, dim=-1)