# SEQ2SEQ MT (English-Swedish)

In [112]:
from datasets import Dataset
import json

import torch
from torch.utils.data import Dataset as torchDataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn
import random
import torch.optim as optim
import math
from tqdm import tqdm
import numpy as np
import sacrebleu

In [79]:
DATA_FOLDER_PATH = './data'
EN_SENTENCES_PATH = DATA_FOLDER_PATH + "/microlang_20000_eng.txt"
SW_SENTENCES_PATH = DATA_FOLDER_PATH + "/microlang_20000_swe.txt"
EN_VOCAB_PATH = DATA_FOLDER_PATH + "/eng_vocab.json"
SW_VOCAB_PATH = DATA_FOLDER_PATH + "/swe_vocab.json"

In [80]:
BATCH_SIZE = 32
EMBED_DIM = 128
HIDDEN_DIM = 256
DROPOUT = 0.3
LR = 0.001
TF_RATIO = 0.5
CLIP_GRAD = 1.0

N_EPOCHS = 10

## Load data and prepare dataset

In [81]:
def load_data_from_files(en_file_path, sw_file_path):
    
    with open(en_file_path, 'r', encoding='utf-8') as en_file:
        en_sentences = [line.strip() for line in en_file]
        
    with open(sw_file_path, 'r', encoding='utf-8') as sw_file:
        sw_sentences = [line.strip() for line in sw_file]
        
    dataset = Dataset.from_dict({"en": en_sentences, "sw": sw_sentences})
    return dataset    
    

In [82]:
dataset = load_data_from_files(EN_SENTENCES_PATH, SW_SENTENCES_PATH)

In [83]:
print(dataset)

Dataset({
    features: ['en', 'sw'],
    num_rows: 20000
})


In [84]:
def count_unique_words(dataset, field):
    vocab = set()
    for sentence in dataset[field]:
        for word in sentence.lower().split():
            vocab.add(word)
    return len(vocab)

en_unique = count_unique_words(dataset, "en")
sw_unique = count_unique_words(dataset, "sw")

print("English unique words:", en_unique)
print("Swedish unique words:", sw_unique)

English unique words: 161
Swedish unique words: 248


This is not surpising as Swedish is morphologically much more complex than English.

In [85]:
dataset = dataset.train_test_split(test_size=0.2, seed=42)
temp = dataset["test"].train_test_split(test_size=0.5, seed=42)

dataset = {
    "train": dataset["train"],
    "val": temp["train"],
    "test": temp["test"],
}
train_data, val_data, test_data = (
    dataset["train"],
    dataset["val"],
    dataset["test"],
)
print(train_data)
print(val_data)
print(test_data)

Dataset({
    features: ['en', 'sw'],
    num_rows: 16000
})
Dataset({
    features: ['en', 'sw'],
    num_rows: 2000
})
Dataset({
    features: ['en', 'sw'],
    num_rows: 2000
})


## Define tokenizer

Define special tokens

In [86]:
sos_token = "<SOS>"
eos_token = "<EOS>"
unk_token = "<UNK>"
pad_token = "<PAD>"
special_tokens = [unk_token, pad_token, sos_token, eos_token]

Since there are so few unique words in the dataset (161 and 248), there is no need to use any fancy pancy tokenizers. I will build my own lil tokenizer, which will be just fine for this job. Even using spaCy would seem a bit redundant since there is no punctuation and no commas in the dataset.

In [87]:
class lilTokenizer:
    def __init__(self):
        self.word2idx = {pad_token: 0, sos_token: 1, eos_token: 2, unk_token: 3}
        self.idx2word = {0: pad_token, 1: sos_token, 2: eos_token, 3:unk_token}
        self.vocab_size = 4
        
    def build_vocab(self, sentences):
        for sentence in sentences:
            for word in sentence.lower().split():
                if word not in self.word2idx:
                    self.word2idx[word] = self.vocab_size
                    self.idx2word[self.vocab_size] = word
                    self.vocab_size += 1
                
    def encode(self, sentence):
        '''Convert a sentence to integers and wrap it in eos and sos tokens'''
        tokens = []
        for word in sentence.lower().split():
            tokens.append(self.word2idx.get(word, self.word2idx[unk_token]))
        return [self.word2idx[sos_token]] + tokens + [self.word2idx[eos_token]]
    
    def decode(self, token_ids):
        '''Convers integers back to readable text'''
        words = []
        for idx in token_ids:
            word = self.idx2word[idx] 
            # self.idx2word.get(idx, unk_token) would off course be better practice,
            # but I want to make sure I get an error if something is misaligend in my code.
            # So I will leave it like this for learning purposes!
            if word == eos_token:
                break
            if word not in [sos_token, pad_token]:
                words.append(word)
        return " ".join(words) 
        

