<a href="https://colab.research.google.com/github/sankarvinayak/DL-assignment-3/blob/main/DL_Assignment_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DA6401 Assignment 3
Use recurrent neural networks to build a transliteration system.

# Instructions
- The goal of this assignment is fourfold: (i) learn how to model sequence to sequence learning problems using Recurrent Neural Networks (ii) compare different cells such as vanilla RNN, LSTM and GRU (iii) understand how attention networks overcome the limitations of vanilla seq2seq models (iv) visualise the interactions between different components in a RNN based model.
- We strongly recommend that you work on this assignment in a team of size 2. Both the members
of the team are expected to work together (in a subsequent viva both members will be expected to answer questions, explain the code, etc).
- Collaborations and discussions with other groups are strictly prohibited.
- You must use Python (numpy and pandas) for your implementation.
- You can use any and all packages from keras, pytorch, tensorflow
- You can run the code in a jupyter notebook on colab by enabling GPUs.
- You have to generate the report in the same format as shown below using wandb.ai. You can start by cloning this report using the clone option above. Most of the plots that we have asked for below can be (automatically) generated using the apis provided by wandb.ai. You will upload a link to this report on gradescope.
- You also need to provide a link to your github code as shown below. Follow good software engineering practices and set up a github repo for the project on Day 1. Please do not write all code on your local machine and push everything to github on the last day. The commits in github should reflect how the code has evolved during the course of the assignment.
- You have to check moodle regularly for updates regarding the assignment.



# Problem Statement

In this assignment you will experiment with the [Dakshina dataset](https://github.com/google-research-datasets/dakshina) released by Google. This dataset contains pairs of the following form:

$x$.      $y$

ajanabee अजनबी.

i.e., a word in the native script and its corresponding transliteration in the Latin script (the way we type while chatting with our friends on WhatsApp etc). Given many such $(x_i, y_i)_{i=1}^n$ pairs your goal is to train a model $y = \hat{f}(x)$ which takes as input a romanized string (ghar) and produces the corresponding word in Devanagari (घर).

As you would realise this is the problem of mapping a sequence of characters in one language to a sequence of characters in another language. Notice that this is a scaled down version of the problem of translation where the goal is to translate a sequence of **words** in one language to a sequence of words in another language (as opposed to sequence of **characters** here).

Read these blogs to understand how to build neural sequence to sequence models: [blog1](https://keras.io/examples/nlp/lstm_seq2seq/), [blog2](https://machinelearningmastery.com/define-encoder-decoder-sequence-sequence-model-neural-machine-translation-keras/)




In [1]:
!wget https://storage.googleapis.com/gresearch/dakshina/dakshina_dataset_v1.0.tar
!tar -xf dakshina_dataset_v1.0.tar
!cp -r dakshina_dataset_v1.0/ml .
!rm -rf dakshina_dataset_v1.0
!rm -r dakshina_dataset_v1.0.tar

--2025-05-16 04:26:24--  https://storage.googleapis.com/gresearch/dakshina/dakshina_dataset_v1.0.tar
Resolving storage.googleapis.com (storage.googleapis.com)... 142.250.145.207, 74.125.128.207, 74.125.143.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.250.145.207|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2008340480 (1.9G) [application/x-tar]
Saving to: ‘dakshina_dataset_v1.0.tar’


2025-05-16 04:27:12 (40.4 MB/s) - ‘dakshina_dataset_v1.0.tar’ saved [2008340480/2008340480]



In [2]:
import pandas as pd

# Paths to your data files (adjust if needed)
path = "/content/ml/lexicons/ml.translit.sampled.train.tsv"
path_val = "/content/ml/lexicons/ml.translit.sampled.dev.tsv"
path_test = "/content/ml/lexicons/ml.translit.sampled.test.tsv"

# Read the files
df = pd.read_csv(path, sep='\t', header=None)
df_val = pd.read_csv(path_val, sep='\t', header=None)
df_test = pd.read_csv(path_test, sep='\t', header=None)

# Split into native (Malayalam) and romanized (English) columns
malayalam_words = df[0]
english_words = df[1]

malayalam_words_val = df_val[0]
english_words_val = df_val[1]

malayalam_words_test = df_test[0]
english_words_test = df_test[1]



In [None]:
malayalam_words,english_words

(0             അം
 1            അംഗ
 2            അംഗ
 3           അംഗം
 4           അംഗം
           ...   
 58377       ഹൗസ്
 58378       ഹർജി
 58379       ഹർജി
 58380    ഹർജിയിൽ
 58381    ഹർജിയിൽ
 Name: 0, Length: 58382, dtype: object,
 0              am
 1            amga
 2            anga
 3           amgam
 4           angam
            ...   
 58377       house
 58378       harje
 58379       harji
 58380    harjeyil
 58381    harjiyil
 Name: 1, Length: 58382, dtype: object)

In [None]:

english_words = english_words.dropna()
malayalam_words = malayalam_words.dropna()

english_words = english_words.astype(str)
malayalam_words = malayalam_words.astype(str)

english_chars = sorted(set("".join(english_words)))
malayalam_chars = sorted(set("".join(malayalam_words)))

max_len_eng = max(len(w) for w in pd.concat([english_words, english_words_val, english_words_test]).dropna().astype(str))
max_len_mal = max(len(w) for w in pd.concat([malayalam_words, malayalam_words_val, malayalam_words_test]).dropna().astype(str))

print("English characters:", english_chars)
print("Malayalam characters:", malayalam_chars)
print("Max English word length:", max_len_eng)
print("Max Malayalam word length:", max_len_mal)


English characters: ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
Malayalam characters: ['ം', 'ഃ', 'അ', 'ആ', 'ഇ', 'ഈ', 'ഉ', 'ഊ', 'ഋ', 'എ', 'ഏ', 'ഐ', 'ഒ', 'ഓ', 'ഔ', 'ക', 'ഖ', 'ഗ', 'ഘ', 'ങ', 'ച', 'ഛ', 'ജ', 'ഝ', 'ഞ', 'ട', 'ഠ', 'ഡ', 'ഢ', 'ണ', 'ത', 'ഥ', 'ദ', 'ധ', 'ന', 'പ', 'ഫ', 'ബ', 'ഭ', 'മ', 'യ', 'ര', 'റ', 'ല', 'ള', 'ഴ', 'വ', 'ശ', 'ഷ', 'സ', 'ഹ', 'ാ', 'ി', 'ീ', 'ു', 'ൂ', 'ൃ', 'െ', 'േ', 'ൈ', 'ൊ', 'ോ', '്', 'ൗ', 'ൺ', 'ൻ', 'ർ', 'ൽ', 'ൾ', '\u200c']
Max English word length: 32
Max Malayalam word length: 31


In [None]:

longest_malayalam_word = max(malayalam_words, key=len)
print("Longest Malayalam word:", longest_malayalam_word)
print("Length:", len(longest_malayalam_word))


Longest Malayalam word: ചൂണ്ടിക്കാണിക്കപ്പെട്ടിട്ടുണ്ട്
Length: 31


In [None]:
def word2vec(word, lang):
    vec = []

    if lang == "english":
        start_token = len(english_chars) + 1
        vec.append(start_token)

        for char in word:
            if char in english_chars:
                vec.append(english_chars.index(char) + 1)

        while len(vec) < max_len_eng + 1:  # +1 for start token
            vec.append(0)

        vec.append(0)  # end token

    elif lang == "malayalam":
        start_token = len(malayalam_chars) + 1
        vec.append(start_token)

        for char in word:
            if char in malayalam_chars:
                vec.append(malayalam_chars.index(char) + 1)

        while len(vec) < max_len_mal + 1:
            vec.append(0)

        vec.append(0)

    return vec


In [None]:
vec = word2vec(malayalam_words[10], "malayalam")
print("Malayalam word:", malayalam_words[50000])
print("Tokenized vector:", vec)


Malayalam word: വീണ്
Tokenized vector: [71, 3, 1, 18, 20, 63, 20, 45, 52, 41, 53, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


In [None]:
def ip_matrix_construct(words, lang):
    ans = []
    for word in words:
        ans.append(word2vec(word, lang))
    return ans


In [None]:
import torch
english_matrix = ip_matrix_construct(english_words.dropna().astype(str), "english")
malayalam_matrix = ip_matrix_construct(malayalam_words.dropna().astype(str), "malayalam")

# Convert to tensors
english_matrix = torch.tensor(english_matrix)
malayalam_matrix = torch.tensor(malayalam_matrix)

# For validation data
english_matrix_val = ip_matrix_construct(english_words_val.dropna().astype(str), "english")
malayalam_matrix_val = ip_matrix_construct(malayalam_words_val.dropna().astype(str), "malayalam")
english_matrix_val = torch.tensor(english_matrix_val)
malayalam_matrix_val = torch.tensor(malayalam_matrix_val)

# For test data
english_matrix_test = ip_matrix_construct(english_words_test.dropna().astype(str), "english")
malayalam_matrix_test = ip_matrix_construct(malayalam_words_test.dropna().astype(str), "malayalam")
english_matrix_test = torch.tensor(english_matrix_test)
malayalam_matrix_test = torch.tensor(malayalam_matrix_test)


In [None]:
from torch.utils.data import Dataset, DataLoader

class TransliterationDataset(Dataset):
    def __init__(self, src_tensor, trg_tensor):
        self.src = src_tensor
        self.trg = trg_tensor

    def __len__(self):
        return len(self.src)

    def __getitem__(self, idx):
        return self.src[idx], self.trg[idx]
train_dataset = TransliterationDataset(english_matrix, malayalam_matrix)
val_dataset = TransliterationDataset(english_matrix_val, malayalam_matrix_val)
test_dataset = TransliterationDataset(english_matrix_test, malayalam_matrix_test)



# Question 1 (15 Marks)
Build a RNN based seq2seq model which contains the following layers: (i) input layer for character embeddings (ii) one encoder RNN which sequentially encodes the input character sequence (Latin) (iii) one decoder RNN which takes the last state of the encoder as input and produces one output character at a time (Devanagari).

The code should be flexible such that the dimension of the input character embeddings, the hidden states of the encoders and decoders, the cell (RNN, LSTM, GRU) and the number of layers in the encoder and decoder can be changed.

(a) What is the total number of computations done by your network? (assume that the input embedding size is $m$, encoder and decoder have 1 layer each, the hidden cell state is $k$ for both the encoder and decoder, the length of the input and output sequence is the same, i.e., $T$, the size of the vocabulary is the same for the source and target language, i.e., $V$)

(b) What is the total number of parameters in your network? (assume that the input embedding size is $m$, encoder and decoder have 1 layer each, the hidden cell state is $k$ for both the encoder and decoder and the length of the input and output sequence is the same, i.e., $T$, the size of the vocabulary is the same for the source and target language, i.e., $V$)



# Question 2 (10 Marks)

You will now train your model using any one language from the [Dakshina dataset](https://github.com/google-research-datasets/dakshina) (I would suggest pick a language that you can read so that it is easy to analyse the errors). Use the standard train, dev, test set from the folder dakshina_dataset_v1.0/hi/lexicons/ (replace hi by the language of your choice)

Using the sweep feature in wandb find the best hyperparameter configuration. Here are some suggestions but you are free to decide which hyperparameters you want to explore

- input embedding size: 16, 32, 64, 256, ...
- number of encoder layers: 1, 2, 3
- number of decoder layers: 1, 2, 3
- hidden layer size: 16, 32, 64, 256, ...
- cell type: RNN, GRU, LSTM
- dropout: 20%, 30% (btw, where will you add dropout? you should read up a bit on this)
- beam search in decoder with different beam sizes:

Based on your sweep please paste the following plots which are automatically generated by wandb:
- accuracy v/s created plot (I would like to see the number of experiments you ran to get the best configuration).
- parallel co-ordinates plot
- correlation summary table (to see the correlation of each hyperparameter with the loss/accuracy)

Also write down the hyperparameters and their values that you sweeped over. Smart strategies to reduce the number of runs while still achieving a high accuracy would be appreciated. Write down any unique strategy that you tried for efficiently searching the hyperparameters.

In [None]:
import torch
import torch.nn as nn

class Encoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, num_layers, dropout, cell_type='LSTM', bidirectional=False):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.cell_type = cell_type
        self.bidirectional = bidirectional

        self.embedding = nn.Embedding(input_size, embedding_size)
        self.dropout = nn.Dropout(dropout)

        if cell_type == 'GRU':
            self.rnn = nn.GRU(embedding_size, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, bidirectional=bidirectional)
        elif cell_type == 'RNN':
            self.rnn = nn.RNN(embedding_size, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, bidirectional=bidirectional)
        else:  # LSTM by default
            self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, bidirectional=bidirectional)

    def forward(self, x):
        # x shape: (seq_len, batch)
        embedded = self.dropout(self.embedding(x))
        # embedded shape: (seq_len, batch, embedding_size)

        if self.cell_type == 'LSTM':
            outputs, (hidden, cell) = self.rnn(embedded)
            return outputs, (hidden, cell)
        else:
            outputs, hidden = self.rnn(embedded)
            return outputs, hidden
class Decoder(nn.Module):
    def __init__(self, output_size, embedding_size, hidden_size, num_layers, dropout, cell_type='LSTM'):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.cell_type = cell_type

        self.embedding = nn.Embedding(output_size, embedding_size)
        self.dropout = nn.Dropout(dropout)

        if cell_type == 'GRU':
            self.rnn = nn.GRU(embedding_size, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0)
        elif cell_type == 'RNN':
            self.rnn = nn.RNN(embedding_size, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0)
        else:  # LSTM
            self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0)

        self.fc_out = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden):
        # x shape: (batch), because one timestep input
        x = x.unsqueeze(0)  # now (1, batch)
        embedded = self.dropout(self.embedding(x))
        # embedded shape: (1, batch, embedding_size)

        if self.cell_type == 'LSTM':
            output, (hidden, cell) = self.rnn(embedded, hidden)
            prediction = self.fc_out(output.squeeze(0))
            return prediction, (hidden, cell)
        else:
            output, hidden = self.rnn(embedded, hidden)
            prediction = self.fc_out(output.squeeze(0))
            return prediction, hidden
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        # src shape: (src_len, batch)
        # trg shape: (trg_len, batch)

        batch_size = trg.shape[1]
        trg_len = trg.shape[0]
        trg_vocab_size = self.decoder.fc_out.out_features

        # tensor to store decoder outputs
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)

        # encoder forward pass
        if self.encoder.cell_type == 'LSTM':
            encoder_outputs, (hidden, cell) = self.encoder(src)
        else:
            encoder_outputs, hidden = self.encoder(src)
            cell = None  # no cell in RNN/GRU

        # first input to the decoder is the <sos> token
        input = trg[0, :]

        for t in range(1, trg_len):
            if self.encoder.cell_type == 'LSTM':
                output, (hidden, cell) = self.decoder(input, (hidden, cell))
            else:
                output, hidden = self.decoder(input, hidden)

            outputs[t] = output

            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = output.argmax(1)

            input = trg[t] if teacher_force else top1

        return outputs
    def beam_search(self, src, sos_idx, eos_idx, max_len=30, beam_width=3):
        # src: (src_len, batch=1) - typically batch=1 for beam search inference
        self.eval()

        with torch.no_grad():
            # Encode source sentence
            if self.encoder.cell_type == 'LSTM':
                encoder_outputs, (hidden, cell) = self.encoder(src)
            else:
                encoder_outputs, hidden = self.encoder(src)
                cell = None

            # Initialize beam with sequences, scores, and hidden states
            sequences = [[ [sos_idx], 0.0, hidden, cell ]]  # list of [sequence, score, hidden, cell]

            for _ in range(max_len):
                all_candidates = []
                # Expand each sequence in the beam
                for seq, score, hidden_state, cell_state in sequences:
                    # If last token is EOS, add sequence as is
                    if seq[-1] == eos_idx:
                        all_candidates.append((seq, score, hidden_state, cell_state))
                        continue

                    input_token = torch.LongTensor([seq[-1]]).to(self.device)
                    if self.encoder.cell_type == 'LSTM':
                        output, (hidden_new, cell_new) = self.decoder(input_token, (hidden_state, cell_state))
                    else:
                        output, hidden_new = self.decoder(input_token, hidden_state)
                        cell_new = None

                    # Get log probabilities
                    log_probs = F.log_softmax(output, dim=1).squeeze(0)  # (vocab_size,)

                    # Get top beam_width tokens
                    top_log_probs, top_indices = torch.topk(log_probs, beam_width)

                    for i in range(beam_width):
                        candidate_seq = seq + [top_indices[i].item()]
                        candidate_score = score + top_log_probs[i].item()
                        all_candidates.append((candidate_seq, candidate_score, hidden_new, cell_new))

                # Order all candidates by score and select top beam_width
                ordered = sorted(all_candidates, key=lambda tup: tup[1], reverse=True)
                sequences = ordered[:beam_width]

                # Optional: break early if all sequences end with EOS
                if all(seq[-1] == eos_idx for seq, _, _, _ in sequences):
                    break

            # Return the highest scoring sequence
            best_seq = sequences[0][0]
            return best_seq


