<a href="https://colab.research.google.com/github/soniaeya/ENG_SPA_Transformer_Translator/blob/main/ENG_SPA_Transformer_Translator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Assignment 2: Transformers for Translation 🙊


Have you ever wondered how applications like Google Translate or language translation features in social media platforms work? Behind these impressive technologies are sophisticated machine learning models that can understand and translate text between different languages. One of the most powerful and groundbreaking models used for this purpose is the Transformer model.

In this assignment, you will step into the shoes of an AI researcher and engineer to create your own Transformer model for translating text from English to Spanish. This journey will not only enhance your understanding of machine learning and deep learning but also give you hands-on experience with state-of-the-art techniques in natural language processing.

Let's start by installing the libraries we need.

In [None]:
!pip install datasets
!pip install evaluate
!pip install transformers
!pip install bert_score
!pip install rouge_score



For this assignment we use the OPUS Books dataset (read more about it [here](https://huggingface.co/datasets/Helsinki-NLP/opus_books)). It is available on the Hugging Face Hub and is well suited to our machine translation task.

In [None]:
from datasets import load_dataset

# Load the English-Spanish sentence pairs
books = load_dataset("opus_books", "en-es")
# Hold out 20% of the data, then split the held-out part evenly into validation and test sets
books = books["train"].train_test_split(test_size=0.2)
train_dataset = books["train"]['translation']
test_size = len(books["test"]) // 2
val_dataset = books["test"][:test_size]['translation']
test_dataset = books["test"][test_size:]['translation']

To get an idea, let's take a quick peek at what our dataset looks like.

In [None]:
train_dataset[0]

Since we don't want training to take 8 hours, let's trim our dataset a bit. This might hurt performance, so feel free to use the complete dataset if you have the computing power.

SUGGESTION: start with a small dataset to debug your code and increase it gradually (the same applies to the number of epochs, the batch size, and the test set size).

In [None]:
train_dataset = train_dataset[:30000]

### Preprocessing


Preprocessing is an important part of NLP. It allows us to clean and standardize our data.

In [None]:
import string
import re
def preprocess_data(text):
  """ Method to clean noise from text and standardize it.
  Arguments
  ---------
  text : String
     Text to clean
  Returns
  -------
  text : String
      Cleaned text
  """

  text = text.lower()

  # Remove punctuation and special characters while keeping accented letters such as ó, í, etc.
  text = re.sub(r'[^\w\sÁÉÍÓÚáéíóúüÜñÑ]', '', text)
  # Collapse runs of whitespace into a single space and trim both ends
  text = re.sub(r"\s+", " ", text).strip()

  return text


In [None]:
assert "para ti es una cuestión de tozudez dijo ana de repente al encontrar una palabra que definiera justamente los pensamientos y el sentir de vronsky un calificativo para aquella expresión de su rostro que tanto la irritaba"==preprocess_data("–Para ti es una cuestión de tozudez –dijo Ana de repente, al encontrar una palabra que definiera justamente los pensamientos y el sentir de Vronsky, un calificativo para aquella expresión de su rostro que tanto la irritaba–."), "Check errors in preprocessing"
print("Good Job!")
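
To see the effect of the cleaning rules on a short input, here is a minimal standalone sketch. `clean_text` is a hypothetical stand-in for `preprocess_data`, and the sample sentence is made up for illustration. It relies on the fact that in Python 3 `\w` is Unicode-aware for `str` patterns, so accented letters survive while punctuation such as `¡` and `¿` is dropped.

```python
import re

def clean_text(text):
    # Hypothetical stand-in for preprocess_data: lowercase, drop punctuation, collapse whitespace.
    text = text.lower()
    # \w matches Unicode letters (including accented ones), digits and underscore.
    text = re.sub(r"[^\w\s]", "", text)
    # Removing punctuation can leave gaps, so squeeze repeated whitespace.
    return re.sub(r"\s+", " ", text).strip()

sample = "¡Hola,   señor Vronsky!  ¿Qué tal?"
cleaned = clean_text(sample)
print(cleaned)
```

Note that the whitespace step matters here: the sample contains runs of several spaces that would otherwise remain in the cleaned text.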

For an easier training structure, it is useful to format our training and validation sets. The following function should help with this.

In [None]:
def create_dataset(dataset,source_lang,target_lang):
  """ Method to create a dataset from a list of translation records.
  Arguments
  ---------
  dataset : List of Dict
     Records from the dataset, each mapping a language code to its text
  source_lang : String
     Source language
  target_lang : String
     Target language
  Returns
  -------
  new_dataset : List of Tuple of String
      Source and target text in format (source, target)
  """
  new_dataset=[]
  #TODO: iterate through the dataset, extract the source and target text, preprocess them, and build a new clean dataset in the correct format

  return new_dataset

training_set=create_dataset(train_dataset,'en','es')
validation_set=create_dataset(val_dataset,'en','es')
test_set=create_dataset(test_dataset,'en','es')
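
As an illustration of the target format, the sketch below builds `(source, target)` tuples from two made-up records shaped like the dataset entries (a dict keyed by language code). `build_pairs` and `clean_text` are hypothetical names, and this is only one possible way to structure the function, not the expected solution.

```python
import re

def clean_text(text):
    # Hypothetical cleaner: lowercase, drop punctuation, collapse whitespace.
    text = re.sub(r"[^\w\s]", "", text.lower())
    return re.sub(r"\s+", " ", text).strip()

def build_pairs(records, source_lang, target_lang):
    # Each record maps a language code to a sentence; emit (source, target) tuples.
    return [
        (clean_text(record[source_lang]), clean_text(record[target_lang]))
        for record in records
    ]

# Made-up toy records, for illustration only.
toy_records = [
    {"en": "Hello, world!", "es": "¡Hola, mundo!"},
    {"en": "Good night.", "es": "Buenas noches."},
]
pairs = build_pairs(toy_records, "en", "es")
print(pairs)
```

Swapping the two language arguments yields the reversed direction (Spanish to English) from the same records.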

### Model Creation


Now that our data is ready, we can get started. Let's start by creating our Sequence to Sequence Transformer model.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class TransformerModel(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,dropout):
        super(TransformerModel, self).__init__()
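        self.src_embedding = # Embedding layer for the source language
        self.tgt_embedding = # Embedding layer for the target language
        self.transformer = # Transformer model with its attributes (see the PyTorch documentation); the batches in this notebook are shaped (batch, sequence), so use batch_first=True
        self.fc = # Last linear layer, projecting d_model to tgt_vocab_size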



    def positional_encoding(self, d_model, maxlen = 5000):
        """Method to create a positional encoding buffer.
        Arguments
        ---------
        d_model: int
            Embedding size
        maxlen: int
            Maximum sequence length
        Returns
        -------
        PE: Tensor
            Positional encoding buffer
        """
        pos = torch.arange(0, maxlen).unsqueeze(1)
        denominator = 10000 ** (torch.arange(0, d_model, 2) / d_model)

        #TODO
        PE = torch.zeros((maxlen, d_model))
        PE[:, 0::2] = # Calculate sine of pos / denominator for the even embedding indices
        PE[:, 1::2] = # Calculate cosine of pos / denominator for the odd embedding indices

        PE = PE.unsqueeze(0)  # Add batch dimension

        return PE


    def forward(self, src, tgt, src_mask=None, tgt_mask=None, src_key_padding_mask=None, tgt_key_padding_mask=None):
        """Method to forward a batch of data through the model."""
        #TODO
        #pass source and target through their embedding layers
        #create the positional encoding and add it to the embeddings to get src_emb and tgt_emb
        #pass src_emb, tgt_emb and all masks through the transformer
        #pass the output through the linear layer
        return output

    def encode(self, src, src_mask):
        """Method to encode a batch of data through the transformer model."""
        #TODO
        #pass src through the embedding layer
        #create positional encoding
        #add src_emb and src_pe
        #pass src_emb and src_mask through the transformer encoder using self.transformer.encoder

        return self.transformer.encoder(src_emb, src_mask)


    def decode(self, tgt, memory,tgt_mask):
        """Method to decode a batch of data through the transformer model."""
        #TODO
        #pass tgt through the embedding layer
        #create positional encoding
        #add tgt_emb and tgt_pe
        #pass tgt_emb, memory and tgt_mask through the transformer decoder using self.transformer.decoder

        return self.transformer.decoder(tgt_emb, memory,tgt_mask)
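
The positional encoding above follows the standard sinusoidal scheme, where even embedding indices hold `sin(pos / 10000^(2i/d_model))` and odd indices hold the matching cosine. The standalone sketch below shows one way to compute it and add it to a batch of embeddings. `sinusoidal_encoding` is a hypothetical function name, and the sketch assumes an even `d_model` and batch-first tensors.

```python
import torch

def sinusoidal_encoding(d_model, maxlen=5000):
    # Positions as a column vector: shape (maxlen, 1).
    pos = torch.arange(0, maxlen).unsqueeze(1)
    # One frequency per pair of embedding indices: shape (d_model / 2,).
    denominator = 10000 ** (torch.arange(0, d_model, 2) / d_model)
    pe = torch.zeros((maxlen, d_model))
    pe[:, 0::2] = torch.sin(pos / denominator)  # even embedding indices
    pe[:, 1::2] = torch.cos(pos / denominator)  # odd embedding indices
    return pe.unsqueeze(0)  # add a batch dimension: (1, maxlen, d_model)

pe = sinusoidal_encoding(d_model=8, maxlen=16)

# Stand-in embeddings of shape (batch, sequence, d_model).
emb = torch.zeros(2, 5, 8)
# Slice the buffer to the sequence length and move it to the embeddings' device.
emb_with_pos = emb + pe[:, :emb.size(1)].to(emb.device)
print(pe.shape, emb_with_pos.shape)
```

The buffer is broadcast over the batch dimension, so the same position vectors are added to every sentence in the batch.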


Now that our model is ready, we still need some methods that will come in handy during training.

In [None]:
import torch
def create_padding_mask(seq):
  """ Method to create a padding mask based on given sequence.
  Arguments
  ---------
  seq : Tensor
     Sequence to create padding mask for
  Returns
  -------
  mask : Tensor
      Padding mask
  """
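  return #boolean matrix that is True where the token is padding, i.e. equal to the tokenizer's pad id (PAD_IDX in the Training section; for xlm-roberta-base the pad id is 1, while id 0 is the beginning-of-sentence token)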

def create_triu_mask(sz):
  """ Method to create a triangular (causal) mask of a given size. This is used for the tgt mask in the Transformer model to avoid looking ahead.
  Arguments
  ---------
  sz : int
     Length of the target sequence; the mask has shape sz x sz
  Returns
  -------
  mask : Tensor
      Triangular mask
  """
  # TODO
  #create triangular mask of size sz x sz
  #transpose mask and cast to float type
  #PyTorch float masks are additive: blocked positions hold -inf and allowed positions hold 0. Replace all 0s with -inf and all 1s with 0
  return mask

def tokenize_batch(source, targets,tokenizer):
  """ Method to tokenize a batch of data given a tokenizer.
  Arguments
  ---------
  source : List of String
     Source text
  targets : List of String
     Target text
  tokenizer : Tokenizer
     Tokenizer to use for tokenization
  Returns
  -------
  tokenized_source : Tensor
      Token ids of the source text
  tokenized_targets : Tensor
      Token ids of the target text
  """

  tokenized_source = tokenizer(source, padding='max_length', max_length=128, return_tensors='pt', truncation=True)

  tokenized_targets = tokenizer(targets,  padding='max_length', max_length=128, return_tensors='pt',truncation=True)

  return tokenized_source['input_ids'], tokenized_targets['input_ids']


In [None]:
a=create_triu_mask(5)
b= torch.tensor([[0., float('-inf'), float('-inf'), float('-inf'), float('-inf')],
        [0., 0., float('-inf'), float('-inf'), float('-inf')],
        [0., 0., 0., float('-inf'), float('-inf')],
        [0., 0., 0., 0., float('-inf')],
        [0., 0., 0., 0., 0.]])
assert torch.equal(a,b), "Issues with create_triu_mask"
print("Good Job!")
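
For reference, the two kinds of mask can be sketched on toy inputs as follows. The function names and the pad id of 1 are assumptions made for this example (in the notebook the pad id comes from the tokenizer), and the causal mask is built with `torch.tril` and `masked_fill`, which is a different route from the transpose-based steps listed in the TODO but produces the same layout.

```python
import torch

PAD_ID = 1  # assumption for this sketch; in practice use tokenizer.pad_token_id

def padding_mask(seq, pad_id=PAD_ID):
    # True wherever the token is padding, so attention can ignore it.
    return seq == pad_id

def causal_mask(sz):
    # Position i may attend to positions 0..i (lower triangle, diagonal included).
    allowed = torch.tril(torch.ones(sz, sz, dtype=torch.bool))
    # Additive float mask: 0 where attention is allowed, -inf where it is blocked.
    return torch.zeros(sz, sz).masked_fill(~allowed, float("-inf"))

tokens = torch.tensor([[0, 5, 2, 1, 1]])  # toy ids: <s>, word, </s>, <pad>, <pad>
print(padding_mask(tokens))
print(causal_mask(3))
```

The padding mask has the same shape as the token batch, while the causal mask is square and depends only on the sequence length.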

### Training


In [None]:
from transformers import AutoTokenizer

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

tokenizer=AutoTokenizer.from_pretrained('FacebookAI/xlm-roberta-base')
PAD_IDX = tokenizer.pad_token_id #for padding
BOS_IDX = tokenizer.bos_token_id #for beginning of sentence
EOS_IDX = tokenizer.eos_token_id #for end of sentence

model = TransformerModel(tokenizer.vocab_size, tokenizer.vocab_size,512, 8, 3, 3, 256,0.1).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
loss_function = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)

train_loader = torch.utils.data.DataLoader(training_set, batch_size=8, shuffle=True) #change batch size based on your resources
validation_loader = torch.utils.data.DataLoader(validation_set, batch_size=8, shuffle=False) #change batch size based on your resources
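
It may help to see what a `DataLoader` yields when the dataset is a plain list of `(source, target)` string tuples. With the default collation, each batch comes back as a pair of sequences of strings, one for the sources and one for the targets. The sketch below uses made-up toy pairs to show this.

```python
from torch.utils.data import DataLoader

# Made-up toy pairs in the (source, target) format.
toy_pairs = [("hello", "hola"), ("good night", "buenas noches"), ("thanks", "gracias")]
loader = DataLoader(toy_pairs, batch_size=2, shuffle=False)

# Each batch unpacks into a group of sources and a group of targets.
batches = [(list(src), list(tgt)) for src, tgt in loader]
print(batches)
print(len(loader))  # number of batches, not number of examples
```

The last batch is smaller when the dataset size is not a multiple of the batch size, and `len(loader)` counts batches.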


In [None]:
from torch.utils.data import DataLoader
from tqdm import tqdm

def train_epoch(model,train_loader,tokenizer):
    model.train()
    losses = 0

    for src, tgt in tqdm(train_loader):
        src, tgt = tokenize_batch(src, tgt, tokenizer)
        src = src.to(device)
        tgt = tgt.to(device)

        tgt_input = tgt[:,:-1]

        #TODO
        src_mask = #create src_mask; this is a float matrix of zeros of shape (source length, source length), so no source position is blocked (see https://pytorch.org/docs/stable/generated/torch.nn.Transformer.html)
        tgt_mask = #create triangular mask for tgt_input

        src_padding_mask = #create padding mask for src
        tgt_padding_mask = #create padding mask for tgt_input

        logits = #pass src, tgt_input and all masks through the model

        optimizer.zero_grad()

        tgt_out = tgt[:,1:]
        loss = loss_function(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
        loss.backward()

        optimizer.step()
        losses += loss.item()

    return losses / len(train_loader)  # len() of a DataLoader is its number of batches


def evaluate(model,val_dataloader ):
    model.eval()
    losses = 0
    with torch.no_grad():
      for src, tgt in tqdm(val_dataloader):
          src, tgt = tokenize_batch(src, tgt, tokenizer)
          src = src.to(device)
          tgt = tgt.to(device)

          tgt_input = tgt[:,:-1]

          #TODO: build the masks and compute logits exactly as in train_epoch

          tgt_out = tgt[:,1:]
          loss = loss_function(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
          losses += loss.item()

    return losses / len(val_dataloader)  # len() of a DataLoader is its number of batches
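
The `tgt[:,:-1]` and `tgt[:,1:]` slices implement teacher forcing: the decoder receives the target shifted right and is trained to predict the next token at every position. The sketch below illustrates the shift and the effect of `ignore_index` on a toy sequence. The token ids, the pad id of 1, and the all-zero logits are assumptions standing in for real tokenizer and model output.

```python
import math
import torch

PAD_ID = 1  # assumption for this sketch
tgt = torch.tensor([[0, 11, 12, 2, 1, 1]])  # toy ids: <s>, w1, w2, </s>, <pad>, <pad>

tgt_input = tgt[:, :-1]  # what the decoder sees
tgt_out = tgt[:, 1:]     # what the decoder should predict at each position

vocab_size = 20
# Stand-in for model output: uniform scores, shape (batch, sequence, vocab).
logits = torch.zeros(1, tgt_input.shape[1], vocab_size)

loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_ID)
loss = loss_fn(logits.reshape(-1, vocab_size), tgt_out.reshape(-1))

# With uniform scores, each non-padding position contributes log(vocab_size).
print(loss.item(), math.log(vocab_size))
```

Padding positions in `tgt_out` do not contribute to the average, which is why the loss here equals `log(vocab_size)` despite the two trailing pad tokens.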

Now we can start training! Keep in mind that this code is computationally demanding. It is set to 10 epochs (which can take up to 4 hours), but feel free to change this value depending on your resources; in this case, the more epochs you can run, the better 😀

In [None]:
def train(model, epochs, train_loader, validation_loader):
    for epoch in range(1, epochs+1):
        train_loss = train_epoch(model, train_loader, tokenizer)
        val_loss = evaluate(model, validation_loader)
        print(f"Epoch: {epoch}, Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}")

train(model, 10, train_loader, validation_loader)

### Testing


We will use three different evaluation metrics to measure our model's test performance: [BERTScore](https://huggingface.co/spaces/evaluate-metric/bertscore), [METEOR](https://huggingface.co/spaces/evaluate-metric/meteor) and [ROUGE](https://huggingface.co/spaces/evaluate-metric/rouge). Please consult their Hugging Face documentation to learn how to use them.

In [None]:
from evaluate import load
bertscore = load("bertscore")
rouge = load('rouge')
meteor = load('meteor')

Implement the greedy decoding algorithm seen in class.

In [None]:
# function to generate output sequence using greedy algorithm
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    src = src.to(device)
    src_mask = src_mask.to(device)
    memory = #pass src and src_mask through the encoder (model.encode)
    ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(device)
    for i in range(max_len-1):
        memory = memory.to(device)
        tgt_mask = #create triangular mask sized to the tokens generated so far
        out = #pass ys, memory and tgt_mask through the decoder (model.decode)

        prob = model.fc(out[:, -1])

        _, next_word = #get the highest-scoring next token from prob (remember to use .item())

        ys = torch.cat([ys,
                        torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=1)
        if next_word == EOS_IDX:
            break
    return ys

def translate(model: torch.nn.Module, src_sentence: str, tokenizer):
    model.eval()
    src, _ = tokenize_batch(src_sentence, "", tokenizer)
    src = src.to(device)
    num_tokens = src.shape[1]
    src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.float).to(device)
    tgt_tokens = greedy_decode(
        model,  src, src_mask, max_len= int(num_tokens * 1.2 ), start_symbol=BOS_IDX).flatten()
    return tokenizer.decode(tgt_tokens, skip_special_tokens=True)
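
The control flow of greedy decoding can be seen in isolation with a toy scorer in place of the Transformer. In the sketch below, `greedy_decode_toy` and `toy_scores` are hypothetical names: the scorer simply favors the token id one above the last generated id, so the loop counts upward until it emits the end symbol or reaches the length limit.

```python
def greedy_decode_toy(next_token_scores, start_symbol, end_symbol, max_len):
    ys = [start_symbol]
    for _ in range(max_len - 1):
        scores = next_token_scores(ys)
        # Greedy choice: take the index of the highest score at this step.
        next_word = max(range(len(scores)), key=scores.__getitem__)
        ys.append(next_word)
        if next_word == end_symbol:
            break
    return ys

def toy_scores(prefix):
    # Hypothetical scorer over a 5-token vocabulary: prefers (last token + 1).
    scores = [0.0] * 5
    scores[min(prefix[-1] + 1, 4)] = 1.0
    return scores

print(greedy_decode_toy(toy_scores, start_symbol=0, end_symbol=3, max_len=10))
print(greedy_decode_toy(toy_scores, start_symbol=0, end_symbol=3, max_len=3))
```

The first call stops early because the end symbol is produced, and the second is cut off by `max_len` before the end symbol appears.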

In [None]:
print(translate(model, "hello how are you today",tokenizer))

In [None]:
test_set = test_set[:1000]

In [None]:
import numpy as np
# you can also trim test_loader
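def test(test_loader, model, tokenizer, device, max_length=200):
  """Method to test our model using the BERTScore and METEOR metrics.
  Arguments
  ---------
  test_loader: Dataloader or List of Tuple
    Holds the test set as (source, target) pairs
  model: nn.Module
    trained Machine Translation model
  tokenizer: Tokenizer
    Tokenizer used to encode the source and decode the translation
  device: torch.device
    Device the model runs on
  max_length: int
    Maximum length of a generated translation
  Returns
  -------
  Average precision, recall and f1 from BERTScore, and average METEOR score
  """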
  precision = 0
  recall = 0
  f1 = 0
  meteor_metric = 0
  for src, target in test_loader:
    #Use the translate method to evaluate our model
    results_bert = #get BERTScore results
    results_meteor = #get METEOR results
    precision += #get precision of results_bert
    recall += #get recall of results_bert
    f1 += #get f1 of results_bert
    meteor_metric+= #get METEOR score of results_meteor
  return precision / len(test_loader), recall / len(test_loader), f1 / len(test_loader), meteor_metric / len(test_loader)

test(test_set, model, tokenizer, device)
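
When scoring one sentence at a time, the BERTScore metric returns a dict whose `precision`, `recall` and `f1` entries are lists with one value per prediction, while the METEOR metric returns a dict with a single `meteor` value. The sketch below shows how such results could be averaged. `average_scores` is a hypothetical helper, and the numbers are placeholders for illustration only, not real model results.

```python
def average_scores(bert_results, meteor_results):
    """Average per-sentence BERTScore and METEOR outputs (hypothetical helper)."""
    n = len(bert_results)
    # Each BERTScore call scored one sentence, so take element 0 of each list.
    precision = sum(r["precision"][0] for r in bert_results) / n
    recall = sum(r["recall"][0] for r in bert_results) / n
    f1 = sum(r["f1"][0] for r in bert_results) / n
    meteor = sum(r["meteor"] for r in meteor_results) / n
    return precision, recall, f1, meteor

# Placeholder values shaped like the metric outputs; in the notebook they would come from
# bertscore.compute(predictions=[pred], references=[ref], lang="es") and
# meteor.compute(predictions=[pred], references=[ref]).
bert_results = [
    {"precision": [0.5], "recall": [0.75], "f1": [0.6]},
    {"precision": [1.0], "recall": [0.25], "f1": [0.4]},
]
meteor_results = [{"meteor": 0.25}, {"meteor": 0.75}]
print(average_scores(bert_results, meteor_results))
```

An alternative design is to collect all predictions and references first and call each metric once on the full lists, which avoids repeated per-call overhead.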

## Let's experiment!

Pick 2 of these 3 experiments:
1. Play with a hyperparameter of your choice to measure its effect on the translation.

2. Train an inverse translator from Spanish to English and compare the performance.

3. Compare the results of your model with the performance of using the T5 pretrained model. This [tutorial](https://huggingface.co/docs/transformers/en/tasks/translation) on using T5 for machine translation might come in handy.