In [88]:
en_sentences_train = [sentence for sentence in train_data["en"]]
sw_sentences_train = [sentence for sentence in train_data["sw"]]
print(len(en_sentences_train), len(sw_sentences_train))

en_sentences_val = [sentence for sentence in val_data["en"]]
sw_sentences_val = [sentence for sentence in val_data["sw"]]
print(len(en_sentences_val), len(sw_sentences_val))

en_sentences_test = [sentence for sentence in test_data["en"]]
sw_sentences_test = [sentence for sentence in test_data["sw"]]
print(len(en_sentences_test), len(sw_sentences_test))

16000 16000
2000 2000
2000 2000


In [89]:
en_tokenizer = lilTokenizer()
en_tokenizer.build_vocab(en_sentences_train)
sw_tokenizer = lilTokenizer()
sw_tokenizer.build_vocab(sw_sentences_train)

Lil sanity check

In [110]:
assert en_tokenizer.word2idx[pad_token] == sw_tokenizer.word2idx[pad_token]
assert en_tokenizer.word2idx[sos_token] == sw_tokenizer.word2idx[sos_token]
assert en_tokenizer.word2idx[eos_token] == sw_tokenizer.word2idx[eos_token]
assert en_tokenizer.word2idx[unk_token] == sw_tokenizer.word2idx[unk_token]

PAD_IDX = en_tokenizer.word2idx[pad_token]
SOS_IDX = en_tokenizer.word2idx[sos_token]
EOS_IDX = en_tokenizer.word2idx[eos_token]
UNK_IDX = en_tokenizer.word2idx[unk_token]

# should be 0, 1, 2, 3
print(f"Special token indices: {PAD_IDX}, {SOS_IDX}, {EOS_IDX}, {UNK_IDX}")

Special token indices: 0, 1, 2, 3


In [91]:
# should be 161 + 4 = 165 and 248 + 4 = 252
print(len(en_tokenizer.word2idx), len(sw_tokenizer.word2idx))

165 252


### Save vocabularies

In [92]:
def save_vocab(tokenizer, filepath):
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(tokenizer.word2idx, f, ensure_ascii=False, indent=4)

In [93]:
save_vocab(en_tokenizer, EN_VOCAB_PATH)
save_vocab(sw_tokenizer, SW_VOCAB_PATH)

## DataLoaders

Alright, lets get these sentences into pytorch!

In [94]:
class TranslationDataset(torchDataset):
    def __init__(self, en_sentences, sw_sentences, en_tokenizer, sw_tokenizer):
        self.en_sentences = en_sentences
        self.sw_sentences = sw_sentences
        self.en_tokenizer = en_tokenizer
        self.sw_tokenizer = sw_tokenizer

    def __len__(self):
        return len(self.en_sentences)

    def __getitem__(self, idx):
        # raw text for a specific index
        en_text = self.en_sentences[idx]
        sw_text = self.sw_sentences[idx]

        # encode texts using my awesome lilTokenizer
        en_encoded = self.en_tokenizer.encode(en_text)
        sw_encoded = self.sw_tokenizer.encode(sw_text)

        # convert the integer lists into pytorch tensors!
        return torch.tensor(en_encoded), torch.tensor(sw_encoded)

In [None]:
def collate_fn(batch):
    en_batch = []
    sw_batch = []
    
    # separate tensors
    for en_item, sw_item in batch:
        en_batch.append(en_item)
        sw_batch.append(sw_item)
        
    # pad the sequences
    # batch_first=False creates shape: (Sequence_Length, Batch_Size)
    en_padded = pad_sequence(en_batch, padding_value=PAD_IDX, batch_first=False)
    sw_padded = pad_sequence(sw_batch, padding_value=PAD_IDX, batch_first=False)
    
    return en_padded, sw_padded

In [96]:
train_dataset = TranslationDataset(
    en_sentences=en_sentences_train, 
    sw_sentences=sw_sentences_train, 
    en_tokenizer=en_tokenizer, 
    sw_tokenizer=sw_tokenizer
)

val_dataset = TranslationDataset(
    en_sentences=en_sentences_val,
    sw_sentences=sw_sentences_val,
    en_tokenizer=en_tokenizer,
    sw_tokenizer=sw_tokenizer
)

