In [2]:
import torch
import torch.nn as nn
import math
import numpy as np

## 1. input embedding


In [3]:
# first we;ll be building input embeddings
# allows to convert token into embedding of dim 1x52  : token -> input ID(position in vocab) ->embedding


class InputEmbeddings(nn.Module):
    def __init__(self, d_model: int, vocab_size: int):
        """

        Args:
            d_model (int): dim of vector
            vocab_size (int): # of words in vocab
        """

        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)

    def forward(self, x):
        return self.embedding(x)


# Example usage
input_embeddings = InputEmbeddings(d_model=512, vocab_size=1000)
# Create an example input tensor (batch size 1, sequence length 5, embedding dimension 20)
batch_of_sentences = torch.tensor([[5, 6, 7, 0, 0]])  # Shape: (batch_size, max_sentence_length)
print(batch_of_sentences.shape)


# Pass through the embedding layer
# The forward method is called automatically when you use the instance like a function.
embedded_sentences = input_embeddings(batch_of_sentences)
embedded_sentences.shape, embedded_sentences  # (batch, seq_len, embedding dim)

torch.Size([1, 5])


(torch.Size([1, 5, 512]),
 tensor([[[ 0.2098, -0.3947, -0.2909,  ..., -0.2254,  1.2406, -1.0059],
          [-0.1649, -0.2299, -0.4710,  ..., -0.8700, -0.7264, -0.6422],
          [-0.0617, -0.4992, -0.2882,  ...,  0.2205,  1.1892,  1.2710],
          [-2.2563, -1.6153,  0.7384,  ..., -1.1690, -0.6215,  0.0888],
          [-2.2563, -1.6153,  0.7384,  ..., -1.1690, -0.6215,  0.0888]]],
        grad_fn=<EmbeddingBackward0>))

In [4]:
vocab_size = 5
d_model = 6
nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)

Embedding(5, 6)

## 2. positional encoding


In [5]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, seq_len: int, dropout: float) -> None:
        """
                Since our model contains no recurrence and no convolution, in order for the model to make use of the
        order of the sequence, we must inject some information about the relative or absolute position of the
        tokens in the sequence. To this end, we add "positional encodings" to the input embeddings at the
        bottoms of the encoder and decoder stacks. The positional encodings have the same dimension dmodel
        as the embeddings, so that the two can be summed. There are many choices of positional encodings,
        learned and fixed [9].
        In this work, we use sine and cosine functions of different frequencies:
            `PE(pos,2i) = sin(pos/(10000)**2i/dmodel)`
            `PE(pos,2i+1) = cos(pos/(10000)**2i/dmodel)`
        where pos is the position and i is the dimension. That is, each dimension of the positional encoding
        corresponds to a sinusoid. The wavelengths form a geometric progression from 2π to 10000 · 2π. We
        chose this function because we hypothesized it would allow the model to easily learn to attend by
        relative positions, since for any fixed offset k, P E(pos+k) can be represented as a linear function of
        PE(pos).

        Keyword arguments:
        dropout -- to make model less overfit
        seq_len -- Specifies the maximum length of sequence that the model can handle. This helps determine the scale and range of the positional encodings.
        """
        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)

        # positional encodeing shape: seq_len X d_model i.e. each token will be represented (1*d_model) vector

        """
        
        formula :`PE(pos,2i) = cos(pos/(10000)**2i/dmodel) for i=1,3,5, ...and `PE(pos,2i) = sin(pos/(10000)**2i/dmodel) for i=2,4,6, ...and `
        
        """

        #  Create a model of shape (seq_len , d_model)

        pe = torch.zeros(seq_len, d_model)
        #  create a vector of shape(seq_len,1) to represent position of word in sequence

        position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)  # (seq_len,1)  # pos in formula
        # create denominator of formula
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # apply sin to even positions
        pe[:, 0::2] = torch.sin(position * div_term)

        # apply cos to odd positions
        pe[:, 1::2] = torch.cos(position * div_term)

        # now we need to add batch dimension to these sentences so we can apply it to whole sentences, so to all the batch of sentence, because weill have batch of sentences.
        # adding batch dim
        pe = pe.unsqueeze(0)  # (1, seq_len, d_model)

        # register this tensor in buffer of module  .. it is done for the tensor that you want to keep inside the module, not as a lerarned parameter but you want it to be saved when you save the file of the model
        # you should register it as a buffer. this way the tensor would be saved in file along with state of model
        self.register_buffer("pe", pe)  # This is typically used to register a buffer that should not to be considered a model parameter.
        """
        Say you have a linear layer nn.Linear. You already have weight and bias parameters. But if you need a new parameter you use register_parameter() to register a new named parameter that is a tensor.
        When you register a new parameter it will appear inside the module.parameters() iterator, but when you register a buffer it will not.
        The difference:
        Buffers are named tensors that do not update gradients at every step, like parameters. For buffers, you create your custom logic (fully up to you).
        """

    def forward(self, x):
        """
        we need to add positional encoding to every token/word inside sequence/sentence
        """
        x = x + (self.pe[:, : x.shape[1], :]).requires_grad_(False)  # x:token and pe is positional encoding  # because we dont want to learn pe because these are fixed
        return self.dropout(x)


