In [1]:
import numpy as np
import torch
import pathlib
import re
import unicodedata

from torch.utils.data import Dataset, DataLoader
from collections import Counter

In [4]:
config = {
    'MAX_VOCAB_SIZE': 13000,
    'BATCH_SIZE': 8,
    'raw_dataset_path': './dataset/por.txt',
    'MAX_SEQ_LEN': 32
}

In [5]:
# data loader
dataset_path = pathlib.Path(config['raw_dataset_path'])
text_data = dataset_path.read_text(encoding = 'utf-8')

lines = text_data.splitlines()
pairs = [line.split('\t') for line in lines]

context_en = np.array([context for context, target, _ in pairs])
target_por = np.array([target for context, target, _ in pairs])

sentences = list(zip(context_en, target_por))

In [8]:
def tokenizer(text):
    text = unicodedata.normalize("NFKD", text)
    text = text.lower()
    text = re.sub(r"[^ a-z.?!,¿]", "", text)
    text = re.sub(r"([.?!,¿])", r" \1 ", text)
    text = text.strip()
    return text.split()

tokenizer(context_en[34]), tokenizer(target_por[34])

(['go', 'on', '.'], ['siga', 'em', 'frente', '.'])

In [9]:
# build a vocabulary

class Vocabulary:
    def __init__(self, freq_threshold, max_vocab_size):
        # maintain two different mappings
        self.itos = {0: '[PAD]', 1: '[SOS]', 2: '[EOS]', 3: '[UNK]'}
        self.stoi = {'[PAD]': 0, '[SOS]': 1, '[EOS]': 2, '[UNK]': 3}
        self.freq_threshold = freq_threshold
        self.max_vocab_size = max_vocab_size
        
        self.pad_id = self.stoi['[PAD]']
        self.sos_id = self.stoi['[SOS]']
        self.eos_id = self.stoi['[EOS]']
        self.oov_id = self.stoi['[UNK]']

    def __len__(self):
        return len(self.itos)

    def vocab_size(self):
        return len(self.itos)

    def get_vocabulary(self):
        return self.stoi

    def token_to_ids(self, tokens):
        if isinstance(tokens, str): # handle a single word or sentence here
            token_list = self.tokenizer(tokens)
            return [self.stoi[t] if t in self.stoi else self.stoi['[UNK]'] for t in token_list]

        elif isinstance(tokens, list):
            return [self.stoi[t] if t in self.stoi else self.stoi['[UNK]'] for t in tokens]
        
        else:
            raise TypeError("Input must be either String or List of words.")

    def ids_to_token(self, ids):
        return [self.itos[id] for id in ids]

    # building vocab with the input sentence list
    def adapt(self, sentences, tokenizer):
        self.tokenizer = tokenizer
        idx = len(self.itos)
        token_freqs = {}

        for sentence in sentences:
            for token in self.tokenizer(sentence):
                if token not in self.stoi:
                    token_freqs[token] = 1
                else:
                    token_freqs[token] += 1
                
                if (token_freqs[token] == self.freq_threshold) and (idx < self.max_vocab_size):
                    self.itos[idx] = token
                    self.stoi[token] = idx
                    idx += 1

In [11]:
# english vocabulary
en_vocab = Vocabulary(freq_threshold = 1, max_vocab_size = config['MAX_VOCAB_SIZE'])
en_vocab.adapt(context_en, tokenizer)

# portuguese vocabulary
por_vocab = Vocabulary(freq_threshold = 1, max_vocab_size = config['MAX_VOCAB_SIZE'])
por_vocab.adapt(target_por, tokenizer)

en_vocab.vocab_size(), por_vocab.vocab_size()

(13000, 13000)

In [12]:
# test
test_idx = 789
en_translation = context_en[test_idx]
por_translation = target_por[test_idx]

print(en_translation, '--------->', por_translation)

max_seq_len = 16
context_tokens = en_vocab.token_to_ids(en_translation)
target_tokens = por_vocab.token_to_ids(por_translation)

print("\nEncoder Input IDs: ")
print([en_vocab.sos_id] + context_tokens + [en_vocab.eos_id] + (max_seq_len - len(context_tokens) - 2) * [en_vocab.pad_id])
print("\nPre-Attention Decoder Input IDs (Shifted to the Right): ")
print([en_vocab.sos_id] + target_tokens + [en_vocab.eos_id] + (max_seq_len - len(context_tokens) - 2) * [en_vocab.pad_id])
print("\nPost-Attention Decoder Input IDs: ")
print(target_tokens + [en_vocab.eos_id] + (max_seq_len - len(context_tokens) - 1) * [en_vocab.pad_id])

Here I am. ---------> Aqui estou.

Encoder Input IDs: 
[1, 194, 20, 62, 5, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

Pre-Attention Decoder Input IDs (Shifted to the Right): 
[1, 402, 47, 5, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

Post-Attention Decoder Input IDs: 
[402, 47, 5, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


In [13]:
class NMT_dataset(Dataset):
    def __init__(self, translation_pairs, tokenizers, vocabularies, max_seq_len):
        self.translation_pairs = translation_pairs
        self.en_tokenizer, self.por_tokenizer = tokenizers
        self.en_vocab, self.por_vocab = vocabularies
        self.max_seq_len = max_seq_len

        # for convenience 
        self.sos_id = self.en_vocab.sos_id
        self.eos_id = self.en_vocab.eos_id
        self.pad_id = self.en_vocab.pad_id
        self.oov_id = self.en_vocab.oov_id

    def __len__(self):
        return len(self.translation_pairs)

    def __getitem__(self, idx):
        req_pair = self.translation_pari[idx]
        en_translation, por_translation = req_pair

        context_tokens = self.en_vocab.token_to_ids(en_translation)
        target_tokens = self.por_vocab.token_to_ids(por_translation)

        # encoder input tokens
        encoder_input = (
            [self.sos_id] + 
            context_tokens + 
            [self.eos_id] + 
            (self.max_seq_len - len(context_tokens) - 2) * [self.pad_id]
            )
        
        # pre-attention decoder input tokens
        pre_decoder_input = (
            [self.sos_id] + 
            target_tokens + 
            [self.eos_id] + 
            (max_seq_len - len(context_tokens) - 2) * [self.pad_id] 
        )

        # post-attention decoder output tokens
        post_decoder_output = (
            target_tokens + 
            [self.eos_id] + 
            (max_seq_len - len(context_tokens) - 1) * [self.pad_id]
        )

        encoder_input_tensor = torch.tensor(encoder_input, dtype = torch.long)
        pre_decoder_input_tensor = torch.tensor(pre_decoder_input, dtype = torch.long)
        post_decoder_output_tensor = torch.tensor(post_decoder_output, dtype = torch.long)

        return encoder_input_tensor, pre_decoder_input_tensor, post_decoder_output_tensor