test_dataset = TranslationDataset(
    en_sentences=en_sentences_test,
    sw_sentences=sw_sentences_test,
    en_tokenizer=en_tokenizer,
    sw_tokenizer=sw_tokenizer
)

train_loader = DataLoader(
    train_dataset, 
    batch_size=BATCH_SIZE, 
    shuffle=True,
    collate_fn=collate_fn,
    drop_last=True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_fn
)

test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_fn
)

# check first batch
en_batch, sw_batch = next(iter(train_loader))

print(f"English Batch Shape: {en_batch.shape} (Seq_Len, Batch_Size)")
print(f"Swedish Batch Shape: {sw_batch.shape} (Seq_Len, Batch_Size)")

English Batch Shape: torch.Size([8, 32]) (Seq_Len, Batch_Size)
Swedish Batch Shape: torch.Size([8, 32]) (Seq_Len, Batch_Size)


## Define Encoder and Decoder for Seq2Seq

I choose... GRU! 

Because, they are a type of RNN that efficiently capture sequential dependencies while having fewer parameters than LSTMs. This makes them faster to train and less prone to overfitting on a small dataset like this, while still handling long-range dependencies better than a vanilla RNN.

### Encoder

In [97]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, pad_idx, dropout=0.3):
        super(Encoder, self).__init__()
        
        # pass padding_idx so the model ignores it and can focus on learning
        # the actual words!
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        self.dropout = nn.Dropout(dropout)
        self.rnn = nn.GRU(embed_dim, hidden_dim)
        
    def forward(self, x):
        # x: (src_len, batch_size)
        embedded = self.dropout(self.embedding(x))
        outputs, hidden = self.rnn(embedded)
        return hidden

### Decoder

In [98]:
class Decoder(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, pad_idx, dropout=0.3):
        super(Decoder, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        self.dropout = nn.Dropout(dropout)
        self.rnn = nn.GRU(embed_dim, hidden_dim)
        self.fc_out = nn.Linear(hidden_dim, vocab_size)
        
    def forward(self, x, hidden):
        # x: (batch_size)
        x = x.unsqueeze(0) # (1, batch_size)
        embedded = self.dropout(self.embedding(x)) # (1, batch_size, embed_dim)
        outputs, hidden = self.rnn(embedded, hidden)
        prediction = self.fc_out(outputs) # (batch_size, vocab_size)
        return prediction.squeeze(0), hidden
        

### Seq2Seq

In [99]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        
        # ensure the Encoder and Decoder hidden sizes match
        assert encoder.rnn.hidden_size == decoder.rnn.hidden_size, \
            "Hidden dimensions of encoder and decoder must be equal!!!!"

    def forward(self, source, target, tf_ratio=0.5):
        # source shape: (src_len, batch_size)
        # target shape: (trg_len, batch_size)
        
        batch_size = source.shape[1]
        target_len = target.shape[0]
        target_vocab_size = self.decoder.fc_out.out_features
        
        # tensor to store all decoder predictions
        outputs = torch.zeros(target_len, batch_size, target_vocab_size).to(self.device)
        
        # pass sentence through the encoder 
        hidden = self.encoder(source) # this is the so called context vector
        
        # first input to the decoder is the [SOS] tokens from the target
        input_word = target[0, :]
        
        # generate word by word
        for t in range(1, target_len):
            # pass current word and previous hidden state into decoder
            prediction, hidden = self.decoder(input_word, hidden)
            
            outputs[t] = prediction
            
            teacher_force = random.random() < tf_ratio
            
            top1 = prediction.argmax(1) # models best guess
            
            # ground truth or model's own guess
            input_word = target[t] if teacher_force else top1
            
        return outputs

In [100]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [None]:
encoder = Encoder(vocab_size=en_tokenizer.vocab_size, embed_dim=EMBED_DIM,
                  hidden_dim=HIDDEN_DIM, pad_idx=PAD_IDX, dropout=DROPOUT)
decoder = Decoder(vocab_size=sw_tokenizer.vocab_size, embed_dim=EMBED_DIM,
                  hidden_dim=HIDDEN_DIM, pad_idx=PAD_IDX, dropout=DROPOUT)

model = Seq2Seq(encoder, decoder, device).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=LR)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

## TRAIN

