In [None]:
# ============================
# 🔹 1. Install Required Libraries
# ============================
#!pip install torch datasets

# ============================
# 🔹 2. Import Libraries
# ============================
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
from datasets import load_dataset, concatenate_datasets
from torch.utils.data import DataLoader
import os
import re
import math
from collections import Counter
from torch.nn.utils.rnn import pad_sequence
import json

# ============================
# 🔹 3. Load Dataset (Arabic-English)
# ============================
# Fetch the Arabic-English configuration of the Tatoeba MT corpus.
dataset = load_dataset("Helsinki-NLP/tatoeba_mt", "ara-eng")

# The validation and test splits are both used as training material below;
# the variable names are kept for the rest of the notebook.
train_data, test_data = dataset['validation'], dataset['test']

# One corpus: the validation rows followed by the test rows.
combined_data = concatenate_datasets([train_data, test_data])

# ============================
# 🔹 4. Build Custom Tokenizer
# ============================
class CustomTokenizer:
    """Word-level tokenizer backed by a frequency-ordered vocabulary.

    Text is lower-cased and split into runs of word characters and single
    punctuation characters. The four special tokens always occupy the first
    vocabulary slots of a freshly built tokenizer (IDs 0-3, in the order
    listed in ``special_tokens``).

    Attributes:
        word2idx: Mapping from token string to integer ID.
        idx2word: Mapping from integer ID back to token string.
        vocab_size: Number of entries registered so far.
        special_tokens: Reserved tokens added ahead of corpus words.
    """

    # Compiled once and shared by every instance: a token is either a run of
    # word characters or one character that is neither word nor whitespace.
    _TOKEN_PATTERN = re.compile(r'\w+|[^\w\s]')

    def __init__(self):
        self.word2idx = {}
        self.idx2word = {}
        self.vocab_size = 0
        self.special_tokens = ["<pad>", "<sos>", "<eos>", "<unk>"]

    def _add_token(self, token):
        """Register ``token`` under the next free ID unless already known."""
        if token in self.word2idx:
            return
        self.word2idx[token] = self.vocab_size
        self.idx2word[self.vocab_size] = token
        self.vocab_size += 1

    def build_vocab(self, texts):
        """Build (or extend) the vocabulary from an iterable of texts.

        Special tokens are registered first, then corpus words in descending
        frequency order. Tokens that are already present keep their existing
        IDs, so calling this method again only appends unseen words instead
        of re-numbering the special tokens.

        Args:
            texts: Iterable of strings to collect words from.
        """
        all_words = Counter()
        for text in texts:
            all_words.update(self.tokenize(text))

        # Add special tokens first so they receive the lowest IDs.
        for token in self.special_tokens:
            self._add_token(token)

        # Add other words, most frequent first.
        for word, _ in all_words.most_common():
            self._add_token(word)

    def tokenize(self, text):
        """Lower-case ``text`` and split it into words and punctuation.

        Args:
            text: Input string.

        Returns:
            List of token strings in order of appearance.
        """
        return self._TOKEN_PATTERN.findall(text.lower())

    def encode(self, text):
        """Convert text to token IDs.

        Tokens missing from the vocabulary map to the ID of ``<unk>``.

        Args:
            text: Input string.

        Returns:
            List of integer token IDs (no ``<sos>``/``<eos>`` are added).
        """
        unk_id = self.word2idx["<unk>"]
        return [self.word2idx.get(token, unk_id) for token in self.tokenize(text)]

    def decode(self, token_ids):
        """Convert token IDs back to a space-joined string.

        Args:
            token_ids: Iterable of integer IDs.

        Returns:
            Tokens joined by single spaces; unknown IDs render as ``<unk>``.
        """
        return " ".join(self.idx2word.get(idx, "<unk>") for idx in token_ids)

    def save(self, filepath):
        """Save the tokenizer state to a JSON file.

        Args:
            filepath: Destination path; an existing file is overwritten.
        """
        tokenizer_dict = {
            "word2idx": self.word2idx,
            "idx2word": {int(k): v for k, v in self.idx2word.items()},
            "vocab_size": self.vocab_size,
            "special_tokens": self.special_tokens
        }
        # Explicit encoding so the file does not depend on the platform default.
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(tokenizer_dict, f)

    def load(self, filepath):
        """Load tokenizer state from a JSON file written by :meth:`save`.

        Args:
            filepath: Path of the JSON file to read.
        """
        with open(filepath, "r", encoding="utf-8") as f:
            tokenizer_dict = json.load(f)
        self.word2idx = tokenizer_dict["word2idx"]
        # JSON object keys are always strings; restore the integer IDs.
        self.idx2word = {int(k): v for k, v in tokenizer_dict["idx2word"].items()}
        self.vocab_size = tokenizer_dict["vocab_size"]
        self.special_tokens = tokenizer_dict["special_tokens"]

