In [2]:
import torch
import math
import torch.nn as nn
from typing_extensions import Annotated


class PositionalEncoding(nn.Module):
    def __init__(self,
                 max_len: Annotated[int, "It means how many no of words are there in a sequence"],
                 d_model: Annotated[int, "It tells in how many dimension each and every word represents"]) -> None:
        super().__init__()
        positional_encoder = torch.zeros(
            max_len, d_model)  # (max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(
            1).float()  # (max_len, 1)

        div_term = torch.exp(torch.arange(0, d_model, 2).float(
        ) * (-math.log(10000.0) / d_model))  # (d_model//2,)

        positional_encoder[:, 0::2] = torch.sin(position * div_term)
        positional_encoder[:, 1::2] = torch.cos(position * div_term)

        # Shape it to (1, max_len, d_model) for broadcasting with input: (batch_size, seq_len, d_model)
        positional_encoder = positional_encoder.unsqueeze(0)

        self.register_buffer("positional_encoder", positional_encoder)

    def forward(self, input_data: torch.Tensor) -> torch.Tensor:
        """
        input_data: shape (batch_size, seq_len, d_model)
        returns: same shape with positional encoding added
        """
        seq_len = input_data.size(1)
        return input_data + self.positional_encoder[:, :seq_len]


In [3]:
import math
import torch
import torch.nn as nn
from typing import List, Annotated, Union


class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads: Annotated[int, "No of self attention needed"],
                 embed_dim: Annotated[int, "dimension of each word"],
                 bias: Annotated[bool, "Required bias during trining"] = False,) -> None:
        super().__init__()
        assert embed_dim % num_heads == 0, "embed_dim % num_heads != 0"
        self.num_heads = num_heads
        self.embed_dim = embed_dim
        self.head_dim = self.embed_dim // self.num_heads
        self.wq = nn.Linear(embed_dim, embed_dim, bias=bias)
        self.wk = nn.Linear(embed_dim, embed_dim, bias=bias)
        self.wv = nn.Linear(embed_dim, embed_dim, bias=bias)
        self.output_projection = nn.Linear(self.embed_dim, self.embed_dim)

    def forward(self,
                q_input: Annotated[torch.Tensor, "batch of data from the input data"],
                k_input: Union[torch.Tensor, None] = None,
                v_input: Union[torch.Tensor, None] = None,
                mask: Annotated[bool, "normal MHA or masked MHA?"] = False) -> torch.Tensor:
        batch = q_input.size(0)

        # Below code make sures algorithm is self attention not cross attention
        if k_input is None:
            k_input = q_input
        if v_input is None:
            v_input = q_input

        q = self.wq(q_input)
        k = self.wk(k_input)
        v = self.wv(v_input)

        # Sequence length can be diffrent for input data and output data
        T_q, T_k = q_input.size(1), k_input.size(1)

        # Split the q, k, v(embed_dim) dimension as (num_head, embed_dim / num_head)
        q = q.reshape(batch, T_q, self.num_heads,
                      self.head_dim).transpose(1, 2)
        k = k.reshape(batch, T_k, self.num_heads,
                      self.head_dim).transpose(1, 2)
        v = v.reshape(batch, T_k, self.num_heads,
                      self.head_dim).transpose(1, 2)

        # Calculate Attention
        k_transpose = k.transpose(-2, -1)  # (b, k, d) (b, d, p) = (b, k, p)
        score = (q @ k_transpose) / math.sqrt(self.head_dim)
        mask = torch.triu(torch.ones(T_q, T_k), diagonal=1).bool(
        ) if mask else torch.zeros(T_q, T_k).bool()

        # Anyhow broadcasting works no need of unsqeeze but its good practice to
        # avoid broadcasting in Attentions, but clearly this step is optional
        mask = mask.unsqueeze(0).unsqueeze(0)
        score = score.masked_fill(mask, float("-inf"))
        attention_score = torch.softmax(score, dim=-1)
        attention = attention_score @ v

        # concat output of all heads
        attention = attention.transpose(1, 2)

        # Attention should have sequence of length = output sequence length.
        attention = attention.reshape(batch, T_q, self.embed_dim)

        # Since they are simple concatination to acutally mix all heads details we need a linear layer

        mha_output = self.output_projection(attention)
        return mha_output


In [4]:
import math
import torch
import torch.nn as nn

