 # AI FOR MACHINE TRANSLATION
The purpose of this project is to create an AI that translates between languages. The project is built on PyTorch, using neural networks and embeddings, and also uses the spaCy package for language tokenization and vocabulary building.

The project is a Seq2Seq model with an encoder and a decoder, which I will build from scratch. It is my first machine translation
project and I hope it gives other AI enthusiasts a guide or inspiration.

In [None]:
# Import the required libraries 
import math
import re
import string
import time

import matplotlib.pyplot as plt
import numpy as np
import pytorch_lightning as pl
import seaborn as sns
import spacy
import torch
import torchtext
from torch import nn,Tensor
from torch.utils.data import DataLoader
from torchtext.data import Field,BucketIterator
from torchtext.data.metrics import bleu_score
from torchtext.datasets import IWSLT2016

In [None]:
# Downloading the datasets for german and english ,using spacy
!python-m spacy download en_core_web_md>logs.txt
!python -m spacy download de_core_news_md>logs.txt

In [None]:
# Load the downloaded spaCy pipelines (the names must match the packages
# installed by the download cell).
spacy_en = spacy.load('en_core_web_md')
spacy_de = spacy.load('de_core_news_md')


In [None]:
# Marker tokens placed at the start and at the end of every token sequence.
sos_tok, eos_tok = '<sos>', '<eos>'

In [None]:
# Preprocessing our text data
# Deletion table: every ASCII punctuation character plus the newline.
# A translation table avoids regex escaping pitfalls (inside a character
# class the raw punctuation string turns "\]" into an escape sequence, so
# the backslash itself was never removed).
_PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation + '\n')


def preprocessing(text):
    """Normalise a raw sentence before tokenization.

    The text is lower-cased, stripped of leading/trailing whitespace, and
    every ASCII punctuation character and newline is removed.

    Args:
        text: the raw sentence as a string.

    Returns:
        The cleaned string.
    """
    return text.lower().strip().translate(_PUNCTUATION_TABLE)
def tokenize_german(text):
    """Clean a German sentence and split it into token strings.

    The sentence is first normalised with ``preprocessing`` and then passed
    through the tokenizer of the ``spacy_de`` pipeline.
    """
    cleaned = preprocessing(text)
    doc = spacy_de.tokenizer(cleaned)
    return [token.text for token in doc]
def tokenize_eng(text):
    """Clean an English sentence and split it into token strings.

    The sentence is first normalised with ``preprocessing`` and then passed
    through the tokenizer of the ``spacy_en`` pipeline (the English model
    loaded earlier in the notebook).
    """
    text = preprocessing(text)
    return [tok.text for tok in spacy_en.tokenizer(text)]

In [None]:
# Create the training, validation and test data from torchtext's datasets.
# First we access the Multi30k dataset: the three splits are named
# 'train', 'valid' and 'test', and the language pair uses the codes
# 'de' (German, source) and 'en' (English, target).
train_data, valid_data, test_data = torchtext.datasets.Multi30k(
    root='data',
    split=('train', 'valid', 'test'),
    language_pair=('de', 'en'),
)

# Next we convert it into a real datasets ,because at first it is just text data 
# Dataset Class
class TextDataSet(torch.utils.data.Dataset):
    """Map-style dataset of tokenised (German, English) sentence pairs.

    * Parameters:
      raw_data - iterable of (German text, English text) pairs, e.g. one
                 split of the Multi30k dataset
    * Items:
      src - German tokens wrapped in the <sos>/<eos> markers
      trg - English tokens wrapped in the <sos>/<eos> markers
    """

    def __init__(self, raw_data):
        # Materialise the raw pairs into a list so they can be indexed and
        # counted.
        self.datasets = list(raw_data)

    def __len__(self):
        return len(self.datasets)

    def __getitem__(self, idx):
        # Tokenization happens lazily, each time an item is accessed.
        src, trg = self.datasets[idx]
        src = [sos_tok] + tokenize_german(src) + [eos_tok]
        trg = [sos_tok] + tokenize_eng(trg) + [eos_tok]
        # src are German language tokens, trg are English language tokens
        return src, trg
