<a href="https://colab.research.google.com/github/joshuasyoung/Intro_GenAi/blob/main/GenAi_Foundations_Ch3_Translation_Seq2Seq_minimal_transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Packt - Generative AI Foundations in Python - Ch3 - Implementing the original Transfomer (pg 59-68)

# English to French translation

# Data loading & preparation

In [1]:
import pandas as pd
import numpy as np

# Load demo data:
data = pd.read_csv('https://raw.githubusercontent.com/PacktPublishing/Generative-AI-Foundations-in-Python/main/Chapter3/data.csv')

# Separate English & French lexicons:
EN_TEXT = data.en.to_numpy().tolist()
FR_TEXT = data.fr.to_numpy().tolist()

# Arbitrarily cap at 100 characters for demonstration to avoid long training times:
def demo_limit(vocab, limit=100):
  return [i[:limit] for i in vocab]

EN_TEXT = demo_limit(EN_TEXT)
FR_TEXT = demo_limit(FR_TEXT)

# Establish max length of a given sequence:
MAX_LEN = 100

In [2]:
# Print the first 5 rows of the DataFrame to inspect the data
print(data.head())

   Unnamed: 0.2  Unnamed: 0.1  Unnamed: 0  \
0         14133         58214    15513316   
1         12347         67069     9273079   
2          7923         27643    17880115   
3          3874        105675    18022548   
4         17711        105942     9083757   

                                                  en  \
0  • United States Medical Staff Honoured for Ext...   
1  • Reduce efforts to develop new regulatory gui...   
2  The nine First Nations communities participati...   
3      Machinery operator for rough mill Production:   
4  • June 3, 2000 - Federal-Provincial-Territoria...   

                                                  fr  
0  • Hommage rendu au personnel médical des États...  
1  • réduire les efforts en vue d'élaborer à cour...  
2  conseil tribal ont pu acheter des terres et de...  
3  Cadres Directeur général, directeur d’usine, d...  
4  • Le 3 juin 2000 - Séance de travail fédérale-...  


# Tokenization
- converting text data into numerical data that can be understood by model

In [3]:
from tokenizers import Tokenizer
from tokenizers.models import WordPiece
from tokenizers.trainers import WordPieceTrainer
from tokenizers.pre_tokenizers import Whitespace

def train_tokenizer(texts):
  tokenizer = Tokenizer(WordPiece(unk_token="[UNK]"))
  tokenizer.pre_tokenizer = Whitespace()
  trainer = WordPieceTrainer(
      vocab_size=5000,
      special_tokens=["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]", "<sos>", "<eos>"],
  )
  tokenizer.train_from_iterator(texts, trainer)
  return tokenizer

en_tokenizer = train_tokenizer(EN_TEXT)
fr_tokenizer = train_tokenizer(FR_TEXT)

# Data tensorization
- converts numericized text data to tensor format, required for data prep for training.

In [4]:
import torch
from torch.nn.utils.rnn import pad_sequence

def tensorize_data(text_data, tokenizer):
  numericalized_data = [
      torch.tensor(tokenizer.encode(text) .ids) for text in text_data
    ]
  # Corrected indentation for the next line
  padded_data = pad_sequence(numericalized_data, batch_first=True)
  # Corrected indentation for the next line
  return padded_data

src_tensor = tensorize_data(EN_TEXT, en_tokenizer)
tgt_tensor = tensorize_data(FR_TEXT, fr_tokenizer)

# Dataset creation
- custom dataset created to handle data.  this class is essential for loading data in batches during training

In [5]:
from torch.utils.data import Dataset, dataloader

class TextDataset(Dataset):
    def __init__(self, src_data, tgt_data):
        self.src_data = src_data
        self.tgt_data = tgt_data

    def __len__(self):
        return len(self.src_data)

    def __getitem__(self, idx):
        return self.src_data[idx], self.tgt_data[idx]

dataset = TextDataset(src_tensor, tgt_tensor)

# Embeddings Layer
- maps each token to continuous vector space.  crucial for model to understand and process data

In [6]:
import torch.nn as nn

class Embeddings(nn.Module):
    def __init__(self, d_model, vocab_size):
      super(Embeddings, self).__init__()
      self.embed = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
      return self.embed(x)

# Positional Encoding
- adds position information to embeddings, which helps model understand order of tokens in sequence

