In [1]:
import os
import random

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import wandb
from torch import optim
from tqdm.auto import tqdm

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [2]:
# Directory holding the Aksharantar Marathi CSV files. Set the
# AKSHARANTAR_MAR_DIR environment variable to point at another copy of the
# data; the default is the original hard-coded location, so an unconfigured
# run reads exactly the same files as before.
DATA_DIR = os.environ.get(
    "AKSHARANTAR_MAR_DIR",
    '/Users/pratikkadlak/Pratik/IITM/SEM_2/Deep_Learning/Assignment_3/aksharantar_sampled/mar',
)

# File paths
train_file = os.path.join(DATA_DIR, 'mar_train.csv')
test_file = os.path.join(DATA_DIR, 'mar_test.csv')
val_file = os.path.join(DATA_DIR, 'mar_valid.csv')


def _read_word_pairs(path):
    """Read a header-less CSV of word pairs, keeping every cell as a string.

    pandas' default NA handling converts cells such as "nan", "null" or "NA"
    into float NaN. For a word list those are ordinary words, and a float in
    the column would break the later ``len(word)`` calls, so NA detection of
    the default markers is switched off and both columns are read as text.
    """
    return pd.read_csv(path, header=None, dtype=str, keep_default_na=False)


# Read data
train_data = _read_word_pairs(train_file)
test_data = _read_word_pairs(test_file)
val_data = _read_word_pairs(val_file)

# Split into English and Marathi words (column 0 and column 1 respectively)
english_train = train_data.iloc[:, 0]
marathi_train = train_data.iloc[:, 1]

english_test = test_data.iloc[:, 0]
marathi_test = test_data.iloc[:, 1]

english_val = val_data.iloc[:, 0]
marathi_val = val_data.iloc[:, 1]

In [3]:
def create_char_list(words):
    """Collect the vocabulary and the longest word length in a single pass.

    Returns a tuple ``(chars, longest)`` where ``chars`` is the sorted list of
    distinct characters seen in ``words`` and ``longest`` is the length of the
    longest word (``-1`` when ``words`` is empty).
    """
    seen = set()
    longest = -1
    for entry in words:
        if len(entry) > longest:
            longest = len(entry)
        seen.update(entry)
    return sorted(seen), longest

def find_max_length(word_list):
    """Return the length of the longest word, or ``-1`` for an empty collection."""
    return max((len(entry) for entry in word_list), default=-1)

# Vocabulary (sorted distinct characters) and longest word of the training split.
english_chars, english_max_len = create_char_list(english_train)
marathi_chars, marathi_max_len = create_char_list(marathi_train)

# Widen both maxima so they also cover the validation and test splits.
english_max_len = max(
    english_max_len,
    find_max_length(english_val),
    find_max_length(english_test),
)
marathi_max_len = max(
    marathi_max_len,
    find_max_length(marathi_val),
    find_max_length(marathi_test),
)

In [4]:
def word_to_vector(word, lang):
    """Encode ``word`` as a fixed-length list of integer token ids.

    ``lang == "english"`` selects ``english_chars`` / ``english_max_len``;
    any other value selects the Marathi vocabulary and maximum length.

    Layout of the returned list (always at least ``max_len + 2`` long):
      * index 0 holds the start token, id ``len(chars) + 1``;
      * each following entry is a character id, i.e. the character's 1-based
        position in the sorted vocabulary list;
      * the remainder is filled with ``0``, and one extra ``0`` is always
        appended, so every vector ends in at least one zero.

    Characters that are not in the vocabulary are skipped; there is no
    unknown-character id.
    """
    if lang == "english":
        chars, max_len = english_chars, english_max_len
    else:
        chars, max_len = marathi_chars, marathi_max_len

    # One dict lookup per character instead of scanning the whole vocabulary.
    # The vocabulary lists are built from a set, so every character maps to
    # exactly one id.
    char_to_id = {char: position + 1 for position, char in enumerate(chars)}

    vector = [len(chars) + 1]
    vector.extend(char_to_id[char] for char in word if char in char_to_id)

    # Zero-fill up to max_len + 1 entries (a no-op when already that long) ...
    vector.extend([0] * (max_len + 1 - len(vector)))
    # ... then add the final, always-present trailing zero.
    vector.append(0)
    return vector

