<a href="https://colab.research.google.com/github/jayan36-asp/OAuthAPI/blob/master/NMTSITS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install datasets sentencepiece

from datasets import load_dataset

# Load the OPUS dataset (English-French)
dataset = load_dataset("opus_books", "en-fr")

# Print a sample
print(dataset["train"][0])




The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


{'id': '0', 'translation': {'en': 'The Wanderer', 'fr': 'Le grand Meaulnes'}}


In [None]:
import sentencepiece as spm

# Dump both sides of the corpus to plain-text files, one sentence per write,
# because the SentencePiece trainer reads its input from files.
with open("train.en", "w", encoding="utf-8") as f_en, open("train.fr", "w", encoding="utf-8") as f_fr:
    for pair in dataset["train"]:
        translation = pair["translation"]
        f_en.write(translation["en"] + "\n")
        f_fr.write(translation["fr"] + "\n")

# Reserve id 0 for padding. By default SentencePiece assigns <unk>=0, <s>=1,
# </s>=2 and defines no pad token, while the rest of this notebook pads with 0
# and tells the loss to ignore index 0 - which would silently treat every
# unknown token as padding. Pinning the ids here makes 0 a real <pad> token.
SPECIAL_TOKEN_IDS = dict(pad_id=0, unk_id=1, bos_id=2, eos_id=3)

# Train one SentencePiece tokenizer per language.
spm.SentencePieceTrainer.train(input="train.en", model_prefix="spm_en", vocab_size=32000, **SPECIAL_TOKEN_IDS)
spm.SentencePieceTrainer.train(input="train.fr", model_prefix="spm_fr", vocab_size=32000, **SPECIAL_TOKEN_IDS)

# Load the trained tokenizers from the model files written above.
sp_en = spm.SentencePieceProcessor(model_file="spm_en.model")
sp_fr = spm.SentencePieceProcessor(model_file="spm_fr.model")

# Example tokenization (pieces shown as strings for readability).
print(sp_en.encode("This is a test sentence.", out_type=str))
print(sp_fr.encode("Ceci est une phrase de test.", out_type=str))


['▁This', '▁is', '▁a', '▁test', '▁sentence', '.']
['▁Ceci', '▁est', '▁une', '▁phrase', '▁de', '▁test', '.']


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class TranslationDataset(Dataset):
    """Map-style dataset that tokenizes English/French pairs on access.

    Args:
        dataset: Indexable collection whose items look like
            ``{"translation": {"en": str, "fr": str}}``.
        src_tokenizer: Tokenizer for the English side; must provide
            ``encode(text)``, ``bos_id()`` and ``eos_id()``.
        trg_tokenizer: Tokenizer for the French side (same interface).
        max_len: Maximum number of content tokens kept per sentence. BOS and
            EOS are added afterwards, so tensors hold up to ``max_len + 2`` ids.
    """

    def __init__(self, dataset, src_tokenizer, trg_tokenizer, max_len=50):
        self.data = dataset
        self.src_tokenizer = src_tokenizer
        self.trg_tokenizer = trg_tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of sentence pairs."""
        return len(self.data)

    def _encode(self, tokenizer, text):
        """Tokenize ``text``, truncate to ``max_len`` and wrap in BOS/EOS."""
        token_ids = tokenizer.encode(text)[:self.max_len]
        return torch.tensor([tokenizer.bos_id()] + token_ids + [tokenizer.eos_id()], dtype=torch.long)

    def __getitem__(self, idx):
        """Return ``(src_ids, trg_ids)`` as 1-D long tensors for pair ``idx``."""
        # Fetch the row once: indexing the underlying dataset can be costly,
        # and both languages live in the same record.
        translation = self.data[idx]["translation"]

        src_tokens = self._encode(self.src_tokenizer, translation["en"])
        trg_tokens = self._encode(self.trg_tokenizer, translation["fr"])

        return src_tokens, trg_tokens

# Training split of the loaded corpus
train_data = dataset["train"]

# Wrap it so that each item is tokenized lazily with the two tokenizers
train_dataset = TranslationDataset(
    train_data,
    src_tokenizer=sp_en,
    trg_tokenizer=sp_fr,
)


In [None]:
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    """Collate ``(src, trg)`` tensor pairs into two right-padded batches.

    Each side is padded with 0 up to the longest sequence on that side and
    returned batch-first, i.e. as ``(padded_src, padded_trg)``.
    """
    sources, targets = zip(*batch)

    # Pad each side independently; the two sides may end up with different widths.
    return tuple(
        pad_sequence(sequences, batch_first=True, padding_value=0)
        for sequences in (sources, targets)
    )

# Shuffled mini-batches of 64 pairs, padded per batch by collate_fn
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=64,
    shuffle=True,
    collate_fn=collate_fn,
)


In [None]:
import torch.nn as nn
import torch

class TransformerEncoder(nn.Module):
    """Token embedding + learned positional embedding + Transformer encoder stack.

    Args:
        input_dim: Source vocabulary size.
        emb_dim: Embedding / model width.
        n_heads: Number of attention heads per layer.
        ff_dim: Hidden width of each layer's feed-forward sub-block.
        num_layers: Number of stacked encoder layers.
        dropout: Dropout probability (embeddings and inside each layer).
    """

    def __init__(self, input_dim, emb_dim, n_heads, ff_dim, num_layers, dropout):
        super().__init__()

        self.embedding = nn.Embedding(input_dim, emb_dim)
        # Learned positional table; sequences longer than 500 are not supported.
        self.pos_encoding = nn.Parameter(torch.zeros(1, 500, emb_dim))  # Max length = 500
        # batch_first=True is required: forward() receives (batch, seq) ids and
        # therefore feeds (batch, seq, emb) activations. Without it the layer
        # treats axis 0 as the sequence axis and attends across the batch,
        # mixing unrelated sentences (the decoder already uses batch_first=True).
        self.encoder_layers = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=emb_dim,
                nhead=n_heads,
                dim_feedforward=ff_dim,
                dropout=dropout,
                batch_first=True,
            ),
            num_layers
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Encode ``src`` token ids of shape (batch, seq) into (batch, seq, emb_dim)."""
        src_emb = self.dropout(self.embedding(src) + self.pos_encoding[:, :src.size(1), :])
        return self.encoder_layers(src_emb)


