# Imports

In [1]:
!wget https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip -O spa-eng.zip
!unzip -u spa-eng.zip
!pip install tokenizers
!pip install lightning

--2023-11-29 10:13:18--  https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 142.251.2.207, 2607:f8b0:4023:c0d::cf, 2607:f8b0:4023:c03::cf, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.251.2.207|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2638744 (2.5M) [application/zip]
Saving to: ‘spa-eng.zip’


2023-11-29 10:13:18 (182 MB/s) - ‘spa-eng.zip’ saved [2638744/2638744]

Archive:  spa-eng.zip


In [2]:
import pandas as pd

from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers import normalizers
from tokenizers import pre_tokenizers
from tokenizers.trainers import WordLevelTrainer
from tokenizers.processors import TemplateProcessing

import torch
from torch.utils.data import Dataset, DataLoader

import lightning as L
from torch import nn
from torch import optim
from torch.nn import functional as F

# Data processing

In [3]:
# Seed for the one-off row shuffle below. Fixing it keeps the train/validation
# split (taken later by position) identical across kernel restarts.
SHUFFLE_SEED = 42

# Tab-separated file without a header row: column 0 is read as the English
# sentence and column 1 as the Spanish one. All rows are shuffled once.
raw_data = pd.read_csv('spa-eng/spa.txt', sep='\t', header=None).sample(frac=1, random_state=SHUFFLE_SEED)
text_en, text_es = raw_data[0].values, raw_data[1].values

# Print a few sentence pairs as a sanity check.
for i in range(3):
    print(f'{text_en[i]} ==> {text_es[i]}')

She divided the cake into five pieces. ==> Ella dividió la torta en cinco porciones.
I must be true to myself. ==> Debo ser honesto conmigo mismo.
Black cats are bad luck. ==> Los gatos negros traen mala suerte.


In [4]:
# Vocabulary size handed to the word-level trainers (and later to the model).
vocab_size = 1000

# Text normalisation: Unicode NFKC, then lowercasing.
normalizer = normalizers.Sequence([
    normalizers.NFKC(),
    normalizers.Lowercase(),
])

# Pre-tokenisation: punctuation splitting followed by whitespace splitting.
pre_tokenizer = pre_tokenizers.Sequence([
    pre_tokenizers.Punctuation(),
    pre_tokenizers.Whitespace(),
])

# English word-level tokenizer; words outside the vocabulary map to [UNK].
tokenizer_en = Tokenizer(WordLevel(unk_token='[UNK]'))
tokenizer_en.normalizer = normalizer
tokenizer_en.pre_tokenizer = pre_tokenizer

# The English side only needs the padding and unknown-word special tokens.
trainer_en = WordLevelTrainer(
    vocab_size=vocab_size,
    show_progress=True,
    special_tokens=['[PAD]', '[UNK]'],
)
tokenizer_en.train_from_iterator(text_en, trainer_en)

In [5]:
# Spanish word-level tokenizer: same normalisation and pre-tokenisation as the
# English one, plus [BOS]/[EOS] markers needed by the decoder.
tokenizer_es = Tokenizer(WordLevel(unk_token='[UNK]'))

normalizer = normalizers.Sequence([normalizers.NFKC(), normalizers.Lowercase()])
tokenizer_es.normalizer = normalizer

pre_tokenizer = pre_tokenizers.Sequence([pre_tokenizers.Punctuation(), pre_tokenizers.Whitespace()])
tokenizer_es.pre_tokenizer = pre_tokenizer

trainer_es = WordLevelTrainer(vocab_size=vocab_size, show_progress=True, special_tokens=['[PAD]', '[UNK]', '[BOS]', '[EOS]'])

tokenizer_es.train_from_iterator(text_es, trainer_es)