In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

INPUT_DIM = len(english_chars) + 2  # +2 for padding and possibly <sos>/<eos>
OUTPUT_DIM = len(malayalam_chars) + 2

ENC_EMB_DIM = 64
DEC_EMB_DIM = 64
HID_DIM = 128
ENC_LAYERS = 2
DEC_LAYERS = 2
ENC_DROPOUT = 0.3
DEC_DROPOUT = 0.3
CELL_TYPE ='LSTM' # 'LSTM'  or 'GRU' or 'RNN'

encoder = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, ENC_LAYERS, ENC_DROPOUT, CELL_TYPE).to(device)
decoder = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, DEC_LAYERS, DEC_DROPOUT, CELL_TYPE).to(device)

model = Seq2Seq(encoder, decoder, device).to(device)
import torch.optim as optim

PAD_IDX = 0  # assuming 0 is padding index in your tokenizer

criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
optimizer = optim.Adam(model.parameters(), lr=0.001)
BATCH_SIZE = 64

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=0)  # padding token is 0
CLIP = 1


In [None]:
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    epoch_acc = 0
    total_tokens = 0

    for src, trg in iterator:
        src = src.transpose(0, 1).to(device)  # (seq_len, batch)
        trg = trg.transpose(0, 1).to(device)  # (seq_len, batch)

        optimizer.zero_grad()

        output = model(src, trg[:-1, :])  # (seq_len-1, batch, output_dim)

        output_dim = output.shape[-1]
        output = output.reshape(-1, output_dim)  # ( (seq_len-1)*batch, output_dim )
        trg_y = trg[1:, :].reshape(-1)           # ( (seq_len-1)*batch )

        loss = criterion(output, trg_y)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()

        # Calculate accuracy
        preds = output.argmax(1)                # predicted tokens
        non_pad_mask = trg_y != 0               # ignore padding
        correct = (preds == trg_y) & non_pad_mask

        epoch_acc += correct.sum().item()
        total_tokens += non_pad_mask.sum().item()

    accuracy = epoch_acc / total_tokens if total_tokens > 0 else 0
    return epoch_loss / len(iterator), accuracy
def evaluate(model, iterator, criterion):
    model.eval()

    epoch_loss = 0
    epoch_acc = 0
    total_tokens = 0

    with torch.no_grad():
        for src, trg in iterator:
            src = src.transpose(0, 1).to(model.device)  # (seq_len, batch)
            trg = trg.transpose(0, 1).to(model.device)

            output = model(src, trg, 0)  # turn off teacher forcing

            output_dim = output.shape[-1]
            output = output[1:].view(-1, output_dim)  # remove <sos>, flatten
            trg_y = trg[1:].reshape(-1)               # remove <sos>, flatten

            loss = criterion(output, trg_y)
            epoch_loss += loss.item()

            preds = output.argmax(1)
            non_pad_mask = trg_y != 0
            correct = (preds == trg_y) & non_pad_mask

            epoch_acc += correct.sum().item()
            total_tokens += non_pad_mask.sum().item()

    accuracy = epoch_acc / total_tokens if total_tokens > 0 else 0
    return epoch_loss / len(iterator), accuracy



In [None]:
train_loss, train_acc = train(model, train_loader, optimizer, criterion, CLIP)

print(f"Train Loss: {train_loss:.3f} | Train Acc: {train_acc:.2%}")
valid_loss, valid_acc = evaluate(model, val_loader, criterion)

print(f"Val. Loss: {valid_loss:.3f} | Val. Acc: {valid_acc:.2%}")

Train Loss: 1.756 | Train Acc: 53.05%
Val. Loss: 4.165 | Val. Acc: 13.70%


In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

INPUT_DIM = len(english_chars) + 2  # +2 for padding and special tokens <sos>/<eos>
OUTPUT_DIM = len(malayalam_chars) + 2

ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
HID_DIM = 512
ENC_LAYERS = 5
DEC_LAYERS = 5
ENC_DROPOUT = 0.0
DEC_DROPOUT = 0.0
CELL_TYPE = 'GRU'
BIDIRECTIONAL = False
BATCH_SIZE = 256
CLIP = 1

# Create Encoder and Decoder

encoder = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, ENC_LAYERS, ENC_DROPOUT, CELL_TYPE).to(device)
decoder = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, DEC_LAYERS, DEC_DROPOUT, CELL_TYPE).to(device)
# Initialize Seq2Seq model with encoder and decoder
model = Seq2Seq(encoder, decoder, device).to(device)
import torch.optim as optim

PAD_IDX = 0  # assuming 0 is the padding index in your tokenizer

criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
optimizer = optim.Adam(model.parameters(), lr=0.001)

from torch.utils.data import DataLoader

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)


In [None]:
epochs = 19

for i in range(epochs):
    print(f"Epoch: {i+1}/{epochs}")

    train_loss, train_acc = train(model, train_loader, optimizer, criterion, CLIP)
    print(f"Train Loss: {train_loss:.3f} | Train Acc: {train_acc:.2%}")

    valid_loss, valid_acc = evaluate(model, val_loader, criterion)
    print(f"Val. Loss: {valid_loss:.3f} | Val. Acc: {valid_acc:.2%}")


Epoch: 1/19


In [5]:
%%writefile train.py
import argparse
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import matplotlib.pyplot as plt
import random
import time
import argparse
from tqdm import tqdm


class TransliterationDataset(Dataset):
    def __init__(self, malayalam_words, english_words, ml_char_to_idx, en_char_to_idx):
        self.malayalam_words = malayalam_words
        self.english_words = english_words
        self.ml_char_to_idx = ml_char_to_idx
        self.en_char_to_idx = en_char_to_idx

    def __len__(self):
        return len(self.malayalam_words)

    def __getitem__(self, idx):
        ml_word = self.malayalam_words[idx]
        en_word = self.english_words[idx]

        # Convert characters to indices
        ml_indices = [self.ml_char_to_idx.get(char, self.ml_char_to_idx['<UNK>']) for char in ml_word]
        # Add start and end tokens for English
        en_indices = [self.en_char_to_idx['<SOS>']] + [self.en_char_to_idx.get(char, self.en_char_to_idx['<UNK>']) for char in en_word] + [self.en_char_to_idx['<EOS>']]

        return torch.tensor(ml_indices), torch.tensor(en_indices), len(ml_indices), len(en_indices)


