<a href="https://colab.research.google.com/github/shusank8/Transformers/blob/main/Transformers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
print("Transformers.... Excited")

Transformers.... Excited


In [None]:
# imports
import torch
import torch.nn as nn
import math

In [None]:
class InputEmbeddings(nn.Module):
  """Lookup table that turns integer token ids into dense vectors.

  Args:
    vocab_size: number of rows in the embedding table.
    embdim: width of each embedding vector.
  """

  def __init__(self, vocab_size, embdim):
    super().__init__()
    # One learnable row of width `embdim` per vocabulary entry.
    self.embeddings = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embdim)

  def forward(self, x):
    """Return the embedding vector for every id in `x` (adds a trailing `embdim` axis)."""
    token_vectors = self.embeddings(x)
    return token_vectors


In [None]:
class PositionalEmbeddings(nn.Module):
  """Adds fixed sinusoidal position encodings to a batch of embeddings.

  The table is precomputed once for `block_size` positions and stored as a
  non-trainable buffer named `pe` with shape (1, block_size, embdim).

  Args:
    block_size: maximum sequence length the table covers.
    embdim: width of the embeddings the encodings are added to.
    dropout: dropout probability applied after the addition.
  """

  def __init__(self, block_size, embdim, dropout):
    super().__init__()
    self.dropout = nn.Dropout(dropout)

    pe = torch.zeros(block_size, embdim)

    # (block_size, 1) column of positions and (ceil(embdim / 2),) row of frequencies.
    position = torch.arange(0, block_size, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, embdim, 2).float() * (-math.log(10000.0) / embdim))
    angles = position * div_term

    pe[:, 0::2] = torch.sin(angles)
    # With an odd embdim there is one fewer odd column than even columns, so
    # trim the angles to the number of odd columns actually present.
    pe[:, 1::2] = torch.cos(angles[:, :embdim // 2])

    # Leading batch axis so the table broadcasts over the batch in forward().
    self.register_buffer('pe', pe.unsqueeze(0))

  def forward(self, x):
    """Return dropout(x + pe[:, :T]) where T = x.shape[1].

    `x` must be (batch, T, embdim) with T <= block_size. The input is added
    to the encodings rather than being replaced by them, so the token
    information is preserved.
    """
    x = x + self.pe[:, :x.shape[1], :]
    return self.dropout(x)

In [None]:
class LayerNormalization(nn.Module):
  """Layer normalisation over the last axis with a learnable scale and shift.

  Args:
    embdim: size of the last axis of the inputs.
    eps: small constant added to the variance for numerical stability.
  """

  def __init__(self, embdim, eps=1e-6):
    # nn.Module must be initialised before parameters can be assigned.
    super().__init__()
    self.eps = eps
    self.alpha = nn.Parameter(torch.ones(embdim))   # multiplicative scale
    self.bias = nn.Parameter(torch.zeros(embdim))   # additive shift

  def forward(self, x):
    """Normalise `x` along its last axis, then apply `alpha` and `bias`."""
    xmean = x.mean(dim=-1, keepdim=True)
    # Biased (population) variance: well defined even when the last axis has
    # a single element, where the unbiased estimate would be NaN.
    xvar = x.var(dim=-1, keepdim=True, unbiased=False)
    return self.alpha * (x - xmean) / torch.sqrt(xvar + self.eps) + self.bias



In [None]:
class FeedForward(nn.Module):
  """Position-wise MLP: Linear -> ReLU -> Linear -> Dropout.

  The hidden layer is three times as wide as the input; the output has the
  same width as the input.

  Args:
    embdim: input and output width.
    dropout: dropout probability applied to the output.
  """

  def __init__(self, embdim, dropout):
    super().__init__()
    hidden_width = 3 * embdim
    stages = [
        nn.Linear(embdim, hidden_width),
        nn.ReLU(),
        nn.Linear(hidden_width, embdim),
        nn.Dropout(dropout),
    ]
    self.m = nn.Sequential(*stages)

  def forward(self, x):
    """Apply the MLP to the last axis of `x`."""
    return self.m(x)


In [None]:
class MultiHeadAttentionBlock(nn.Module):
  """Multi-head scaled dot-product attention with an output projection.

  Args:
    embdim: model width; must be divisible by `no_of_heads`.
    no_of_heads: number of attention heads the width is split into.
    dropout: dropout probability applied to the attention weights.

  Raises:
    ValueError: if `embdim` is not divisible by `no_of_heads`.
  """

  def __init__(self, embdim, no_of_heads, dropout):
    super().__init__()
    if embdim % no_of_heads != 0:
      raise ValueError(f"embdim ({embdim}) must be divisible by no_of_heads ({no_of_heads})")

    self.embdim = embdim
    self.q = nn.Linear(embdim, embdim)
    self.k = nn.Linear(embdim, embdim)
    self.v = nn.Linear(embdim, embdim)
    self.proj = nn.Linear(embdim, embdim)
    self.no_of_heads = no_of_heads
    # Stored on self so forward() can use it and train()/eval() toggles it.
    self.dropout = nn.Dropout(dropout)

  @staticmethod
  def attention(query, key, value, mask, dropout):
    """Scaled dot-product attention.

    Args:
      query, key, value: tensors whose last two axes are (sequence, head_dim).
      mask: optional tensor broadcastable to the score matrix; positions where
        `mask == 0` receive zero attention weight.
      dropout: optional callable applied to the attention weights.

    Returns:
      Tuple of (attended values, attention weights).
    """
    head_dim = query.shape[-1]
    attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(head_dim)
    if mask is not None:
      # masked_fill is out-of-place: its result must be kept or the mask is a no-op.
      attention_scores = attention_scores.masked_fill(mask == 0, float("-inf"))
    attention_scores = attention_scores.softmax(dim=-1)
    if dropout is not None:
      attention_scores = dropout(attention_scores)
    return (attention_scores @ value), attention_scores

  def forward(self, query, key, val, mask):
    """Attend from `query` to `key`/`val`.

    For self-attention all three inputs are the same tensor; for
    cross-attention `key` and `val` come from the other sequence.
    Inputs are (B, T, embdim); the result is (B, T_query, embdim).
    """
    q = self.q(query)
    k = self.k(key)
    v = self.v(val)
    hdim = self.embdim // self.no_of_heads

    # (B, T, C) -> (B, T, heads, hdim) -> (B, heads, T, hdim), with heads * hdim == C.
    q = q.view(q.shape[0], q.shape[1], self.no_of_heads, hdim).transpose(1, 2)
    k = k.view(k.shape[0], k.shape[1], self.no_of_heads, hdim).transpose(1, 2)
    v = v.view(v.shape[0], v.shape[1], self.no_of_heads, hdim).transpose(1, 2)

    x, _ = MultiHeadAttentionBlock.attention(q, k, v, mask, self.dropout)

    # Merge the heads back: (B, heads, T, hdim) -> (B, T, C).
    x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.embdim)
    return self.proj(x)


In [None]:
class ResidualConnection(nn.Module):
  """Skip connection around a sub-layer: x + dropout(sublayer(norm(x))).

  Args:
    dropout: dropout probability applied to the sub-layer output.
    embdim: width handed to the LayerNormalization applied before the sub-layer.
  """

  def __init__(self, dropout, embdim):
    super().__init__()
    self.dropout = nn.Dropout(dropout)
    self.norm = LayerNormalization(embdim)

  def forward(self, x, sublayer):
    """Normalise `x`, run `sublayer` on it, and add the result back onto `x`."""
    normed = self.norm(x)
    update = self.dropout(sublayer(normed))
    return x + update

In [None]:
class EncoderBlock(nn.Module):
  """One encoder layer: self-attention then feed-forward, each in a residual wrapper.

  Args:
    s_attn: self-attention module called as `s_attn(x, x, x, src_mask)`.
    ffwd: feed-forward module called as `ffwd(x)`.
    dropout: dropout probability for the two residual connections.
    embdim: model width for the residual connections' normalisation. When
      omitted it is read from `s_attn.proj.out_features`.
  """

  def __init__(self, s_attn, ffwd, dropout, embdim=None):
    super().__init__()
    self.selfattn = s_attn
    self.ffwd = ffwd
    if embdim is None:
      # ResidualConnection requires the model width, but existing callers do
      # not pass one; the attention block's output projection is an
      # nn.Linear(embdim, embdim), so its out_features is that width.
      embdim = s_attn.proj.out_features
    self.residual_connections = nn.ModuleList([ResidualConnection(dropout, embdim) for _ in range(2)])

  def forward(self, x, src_mask):
    """Run self-attention (masked by `src_mask`) and then the feed-forward network."""
    x = self.residual_connections[0](x, lambda x: self.selfattn(x, x, x, src_mask))
    x = self.residual_connections[1](x, self.ffwd)
    return x

In [None]:
class Encoder(nn.Module):
  """Stack of encoder layers followed by a final layer normalisation.

  Args:
    layers: iterable of layers, each called as `layer(x, mask)`.
    embdim: width handed to the final LayerNormalization.
  """

  def __init__(self, layers, embdim):
    super().__init__()
    self.layers = layers
    self.norm = LayerNormalization(embdim)

  def forward(self, x, mask):
    """Thread `x` through every layer in order, then normalise the result."""
    hidden = x
    for block in self.layers:
      hidden = block(hidden, mask)
    return self.norm(hidden)

In [None]:
class DecoderBlock(nn.Module):
  """One decoder layer: self-attention, cross-attention, then feed-forward.

  Each of the three sub-layers is wrapped in its own residual connection.

  Args:
    selfattn: attention module called as `selfattn(x, x, x, tgt_mask)`.
    crossattn: attention module called as
      `crossattn(x, encoder_output, encoder_output, src_mask)`.
    ffwd: feed-forward module called as `ffwd(x)`.
    dropout: dropout probability for the three residual connections.
    embdim: model width for the residual connections' normalisation. When
      omitted it is read from `selfattn.proj.out_features`.
  """

  def __init__(self, selfattn, crossattn, ffwd, dropout, embdim=None):
    super().__init__()
    self.selfattn = selfattn
    self.crossattn = crossattn
    self.ffwd = ffwd
    if embdim is None:
      # ResidualConnection requires the model width, but existing callers do
      # not pass one; the attention block's output projection is an
      # nn.Linear(embdim, embdim), so its out_features is that width.
      embdim = selfattn.proj.out_features
    self.residual_connections = nn.ModuleList([ResidualConnection(dropout, embdim) for _ in range(3)])

  def forward(self, x, encoder_output, src_mask, tgt_mask):
    """Apply the three sub-layers in order and return the updated `x`."""
    x = self.residual_connections[0](x, lambda x: self.selfattn(x, x, x, tgt_mask))
    # Queries come from the decoder stream; keys and values from the encoder output.
    x = self.residual_connections[1](x, lambda x: self.crossattn(x, encoder_output, encoder_output, src_mask))
    x = self.residual_connections[2](x, self.ffwd)
    return x





In [None]:
class Decoder(nn.Module):
  """Stack of decoder layers followed by a final layer normalisation.

  Args:
    layers: iterable of layers, each called as
      `layer(x, encoder_output, src_mask, tgt_mask)`.
    embdim: width handed to the final LayerNormalization.
  """

  def __init__(self, layers, embdim):
    super().__init__()
    self.layers = layers
    self.norm = LayerNormalization(embdim)

  def forward(self, x, encoder_output, src_mask, tgt_mask):
    """Thread `x` through every layer in order, then normalise the result."""
    hidden = x
    for block in self.layers:
      hidden = block(hidden, encoder_output, src_mask, tgt_mask)
    return self.norm(hidden)

In [None]:
class ProjectionLayer(nn.Module):
  """Linear map from model width to vocabulary size, followed by log-softmax.

  Args:
    embdim: width of the incoming vectors.
    vocab_size: number of output classes.
  """

  def __init__(self, embdim, vocab_size):
    super().__init__()
    self.proj = nn.Linear(in_features=embdim, out_features=vocab_size)

  def forward(self, x):
    """Return log-probabilities over the vocabulary along the last axis."""
    logits = self.proj(x)
    return logits.log_softmax(dim=-1)

In [None]:
class Transformer(nn.Module):
  """Encoder-decoder model exposing separate encode / decode / projection steps.

  Args:
    encoder: called as `encoder(src, src_mask)`.
    decoder: called as `decoder(tgt, encoder_output, src_mask, tgt_mask)`.
    src_embed: embedding applied to source token ids.
    tgt_embd: embedding applied to target token ids.
    src_pos: positional step applied after the source embedding.
    tgt_pos: positional step applied after the target embedding.
    projection_layer: maps decoder output to vocabulary scores.
  """

  def __init__(self, encoder, decoder, src_embed, tgt_embd, src_pos, tgt_pos, projection_layer):
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder
    self.src_embd = src_embed
    self.tgt_embd = tgt_embd
    self.src_pos = src_pos
    self.tgt_pos = tgt_pos
    self.proj_layer = projection_layer

  def encode(self, src, src_mask):
    """Embed the source ids, apply the positional step, and run the encoder."""
    src = self.src_embd(src)
    src = self.src_pos(src)
    return self.encoder(src, src_mask)

  def decode(self, encoder_output, src_mask, tgt, tgt_mask):
    """Embed the target ids, apply the positional step, and run the decoder."""
    tgt = self.tgt_embd(tgt)
    tgt = self.tgt_pos(tgt)
    return self.decoder(tgt, encoder_output, src_mask, tgt_mask)

  def projection(self, x):
    """Apply the projection layer stored by __init__ as `proj_layer`."""
    return self.proj_layer(x)


In [None]:
def build_transformer(src_vocab_size,tgt_vocab_size, src_seq_len, tgt_seq_len, embdim, n_of_layers, no_of_heads, dropout):
  """Assemble a complete encoder-decoder Transformer and return it.

  Args:
    src_vocab_size: size of the source vocabulary.
    tgt_vocab_size: size of the target vocabulary (also the output width).
    src_seq_len: maximum source length covered by the positional table.
    tgt_seq_len: maximum target length covered by the positional table.
    embdim: model width shared by every sub-module.
    n_of_layers: number of encoder blocks and of decoder blocks.
    no_of_heads: attention heads per attention module.
    dropout: dropout probability used throughout.

  Returns:
    The constructed `Transformer`.
  """
  src_embd = InputEmbeddings(src_vocab_size, embdim)
  tgt_embd = InputEmbeddings(tgt_vocab_size, embdim)

  src_pos = PositionalEmbeddings(src_seq_len, embdim, dropout)
  tgt_pos = PositionalEmbeddings(tgt_seq_len, embdim, dropout)

  encoder_blocks = []
  for _ in range(n_of_layers):
    encoder_sa = MultiHeadAttentionBlock(embdim, no_of_heads, dropout)
    encoder_ffd = FeedForward(embdim, dropout)
    encoder_blocks.append(EncoderBlock(encoder_sa, encoder_ffd, dropout))

  decoder_blocks = []
  for _ in range(n_of_layers):
    decoder_sa = MultiHeadAttentionBlock(embdim, no_of_heads, dropout)
    decoder_ca = MultiHeadAttentionBlock(embdim, no_of_heads, dropout)
    decoder_ffd = FeedForward(embdim, dropout)
    decoder_blocks.append(DecoderBlock(decoder_sa, decoder_ca, decoder_ffd, dropout))

  # Both stacks take the model width for their final normalisation, and the
  # decoder must receive the whole list of blocks, not just the last one built.
  encoder = Encoder(nn.ModuleList(encoder_blocks), embdim)
  decoder = Decoder(nn.ModuleList(decoder_blocks), embdim)

  projection_layer = ProjectionLayer(embdim, tgt_vocab_size)

  transformer = Transformer(encoder, decoder, src_embd, tgt_embd, src_pos, tgt_pos, projection_layer)
  return transformer



In [None]:
!pip install datasets



In [None]:
import pandas as pd

In [None]:
from datasets import load_dataset

# Fetch every available split of the Nepali/English translation dataset.
ds = load_dataset("iamTangsang/Nepali-to-English-Translation-Dataset")

# Materialise each split as a pandas DataFrame; a split the dataset does not
# provide is represented by None.
df_train = None if 'train' not in ds else ds['train'].to_pandas()
df_test = None if 'test' not in ds else ds['test'].to_pandas()
df_valid = None if 'validation' not in ds else ds['validation'].to_pandas()


In [None]:
df_train = pd.concat([df_train, df_valid], ignore_index=True)

In [None]:
# Rename the two columns by position so that the English text is the model's
# source ('src') and the Nepali text is its target ('tgt').
# NOTE(review): this relies on the dataset's column order being
# (Nepali, English) — confirm against the dataset card.
df_train.columns = ['tgt', 'src']
df_test.columns = ['tgt', 'src']

In [None]:
df_train

Unnamed: 0,tgt,src
0,"""कुनै पनि अन्य सरकारी एजेन्सीले यो जानकारी प्र...","""No other government agency can use this infor..."
1,"एउटा गीत प्ले गर्नुहोस्, जुन तपाईं चाहानुहुन्छ।",Pick a song which you want.
2,"तर, तपाईं उदास हुँदा चिढिएर अझ बढी रिसाउनुहुन्...","But, when you get sad, if you are going to get..."
3,जिन्दगी भनेको कुनै चलचित्र होइन ।,"""Life is not a movie."
4,"त्यसपछि, म र उनीबीच केही दूरीको महसुस गरेँ।",Yet I felt there was a distance between me and...
...,...,...
713558,अगुवाहरूले मानिसहरूलाई आज्ञा दिए। तिनीहरूले भन...,"and they commanded the people, saying, ""When y..."
713559,यी सबै कुराहरूबाट आज म फेरि झस्कें ।\n,"Today, I got rid of all these things.\n"
713560,उनीहरुको घरबाट समुद्रको दृश्यसमेत देख्न सकिन्छ ।,You can even see the sea from his house.
713561,यी परियोजनाको लागत भने खुलेको छैन ।,The project's cost is not being released.


In [None]:
# SOURCE TEXT => ENGLISH
# TARGET TEXT => NEPALI

In [None]:
from tokenizers.trainers import WordLevelTrainer
from tokenizers import Tokenizer
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.models import BPE, WordLevel
import os
def create_or_load_tokenizer(df):
  """Return (source tokenizer, target tokenizer), training them if not cached.

  If both "SourceTokenizer.json" and "TargetTokenizer.json" exist in the
  working directory they are loaded. Otherwise a word-level tokenizer is
  trained on `df['src']` and another on `df['tgt']`, and each is saved to its
  file.

  Args:
    df: table with a 'src' and a 'tgt' column of sentences.

  Returns:
    Tuple `(srctokenizer, targettokenizer)`.
  """
  src_path = "SourceTokenizer.json"
  tgt_path = "TargetTokenizer.json"

  # Require both files: loading when only one exists would fail on the other.
  if os.path.exists(src_path) and os.path.exists(tgt_path):
    return Tokenizer.from_file(src_path), Tokenizer.from_file(tgt_path)

  def train_and_save(sentences, path):
    # A fresh tokenizer and trainer per language, so nothing from one
    # training run can carry over into the other vocabulary.
    tokenizer = Tokenizer(WordLevel(unk_token="[UNK]"))
    tokenizer.pre_tokenizer = Whitespace()
    trainer = WordLevelTrainer(special_tokens=["[UNK]",  "[PAD]", "[SOS]", "[EOS]"], vocab_size = 50000, min_frequency=8)
    tokenizer.train_from_iterator(sentences, trainer=trainer)
    tokenizer.save(path)
    return tokenizer

  # Train on the frame that was passed in, not on a notebook global.
  srctokenizer = train_and_save(df['src'], src_path)
  targettokenizer = train_and_save(df['tgt'], tgt_path)
  return srctokenizer, targettokenizer

In [None]:
src_tok, tgt_tok = create_or_load_tokenizer(df_train)

In [None]:
src_tok.get_vocab_size()

20687

In [None]:
tgt_tok.get_vocab_size()

36061

In [None]:
from torch.utils.data import Dataset
class BiDataSet(Dataset):
  """Parallel-sentence dataset yielding fixed-length tensors for an encoder-decoder model.

  Args:
    df: table with a 'src' and a 'tgt' text column.
    src_tok: tokenizer for 'src' text; must provide `encode(text).ids` and
      `token_to_id(token)`.
    tgt_tok: tokenizer for 'tgt' text; must provide `encode(text).ids`.
    block_size: length every returned sequence is padded to.
  """

  def __init__(self, df, src_tok, tgt_tok,  block_size):
    self.df = df
    self.src_tok = src_tok
    self.tgt_tok = tgt_tok
    self.block_size = block_size

    # token_to_id takes the token string itself, not a list.
    # NOTE(review): the special ids are read from src_tok and reused for the
    # target side; this assumes both tokenizers assign them the same ids.
    self.sos_tok = torch.tensor([src_tok.token_to_id("[SOS]")], dtype = torch.int64)
    self.eos_tok = torch.tensor([src_tok.token_to_id("[EOS]")], dtype = torch.int64)
    self.pad_tok = torch.tensor([src_tok.token_to_id("[PAD]")], dtype = torch.int64)

  def __len__(self):
    """Number of sentence pairs."""
    return len(self.df)

  def __getitem__(self, index):
    """Build the model inputs for the sentence pair at `index`.

    Returns:
      dict with
        encoder_input: [SOS] + source ids + [EOS] + padding, shape (block_size,)
        decoder_input: [SOS] + target ids + padding, shape (block_size,)
        label: target ids + [EOS] + padding, shape (block_size,)
        encoder_mask: 1 on non-padding source positions, shape (1, 1, block_size)
        decoder_mask: non-padding AND causal, shape (1, block_size, block_size)
        src_text, tgt_text: the raw strings.

    Raises:
      ValueError: if either sentence does not fit into `block_size`.
    """
    row = self.df.iloc[index]
    src_text = row['src']
    tgt_text = row['tgt']

    enc_inp_tokens = self.src_tok.encode(src_text).ids
    dec_inp_tokens = self.tgt_tok.encode(tgt_text).ids

    # The encoder sequence carries both SOS and EOS; the decoder input carries
    # only SOS and the label only EOS, hence -2 and -1.
    enc_num_padding_tok = self.block_size - len(enc_inp_tokens) - 2
    dec_num_padding_tok = self.block_size - len(dec_inp_tokens) - 1

    if enc_num_padding_tok < 0 or dec_num_padding_tok < 0:
      raise ValueError("Sentence is too long")

    enc_ids = torch.tensor(enc_inp_tokens, dtype = torch.int64)
    dec_ids = torch.tensor(dec_inp_tokens, dtype = torch.int64)
    enc_pad = self.pad_tok.repeat(enc_num_padding_tok)
    dec_pad = self.pad_tok.repeat(dec_num_padding_tok)

    encoder_input = torch.cat([self.sos_tok, enc_ids, self.eos_tok, enc_pad])
    decoder_input = torch.cat([self.sos_tok, dec_ids, dec_pad])
    label = torch.cat([dec_ids, self.eos_tok, dec_pad])

    # Masks use 1 = attend, 0 = ignore.
    encoder_mask = (encoder_input != self.pad_tok).unsqueeze(0).unsqueeze(0).int()
    causal = torch.tril(torch.ones(1, self.block_size, self.block_size, dtype = torch.int))
    decoder_mask = (decoder_input != self.pad_tok).unsqueeze(0).int() & causal

    return {
        "encoder_input": encoder_input,
        "decoder_input": decoder_input,
        "label": label,
        "encoder_mask": encoder_mask,
        "decoder_mask": decoder_mask,
        "src_text": src_text,
        "tgt_text": tgt_text,
    }







In [None]:
# Show the Nepali sentence of row 1. The frame's columns are named 'tgt' and
# 'src' after the rename earlier in the notebook, so the Nepali text is under
# 'tgt'; a 'source' key would raise KeyError.
df_train.iloc[1]['tgt']

'एउटा गीत प्ले गर्नुहोस्, जुन तपाईं चाहानुहुन्छ।'