# Look the special-token ids up from the trained vocabulary instead of
# hard-coding them, so reordering `special_tokens` above cannot silently
# desynchronise the post-processors.
bos_id = tokenizer_es.token_to_id('[BOS]')
eos_id = tokenizer_es.token_to_id('[EOS]')
# Padding elsewhere in the notebook (pad_sequence, padding_idx=0) uses 0.
assert tokenizer_es.token_to_id('[PAD]') == 0, '[PAD] must have id 0'

# Two independent copies of the trained tokenizer (round-tripped through its
# serialised form): one produces decoder inputs, the other decoder targets.
tokenizer_es_x = Tokenizer.from_str(tokenizer_es.to_str())
tokenizer_es_y = Tokenizer.from_str(tokenizer_es.to_str())

# Decoder input: sentence prefixed with [BOS].
tokenizer_es_x.post_processor = TemplateProcessing(
    single='[BOS] $A',
    special_tokens=[('[BOS]', bos_id)]
)
# Decoder target: sentence followed by [EOS].
tokenizer_es_y.post_processor = TemplateProcessing(
    single='$A [EOS]',
    special_tokens=[('[EOS]', eos_id)]
)

In [6]:
# Turn every sentence into a 1-D tensor of token ids.
# The Spanish text is encoded twice: once with a leading [BOS] (decoder input)
# and once with a trailing [EOS] (decoder target).
tokenized_en = [torch.tensor(enc.ids) for enc in tokenizer_en.encode_batch(text_en)]
tokenized_es_x = [torch.tensor(enc.ids) for enc in tokenizer_es_x.encode_batch(text_es)]
tokenized_es_y = [torch.tensor(enc.ids) for enc in tokenizer_es_y.encode_batch(text_es)]

# Create dataset and dataloader

In [7]:
class EnglishSpanishDataset(Dataset):
    """Parallel English/Spanish token-id sequences padded to a common length.

    Args:
        tokenized_en: list of 1-D id tensors for the English sentences.
        tokenized_es_x: list of 1-D id tensors used as decoder inputs.
        tokenized_es_y: list of 1-D id tensors used as decoder targets.

    Each item is ``((x_enc, x_enc_len), (x_dec, x_dec_len), y_dec)`` where the
    lengths are the unpadded lengths of the English and decoder-input sequences.
    """

    def __init__(self, tokenized_en, tokenized_es_x, tokenized_es_y):
        pad = nn.utils.rnn.pad_sequence

        # Remember the true lengths; they are lost once the sequences are padded.
        self.seq_len_en = list(map(len, tokenized_en))
        self.seq_len_es = list(map(len, tokenized_es_x))

        # One (num_sentences, longest_sequence) tensor per stream, padded with
        # pad_sequence's default padding value.
        self.tokenized_en = pad(tokenized_en, batch_first=True)
        self.tokenized_es_x = pad(tokenized_es_x, batch_first=True)
        self.tokenized_es_y = pad(tokenized_es_y, batch_first=True)

    def __len__(self):
        return self.tokenized_en.size(0)

    def __getitem__(self, indx):
        encoder_input = (self.tokenized_en[indx], self.seq_len_en[indx])
        decoder_input = (self.tokenized_es_x[indx], self.seq_len_es[indx])
        return encoder_input, decoder_input, self.tokenized_es_y[indx]

In [8]:
# Number of (already shuffled) sentence pairs used for training; everything
# after this position is held out for validation.
n_train = 100_000
assert len(tokenized_en) > n_train, 'not enough sentence pairs to leave a validation split'

train_en, train_es_x, train_es_y = tokenized_en[:n_train], tokenized_es_x[:n_train], tokenized_es_y[:n_train]
valid_en, valid_es_x, valid_es_y = tokenized_en[n_train:], tokenized_es_x[n_train:], tokenized_es_y[n_train:]

train_dataset = EnglishSpanishDataset(train_en, train_es_x, train_es_y)
valid_dataset = EnglishSpanishDataset(valid_en, valid_es_x, valid_es_y)

batch_size = 32
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
# Validation batches are served in a fixed order: shuffling adds nothing to
# evaluation and makes per-batch results harder to compare between runs.
valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

# Model