def collate_fn(batch):
    ml_words, en_words, ml_lengths, en_lengths = zip(*batch)

    # Pad sequences
    ml_words_padded = pad_sequence(ml_words, batch_first=True, padding_value=0)
    en_words_padded = pad_sequence(en_words, batch_first=True, padding_value=0)

    return ml_words_padded, en_words_padded, torch.tensor(ml_lengths), torch.tensor(en_lengths)


class Encoder(nn.Module):
    def __init__(self, input_size, embedding_dim, hidden_size, num_layers, dropout, cell_type='LSTM'):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.cell_type = cell_type

        self.embedding = nn.Embedding(input_size, embedding_dim)
        self.dropout = nn.Dropout(dropout)

        if cell_type == 'LSTM':
            self.rnn = nn.LSTM(embedding_dim, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True)
        elif cell_type == 'GRU':
            self.rnn = nn.GRU(embedding_dim, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True)
        elif cell_type == 'RNN':
            self.rnn = nn.RNN(embedding_dim, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True)
        else:
            raise ValueError("Cell type must be 'LSTM', 'GRU', or 'RNN'")

    def forward(self, src, src_lengths):
        # src: [batch_size, src_len]
        embedded = self.dropout(self.embedding(src))
        # embedded: [batch_size, src_len, embedding_dim]

        # Pack padded sequences for more efficient computation
        packed_embedded = pack_padded_sequence(embedded, src_lengths.cpu(), batch_first=True, enforce_sorted=False)

        if self.cell_type == 'LSTM':
            packed_outputs, (hidden, cell) = self.rnn(packed_embedded)
            outputs, _ = pad_packed_sequence(packed_outputs, batch_first=True)
            return outputs, (hidden, cell)
        else:
            packed_outputs, hidden = self.rnn(packed_embedded)
            outputs, _ = pad_packed_sequence(packed_outputs, batch_first=True)
            return outputs, hidden


class Attention(nn.Module):
    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        self.hidden_size = hidden_size
        self.attn = nn.Linear(hidden_size * 2, hidden_size)
        self.v = nn.Parameter(torch.rand(hidden_size))

    def forward(self, hidden, encoder_outputs):
        # hidden: [batch_size, hidden_size]
        # encoder_outputs: [batch_size, src_len, hidden_size]

        batch_size = encoder_outputs.shape[0]
        src_len = encoder_outputs.shape[1]

        # Repeat decoder hidden state across sequence length
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)

        # energy: [batch_size, src_len, hidden_size]
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))

        # v: [hidden_size] -> [batch_size, src_len]
        attention = torch.sum(self.v * energy, dim=2)

        # Return attention weights and apply them to encoder outputs
        return torch.softmax(attention, dim=1)


class Decoder(nn.Module):
    def __init__(self, output_size, embedding_dim, hidden_size, num_layers, dropout, cell_type='LSTM', attention=False):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.cell_type = cell_type
        self.attention = attention

        self.embedding = nn.Embedding(output_size, embedding_dim)
        self.dropout = nn.Dropout(dropout)

        # If using attention, we'll concatenate the attention output with embedding
        input_size = embedding_dim
        if attention:
            self.attention_layer = Attention(hidden_size)
            self.fc_combine = nn.Linear(hidden_size + embedding_dim, embedding_dim)

        if cell_type == 'LSTM':
            self.rnn = nn.LSTM(input_size, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True)
        elif cell_type == 'GRU':
            self.rnn = nn.GRU(input_size, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True)
        elif cell_type == 'RNN':
            self.rnn = nn.RNN(input_size, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True)
        else:
            raise ValueError("Cell type must be 'LSTM', 'GRU', or 'RNN'")

        self.fc_out = nn.Linear(hidden_size, output_size)

    def forward(self, input, hidden, encoder_outputs=None):
        # input: [batch_size, 1]
        # hidden: tuple of [num_layers, batch_size, hidden_size] (for LSTM)
        # encoder_outputs: [batch_size, src_len, hidden_size] (needed for attention)

        embedded = self.dropout(self.embedding(input))
        # embedded: [batch_size, 1, embedding_dim]

        if self.attention and encoder_outputs is not None:
            # Get the last hidden state from the top layer for attention
            if self.cell_type == 'LSTM':
                attn_hidden = hidden[0][-1]
            else:
                attn_hidden = hidden[-1]

            # Apply attention
            attn_weights = self.attention_layer(attn_hidden, encoder_outputs)
            # attn_weights: [batch_size, src_len]

            # Apply attention weights to encoder outputs
            context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs)
            # context: [batch_size, 1, hidden_size]

            # Combine context and embedded
            embedded = self.fc_combine(torch.cat((embedded.squeeze(1), context.squeeze(1)), dim=1)).unsqueeze(1)

        # Pass through RNN
        if self.cell_type == 'LSTM':
            output, (hidden, cell) = self.rnn(embedded, hidden)
            output = self.fc_out(output)
            return output, (hidden, cell)
        else:
            output, hidden = self.rnn(embedded, hidden)
            output = self.fc_out(output)
            return output, hidden


class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, src_lengths, trg, teacher_forcing_ratio=0.5):
        # src: [batch_size, src_len]
        # trg: [batch_size, trg_len]
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.output_size

        # Tensor to store decoder outputs
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)

        # Encode the source sequence
        encoder_outputs, hidden = self.encoder(src, src_lengths)

        # First input to the decoder is the <SOS> token
        input = trg[:, 0:1]

        for t in range(1, trg_len):
            # Use teacher forcing (use actual target as input) or not
            use_teacher_force = random.random() < teacher_forcing_ratio

            # Forward pass through decoder
            if self.decoder.attention:
                output, hidden = self.decoder(input, hidden, encoder_outputs)
            else:
                output, hidden = self.decoder(input, hidden)

            # output: [batch_size, 1, output_size]
            outputs[:, t:t+1, :] = output

            # Get the next input for decoder
            top1 = output.argmax(2) if not use_teacher_force else trg[:, t:t+1]
            input = top1

        return outputs

    def infer(self, src, src_lengths, max_len=50, beam_size=1):
        # For inference with or without beam search
        batch_size = src.shape[0]

        # Encode the source sequence
        encoder_outputs, hidden = self.encoder(src, src_lengths)

        if beam_size == 1:
            # Simple greedy decoding
            return self._greedy_decode(encoder_outputs, hidden, max_len)
        else:
            # Beam search decoding
            return self._beam_search_decode(encoder_outputs, hidden, max_len, beam_size)

    def _greedy_decode(self, encoder_outputs, hidden, max_len):
        batch_size = encoder_outputs.shape[0]

        # Start with <SOS> tokens
        input = torch.tensor([[1]] * batch_size).to(self.device)  # Assuming 1 is <SOS>

        # Lists to store predicted indices
        predictions = torch.zeros(batch_size, max_len, dtype=torch.long).to(self.device)

        for t in range(max_len):
            # Forward pass through decoder
            if self.decoder.attention:
                output, hidden = self.decoder(input, hidden, encoder_outputs)
            else:
                output, hidden = self.decoder(input, hidden)

            # Get most likely next token
            top1 = output.argmax(2)
            predictions[:, t] = top1.squeeze()

            # Break if all sequences have generated <EOS>
            if (top1 == 2).all():  # Assuming 2 is <EOS>
                break

            # Update input for next timestep
            input = top1

        return predictions

    def _beam_search_decode(self, encoder_outputs, hidden, max_len, beam_size):
        batch_size = encoder_outputs.shape[0]

        # List to store final predictions for each item in batch
        batch_predictions = []

        # Process each item in batch separately for beam search
        for b in range(batch_size):
            # Get encoder outputs and hidden state for this item
            single_encoder_output = encoder_outputs[b:b+1]

            if self.decoder.cell_type == 'LSTM':
                single_hidden = (hidden[0][:, b:b+1, :], hidden[1][:, b:b+1, :])
            else:
                single_hidden = hidden[:, b:b+1, :]

            # Start with <SOS> token
            input = torch.tensor([[1]]).to(self.device)  # Assuming 1 is <SOS>

            # Lists to keep track of beams: (sequence, score, hidden_state)
            beams = [(torch.tensor([[1]], device=self.device), 0, single_hidden)]
            complete_beams = []

            for t in range(max_len):
                new_beams = []

                # Explore each current beam
                for sequence, score, beam_hidden in beams:
                    # Skip completed sequences
                    if sequence[0, -1].item() == 2:  # <EOS> token
                        complete_beams.append((sequence, score, beam_hidden))
                        continue

                    # Use last token as input
                    beam_input = sequence[:, -1:].to(self.device)

                    # Forward pass through decoder
                    if self.decoder.attention:
                        output, new_hidden = self.decoder(beam_input, beam_hidden, single_encoder_output)
                    else:
                        output, new_hidden = self.decoder(beam_input, beam_hidden)

                    # Get log probabilities
                    log_probs = nn.functional.log_softmax(output.squeeze(1), dim=1)

                    # Get top beam_size probabilities
                    topk_probs, topk_idx = log_probs.topk(beam_size)

                    # Create new beams
                    for i in range(beam_size):
                        token = topk_idx[0, i].unsqueeze(0).unsqueeze(0)
                        new_seq = torch.cat([sequence, token], dim=1)
                        new_score = score + topk_probs[0, i].item()
                        new_beams.append((new_seq, new_score, new_hidden))

                # Keep only the top beam_size beams
                beams = sorted(new_beams, key=lambda x: x[1], reverse=True)[:beam_size]

                # Stop if all beams end with <EOS>
                if all(beam[0][0, -1].item() == 2 for beam in beams):
                    complete_beams.extend(beams)
                    break

            # Handle case if no complete beams (no <EOS> found)
            if not complete_beams:
                complete_beams = beams

            # Sort completed beams by score and return highest scoring sequence
            best_beam = max(complete_beams, key=lambda x: x[1])
            batch_predictions.append(best_beam[0])

        # Pad sequences to same length for batch
        max_pred_len = max(pred.shape[1] for pred in batch_predictions)
        padded_preds = torch.zeros(batch_size, max_pred_len, dtype=torch.long).to(self.device)

        for i, pred in enumerate(batch_predictions):
            padded_preds[i, :pred.shape[1]] = pred

        return padded_preds


def create_vocabularies(malayalam_words, english_words):
    # Create Malayalam character vocabulary
    ml_chars = set()
    for word in malayalam_words:
        ml_chars.update(word)

    # Create English character vocabulary
    en_chars = set()
    for word in english_words:
        en_chars.update(word)

    # Add special tokens
    ml_char_to_idx = {'<PAD>': 0, '<UNK>': 1}
    en_char_to_idx = {'<PAD>': 0, '<UNK>': 1, '<SOS>': 2, '<EOS>': 3}

    # Add characters to dictionaries
    for i, char in enumerate(sorted(ml_chars)):
        ml_char_to_idx[char] = i + 2  # +2 for <PAD> and <UNK>

    for i, char in enumerate(sorted(en_chars)):
        en_char_to_idx[char] = i + 4  # +4 for <PAD>, <UNK>, <SOS>, <EOS>

    ml_idx_to_char = {idx: char for char, idx in ml_char_to_idx.items()}
    en_idx_to_char = {idx: char for char, idx in en_char_to_idx.items()}

    return ml_char_to_idx, en_char_to_idx, ml_idx_to_char, en_idx_to_char