# Wrap each raw split in the tokenising dataset class.
train_datasets = TextDataSet(train_data)
test_datasets = TextDataSet(test_data)
valid_datasets = TextDataSet(valid_data)

In [None]:
# Next I will use pytorch lightning as a DataLoader
class TextDataModule(pl.LightningDataModule):
    """Lightning data module serving the train/validation/test datasets.

    Args:
        train_data: dataset used for training.
        valid_data: dataset used for validation.
        test_data: dataset used for testing.
        batch_size: number of items per batch for every loader.
    """

    def __init__(self,train_data,valid_data,test_data,batch_size=128):
        super().__init__()
        self.train_data = train_data
        self.valid_data = valid_data
        self.test_data = test_data
        self.batch_size = batch_size

    # NOTE(review): the datasets yield variable-length token lists; the
    # default DataLoader collation presumably needs a custom `collate_fn`
    # (numericalise + pad) before these loaders can feed the model - confirm.
    def train_dataloader(self):
        return DataLoader(self.train_data, batch_size=self.batch_size)

    def val_dataloader(self):
        return DataLoader(self.valid_data, batch_size=self.batch_size)

    def test_dataloader(self):
        return DataLoader(self.test_data, batch_size=self.batch_size)

    # Aliases for the method names this class originally exposed.
    train_data_loader = train_dataloader
    valid_data_loader = val_dataloader
    test_data_loader = test_dataloader
batch_size = 128
# Build the data module from the three tokenised datasets.
dm = TextDataModule(train_datasets, valid_datasets, test_datasets, batch_size=batch_size)

# CORPUS BUILDING AND VOCABULARY BUILDING 

In [None]:
# Build one corpus per language from the training set: src holds the German
# token lists, trg holds the English token lists.
src_corpus = []
trg_corpus = []
for src,trg in train_datasets:
    src_corpus.append(src)
    trg_corpus.append(trg)

# Vocabularies over every token seen in the training corpora.
src_vocab = torchtext.vocab.build_vocab_from_iterator(src_corpus)
trg_vocab = torchtext.vocab.build_vocab_from_iterator(trg_corpus)

# Field objects hold the vocabularies the rest of the notebook reads through
# `data_src.vocab` / `data_trg.vocab`.
data_src = Field(tokenize=tokenize_german, init_token=sos_tok, eos_token=eos_tok)
data_trg = Field(tokenize=tokenize_eng, init_token=sos_tok, eos_token=eos_tok)
# Build from the already-tokenised corpora. min_freq=2 keeps only tokens
# that occur at least twice.
data_src.build_vocab(src_corpus,min_freq=2)
data_trg.build_vocab(trg_corpus,min_freq=2)
# index -> token lookup tables
itos_src = data_src.vocab.itos
itos_trg = data_trg.vocab.itos


# Safe lookup of a token in an index -> token table.
def get(itos,i):
    """Return ``itos[i]``, or ``'<unk>'`` when ``i`` is not a valid index/key.

    Only lookup failures are turned into ``'<unk>'``; unrelated exceptions
    (e.g. KeyboardInterrupt) are no longer swallowed.
    """
    try:
        return itos[i]
    except (IndexError, KeyError, TypeError):
        return '<unk>'
    
def decode_src(x:Tensor):
    """Turn a sequence of source-vocabulary indices back into text.

    Each index is looked up in ``itos_src`` (unknown indices become
    ``'<unk>'``) and the tokens are joined with single spaces so the word
    boundaries stay visible.
    """
    return ' '.join(get(itos_src,i) for i in x)
def decode_trg(x:Tensor):
    """Turn a sequence of target-vocabulary indices back into text.

    Each index is looked up in ``itos_trg`` (unknown indices become
    ``'<unk>'``) and the tokens are joined with single spaces so the word
    boundaries stay visible.
    """
    # join() needs strings, so each lookup result is passed on as-is rather
    # than wrapped in a one-element list.
    return ' '.join(get(itos_trg,i) for i in x)