# Fit a single vocabulary shared by both languages: every source sentence
# first, then every target sentence.
tokenizer = CustomTokenizer()
all_texts = [
    example[field]
    for field in ('sourceString', 'targetString')
    for example in combined_data
]
tokenizer.build_vocab(all_texts)

# Persist the vocabulary next to the notebook.
tokenizer.save("tokenizer.json")

# Integer IDs of the four reserved tokens, looked up from the vocabulary.
PAD_TOKEN_ID, SOS_TOKEN_ID, EOS_TOKEN_ID, UNK_TOKEN_ID = (
    tokenizer.word2idx[name] for name in ("<pad>", "<sos>", "<eos>", "<unk>")
)

# ============================
# 🔹 5. Preprocess Data (Tokenization)
# ============================
def preprocess_data(example):
    """Encode one sentence pair as ``<sos>``/``<eos>``-delimited ID lists.

    Args:
        example: Mapping with ``'sourceString'`` and ``'targetString'`` text.

    Returns:
        Dict with ``'source'`` and ``'target'`` lists of token IDs, each
        starting with ``SOS_TOKEN_ID`` and ending with ``EOS_TOKEN_ID``.
    """
    def wrap(text):
        # Surround the encoded sentence with the start/end markers.
        return [SOS_TOKEN_ID, *tokenizer.encode(text), EOS_TOKEN_ID]

    return {
        'source': wrap(example['sourceString']),
        'target': wrap(example['targetString']),
    }

# Preprocess the combined dataset: run preprocess_data over every row so each
# example carries the 'source' and 'target' token-ID lists it returns.
combined_data = combined_data.map(preprocess_data)

# ============================
# 🔹 6. Create DataLoader
# ============================
class TranslationDataset(data.Dataset):
    """Map-style dataset yielding ``(source, target)`` token-ID tensors.

    Args:
        dataset: Indexable collection whose items are mappings with
            ``'source'`` and ``'target'`` sequences of integer token IDs.
    """

    def __init__(self, dataset):
        self.dataset = dataset

    def __len__(self):
        """Return the number of sentence pairs."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return the ``idx``-th pair as two 1-D ``torch.long`` tensors."""
        # Fetch the row once; indexing the backing dataset can be costly and
        # the original looked the same row up twice.
        row = self.dataset[idx]
        # Explicit dtype: an empty ID list would otherwise become a float
        # tensor, which an embedding layer cannot consume.
        return (
            torch.tensor(row['source'], dtype=torch.long),
            torch.tensor(row['target'], dtype=torch.long),
        )

# Collate Function for Padding
def collate_fn(batch):
    """Pad a list of ``(source, target)`` tensor pairs into two batch tensors.

    Args:
        batch: Sequence of ``(source, target)`` 1-D tensor pairs.

    Returns:
        Tuple ``(sources_padded, targets_padded)``; each is batch-first and
        right-padded with ``PAD_TOKEN_ID`` to the longest sequence it holds.
    """
    sources, targets = zip(*batch)
    return tuple(
        pad_sequence(group, batch_first=True, padding_value=PAD_TOKEN_ID)
        for group in (sources, targets)
    )