def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0

    with torch.no_grad():
        for batch in dataloader:
            src, trg, src_lengths, trg_lengths = batch
            src, trg = src.to(device), trg.to(device)

            # Forward pass through the model
            output = model(src, src_lengths, trg, teacher_forcing_ratio=0)

            # Reshape for computing loss
            output_dim = output.shape[-1]
            output = output[:, 1:].reshape(-1, output_dim)
            trg = trg[:, 1:].reshape(-1)

            # Compute loss
            loss = criterion(output, trg)
            total_loss += loss.item()

    return total_loss / len(dataloader)


def train(model, train_dataloader, val_dataloader, optimizer, criterion, n_epochs, device, clip=1, teacher_forcing_ratio=0.5):
    best_val_loss = float('inf')
    train_losses = []
    val_losses = []

    for epoch in range(n_epochs):
        model.train()
        epoch_loss = 0

        for batch in tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{n_epochs}'):
            src, trg, src_lengths, trg_lengths = batch
            src, trg = src.to(device), trg.to(device)

            optimizer.zero_grad()

            # Forward pass
            output = model(src, src_lengths, trg, teacher_forcing_ratio)

            # Reshape for computing loss
            output_dim = output.shape[-1]
            output = output[:, 1:].reshape(-1, output_dim)
            trg = trg[:, 1:].reshape(-1)

            # Compute loss
            loss = criterion(output, trg)

            # Backpropagation
            loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

            # Update parameters
            optimizer.step()

            epoch_loss += loss.item()

        # Evaluate on validation set
        val_loss = evaluate(model, val_dataloader, criterion, device)

        # Save losses for plotting
        train_losses.append(epoch_loss / len(train_dataloader))
        val_losses.append(val_loss)

        print(f'Epoch: {epoch+1:02}')
        print(f'\tTrain Loss: {epoch_loss / len(train_dataloader):.4f}')
        print(f'\tValidation Loss: {val_loss:.4f}')

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_transliteration_model.pt')
            print(f'\tBest model saved with validation loss: {val_loss:.4f}')

    return train_losses, val_losses


def predict(model, src, src_lengths, en_idx_to_char, device, beam_size=1):
    model.eval()
    with torch.no_grad():
        # Forward pass with beam search or greedy decoding
        predictions = model.infer(src.to(device), src_lengths, beam_size=beam_size)

    # Convert predictions to characters
    predicted_words = []
    for pred in predictions:
        word = []
        for idx in pred:
            idx = idx.item()
            if idx == 3:  # <EOS> token
                break
            if idx > 3:  # Skip special tokens
                word.append(en_idx_to_char[idx])
        predicted_words.append(''.join(word))

    return predicted_words


def calculate_accuracy(predicted_words, target_words):
    correct = 0
    for pred, target in zip(predicted_words, target_words):
        if pred == target:
            correct += 1
    return correct / len(target_words) * 100.0


def plot_learning_curves(train_losses, val_losses):
    plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.grid(True)
    plt.savefig('learning_curves.png')
    plt.close()


def main():
    parser = argparse.ArgumentParser(description='Malayalam to English Transliteration')
    parser.add_argument('--embedding_dim', type=int, default=64, choices=[16, 32, 64, 256],
                        help='Embedding dimension size')
    parser.add_argument('--encoder_layers', type=int, default=2, choices=[1, 2, 3],
                        help='Number of encoder layers')
    parser.add_argument('--decoder_layers', type=int, default=2, choices=[1, 2, 3],
                        help='Number of decoder layers')
    parser.add_argument('--hidden_size', type=int, default=64, choices=[16, 32, 64, 256],
                        help='Hidden layer size')
    parser.add_argument('--cell_type', type=str, default='LSTM', choices=['RNN', 'GRU', 'LSTM'],
                        help='RNN cell type')
    parser.add_argument('--dropout', type=float, default=0.2, choices=[0.2, 0.3],
                        help='Dropout rate')
    parser.add_argument('--attention', action='store_true',
                        help='Use attention mechanism')
    parser.add_argument('--teacher_forcing', type=float, default=0.5,
                        help='Teacher forcing ratio (0 to disable)')
    parser.add_argument('--beam_size', type=int, default=1,
                        help='Beam size for beam search decoding (1 for greedy)')
    parser.add_argument('--batch_size', type=int, default=64,
                        help='Batch size for training')
    parser.add_argument('--epochs', type=int, default=15,
                        help='Number of training epochs')
    parser.add_argument('--learning_rate', type=float, default=0.001,
                        help='Learning rate')
    parser.add_argument('--data_path', type=str, default='/content/ml/lexicons/ml.translit.sampled.train.tsv',
                        help='Path to training data')
    parser.add_argument('--val_path', type=str, default='/content/ml/lexicons/ml.translit.sampled.dev.tsv',
                        help='Path to validation data')
    parser.add_argument('--test_path', type=str, default='/content/ml/lexicons/ml.translit.sampled.test.tsv',
                        help='Path to test data')

    args = parser.parse_args()

    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Load data
    try:
        df = pd.read_csv(args.data_path, sep='\t', header=None)
        df_val = pd.read_csv(args.val_path, sep='\t', header=None)
        df_test = pd.read_csv(args.test_path, sep='\t', header=None)
    except FileNotFoundError:
        print("Error: File not found. Please check file paths.")
        return

    # Split into native (Malayalam) and romanized (English) columns
    malayalam_words = df[0].tolist()
    english_words = df[1].tolist()

    malayalam_words_val = df_val[0].tolist()
    english_words_val = df_val[1].tolist()

    malayalam_words_test = df_test[0].tolist()
    english_words_test = df_test[1].tolist()

    # Create vocabularies
    ml_char_to_idx, en_char_to_idx, ml_idx_to_char, en_idx_to_char = create_vocabularies(
        malayalam_words + malayalam_words_val + malayalam_words_test,
        english_words + english_words_val + english_words_test
    )

    # Create datasets
    train_dataset = TransliterationDataset(malayalam_words, english_words, ml_char_to_idx, en_char_to_idx)
    val_dataset = TransliterationDataset(malayalam_words_val, english_words_val, ml_char_to_idx, en_char_to_idx)
    test_dataset = TransliterationDataset(malayalam_words_test, english_words_test, ml_char_to_idx, en_char_to_idx)

    # Create dataloaders
    train_dataloader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True, collate_fn=collate_fn)
    val_dataloader = DataLoader(val_dataset, batch_size=args.batch_size, shuffle=False, collate_fn=collate_fn)
    test_dataloader = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False, collate_fn=collate_fn)

    # Create model
    input_size = len(ml_char_to_idx)
    output_size = len(en_char_to_idx)

    encoder = Encoder(
        input_size=input_size,
        embedding_dim=args.embedding_dim,
        hidden_size=args.hidden_size,
        num_layers=args.encoder_layers,
        dropout=args.dropout,
        cell_type=args.cell_type
    )

    decoder = Decoder(
        output_size=output_size,
        embedding_dim=args.embedding_dim,
        hidden_size=args.hidden_size,
        num_layers=args.decoder_layers,
        dropout=args.dropout,
        cell_type=args.cell_type,
        attention=args.attention
    )

    model = Seq2Seq(encoder, decoder, device).to(device)

    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding
    optimizer = optim.Adam(model.parameters(), lr=args.learning_rate)

    # Print model architecture and hyperparameters
    print(f"Model Architecture:")
    print(f"Input Size: {input_size}")
    print(f"Output Size: {output_size}")
    print(f"Embedding Dimension: {args.embedding_dim}")
    print(f"Hidden Size: {args.hidden_size}")
    print(f"Encoder Layers: {args.encoder_layers}")
    print(f"Decoder Layers: {args.decoder_layers}")
    print(f"Cell Type: {args.cell_type}")
    print(f"Dropout: {args.dropout}")
    print(f"Attention: {args.attention}")
    print(f"Teacher Forcing Ratio: {args.teacher_forcing}")
    print(f"Beam Size: {args.beam_size}")
    print(f"Batch Size: {args.batch_size}")
    print(f"Learning Rate: {args.learning_rate}")
    print(f"Number of Epochs: {args.epochs}")

    # Train model
    start_time = time.time()
    train_losses, val_losses = train(
        model,
        train_dataloader,
        val_dataloader,
        optimizer,
        criterion,
        args.epochs,
        device,
        teacher_forcing_ratio=args.teacher_forcing
    )
    print(f"Training completed in {time.time() - start_time:.2f} seconds")

    # Plot learning curves
    plot_learning_curves(train_losses, val_losses)

    # Load best model
    model.load_state_dict(torch.load('best_transliteration_model.pt'))

    # Evaluate on test set
    test_loss = evaluate(model, test_dataloader, criterion, device)
    print(f"Test Loss: {test_loss:.4f}")

    # Test some examples with and without beam search
    num_examples = min(5, len(test_dataset))
    for i in range(num_examples):
        src, trg, src_len, trg_len = test_dataset[i]
        src = src.unsqueeze(0)  # Add batch dimension
        src_len = torch.tensor([src_len])

        # Get original Malayalam and English words
        ml_word = ''.join([ml_idx_to_char[idx.item()] for idx in src[0] if idx.item() > 1])
        en_word = ''.join([en_idx_to_char[idx.item()] for idx in trg if idx.item() > 3])  # Skip special tokens

        # Predict with greedy decoding
        greedy_pred = predict(model, src, src_len, en_idx_to_char, device, beam_size=1)[0]

        # Predict with beam search if beam size > 1
        beam_pred = predict(model, src, src_len, en_idx_to_char, device, beam_size=args.beam_size)[0] if args.beam_size > 1 else greedy_pred

        print(f"Example {i+1}:")
        print(f"Malayalam: {ml_word}")
        print(f"Target English: {en_word}")
        print(f"Prediction (Greedy): {greedy_pred}")
        if args.beam_size > 1:
            print(f"Prediction (Beam Search, size={args.beam_size}): {beam_pred}")
        print()

    # Calculate accuracy on test set
    all_predictions = []
    all_targets = []

    for batch in test_dataloader:
        src, trg, src_lengths, _ = batch
        src, trg = src.to(device), trg.to(device)

        # Get predictions
        predictions = predict(model, src, src_lengths, en_idx_to_char, device, beam_size=args.beam_size)

        # Get target English words
        for i in range(len(trg)):
            # Skip special tokens
            target_word = ''.join([en_idx_to_char[idx.item()] for idx in trg[i] if idx.item() > 3])
            all_targets.append(target_word)

        all_predictions.extend(predictions)

    # Calculate accuracy
    accuracy = calculate_accuracy(all_predictions, all_targets)
    print(f"Test Accuracy: {accuracy:.2f}%")

    # Save vocabularies for later use
    import json
    with open('ml_char_to_idx.json', 'w') as f:
        json.dump(ml_char_to_idx, f)
    with open('en_char_to_idx.json', 'w') as f:
        json.dump(en_char_to_idx, f)


if __name__ == "__main__":
    main()

Writing train.py


In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import matplotlib.pyplot as plt
import random
import time
import argparse
from tqdm import tqdm

class TransliterationDataset(Dataset):
    def __init__(self, malayalam_words, english_words, ml_char_to_idx, en_char_to_idx):
        self.malayalam_words = malayalam_words
        self.english_words = english_words
        self.ml_char_to_idx = ml_char_to_idx
        self.en_char_to_idx = en_char_to_idx

        # Always use these specific tokens regardless of what's in the vocabulary
        self.sos_index = 2  # Always use index 2 for SOS token
        self.eos_index = 3  # Always use index 3 for EOS token

        print(f"Dataset initialized with SOS index: {self.sos_index}, EOS index: {self.eos_index}")

    def __len__(self):
        return len(self.malayalam_words)

    def __getitem__(self, idx):
        ml_word = self.malayalam_words[idx]
        en_word = self.english_words[idx]

        # Convert characters to indices
        ml_indices = [self.ml_char_to_idx.get(char, self.ml_char_to_idx['<UNK>']) for char in ml_word]

        # Use direct indices instead of token lookup
        en_indices = [self.sos_index] + [self.en_char_to_idx.get(char, self.en_char_to_idx['<UNK>']) for char in en_word] + [self.eos_index]

        return torch.tensor(ml_indices), torch.tensor(en_indices), len(ml_indices), len(en_indices)
