In [2]:
from datasets import load_dataset
# Fetch two splits of the English->Spanish Tatoeba dataset in a single call;
# passing a list of split names yields one dataset per name, in that order.
nmt_original_valid_set, nmt_test_set = load_dataset(
    "ageron/tatoeba_mt_train",
    name="eng-spa",
    split=["validation", "test"],
)

# Carve the downloaded "validation" split into 80% training / 20% validation
# examples. The fixed seed keeps the partition identical across runs.
split = nmt_original_valid_set.train_test_split(train_size=0.8, seed=42)
nmt_train_set = split["train"]
nmt_valid_set = split["test"]

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

eng-spa/validation-00000-of-00001.parque(…):   0%|          | 0.00/7.85M [00:00<?, ?B/s]

eng-spa/test-00000-of-00001.parquet:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Generating validation split:   0%|          | 0/197299 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/24514 [00:00<?, ? examples/s]

In [3]:
import tokenizers
def train_eng_spa():
    """Yield every training sentence, source text first and then target text.

    Iterates over the module-level ``nmt_train_set`` so the tokenizer trainer
    can consume both languages as one stream of strings.
    """
    for example in nmt_train_set:
        yield from (example["source_text"], example["target_text"])

max_length, vocab_size = 256, 10_000

# BPE model with an explicit unknown token, plus the trainer that will learn
# its merges. The special tokens are listed in the order matching the ids
# used elsewhere in this notebook (pad_id=0 here, eos_id=3 in `translate`).
nmt_tokenizer_model = tokenizers.models.BPE(unk_token="<unk>")
nmt_tokenizer_trainer = tokenizers.trainers.BpeTrainer(
    special_tokens=["<pad>", "<unk>", "<s>", "</s>"], vocab_size=vocab_size)

# Tokenizer pipeline: whitespace pre-tokenization, truncation to `max_length`
# tokens, and padding with "<pad>" (id 0) when encoding batches.
nmt_tokenizer = tokenizers.Tokenizer(nmt_tokenizer_model)
nmt_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.Whitespace()
nmt_tokenizer.enable_truncation(max_length=max_length)
nmt_tokenizer.enable_padding(pad_token="<pad>", pad_id=0)

# Learn a single vocabulary shared by both languages.
nmt_tokenizer.train_from_iterator(train_eng_spa(), trainer=nmt_tokenizer_trainer)

In [4]:
from collections import namedtuple

fields = ["src_token_ids", "src_mask", "tgt_token_ids", "tgt_mask"]
class NmtPair(namedtuple("NmtPairBase", fields)):
    """Immutable bundle of the four tensors describing a translation batch."""

    def to(self, device):
        """Return a new ``NmtPair`` whose four members were moved to ``device``."""
        # Iterating a namedtuple yields its members in declaration order, so
        # the positional re-packing below keeps every field in its slot.
        return NmtPair(*(member.to(device) for member in self))

In [5]:
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
def nmt_collate_fn(batch):
    """Turn a list of translation examples into ``(inputs, labels)`` tensors.

    Each example is a mapping with ``'source_text'`` and ``'target_text'``.
    Targets are wrapped in ``<s>`` / ``</s>`` before tokenization. The decoder
    inputs are the target ids without their last position, and the labels are
    the same ids without their first position (teacher forcing, shifted by one).

    Returns:
        inputs: an ``NmtPair`` of source ids, source mask, decoder-input ids
            and decoder-input mask.
        labels: the target ids shifted one step to the left.
    """
    def encode(texts):
        # Tokenize a list of strings and return (token ids, attention mask).
        encodings = nmt_tokenizer.encode_batch(texts)
        ids = torch.tensor([encoding.ids for encoding in encodings])
        mask = torch.tensor([encoding.attention_mask for encoding in encodings])
        return ids, mask

    src_token_ids, src_mask = encode([pair['source_text'] for pair in batch])
    tgt_token_ids, tgt_mask = encode(
        [f"<s> {pair['target_text']} </s>" for pair in batch])

    decoder_token_ids = tgt_token_ids[:, :-1]
    decoder_mask = tgt_mask[:, :-1]
    labels = tgt_token_ids[:, 1:]
    inputs = NmtPair(src_token_ids, src_mask, decoder_token_ids, decoder_mask)
    return inputs, labels

batch_size = 32

# Settings common to the three loaders; only the training loader shuffles.
_loader_options = dict(batch_size=batch_size, collate_fn=nmt_collate_fn)
nmt_train_loader = DataLoader(nmt_train_set, shuffle=True, **_loader_options)
nmt_valid_loader = DataLoader(nmt_valid_set, **_loader_options)
nmt_test_loader = DataLoader(nmt_test_set, **_loader_options)