In [102]:
def train(model, iterator, optimizer, criterion, clip_grad=CLIP_GRAD, tf_radio=TF_RATIO):
    model.train()
    epoch_loss = 0
    
    for i, (src, trg) in enumerate(iterator):
        src, trg = src.to(device), trg.to(device)
        
        optimizer.zero_grad()
        
        # forward pass
        output = model(src, trg, tf_ratio=tf_radio)
        
        # output shape is currently: (trg_len, batch_size, output_dim)
        output_dim = output.shape[-1]
        
        # slice off the 0th token [SOS] and flatten the rest
        output = output[1:].view(-1, output_dim) 
        trg = trg[1:].view(-1)
        
        # calculate loss
        loss = criterion(output, trg)
        
        # backward pass
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip_grad)
        
        # update weights
        optimizer.step()
        
        epoch_loss += loss.item()
        
    return epoch_loss / len(iterator)

In [103]:
def evaluate(model, iterator, criterion):
    model.eval()
    
    epoch_loss = 0
    
    # turn of gradient tracking
    with torch.no_grad():
        for i, (src, trg) in enumerate(iterator):
            src, trg = src.to(device), trg.to(device)
            
            # forward pass (no teacher forcing)
            output = model(src, trg, tf_ratio=0)
            
            # flatten
            output_dim = output.shape[-1]
            output = output[1:].view(-1, output_dim)
            trg = trg[1:].view(-1)
            
            # calculate loss
            loss = criterion(output, trg)
            epoch_loss += loss.item()
            
    return epoch_loss / len(iterator)

In [104]:
best_valid_loss = float('inf') # set starting best loss to infinity

for epoch in tqdm(range(N_EPOCHS)):
    
    # train the model and get training loss
    train_loss = train(model, train_loader, optimizer, criterion)
    
    # evaluate the model and get validation loss
    valid_loss = evaluate(model, val_loader, criterion)
    
    # save the model if its the best one yet!!
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        print(f"Saving model at epch: {epoch+1}")
        torch.save(model.state_dict(), 'best_translation_model.pt')
    
    # print stats
    print(f'Epoch: {epoch+1:02}')
    print(f'\t train Loss: {train_loss:.3f} | train perplexity: {math.exp(train_loss):7.3f}')
    print(f'\t val Loss: {valid_loss:.3f} |  val perplexity: {math.exp(valid_loss):7.3f}')
    # A lower perplexity means the model is more confident in its translations.

 10%|█         | 1/10 [00:09<01:28,  9.83s/it]

Saving model at epch: 1
Epoch: 01
	 train Loss: 1.984 | train perplexity:   7.274
	 val Loss: 0.988 |  val perplexity:   2.687


 20%|██        | 2/10 [00:20<01:21, 10.16s/it]

Saving model at epch: 2
Epoch: 02
	 train Loss: 0.693 | train perplexity:   2.000
	 val Loss: 0.444 |  val perplexity:   1.559


 30%|███       | 3/10 [00:29<01:09,  9.96s/it]

Saving model at epch: 3
Epoch: 03
	 train Loss: 0.281 | train perplexity:   1.324
	 val Loss: 0.134 |  val perplexity:   1.143


 40%|████      | 4/10 [00:39<00:58,  9.75s/it]

Saving model at epch: 4
Epoch: 04
	 train Loss: 0.090 | train perplexity:   1.094
	 val Loss: 0.055 |  val perplexity:   1.057


 50%|█████     | 5/10 [00:49<00:49,  9.83s/it]

Saving model at epch: 5
Epoch: 05
	 train Loss: 0.039 | train perplexity:   1.039
	 val Loss: 0.032 |  val perplexity:   1.032


 60%|██████    | 6/10 [00:59<00:40, 10.06s/it]

Saving model at epch: 6
Epoch: 06
	 train Loss: 0.021 | train perplexity:   1.021
	 val Loss: 0.018 |  val perplexity:   1.018


 70%|███████   | 7/10 [01:09<00:29,  9.83s/it]

Saving model at epch: 7
Epoch: 07
	 train Loss: 0.012 | train perplexity:   1.012
	 val Loss: 0.012 |  val perplexity:   1.012


 80%|████████  | 8/10 [01:18<00:19,  9.74s/it]

Saving model at epch: 8
Epoch: 08
	 train Loss: 0.009 | train perplexity:   1.009
	 val Loss: 0.009 |  val perplexity:   1.009


 90%|█████████ | 9/10 [01:29<00:10, 10.10s/it]

Epoch: 09
	 train Loss: 0.011 | train perplexity:   1.011
	 val Loss: 0.016 |  val perplexity:   1.016


100%|██████████| 10/10 [01:40<00:00, 10.03s/it]

Saving model at epch: 10
Epoch: 10
	 train Loss: 0.007 | train perplexity:   1.007
	 val Loss: 0.007 |  val perplexity:   1.007