def create_vocabularies(malayalam_words, english_words):
    # Ensure all words are strings
    malayalam_words = [str(word) if not isinstance(word, str) else word for word in malayalam_words]
    english_words = [str(word) if not isinstance(word, str) else word for word in english_words]

    # Create Malayalam character vocabulary
    ml_chars = set()
    for word in malayalam_words:
        ml_chars.update(word)

    # Create English character vocabulary
    en_chars = set()
    for word in english_words:
        en_chars.update(word)

    # Add special tokens
    ml_char_to_idx = {'<PAD>': 0, '<UNK>': 1}
    en_char_to_idx = {'<PAD>': 0, '<UNK>': 1, '<SOS>': 2, '<EOS>': 3}

    # Add characters to dictionaries
    for i, char in enumerate(sorted(ml_chars)):
        ml_char_to_idx[char] = i + 2  # +2 for <PAD> and <UNK>

    for i, char in enumerate(sorted(en_chars)):
        en_char_to_idx[char] = i + 4  # +4 for <PAD>, <UNK>, <SOS>, <EOS>

    ml_idx_to_char = {idx: char for char, idx in ml_char_to_idx.items()}
    en_idx_to_char = {idx: char for char, idx in en_char_to_idx.items()}

    return ml_char_to_idx, en_char_to_idx, ml_idx_to_char, en_idx_to_char

def collate_fn(batch):
    ml_words, en_words, ml_lengths, en_lengths = zip(*batch)

    # Pad sequences
    ml_words_padded = pad_sequence(ml_words, batch_first=True, padding_value=0)
    en_words_padded = pad_sequence(en_words, batch_first=True, padding_value=0)

    return ml_words_padded, en_words_padded, torch.tensor(ml_lengths), torch.tensor(en_lengths)


class Encoder(nn.Module):
    def __init__(self, input_size, embedding_dim, hidden_size, num_layers, dropout, cell_type='LSTM'):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.cell_type = cell_type

        self.embedding = nn.Embedding(input_size, embedding_dim)
        self.dropout = nn.Dropout(dropout)

        if cell_type == 'LSTM':
            self.rnn = nn.LSTM(embedding_dim, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True)
        elif cell_type == 'GRU':
            self.rnn = nn.GRU(embedding_dim, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True)
        elif cell_type == 'RNN':
            self.rnn = nn.RNN(embedding_dim, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True)
        else:
            raise ValueError("Cell type must be 'LSTM', 'GRU', or 'RNN'")

    def forward(self, src, src_lengths):
        # src: [batch_size, src_len]
        embedded = self.dropout(self.embedding(src))
        # embedded: [batch_size, src_len, embedding_dim]

        # Pack padded sequences for more efficient computation
        packed_embedded = pack_padded_sequence(embedded, src_lengths.cpu(), batch_first=True, enforce_sorted=False)

        if self.cell_type == 'LSTM':
            packed_outputs, (hidden, cell) = self.rnn(packed_embedded)
            outputs, _ = pad_packed_sequence(packed_outputs, batch_first=True)
            return outputs, (hidden, cell)
        else:
            packed_outputs, hidden = self.rnn(packed_embedded)
            outputs, _ = pad_packed_sequence(packed_outputs, batch_first=True)
            return outputs, hidden


class Attention(nn.Module):
    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        self.hidden_size = hidden_size
        self.attn = nn.Linear(hidden_size * 2, hidden_size)
        self.v = nn.Parameter(torch.rand(hidden_size))

    def forward(self, hidden, encoder_outputs):
        # hidden: [batch_size, hidden_size]
        # encoder_outputs: [batch_size, src_len, hidden_size]

        batch_size = encoder_outputs.shape[0]
        src_len = encoder_outputs.shape[1]

        # Repeat decoder hidden state across sequence length
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)

        # energy: [batch_size, src_len, hidden_size]
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))

        # v: [hidden_size] -> [batch_size, src_len]
        attention = torch.sum(self.v * energy, dim=2)

        # Return attention weights and apply them to encoder outputs
        return torch.softmax(attention, dim=1)


class Decoder(nn.Module):
    def __init__(self, output_size, embedding_dim, hidden_size, num_layers, dropout, cell_type='LSTM', attention=False):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.cell_type = cell_type
        self.attention = attention

        self.embedding = nn.Embedding(output_size, embedding_dim)
        self.dropout = nn.Dropout(dropout)

        # If using attention, we'll concatenate the attention output with embedding
        input_size = embedding_dim
        if attention:
            self.attention_layer = Attention(hidden_size)
            self.fc_combine = nn.Linear(hidden_size + embedding_dim, embedding_dim)

        if cell_type == 'LSTM':
            self.rnn = nn.LSTM(input_size, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True)
        elif cell_type == 'GRU':
            self.rnn = nn.GRU(input_size, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True)
        elif cell_type == 'RNN':
            self.rnn = nn.RNN(input_size, hidden_size, num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True)
        else:
            raise ValueError("Cell type must be 'LSTM', 'GRU', or 'RNN'")

        self.fc_out = nn.Linear(hidden_size, output_size)

    def forward(self, input, hidden, encoder_outputs=None):
        # input: [batch_size, 1]
        # hidden: tuple of [num_layers, batch_size, hidden_size] (for LSTM)
        # encoder_outputs: [batch_size, src_len, hidden_size] (needed for attention)

        embedded = self.dropout(self.embedding(input))
        # embedded: [batch_size, 1, embedding_dim]

        if self.attention and encoder_outputs is not None:
            # Get the last hidden state from the top layer for attention
            if self.cell_type == 'LSTM':
                attn_hidden = hidden[0][-1]
            else:
                attn_hidden = hidden[-1]

            # Apply attention
            attn_weights = self.attention_layer(attn_hidden, encoder_outputs)
            # attn_weights: [batch_size, src_len]

            # Apply attention weights to encoder outputs
            context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs)
            # context: [batch_size, 1, hidden_size]

            # Combine context and embedded
            embedded = self.fc_combine(torch.cat((embedded.squeeze(1), context.squeeze(1)), dim=1)).unsqueeze(1)

        # Pass through RNN
        if self.cell_type == 'LSTM':
            output, (hidden, cell) = self.rnn(embedded, hidden)
            output = self.fc_out(output)
            return output, (hidden, cell)
        else:
            output, hidden = self.rnn(embedded, hidden)
            output = self.fc_out(output)
            return output, hidden


class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, src_lengths, trg, teacher_forcing_ratio=0.5):
        # src: [batch_size, src_len]
        # trg: [batch_size, trg_len]
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.output_size

        # Tensor to store decoder outputs
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)

        # Encode the source sequence
        encoder_outputs, hidden = self.encoder(src, src_lengths)

        # First input to the decoder is the <SOS> token
        input = trg[:, 0:1]

        for t in range(1, trg_len):
            # Use teacher forcing (use actual target as input) or not
            use_teacher_force = random.random() < teacher_forcing_ratio

            # Forward pass through decoder
            if self.decoder.attention:
                output, hidden = self.decoder(input, hidden, encoder_outputs)
            else:
                output, hidden = self.decoder(input, hidden)

            # output: [batch_size, 1, output_size]
            outputs[:, t:t+1, :] = output

            # Get the next input for decoder
            top1 = output.argmax(2) if not use_teacher_force else trg[:, t:t+1]
            input = top1

        return outputs

    def infer(self, src, src_lengths, max_len=50, beam_size=1):
        # For inference with or without beam search
        batch_size = src.shape[0]

        # Encode the source sequence
        encoder_outputs, hidden = self.encoder(src, src_lengths)

        if beam_size == 1:
            # Simple greedy decoding
            return self._greedy_decode(encoder_outputs, hidden, max_len)
        else:
            # Beam search decoding
            return self._beam_search_decode(encoder_outputs, hidden, max_len, beam_size)

    def _greedy_decode(self, encoder_outputs, hidden, max_len):
        batch_size = encoder_outputs.shape[0]

        # Start with <SOS> tokens
        input = torch.tensor([[1]] * batch_size).to(self.device)  # Assuming 1 is <SOS>

        # Lists to store predicted indices
        predictions = torch.zeros(batch_size, max_len, dtype=torch.long).to(self.device)

        for t in range(max_len):
            # Forward pass through decoder
            if self.decoder.attention:
                output, hidden = self.decoder(input, hidden, encoder_outputs)
            else:
                output, hidden = self.decoder(input, hidden)

            # Get most likely next token
            top1 = output.argmax(2)
            predictions[:, t] = top1.squeeze()

            # Break if all sequences have generated <EOS>
            if (top1 == 2).all():  # Assuming 2 is <EOS>
                break

            # Update input for next timestep
            input = top1

        return predictions

    def _beam_search_decode(self, encoder_outputs, hidden, max_len, beam_size):
        batch_size = encoder_outputs.shape[0]

        # List to store final predictions for each item in batch
        batch_predictions = []

        # Process each item in batch separately for beam search
        for b in range(batch_size):
            # Get encoder outputs and hidden state for this item
            single_encoder_output = encoder_outputs[b:b+1]

            if self.decoder.cell_type == 'LSTM':
                single_hidden = (hidden[0][:, b:b+1, :], hidden[1][:, b:b+1, :])
            else:
                single_hidden = hidden[:, b:b+1, :]

            # Start with <SOS> token
            input = torch.tensor([[1]]).to(self.device)  # Assuming 1 is <SOS>

            # Lists to keep track of beams: (sequence, score, hidden_state)
            beams = [(torch.tensor([[1]], device=self.device), 0, single_hidden)]
            complete_beams = []

            for t in range(max_len):
                new_beams = []

                # Explore each current beam
                for sequence, score, beam_hidden in beams:
                    # Skip completed sequences
                    if sequence[0, -1].item() == 2:  # <EOS> token
                        complete_beams.append((sequence, score, beam_hidden))
                        continue

                    # Use last token as input
                    beam_input = sequence[:, -1:].to(self.device)

                    # Forward pass through decoder
                    if self.decoder.attention:
                        output, new_hidden = self.decoder(beam_input, beam_hidden, single_encoder_output)
                    else:
                        output, new_hidden = self.decoder(beam_input, beam_hidden)

                    # Get log probabilities
                    log_probs = nn.functional.log_softmax(output.squeeze(1), dim=1)

                    # Get top beam_size probabilities
                    topk_probs, topk_idx = log_probs.topk(beam_size)

                    # Create new beams
                    for i in range(beam_size):
                        token = topk_idx[0, i].unsqueeze(0).unsqueeze(0)
                        new_seq = torch.cat([sequence, token], dim=1)
                        new_score = score + topk_probs[0, i].item()
                        new_beams.append((new_seq, new_score, new_hidden))

                # Keep only the top beam_size beams
                beams = sorted(new_beams, key=lambda x: x[1], reverse=True)[:beam_size]

                # Stop if all beams end with <EOS>
                if all(beam[0][0, -1].item() == 2 for beam in beams):
                    complete_beams.extend(beams)
                    break

            # Handle case if no complete beams (no <EOS> found)
            if not complete_beams:
                complete_beams = beams

            # Sort completed beams by score and return highest scoring sequence
            best_beam = max(complete_beams, key=lambda x: x[1])
            batch_predictions.append(best_beam[0])

        # Pad sequences to same length for batch
        max_pred_len = max(pred.shape[1] for pred in batch_predictions)
        padded_preds = torch.zeros(batch_size, max_pred_len, dtype=torch.long).to(self.device)

        for i, pred in enumerate(batch_predictions):
            padded_preds[i, :pred.shape[1]] = pred

        return padded_preds