# Example usage
positional = PositionalEncoding(d_model=512, seq_len=5, dropout=0.5)

# Create an example input tensor (batch size , sequence length , embedding dimension )

# Apply positional encoding
positional_encoded = positional(embedded_sentences)
print("input ", embedded_sentences)

print("input shape", embedded_sentences.shape)
print("positional_encoded shape", positional_encoded.shape)

print(positional_encoded)  # (1, seq_len,d_model)

input  tensor([[[ 0.2098, -0.3947, -0.2909,  ..., -0.2254,  1.2406, -1.0059],
         [-0.1649, -0.2299, -0.4710,  ..., -0.8700, -0.7264, -0.6422],
         [-0.0617, -0.4992, -0.2882,  ...,  0.2205,  1.1892,  1.2710],
         [-2.2563, -1.6153,  0.7384,  ..., -1.1690, -0.6215,  0.0888],
         [-2.2563, -1.6153,  0.7384,  ..., -1.1690, -0.6215,  0.0888]]],
       grad_fn=<EmbeddingBackward0>)
input shape torch.Size([1, 5, 512])
positional_encoded shape torch.Size([1, 5, 512])
tensor([[[ 0.4196,  1.2105, -0.5818,  ...,  1.5491,  2.4813, -0.0000],
         [ 1.3532,  0.6207,  0.7017,  ...,  0.0000, -1.4526,  0.7157],
         [ 1.6952, -1.8307,  1.2964,  ...,  2.4411,  2.3788,  0.0000],
         [-4.2303, -5.2106,  1.9669,  ..., -0.3380, -0.0000,  2.1776],
         [-6.0262, -0.0000,  0.1624,  ..., -0.0000, -1.2421,  2.1776]]],
       grad_fn=<MulBackward0>)


### 2.1 register_buffer


