https://www.youtube.com/watch?v=g3sEsBGkLU0&list=PLTl9hO2Oobd97qfWC40gOSU8C0iu0m2l4&index=6

In [63]:
import torch
from torch import nn
import math
import torch.nn.functional as F


In [54]:
def scaled_dot_product(q, k, v, mask=None):
    """Compute scaled dot-product attention as in "Attention Is All You Need".

    Args:
        q: Query tensor. The size of its last dimension sets the scaling factor.
        k: Key tensor. Its last two dimensions are swapped before the product with ``q``.
        v: Value tensor that is combined using the attention weights.
        mask: Optional tensor added in place to the scaled scores before the
            softmax (e.g. ``-inf`` at positions that must be ignored).

    Returns:
        A ``(values, attention)`` tuple: the attention-weighted values and the
        attention weights, which sum to 1 along the last dimension.
    """
    key_width = q.shape[-1]
    # Raw similarity of every query with every key, divided by sqrt(key_width) as in the paper.
    scores = (q @ k.transpose(-2, -1)) / math.sqrt(key_width)
    if mask is not None:
        # In-place broadcasting add: the mask must be broadcastable to the shape of the scores.
        scores += mask
    # Normalise over the key axis so each query's weights form a probability distribution.
    weights = F.softmax(scores, dim=-1)
    return weights @ v, weights

In [72]:
class MultiheadAttention(nn.Module):
    """Multi-head self-attention over a batch of embedded sequences.

    A single linear layer produces the concatenated query/key/value vectors,
    which are split per head, run through ``scaled_dot_product`` and merged
    back into ``d_model``-sized vectors by a final linear layer.

    Args:
        d_model: Size of each input/output embedding vector.
        num_heads: Number of attention heads; must divide ``d_model`` evenly.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        if d_model % num_heads != 0:
            # Otherwise the per-head reshape in forward() cannot succeed.
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = nn.Linear(d_model, 3 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: 3-D tensor unpacked as (batch_size, sequence_length, d_model).
            mask: Optional tensor forwarded to ``scaled_dot_product``.

        Returns:
            Tensor of shape (batch_size, sequence_length, d_model).
        """
        batch_size, sequence_length, _ = x.size()
        print(f"x.size(): {x.size()}")
        qkv = self.qkv_layer(x)
        print(f"qkv.size(): {qkv.size()}")
        # Split the concatenated q/k/v projection per attention head.
        qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3 * self.head_dim)
        print(f"qkv.size(): {qkv.size()}")
        # Put the head axis before the sequence axis so attention runs on the last two dimensions.
        qkv = qkv.permute(0, 2, 1, 3)
        print(f"qkv.size(): {qkv.size()}")
        # Separate the query, key and value vectors along the last dimension.
        q, k, v = qkv.chunk(3, dim=-1)
        print(f"q.size(): {q.size()} k.size(): {k.size()}, v.size(): {v.size()}")
        values, attention = scaled_dot_product(q, k, v, mask=mask)
        print(f"values.size(): {values.size()}, attention.size(): {attention.size()}")
        # Move the head axis back next to head_dim BEFORE merging the heads. Reshaping
        # (batch, heads, seq, head_dim) directly would mix different sequence positions
        # into the same output vector.
        values = values.permute(0, 2, 1, 3).reshape(
            batch_size, sequence_length, self.num_heads * self.head_dim
        )
        print(f"values.size(): {values.size()}")
        out = self.linear_layer(values)
        print(f"out.size() : {out.size()}")
        return out

class LayerNormalization(nn.Module):
    """Layer normalisation with a learnable scale (``gamma``) and shift (``beta``).

    Statistics are computed over the trailing ``len(parameters_shape)``
    dimensions of the input; every step is printed for inspection.

    Args:
        parameters_shape: Shape of ``gamma`` and ``beta``; its length decides
            how many trailing input dimensions are normalised.
        eps: Constant added to the variance before the square root.
    """

    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.parameters_shape = parameters_shape
        self.eps = eps
        # Learnable scale, initialised to one.
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        # Learnable shift, initialised to zero.
        self.beta = nn.Parameter(torch.zeros(parameters_shape))

    def forward(self, inputs):
        """Normalise ``inputs`` over the trailing dimensions, then scale and shift."""
        # Trailing axes to reduce over: -1, -2, ... one per entry of parameters_shape.
        reduce_axes = list(range(-1, -len(self.parameters_shape) - 1, -1))
        # keepdim=True keeps the rank of the input so the statistics broadcast back.
        centre = inputs.mean(dim=reduce_axes, keepdim=True)
        print(f"Mean \n ({centre.size()}): \n {centre})")
        deviation = inputs - centre
        variance = deviation.pow(2).mean(dim=reduce_axes, keepdim=True)
        spread = torch.sqrt(variance + self.eps)
        print(f"Std \n ({spread.size()}): \n {spread})")
        normalised = deviation / spread
        print(f"y \n ({normalised.size()}): \n {normalised})")
        result = self.gamma * normalised + self.beta
        print(f"Output \n ({result.size()}): \n {result})")
        return result
        