def create_vocabularies(malayalam_words, english_words):
    # Ensure all words are strings
    malayalam_words = [str(word) if not isinstance(word, str) else word for word in malayalam_words]
    english_words = [str(word) if not isinstance(word, str) else word for word in english_words]

    # Create Malayalam character vocabulary
    ml_chars = set()
    for word in malayalam_words:
        ml_chars.update(word)

    # Create English character vocabulary
    en_chars = set()
    for word in english_words:
        en_chars.update(word)

    # Add special tokens
    ml_char_to_idx = {'<PAD>': 0, '<UNK>': 1}
    en_char_to_idx = {'<PAD>': 0, '<UNK>': 1, '< SOS >': 2, '<EOS>': 3}

    # Add characters to dictionaries
    for i, char in enumerate(sorted(ml_chars)):
        ml_char_to_idx[char] = i + 2  # +2 for <PAD> and <UNK>

    for i, char in enumerate(sorted(en_chars)):
        en_char_to_idx[char] = i + 4  # +4 for <PAD>, <UNK>, < SOS >, <EOS>

    ml_idx_to_char = {idx: char for char, idx in ml_char_to_idx.items()}
    en_idx_to_char = {idx: char for char, idx in en_char_to_idx.items()}

    return ml_char_to_idx, en_char_to_idx, ml_idx_to_char, en_idx_to_char


def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0

    with torch.no_grad():
        for batch in dataloader:
            src, trg, src_lengths, trg_lengths = batch
            src, trg = src.to(device), trg.to(device)

            # Forward pass through the model
            output = model(src, src_lengths, trg, teacher_forcing_ratio=0)

            # Reshape for computing loss
            output_dim = output.shape[-1]
            output = output[:, 1:].reshape(-1, output_dim)
            trg = trg[:, 1:].reshape(-1)

            # Compute loss
            loss = criterion(output, trg)
            total_loss += loss.item()

    return total_loss / len(dataloader)


def train(model, train_dataloader, val_dataloader, optimizer, criterion, n_epochs, device, clip=1, teacher_forcing_ratio=0.5):
    best_val_loss = float('inf')
    train_losses = []
    val_losses = []

    for epoch in range(n_epochs):
        model.train()
        epoch_loss = 0

        for batch in tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{n_epochs}'):
            src, trg, src_lengths, trg_lengths = batch
            src, trg = src.to(device), trg.to(device)

            optimizer.zero_grad()

            # Forward pass
            output = model(src, src_lengths, trg, teacher_forcing_ratio)

            # Reshape for computing loss
            output_dim = output.shape[-1]
            output = output[:, 1:].reshape(-1, output_dim)
            trg = trg[:, 1:].reshape(-1)

            # Compute loss
            loss = criterion(output, trg)

            # Backpropagation
            loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

            # Update parameters
            optimizer.step()

            epoch_loss += loss.item()

        # Evaluate on validation set
        val_loss = evaluate(model, val_dataloader, criterion, device)

        # Save losses for plotting
        train_losses.append(epoch_loss / len(train_dataloader))
        val_losses.append(val_loss)

        print(f'Epoch: {epoch+1:02}')
        print(f'\tTrain Loss: {epoch_loss / len(train_dataloader):.4f}')
        print(f'\tValidation Loss: {val_loss:.4f}')

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_transliteration_model.pt')
            print(f'\tBest model saved with validation loss: {val_loss:.4f}')

    return train_losses, val_losses


def predict(model, src, src_lengths, en_idx_to_char, device, beam_size=1):
    model.eval()
    with torch.no_grad():
        # Forward pass with beam search or greedy decoding
        predictions = model.infer(src.to(device), src_lengths, beam_size=beam_size)

    # Convert predictions to characters
    predicted_words = []
    for pred in predictions:
        word = []
        for idx in pred:
            idx = idx.item()
            if idx == 3:  # <EOS> token
                break
            if idx > 3:  # Skip special tokens
                word.append(en_idx_to_char[idx])
        predicted_words.append(''.join(word))

    return predicted_words


def calculate_accuracy(predicted_words, target_words):
    correct = 0
    total = 0
    for pred, target in zip(predicted_words, target_words):
        target = str(target)  # Convert target to string in case it's not
        if pred == target:
            correct += 1
        total += 1
    return (correct / total) * 100.0 if total > 0 else 0.0


def plot_learning_curves(train_losses, val_losses):
    plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.grid(True)
    plt.savefig('learning_curves.png')
    plt.close()


def main(args=None):
    # If running in a Jupyter/Colab environment, allow parameters to be passed directly
    # Otherwise, use command line arguments
    if args is None:
        parser = argparse.ArgumentParser(description='Malayalam to English Transliteration')
        parser.add_argument('--embedding_dim', type=int, default=64, choices=[16, 32, 64, 256],
                            help='Embedding dimension size')
        parser.add_argument('--encoder_layers', type=int, default=2, choices=[1, 2, 3],
                            help='Number of encoder layers')
        parser.add_argument('--decoder_layers', type=int, default=2, choices=[1, 2, 3],
                            help='Number of decoder layers')
        parser.add_argument('--hidden_size', type=int, default=64, choices=[16, 32, 64, 256],
                            help='Hidden layer size')
        parser.add_argument('--cell_type', type=str, default='LSTM', choices=['RNN', 'GRU', 'LSTM'],
                            help='RNN cell type')
        parser.add_argument('--dropout', type=float, default=0.2, choices=[0.2, 0.3],
                            help='Dropout rate')
        parser.add_argument('--attention', action='store_true',
                            help='Use attention mechanism')
        parser.add_argument('--teacher_forcing', type=float, default=0.5,
                            help='Teacher forcing ratio (0 to disable)')
        parser.add_argument('--beam_size', type=int, default=1,
                            help='Beam size for beam search decoding (1 for greedy)')
        parser.add_argument('--batch_size', type=int, default=64,
                            help='Batch size for training')
        parser.add_argument('--epochs', type=int, default=15,
                            help='Number of training epochs')
        parser.add_argument('--learning_rate', type=float, default=0.001,
                            help='Learning rate')
        parser.add_argument('--data_path', type=str, default='/content/ml/lexicons/ml.translit.sampled.train.tsv',
                            help='Path to training data')
        parser.add_argument('--val_path', type=str, default='/content/ml/lexicons/ml.translit.sampled.dev.tsv',
                            help='Path to validation data')
        parser.add_argument('--test_path', type=str, default='/content/ml/lexicons/ml.translit.sampled.test.tsv',
                            help='Path to test data')

        try:
            args = parser.parse_args()
        except SystemExit:
            # If running in Jupyter/Colab, use default arguments
            print("Using default arguments since we appear to be in a notebook environment")
            class Args:
                def __init__(self):
                    self.embedding_dim = 64
                    self.encoder_layers = 2
                    self.decoder_layers = 2
                    self.hidden_size = 64
                    self.cell_type = 'LSTM'
                    self.dropout = 0.2
                    self.attention = True
                    self.teacher_forcing = 0.5
                    self.beam_size = 1
                    self.batch_size = 64
                    self.epochs = 15
                    self.learning_rate = 0.001
                    self.data_path = '/content/ml/lexicons/ml.translit.sampled.train.tsv'
                    self.val_path = '/content/ml/lexicons/ml.translit.sampled.dev.tsv'
                    self.test_path = '/content/ml/lexicons/ml.translit.sampled.test.tsv'
            args = Args()

    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Load data
    try:
        df = pd.read_csv(args.data_path, sep='\t', header=None).dropna()
        df_val = pd.read_csv(args.val_path, sep='\t', header=None).dropna()
        df_test = pd.read_csv(args.test_path, sep='\t', header=None).dropna()

        print(f"Data loaded successfully:")
        print(f"Training samples: {len(df)}")
        print(f"Validation samples: {len(df_val)}")
        print(f"Test samples: {len(df_test)}")

        # Check data types
        print("\nChecking data types:")
        print(f"Training data types: {df.dtypes}")

        # Display a few samples
        print("\nSample data:")
        print(df.head(3))

    except FileNotFoundError as e:
        print(f"Error: File not found. {e}")
        return
    except Exception as e:
        print(f"Error loading data: {e}")
        return

    # Split into native (Malayalam) and romanized (English) columns
    malayalam_words = df[0].tolist()
    english_words = df[1].tolist()
    print(f"Number of Malayalam words: {len(malayalam_words)}")
    print(f"Number of English words: {len(english_words)}")

    malayalam_words_val = df_val[0].tolist()
    english_words_val = df_val[1].tolist()

    malayalam_words_test = df_test[0].tolist()
    english_words_test = df_test[1].tolist()

    # Check for non-string values and convert if needed
    print("\nConverting data types if needed...")

    # Create vocabularies
    ml_char_to_idx, en_char_to_idx, ml_idx_to_char, en_idx_to_char = create_vocabularies(
        malayalam_words + malayalam_words_val + malayalam_words_test,
        english_words + english_words_val + english_words_test
    )

    # Create datasets
    train_dataset = TransliterationDataset(malayalam_words, english_words, ml_char_to_idx, en_char_to_idx)
    val_dataset = TransliterationDataset(malayalam_words_val, english_words_val, ml_char_to_idx, en_char_to_idx)
    test_dataset = TransliterationDataset(malayalam_words_test, english_words_test, ml_char_to_idx, en_char_to_idx)

    # Create dataloaders
    train_dataloader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True, collate_fn=collate_fn)
    val_dataloader = DataLoader(val_dataset, batch_size=args.batch_size, shuffle=False, collate_fn=collate_fn)
    test_dataloader = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False, collate_fn=collate_fn)

    # Create model
    input_size = len(ml_char_to_idx)
    output_size = len(en_char_to_idx)

    encoder = Encoder(
        input_size=input_size,
        embedding_dim=args.embedding_dim,
        hidden_size=args.hidden_size,
        num_layers=args.encoder_layers,
        dropout=args.dropout,
        cell_type=args.cell_type
    )

    decoder = Decoder(
        output_size=output_size,
        embedding_dim=args.embedding_dim,
        hidden_size=args.hidden_size,
        num_layers=args.decoder_layers,
        dropout=args.dropout,
        cell_type=args.cell_type,
        attention=args.attention
    )

    model = Seq2Seq(encoder, decoder, device).to(device)

    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding
    optimizer = optim.Adam(model.parameters(), lr=args.learning_rate)

    # Print model architecture and hyperparameters
    print(f"Model Architecture:")
    print(f"Input Size: {input_size}")
    print(f"Output Size: {output_size}")
    print(f"Embedding Dimension: {args.embedding_dim}")
    print(f"Hidden Size: {args.hidden_size}")
    print(f"Encoder Layers: {args.encoder_layers}")
    print(f"Decoder Layers: {args.decoder_layers}")
    print(f"Cell Type: {args.cell_type}")
    print(f"Dropout: {args.dropout}")
    print(f"Attention: {args.attention}")
    print(f"Teacher Forcing Ratio: {args.teacher_forcing}")
    print(f"Beam Size: {args.beam_size}")
    print(f"Batch Size: {args.batch_size}")
    print(f"Learning Rate: {args.learning_rate}")
    print(f"Number of Epochs: {args.epochs}")

    # Train model
    start_time = time.time()
    train_losses, val_losses = train(
        model,
        train_dataloader,
        val_dataloader,
        optimizer,
        criterion,
        args.epochs,
        device,
        teacher_forcing_ratio=args.teacher_forcing
    )
    print(f"Training completed in {time.time() - start_time:.2f} seconds")

    # Plot learning curves
    plot_learning_curves(train_losses, val_losses)

    # Load best model
    model.load_state_dict(torch.load('best_transliteration_model.pt'))

    # Evaluate on test set
    test_loss = evaluate(model, test_dataloader, criterion, device)
    print(f"Test Loss: {test_loss:.4f}")

    # Test some examples with and without beam search
    num_examples = min(5, len(test_dataset))
    for i in range(num_examples):
        src, trg, src_len, trg_len = test_dataset[i]
        src = src.unsqueeze(0)  # Add batch dimension
        src_len = torch.tensor([src_len])

        # Get original Malayalam and English words
        ml_word = ''.join([ml_idx_to_char[idx.item()] for idx in src[0] if idx.item() > 1])
        en_word = ''.join([en_idx_to_char[idx.item()] for idx in trg if idx.item() > 3])  # Skip special tokens

        # Predict with greedy decoding
        greedy_pred = predict(model, src, src_len, en_idx_to_char, device, beam_size=1)[0]

        # Predict with beam search if beam size > 1
        beam_pred = predict(model, src, src_len, en_idx_to_char, device, beam_size=args.beam_size)[0] if args.beam_size > 1 else greedy_pred

        print(f"Example {i+1}:")
        print(f"Malayalam: {ml_word}")
        print(f"Target English: {en_word}")
        print(f"Prediction (Greedy): {greedy_pred}")
        if args.beam_size > 1:
            print(f"Prediction (Beam Search, size={args.beam_size}): {beam_pred}")
        print()

    # Calculate accuracy on test set
    all_predictions = []
    all_targets = []

    for batch in test_dataloader:
        src, trg, src_lengths, _ = batch
        src, trg = src.to(device), trg.to(device)

        # Get predictions
        predictions = predict(model, src, src_lengths, en_idx_to_char, device, beam_size=args.beam_size)

        # Get target English words
        for i in range(len(trg)):
            # Skip special tokens
            target_word = ''.join([en_idx_to_char[idx.item()] for idx in trg[i] if idx.item() > 3])
            all_targets.append(target_word)

        all_predictions.extend(predictions)

    # Calculate accuracy
    accuracy = calculate_accuracy(all_predictions, all_targets)
    print(f"Test Accuracy: {accuracy:.2f}%")

    # Save vocabularies for later use
    import json
    with open('ml_char_to_idx.json', 'w') as f:
        json.dump(ml_char_to_idx, f)
    with open('en_char_to_idx.json', 'w') as f:
        json.dump(en_char_to_idx, f)