In [6]:
class MyModule(nn.Module):
    def __init__(self):
        super(MyModule, self).__init__()

        # Register a buffer tensor with zeros
        self.register_buffer("buffer_tensor", torch.zeros(3, 3))

        # Register another buffer tensor with specific values
        data = torch.tensor([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=torch.float32)
        self.register_buffer("another_buffer", data)

    def forward(self, x):
        # Use the buffer tensors in the forward pass
        output = x + self.buffer_tensor
        return output


# Create an instance of MyModule
model = MyModule()

# Print the module to see its structure
print(model)

# Accessing the buffer tensors
print("Buffer tensor:")
print(model.buffer_tensor)

print("\nAnother buffer tensor:")
print(model.another_buffer)

MyModule()
Buffer tensor:
tensor([[0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.]])

Another buffer tensor:
tensor([[1., 2., 3.],
        [4., 5., 6.],
        [7., 8., 9.]])


## 3. Add & Norm - layer normalization

for each item in the batch, calculte mean & var, and normalize each item so that each has mean=0, and var of 1(z-standardization), Beta and Gamma are also learnt to minimize the data flactuation as having values between - and 1 might be too restrictive.

new xj = (xj -meanj) / math.sqrt(var\*\*2 + epsilon)

simplified version: `x = α * (x - μ) / (σ + ε) + β`

gamma(multiplication) and beta(addition) will be learnt after this. epsilon is for numericalsatability as if denominator gets very small, overall number would be difficult to manage percision wise.


In [7]:
class LayerNormalization(nn.Module):
    def __init__(self, eps: float = 10**-6) -> None:
        super().__init__()
        self.eps = eps  # epsilon
        self.alpha = nn.Parameter(torch.ones(1))  # gamma  # mulltiplied
        self.bias = nn.Parameter(torch.zeros(1))  # added

    def forward(self, x):
        # print(x.shape)
        mean = x.mean(dim=-1, keepdim=True)
        std = x.std(dim=-1, keepdim=True)
        # print("mean shape", mean.shape, mean)

        return self.alpha * (x - mean) / (std + self.eps) + self.bias


ln = LayerNormalization()

# print("Before normalization:")
# print(positional_encoded)

normalized = ln(positional_encoded)
# print("After normalization:")
print(normalized.shape)
normalized  # (1, seq_len,d_model)

torch.Size([1, 5, 512])


tensor([[[-0.0627,  0.4125, -0.6643,  ...,  0.6159,  1.1759, -0.3148],
         [ 0.4697,  0.0163,  0.0664,  ..., -0.3680, -1.2672,  0.0750],
         [ 0.7076, -1.4773,  0.4605,  ...,  1.1698,  1.1312, -0.3429],
         [-2.9017, -3.4909,  0.8233,  ..., -0.5621, -0.3590,  0.9499],
         [-3.5371, -0.3253, -0.2388,  ..., -0.3253, -0.9873,  0.8353]]],
       grad_fn=<AddBackward0>)

## 4. feed forward block

In addition to attention sub-layers, each of the layers in our encoder and decoder contains a fully
connected feed-forward network, which is applied to each position separately and identically. This
consists of two linear transformations with a ReLU activation in between.

`FFN(x) = max(0, xW1 + b1)W2 + b2 (2)` # two lyers with ReLu in between

While the linear transformations are the same across different positions, they use different parameters
from layer to layer. Another way of describing this is as two convolutions with kernel size 1.
The dimensionality of input and output is dmodel = 512, and the inner-layer has dimensionality
dff = 2048.


In [8]:
class FeedForwardBlock(nn.Module):
    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        super().__init__()
        self.linear_1 = nn.Linear(d_model, d_ff, bias=True)  # first layer: w1,b1
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model, bias=True)  # second layer: w2,b2

    def forward(self, x):
        # input:(batch, seq_len, d_model)

        # after first layer: (batch, seq_len, d_ff)

        # after second layer: (batch, seq_len, d_model)

        return self.linear_2(self.dropout(torch.relu(self.linear_1(x))))


feedforwardblock = FeedForwardBlock(d_model=512, d_ff=2048, dropout=0.5)

print("Before feedforwardblock:")
print(normalized.shape, normalized)

feedforwarded = feedforwardblock(normalized)
# print("After normalization:")
print(feedforwarded.shape)
feedforwarded  # (1, seq_len,d_model)

Before feedforwardblock:
torch.Size([1, 5, 512]) tensor([[[-0.0627,  0.4125, -0.6643,  ...,  0.6159,  1.1759, -0.3148],
         [ 0.4697,  0.0163,  0.0664,  ..., -0.3680, -1.2672,  0.0750],
         [ 0.7076, -1.4773,  0.4605,  ...,  1.1698,  1.1312, -0.3429],
         [-2.9017, -3.4909,  0.8233,  ..., -0.5621, -0.3590,  0.9499],
         [-3.5371, -0.3253, -0.2388,  ..., -0.3253, -0.9873,  0.8353]]],
       grad_fn=<AddBackward0>)
torch.Size([1, 5, 512])


tensor([[[-0.5375,  0.0354, -0.0791,  ...,  0.5518,  0.1221, -0.3908],
         [-0.7616,  0.3545,  0.2357,  ..., -0.1363,  0.6434,  0.5094],
         [-0.4188, -0.3181,  0.4288,  ...,  0.1878, -0.2010, -0.0597],
         [-0.1479,  0.2047,  0.1629,  ...,  0.6414, -0.7571, -0.3854],
         [ 0.2172, -0.0781, -0.1686,  ...,  0.1476, -0.6196,  0.2035]]],
       grad_fn=<ViewBackward0>)

# 5. Multi-head attention