class LayerNormalization(nn.Module):
    def __init__(self, embed_dim:int, eps:float = 1e-9) -> None:
        super().__init__()
        self.alpha = nn.Parameter(torch.ones(embed_dim)).float()
        self.beta = nn.Parameter(torch.ones(embed_dim)).float()
        self.eps = eps

    def forward(self, input_data:torch.Tensor) -> torch.Tensor:
        # Assume input dim(2, 3, 6)
        mean = torch.mean(input_data, dim=-1, keepdim=True) # (2, 3, 1)
        std = torch.std(input_data, dim=-1, keepdim=True) # (2, 3, 1)

        # to normalize (2, 3, 6) - (2, 3, 1) = (2, 3, 6) due to broadcasting
        normalized_input_data = (input_data - mean) / (std + self.eps)

        # some weights do not require normalized output so alpha learnable parameter is introduced 
        return self.alpha * normalized_input_data + self.beta
        

In [5]:
import math
import torch
import torch.nn as nn
from typing import List, Annotated

class FeedForward(nn.Module):
    def __init__(self, embed_dim:int, hidden_dim:int, dropout:float = 0.1,  bias:bool = True):
        super().__init__()
        self.w1 = nn.Linear(embed_dim, hidden_dim, bias=bias)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()
        self.w2 = nn.Linear(hidden_dim, embed_dim, bias=bias)

    def forward(self, input_data:torch.Tensor) -> torch.Tensor:
        layer = self.w1(input_data)
        output = self.relu(layer)
        dropout = self.dropout(output)
        layer = self.w2(dropout)
        
        return layer


In [15]:
class DecoderBlock(nn.Module):
    def __init__(self, embed_dim:int, hidden_dim:int,num_heads:int, dropout = 0.1, bias:bool = False):
        super().__init__()
        self.mmha = MultiHeadAttention(num_heads=num_heads, embed_dim=embed_dim)
        self.cmha = MultiHeadAttention(num_heads=num_heads, embed_dim=embed_dim)
        self.ln1 = LayerNormalization(embed_dim=embed_dim)
        self.ln2 = LayerNormalization(embed_dim=embed_dim)
        self.ln3 = LayerNormalization(embed_dim=embed_dim)
        self.ffn = FeedForward(embed_dim=embed_dim, hidden_dim=hidden_dim, dropout=dropout, bias=bias)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x:torch.Tensor, k_encoder_output, v_encoder_output) -> torch.Tensor:
        # Multi-head attention + residual + norm
        x_mha = self.mmha(x, mask = True)
        x = self.ln1(x + self.dropout(x_mha))

        # Cross_Attention
        x_cmha = self.cmha(q_input=x, k_input=k_encoder_output, v_input=v_encoder_output)
        x = self.ln2(x + self.dropout(x_cmha))

        # Feedforward + residual + norm
        x_ffn = self.ffn(x)
        x = self.ln3(x + self.dropout(x_ffn))
        return x



class Decoder(nn.Module):
    def __init__(self, Nx, embed_dim, num_heads, ff_hidden_dim, dropout=0.1):
        super().__init__()
        self.layers = nn.ModuleList([
            DecoderBlock(embed_dim, ff_hidden_dim, num_heads, dropout)
            for _ in range(Nx)
        ])
    
    def forward(self, x: torch.Tensor, k_encoder_output: torch.Tensor, v_encoder_output: torch.Tensor) -> torch.Tensor:
        for layer in self.layers:
            x = layer(x, k_encoder_output, v_encoder_output)
        return x



In [16]:
class EncoderBlock(nn.Module):
    def __init__(self, seq_len:int, embed_dim:int, hidden_dim:int,num_heads:int, dropout = 0.1, bias:bool = False):
        super().__init__()
        self.mha = MultiHeadAttention(num_heads=num_heads, embed_dim=embed_dim)
        self.ln1 = LayerNormalization(embed_dim=embed_dim)
        self.ln2 = LayerNormalization(embed_dim=embed_dim)
        self.ffn = FeedForward(embed_dim=embed_dim, hidden_dim=hidden_dim, dropout=dropout, bias=bias)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x:torch.Tensor) -> torch.Tensor:
        # Multi-head attention + residual + norm
        x_mha = self.mha(x)
        x = self.ln1(x + self.dropout(x_mha))

        # Feedforward + residual + norm
        x_ffn = self.ffn(x)
        x = self.ln2(x + self.dropout(x_ffn))
        return x