# Use the bucket iterator to batch the three splits. The results are bound to
# *_iterator names so the datasets themselves are not overwritten and the
# training loop can find `train_iterator` / `valid_iterator`.
# NOTE(review): BucketIterator belongs to the legacy torchtext API and
# presumably expects Field-based datasets - confirm it accepts these splits.
batch_size = 128
train_iterator,valid_iterator,test_iterator = BucketIterator.splits((train_data,valid_data,test_data),batch_size=batch_size)

# ENCODER AND DECODER USING NN.MODULE : RNN + EMBEDDING + LINEAR MODEL

In [None]:
class Encoder(nn.Module):
    """Embedding + multi-layer LSTM that encodes a source sequence."""

    def __init__(self,input_dim,emb_dim,hid_dim,n_layers,dropout=0.4):
        """Args:
        input_dim - vocabulary size (number of embedding rows)
        emb_dim  - word embedding size
        hid_dim - hidden state size of the LSTM
        n_layers - number of stacked LSTM layers
        dropout - dropout ratio applied to the embeddings to address overfitting
        """
        super().__init__()
        self.embedding = nn.Embedding(input_dim,emb_dim)
        self.rnn = nn.LSTM(emb_dim,hid_dim,n_layers,batch_first=True)
        self.dropout = nn.Dropout(dropout)

    def forward(self,src:Tensor):
        """Encode a batch of token indices.

        Args:
            src: integer tensor of shape (batch, src_len); the LSTM is
                 batch_first.

        Returns:
            (h, c): final hidden and cell states of the LSTM, each of shape
            (n_layers, batch, hid_dim).
        """
        embedded = self.dropout(self.embedding(src))
        # The per-step outputs are not needed; only the final states are
        # handed to the decoder.
        _, (h, c) = self.rnn(embedded)
        return h,c

class Decoder(nn.Module):
    """Embedding + multi-layer LSTM + linear layer that predicts one target
    token per call."""

    def __init__(self,output_dim,emb_dim,hidden_dim,n_layers,dropout = 0.4):
        """Args:
        output_dim - target vocabulary size
        emb_dim - word embedding size
        hidden_dim - hidden state size of the LSTM
        n_layers - number of stacked LSTM layers
        dropout - dropout ratio applied to the embeddings
        """
        super().__init__()
        self.embedding  = nn.Embedding(output_dim,emb_dim)
        self.rnn = nn.LSTM(emb_dim,hidden_dim,n_layers,batch_first =True)
        self.fc_out = nn.Linear(hidden_dim,output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self,trg:Tensor,encoder_h:Tensor,encoder_c:Tensor):
        """Run a single decoding step.

        Args:
            trg: integer tensor of shape (batch,) holding the previous token
                 of every sequence.
            encoder_h: hidden state, shape (n_layers, batch, hidden_dim).
            encoder_c: cell state, shape (n_layers, batch, hidden_dim).

        Returns:
            (out, h, c): `out` has shape (batch, output_dim) with the scores
            for the next token; `h` and `c` are the updated LSTM states.
        """
        # Add a sequence dimension of length 1: (batch,) -> (batch, 1).
        trg = trg.unsqueeze(1)
        embedded = self.dropout(self.embedding(trg))
        out,(h,c) = self.rnn(embedded,(encoder_h,encoder_c))
        # Drop only the length-1 sequence dimension so that a batch of size
        # one keeps its batch dimension.
        out = self.fc_out(out.squeeze(1))
        return out,h,c
        

# Seq 2  Seq Model

