# 1. Data

In [None]:
!pip install datasets



In [None]:
from datasets import load_dataset

# Load the German-English ("de-en") configuration of the WMT14 translation corpus
dataset = load_dataset(path="wmt14", name="de-en")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:
# Pull the train / validation / test splits out of the loaded dataset
train_dataset, val_dataset, test_dataset = (
    dataset[split_name] for split_name in ("train", "validation", "test")
)

In [None]:
# Collect the parallel English / German sentences into plain Python lists.

# Only the first NUM_TRAIN_SAMPLES pairs of the training split are used.
NUM_TRAIN_SAMPLES = 50000


def _split_translation_pairs(pairs):
    """Return (english, german) sentence lists from a list of {'en': ..., 'de': ...} dicts."""
    english = [pair["en"] for pair in pairs]
    german = [pair["de"] for pair in pairs]
    return english, german


# Slicing reads the rows in one batched access instead of one lookup per index,
# and simply returns fewer rows if the split is shorter than the requested size.
train_sentences_en, train_sentences_de = _split_translation_pairs(
    train_dataset[:NUM_TRAIN_SAMPLES]["translation"]
)
val_sentences_en, val_sentences_de = _split_translation_pairs(
    val_dataset[:]["translation"]
)

In [None]:
# Sanity check: the English and German lists of each split should have equal lengths
len(train_sentences_en), len(train_sentences_de), len(val_sentences_en), len(val_sentences_de)

(50000, 50000, 3000, 3000)

# 2. Training Tokenizer

In [None]:
# counting unique words in the sentences

def count_unique_words(sentences):
    """Count distinct words across an iterable of sentences.

    Each sentence is lower-cased and split on whitespace, then common
    punctuation is stripped from both ends of every token. Tokens that consist
    solely of punctuation (e.g. a free-standing "!") strip down to an empty
    string and are skipped, so they do not inflate the count.

    Args:
        sentences: iterable of strings.

    Returns:
        Number of distinct non-empty words.
    """
    punctuation = ".,!?;:()[]\"'"
    words = set()
    for sentence in sentences:
        for token in sentence.lower().split():
            word = token.strip(punctuation)
            if word:  # drop tokens that were nothing but punctuation
                words.add(word)
    return len(words)

# Whitespace-level word count over the English and German training sentences combined
combined_sentences = train_sentences_en + train_sentences_de
count_vocab_size = count_unique_words(combined_sentences)

count_vocab_size

69447

In [None]:
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.normalizers import Sequence, Lowercase, NFD, StripAccents
from tokenizers.implementations import ByteLevelBPETokenizer
from tokenizers.processors import TemplateProcessing
import random

def train_bpe_tokenizer_from_sentences(sentences, vocab_size=100000):
    """Train a BPE tokenizer on raw sentences and return it.

    Text is NFD-normalized, lower-cased and stripped of accents, then split by
    the `Whitespace` pre-tokenizer before BPE merges are learned.

    Args:
        sentences: iterable of strings used as the training corpus.
        vocab_size: vocabulary size passed to the BPE trainer.

    Returns:
        The trained `Tokenizer`.
    """
    special_tokens = ["[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]"]

    # Model plus text-processing pipeline
    bpe_tokenizer = Tokenizer(BPE(unk_token="[UNK]"))
    bpe_tokenizer.normalizer = Sequence([NFD(), Lowercase(), StripAccents()])
    bpe_tokenizer.pre_tokenizer = Whitespace()

    # Learn the merges directly from the in-memory sentences
    bpe_trainer = BpeTrainer(vocab_size=vocab_size, special_tokens=special_tokens)
    bpe_tokenizer.train_from_iterator(sentences, trainer=bpe_trainer)

    return bpe_tokenizer

# Seed for the corpus shuffle so the run is reproducible after a kernel restart
TOKENIZER_SHUFFLE_SEED = 42

# One shared corpus: German and English sentences mixed together
sentences = train_sentences_de + train_sentences_en
# A dedicated seeded RNG gives the same order on every run without touching
# the global `random` state.
random.Random(TOKENIZER_SHUFFLE_SEED).shuffle(sentences)

# Train tokenizer
tokenizer = train_bpe_tokenizer_from_sentences(sentences, vocab_size=100000)

In [None]:
# Quick check of the tokenizer on an English sentence (tokens only)
english_sample = "Tokenizers are awesome!"
output = tokenizer.encode(english_sample)
print("Tokens:", output.tokens)