In [5]:
def word_matrix(words, language):
    """Encode every word with ``word_to_vector`` and return the rows as a list.

    ``language`` is forwarded unchanged to ``word_to_vector`` for each word.
    """
    return [word_to_vector(entry, language) for entry in words]

In [6]:
# Training split: integer encodings, then the matching PyTorch tensors.
english_word_representations = word_matrix(english_train, "english")
english_matrix = torch.tensor(english_word_representations)
marathi_word_representations = word_matrix(marathi_train, "marathi")
marathi_matrix = torch.tensor(marathi_word_representations)

# Validation split.
english_word_representations_val = word_matrix(english_val, "english")
english_matrix_val = torch.tensor(english_word_representations_val)
marathi_word_representations_val = word_matrix(marathi_val, "marathi")
marathi_matrix_val = torch.tensor(marathi_word_representations_val)

# Test split.
english_word_representations_test = word_matrix(english_test, "english")
english_matrix_test = torch.tensor(english_word_representations_test)
marathi_word_representations_test = word_matrix(marathi_test, "marathi")
marathi_matrix_test = torch.tensor(marathi_word_representations_test)

In [10]:
def calculate_accuracy(model, input_data, target_data, batch_size):
    """Return the exact-match word accuracy of ``model`` in percent.

    Args:
        model: module called as ``model(src, tgt, teacher_force_ratio=0)``
            with both tensors transposed to (sequence, batch); it must return
            scores of shape (target_len, batch, vocab).
        input_data: 2-D tensor of encoded source words, one row per word.
        target_data: 2-D tensor of encoded target words, one row per word.
        batch_size: number of rows evaluated per forward pass.

    Returns:
        ``100 * correct / len(input_data)`` as a float, or ``0.0`` when the
        dataset is empty. A word counts as correct only if every position
        after the start token matches the target, padding included.

    Every row is evaluated, including a final batch smaller than
    ``batch_size``. The model is put in eval mode under ``torch.no_grad()``
    for the duration of the call and its previous train/eval mode is restored
    afterwards.
    """
    num_samples = len(input_data)
    if num_samples == 0:
        return 0.0

    was_training = model.training
    model.eval()
    correct_count = 0
    try:
        with torch.no_grad():
            for start in range(0, num_samples, batch_size):
                inp_data = input_data[start:start + batch_size].to(device)
                target = target_data[start:start + batch_size].to(device)

                output = model(inp_data.T, target.T, teacher_force_ratio=0)
                # argmax over the vocabulary axis; softmax would not change it.
                predicted = output.argmax(dim=2).T

                # Position 0 is the start token, which the model never emits.
                matches = (predicted[:, 1:] == target[:, 1:]).all(dim=1)
                correct_count += int(matches.sum().item())
    finally:
        model.train(was_training)

    return correct_count * 100 / num_samples

In [11]:
# Module-level accumulators filled by create_predictions_lists, one per split.
# Each entry is a two-element list: [target_word, predicted_word].
train_predictions = []
test_predictions = []
val_predictions = []


def _decode_marathi(token_ids):
    """Join the Marathi characters for the character ids in ``token_ids``.

    Character ids run from 1 to ``len(marathi_chars)``; everything else
    (the 0 padding id and the start-token id above the vocabulary) is skipped.
    """
    vocab_size = len(marathi_chars)
    return "".join(marathi_chars[idx - 1] for idx in token_ids if 0 < idx <= vocab_size)


def create_predictions_lists(model, input_data, target_data, batch_size, dataset):
    """Decode targets and greedy model predictions for a whole dataset.

    Args:
        model: module called as ``model(src, tgt, teacher_force_ratio=0)``
            with both tensors transposed to (sequence, batch); it must return
            scores of shape (target_len, batch, vocab).
        input_data: 2-D tensor of encoded source words, one row per word.
        target_data: 2-D tensor of encoded Marathi words, one row per word.
        batch_size: number of rows decoded per forward pass.
        dataset: ``"train"``, ``"validation"`` or ``"test"``; selects which
            module-level list is extended. Any other value extends none.

    Returns:
        The list of ``[target_word, predicted_word]`` pairs produced by this
        call, in dataset order.

    Every row is decoded, including a final batch smaller than
    ``batch_size``. The upper bound for valid character ids comes from
    ``len(marathi_chars)`` rather than a fixed number, so the function works
    for any vocabulary size. The model runs in eval mode under
    ``torch.no_grad()`` and its previous train/eval mode is restored on exit.
    """
    predictions_list = []

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for start_idx in range(0, len(input_data), batch_size):
                end_idx = start_idx + batch_size

                inp_data = input_data[start_idx:end_idx].to(device)
                target = target_data[start_idx:end_idx].to(device)

                output = model(inp_data.T, target.T, teacher_force_ratio=0)
                # argmax over the vocabulary axis, then back to (batch, sequence).
                output = output.argmax(dim=2).T

                for target_ids, output_ids in zip(target.tolist(), output.tolist()):
                    predictions_list.append(
                        [_decode_marathi(target_ids), _decode_marathi(output_ids)]
                    )
    finally:
        model.train(was_training)

    if dataset == "train":
        train_predictions.extend(predictions_list)
    elif dataset == "validation":
        val_predictions.extend(predictions_list)
    elif dataset == "test":
        test_predictions.extend(predictions_list)

    return predictions_list