# Function to run the model with specified parameters
def run_transliteration_model(
    embedding_dim=64,
    encoder_layers=2,
    decoder_layers=2,
    hidden_size=64,
    cell_type='LSTM',
    dropout=0.2,
    attention=True,
    teacher_forcing=0.5,
    beam_size=1,
    batch_size=64,
    epochs=15,
    learning_rate=0.001,
    data_path='/content/ml/lexicons/ml.translit.sampled.train.tsv',
    val_path='/content/ml/lexicons/ml.translit.sampled.dev.tsv',
    test_path='/content/ml/lexicons/ml.translit.sampled.test.tsv'
):
    """
    Run the Malayalam to English transliteration model with specified parameters.
    This function can be called directly from a Jupyter/Colab notebook.
    """
    class Args:
        pass

    args = Args()
    args.embedding_dim = embedding_dim
    args.encoder_layers = encoder_layers
    args.decoder_layers = decoder_layers
    args.hidden_size = hidden_size
    args.cell_type = cell_type
    args.dropout = dropout
    args.attention = attention
    args.teacher_forcing = teacher_forcing
    args.beam_size = beam_size
    args.batch_size = batch_size
    args.epochs = epochs
    args.learning_rate = learning_rate
    args.data_path = data_path
    args.val_path = val_path
    args.test_path = test_path

    main(args)


if __name__ == "__main__":
    main()

usage: colab_kernel_launcher.py [-h] [--embedding_dim {16,32,64,256}]
                                [--encoder_layers {1,2,3}]
                                [--decoder_layers {1,2,3}]
                                [--hidden_size {16,32,64,256}]
                                [--cell_type {RNN,GRU,LSTM}]
                                [--dropout {0.2,0.3}] [--attention]
                                [--teacher_forcing TEACHER_FORCING]
                                [--beam_size BEAM_SIZE]
                                [--batch_size BATCH_SIZE] [--epochs EPOCHS]
                                [--learning_rate LEARNING_RATE]
                                [--data_path DATA_PATH] [--val_path VAL_PATH]
                                [--test_path TEST_PATH]
colab_kernel_launcher.py: error: unrecognized arguments: -f /root/.local/share/jupyter/runtime/kernel-397ee6ca-b4ca-47be-a75f-46cf1938a8bb.json


Using default arguments since we appear to be in a notebook environment
Using device: cuda
Data loaded successfully:
Training samples: 58381
Validation samples: 5641
Test samples: 5610

Checking data types:
Training data types: 0    object
1    object
2     int64
dtype: object

Sample data:
     0     1  2
0   അം    am  3
1  അംഗ  amga  2
2  അംഗ  anga  1
Number of Malayalam words: 58381
Number of English words: 58381

Converting data types if needed...
Dataset initialized with SOS index: 2, EOS index: 3
Dataset initialized with SOS index: 2, EOS index: 3
Dataset initialized with SOS index: 2, EOS index: 3
Model Architecture:
Input Size: 72
Output Size: 30
Embedding Dimension: 64
Hidden Size: 64
Encoder Layers: 2
Decoder Layers: 2
Cell Type: LSTM
Dropout: 0.2
Attention: True
Teacher Forcing Ratio: 0.5
Beam Size: 1
Batch Size: 64
Learning Rate: 0.001
Number of Epochs: 15


Epoch 1/15:  51%|█████     | 464/913 [00:33<00:30, 14.79it/s]

In [None]:
sweep_config = {
  "method": "bayes",
  "metric": {"name": "val_accuracy", "goal": "maximize"},
  "parameters": {
    "embedding_size": {"values": [32, 64, 128]},
    "hidden_size": {"values": [64, 128]},
    "num_encoder_layers": {"values": [1, 2]},
    "num_decoder_layers": {"values": [1, 2]},
    "cell_type": {"values": ["RNN", "GRU", "LSTM"]},
    "dropout": {"values": [0.2, 0.3]},
    "beam_size": {"values": [1, 3, 5]},
    "learning_rate": {"values": [1e-3, 5e-4]}
  }
}


In [None]:
import wandb

wandb.init(project="DL-Assignment3")
sweep_id = wandb.sweep(sweep_config, project="DL-Assignment3")
wandb.agent(sweep_id, function=train_function)


In [None]:
class Encoder(nn.Module):
  def __init__(self,input_size, embedding_size, hidden_size, enc_layers, p, cell_type, bidirectional):
    super(Encoder,self).__init__()
    self.hidden_size = hidden_size
    self.enc_layers = enc_layers
    self.dropout = nn.Dropout(p)
    self.cell_type = cell_type
    self.bidirectional = bidirectional
    self.embedding = nn.Embedding(input_size, embedding_size)
    if(cell_type == "GRU"):
      self.gru = nn.GRU(embedding_size, hidden_size, enc_layers, dropout = p, bidirectional = bidirectional)
    if(cell_type == "RNN"):
      self.rnn = nn.RNN(embedding_size, hidden_size, enc_layers, dropout = p, bidirectional = bidirectional)
    if(cell_type == "LSTM"):
      self.lstm = nn.LSTM(embedding_size, hidden_size, enc_layers, dropout = p, bidirectional = bidirectional)

  def forward(self, x):
    embedding = self.dropout(self.embedding(x))
    # embedding shape : (seq_length, N, embedding_size)
    if(self.cell_type == "GRU"):
      output, hidden = self.gru(embedding)
    if(self.cell_type == "RNN"):
      output, hidden = self.rnn(embedding)
    if(self.cell_type == "LSTM"):
      outputs, (hidden,cell) = self.lstm(embedding)
      return outputs, hidden, cell
    return output, hidden

  def initHidden(self):
    return torch.zeros(1, 1, self.hidden_size, device=device)
class Decoder(nn.Module):
  def __init__(self, input_size, embedding_size, hidden_size, output_size, dec_layers, p, cell_type):
    super(Decoder, self).__init__()
    self.hidden_size = hidden_size
    self.dec_layers = dec_layers
    self.dropout = nn.Dropout(p)
    self.cell_type = cell_type
    self.embedding = nn.Embedding(input_size, embedding_size)
    if(cell_type == "GRU"):
      self.gru = nn.GRU(embedding_size, hidden_size, dec_layers, dropout = p)
    if(cell_type == "RNN"):
      self.rnn = nn.RNN(embedding_size, hidden_size, dec_layers, dropout = p)
    if(cell_type == "LSTM"):
      self.lstm = nn.LSTM(embedding_size, hidden_size, dec_layers, dropout = p)
    self.fc = nn.Linear(hidden_size, output_size)  # fully connected.

  def forward(self,x,output, hidden, cell = 0):
    # shape of x: (N) but we want (1,N)
    x = x.unsqueeze(0).int()
    embedding = self.dropout(self.embedding(x))
    # embedding shape : (1,N,embedding_size)
    if(self.cell_type == "GRU"):
        outputs, hidden = self.gru(embedding, hidden)
    if(self.cell_type == "RNN"):
        outputs, hidden = self.rnn(embedding, hidden)
    if(self.cell_type == "LSTM"):
        outputs, (hidden, cell) = self.lstm(embedding, (hidden, cell))
    # shape of outputs: (1, N, hidden_size)
    predictions = self.fc(outputs)
    # shape of predictions: (1, N, length_of_vocab)
    predictions = predictions.squeeze(0)
    # shape of predictions: (N, length_of_vocab)
    if(self.cell_type == "LSTM"):
        return predictions, hidden, cell
    return predictions, hidden


  def initHidden(self):
    return torch.zeros(1, 1, self.hidden_size, device=device)