In [None]:
import torch.nn.functional as F

class TransformerDecoder(nn.Module):
    """Transformer decoder stack with learned positions and a vocabulary projection.

    Inputs are batch-first: ``trg`` holds token ids, ``enc_output`` is the
    encoder memory, and the result is one logit vector of size ``output_dim``
    per target position.
    """

    def __init__(self, output_dim, emb_dim, n_heads, ff_dim, num_layers, dropout):
        super().__init__()

        self.embedding = nn.Embedding(output_dim, emb_dim)
        # Learned positional table covering at most 500 positions.
        self.pos_encoding = nn.Parameter(torch.zeros(1, 500, emb_dim))
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=emb_dim,
            nhead=n_heads,
            dim_feedforward=ff_dim,
            dropout=dropout,
            batch_first=True,
        )
        self.decoder_layers = nn.TransformerDecoder(decoder_layer, num_layers)
        self.fc_out = nn.Linear(emb_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, trg, enc_output, tgt_mask=None):
        """Return vocabulary logits for every position of ``trg``.

        When ``tgt_mask`` is omitted, a causal (square subsequent) mask sized
        to the target length is built on the same device as ``trg``.
        """
        target_len = trg.size(1)
        positions = self.pos_encoding[:, :target_len, :]
        trg_emb = self.dropout(self.embedding(trg) + positions)

        if tgt_mask is None:
            causal = nn.Transformer.generate_square_subsequent_mask(trg.shape[1])
            tgt_mask = causal.to(trg.device)

        decoded = self.decoder_layers(trg_emb, enc_output, tgt_mask)
        return self.fc_out(decoded)


In [None]:
class TransformerSeq2Seq(nn.Module):
    """Encoder-decoder wrapper: encodes the source, then decodes the target against it."""

    def __init__(self, input_dim, output_dim, emb_dim, n_heads, ff_dim, num_layers, dropout):
        super().__init__()
        # Both halves share every hyperparameter except the vocabulary size.
        shared_args = (emb_dim, n_heads, ff_dim, num_layers, dropout)
        self.encoder = TransformerEncoder(input_dim, *shared_args)
        self.decoder = TransformerDecoder(output_dim, *shared_args)

    def forward(self, src, trg, tgt_mask):
        """Return the decoder's output for ``trg`` given the encoded ``src``."""
        memory = self.encoder(src)
        logits = self.decoder(trg, memory, tgt_mask)
        return logits


In [None]:
# Tokenize the dataset using the trained SentencePiece models.
#
# Every sequence is wrapped in BOS/EOS: training feeds trg[:, :-1] to the
# decoder and predicts trg[:, 1:], so without these markers the model never
# learns how to start or stop a sentence.
# Content tokens are capped at MAX_TOKENS (the same value as the default
# `max_len` of the TranslationDataset defined earlier); the positional tables
# in the model hold only 500 positions, so unbounded sequences would fail.
MAX_TOKENS = 50

src_train, trg_train = [], []
# Single pass over the corpus so each record is read only once.
for pair in dataset["train"]:
    translation = pair["translation"]
    src_train.append([sp_en.bos_id()] + sp_en.encode(translation["en"])[:MAX_TOKENS] + [sp_en.eos_id()])
    trg_train.append([sp_fr.bos_id()] + sp_fr.encode(translation["fr"])[:MAX_TOKENS] + [sp_fr.eos_id()])

# Print an example to verify
print(f"Example tokenized source: {src_train[0]}")
print(f"Example tokenized target: {trg_train[0]}")


Example tokenized source: [46, 25539, 293]
Example tokenized target: [90, 175, 851]


In [None]:
from torch.utils.data import Dataset, DataLoader