In [7]:
class Encoder(nn.Module):
    """Embeds source token ids and runs them through an RNN, LSTM or GRU.

    Args:
        input_size: number of rows in the embedding table (source vocabulary
            size including the special ids).
        embedding_dim: size of each embedding vector.
        hidden_size: hidden size of the recurrent layer.
        num_layers: number of stacked recurrent layers.
        batch_size: stored on the instance; not used by ``forward``.
        dropout_prob: dropout applied to the embeddings and, when there is
            more than one layer, between recurrent layers.
        bidirectional: whether the recurrent layer is bidirectional.
        cell_type: ``"RNN"``, ``"LSTM"`` or ``"GRU"``.

    Raises:
        ValueError: if ``cell_type`` is not one of the three supported names.
            Guessing a default here could silently pair this encoder with a
            decoder that uses a different cell.
    """

    _RNN_CLASSES = {"RNN": nn.RNN, "LSTM": nn.LSTM, "GRU": nn.GRU}

    def __init__(self, input_size, embedding_dim, hidden_size, num_layers, batch_size, dropout_prob, bidirectional, cell_type):
        super(Encoder, self).__init__()
        if cell_type not in self._RNN_CLASSES:
            raise ValueError(
                f"Unsupported cell_type {cell_type!r}; expected one of {sorted(self._RNN_CLASSES)}"
            )

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.batch_size = batch_size
        self.dropout = nn.Dropout(dropout_prob)
        self.bidirectional = bidirectional
        # Misspelt attribute name kept as an alias for existing readers.
        self.biderectional = bidirectional
        self.embedding_dim = embedding_dim
        self.embedding = nn.Embedding(input_size, embedding_dim)
        self.cell_type = cell_type

        rnn_class = self._RNN_CLASSES[cell_type]
        # Inter-layer dropout has no effect on a single layer and PyTorch
        # warns about it, so it is only requested for stacked layers.
        rnn_dropout = dropout_prob if num_layers > 1 else 0
        self.rnn = rnn_class(embedding_dim, hidden_size, num_layers, dropout=rnn_dropout, bidirectional=bidirectional)

    def forward(self, input):
        """Encode a batch of token ids.

        ``input`` is an integer tensor laid out (sequence, batch), because the
        recurrent layer is created with the default ``batch_first=False``.

        Returns ``(output, hidden, cell)`` for an LSTM and ``(output, hidden)``
        otherwise, exactly as produced by the recurrent layer.
        """
        embedded = self.dropout(self.embedding(input))
        output, state = self.rnn(embedded)
        if self.cell_type == "LSTM":
            hidden, cell = state
            return output, hidden, cell
        return output, state

    def init_hidden(self):
        """Return a zero tensor of shape (1, 1, hidden_size) on the global ``device``."""
        return torch.zeros(1, 1, self.hidden_size, device=device)