In [9]:
class Encoder(L.LightningModule):
    """Embeds a padded batch of source token ids and runs it through an LSTM."""

    def __init__(self, vocab_size, embed_dim, hidden_size):
        super().__init__()

        # Id 0 is treated as padding by the embedding layer.
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_dim, padding_idx=0)
        self.rnn = nn.LSTM(input_size=embed_dim, hidden_size=hidden_size)

    def forward(self, input, seq_lenghts):
        """Return the LSTM's final state for a batch of padded sequences.

        Args:
            input: batch-first tensor of token ids.
            seq_lenghts: tensor with the unpadded length of each sequence.

        Returns:
            The state returned by ``nn.LSTM`` after the last real (non-padded)
            step of every sequence.
        """
        # The LSTM is time-major, so move the time axis to the front.
        embedded = self.embed(input).transpose(0, 1)
        # Packing requires the lengths on the CPU; the batch need not be sorted.
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded,
            lengths=seq_lenghts.to('cpu'),
            enforce_sorted=False,
        )
        _, final_state = self.rnn(packed)

        return final_state

In [10]:
class Decoder(L.LightningModule):
    """LSTM decoder that maps target-side token ids to per-step vocabulary logits."""

    def __init__(self, vocab_size, embed_dim, hidden_size):
        super().__init__()

        # Id 0 is treated as padding by the embedding layer.
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_dim, padding_idx=0)
        self.rnn = nn.LSTM(input_size=embed_dim, hidden_size=hidden_size)
        self.linear = nn.Linear(in_features=hidden_size, out_features=vocab_size)

    def forward(self, input, init_state, seq_lenghts):
        """Run the decoder over a padded batch, starting from ``init_state``.

        Args:
            input: batch-first tensor of decoder-input token ids.
            init_state: initial LSTM state (the encoder's final state).
            seq_lenghts: tensor with the unpadded length of each sequence.

        Returns:
            Time-major logits over the vocabulary, as produced by unpacking the
            LSTM output and applying the linear projection.
        """
        # The LSTM is time-major, so move the time axis to the front.
        embedded = self.embed(input).transpose(0, 1)
        # Packing requires the lengths on the CPU; the batch need not be sorted.
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded,
            lengths=seq_lenghts.to('cpu'),
            enforce_sorted=False,
        )
        packed_out, _ = self.rnn(packed, init_state)
        hidden, _ = nn.utils.rnn.pad_packed_sequence(packed_out)

        return self.linear(hidden)