# ... and on a German sentence (tokens together with their ids)
german_sample = "Die Katze sitzt auf dem Fensterbrett und schaut nach draußen."
output = tokenizer.encode(german_sample)
print("Tokens:", output.tokens, "IDs:", output.ids)

Tokens: ['token', 'iz', 'ers', 'are', 'a', 'wes', 'ome', '!']
Tokens: ['die', 'katze', 'sitzt', 'auf', 'dem', 'fenster', 'brett', 'und', 'schaut', 'nach', 'draußen', '.'] IDs: [101, 65572, 16242, 164, 234, 25763, 24442, 110, 34732, 340, 20471, 16]


# 3. Model

In [None]:
import torch
import torch.nn as nn
import math

# Positional Encoding
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a batch-first tensor.

    The encoding table is registered as the buffer `pe` with shape
    (1, max_len, d_model). `forward` slices it along dimension 1 of the input,
    so the input must be laid out as (batch, seq_len, d_model).

    Args:
        d_model: size of the feature dimension (even or odd).
        max_len: number of positions stored in the table.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer cosine column than sine
        # columns, so trim the angles to fit instead of failing on the shape.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return `x` plus the encodings of its first `x.size(1)` positions."""
        x = x + self.pe[:, :x.size(1)]
        return x

# Transformer with tokenizer integration
class TransformerTranslationModel(nn.Module):
    """Encoder-decoder Transformer with an attached tokenizer for text in/out.

    Source and target use separate embedding tables of the same vocabulary
    size. The wrapped `nn.Transformer` is created without `batch_first`, so
    token tensors given to `forward` are sequence-first: (seq_len, batch).

    Args:
        vocab_size: number of token ids (size of both embeddings and of the output layer).
        tokenizer: object providing `encode(text).ids`, `decode(ids, skip_special_tokens=...)`
            and `token_to_id(token)`; it must know "[CLS]" and "[SEP]".
        d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
        dropout: forwarded to `nn.Transformer`.
    """

    def __init__(self, vocab_size, tokenizer,
                 d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=2048, dropout=0.1):
        super().__init__()

        # Shared tokenizer used for both source and target text
        self.tokenizer = tokenizer

        self.src_embedding = nn.Embedding(vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(vocab_size, d_model)

        self.pos_encoder = PositionalEncoding(d_model)
        self.pos_decoder = PositionalEncoding(d_model)

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout
        )

        self.fc_out = nn.Linear(d_model, vocab_size)

        self.d_model = d_model
        self.tgt_vocab_size = vocab_size

    def _embed(self, token_ids, embedding, positional_encoding):
        """Embed (seq_len, batch) token ids, scale them, and add position encodings."""
        scaled = embedding(token_ids) * math.sqrt(self.d_model)  # (seq_len, batch, d_model)
        # PositionalEncoding indexes positions along dim 1, while this tensor is
        # sequence-first. Swap the axes for the call and swap back afterwards;
        # otherwise the encoding would be selected by batch index and every
        # token of a sequence would receive the same position vector.
        return positional_encoding(scaled.transpose(0, 1)).transpose(0, 1)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, src_padding_mask=None, tgt_padding_mask=None, memory_key_padding_mask=None):
        """Return logits of shape (tgt_len, batch, vocab_size).

        Args:
            src: source token ids, (src_len, batch).
            tgt: target input token ids, (tgt_len, batch).
            src_mask, tgt_mask: attention masks forwarded to `nn.Transformer`.
            src_padding_mask, tgt_padding_mask, memory_key_padding_mask:
                key-padding masks forwarded to `nn.Transformer`.
        """
        src_emb = self._embed(src, self.src_embedding, self.pos_encoder)
        tgt_emb = self._embed(tgt, self.tgt_embedding, self.pos_decoder)

        output = self.transformer(
            src_emb, tgt_emb,
            src_mask=src_mask, tgt_mask=tgt_mask,
            src_key_padding_mask=src_padding_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=memory_key_padding_mask
        )
        return self.fc_out(output)

    def encode_input(self, src_text):
        """Tokenize `src_text` into a (seq_len, 1) tensor of token ids."""
        # Explicit integer dtype: an empty id list would otherwise become a float tensor.
        ids = self.tokenizer.encode(src_text).ids
        return torch.tensor(ids, dtype=torch.long).unsqueeze(1)  # (seq_len, 1)

    def decode_output(self, token_ids):
        """Turn a list of token ids back into text, dropping special tokens."""
        return self.tokenizer.decode(token_ids, skip_special_tokens=True)

    def generate(self, src_text, max_len=50):
        """Greedily translate `src_text`, emitting at most `max_len` tokens.

        Decoding starts from "[CLS]" and stops early when "[SEP]" is predicted.
        The model is switched to eval mode and left in that mode.
        """
        self.eval()
        device = next(self.parameters()).device
        src = self.encode_input(src_text).to(device)  # (src_len, 1)

        cls_id = self.tokenizer.token_to_id("[CLS]")
        sep_id = self.tokenizer.token_to_id("[SEP]")

        tgt_tokens = [cls_id]
        # Inference only: no autograd graph is needed for any decoding step.
        with torch.no_grad():
            for _ in range(max_len):
                tgt_input = torch.tensor(tgt_tokens, dtype=torch.long, device=device).unsqueeze(1)  # (tgt_len, 1)
                tgt_mask = self.transformer.generate_square_subsequent_mask(tgt_input.size(0)).to(device)

                out = self.forward(src, tgt_input, tgt_mask=tgt_mask)
                next_token = out.argmax(dim=-1)[-1, 0].item()

                if next_token == sep_id:
                    break

                tgt_tokens.append(next_token)

        return self.decode_output(tgt_tokens[1:])  # skip [CLS]