In [6]:
# Prefer the GPU, but fall back to the CPU when CUDA is not available so the
# `.to(device)` calls in the cells below do not fail on a machine without one.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
from torch.nn.utils.rnn import pack_padded_sequence
class NmtModel(nn.Module):
    """GRU encoder-decoder translation model with one shared embedding table.

    The same embedding layer is applied to source and target token ids. The
    encoder's final hidden states initialize the decoder, and a linear layer
    maps each decoder output to vocabulary logits.
    """

    def __init__(self, vocab_size, embed_dim=512, pad_id=0, hidden_dim=512,
                 n_layers=2):
        super().__init__()
        # Encoder and decoder use the same GRU configuration.
        gru_config = dict(input_size=embed_dim, hidden_size=hidden_dim,
                          num_layers=n_layers, batch_first=True)
        self.embed = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_id)
        self.encoder = nn.GRU(**gru_config)
        self.decoder = nn.GRU(**gru_config)
        self.output = nn.Linear(hidden_dim, vocab_size)

    def forward(self, pair):
        """Return logits laid out as (batch, vocab_size, target_length).

        ``pair`` must expose ``src_token_ids``, ``src_mask`` and
        ``tgt_token_ids``; ``tgt_mask`` is not read here.
        """
        # Number of real source tokens per row, taken from the attention mask;
        # it is moved to the CPU before being handed to pack_padded_sequence.
        n_src_tokens = pair.src_mask.sum(dim=1).cpu()
        packed_source = pack_padded_sequence(
            self.embed(pair.src_token_ids), lengths=n_src_tokens,
            batch_first=True, enforce_sorted=False)
        _, context = self.encoder(packed_source)

        decoded, _ = self.decoder(self.embed(pair.tgt_token_ids), context)
        logits = self.output(decoded)
        # Move the vocabulary axis to position 1.
        return logits.permute(0, 2, 1)

# Seed the global RNG first so the model's parameter initialization below is
# reproducible; the order of these statements matters.
torch.manual_seed(42)
# Size the model from the trained tokenizer's vocabulary. This rebinds
# `vocab_size`, which the tokenizer cell set to the trainer's target size.
vocab_size = nmt_tokenizer.get_vocab_size()
nmt_model = NmtModel(vocab_size).to(device)

In [7]:
import torch.optim as optim
import torch.nn as nn
from tqdm import tqdm  # Using tqdm for progress bars
from torch.optim import NAdam

# 1. Optimizer and loss function
optimizer = NAdam(nmt_model.parameters(), lr=1e-3)
# Label positions equal to 0 (the padding id) are excluded from the loss.
loss_fn = nn.CrossEntropyLoss(ignore_index=0)

# 2. Training loop: one pass over the training set, then one over the
#    validation set, per epoch.
n_epochs = 10