takes input:(seq_len, d_model) of encoder and uses it three times k:key, q:query, v:values. then we multiply these matrices with Wk, Wq and Wv respectively. resulting in K',Q',V' of same(seq_len, d_model) dim. Now,split each of K', Q' and V' into h parts along d_model(embedding) dim where h is number of head. So that each head will have access to full sentence but different part of embedding of each token.

Now, apply following formulas to each head which will result into h matrices of `(seq_len, d_k)` dims where `d_k` = `d_model/h`

$$
\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V
$$

$$
\text{head}_i = \text{Attention}(QW^Q_i, K W^K_i, V W^V_i)
$$

Now concatenate all heads,

$$
\text{MultiHead(Q, K, V)} = \text{Concatenate}(\text{head}_1, \text{head}_2, \ldots, \text{head}_h) W^o
$$

![alt text](02_transformer/MHA.png)

W^o is of `(seq, h*d_v)` shape where `d_v = d_k`

and resultant MH-A is `(seq_len, d_model)` same as input

But we also have to consider batch_dim for dealing with multiple sentences; the above intition works for single sentence.

`SO WE WILL CONSIDER BATCH DIMENSION.`

---

### MASK

before applying multiplying with V meaning

$$
\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)
$$

we get a scaled dot profuct of q and k, its (seq_len, seq_len) matrix. this shows interaction of each words with each other word.

If we dont want some words to interact with other words, we basically replace there attention score(before applying softmax) with very small value, which means after softmax these values will become zero,so basically we hide attention between those two words.


In [9]:
class MultiHeadAttentionBlock(nn.Module):
    def __init__(self, d_model: int, h: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model
        self.h = h
        self.dropout = dropout
        assert d_model % h == 0, "d_model is not divisible by h"
        self.d_k = d_model // h

        # define weight matrices
        self.wq = nn.Linear(d_model, d_model)  # wq
        self.wk = nn.Linear(d_model, d_model)  # wk
        self.wv = nn.Linear(d_model, d_model)  # wv

        # output matrix Wo (h*dv, d_model) where dv = dk
        self.wo = nn.Linear(d_model, d_model)  # wo
        self.dropout = nn.Dropout(dropout)

    @staticmethod  # so we could cal fn wothout specifying class instance
    def attention(query, key, value, mask, dropout: nn.Dropout):
        d_k = query.shape[-1]  # last dim of query/key/value

        # (batch, h, seq_len, d_k) -> # (batch, h, seq_len, seq_len)
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)  # transpose(-2, -1): transpose last two dims

        # apply mask: just replace values you want to mask with very small values

        if mask is not None:
            attention_scores.masked_fill(mask == 0, -1e9)  # replace all values where mask==0 (conidtion is true) with -1e9

        # applying softmax
        attention_scores = attention_scores.softmax(dim=-1)  # (batch_size, h, seq_len,seq_len)

        if dropout is not None:
            attention_scores = dropout(attention_scores)

        return (attention_scores @ value), attention_scores

    def forward(self, q, k, v, mask):
        # (batch, seq_len, d_model) -> (batch, seq_len, d_model)
        query = self.wq(q)
        key = self.wq(k)
        value = self.wq(v)

        # splitting
        # (batch, seq_len, d_model) -> (batch, seq_len, h, d_k) -> (batch, h, seq_len, d_k)
        # we moved h dimension because we want each head to consider (seq_len, d_k)
        # each head considers full sentence but smaller embedding
        query = query.view(query.shape[0], query.shape[1], self.h, self.d_k).transpose(1, 2)
        key = key.view(key.shape[0], key.shape[1], self.h, self.d_k).transpose(1, 2)
        value = value.view(value.shape[0], value.shape[1], self.h, self.d_k).transpose(1, 2)

        # (batch, h, seq_len, d_k)
        x, self.attention_scores = MultiHeadAttentionBlock.attention(query=query, key=key, value=value, mask=mask, dropout=self.dropout)

        # (batch, h, seq_len, d_k) -> (batch, seq_len, h, d_k)
        x = x.transpose(1, 2)

        # (batch, seq_len, d_model)
        x = x.contiguous().view(x.shape[0], -1, self.h * self.d_k)

        # (batch, seq_len, d_model)
        return self.w_o(x)

## 5. Residual/skip connection

between add & Norm and previous layer