# 4. PyTorch Dataset

In [None]:
from torch.utils.data import Dataset, DataLoader

# --- Custom Dataset ---
class TranslationDataset(Dataset):
    """Parallel-sentence dataset yielding fixed-length (src, tgt) id tensors.

    Each item is tokenized on access. The source is truncated to `max_len`
    ids; the target is truncated to `max_len - 2` ids and wrapped in
    "[CLS]" ... "[SEP]". Both are then right-padded with "[PAD]" up to `max_len`.
    """

    def __init__(self, src_sentences, tgt_sentences, tokenizer, max_len=128):
        self.src, self.tgt = src_sentences, tgt_sentences
        self.tokenizer = tokenizer
        self.max_len = max_len
        # Ids of the special tokens used for padding and for framing the target
        self.pad_id, self.cls_id, self.sep_id = (
            tokenizer.token_to_id(token) for token in ("[PAD]", "[CLS]", "[SEP]")
        )

    def __len__(self):
        return len(self.src)

    def _pad(self, ids):
        """Return `ids` right-padded with the pad id up to `max_len`."""
        return ids + [self.pad_id] * (self.max_len - len(ids))

    def __getitem__(self, idx):
        src_tokens = self.tokenizer.encode(self.src[idx]).ids[:self.max_len]
        tgt_body = self.tokenizer.encode(self.tgt[idx]).ids[:self.max_len - 2]
        tgt_tokens = [self.cls_id, *tgt_body, self.sep_id]

        return torch.tensor(self._pad(src_tokens)), torch.tensor(self._pad(tgt_tokens))

# Wrap the sentence lists as datasets: English is the source, German the target.
# NOTE: this rebinds `train_dataset` / `val_dataset`, which earlier held the raw
# splits, so the sentence-extraction cell above cannot be re-run after this one.
train_dataset = TranslationDataset(
    src_sentences=train_sentences_en, tgt_sentences=train_sentences_de, tokenizer=tokenizer
)
val_dataset = TranslationDataset(
    src_sentences=val_sentences_en, tgt_sentences=val_sentences_de, tokenizer=tokenizer
)

# 5. PyTorch DataLoader

In [None]:
BATCH_SIZE = 32

# Only the training batches are shuffled; validation keeps its original order
train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Peek at the source-id tensor (first element of the (src, tgt) pair) of one training batch
next(iter(train_loader))[0]

tensor([[ 112,  759, 1092,  ...,    0,    0,    0],
        [5773,   14,  526,  ...,    0,    0,    0],
        [  82,  753, 5345,  ...,    0,    0,    0],
        ...,
        [  82, 3107,   79,  ...,    0,    0,    0],
        [ 112,  401,   14,  ...,    0,    0,    0],
        [ 127,   33, 4156,  ...,    0,    0,    0]])

# 6. Training

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

# --- Generate Padding Mask ---
def create_padding_mask(seq, pad_id):
    """Return an element-wise mask that is True wherever `seq` equals `pad_id`."""
    is_padding = seq == pad_id
    return is_padding