In [7]:
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1,
                 max_len=MAX_LEN
    ):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1,
                 max_len=MAX_LEN
    ):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0.0, max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0.0, d_model, 2) * - \
             (math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

# Multi-head self-attention (MHSA)
- allows model to focus on different parts of input sequence when producing output sequence

In [8]:
# COLAB CORRECTION:
# Delete the MultiHeadSelfAttention class entirely.
# The functionality of self-attention and cross-attention
# will be handled directly within EncoderLayer and DecoderLayer
# using torch.nn.MultiheadAttention.


# ORIGINAL CODE FROM BOOK:
# class MultiHeadSelfAttention(nn.Module):
#     def __init__(self, d_model, nheads):
#         super(MultiHeadSelfAttention, self).__init__()
#         self.attention = nn.MultiheadAttention(d_model, nhead)

#     def forward(self, x):
#         return self.attention(x, x, x)

# FFN - Fully Connected NN (FCNN)
- operates independently on each position

In [9]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(0.1)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

# Encoder Layer
- consists of MHSA mechanism and simple FFNN.  Structure repeated in stack to form complete encoder

In [10]:
# Modify EncoderLayer to use nn.MultiheadAttention directly
class EncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, d_ff):
        super(EncoderLayer, self).__init__()
        # Use nn.MultiheadAttention directly for self-attention
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=0.1, batch_first=False)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        # Ensure input is in shape (sequence_length, batch_size, d_model) for MultiheadAttention
        x = x.transpose(0, 1)
        # Self-attention: Query, Key, Value are all the same input tensor
        attn_output, _ = self.self_attn(x, x, x)
        x = x + self.dropout(attn_output)
        x = self.norm1(x)
        ff_output = self.feed_forward(x)
        x = x + self.dropout(ff_output)
        # Return to shape (batch_size, sequence_length, d_model)
        return self.norm2(x).transpose(0, 1)

# Encoder
- stack of identical layers with MHSA mechanism and an FFN

In [11]:
class Encoder(nn.Module):
  def __init__(self, d_model, nhead, d_ff, num_layers, vocab_size):
    super(Encoder, self).__init__()
    self.embedding = Embeddings(d_model, vocab_size)
    self.pos_encoding = PositionalEncoding(d_model)
    # Corrected: Assign ModuleList directly to the instance
    self.encoder_layers = nn.ModuleList([
        EncoderLayer(d_model, nhead, d_ff) for _ in range(num_layers)
    ])
    # Removed: FeedForward is part of EncoderLayer, not Encoder
    # self.feed_forward = FeedForward(d_model, d_ff)

  def forward(self, x):
    x = self.embedding(x)
    x = self.pos_encoding(x)
    # Corrected: Iterate through the assigned attribute
    for layer in self.encoder_layers:
      x = layer(x)
    return x

# Decoder Layer
- similarly, decoders consists of two MHA mechanisms - one self-attention and one cross-attention - followed by an FFN

In [12]:
# Modify DecoderLayer to use nn.MultiheadAttention directly
class DecoderLayer(nn.Module):
    def __init__(self, d_model, nhead, d_ff):
        super(DecoderLayer, self).__init__()
        # Use nn.MultiheadAttention directly for self-attention
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=0.1, batch_first=False)
        # Use nn.MultiheadAttention directly for cross-attention
        self.cross_attn = nn.MultiheadAttention(d_model, nhead, dropout=0.1, batch_first=False)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x, memory):
        # Ensure inputs are in shape (sequence_length, batch_size, d_model)
        x = x.transpose(0, 1)
        memory = memory.transpose(0, 1)
        # Self-attention in decoder: Query, Key, Value are from the decoder's input
        attn_output, _ = self.self_attn(x, x, x)
        x = x + self.dropout(attn_output)
        x = self.norm1(x)
        # Cross-attention: Query is from decoder input, Key and Value are from encoder output (memory)
        attn_output, _ = self.cross_attn(x, memory, memory)
        x = x + self.dropout(attn_output)
        x = self.norm2(x)
        ff_output = self.feed_forward(x)
        x = x + self.dropout(ff_output)
        # Return to shape (batch_size, sequence_length, d_model)
        return self.norm3(x).transpose(0, 1)

# Decoder
- also a stack of identical layers, each containing two MHA mechanisms and an FFN
- This stacking layer pattern continues to build transformer architecture.  Each block has a specific role in processing input data and generating output translations

In [13]:
class Decoder(nn.Module):
  def __init__(self, d_model, nhead, d_ff, num_layers, vocab_size):
    super(Decoder, self).__init__()
    self.embedding = Embeddings(d_model, vocab_size)
    self.pos_encoding = PositionalEncoding(d_model)
    self.decoder_layers = nn.ModuleList([
        DecoderLayer(d_model, nhead, d_ff) for _ in range(num_layers)
    ])
    self.linear = nn.Linear(d_model, vocab_size)
    self.softmax = nn.Softmax(dim=2)

  def forward(self, x, memory):
    x = self.embedding(x)
    x = self.pos_encoding(x)
    for layer in self.decoder_layers:
      x = layer(x, memory)
    x = self.linear(x)
    x = self.softmax(x)
    return x

# Complete Transformer
- encapsulates previously defined encoder and decoder structures
- this is the primary class that will be used for training and translation tasks