In [8]:
class Decoder(nn.Module):
    """Single-step attention decoder built on an RNN, LSTM or GRU.

    Args:
        input_size: number of rows in the embedding table (target vocabulary
            size including the special ids).
        hidden_size: hidden size of the recurrent layer; must match the
            hidden size of the state passed to ``forward``.
        output_size: number of scores produced per step.
        num_layers: number of stacked recurrent layers.
        dropout_prob: dropout applied to the embedding and, when there is
            more than one layer, between recurrent layers.
        batch_size: stored on the instance; not used by ``forward``.
        embedding_dim: size of each embedding vector.
        bidirectional: whether the *encoder* was bidirectional; doubles the
            encoder feature width expected by ``attention_combine``.
        cell_type: ``"RNN"``, ``"LSTM"`` or ``"GRU"``.

    The attention layer produces one weight per source position and its
    width is fixed at construction time to the global ``english_max_len + 2``,
    so ``forward`` only accepts encoder outputs of exactly that length.

    Raises:
        ValueError: if ``cell_type`` is not one of the three supported names.
            Guessing a default here could silently pair this decoder with an
            encoder that uses a different cell.
    """

    _RNN_CLASSES = {"RNN": nn.RNN, "LSTM": nn.LSTM, "GRU": nn.GRU}

    def __init__(self, input_size, hidden_size, output_size, num_layers, dropout_prob, batch_size, embedding_dim, bidirectional, cell_type):
        super(Decoder, self).__init__()
        if cell_type not in self._RNN_CLASSES:
            raise ValueError(
                f"Unsupported cell_type {cell_type!r}; expected one of {sorted(self._RNN_CLASSES)}"
            )

        self.hidden_size = hidden_size
        self.dropout = nn.Dropout(dropout_prob)
        self.num_layers = num_layers
        self.batch_size = batch_size
        # Source sequence length the attention weights are sized for.
        self.max_len = english_max_len + 2
        self.embedding_dim = embedding_dim
        self.embedding = nn.Embedding(input_size, embedding_dim)
        self.cell_type = cell_type

        rnn_class = self._RNN_CLASSES[cell_type]
        # Inter-layer dropout has no effect on a single layer and PyTorch
        # warns about it, so it is only requested for stacked layers.
        rnn_dropout = dropout_prob if num_layers > 1 else 0
        # The recurrent layer consumes the output of attention_combine, which
        # is hidden_size wide.
        self.rnn = rnn_class(hidden_size, hidden_size, num_layers, dropout=rnn_dropout)

        self.out = nn.Linear(hidden_size, output_size)

        # Scores one attention weight per source position from
        # [embedded token ; first-layer hidden state].
        self.attention = nn.Linear(hidden_size + embedding_dim, self.max_len)

        num_directions = 2 if bidirectional else 1

        # Projects [embedded token ; attention context] down to hidden_size.
        self.attention_combine = nn.Linear(hidden_size * num_directions + embedding_dim, hidden_size)

    def forward(self, input_seq, encoder_output, hidden_state, cell_state=None):
        """Run one decoding step.

        Args:
            input_seq: 1-D integer tensor with one token id per batch element.
            encoder_output: tensor laid out (source_len, batch, features);
                ``source_len`` must equal ``self.max_len`` and ``features``
                must equal ``hidden_size * num_directions``.
            hidden_state: recurrent state of shape
                (num_layers, batch, hidden_size).
            cell_state: LSTM cell state with the same shape as
                ``hidden_state``; only read when ``cell_type == "LSTM"``.

        Returns:
            ``(predictions, hidden_state, cell_state)`` for an LSTM and
            ``(predictions, hidden_state)`` otherwise, where ``predictions``
            has shape (batch, output_size) and holds unnormalised scores.
        """
        embedded_input = self.dropout(self.embedding(input_seq.unsqueeze(0)))
        # (source_len, batch, features) -> (batch, source_len, features) for bmm.
        encoder_output = encoder_output.permute(1, 0, 2)

        # The attention query uses the hidden state of layer 0.
        attn_query = torch.cat((embedded_input[0], hidden_state[0]), 1)
        attn_weights = F.softmax(self.attention(attn_query), dim=1)
        attn_applied = torch.bmm(attn_weights.unsqueeze(1), encoder_output).squeeze(1)

        combined_input = torch.cat((embedded_input[0], attn_applied), 1)
        combined_input = F.relu(self.attention_combine(combined_input).unsqueeze(0))

        if self.cell_type == "LSTM":
            output, (hidden_state, cell_state) = self.rnn(combined_input, (hidden_state, cell_state))
        else:
            output, hidden_state = self.rnn(combined_input, hidden_state)

        predictions = self.out(output)
        predictions = predictions.squeeze(0)

        if self.cell_type == "LSTM":
            return predictions, hidden_state, cell_state
        return predictions, hidden_state

    def init_hidden(self):
        """Return a zero tensor of shape (1, 1, hidden_size) on the global ``device``."""
        return torch.zeros(1, 1, self.hidden_size, device=device)