def train_model(model, dataloader, tokenizer, epochs=10, lr=1e-4, device='cuda'):
    """Train `model` with teacher forcing and print the mean loss of each epoch.

    Args:
        model: module called as `model(src, tgt_input, src_mask=..., tgt_mask=...,
            src_padding_mask=..., tgt_padding_mask=..., memory_key_padding_mask=...)`
            with sequence-first id tensors, returning (tgt_len, batch, vocab) logits.
        dataloader: iterable of `(src, tgt)` batches of token ids, each (batch, seq_len).
        tokenizer: object whose `token_to_id("[PAD]")` gives the padding id.
        epochs: number of passes over `dataloader`.
        lr: Adam learning rate.
        device: device the model and the batches are moved to.
    """
    model = model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    # Looked up once: the padding id does not change between batches.
    pad_id = tokenizer.token_to_id("[PAD]")
    criterion = nn.CrossEntropyLoss(ignore_index=pad_id)

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        num_batches = 0
        loop = tqdm(dataloader, desc=f"Epoch {epoch+1}", leave=False)

        for src, tgt in loop:
            src = src.to(device)         # (batch, seq_len)
            tgt = tgt.to(device)         # (batch, seq_len)

            # Teacher forcing: the decoder sees tokens [0, n-1) and predicts tokens [1, n)
            tgt_input = tgt[:, :-1]      # (batch, seq_len - 1)
            tgt_output = tgt[:, 1:]      # (batch, seq_len - 1)

            # Key-padding masks are built while the tensors are still batch-first
            src_padding_mask = create_padding_mask(src, pad_id)
            tgt_padding_mask = create_padding_mask(tgt_input, pad_id)

            # Transpose for transformer (seq_len, batch)
            src = src.transpose(0, 1)
            tgt_input = tgt_input.transpose(0, 1)

            # Causal mask so a position cannot attend to later target positions
            tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_input.size(0)).to(device)

            # Forward pass
            output = model(
                src, tgt_input,
                src_mask=None, tgt_mask=tgt_mask,
                src_padding_mask=src_padding_mask,
                tgt_padding_mask=tgt_padding_mask,
                memory_key_padding_mask=src_padding_mask
            )

            output = output.transpose(0, 1).contiguous()  # (batch, seq_len, vocab_size)
            loss = criterion(output.view(-1, output.size(-1)), tgt_output.reshape(-1))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1
            loop.set_postfix(loss=loss.item())

        # Average over the batches actually seen; also safe for an empty or
        # unsized dataloader, where len(dataloader) would be 0 or unavailable.
        average_loss = total_loss / max(num_batches, 1)
        print(f"Epoch {epoch+1}: Average Loss = {average_loss:.4f}")

In [None]:
# Size the embeddings and the output layer to the trained tokenizer's vocabulary
vocab_size = tokenizer.get_vocab_size()
model = TransformerTranslationModel(vocab_size=vocab_size, tokenizer=tokenizer)

# One pass over the training data, using train_model's default lr and device
train_model(model=model, dataloader=train_loader, tokenizer=tokenizer, epochs=1)

                                                                       

Epoch 1: Average Loss = 5.9674




# 7. Testing on an example sentence

In [None]:
def translate_sentence(model, tokenizer, sentence, device='cuda', max_len=50):
    """Greedily decode a translation of `sentence` with `model`.

    The model is put in eval mode and moved to `device`. Decoding starts from
    "[CLS]", appends the arg-max token of the last position at every step, and
    stops after `max_len` tokens or as soon as "[SEP]" is predicted.

    Returns:
        The decoded text, produced by `model.decode_output` without the leading "[CLS]".
    """
    model.eval()
    model = model.to(device)

    start_id = tokenizer.token_to_id("[CLS]")
    stop_id = tokenizer.token_to_id("[SEP]")

    source = model.encode_input(sentence).to(device)  # (src_len, 1)
    generated = [start_id]

    with torch.no_grad():
        for _step in range(max_len):
            decoder_input = torch.tensor(generated).unsqueeze(1).to(device)  # (tgt_len, 1)
            causal_mask = nn.Transformer.generate_square_subsequent_mask(decoder_input.size(0)).to(device)

            logits = model(
                source, decoder_input,
                src_mask=None, tgt_mask=causal_mask,
                src_padding_mask=None,
                tgt_padding_mask=None,
                memory_key_padding_mask=None
            )

            # Most likely token at the last target position of the single batch item
            predicted = logits.argmax(dim=-1)[-1, 0].item()
            if predicted == stop_id:
                break
            generated.append(predicted)

    # Drop the leading [CLS] before turning the ids back into text
    return model.decode_output(generated[1:])

In [None]:
# Translate a single word as a quick end-to-end check of the trained model
example_sentence = "lion"
translation = translate_sentence(model=model, tokenizer=tokenizer, sentence=example_sentence)
print("Translation:", translation)

Translation: ( das parlament )