class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding from "Attention Is All You Need".

    Args:
        d_model: Size of each encoding vector.
        max_sequence_length: Number of positions to encode.
    """

    def __init__(self, d_model, max_sequence_length):
        super().__init__()
        self.max_sequence_length = max_sequence_length
        self.d_model = d_model

    def forward(self):
        """Build the encoding table.

        Returns:
            Float tensor of shape (max_sequence_length, d_model) where even
            columns hold ``sin(pos / 10000^(i / d_model))`` and odd columns
            hold the matching ``cos`` term.
        """
        even_i = torch.arange(0, self.d_model, 2).float()
        # The paper uses base 10000; one denominator is shared by each sin/cos pair.
        denominator = torch.pow(10000, even_i / self.d_model)
        # Column vector of positions so the division broadcasts to (positions, d_model / 2).
        position = torch.arange(self.max_sequence_length, dtype=torch.float).reshape(
            self.max_sequence_length, 1
        )
        even_PE = torch.sin(position / denominator)
        odd_PE = torch.cos(position / denominator)
        # Interleave the sin and cos columns: [sin_0, cos_0, sin_1, cos_1, ...].
        stacked = torch.stack([even_PE, odd_PE], dim=2)
        PE = torch.flatten(stacked, start_dim=1, end_dim=2)
        # For an odd d_model the interleaving yields one extra column; drop it.
        return PE[:, : self.d_model]


In [73]:
class PositionwiseFeedForward(nn.Module):
    """Feed-forward part of the encoder: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Size of the input and output vectors.
        hidden: Size of the intermediate layer.
        drop_prob: Dropout probability applied after the activation.
    """

    def __init__(self, d_model, hidden, drop_prob=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Expand ``x`` to ``hidden`` features, activate, drop, and project back."""
        expanded = self.relu(self.linear1(x))
        return self.linear2(self.dropout(expanded))
        

In [74]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward, each followed by
    dropout, a residual connection and layer normalisation.

    Args:
        d_model: Size of each embedding vector.
        ffn_hidden: Hidden size of the feed-forward sub-layer.
        num_heads: Number of attention heads.
        drop_prob: Dropout probability used in both sub-layers.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()  # set up the nn.Module machinery before registering sub-modules
        self.attention = MultiheadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Run ``x`` through both sub-layers and return a tensor of the same shape."""
        residual_x = x
        print("------- ATTENTION 1 ------")
        x = self.attention(x, mask=None)  # this layer applies no attention mask
        print("------- DROPOUT 1 ------")
        x = self.dropout1(x)
        print("------- ADD AND LAYER NORMALIZATION 1 ------")
        x = self.norm1(x + residual_x)
        residual_x = x
        # The second sub-layer is the feed-forward network, not a second attention.
        print("------- FEED FORWARD ------")
        x = self.ffn(x)
        print("------- DROPOUT 2 ------")
        x = self.dropout2(x)
        print("------- ADD AND LAYER NORMALIZATION 2 ------")
        x = self.norm2(x + residual_x)
        return x
    

In [75]:
class Encoder(nn.Module):
    """Stack of ``num_layers`` encoder layers applied one after another.

    Args:
        d_model: Size of each embedding vector.
        ffn_hidden: Hidden size of each layer's feed-forward sub-layer.
        num_heads: Number of attention heads per layer.
        drop_prob: Dropout probability passed to every layer.
        num_layers: How many encoder layers to stack.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
        super().__init__()
        stack = []
        for _ in range(num_layers):
            stack.append(EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob))
        # nn.Sequential feeds the output of each layer into the next one.
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Pass ``x`` through every encoder layer in order."""
        return self.layers(x)

In [76]:
# --- model hyper-parameters ---
d_model = 512      # size of each embedding vector
num_heads = 8      # attention heads per encoder layer
ffn_hidden = 2048  # number of neurons in the FFN layer
num_layers = 5     # number of transformer encoder units we want in our architecture
drop_prob = 0.1    # dropout probability

# --- shape of the demo input ---
batch_size = 30
max_sequence_length = 200

In [77]:
# Build the encoder stack from the hyper-parameters defined above.
encoder = Encoder(
    d_model=d_model,
    ffn_hidden=ffn_hidden,
    num_heads=num_heads,
    drop_prob=drop_prob,
    num_layers=num_layers,
)

In [78]:
# Random stand-in for a batch of embedded sequences: (batch_size, max_sequence_length, d_model).
x = torch.randn(batch_size, max_sequence_length, d_model)

In [79]:
# Run the random batch through every encoder layer; each layer prints its intermediate tensor sizes.
out = encoder(x)

------- ATTENTION 1 ------
x.size(): torch.Size([30, 200, 512])
qkv.size(): torch.Size([30, 200, 1536])
qkv.size(): torch.Size([30, 200, 8, 192])
qkv.size(): torch.Size([30, 8, 200, 192])
q.size(): torch.Size([30, 8, 200, 64]) k.size(): torch.Size([30, 8, 200, 64]), v.size(): torch.Size([30, 8, 200, 64])
values.size(): torch.Size([30, 8, 200, 64]), attention.size(): torch.Size([30, 8, 200, 200])
values.size(): torch.Size([30, 200, 512])
out.size() : torch.Size([30, 200, 512])
------- DROPOUT 1 ------
------- ADD AND LAYER NORMALIZATION 1 ------
Mean 
 (torch.Size([30, 200, 1])): 
 tensor([[[-0.0317],
         [ 0.0717],
         [ 0.0005],
         ...,
         [ 0.0061],
         [-0.0135],
         [-0.0458]],

        [[-0.0657],
         [ 0.0580],
         [-0.0400],
         ...,
         [-0.0698],
         [ 0.0062],
         [-0.0570]],

        [[ 0.1003],
         [-0.0228],
         [ 0.0476],
         ...,
         [ 0.0340],
         [ 0.0021],
         [ 0.0946]],

    