In [1]:
import spacy
from enum import Enum
from torch.utils.data import Dataset
import itertools
import re
import random
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from functools import partial
from torchtext.vocab import build_vocab_from_iterator
from torchtext.data.utils import get_tokenizer

In [2]:
FILE_PATH = 'data/en-pt.txt'
NUM_PHRASES = 1_000_000
OUTPUT_FILE = 'data/en-pt_sentences.txt'
TOKENIZER_DIR = 'tokenizer'
VOCAB_SIZE = 64_000
PAD_TOKEN = "<pad>"
BOS_TOKEN = "<bos>"
EOS_TOKEN = "<eos>"
UNK_TOKEN = '<unk>'

In [3]:
class TranslationDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        en, pt = self.data[idx]
        return torch.tensor(en), torch.tensor(pt)

In [4]:
class LanguageDirection(Enum):
    PT2EN = 0,
    EN2PT = 1

In [5]:
def load_dataset(dataset_path, language_direction, limit=1000000):
    with open(dataset_path, "r", encoding="utf-8") as file:
        if language_direction == LanguageDirection.PT2EN.name:
            sentence_pairs = [tuple(reversed(line.strip().split("\t"))) for line in itertools.islice(file, limit)]
        else:
            sentence_pairs = [tuple(line.strip().split("\t")) for line in itertools.islice(file, limit)]
    return sentence_pairs

In [6]:
def preprocess_text(text):
    # Convert the text to lowercase
    text = text.lower()

    # Remove special characters and digits
    text = re.sub(r'[^a-záàâãéèêíïóôõöúçñ]+', ' ', text)

    return text

In [7]:
language_direction = LanguageDirection.PT2EN.name

sentence_pairs = load_dataset(FILE_PATH, language_direction, limit=NUM_PHRASES)
preprocessed_pairs = [(preprocess_text(en), preprocess_text(pt)) for en, pt in sentence_pairs]

In [19]:
en_tokenizer = get_tokenizer("spacy", language="pt_core_web_sm")
pt_tokenizer = get_tokenizer("spacy", language="en_core_news_sm")

OSError: [E050] Can't find model 'pt_core_web_sm'. It doesn't seem to be a Python package or a valid path to a data directory.

In [9]:
tokenized_pairs = [(
    en_tokenizer(en),
    pt_tokenizer(pt)
) for en, pt in preprocessed_pairs]

In [11]:
def add_special_tokens(tokens, bos_token, eos_token):
    return [bos_token] + tokens + [eos_token]

tokenized_pairs = [(
    add_special_tokens(en_tokens, BOS_TOKEN, EOS_TOKEN),
    add_special_tokens(pt_tokens, BOS_TOKEN, EOS_TOKEN)
) for en_tokens, pt_tokens in tokenized_pairs]

In [13]:
from torchtext.vocab import build_vocab_from_iterator

def yield_tokens(pairs, language):
    for en_tokens, pt_tokens in pairs:
        if language == "en":
            yield en_tokens
        else:
            yield pt_tokens

en_vocab = build_vocab_from_iterator(yield_tokens(tokenized_pairs, "en"), specials=[UNK_TOKEN, PAD_TOKEN, BOS_TOKEN, EOS_TOKEN])
pt_vocab = build_vocab_from_iterator(yield_tokens(tokenized_pairs, "pt"), specials=[UNK_TOKEN, PAD_TOKEN, BOS_TOKEN, EOS_TOKEN])

index_pairs = [(
    [en_vocab[token] for token in en_tokens],
    [pt_vocab[token] for token in pt_tokens]
) for en_tokens, pt_tokens in tokenized_pairs]

In [14]:
def padded_sequence(sequence, max_length, pad_idx):
    return sequence + [pad_idx] * (max_length - len(sequence))

max_length = 50
pad_idx_en = en_vocab[PAD_TOKEN]
pad_idx_pt = pt_vocab[PAD_TOKEN]

padded_pairs = [(
    padded_sequence(en_indices, max_length, pad_idx_en),
    padded_sequence(pt_indices, max_length, pad_idx_pt)
) for en_indices, pt_indices in index_pairs]

In [15]:
dataset = TranslationDataset(padded_pairs)

In [16]:
def collate_fn(batch):
    src_tensors, tgt_tensors = zip(*batch)
    src_tensors = pad_sequence(src_tensors, batch_first=True, padding_value=pad_idx_en)
    tgt_tensors = pad_sequence(tgt_tensors, batch_first=True, padding_value=pad_idx_pt)
    return src_tensors, tgt_tensors

In [17]:
from torch.utils.data import random_split

# Calculate the sizes of the training and testing sets
dataset_size = len(dataset)
train_size = int(dataset_size * 0.8)  # Use 80% of the dataset for training
test_size = dataset_size - train_size

# Split the dataset into training and testing sets
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Create DataLoaders for the training and testing sets
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

In [18]:
for batch_idx, (src, tgt) in enumerate(train_dataloader):
    print(src, tgt)
    break

tensor([[    2,  4228,    21,  ...,     1,     1,     1],
        [    2,   473,     4,  ...,     1,     1,     1],
        [    2,    67,  1247,  ...,     1,     1,     1],
        ...,
        [    2, 14787,    58,  ...,     1,     1,     1],
        [    2,    32,   802,  ...,     1,     1,     1],
        [    2,   119,     6,  ...,     1,     1,     1]]) tensor([[     2,   6783,     10,  ...,      1,      1,      1],
        [     2,   1527,    587,  ...,      1,      1,      1],
        [     2,    848,      7,  ...,      1,      1,      1],
        ...,
        [     2,    131,      9,  ...,      1,      1,      1],
        [     2,    174,    808,  ...,      1,      1,      1],
        [     2, 117745,     37,  ...,      1,      1,      1]])