In [9]:
class Seq2Seq(nn.Module):
    """Ties an encoder and an attention decoder into one trainable module.

    Args:
        encoder: module whose call returns ``(output, hidden)``, or
            ``(output, hidden, cell)`` when ``cell_type == "LSTM"``.
        decoder: module called once per target position; it must expose its
            output projection as ``decoder.out`` (an ``nn.Linear``), which is
            used to size the result tensor.
        cell_type: ``"LSTM"`` switches on cell-state handling; any other
            value is treated as a cell with a single state tensor.
        bidirectional: whether the encoder is bidirectional.
        encoder_layers: number of stacked layers in the encoder.
        decoder_layers: number of stacked layers in the decoder.
    """

    def __init__(self, encoder, decoder, cell_type, bidirectional, encoder_layers, decoder_layers):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.cell_type = cell_type
        self.bidirectional = bidirectional
        self.encoder_layers = encoder_layers
        self.decoder_layers = decoder_layers

    def _bridge_state(self, state):
        """Reduce the encoder's final state to the decoder's layer layout.

        The final state is ordered layer by layer with the two directions of
        a layer adjacent, so for a bidirectional encoder the last two entries
        are the forward and backward states of the top layer; they are summed.
        For a unidirectional encoder the top layer's state is used directly.
        The result is copied once per decoder layer.
        """
        if self.bidirectional:
            top_layer = state[-2] + state[-1]
        else:
            top_layer = state[-1]
        return top_layer.repeat(self.decoder_layers, 1, 1)

    def forward(self, source_sequence, target_sequence, teacher_force_ratio=0.5):
        """Decode ``target_sequence.shape[0]`` positions for a batch.

        Args:
            source_sequence: integer tensor laid out (source_len, batch).
            target_sequence: integer tensor laid out (target_len, batch); row
                0 provides the first decoder input and later rows are used
                only when teacher forcing is drawn for that step.
            teacher_force_ratio: probability, drawn independently per step,
                of feeding the ground-truth token instead of the model's own
                best guess as the next input. ``0`` means pure greedy decoding.

        Returns:
            Tensor of shape (target_len, batch, vocab) with unnormalised
            scores. Row 0 is left as zeros because decoding starts at
            position 1.
        """
        is_lstm = self.cell_type == "LSTM"
        target_len = target_sequence.shape[0]
        batch_size = source_sequence.shape[1]
        # Width of the decoder's output layer is the target vocabulary size.
        target_vocab_size = self.decoder.out.out_features

        outputs = torch.zeros(
            target_len, batch_size, target_vocab_size, device=source_sequence.device
        )

        encoder_cell = None
        if is_lstm:
            encoder_output, encoder_hidden, encoder_cell = self.encoder(source_sequence)
        else:
            encoder_output, encoder_hidden = self.encoder(source_sequence)

        # The encoder state can be handed over unchanged only when both sides
        # have the same number of layers and the encoder is unidirectional.
        if self.encoder_layers != self.decoder_layers or self.bidirectional:
            encoder_hidden = self._bridge_state(encoder_hidden)
            if is_lstm:
                encoder_cell = self._bridge_state(encoder_cell)

        # The decoder starts from the encoder's state and then carries its
        # own state from one step to the next.
        decoder_hidden = encoder_hidden
        decoder_cell = encoder_cell
        decoder_input = target_sequence[0]

        for t in range(1, target_len):
            if is_lstm:
                decoder_output, decoder_hidden, decoder_cell = self.decoder(
                    decoder_input, encoder_output, decoder_hidden, decoder_cell
                )
            else:
                decoder_output, decoder_hidden = self.decoder(
                    decoder_input, encoder_output, decoder_hidden
                )

            outputs[t] = decoder_output

            best_guess = decoder_output.argmax(1)

            decoder_input = target_sequence[t] if random.random() < teacher_force_ratio else best_guess

        return outputs