In [11]:
class Translator(L.LightningModule):
    """English-to-Spanish sequence-to-sequence model (LSTM encoder + LSTM decoder)."""

    # Id used for padding in the datasets and embeddings; such target positions
    # must not contribute to the loss.
    PAD_ID = 0

    def __init__(self, vocab_size, embed_dim, hidden_size):
        super().__init__()

        self.enc = Encoder(vocab_size, embed_dim, hidden_size)
        self.dec = Decoder(vocab_size, embed_dim, hidden_size)

    def training_step(self, batch, batch_indx):
        """Compute the teacher-forced cross-entropy loss for one batch.

        Args:
            batch: ``((x_enc, x_enc_len), (x_dec, x_dec_len), y_dec)`` as
                yielded by the dataloader.
            batch_indx: index of the batch (unused).

        Returns:
            Scalar loss tensor, averaged over the non-padding target tokens.
        """
        (x_enc, x_enc_len), (x_dec, x_dec_len), y_dec = batch

        init_state = self.enc(x_enc, x_enc_len)
        # The decoder output is (seq, batch, vocab); cross_entropy expects the
        # class dimension second, i.e. (batch, vocab, seq).
        logits = self.dec(x_dec, init_state, x_dec_len).permute(1, 2, 0)
        # The decoder output only spans the longest sequence of this batch,
        # while the targets are padded to the dataset-wide maximum: trim them.
        y_dec = y_dec[:, :logits.shape[-1]]

        # Skip padded positions; otherwise the loss is dominated by trivially
        # predictable [PAD] targets and the reported value is misleading.
        loss = F.cross_entropy(logits, y_dec, ignore_index=self.PAD_ID)
        self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)

        return loss

    def configure_optimizers(self):
        """Use Adam with a fixed learning rate of 1e-3."""
        return optim.Adam(self.parameters(), lr=0.001)

    @torch.no_grad()  # inference only: no need to track gradients
    def translate_sentence(self, sentence_en, max_seq_size=60, tokenizer_en=tokenizer_en, tokenizer_es=tokenizer_es):
        """Greedily translate one English sentence into Spanish.

        Args:
            sentence_en: English sentence to translate.
            max_seq_size: maximum number of tokens to generate.
            tokenizer_en: tokenizer for the source sentence (defaults to the
                notebook-level English tokenizer bound at class definition).
            tokenizer_es: tokenizer providing the [BOS]/[EOS] ids and used to
                decode the generated ids (defaults likewise).

        Returns:
            The decoded Spanish sentence; an empty string if the input
            produces no tokens.
        """
        ids_en = tokenizer_en.encode(sentence_en).ids
        if not ids_en:
            # A zero-length sequence cannot be packed for the LSTM.
            return ''

        # Build the inputs on the model's device so this also works when the
        # model lives on a GPU.
        device = self.device
        x_enc = torch.tensor([ids_en], device=device)
        x_enc_len = torch.tensor([len(ids_en)])

        init_state = self.enc(x_enc, x_enc_len)  # initial decoder state from the encoder

        bos_id = tokenizer_es.token_to_id('[BOS]')
        eos_id = tokenizer_es.token_to_id('[EOS]')
        translated_tokens = [bos_id]  # generation starts from [BOS]

        for _ in range(max_seq_size):
            # The decoder does not expose its state, so the whole prefix is
            # re-run every step and the prediction for the last position is taken.
            x_dec = torch.tensor([translated_tokens], device=device)
            x_dec_len = torch.tensor([len(translated_tokens)])
            logits = self.dec(x_dec, init_state, x_dec_len)

            new_word = logits[-1, 0].argmax().item()
            translated_tokens.append(new_word)
            if new_word == eos_id:
                break

        return tokenizer_es.decode(translated_tokens)

In [12]:
# Seed the RNGs (including dataloader workers) before creating the model so
# that weight initialisation and batch order are repeatable across runs.
L.seed_everything(42, workers=True)

t = Translator(vocab_size=vocab_size, embed_dim=128, hidden_size=512)
trainer = L.Trainer(max_epochs=10)
trainer.fit(model=t, train_dataloaders=train_dataloader)

INFO: GPU available: True (cuda), used: True
INFO:lightning.pytorch.utilities.rank_zero:GPU available: True (cuda), used: True
INFO: TPU available: False, using: 0 TPU cores
INFO:lightning.pytorch.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO: IPU available: False, using: 0 IPUs
INFO:lightning.pytorch.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO: HPU available: False, using: 0 HPUs
INFO:lightning.pytorch.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO: LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:lightning.pytorch.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO: 
  | Name | Type    | Params
---------------------------------
0 | enc  | Encoder | 1.4 M 
1 | dec  | Decoder | 2.0 M 
---------------------------------
3.4 M     Trainable params
0         Non-trainable params
3.4 M     Total params
13.595    Total estimated model params size (MB)
INFO:lightning.pytorch.callbacks.model_summary:
  | Name | Type    | Params
----

Training: |          | 0/? [00:00<?, ?it/s]

INFO: `Trainer.fit` stopped: `max_epochs=10` reached.
INFO:lightning.pytorch.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=10` reached.


In [14]:
# Spot-check the trained model on a few English sentences.
for sentence_en in ('I like soccer!', 'Hello, how are you?', 'Have a nice day.'):
    print(t.translate_sentence(sentence_en))

¡ me gusta el fútbol !
¿ a qué tal estás ?
un buen día .