Pretty nice! A perplexity of 1.0 means the model has 0% confusion. So when the model hits a perplexity of 1.007, it means it is basically 100% certain of the correct Swedish word at every single step!

And even though the dataset was very small, the model was able to practically memorize the grammar and vocab perfectly without overfitting!

In [None]:
model.load_state_dict(torch.load("best_translation_model.pt"))

test_loss = evaluate(model, test_loader, criterion)

print(f"| Test Loss: {test_loss:.3f} | Test PPL: {np.exp(test_loss):7.3f} |")

In [107]:
def get_max_len(sentences_list, tokenizer):
    # sentences_list: list of lists of sentences
    max_len = 0
    for sentences in sentences_list:
        for s in sentences:
            length = len(tokenizer.encode(s))
            if length > max_len:
                max_len = length
    return max_len

all_en = [en_sentences_train, en_sentences_val, en_sentences_test]
all_sw = [sw_sentences_train, sw_sentences_val, sw_sentences_test]

max_len_en = get_max_len(all_en, en_tokenizer)
max_len_sw = get_max_len(all_sw, sw_tokenizer)

print(f"Max English length: {max_len_en}")
print(f"Max Swedish length: {max_len_sw}")

Max English length: 10
Max Swedish length: 10


In [108]:
def translate_sentence(sentence, model, en_tokenizer, sw_tokenizer, device, max_len=15):
    model.eval()
    
    # encode the English sentence
    tokens = en_tokenizer.encode(sentence)
    
    src_tensor = torch.LongTensor(tokens).unsqueeze(1).to(device)
    
    # get the context vector from the Encoder
    with torch.no_grad():
        hidden = model.encoder(src_tensor)
        
    # init the target sequence with the [SOS] token ID
    trg_indexes = [SOS_IDX]
    
    for _ in range(max_len):
        # the input to the decoder is  the "last" word predicted
        trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)
        
        with torch.no_grad():
            output, hidden = model.decoder(trg_tensor, hidden)
            
        # output shape is (1, swedish_vocab_size). argmax(1) gets the highest probability index
        pred_token = output.argmax(1).item()
        trg_indexes.append(pred_token)
        
        # stop early if the model predicts the end of the sentence
        if pred_token == EOS_IDX:
            break
            
    # convert the list of predicted integers back to a readable string
    translated_sentence = sw_tokenizer.decode(trg_indexes)
    
    return translated_sentence

lil sanity check again

In [111]:
# Try a sentence from your dataset
english_sentence = "a red apple is hot"
swedish_translation = translate_sentence(english_sentence, model, en_tokenizer, sw_tokenizer, device)

print(f"English: {english_sentence}")
print(f"Swedish: {swedish_translation}")

English: a red apple is hot
Swedish: ett rött äpple är hett


correct

## BLEU score calculation

pip install sacrebleu

In [115]:
def calculate_sacrebleu(test_pairs, model, en_tokenizer, sw_tokenizer, device):
    """
    test_pairs should be a list of tuples containing the raw strings: 
    [("a red apple is hot", "ett rött äpple är hett"), ...]
    """
    predictions = []
    references = []
    
    print("Translating test set...")
    # loop through all sentence pairs in test set
    for item in tqdm(test_pairs):
        eng_sentence = item["en"]
        true_sw_sentence = item["sw"]
        
        # translate english sentence
        predicted_sw = translate_sentence(eng_sentence, model, en_tokenizer, sw_tokenizer, device)
        predictions.append(predicted_sw)
        references.append(true_sw_sentence)
        
    # format references for SacreBLEU (list of lists)
    references_formatted = [references]
    
    # calculate the BLEU score !!!
    bleu = sacrebleu.corpus_bleu(predictions, references_formatted)
    
    return bleu

In [116]:
bleu_result = calculate_sacrebleu(test_data, model, en_tokenizer, sw_tokenizer, device)

print(f"\n***** FINAL RESULTS *****")
print(f"SacreBLEU Score: {bleu_result.score:.2f}")
print(f"All details: {bleu_result}")

Translating test set...


100%|██████████| 2000/2000 [00:08<00:00, 222.31it/s]



***** FINAL RESULTS *****
SacreBLEU Score: 99.56
All details: BLEU = 99.56 99.8/99.7/99.5/99.3 (BP = 1.000 ratio = 1.000 hyp_len = 7285 ref_len = 7285)


Scores over 0.60 are often considered better than human so a score of 99.56 is obviously very very very good.

But... we are using a "microlang" dataset with a tiny vocabulary and highly predictable, punctuation-free grammar. So the GRU model had enough capacity to practically memorize the exact mapping rules of this dataset.