for epoch in range(n_epochs):
    epoch_tag = f"Epoch {epoch+1}/{n_epochs}"

    # ---- training pass ----
    nmt_model.train()
    total_train_loss = 0
    progress_bar = tqdm(nmt_train_loader, desc=epoch_tag, leave=False)

    for inputs, labels in progress_bar:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        # The model emits (batch, vocab, seq) logits and the labels are
        # (batch, seq), which is the layout CrossEntropyLoss takes for sequences.
        logits = nmt_model(inputs)
        loss = loss_fn(logits, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        total_train_loss += batch_loss
        progress_bar.set_postfix({'train_loss': f"{batch_loss:.4f}"})

    avg_train_loss = total_train_loss / len(nmt_train_loader)

    # ---- 3. validation pass (no gradients) ----
    nmt_model.eval()
    total_valid_loss = 0

    with torch.no_grad():
        for inputs, labels in nmt_valid_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            logits = nmt_model(inputs)
            loss = loss_fn(logits, labels)
            total_valid_loss += loss.item()

    avg_valid_loss = total_valid_loss / len(nmt_valid_loader)

    # Per-epoch summary: mean of the per-batch losses on each set.
    print(f"{epoch_tag} | "
          f"Train Loss: {avg_train_loss:.4f} | "
          f"Val Loss: {avg_valid_loss:.4f}")



Epoch 1/10 | Train Loss: 3.1336 | Val Loss: 2.4184




Epoch 2/10 | Train Loss: 2.0384 | Val Loss: 2.2262




Epoch 3/10 | Train Loss: 1.7209 | Val Loss: 2.2097




Epoch 4/10 | Train Loss: 1.5626 | Val Loss: 2.2492




Epoch 5/10 | Train Loss: 1.4726 | Val Loss: 2.2938




Epoch 6/10 | Train Loss: 1.4204 | Val Loss: 2.3420




Epoch 7/10 | Train Loss: 1.3933 | Val Loss: 2.3907




Epoch 8/10 | Train Loss: 1.3757 | Val Loss: 2.4345




Epoch 9/10 | Train Loss: 1.3754 | Val Loss: 2.4781




Epoch 10/10 | Train Loss: 1.3764 | Val Loss: 2.5171


In [8]:
def translate(model, src_text, max_length=20, pad_id=0, eos_id=3):
    """Greedily translate ``src_text``, generating one token per step.

    At every step the text generated so far is re-tokenized together with the
    source through ``nmt_collate_fn``, the model is run, and the most likely
    token at the last decoder position is appended.

    Args:
        model: translation model called as ``model(batch)``; expected to
            return logits laid out as (batch, vocab, sequence).
        src_text: sentence to translate.
        max_length: maximum number of tokens to generate.
        pad_id: unused; kept so existing callers keep working.
        eos_id: id of the end-of-sequence token; generation stops right after
            it has been produced.

    Returns:
        The generated tokens, each preceded by a single space. The
        end-of-sequence token is included when it was generated, so a
        finished translation looks like ``" Hola . </s>"``.
    """
    tgt_text = ""
    with torch.no_grad():
        for _step in range(max_length):
            batch, _ = nmt_collate_fn([{"source_text": src_text,
                                        "target_text": tgt_text}])
            Y_logits = model(batch.to(device))
            # Read the prediction at the *last* decoder position rather than
            # at the step counter: with a single unpadded example they
            # coincide, and this stays valid even if re-tokenizing the
            # generated text yields a different number of tokens.
            next_token_id = Y_logits[0, :, -1].argmax().item()
            tgt_text += " " + nmt_tokenizer.id_to_token(next_token_id)
            if next_token_id == eos_id:
                break
    return tgt_text

In [9]:
# Put the model in evaluation mode, then greedily translate a sample sentence.
nmt_model.eval()
translate(nmt_model, 'I am not going to the school today')

' Hoy voy al colegio . </s>'

In [20]:
import torch
import torch.nn.functional as F

def translate_beam(model, src_text, beam_width=3, max_length=20, eos_id=3):
    """Translate ``src_text`` with beam search.

    Keeps the ``beam_width`` best partial translations, ranked by the sum of
    their token log-probabilities (no length normalization). Each unfinished
    hypothesis is extended with its ``beam_width`` most likely next tokens.
    A hypothesis that picks the end-of-sequence token is marked finished and
    carried over unchanged. The search stops as soon as the top-ranked
    hypothesis is finished, or after ``max_length`` steps.

    Args:
        model: translation model called as ``model(batch)``; expected to
            return logits laid out as (batch, vocab, sequence).
        src_text: sentence to translate.
        beam_width: number of hypotheses kept after every step (>= 1).
        max_length: maximum number of decoding steps.
        eos_id: id of the end-of-sequence token.

    Returns:
        The text of the best hypothesis, stripped of surrounding whitespace
        and without the end-of-sequence token.

    Raises:
        ValueError: if ``beam_width`` is smaller than 1.
    """
    if beam_width < 1:
        raise ValueError("beam_width must be at least 1")

    # Each hypothesis is (text so far, cumulative log-probability, finished).
    candidates = [("", 0.0, False)]

    with torch.no_grad():
        for _step in range(max_length):
            all_candidates = []

            for text, score, finished in candidates:
                if finished:
                    all_candidates.append((text, score, True))
                    continue

                batch, _ = nmt_collate_fn(
                    [{"source_text": src_text, "target_text": text}])
                logits = model(batch.to(device))

                # Score the next token from the last decoder position, so the
                # lookup stays valid even if re-tokenizing `text` produces a
                # token count that differs from the step counter.
                log_probs = F.log_softmax(logits[0, :, -1], dim=0)
                # topk raises if asked for more entries than the vocabulary has.
                n_expansions = min(beam_width, log_probs.numel())
                top_log_probs, top_ids = torch.topk(log_probs, n_expansions)

                # A single host transfer per tensor instead of one .item() per entry.
                for token_id, token_score in zip(top_ids.tolist(),
                                                 top_log_probs.tolist()):
                    if token_id == eos_id:
                        # Finished: the end-of-sequence token is scored but
                        # not appended to the text.
                        all_candidates.append((text, score + token_score, True))
                    else:
                        token = nmt_tokenizer.id_to_token(token_id)
                        all_candidates.append(
                            (text + " " + token, score + token_score, False))

            ordered = sorted(all_candidates, key=lambda cand: cand[1], reverse=True)
            candidates = ordered[:beam_width]
            # Extending a hypothesis can only lower its score, so once the
            # leader is finished nothing can overtake it.
            if candidates[0][2]:
                break

    return candidates[0][0].strip()
# Decode the same sentence with both strategies for a side-by-side comparison.
_demo_sentence = 'I am not going to the school today'
print("Greedy:", translate(nmt_model, _demo_sentence))
print("Beam  :", translate_beam(nmt_model, _demo_sentence, beam_width=3))

Greedy:  Hoy voy al colegio . </s>
Beam  : Hoy no voy al colegio .


In [21]:
# Beam-search translation of a second sample sentence, keeping 3 hypotheses.
translate_beam(nmt_model, 'I want to go to the home with my friends', beam_width=3)

'Quiero ir a mi casa con mis amigos .'