# Wrap the tokenized corpus and serve it in shuffled mini-batches of 16,
# padded per batch by collate_fn.
dataset = TranslationDataset(combined_data)
dataloader = DataLoader(
    dataset=dataset,
    batch_size=16,
    shuffle=True,
    collate_fn=collate_fn,
)

# ============================
# 🔹 7. Transformer Model
# ============================
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to batch-first embeddings.

    Args:
        d_model: Embedding width (even or odd).
        max_len: Longest sequence length the precomputed table covers.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        angles = position * div_term

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer cosine column than sine column,
        # so trim the angles to the number of odd-indexed slots.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Register as a buffer so the table follows the module across
        # .to()/.cuda() calls. persistent=False keeps it out of state_dict,
        # so checkpoints saved before this change still load unchanged.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encoding for its first ``x.size(1)`` positions.

        Args:
            x: Tensor indexed as (batch, position, d_model).
        """
        # .to() is a no-op once the module lives on the same device as x; it
        # is kept so a module left on another device still works.
        return x + self.pe[:, :x.size(1), :].to(x.device)

class Transformer(nn.Module):
    """Encoder-decoder translation model built on ``nn.Transformer``.

    Args:
        num_layers: Number of encoder layers and of decoder layers.
        d_model: Embedding / hidden width.
        num_heads: Attention heads per layer.
        dff: Width of the position-wise feed-forward layers.
        input_vocab_size: Number of rows in the source embedding table.
        target_vocab_size: Number of rows in the target embedding table and
            size of the output projection.
    """

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(input_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(target_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)

        self.transformer = nn.Transformer(d_model, num_heads, num_layers, num_layers, dff, batch_first=True)
        self.fc_out = nn.Linear(d_model, target_vocab_size)

    def forward(self, src, tgt):
        """Compute next-token logits for every target position.

        Args:
            src: Batch-first tensor of source token IDs.
            tgt: Batch-first tensor of target token IDs fed to the decoder.

        Returns:
            Logits with a trailing dimension of ``target_vocab_size``.
        """
        src_emb = self.positional_encoding(self.encoder_embedding(src))
        tgt_emb = self.positional_encoding(self.decoder_embedding(tgt))

        # True marks padding positions that attention must ignore.
        src_padding_mask = (src == PAD_TOKEN_ID)
        tgt_padding_mask = (tgt == PAD_TOKEN_ID)

        # Boolean causal mask (True = blocked) so it has the same dtype as the
        # padding masks; built directly on the target's device.
        tgt_len = tgt.size(1)
        tgt_mask = torch.triu(
            torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=tgt.device),
            diagonal=1,
        )

        transformer_out = self.transformer(
            src_emb, tgt_emb,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_padding_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            # The encoder output keeps the source layout, so the decoder's
            # cross-attention must skip the same padded source positions;
            # without this it attends to padding in every padded batch.
            memory_key_padding_mask=src_padding_mask,
        )
        return self.fc_out(transformer_out)

# ============================
# 🔹 8. Define Model & Training Setup
# ============================
# Prefer the GPU when one is visible; otherwise stay on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Source and target sides share the single tokenizer vocabulary.
model = Transformer(
    num_layers=6,
    d_model=256,
    num_heads=8,
    dff=1024,
    input_vocab_size=tokenizer.vocab_size,
    target_vocab_size=tokenizer.vocab_size,
)
model = model.to(device)

# Padding positions contribute nothing to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN_ID)
optimizer = optim.Adam(
    model.parameters(),
    lr=1e-4,
    betas=(0.9, 0.98),
    eps=1e-9,
)

# ============================
# 🔹 9. Training Loop
# ============================
def train_epoch(model, dataloader, optimizer, criterion):
    """Run one teacher-forced training pass and return the mean batch loss.

    Args:
        model: Module called as ``model(src, decoder_input)`` returning
            logits whose last dimension is the vocabulary size.
        dataloader: Iterable of ``(src, tgt)`` batch-first ID tensors.
        optimizer: Optimizer stepping ``model``'s parameters.
        criterion: Loss taking ``(logits_2d, labels_1d)``.

    Returns:
        Average loss over the batches seen, or ``0.0`` when the dataloader
        yields no batches.
    """
    model.train()

    # Move batches to wherever the model lives instead of relying on a
    # module-level ``device`` name; a parameter-free model leaves them as is.
    first_param = next(model.parameters(), None)
    target_device = None if first_param is None else first_param.device

    total_loss = 0.0
    num_batches = 0
    for src, tgt in dataloader:
        if target_device is not None:
            src, tgt = src.to(target_device), tgt.to(target_device)

        # Teacher forcing: the decoder sees tokens [0, T-1) and is scored on
        # predicting tokens [1, T).
        decoder_input, labels = tgt[:, :-1], tgt[:, 1:]

        optimizer.zero_grad()
        output = model(src, decoder_input)
        loss = criterion(output.reshape(-1, output.shape[-1]), labels.reshape(-1))
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Count batches ourselves: works for iterables without len() and avoids
    # dividing by zero when nothing was yielded.
    return total_loss / num_batches if num_batches else 0.0

num_epochs = 25  # Number of full passes over the dataloader; adjust as needed
for epoch in range(num_epochs):
    loss = train_epoch(
        model=model,
        dataloader=dataloader,
        optimizer=optimizer,
        criterion=criterion,
    )
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss:.4f}")

# Write only the learned weights (state dict), not the whole module object.
torch.save(model.state_dict(), "transformer_ara_eng_custom_tokenizer.pth")

# ============================
# 🔹 10. Translation Function
# ============================
def translate(model, source_text, max_len=100):
    """Greedily decode a translation of ``source_text``.

    The model is switched to eval mode. Starting from ``<sos>``, the
    highest-scoring next token is appended until ``<eos>`` is predicted or
    ``max_len`` tokens have been generated.

    Args:
        model: Module called as ``model(source, target)`` returning logits.
        source_text: Sentence to translate.
        max_len: Upper bound on the number of generated tokens.

    Returns:
        The decoded tokens joined by spaces, with ``<sos>``/``<eos>`` markers
        removed and surrounding whitespace stripped.
    """
    model.eval()
    with torch.no_grad():
        encoded_source = [SOS_TOKEN_ID, *tokenizer.encode(source_text), EOS_TOKEN_ID]
        source = torch.tensor([encoded_source]).to(device)
        target = torch.tensor([[SOS_TOKEN_ID]]).to(device)

        for _ in range(max_len):
            logits = model(source, target)
            # Only the scores at the last position decide the next token.
            next_token = logits[:, -1, :].argmax(dim=-1).item()
            if next_token == EOS_TOKEN_ID:
                break
            step = torch.tensor([[next_token]]).to(device)
            target = torch.cat([target, step], dim=1)

        translated_text = tokenizer.decode(target[0].tolist())
        for marker in ("<sos>", "<eos>"):
            translated_text = translated_text.replace(marker, "")
        return translated_text.strip()

# Load and test the model
# weights_only=True restricts unpickling to tensors and plain containers, so
# a tampered checkpoint file cannot execute arbitrary code while loading.
model.load_state_dict(
    torch.load(
        "transformer_ara_eng_custom_tokenizer.pth",
        map_location=device,
        weights_only=True,
    )
)
model.eval()

# Example translation
source_text = "مرحبا بالعالم"
translated_text = translate(model, source_text)
print(f"Source: {source_text}")
print(f"Translated: {translated_text}")

Epoch 1/25, Loss: 4.7012
Epoch 2/25, Loss: 3.8479
Epoch 3/25, Loss: 3.6026
Epoch 4/25, Loss: 3.4234
Epoch 5/25, Loss: 3.2654
Epoch 6/25, Loss: 3.1249
Epoch 7/25, Loss: 2.9963
Epoch 8/25, Loss: 2.8710
Epoch 9/25, Loss: 2.7541
Epoch 10/25, Loss: 2.6421
Epoch 11/25, Loss: 2.5335
Epoch 12/25, Loss: 2.4341
Epoch 13/25, Loss: 2.3351
Epoch 14/25, Loss: 2.2385
Epoch 15/25, Loss: 2.1523
Epoch 16/25, Loss: 2.0627
Epoch 17/25, Loss: 1.9861
Epoch 18/25, Loss: 1.9121
Epoch 19/25, Loss: 1.8364
Epoch 20/25, Loss: 1.7673
Epoch 21/25, Loss: 1.7031
Epoch 22/25, Loss: 1.6413
Epoch 23/25, Loss: 1.5811
Epoch 24/25, Loss: 1.5270
Epoch 25/25, Loss: 1.4761


  model.load_state_dict(torch.load("transformer_ara_eng_custom_tokenizer.pth", map_location=device))
  output = torch._nested_tensor_from_mask(


Source: مرحبا بالعالم
Translated: correct the windows , if you are all children .


In [None]:
!pip install datasets

Collecting datasets
  Downloading datasets-3.3.2-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Downloading datasets-3.3.2-py3-none-any.whl (485 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m485.4/485.4 kB[0m [31m15.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading multiprocess-0.70.16-py311-none-any.whl (143 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m143.5/143.5 kB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading

In [None]:
def translate(model, source_text, max_len=100):
    """Greedily decode a translation of ``source_text``.

    The model is switched to eval mode. Starting from ``<sos>``, the
    highest-scoring next token is appended until ``<eos>`` is predicted or
    ``max_len`` tokens have been generated.

    Args:
        model: Module called as ``model(source, target)`` returning logits.
        source_text: Sentence to translate.
        max_len: Upper bound on the number of generated tokens.

    Returns:
        The decoded tokens joined by spaces, with ``<sos>``/``<eos>`` markers
        removed and surrounding whitespace stripped.
    """
    model.eval()
    with torch.no_grad():
        encoded_source = [SOS_TOKEN_ID, *tokenizer.encode(source_text), EOS_TOKEN_ID]
        source = torch.tensor([encoded_source]).to(device)
        target = torch.tensor([[SOS_TOKEN_ID]]).to(device)

        for _ in range(max_len):
            logits = model(source, target)
            # Only the scores at the last position decide the next token.
            next_token = logits[:, -1, :].argmax(dim=-1).item()
            if next_token == EOS_TOKEN_ID:
                break
            step = torch.tensor([[next_token]]).to(device)
            target = torch.cat([target, step], dim=1)

        translated_text = tokenizer.decode(target[0].tolist())
        for marker in ("<sos>", "<eos>"):
            translated_text = translated_text.replace(marker, "")
        return translated_text.strip()

# Load and test the model
# weights_only=True restricts unpickling to tensors and plain containers, so
# a tampered checkpoint file cannot execute arbitrary code while loading.
model.load_state_dict(
    torch.load(
        "transformer_ara_eng_custom_tokenizer.pth",
        map_location=device,
        weights_only=True,
    )
)
model.eval()

# Example translation
source_text = "الطقس جميل اليوم"
translated_text = translate(model, source_text)
print(f"Source: {source_text}")
print(f"Translated: {translated_text}")

  model.load_state_dict(torch.load("transformer_ara_eng_custom_tokenizer.pth", map_location=device))


Source: الطقس جميل اليوم
Translated: the weather is beautiful day .
