In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence

from torchtext.vocab import build_vocab_from_iterator

from torch.utils.data import Dataset, DataLoader, random_split

import spacy
spacy_en = spacy.load("en_core_web_sm")



### Modelling

In [None]:
class Encoder(nn.Module):
    """LSTM encoder that turns a batch of source token ids into a final LSTM state.

    Args:
        input_dim: Number of rows in the embedding table (source vocabulary size).
        emb_dim: Width of each token embedding.
        hidden_dim: Width of the LSTM hidden and cell states.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, used both between LSTM layers and on the
            embedded tokens.
    """

    def __init__(self, input_dim, emb_dim, hidden_dim, n_layers, dropout):
        super().__init__()

        self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=emb_dim)
        self.rnn = nn.LSTM(
            input_size=emb_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        """Encode ``src`` and return the LSTM's final ``(hidden, cell)`` pair.

        Args:
            src: Token ids laid out time-major, i.e. ``[src_len, batch_size]``.

        Returns:
            Tuple ``(hidden, cell)``, each ``[n_layers, batch_size, hidden_dim]``.
        """
        token_vectors = self.embedding(src)        # [src_len, batch_size, emb_dim]
        rnn_input = self.dropout(token_vectors)
        # The per-step outputs are not needed; only the final state is handed on.
        _, final_state = self.rnn(rnn_input)
        hidden, cell = final_state
        return hidden, cell

In [None]:
class Decoder(nn.Module):
    """Single-step LSTM decoder producing vocabulary logits for the next token.

    Args:
        output_dim: Target vocabulary size (embedding rows and logit width).
        emb_dim: Width of each token embedding.
        hidden_dim: Width of the LSTM hidden and cell states.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, used both between LSTM layers and on the
            embedded token.
    """

    def __init__(self, output_dim, emb_dim, hidden_dim, n_layers, dropout):
        super().__init__()

        self.embedding = nn.Embedding(num_embeddings=output_dim, embedding_dim=emb_dim)
        self.rnn = nn.LSTM(
            input_size=emb_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.fc_out = nn.Linear(in_features=hidden_dim, out_features=output_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input, hidden, cell):
        """Advance the decoder by one time step.

        Args:
            input: Current token ids, one per sequence: ``[batch_size]``.
            hidden: Previous hidden state, ``[n_layers, batch_size, hidden_dim]``.
            cell: Previous cell state, ``[n_layers, batch_size, hidden_dim]``.

        Returns:
            Tuple ``(prediction, hidden, cell)`` where ``prediction`` is
            ``[batch_size, output_dim]`` and the states keep their input shape.
        """
        # Add a length-1 time axis so the LSTM sees [1, batch_size].
        step_tokens = input.unsqueeze(0)
        step_vectors = self.dropout(self.embedding(step_tokens))   # [1, batch_size, emb_dim]
        state_in = (hidden, cell)
        rnn_out, (hidden, cell) = self.rnn(step_vectors, state_in)  # rnn_out: [1, batch_size, hidden_dim]
        # Drop the time axis again before projecting to vocabulary logits.
        prediction = self.fc_out(rnn_out.squeeze(0))
        return prediction, hidden, cell


In [None]:
class Seq2Seq(nn.Module):
    """Encoder/decoder wrapper that decodes step by step with optional teacher forcing.

    Args:
        encoder: Module whose call returns the initial ``(hidden, cell)`` state.
        decoder: Module called as ``decoder(input, hidden, cell)``; must expose
            ``fc_out.out_features`` (the logit width).
        device: Device the result tensor is moved to.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Run the encoder once, then unroll the decoder over the target length.

        Args:
            src: Source token ids, ``[src_len, batch_size]``.
            trg: Target token ids, ``[trg_len, batch_size]``; row 0 seeds the decoder.
            teacher_forcing_ratio: Probability, drawn once per step for the whole
                batch, of feeding the ground-truth token instead of the decoder's
                own argmax as the next input.

        Returns:
            Logits of shape ``[trg_len, batch_size, output_dim]``. Row 0 is never
            written and stays all zeros.
        """
        n_steps = trg.shape[0]
        n_seqs = trg.shape[1]
        vocab_size = self.decoder.fc_out.out_features
        outputs = torch.zeros(n_steps, n_seqs, vocab_size).to(self.device)

        hidden, cell = self.encoder(src)

        # The first decoder input is the first target row.
        step_input = trg[0, :]

        for t in range(1, n_steps):
            step_logits, hidden, cell = self.decoder(step_input, hidden, cell)
            outputs[t] = step_logits

            use_ground_truth = torch.rand(1).item() < teacher_forcing_ratio
            if use_ground_truth:
                step_input = trg[t]
            else:
                step_input = step_logits.argmax(1)

        return outputs

### Data Preparation

In [None]:
def tokenizer(text):
    """Split ``text`` into a list of token strings using the spaCy tokenizer of ``spacy_en``."""
    doc = spacy_en.tokenizer(text)
    return [token.text for token in doc]

In [None]:
# Quick sanity check: build a vocabulary from a single sentence and look up
# the index assigned to the start-of-sentence marker.
txt = 'In reply, Pakistan got off to a solid start.'
tokens = tokenizer(txt)

special_tokens = ['<pad>', '<sos>', '<eos>', '<unk>']
src_voc = build_vocab_from_iterator([tokens], specials=special_tokens)

# Tokens missing from the vocabulary resolve to the '<unk>' index.
unk_index = src_voc['<unk>']
src_voc.set_default_index(unk_index)

src_voc['<sos>']

1

In [None]:
class TranslationDataset(Dataset):
    """Parallel-text dataset yielding ``(source_ids, target_ids)`` tensor pairs.

    Both vocabularies are built in the constructor from every pair in ``data``.

    Args:
        data: Indexable collection of ``(source_text, target_text)`` pairs.
        tokenizer: Callable mapping a string to a list of token strings; it is
            applied to both the source and the target side.
    """

    def __init__(self, data, tokenizer):
        self.data = data
        self.tokenizer = tokenizer

        special_tokens = ('<pad>', '<sos>', '<eos>', '<unk>')

        self.src_vocab = build_vocab_from_iterator(
            (self.tokenizer(source_text) for source_text, _ in data),
            specials=list(special_tokens),
        )
        # Tokens missing from the vocabulary resolve to the '<unk>' index.
        self.src_vocab.set_default_index(self.src_vocab['<unk>'])

        self.trg_vocab = build_vocab_from_iterator(
            (self.tokenizer(target_text) for _, target_text in data),
            specials=list(special_tokens),
        )
        self.trg_vocab.set_default_index(self.trg_vocab['<unk>'])

    def __len__(self):
        """Return the number of sentence pairs."""
        return len(self.data)

    def _encode(self, text, vocab):
        """Tokenize ``text`` and map it to ids wrapped in '<sos>' ... '<eos>'."""
        ids = [vocab['<sos>']]
        ids.extend(vocab[token] for token in self.tokenizer(text))
        ids.append(vocab['<eos>'])
        return torch.tensor(ids)

    def __getitem__(self, index):
        """Return the ``index``-th pair as two 1-D id tensors ``(source, target)``."""
        source_text, target_text = self.data[index]
        source_ids = self._encode(source_text, self.src_vocab)
        target_ids = self._encode(target_text, self.trg_vocab)
        return source_ids, target_ids


In [None]:
def _load(file):
    with open(file, 'r') as handle:
        return [line.strip() for line in handle.readlines()]

In [None]:
# Locations of the two sides of the parallel corpus.
src_path = 'en-hi/train.en'
tar_path = 'en-hi/train.hi'

# Keep only the first 10,000 lines of each file.
english, hindi = (_load(path)[:10000] for path in (src_path, tar_path))

# Pair the lines positionally: data[i] == (english[i], hindi[i]).
data = list(zip(english, hindi))

In [None]:
dataset = TranslationDataset(data, tokenizer)


def collate_fn(batch):
    """Pad a list of ``(source_ids, target_ids)`` pairs into two batch tensors.

    Each side is padded with the '<pad>' index of the corresponding vocabulary
    of the module-level ``dataset``. ``pad_sequence`` is called with its default
    layout, so each result is ``[longest_len, batch_size]``.
    """
    sources, targets = zip(*batch)
    src_pad = dataset.src_vocab['<pad>']
    trg_pad = dataset.trg_vocab['<pad>']
    return (
        pad_sequence(sources, padding_value=src_pad),
        pad_sequence(targets, padding_value=trg_pad),
    )


In [None]:
# Loader over the full, unsplit dataset.
# NOTE(review): the cells visible below use train_dataloader / val_dataloader
# instead; confirm whether this loader is still needed.
dataloader = DataLoader(
    dataset,
    batch_size=8,
    collate_fn=collate_fn,
)

In [None]:
# Vocabulary sizes determine the embedding tables and the decoder's logit width.
input_dim = len(dataset.src_vocab)
output_dim = len(dataset.trg_vocab)

# Model hyper-parameters (shared by encoder and decoder).
embed_dim = 256
hidden_dim = 512
n_layers = 2
dropout = 0.5

# Training hyper-parameters.
n_epochs = 100
clip = 1          # max gradient norm passed to clip_grad_norm_
batch_size = 8

# Always a torch.device, regardless of which branch is taken (previously a
# plain str on CUDA machines and a torch.device on CPU-only machines).
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

encoder = Encoder(input_dim, embed_dim, hidden_dim, n_layers, dropout)
decoder = Decoder(output_dim, embed_dim, hidden_dim, n_layers, dropout)
model = Seq2Seq(encoder, decoder, device).to(device)

In [None]:
device

'cuda'

In [None]:
# Adam with the library's default hyper-parameters.
optimizer = optim.Adam(params=model.parameters())

# Target positions holding the '<pad>' index contribute nothing to the loss.
criterion = nn.CrossEntropyLoss(
    ignore_index=dataset.trg_vocab['<pad>'],
)

In [None]:
# Fixed seed so the train/validation membership is the same after every
# kernel restart (random_split is otherwise driven by the global RNG state).
SPLIT_SEED = 42

# Define the proportion for training and validation splits
train_size = int(0.8 * len(dataset))  # 80% for training
val_size = len(dataset) - train_size   # remainder (about 20%) for validation

# Split the dataset with a dedicated, seeded generator
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=split_generator,
)

# Create data loaders for both sets; only the training loader is shuffled
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

In [None]:
def evaluate_model(model, dataloader, criterion, device, pad_idx=None):
    """Compute the average loss and token accuracy of ``model`` over ``dataloader``.

    Fixes relative to the previous version:
      * Logits and targets are flattened in the same (time-major) order. The
        old code transposed only the targets, so each logit row was compared
        with the token of a different position/sequence.
      * Time step 0 is excluded: ``Seq2Seq.forward`` never writes ``outputs[0]``
        (it stays all zeros) and ``trg[0]`` only seeds the decoder.
      * Teacher forcing is switched off so the metrics are deterministic and
        reflect the model's own predictions.

    Args:
        model: Module called as ``model(src, trg, teacher_forcing_ratio=0)`` and
            returning logits of shape ``[trg_len, batch_size, output_dim]``.
        dataloader: Iterable of ``(src, trg)`` batches, both time-major.
        criterion: Loss taking ``(logits [N, C], targets [N])``.
        device: Device the batches are moved to.
        pad_idx: Target index excluded from the accuracy. Defaults to
            ``criterion.ignore_index`` so loss and accuracy mask the same tokens.

    Returns:
        Tuple ``(avg_loss, avg_accuracy)``: mean per-batch loss and the
        percentage of non-padding target tokens predicted correctly. Both are
        ``nan`` when there is nothing to average over.
    """
    if pad_idx is None:
        pad_idx = getattr(criterion, 'ignore_index', -100)

    model.eval()
    val_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    n_batches = 0

    with torch.no_grad():
        for src, trg in dataloader:
            src = src.to(device)
            trg = trg.to(device)

            # No teacher forcing at evaluation time.
            output = model(src, trg, teacher_forcing_ratio=0)

            # Drop step 0 and flatten both tensors time-major so that row i of
            # the logits lines up with element i of the targets.
            logits = output[1:].reshape(-1, output.shape[-1])
            targets = trg[1:].reshape(-1)

            # Calculate loss
            loss = criterion(logits, targets)
            val_loss += loss.item()
            n_batches += 1

            # Get predictions and mask padding tokens
            pred_tokens = logits.argmax(dim=1)
            non_pad_mask = targets != pad_idx

            # Calculate accuracy
            correct_predictions += (pred_tokens[non_pad_mask] == targets[non_pad_mask]).sum().item()
            total_predictions += non_pad_mask.sum().item()

    # Guard against an empty loader / all-padding targets instead of raising
    # ZeroDivisionError.
    avg_loss = val_loss / n_batches if n_batches else float('nan')
    avg_accuracy = (
        correct_predictions / total_predictions * 100 if total_predictions else float('nan')
    )

    return avg_loss, avg_accuracy

In [None]:
# Training loop with validation
#
# Alignment fix: the model returns logits as [trg_len, batch_size, output_dim].
# Logits and targets are now flattened in the same time-major order; before,
# only the targets were transposed, so every logit row was scored against the
# token of a different position/sequence. Step 0 is also dropped because
# Seq2Seq.forward never writes outputs[0] and trg[0] only seeds the decoder.
trg_pad_idx = dataset.trg_vocab['<pad>']  # loop-invariant, looked up once

for epoch in range(n_epochs):
    model.train()
    epoch_loss = 0
    correct_predictions = 0
    total_predictions = 0

    for idx, (src, trg) in enumerate(train_dataloader):
        src = src.to(device)
        trg = trg.to(device)

        optimizer.zero_grad()

        # Forward pass
        output = model(src, trg)
        output = output[1:].reshape(-1, output.shape[-1])
        trg = trg[1:].reshape(-1)

        # Calculate loss
        loss = criterion(output, trg)
        loss.backward()
        # Cap the global gradient norm before the optimizer step.
        nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        # Update epoch loss
        epoch_loss += loss.item()

        # Calculate accuracy over non-padding target tokens only
        pred = output.argmax(dim=1)
        non_pad_mask = trg != trg_pad_idx
        correct_predictions += (pred[non_pad_mask] == trg[non_pad_mask]).sum().item()
        total_predictions += non_pad_mask.sum().item()

        # Running statistics every 300 steps
        if idx % 300 == 0 and idx > 0:
            avg_loss = epoch_loss / (idx + 1)
            avg_accuracy = correct_predictions / total_predictions * 100
            print(f'Epoch [{epoch + 1}/{n_epochs}], Step [{idx}], Loss: {avg_loss:.4f}, Accuracy: {avg_accuracy:.2f}%')


    # End of epoch training stats
    avg_loss = epoch_loss / len(train_dataloader)
    avg_accuracy = correct_predictions / total_predictions * 100
    print(f'Epoch [{epoch + 1}/{n_epochs}] completed, Average Loss: {avg_loss:.4f}, Average Accuracy: {avg_accuracy:.2f}%')

    # Validation
    val_loss, val_accuracy = evaluate_model(model, val_dataloader, criterion, device)
    print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')

Epoch [1/100], Step [300], Loss: 7.4271, Accuracy: 4.31%
Epoch [1/100], Step [600], Loss: 7.3259, Accuracy: 4.35%
Epoch [1/100], Step [900], Loss: 7.2869, Accuracy: 4.39%
Epoch [1/100] completed, Average Loss: 7.2758, Average Accuracy: 4.41%
Validation Loss: 7.1800, Validation Accuracy: 4.53%
Validation Loss: 7.1800, Validation Accuracy: 4.53%