class Encoder(nn.Module):
    def __init__(self, Nx, embed_dim, seq_len, num_heads, ff_hidden_dim, dropout = 0.1, bias:bool = False):
        super().__init__()
        self.layers = nn.ModuleList([
            EncoderBlock(seq_len, embed_dim, ff_hidden_dim, num_heads, dropout, bias)
            for _ in range(Nx)
        ])
    
    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x



In [None]:
class TokenEmbeddings(nn.Module):
    def __init__(self, vocab_size:int, embed_dim:int):
        super().__init__()
        self.embed_dim = embed_dim
        self.embeddings = nn.Embedding(vocab_size, embed_dim)

    def forward(self, x:torch.Tensor) -> torch.Tensor:
        return self.embeddings(x) * math.sqrt(self.embed_dim)




In [22]:
class Transformers(nn.Module):
    def __init__(self, vocab_size:int ,embed_dim:int, seq_len:int, num_heads:int, 
                 ff_hidden_dim:int, Nx:int, dropout:float = 0.1, bias:bool = False, eps:float = 1e-9) -> None:
        super().__init__()
        self.embeddings = TokenEmbeddings(vocab_size=vocab_size, embed_dim=embed_dim)
        self.pe = PositionalEncoding(max_len=seq_len, d_model=embed_dim)
        self.encoder = Encoder(Nx=Nx, embed_dim=embed_dim, ff_hidden_dim = ff_hidden_dim, dropout=dropout, num_heads=num_heads, seq_len=seq_len)
        self.decoder = Decoder(Nx=Nx, embed_dim=embed_dim, ff_hidden_dim = ff_hidden_dim, dropout=dropout, num_heads=num_heads)
        self.output_proj = nn.Linear(embed_dim, embed_dim)
        self.softmax = nn.Softmax(dim=-1)


    def forward(self, query, key:Union[torch.Tensor, None] = None, value: Union[torch.Tensor, None] = None, mask = False) -> torch.Tensor:
        query = self.embeddings(query)
        pe_q = self.pe(query)
        pe_k = self.pe(self.embeddings(key)) if key is not None else None
        pe_v = self.pe(self.embeddings(value)) if value is not None else None

        encoder_output = self.encoder(pe_q)
        decoder_output = self.decoder(pe_q, pe_k, pe_v)
        linearized = self.output_proj(decoder_output)
        output_probabilities = self.softmax(linearized)
        return output_probabilities






In [25]:
params = {"Nx" : 8,
"embed_dim" : 10,
"seq_len": 8,
"num_heads" : 2,
"ff_hidden_dim" : 1024,
"dropout" : 0.1,
"bias" : False,
"vocab_size":512}

tns = Transformers(**params)

In [26]:
sample_input = torch.tensor([
    [1, 5, 23, 67, 2],
    [45, 234, 12, 6, 8]
]) 

tns(sample_input)

tensor([[[0.1923, 0.1441, 0.0566, 0.1095, 0.0764, 0.0717, 0.1334, 0.0418,
          0.0834, 0.0909],
         [0.2031, 0.1374, 0.0552, 0.0944, 0.0850, 0.0781, 0.1314, 0.0491,
          0.0824, 0.0840],
         [0.2056, 0.2036, 0.0416, 0.0985, 0.0583, 0.0653, 0.1383, 0.0356,
          0.0846, 0.0685],
         [0.2126, 0.1533, 0.0556, 0.0745, 0.0873, 0.0805, 0.1289, 0.0401,
          0.0934, 0.0738],
         [0.1624, 0.1766, 0.0638, 0.0765, 0.0934, 0.0752, 0.1740, 0.0341,
          0.0609, 0.0830]],

        [[0.2026, 0.1468, 0.0534, 0.0713, 0.0736, 0.0967, 0.1287, 0.0416,
          0.1041, 0.0813],
         [0.1528, 0.1221, 0.0880, 0.0526, 0.1444, 0.0570, 0.1643, 0.0525,
          0.0862, 0.0800],
         [0.1789, 0.1241, 0.0662, 0.0825, 0.0930, 0.0725, 0.1446, 0.0504,
          0.0917, 0.0961],
         [0.1882, 0.1271, 0.0587, 0.1083, 0.0825, 0.0734, 0.1255, 0.0494,
          0.0988, 0.0880],
         [0.1384, 0.1279, 0.0912, 0.1074, 0.0924, 0.0989, 0.1387, 0.0337,
          0.052