In [10]:
class ResidualConnection(nn.Module):
    def __init__(self, dropout: float) -> None:
        super().__init__()
        self.dropout = dropout
        self.norm = LayerNormalization()

    def forward(self, x, sublayer):  # sublayer:previous layer
        """take x and combine with output of next layer"""

        return x + self.dropout(sublayer(self.norm(x)))

## 6. Encoderblock

![alt text](<02_transformer/Screenshot from 2024-07-22 14-45-21.png>)

it will contain one multi-head attention, two Add&Norm, one Feed forward block and two residual connections


In [11]:
class EncoderBlock(nn.Module):
    def __init__(self, self_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
        super().__init__()

        self.self_attention_block = self_attention_block
        self.feed_forward_block = feed_forward_block
        self.residual_connection = nn.ModuleList([ResidualConnection(dropout=dropout) for _ in range(2)])

    def forward(self, x, src_mask):
        """
        src_mask: mask we want to apply to input of encoder. we need this to hide interaction of padding word with other words.

        """

        # multihead attention within

        x = self.residual_connection[0](x, lambda x: self.self_attention_block(x, x, x, src_mask))

        """
        The lambda is used because self_attention_block needs four arguments (query, key, value, mask) 
        but ResidualConnection expects a function that takes only one argument.
        The lambda allows us to create a function that takes one argument x and expands it to the required four arguments, including the src_mask.
        """
        x = self.residual_connection[1](x, self.feed_forward_block)
        return x

## Encoder

is made up of many encoder

Each encoder block is repeated Nx times

![alt text](<02_transformer/Screenshot from 2024-07-22 14-45-21.png>)


In [12]:
class Encoder(nn.Module):
    def __init__(self, layers: nn.ModuleList, **kwargs) -> None:
        super().__init__()
        self.layers = layers

        self.norm = LayerNormalization()  # at end

    def forward(self, x, mask):
        # apply on layer after another # order matters
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)

# Decoder

![alt text](<02_transformer/Screenshot from 2024-07-22 15-50-17.png>)


## output embeddings

output embeddings are same as input embeddings, so weill just intialize it twice

masked attention is some what same as self attenntion because of 3 same inputs while Mulihead attention block is actually cross attension bea=cause key and value are cping from encoder.


In [13]:
class DecoderBlock(nn.Module):
    def __init__(self, self_attention_block: MultiHeadAttentionBlock, cross_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.cross_attention_block = cross_attention_block
        self.feed_forward_block = feed_forward_block

        # we have three residual connections
        self.residual_connections = nn.ModuleList([ResidualConnection(dropout=dropout) for _ in range(3)])
        self.dropout = dropout

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """


        Args:
            x: input of decoder
            src_mask: mask applied to encoder
            tgt_mask: target mask applied to decoder



        src_mask and tgt_mask because we are dealing with language transalation. SO, source language is English and target language is italian

        """

        # i. masked multihead attention: first residual connection
        x = self.residual_connections[0](x, lambda x: self.self_attention_block(x, x, x, tgt_mask))  # tgt_mask:becasue its decoder

        # ii. cross attention: second residual connection
        x = self.residual_connections[1](x, lambda x: self.cross_attention_block(x, encoder_output, encoder_output, src_mask))

        x = self.residual_connections[2](x, feedforwardblock)
        return x

In [None]:
class Decoder(nn.Module):
    """build decoder which is n times DecoderBlock one after anotherjust we did for encoder

    Args:
        nn (_type_): whic
    """

    def __init__(self, layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization()

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        for layer in self.layers:
            # each layer is a decoderblock
            x = layer(x, encoder_output, src_mask, tgt_mask)
        return self.norm(x)

# Projection/linear layer

![alt text](<02_transformer/Screenshot from 2024-07-23 10-50-34.png>)

output of multihead attention is (seq_len, d_model)

However we want to these words back into vocabularly which convert embedding to position in vocab


In [15]:
class ProjectionLayer(nn.Module):
    def __init__(self, d_moel: int, vocab_size: int) -> None:
        """
        this is a linear layer that is converting from d_model to vocab_size

        """

        super().__init__()
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        # (batch, seq_len, d_model) --> (batch seq_len, vocab_size)
        # The purpose of applying softmax is to convert the raw output of the linear layer into a probability distribution over the vocabulary.
        #  we will also apply softmax, specifically log_softmax for mathematiacal stability

        return torch.log_softmax(self.proj(x), dim=-1)