In [14]:
# Complete Transformer
# - encapsulates previously defined encoder and decoder structures
# - this is the primary class that will be used for training and translation tasks
class Transformer(nn.Module):
  def __init__(
      self,
      d_model,
      nhead,
      d_ff,
      num_encoder_layers,
      num_decoder_layers,
      src_vocab_size,
      tgt_vocab_size,
  ):
    super(Transformer, self).__init__()
    self.encoder = Encoder(d_model, nhead, d_ff, \
                           num_encoder_layers, src_vocab_size)
    self.decoder = Decoder(d_model, nhead, d_ff, \
                           num_decoder_layers, tgt_vocab_size)

  # Define the forward method outside of __init__
  def forward(self, src, tgt):
    memory = self.encoder(src)
    output = self.decoder(tgt, memory)
    return output

# Training function
- iterates through the epochs and batches, calculates the loss and updates model parameters

In [15]:
def train(model, loss_fn, optimizer, NUM_EPOCHS = 10):
  for epoch in range(NUM_EPOCHS):
    model.train()
    total_loss = 0
    for batch in batch_iterator:
          src, tgt = batch
          optimizer.zero_grad()
          output = model(src, tgt)
          loss = loss_fn(output.view(-1, tgt_vocab_size), tgt.view(-1))
          loss.backward()
          optimizer.step()
          total_loss += loss.item()

    print(f'Epoch {epoch} Loss {total_loss / len(batch_iterator)}')

# Translation function
- uses the trained model to translate source text into the target language.  
- generates a translation token by token and stops when an end-of-sequence (EOS) token is generated or when max length is reached

In [16]:
def translate(model, src_text, src_tokenizer, tgt_tokenizer, max_len=MAX_LEN):
  model.eval()
  src_tokens = src_tokenizer.encode(src_text).ids
  src_tensor = torch.LongTensor(src_tokens).unsqueeze(0)

  tgt_sos_idx = tgt_tokenizer.token_to_id("<sos>")
  tgt_eos_idx = tgt_tokenizer.token_to_id("<eos>")

  tgt_tensor = torch.LongTensor([tgt_sos_idx]).unsqueeze(0)

  for i in range(max_len):
    with torch.no_grad():
      output = model(src_tensor, tgt_tensor)

    predicted_token_idx = output.argmax(dim=2)[0,-1].item()
def translate(model, src_text, src_tokenizer, tgt_tokenizer, max_len=MAX_LEN):
  model.eval()
  src_tokens = src_tokenizer.encode(src_text).ids
  src_tensor = torch.LongTensor(src_tokens).unsqueeze(0)

  tgt_sos_idx = tgt_tokenizer.token_to_id("<sos>")
  tgt_eos_idx = tgt_tokenizer.token_to_id("<eos>")

  tgt_tensor = torch.LongTensor([tgt_sos_idx]).unsqueeze(0)

  for i in range(max_len):
    with torch.no_grad():
      output = model(src_tensor, tgt_tensor)

    predicted_token_idx = output.argmax(dim=2)[0,-1].item()
    if predicted_token_idx == tgt_eos_idx:
      break
    tgt_tensor = torch.cat([tgt_tensor, torch.LongTensor([predicted_token_idx]).unsqueeze(0)], dim=1)

  translated_token_ids = tgt_tensor[0, 1:].tolist()
  translated_text = tgt_tokenizer.decode(translated_token_ids)

  return translated_text
  translated_text = tgt_tokenizer.decode(translated_token_ids)

  return translated_text

# Main execution
- hyperparameters defined, tokenizer and model are instantiated, and training / translation processes initiated

In [None]:
import torch.optim as optim
from torch.utils.data import DataLoader # Make sure DataLoader is imported

if __name__ == "__main__":
  # Hyperparameters
  num_encoder_layers = 2
  num_decoder_layers = 2
  dropout_rate = 0.1
  embedding_dim = 512
  nhead = 8
  ffn_hid_dim = 2048
  batch_size = 31
  learning_rate = 0.001

  en_tokenizer = train_tokenizer(EN_TEXT)
  fr_tokenizer = train_tokenizer(FR_TEXT)

  src_vocab_size = len(en_tokenizer.get_vocab())
  tgt_vocab_size = len(fr_tokenizer.get_vocab())

  src_tensor = tensorize_data(EN_TEXT, en_tokenizer)
  tgt_tensor = tensorize_data(FR_TEXT, fr_tokenizer)

  dataset = TextDataset(src_tensor, tgt_tensor)

  model = Transformer(
      embedding_dim,
      nhead,
      ffn_hid_dim,
      num_encoder_layers,
      num_decoder_layers,
      src_vocab_size,
      tgt_vocab_size,
  )
  loss_fn = nn.CrossEntropyLoss()
  optimizer = optim.Adam(model.parameters(), lr=learning_rate)

  batch_iterator = DataLoader(
      dataset, batch_size=batch_size,
      shuffle=True, drop_last = True)

  train(model, loss_fn, optimizer, NUM_EPOCHS= 10)

  src_text = "hello, how are you?"
  translated_text = translate(
      model, src_text, en_tokenizer, fr_tokenizer)
  print(f"English: {src_text}")
  print(f"French: {translated_text}")

Epoch 0 Loss 7.827403373188442