In [1]:
class Seq2Seq(pl.LightningModule):
    """Encoder-decoder translation model trained with teacher forcing.

    The loss function is read from the notebook-level `criterion`, and
    `prediction` reads the notebook-level `data_trg`, `sos_tok` and `eos_tok`.
    """

    def __init__(self,encoder,decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def configure_optimizers(self):
        # Optimise this module's own parameters instead of relying on a
        # global `model` variable.
        return torch.optim.Adam(self.parameters(), lr=0.001)

    def forward(self, src, trg, teacher_force_ratio = 0.5):
        """Decode `trg` step by step from the encoded `src`.

        Args:
            src: source indices, shape (batch, src_len).
            trg: target indices, shape (batch, trg_len); column 0 is <sos>.
            teacher_force_ratio: probability of feeding the ground-truth
                token (instead of the model's own prediction) at each step.

        Returns:
            Tensor of shape (trg_len, batch, vocab_size); row 0 stays zero
            because nothing is predicted for the <sos> position.
        """
        h, c = self.encoder(src)

        batch_size, trg_len = trg.size(0), trg.size(1)
        decoder_input = trg[:, 0] # just get the <sos> token

        # [trg len, batch size, vocab size], allocated on the targets' device
        outputs = torch.zeros(trg_len, batch_size, self.decoder.fc_out.out_features,
                              device=trg.device)

        for t in range(1, trg_len):
            out, h, c = self.decoder(decoder_input, h, c)

            outputs[t] = out

            top1 = out.argmax(1)

            # torch.rand needs an explicit size; draw a single number.
            use_teacher_force = torch.rand(1).item() < teacher_force_ratio

            decoder_input = trg[:, t] if use_teacher_force else top1

        return outputs

    def prediction(self, src, max_len=50):
        """Greedily translate one source sentence.

        Args:
            src: source indices of a single sentence, shape (1, src_len).
            max_len: maximum number of tokens to generate.

        Returns:
            List of predicted target-vocabulary indices, without the <sos>
            and <eos> markers.
        """
        sos_id = data_trg.vocab.stoi[sos_tok]
        eos_id = data_trg.vocab.stoi[eos_tok]

        # Disable dropout while predicting and restore the mode afterwards.
        was_training = self.training
        self.eval()
        predicted = []
        with torch.no_grad():
            h, c = self.encoder(src)
            decoder_input = torch.tensor([sos_id], device=src.device)
            for _ in range(max_len):
                out, h, c = self.decoder(decoder_input, h, c)
                decoder_input = out.argmax(1)
                token_id = decoder_input.item()
                if token_id == eos_id:
                    break
                predicted.append(token_id)
        self.train(was_training)
        return predicted

    def _shared_step(self, batch, log_name, teacher_force_ratio):
        """Compute and log the loss for one batch (shared by all three steps)."""
        src, trg = batch
        outputs = self(src, trg, teacher_force_ratio)

        # Skip position 0 (<sos>). `outputs` is time-major while `trg` is
        # batch-major, so the targets are transposed before flattening to
        # keep predictions and labels aligned.
        logits = outputs[1:].reshape(-1, outputs.size(-1))
        target = trg[:, 1:].t().reshape(-1)
        loss = criterion(logits, target)
        self.log(log_name, loss)
        return loss

    def training_step(self, batch, batch_idx):
        return self._shared_step(batch, 'train_loss', 0.5)

    def validation_step(self, batch, batch_idx):
        # No teacher forcing during evaluation: the model is scored on its
        # own predictions.
        return self._shared_step(batch, 'val_loss', 0.0)

    def test_step(self, batch, batch_idx):
        return self._shared_step(batch, 'test_loss', 0.0)

NameError: name 'pl' is not defined

# MODEL TRAINING :::

In [None]:
# Vocabulary sizes: the model reads German (source) and writes English (target).
input_dim = len(data_src.vocab)
output_dim = len(data_trg.vocab)

# Embedding width, LSTM hidden width and depth, and dropout ratios.
enc_emb_dim = dec_emb_dim = 500
hid_dim = 512
n_layers = 3
enc_dropout = dec_dropout = 0.5

encoder = Encoder(
    input_dim=input_dim,
    emb_dim=enc_emb_dim,
    hid_dim=hid_dim,
    n_layers=n_layers,
    dropout=enc_dropout,
)
decoder = Decoder(
    output_dim=output_dim,
    emb_dim=dec_emb_dim,
    hidden_dim=hid_dim,
    n_layers=n_layers,
    dropout=dec_dropout,
)

model = Seq2Seq(encoder, decoder)
model

In [None]:
# Index of the target padding token; the loss skips positions holding it.
trg_pad_idx = data_trg.vocab.stoi[data_trg.pad_token]
criterion = nn.CrossEntropyLoss(ignore_index=trg_pad_idx)

# Adam over all model parameters.
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

In [None]:
def epoch_time(start_time, end_time):
    """Split the duration between two timestamps (in seconds) into whole
    minutes and the remaining whole seconds.

    Returns:
        Tuple ``(minutes, seconds)`` of ints.
    """
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    leftover_seconds = int(duration - (whole_minutes * 60))
    return whole_minutes, leftover_seconds

In [None]:
# Train for a fixed number of epochs, keeping the weights of the epoch with
# the lowest validation loss.
n_epochs = 30

# Lowest validation loss seen so far; starts at infinity so the first epoch
# always counts as an improvement.
best_valid_loss = float('inf')

# Per-epoch loss history.
list_train_loss = []
list_valid_loss = []

for epoch in range(n_epochs):
    
    start_time = time.time()
    
    # NOTE(review): train_epoch / eval_epoch are not defined in the visible
    # part of this notebook - presumably each returns the mean loss of one
    # pass over its iterator; confirm before running.
    train_loss = train_epoch(model, train_iterator, optimizer, criterion)
    valid_loss = eval_epoch(model, valid_iterator, criterion)
    
    list_train_loss.append(train_loss)
    list_valid_loss.append(valid_loss)
    
    end_time = time.time()
    
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    # Checkpoint only when the validation loss improves.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'nmt-model.pt')
    
    # PPL (perplexity) is reported as exp(loss).
    print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {math.exp(valid_loss):7.3f}')

# MACHINE TRANSLATOR ::

In [None]:
def translate_sentence(model, sentence, german, english, device, max_length=50):
    """Greedily translate one German sentence into English tokens.

    Args:
        model: object exposing `encoder` and `decoder` sub-modules.
        sentence: either a raw string or an already tokenised list of strings.
        german: object whose `vocab.stoi` maps German tokens to indices.
        english: object whose `vocab.stoi` / `vocab.itos` map English tokens
            to indices and back.
        device: device the input tensors are moved to.
        max_length: maximum number of decoding steps.

    Returns:
        List of predicted English tokens without the leading <sos>; the
        trailing <eos> is included when the model produced it.
    """
    # Tokenise a raw string exactly like the training data (same cleaning and
    # the already loaded German tokenizer) instead of loading a spaCy model on
    # every call.
    if isinstance(sentence, str):
        tokens = tokenize_german(sentence)
    else:
        tokens = [token.lower() for token in sentence]
    # Add the sos token at the start and the eos token at the end
    tokens = [sos_tok] + tokens + [eos_tok]

    # Go through each german token and convert to an index
    text_to_indices = [german.vocab.stoi[token] for token in tokens]

    # Convert to a (1, src_len) tensor: the encoder's LSTM is batch_first,
    # so the batch dimension comes first.
    sentence_tensor = torch.LongTensor(text_to_indices).unsqueeze(0).to(device)

    eos_idx = english.vocab.stoi[eos_tok]
    outputs = [english.vocab.stoi[sos_tok]]

    # Disable dropout while translating and restore the mode afterwards.
    was_training = model.training
    model.eval()
    with torch.no_grad():
        # Build encoder hidden, cell state
        hidden, cell = model.encoder(sentence_tensor)

        for _ in range(max_length):
            previous_word = torch.LongTensor([outputs[-1]]).to(device)
            output, hidden, cell = model.decoder(previous_word, hidden, cell)
            best_guess = output.argmax(1).item()
            outputs.append(best_guess)

            # Model predicts it's the end of the sentence
            if best_guess == eos_idx:
                break
    model.train(was_training)

    translated_sentence = [english.vocab.itos[idx] for idx in outputs]

    # remove start token
    return translated_sentence[1:]