class Atten_decoder(nn.Module):
  def __init__(self, input_size, embedding_size, hidden_size, output_size, dec_layers, p, cell_type, bidirectional):
    super(Atten_decoder, self).__init__()
    self.hidden_size = hidden_size
    self.output_size = output_size
    self.max_length = len(english_matrix[0])  #30
    self.dec_layers = dec_layers
    self.dropout = nn.Dropout(p)
    self.cell_type = cell_type
    self.embedding = nn.Embedding(input_size, embedding_size)
    if(cell_type == "GRU"):
      self.gru = nn.GRU(hidden_size, hidden_size, dec_layers, dropout = p)
    if(cell_type == "RNN"):
      self.rnn = nn.RNN(hidden_size, hidden_size, dec_layers, dropout = p)
    if(cell_type == "LSTM"):
      self.lstm = nn.LSTM(hidden_size, hidden_size, dec_layers, dropout = p)
    self.fc = nn.Linear(hidden_size, output_size)
    self.attn = nn.Linear(hidden_size+embedding_size, self.max_length)
    if(bidirectional):
      self.attn_combine = nn.Linear(hidden_size * 2 + embedding_size, hidden_size)
    else :
      self.attn_combine = nn.Linear(hidden_size + embedding_size, hidden_size)

  def forward(self, x,output, hidden, cell = 0):
    x = x.unsqueeze(0)
    output=output.permute(1,0,2)
    embedded = self.embedding(x)
    embedded = self.dropout(embedded)
    attn_weights = F.softmax(self.attn(torch.cat((embedded[0],hidden[0]), 1)), dim = 1)
    attn_applied = torch.bmm(attn_weights.unsqueeze(1),output)
    attn_applied = attn_applied.squeeze(1)
    op = torch.cat((embedded[0], attn_applied), 1)

    op = self.attn_combine(op).unsqueeze(0)
    op = F.relu(op)
    if(self.cell_type == "GRU"):
        outputs, hidden = self.gru(op, hidden)
    if(self.cell_type == "RNN"):
        outputs, hidden = self.rnn(op, hidden)
    if(self.cell_type == "LSTM"):
        outputs, (hidden, cell) = self.lstm(op, (hidden, cell))
    predictions = self.fc(outputs)
    # shape of predictions: (1, N, length_of_vocab)
    predictions = predictions.squeeze(0)
    # shape of predictions: (N, length_of_vocab)
    if(self.cell_type == "LSTM"):
        return predictions, hidden, cell
    return predictions, hidden
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, cell_type, bidirectional, enc_layers, dec_layers):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.cell_type = cell_type
        self.bidirectional = bidirectional
        self.enc_layers = enc_layers
        self.dec_layers = dec_layers

    def forward(self, source, target, teacher_force_ratio=0.5):
        batch_size = source.shape[1]
        target_len = target.shape[0]
        target_vocab_size = len(malayalam_chars) + 2
        outputs = torch.zeros(target_len, batch_size, target_vocab_size).to(device)
        if(self.cell_type == "LSTM"):
            encoder_output, hidden, cell = self.encoder(source)
        else:
            encoder_output, hidden = self.encoder(source)
        # if(self.bidirectional == True):
        if(self.enc_layers != self.dec_layers or self.bidirectional == True):
          hidden = hidden[self.enc_layers - 1] + hidden[self.enc_layers - 1]
          hidden = hidden.repeat(self.dec_layers,1,1)
          if(self.cell_type == "LSTM"):
              cell = cell[self.enc_layers - 1] + cell[self.enc_layers - 1]
              cell = cell.repeat(self.dec_layers,1,1)

        x = target[0]

        for t in range(1, target_len):
#             print("STARTED t= ",t)
            if(self.cell_type == "LSTM"):
                output, hidden, cell = self.decoder(x, encoder_output, hidden, cell)
            else :
                output, hidden = self.decoder(x, encoder_output, hidden)
            outputs[t] = output

            best_guess = output.argmax(1)

            x = target[t] if random.random() < teacher_force_ratio else best_guess

#         print("decoder sucessful")
        return outputs

In [None]:
import wandb
import torch
import torch.nn as nn
import torch.optim as optim
import random

def train(model, iterator, optimizer, criterion, clip, device, teacher_force_ratio=0.5):
    model.train()
    epoch_loss = 0
    epoch_acc = 0
    total_tokens = 0

    for src, trg in iterator:
        src = src.to(device).transpose(0,1)  # (seq_len, batch)
        trg = trg.to(device).transpose(0,1)  # (seq_len, batch)

        optimizer.zero_grad()

        output = model(src, trg, teacher_force_ratio)  # with teacher forcing

        output_dim = output.shape[-1]
        output = output[1:].reshape(-1, output_dim)
        trg_y = trg[1:].reshape(-1)

        loss = criterion(output, trg_y)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()

        preds = output.argmax(dim=1)
        non_pad = trg_y != criterion.ignore_index
        correct = (preds == trg_y) & non_pad

        epoch_acc += correct.sum().item()
        total_tokens += non_pad.sum().item()

    return epoch_loss / len(iterator), epoch_acc / total_tokens


def evaluate(model, iterator, criterion, device, beam_size=1):
    model.eval()
    epoch_loss = 0
    epoch_acc = 0
    total_tokens = 0

    with torch.no_grad():
        for src, trg in iterator:
            src = src.to(device).transpose(0,1)
            trg = trg.to(device).transpose(0,1)

            if beam_size == 1:
                # no beam search; greedy decoding (teacher forcing off)
                output = model(src, trg, teacher_force_ratio=0)
            else:
                # TODO: Implement beam search decoding here
                output = beam_search_decode(model, src, beam_size, device)

            output_dim = output.shape[-1]
            output_flat = output[1:].reshape(-1, output_dim)
            trg_flat = trg[1:].reshape(-1)

            loss = criterion(output_flat, trg_flat)
            epoch_loss += loss.item()

            preds = output_flat.argmax(dim=1)
            non_pad = trg_flat != criterion.ignore_index
            correct = (preds == trg_flat) & non_pad

            epoch_acc += correct.sum().item()
            total_tokens += non_pad.sum().item()

    return epoch_loss / len(iterator), epoch_acc / total_tokens


def wandb_trainer(config=None):
    # Initialize wandb run with sweep config
    wandb.init(config=config)
    config = wandb.config

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Setup model based on config parameters
    encoder_net = Encoder(
        input_size=len(english_chars) + 2,
        embedding_size=config.embedding_size,
        hidden_size=config.hidden_size,
        enc_layers=config.enc_layers,
        p=config.dropout,
        cell_type=config.cell_type,
        bidirectional=config.bidirectional
    ).to(device)

    if config.attention:
        decoder_net = Atten_decoder(
            input_size=len(malayalam_chars) + 2,
            embedding_size=config.embedding_size,
            hidden_size=config.hidden_size,
            output_size=len(malayalam_chars) + 2,
            dec_layers=config.dec_layers,
            p=config.dropout,
            cell_type=config.cell_type,
            bidirectional=config.bidirectional
        ).to(device)
    else:
        decoder_net = Decoder(
            input_size=len(malayalam_chars) + 2,
            embedding_size=config.embedding_size,
            hidden_size=config.hidden_size,
            output_size=len(malayalam_chars) + 2,
            dec_layers=config.dec_layers,
            p=config.dropout,
            cell_type=config.cell_type
        ).to(device)

    model = Seq2Seq(
        encoder_net,
        decoder_net,
        config.cell_type,
        config.bidirectional,
        config.enc_layers,
        config.dec_layers
    ).to(device)

    optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)
    pad_idx = len(malayalam_chars) + 1
    criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

    CLIP = 1

    for epoch in range(config.epochs):
        train_loss, train_acc = train(model, train_loader, optimizer, criterion, CLIP, device, config.teacher_force_ratio)
        valid_loss, valid_acc = evaluate(model, val_loader, criterion, device, beam_size=config.beam_size)

        print(f"Epoch {epoch+1}/{config.epochs}")
        print(f"Train Loss: {train_loss:.3f} | Train Acc: {train_acc:.2%}")
        print(f"Val Loss: {valid_loss:.3f} | Val Acc: {valid_acc:.2%}")

        wandb.log({
            "train_loss": train_loss,
            "train_accuracy": train_acc,
            "val_loss": valid_loss,
            "val_accuracy": valid_acc,
            "epoch": epoch
        })

    # Optional: test evaluation after training ends
    test_loss, test_acc = evaluate(model, test_loader, criterion, device, beam_size=config.beam_size)
    print(f"Test Loss: {test_loss:.3f} | Test Acc: {test_acc:.2%}")
    wandb.log({"test_loss": test_loss, "test_accuracy": test_acc})



In [None]:
# Hyperparameters / setup (example)
clip = 1
teacher_force_ratio = 0.5

# Run one epoch of training
train_loss, train_acc = train(
    model=model,
    iterator=train_loader,
    optimizer=optimizer,
    criterion=criterion,
    clip=clip,
    device=device,
    teacher_force_ratio=teacher_force_ratio
)

print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc*100:.2f}%")

# Run one epoch of evaluation (test)
val_loss, val_acc = evaluate(
    model=model,
    iterator=val_loader,
    criterion=criterion,
    device=device,
    beam_size=1  # greedy decoding
)

print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc*100:.2f}%")


Train Loss: 1.3874, Train Accuracy: 62.73%


TypeError: Seq2Seq.forward() got an unexpected keyword argument 'teacher_force_ratio'



# Question 3 (15 Marks)
Based on the above plots write down some insightful observations. For example,
- RNN based model takes longer time to converge than GRU or LSTM
- using smaller sizes for the hidden layer does not give good results
- dropout leads to better performance

(Note: I don't know if any of the above statements is true. I just wrote some random comments that came to my mind)

Of course, each inference should be backed by appropriate evidence.



# Question 4 (10 Marks)

You will now apply your best model on the test data (You shouldn't have used test data so far. All the above experiments should have been done using train and val data only).

(a) Use the best model from your sweep and report the accuracy on the test set (the output is correct only if it exactly matches the reference output).

(b) Provide sample inputs from the test data and predictions made by your best model (more marks for presenting this grid creatively). Also upload all the predictions on the test set in a folder **predictions_vanilla** on your github project.

(c) Comment on the errors made by your model (simple insightful bullet points)

- The model makes more errors on consonants than vowels
- The model makes more errors on longer sequences
- I am thinking confusion matrix but may be it's just me!
- ...



# Question 5 (20 Marks)

Now add an attention network to your basis sequence to sequence model and train the model again. For the sake of simplicity you can use a single layered encoder and a single layered decoder (if you want you can use multiple layers also). Please answer the following questions:

(a) Did you tune the hyperparameters again? If yes please paste appropriate plots below.

(b) Evaluate your best model on the test set and report the accuracy. Also upload all the predictions on the test set in a folder **predictions_attention** on your github project.

(c) Does the attention based model perform better than the vanilla model? If so, can you check some of the errors that this model corrected and note down your inferences (i.e., outputs which were predicted incorrectly by your best seq2seq model are predicted correctly by this model)

(d) In a 3 x 3 grid paste the attention heatmaps for 10 inputs from your test data (read up on what are attention heatmaps).



# Question 6 (20 Marks)

This a challenge question and most of you will find it hard.

I like the visualisation in the figure captioned "Connectivity" in this [article](https://distill.pub/2019/memorization-in-rnns/#appendix-autocomplete). Make a similar visualisation for your model. Please look at this [blog](https://towardsdatascience.com/visualising-lstm-activations-in-keras-b50206da96ff) for some starter code. The goal is to figure out the following: When the model is decoding the $i$-th character in the output which is the input character that it is looking at?

Have fun!




# Question 7 (10 Marks)
Paste a link to your github code for Part A

Example: https://github.com/&lt;user-id&gt;/da6401_assignment3/partA;

- We will check for coding style, clarity in using functions and a README file with clear instructions on training and evaluating the model (the 10 marks will be based on this).

- We will also run a plagiarism check to ensure that the code is not copied (0 marks in the assignment if we find that the code is plagiarised).

- We will check the number of commits made by the two team members and then give marks accordingly. For example, if we see 70% of the commits were made by one team member then that member will get more marks in the assignment (**note that this contribution will decide the marks split for the entire assignment and not just this question**).

- We will also check if the training and test splits have been used properly. You will get 0 marks on the assignment if we find any cheating (e.g., adding test data to training data) to get higher accuracy.




# Question 8 (0 Marks)

Note that this question does not carry any marks and will not be graded. This is only for students who are looking for a challenge and want to get something more out of the course.

Your task is to finetune the GPT2 model to generate lyrics for English songs. You can refer to [this blog](https://towardsdatascience.com/natural-language-generation-part-2-gpt-2-and-huggingface-f3acb35bc86a) and follow the steps there. This blog shows how to finetune the GPT2 model to generate headlines for financial articles. Instead of headlines you will use lyrics so you may find the following datasets useful for training: [dataset1](https://data.world/datasets/lyrics), [dataset2](https://www.kaggle.com/paultimothymooney/poetry)

At test time you will give it a prompt: "I love Deep Learning" and it should complete the song based on this prompt :-) Paste the generated song in a block below!

### Self Declaration



I, Name_XXX (Roll no: XXYY), swear on my honour that I have written the code and the report by myself and have not copied it from the internet or other students.