class TranslationDataset(Dataset):
    """Index-aligned view over two parallel collections of pre-tokenized sequences.

    Note: this reuses the name of the TranslationDataset defined earlier in the
    notebook and replaces it from this cell onwards.
    """

    def __init__(self, src, trg):
        self.src, self.trg = src, trg

    def __len__(self):
        # The source side determines the dataset size.
        return len(self.src)

    def __getitem__(self, idx):
        source_item = self.src[idx]
        target_item = self.trg[idx]
        return source_item, target_item


In [None]:
# Rebuild the dataset and loader on top of the pre-tokenized id lists.
train_dataset = TranslationDataset(src_train, trg_train)

# Collation only transposes the batch into (sources, targets) tuples of
# variable-length lists; no padding happens at this stage.
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=64,
    shuffle=True,
    collate_fn=lambda samples: tuple(zip(*samples)),
)


In [None]:
# Carve held-out data from the training split when the corpus ships without one.
SPLIT_SEED = 42  # fixed seed so the split is reproducible across runs

if "test" not in dataset:
    dataset = dataset["train"].train_test_split(test_size=0.1, seed=SPLIT_SEED)  # 10% held out
    # Divide the held-out 10% into two disjoint halves. Previously the
    # validation set was a subset of an unchanged test set, so the two overlapped.
    held_out = dataset["test"].train_test_split(test_size=0.5, seed=SPLIT_SEED)
    dataset["validation"] = held_out["train"]
    dataset["test"] = held_out["test"]
    # NOTE(review): src_train / trg_train are built in an earlier cell from the
    # unsplit training set, so they still contain these held-out rows; rebuild
    # them from dataset["train"] after this cell before evaluating.
print(dataset.keys())


dict_keys(['train', 'test', 'validation'])


In [None]:
import torch.optim as optim

# Prefer the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define model parameters
# Vocabulary sizes are read from the trained tokenizers so the embedding and
# output layers cannot drift from the tokenizer configuration.
INPUT_DIM = sp_en.get_piece_size()
OUTPUT_DIM = sp_fr.get_piece_size()
EMB_DIM = 256
N_HEADS = 8
FF_DIM = 512
NUM_LAYERS = 3
DROPOUT = 0.1

# Training parameters
PAD_IDX = 0  # value used to pad batches elsewhere in this notebook
LEARNING_RATE = 0.0005

# Initialize the model
model = TransformerSeq2Seq(INPUT_DIM, OUTPUT_DIM, EMB_DIM, N_HEADS, FF_DIM, NUM_LAYERS, DROPOUT).to(device)

# Define optimizer & loss function
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)  # Padded positions do not contribute to the loss




In [None]:
def train(model, data_loader, optimizer, criterion):
    """Run one teacher-forced training epoch and return the mean batch loss.

    Args:
        model: Module called as ``model(src, tgt_input, tgt_mask)`` that returns
            logits whose last dimension is the target vocabulary size.
        data_loader: Iterable of ``(src, trg)`` batches, each a collection of
            variable-length token-id sequences (lists or 1-D tensors).
        optimizer: Optimizer over ``model``'s parameters.
        criterion: Loss taking ``(logits[N, vocab], targets[N])``.

    Returns:
        Average loss over the batches processed, or ``0.0`` if the loader
        yielded no batches.
    """
    model.train()

    # Use the model's own device instead of depending on a notebook global.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    epoch_loss = 0.0
    num_batches = 0

    for src, trg in data_loader:
        # as_tensor accepts both plain lists and existing tensors without the
        # copy warning torch.tensor() emits when re-wrapping a tensor.
        src = torch.nn.utils.rnn.pad_sequence(
            [torch.as_tensor(s, dtype=torch.long) for s in src], batch_first=True, padding_value=0
        ).to(model_device)
        trg = torch.nn.utils.rnn.pad_sequence(
            [torch.as_tensor(t, dtype=torch.long) for t in trg], batch_first=True, padding_value=0
        ).to(model_device)

        optimizer.zero_grad()

        tgt_input = trg[:, :-1]  # Teacher forcing: decoder sees everything but the last token
        tgt_output = trg[:, 1:]  # Expected output: the same sequence shifted left by one

        # Causal mask sized to this batch's decoder input; it is always
        # (seq_len, seq_len), so no further reshaping is needed.
        seq_len = tgt_input.shape[1]
        tgt_mask = torch.nn.Transformer.generate_square_subsequent_mask(seq_len).to(model_device)

        output = model(src, tgt_input, tgt_mask)

        # Flatten (batch, seq, vocab) -> (batch*seq, vocab); the vocabulary size
        # is taken from the logits rather than from a global constant.
        loss = criterion(output.reshape(-1, output.size(-1)), tgt_output.reshape(-1))
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

    # Count batches ourselves: works for iterables without len() and avoids
    # dividing by zero on an empty loader.
    return epoch_loss / num_batches if num_batches else 0.0


In [None]:
N_EPOCHS = 1

# One pass of train() per epoch; report the mean batch loss it returns.
for epoch in range(N_EPOCHS):
    train_loss = train(
        model=model,
        data_loader=train_loader,
        optimizer=optimizer,
        criterion=criterion,
    )
    print(f"Epoch {epoch+1}, Train Loss: {train_loss:.4f}")