In [12]:
def train_model(epochs, learning_rate, cell_type, bidirectional, enc_layers, dec_layers, batch_size, embedding_dim, hidden_size, enc_dropout, dec_dropout):
    """Build an encoder/decoder pair and train it on the training tensors.

    Args:
        epochs: maximum number of passes over the training data.
        learning_rate: Adam learning rate.
        cell_type: recurrent cell name passed to the encoder, decoder and
            ``Seq2Seq`` wrapper.
        bidirectional: whether the encoder is bidirectional.
        enc_layers: number of encoder layers.
        dec_layers: number of decoder layers.
        batch_size: rows per optimisation step; a trailing batch smaller than
            this is not trained on.
        embedding_dim: embedding size for both encoder and decoder.
        hidden_size: hidden size for both encoder and decoder.
        enc_dropout: encoder dropout probability.
        dec_dropout: decoder dropout probability.

    Returns:
        The trained ``Seq2Seq`` model, so the caller can evaluate it further
        or pass it to ``create_predictions_lists``.

    After every epoch the mean training loss and the train / validation /
    test accuracies are printed. Training stops early once ten epochs have
    run and validation accuracy is still below 5 percent.
    """
    # Id of the Marathi start token. The loss targets below are
    # ``target[1:]``, which never contain this id, so ``ignore_index`` has no
    # practical effect. In particular the padding id 0 is NOT ignored: the
    # model is trained to emit padding, which the exact-match comparison in
    # calculate_accuracy (it compares padded positions too) depends on.
    sos_idx = len(marathi_chars) + 1

    input_size_encoder = len(english_chars) + 2
    input_size_decoder = len(marathi_chars) + 2
    output_size = len(marathi_chars) + 2

    encoder = Encoder(input_size_encoder, embedding_dim, hidden_size, enc_layers, batch_size, enc_dropout, bidirectional, cell_type).to(device)
    decoder = Decoder(input_size_decoder, hidden_size, output_size, dec_layers, dec_dropout, batch_size, embedding_dim, bidirectional, cell_type).to(device)

    model = Seq2Seq(encoder, decoder, cell_type, bidirectional, enc_layers, dec_layers).to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss(ignore_index=sos_idx)

    for epoch in range(epochs):
        print("Epoch: ", epoch+1)

        model.train()
        total_loss = 0
        step = 0
        total_batches = len(english_matrix) // batch_size

        for batch_idx in tqdm(range(total_batches)):
            start_idx = batch_size * batch_idx
            end_idx = batch_size * (batch_idx + 1)

            inp_data = english_matrix[start_idx:end_idx].to(device)
            target = marathi_matrix[start_idx:end_idx].to(device)
            target = target.T

            optimizer.zero_grad()
            output = model(inp_data.T, target)

            # Drop position 0 (start token) and flatten to (tokens, vocab) /
            # (tokens,) as CrossEntropyLoss expects.
            output = output[1:].reshape(-1, output.shape[2])
            target = target[1:].reshape(-1)

            loss = criterion(output, target)
            total_loss += loss.item()
            loss.backward()

            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)

            optimizer.step()

            step += 1

        training_accuracy = calculate_accuracy(model, english_matrix, marathi_matrix, batch_size)
        val_accuracy = calculate_accuracy(model, english_matrix_val, marathi_matrix_val, batch_size)
        test_accuracy = calculate_accuracy(model, english_matrix_test, marathi_matrix_test, batch_size)

        # No full batch fits when the dataset is smaller than batch_size.
        mean_loss = total_loss / step if step else float("nan")
        print(f"Total Loss: {mean_loss}\t Training Acc: {training_accuracy}\t Val_Acc: {val_accuracy}\t Test_Acc: {test_accuracy}")
#         wandb.log({"training_loss": total_loss / step, "train_accuracy": training_accuracy, "val_accuracy": val_accuracy,  'epoch': epoch})

        # Give up on configurations that are still near zero after 10 epochs.
        if epoch >= 9 and val_accuracy < 5:
            return model

    # To collect decoded predictions, call create_predictions_lists on the
    # returned model with the split of interest ("train", "validation", "test").
    return model

In [None]:
# Seed every random number generator this notebook imports so that weight
# initialisation and the per-step teacher-forcing draws are repeatable.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Hyperparameters for this run.
epochs = 20
learning_rate = 1e-3
cell_type = "GRU"
bidirectional = True
enc_layers = 2
dec_layers = 2
batch_size = 256
embedding_dim = 256
hidden_size = 512
enc_dropout = 0
dec_dropout = 0

train_model(epochs, learning_rate, cell_type, bidirectional, enc_layers,
            dec_layers, batch_size, embedding_dim, hidden_size, enc_dropout, dec_dropout)