This notebook is heavily inspired by the [Harvard transformer implementation](https://nlp.seas.harvard.edu/annotated-transformer/) and [Umar Jamil's transformer implementation](https://youtu.be/ISNdQcPhsts?si=_1mO7CBcvFHg15cJ).

In [None]:
import torch
import torch.nn as nn
import numpy as np
import math

### Input Embedding

<!-- TODO:

- explore what `nn.Embedding` does
 -->

In [None]:
class InputEmbedding(nn.Module):
    """Token-embedding lookup whose output is scaled by sqrt(d_model)."""

    def __init__(self, d_model: int, vocab_size: int) -> None:
        super().__init__()
        self.d_model = d_model  # embedding width (512 in the paper)
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        # Last sentence of section 3.4 (page 5) of the paper: the embedding
        # weights are multiplied by sqrt(d_model).
        scale = math.sqrt(self.d_model)
        embedded = self.embedding(x)
        return embedded * scale

### Positional Encoding

<!-- TODO:
- check Amirhossein Kazamnejad's blog on positional encoding -->

Inspired by Umar Jamil, we use the [Harvard pytorch transformer article implementation of positional encoding formula](https://nlp.seas.harvard.edu/annotated-transformer/#positional-encoding) mentioned in the paper by using log. He mentions in his video that applying log to an exponential nullifies the effect of log but makes the calculation more numerically stable. The value of the positional encoding calculated this way will be slightly different but the model will learn. Click [here](https://youtu.be/ISNdQcPhsts?si=HNaqDgkw6CfwgO-M&t=470) to watch that particular scene from the video.

Click [here](https://youtu.be/ISNdQcPhsts?si=cvEfkDJyW7LiBqkn&t=720) to see the reasoning behind using `self.register_buffer("pe", pe)`. The reasoning is that when we want to keep a tensor that is not a learned parameter (unlike weights and biases) but that should still be saved when we save the model, we should register it as a buffer. This way it is saved along with the state of the model.

Original formula:

$$PE_{(pos, 2i)} = sin \left( \frac{pos}{10000^{\frac{2i}{d_{model}}}} \right)$$

$$PE_{(pos, 2i+1)} = cos \left( \frac{pos}{10000^{\frac{2i}{d_{model}}}} \right)$$

<br></br>

<!-- Modified formula by Harvard Transformer article: -->



In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to a batch of embeddings.

    The table follows the formula from the paper:

        PE(pos, 2i)   = sin(pos / 10000^(2i / d_model))
        PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))

    Args:
        d_model: size of each embedding vector (512 in the paper).
        seq_len: maximum sequence length the table is built for.
        dropout: dropout probability applied after adding the encoding.
    """

    def __init__(self, d_model: int, seq_len: int, dropout: float) -> None:
        super(PositionalEncoding, self).__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(p=dropout)
        # pe stands for positional encoding; shape (seq_len, d_model)
        pe = torch.zeros(seq_len, d_model)
        # column vector of positions, shape (seq_len, 1)
        position = torch.arange(0, seq_len, dtype=torch.float32).unsqueeze(1)
        # the even numbers 0, 2, 4, ... that play the role of "2i" in the formula
        vector = torch.arange(0, d_model, 2, dtype=torch.float32)
        # 1 / 10000^(2i / d_model), written as exp(-2i * ln(10000) / d_model)
        # (the form used by the Harvard annotated transformer). The position
        # must be DIVIDED by 10000^(2i / d_model), i.e. multiplied by this
        # reciprocal term.
        div_term = torch.exp(vector * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (seq_len, ceil(d_model / 2))
        # sin goes to the even feature indices
        pe[:, 0::2] = torch.sin(angles)
        # cos goes to the odd feature indices; when d_model is odd there is
        # one fewer odd index than even index, so drop the last angle column
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # add the batch dimension so the table broadcasts over a batch
        pe = pe.unsqueeze(0)  # (1, seq_len, d_model)
        # a buffer is saved with the state of the model but is not a
        # learnable parameter
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Add the encoding for the first ``x.size(1)`` positions, then apply dropout."""
        # `pe` is a buffer, so it never requires grad and is not trained
        x = x + self.pe[:, : x.size(1)]
        return self.dropout(x)

Let's see how the positional encoding works by doing it on a smaller example.

In [None]:
def dummyfn1():
    """Print a small (10 x 10) sinusoidal positional-encoding table.

    Three tensors are printed: the full table, its even columns (the sin
    terms) and its odd columns (the cos terms).
    """
    seq_len = 10
    d_model = 10
    pe = torch.zeros(seq_len, d_model)
    position = torch.arange(0, seq_len, dtype=torch.float32).unsqueeze(1)
    vector = torch.arange(0, d_model, 2, dtype=torch.float32)
    # 1 / 10000^(2i / d_model): the position is divided by the power of
    # 10000, which is the same as multiplying by this reciprocal term
    div_term = torch.exp(vector * (-math.log(10000.0) / d_model))
    pe[:, 0::2] = torch.sin(position * div_term)
    pe[:, 1::2] = torch.cos(position * div_term)
    print(pe, pe[:, 0::2], pe[:, 1::2], sep='\n\n\n')

dummyfn1()

tensor([[ 0.0000,  1.0000,  0.0000,  1.0000,  0.0000,  1.0000,  0.0000,  1.0000,
          0.0000,  1.0000],
        [ 0.8415,  0.5403,  0.0264,  0.9997,  0.8573, -0.5148, -0.1383,  0.9904,
          0.9992,  0.0402],
        [ 0.9093, -0.4161,  0.0528,  0.9986, -0.8827, -0.4699, -0.2739,  0.9618,
          0.0803, -0.9968],
        [ 0.1411, -0.9900,  0.0791,  0.9969,  0.0516,  0.9987, -0.4042,  0.9147,
         -0.9927, -0.1205],
        [-0.7568, -0.6536,  0.1054,  0.9944,  0.8296, -0.5584, -0.5268,  0.8500,
         -0.1600,  0.9871],
        [-0.9589,  0.2837,  0.1316,  0.9913, -0.9058, -0.4237, -0.6393,  0.7690,
          0.9799,  0.1993],
        [-0.2794,  0.9602,  0.1577,  0.9875,  0.1031,  0.9947, -0.7395,  0.6732,
          0.2392, -0.9710],
        [ 0.6570,  0.7539,  0.1837,  0.9830,  0.7997, -0.6005, -0.8254,  0.5645,
         -0.9606, -0.2778],
        [ 0.9894, -0.1455,  0.2095,  0.9778, -0.9265, -0.3764, -0.8955,  0.4450,
         -0.3160,  0.9488],
        [ 0.4121, -

In [None]:
def dummyfn2():
    """Run a random batch of one sequence through PositionalEncoding."""
    torch.manual_seed(42)
    seq_len = 4
    d_model = 4
    dropout = 0.2
    # the module's forward reads the sequence length from dimension 1, so
    # the input is laid out as (batch, seq_len, d_model)
    x = torch.randn(1, seq_len, d_model)
    obj = PositionalEncoding(d_model, seq_len, dropout)
    return obj(x)

dummyfn2()

tensor([[[ 2.4086,  3.1091,  1.1259, -0.0000],
         [ 1.8999, -0.8678, -0.6868, -0.9279],
         [ 0.1965,  1.5407, -1.5822, -0.0000],
         [-0.0000, -1.9368, -2.2107,  0.9254]]])

### Layer Normalization

In layer normalization, we calculate the mean and variance of each data point independently from other data points. Then, we calculate new values for each data point using their own mean and their own variance.

<!-- ![Screenshot%20from%202023-12-01%2000-51-48.png](attachment:Screenshot%20from%202023-12-01%2000-51-48.png) -->

(Source: https://youtu.be/ISNdQcPhsts?si=_1mO7CBcvFHg15cJ.)

Note: $\text{variance} = \text{(standard deviation)}^2$

We will use this formula:

$$\hat{x}_j = \alpha \times \left(\frac{x_j - \mu_j}{\sqrt{\sigma_j^2 + \epsilon}}\right) + \beta $$

, where:
- $\alpha$ is the multiplicative factor
- $\beta$ is the additive factor

In [None]:
class LayerNormalization(nn.Module):
    """Layer normalization over the last dimension.

    Computes ``alpha * (x - mean) / sqrt(var + eps) + bias`` where the mean
    and the (population) variance are taken over the last dimension of ``x``.

    Args:
        features: number of learnable gain/shift values. With the default of
            1 a single scalar gain and shift are shared by all features.
        eps: small constant added to the variance for numerical stability.
    """

    def __init__(self, features: int = 1, eps: float = 1e-6) -> None:
        super(LayerNormalization, self).__init__()
        if isinstance(features, float):
            # Backward compatibility: the first positional argument used to
            # be `eps`, so a float here is treated as the epsilon.
            features, eps = 1, features
        self.eps = eps
        # nn.Parameter (rather than a plain tensor) makes alpha and bias
        # learnable and visible in the state dict of the model
        self.alpha = nn.Parameter(torch.ones(features))  # multiplied
        self.bias = nn.Parameter(torch.zeros(features))  # added

    def forward(self, x):
        # keepdim=True keeps the reduced dimension so the statistics
        # broadcast against x
        mean = x.mean(dim=-1, keepdim=True)
        # population variance, as in torch.nn.LayerNorm
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        # divide by the standard deviation sqrt(var + eps), not by the variance
        return self.alpha * ((x - mean) / torch.sqrt(var + self.eps)) + self.bias

### Position-Wise Feed Forward Networks

See section 3.3 on page 5 of the paper.

In [None]:
class PositionWiseFeedForward(nn.Module):
    """Feed-forward sub-layer: linear -> ReLU -> dropout -> linear."""

    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        super().__init__()
        # expansion layer (W1, B1) followed by the contraction layer (W2, B2)
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(p=dropout)
        self.relu = nn.ReLU()

    def forward(self, x):
        """
        Apply the network to the last dimension of x.

        linear1 maps the last dimension from d_model to d_ff and linear2 maps
        it back from d_ff to d_model, so the output has the same shape as the
        input, e.g. (batch, seq_len, d_model).
        """
        hidden = self.linear1(x)
        hidden = self.relu(hidden)
        hidden = self.dropout(hidden)
        return self.linear2(hidden)

In [None]:
def dummyfn1():
    """Feed a random (1, 10, 10) tensor through a small feed-forward block."""
    seq_len = 10
    d_model = 10
    d_ff = 20
    dropout = 0.1
    sample = torch.randn(1, seq_len, d_model)
    block = PositionWiseFeedForward(d_model, d_ff, dropout)
    return block(sample)

dummyfn1().shape

torch.Size([1, 10, 10])

### Multi-Head Attention

Queries, Keys, and Values are all just the duplication of the input for the encoder. In other words, in the encoder block, we store the same value of input in queries, keys, and values. So, they are all the same thing. You can also think of them as just the input used 3 times.

<!-- ![Screenshot%20from%202023-12-01%2001-36-31.png](attachment:Screenshot%20from%202023-12-01%2001-36-31.png)

(Source: https://youtu.be/ISNdQcPhsts?si=_1mO7CBcvFHg15cJ.) -->

Pay close attention to the above figure when coding the multi-head attention class. This will help you understand what comes when and how.

Check [this](https://sentry.io/answers/difference-between-staticmethod-and-classmethod-function-decorators-in-python/#:~:text=We%20can%20decorate%20a%20function,object%20to%20it%2C%20as%20below.&text=This%20can%20be%20useful%20when,the%20instance%20it's%20called%20on.) article for information on `@staticmethod`. Basically, when you put `@staticmethod` on top of a method in a class, then that method does not take the `self` argument, which is the object of the class.

**Scaled dot-product attention**:

$$\text{Attention}(Q, K, V) = \text{softmax} \left( \frac{Q K^T}{\sqrt{d_k}} \right) V$$

In [None]:
class MultiHeadAttentionBlock(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        d_model: size of the embedding vectors.
        h: number of attention heads; must divide d_model.
        dropout: dropout probability applied to the attention weights.
    """

    def __init__(self, d_model: int, h: int, dropout: float) -> None:
        """Take in model size and number of heads."""
        super(MultiHeadAttentionBlock, self).__init__()
        self.d_model = d_model  # embedding vector size
        self.h = h   # number of heads
        # make sure d_model is divisible by h
        assert d_model % h == 0, "d_model is not divisible by h"
        # we assume d_v always equals d_k
        self.d_k = d_model // h  # dimension of vector seen by each head
        self.wq = nn.Linear(d_model, d_model, bias=False)  # Wq
        self.wk = nn.Linear(d_model, d_model, bias=False)  # Wk
        self.wv = nn.Linear(d_model, d_model, bias=False)  # Wv
        self.wo = nn.Linear(d_model, d_model, bias=False)  # Wo
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def attention(key, query, value, mask=None, dropout=None):
        """Compute scaled dot-product attention.

        NOTE: the positional order is (key, query, value). Pass the tensors
        by keyword to avoid mixing up key and query.

        Returns a tuple ``(output, prob_scores)`` where ``output`` is
        ``prob_scores @ value`` and ``prob_scores`` are the softmax-normalized
        attention weights (useful for visualization).
        """
        d_k = query.size(-1)
        # (batch, h, q_len, d_k) @ (batch, h, d_k, k_len)
        # --> (batch, h, q_len, k_len)
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            # write a very low value (standing in for -infinity) where
            # mask == 0 so that softmax gives those positions zero weight
            scores = scores.masked_fill(mask == 0, -1e9)
        # every row of attention weights sums to 1 over the key positions
        prob_scores = scores.softmax(dim=-1)  # (batch, h, q_len, k_len)
        if dropout is not None:
            prob_scores = dropout(prob_scores)
        # (batch, h, q_len, k_len) @ (batch, h, k_len, d_k)
        # --> (batch, h, q_len, d_k)
        return torch.matmul(prob_scores, value), prob_scores

    def _split_heads(self, x):
        """Reshape (batch, seq_len, d_model) into (batch, h, seq_len, d_k)."""
        x = x.view(x.shape[0], x.shape[1], self.h, self.d_k)
        return x.transpose(1, 2)

    def forward(self, q, k, v, mask=None):
        """Project q, k and v, attend per head and merge the heads again.

        The attention weights of the last call are kept in
        ``self.attn_scores``.
        """
        # the projections keep the shape (batch, seq_len, d_model);
        # each result is then split into h heads of size d_k
        query = self._split_heads(self.wq(q))
        key = self._split_heads(self.wk(k))
        value = self._split_heads(self.wv(v))
        # keywords are used on purpose: `attention` declares its parameters
        # in the order (key, query, value)
        x, self.attn_scores = MultiHeadAttentionBlock.attention(
            key=key, query=query, value=value, mask=mask, dropout=self.dropout)
        # combine all the heads together
        # (batch, h, seq_len, d_k) --> (batch, seq_len, h, d_k)
        x = x.transpose(1, 2)
        # (batch, seq_len, h, d_k) --> (batch, seq_len, d_model)
        # transpose makes the tensor non-contiguous, so .contiguous() is
        # needed before .view() (x.reshape(...) would also work)
        x = x.contiguous().view(x.shape[0], -1, self.h * self.d_k)
        # multiplying by Wo does not change the shape of x
        return self.wo(x)

`.view()` is used to reshape a tensor. We can reshape a tensor using `.view()` and store the reshaped version in another variable. Note that `.view()` does not copy the data: the new tensor refers to the same underlying memory as the original one. So, if we make changes to one tensor, they are reflected in the other tensor as well.

<br></br>

Check [this](https://stackoverflow.com/questions/48915810/what-does-contiguous-do-in-pytorch) for information on the use of `.contiguous()` in PyTorch.

In [None]:
def dummyfn1():
    """Show that a tensor and its `.view()` share the same data."""
    original = torch.ones(2, 3)
    flat = original.view(1, 6)
    # writing through the view at [0, 3] also changes the matching
    # element of the original tensor
    flat[0, 3] = 0
    print(original, flat, sep='\n\n')

dummyfn1()

tensor([[1., 1., 1.],
        [0., 1., 1.]])

tensor([[1., 1., 1., 0., 1., 1.]])


In [None]:
def dummyfn1():
    """Demonstrate the error raised by `.view()` on a transposed tensor."""
    A = torch.ones(3,4)
    # after the transpose, A has shape (4, 3)
    A = A.transpose(-1,-2)
    A = A.view(A.shape[1], -1)  # we get error because we didn't use contiguous
    return A

# running this cell raises the RuntimeError on purpose
dummyfn1()

RuntimeError: view size is not compatible with input tensor's size and stride (at least one dimension spans across two contiguous subspaces). Use .reshape(...) instead.

In [None]:
def dummyfn1():
    """Reshape a transposed tensor with `.contiguous()` followed by `.view()`."""
    transposed = torch.ones(3, 4).transpose(-1, -2)  # shape (4, 3)
    rows = transposed.shape[1]
    # .contiguous() makes the transposed tensor usable with .view()
    return transposed.contiguous().view(rows, -1)

dummyfn1()

tensor([[1., 1., 1., 1.],
        [1., 1., 1., 1.],
        [1., 1., 1., 1.]])

In [None]:
def dummyfn1():
    """Reshape a transposed tensor with a single `.reshape()` call."""
    transposed = torch.ones(3, 4).transpose(-1, -2)  # shape (4, 3)
    rows = transposed.shape[1]
    # reshape replaces the contiguous + view combination
    return transposed.reshape(rows, -1)

dummyfn1()

tensor([[1., 1., 1., 1.],
        [1., 1., 1., 1.],
        [1., 1., 1., 1.]])

**3 ways of doing transpose in PyTorch**

In [None]:
# the original (2, 3) tensor, shown for comparison with its transposes below
torch.ones(2,3)

tensor([[1., 1., 1.],
        [1., 1., 1.]])

In [None]:
# functional form: swap the last two dimensions, giving a (3, 2) tensor
torch.transpose(torch.ones(2,3), -1, -2)

tensor([[1., 1.],
        [1., 1.],
        [1., 1.]])

In [None]:
# attribute form: `.T` also gives the (3, 2) transpose of this 2-D tensor
torch.ones(2,3).T

tensor([[1., 1.],
        [1., 1.],
        [1., 1.]])

### Residual Connection

In [None]:
class ResidualConnection(nn.Module):
    """Skip connection around a sub-layer (the 'add' of 'add and norm')."""

    def __init__(self, features: int, dropout: float) -> None:
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.norm = LayerNormalization(features)

    def forward(self, x, sublayer):
        """
        x: input
        sublayer: callable applied to the normalized input (eg: multi-head
        attention, feed-forward network, etc.)

        Returns x plus the dropped-out output of the sublayer.
        """
        # Pre-norm variant, as most implementations do: normalize first, then
        # run the sublayer. The paper instead normalizes after the addition,
        # i.e. LayerNorm(x + Sublayer(x)).
        normalized = self.norm(x)
        branch = self.dropout(sublayer(normalized))
        return x + branch

### Encoder

In [None]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention followed by a feed-forward network.

    Each of the two sub-layers is wrapped in its own residual connection, as
    shown in figure 1 of the paper.

    Args:
        features: passed to each ResidualConnection.
        selfattn_block: the self-attention sub-layer.
        feedforward_block: the feed-forward sub-layer.
        dropout: dropout probability passed to each ResidualConnection.
    """

    def __init__(self, features: int, selfattn_block: MultiHeadAttentionBlock,
                 feedforward_block: PositionWiseFeedForward, dropout: float) -> None:
        # nn.Module must be initialized before submodules can be assigned
        super(EncoderBlock, self).__init__()
        self.selfattn_block = selfattn_block
        self.feedforward_block = feedforward_block
        # one residual connection after the self-attention layer and one
        # after the feed-forward network
        self.res_con = nn.ModuleList([ResidualConnection(features, dropout)
                                      for _ in range(2)])

    def forward(self, x, src_mask):
        """Apply both sub-layers to x and return the result."""
        # we apply the source mask because we don't want the padding word to
        # interact with other words
        x = self.res_con[0](x, lambda x: self.selfattn_block(x, x, x, src_mask))
        x = self.res_con[1](x, self.feedforward_block)
        # the stacked Encoder feeds this output into the next layer
        return x

In [None]:
# nn.ModuleList takes an iterable of modules (here a generator of three
# Linear layers) and indexes them 0, 1, 2
nn.ModuleList(nn.Linear(2,2) for _ in range(3))

ModuleList(
  (0): Linear(in_features=2, out_features=2, bias=True)
  (1): Linear(in_features=2, out_features=2, bias=True)
  (2): Linear(in_features=2, out_features=2, bias=True)
)

In [None]:
class Encoder(nn.Module):
    """Stack of encoder layers followed by a final layer normalization."""

    def __init__(self, features: int, layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(features)

    def forward(self, x, mask):
        # feed the output of every layer into the next one
        out = x
        for block in self.layers:
            out = block(out, mask)
        # normalize the output of the last layer
        return self.norm(out)

### Decoder

In [None]:
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention and feed-forward.

    Each of the three sub-layers is wrapped in its own residual connection.
    Note the parameter order: `dropout` comes before `feedforward_block`.
    """

    def __init__(self, features: int, selfattn_block: MultiHeadAttentionBlock,
                 crossattn_block: MultiHeadAttentionBlock, dropout: float,
                 feedforward_block: PositionWiseFeedForward) -> None:
        super().__init__()
        self.selfattn_block = selfattn_block
        self.crossattn_block = crossattn_block
        self.feedforward_block = feedforward_block
        self.res_con = nn.ModuleList(
            [ResidualConnection(features, dropout) for _ in range(3)]
        )

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        def self_attend(t):
            # query, key and value all come from the decoder input
            return self.selfattn_block(t, t, t, tgt_mask)

        def cross_attend(t):
            # query from the decoder, key and value from the encoder output
            return self.crossattn_block(t, encoder_output, encoder_output, src_mask)

        first, second, third = self.res_con
        x = first(x, self_attend)
        x = second(x, cross_attend)
        return third(x, self.feedforward_block)

In [None]:
class Decoder(nn.Module):
    """Stack of decoder layers followed by a final layer normalization."""

    def __init__(self, features: int, layers: nn.ModuleList):
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(features)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        # every layer receives the same encoder output and masks
        out = x
        for block in self.layers:
            out = block(out, encoder_output, src_mask, tgt_mask)
        # normalize the output of the last layer
        return self.norm(out)

In [None]:
class ProjectionLayer(nn.Module):
    """Linear map from the model dimension to the vocabulary size."""

    def __init__(self, d_model, vocab_size) -> None:
        super().__init__()
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        # the last dimension goes from d_model to vocab_size, e.g.
        # (batch, seq_len, d_model) --> (batch, seq_len, vocab_size)
        logits = self.proj(x)
        return logits

### The Transformer Class (collection of all the above methods)

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder model assembled from the components it is given.

    Args:
        encoder: called as ``encoder(src, src_mask)``.
        decoder: called as ``decoder(tgt, encoder_output, src_mask, tgt_mask)``.
        src_embed: embedding applied to the source tokens.
        tgt_embed: embedding applied to the target tokens.
        src_pos: positional encoding applied to the embedded source.
        tgt_pos: positional encoding applied to the embedded target.
        proj_layer: final projection applied by ``project``.
    """

    def __init__(self, encoder: Encoder, decoder: Decoder, src_embed: InputEmbedding,
                 tgt_embed: InputEmbedding, src_pos: PositionalEncoding,
                 tgt_pos: PositionalEncoding, proj_layer: ProjectionLayer) -> None:
        # nn.Module must be initialized before submodules can be assigned;
        # without this call the assignments below raise an error
        super(Transformer, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.src_pos = src_pos
        self.tgt_pos = tgt_pos
        self.proj_layer = proj_layer

    def encode(self, src, src_mask):
        """Embed the source, add positional encoding and run the encoder."""
        # (batch, seq_len, d_model)
        src = self.src_embed(src)
        src = self.src_pos(src)
        return self.encoder(src, src_mask)

    def decode(self, encoder_output, src_mask, tgt, tgt_mask):
        """Embed the target, add positional encoding and run the decoder."""
        # (batch, seq_len, d_model)
        tgt = self.tgt_embed(tgt)
        tgt = self.tgt_pos(tgt)
        return self.decoder(tgt, encoder_output, src_mask, tgt_mask)

    def project(self, x):
        """Apply the projection layer to x."""
        # (batch, seq_len, vocab_size)
        return self.proj_layer(x)

### Final Transformer Code

In [None]:
def build_transformer(src_vocab_size: int, tgt_vocab_size: int, src_seq_len: int,
                      tgt_seq_len: int, d_model: int = 512, Nx: int = 6, h: int = 8,
                      dropout: float = 0.1, d_ff: int = 2048) -> Transformer:
    """Build a Transformer and initialize its weight matrices.

    Args:
        src_vocab_size: size of the source vocabulary.
        tgt_vocab_size: size of the target vocabulary.
        src_seq_len: maximum source sequence length (positional encoding).
        tgt_seq_len: maximum target sequence length (positional encoding).
        d_model: size of the embedding vectors.
        Nx: number of encoder blocks and number of decoder blocks.
        h: number of attention heads.
        dropout: dropout probability used throughout the model.
        d_ff: hidden size of the feed-forward networks.

    Returns:
        The assembled Transformer; every parameter with more than one
        dimension is initialized with Xavier uniform initialization.
    """
    # Create the input and output embedding layers
    src_embed = InputEmbedding(d_model, src_vocab_size)
    tgt_embed = InputEmbedding(d_model, tgt_vocab_size)

    # Create the input and output positional encoding layers
    src_pos = PositionalEncoding(d_model, src_seq_len, dropout)
    tgt_pos = PositionalEncoding(d_model, tgt_seq_len, dropout)

    # Create the encoder blocks
    encoder_blocks = []
    for _ in range(Nx):
        encoder_selfattn_block = MultiHeadAttentionBlock(d_model, h, dropout)
        encoder_feedforward_block = PositionWiseFeedForward(d_model, d_ff, dropout)
        encoder_block = EncoderBlock(d_model, encoder_selfattn_block,
                                     encoder_feedforward_block, dropout)
        encoder_blocks.append(encoder_block)

    # Create the decoder blocks
    decoder_blocks = []
    for _ in range(Nx):
        decoder_selfattn_block = MultiHeadAttentionBlock(d_model, h, dropout)
        decoder_crossattn_block = MultiHeadAttentionBlock(d_model, h, dropout)
        decoder_feedforward_block = PositionWiseFeedForward(d_model, d_ff, dropout)
        # DecoderBlock declares `dropout` before `feedforward_block`, so both
        # are passed by keyword to avoid swapping them
        decoder_block = DecoderBlock(d_model, decoder_selfattn_block,
                                     decoder_crossattn_block,
                                     dropout=dropout,
                                     feedforward_block=decoder_feedforward_block)
        decoder_blocks.append(decoder_block)

    # Create the encoder and decoder
    encoder = Encoder(d_model, nn.ModuleList(encoder_blocks))
    decoder = Decoder(d_model, nn.ModuleList(decoder_blocks))

    # Create the projection layer
    proj_layer = ProjectionLayer(d_model, tgt_vocab_size)

    # Create the transformer
    transformer = Transformer(encoder, decoder, src_embed, tgt_embed, src_pos,
                              tgt_pos, proj_layer)

    # Initialize the parameters (only matrices, i.e. more than one dimension)
    for p in transformer.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)

    return transformer