In [1]:
import os
import wandb
import torch
import torch.nn as nn
import random
from torch.autograd import Variable
from torch.utils.data import DataLoader
import pandas as pd
import torch.optim as optim
import torch.nn.functional as Function
import argparse


In [None]:
# Reserved vocabulary indices (begin-of-word, end-of-word, unknown char, padding).
SYMBOL_BEGIN = 0
SYMBOL_END = 1
SYMBOL_UNKNOWN = 2
SYMBOL_PADDING = 3

# Column names and field separator used when reading the dataset CSV files.
INPUT_LABEL = "input"
TARGET_LABEL = "target"
DELIMETER = ","

# Recurrent cell type identifiers.
RNN_KEY = "RNN"
GRU_KEY = "GRU"
LSTM_KEY = "LSTM"

# Keys naming the pieces of a prepared dataset (no use is visible in this part of the notebook).
INPUT_LANG_KEY = "input_lang"
OUTPUT_LANG_KEY = "output_lang"
PAIRS_KEY = "pairs"
MAX_LEN_KEY = "max_len"

# Language codes; TARGET_LANG selects the dataset sub-folder and file prefix below.
input_lang = "eng"
TARGET_LANG = "hin"

# Dataset split names, used as file-name suffixes.
TRAIN_LABEL = "train"
TEST_LABEL = "test"
VALID_LABEL = "valid"

# Dataset locations: <DEFAULT_PATH>/<lang>/<lang>_<split>.csv
DEFAULT_PATH = "/kaggle/input/aksharantar-sampled/aksharantar_sampled"
TRAIN_DATASET_PATH = f"{DEFAULT_PATH}/{TARGET_LANG}/{TARGET_LANG}_{TRAIN_LABEL}.csv"
VALIDATION_DATASET_PATH = f"{DEFAULT_PATH}/{TARGET_LANG}/{TARGET_LANG}_{VALID_LABEL}.csv"
TEST_DATASET_PATH = f"{DEFAULT_PATH}/{TARGET_LANG}/{TARGET_LANG}_{TEST_LABEL}.csv"

# Optimizer identifier.
NADAM_KEY = "Nadam"

# Sweep param labels
EMBEDDING_SIZE_KEY = "embedding_size"
EPOCHS_KEY = "epochs"
ENCODER_LAYER_KEY = "encoder_layers"
DECODER_LAYER_KEY = "decoder_layers"
HIDDEN_LAYER_KEY = "hidden_layer"
IS_BIDIRECTIONAL_KEY = "bidirectional"
DROPOUT_KEY = "dropout"
CELL_TYPE_KEY = "cell_type"
LEARNING_RATE_KEY = "learning_rate"
BATCH_SIZE_KEY = "batch_size"

# wandb constants
WANDB_PROJECT_NAME = "dl-assignment-3"
WANDB_ENTITY_NAME = "dl-1"

# wandb plot titles
TRAIN_ACCURACY_TITLE = "train_acc"
VALIDATION_ACCURACY_TITLE = "val_acc"
TEST_ACCURACY_TITLE = "test_acc"
TRAIN_LOSS_TITLE = "train_loss"
VALIDATION_LOSS_TITLE = "val_loss"
TEST_LOSS_TITLE = "test_loss"

# Run on CUDA when it is available, otherwise fall back to the CPU.
is_gpu = torch.cuda.is_available()
device = torch.device("cuda" if is_gpu else "cpu")


# Utility Functions and classes

In [None]:
class Vocabulary:
    """Character-level vocabulary that is grown one word at a time.

    Indices 0-3 are pre-assigned to the symbols "<", ">", "?" and "."; every
    character first seen by ``addWord`` receives the next free index.
    """

    def __init__(self):
        # str_count: char -> number of occurrences seen so far.
        # int_encodding: char -> integer index.
        self.str_count, self.int_encodding = dict(), dict()
        # Next index to hand out; 0-3 are already taken by the symbols below.
        self.n_chars = 4
        # index -> char, including the four pre-assigned symbols.
        self.str_encodding = {0: "<", 1: ">", 2: "?", 3: "."}

    def addWord(self, word):
        """Count every character of ``word`` and register the unseen ones.

        Args:
            word: Iterable of characters (typically a ``str``).
        """
        for char in word:
            # Explicit membership test instead of a bare ``except`` so that
            # unrelated errors are never swallowed.
            if char in self.str_count:
                self.str_count[char] += 1
                continue

            self.int_encodding[char] = self.n_chars
            self.str_encodding[self.n_chars] = char
            self.str_count[char] = 1
            self.n_chars += 1

def prepareData(dir):
    """Load a two-column word-pair CSV and build the character vocabularies.

    Args:
        dir: Path of the CSV file. The file is read without a header row; its two
            columns (separated by ``DELIMETER``) are the input and target words.

    Returns:
        Tuple ``(input_lang, output_lang, pairs, max_len)``: the ``Vocabulary`` of
        the input column, the ``Vocabulary`` of the target column, the list of
        ``[input_word, target_word]`` pairs, and the length of the longest word
        found in either column.
    """
    # Read every field as a plain string and disable NA detection: with the
    # defaults pandas converts words such as "null", "nan" or "NA" into float
    # NaN, which then breaks ``len`` and the per-character iteration below.
    data = pd.read_csv(
        dir,
        sep=DELIMETER,
        names=[INPUT_LABEL, TARGET_LABEL],
        dtype=str,
        na_filter=False,
    )

    max_input_length = data[INPUT_LABEL].apply(len).max()
    max_target_length = data[TARGET_LABEL].apply(len).max()
    max_len = max(max_input_length, max_target_length)

    input_lang, output_lang = Vocabulary(), Vocabulary()

    pairs = data[[INPUT_LABEL, TARGET_LABEL]].values.tolist()
    for input_word, target_word in pairs:
        input_lang.addWord(input_word)
        output_lang.addWord(target_word)

    return input_lang, output_lang, pairs, max_len


def helpTensor(lang, word, max_length):
    """Encode ``word`` as a padded tensor of character indices.

    Args:
        lang: Vocabulary object exposing a char -> index mapping as
            ``int_encodding`` (as ``Vocabulary`` does) or as ``char2index``.
        word: Iterable of characters to encode.
        max_length: Desired length of the result, end symbol included. Shorter
            sequences are right-padded with ``SYMBOL_PADDING``; longer ones are
            returned unpadded (they are not truncated).

    Returns:
        1-D ``torch.LongTensor`` holding the character indices followed by
        ``SYMBOL_END`` and padding, moved to CUDA when ``is_gpu`` is true.
    """
    # ``Vocabulary`` stores its table as ``int_encodding``. Looking it up under a
    # different attribute name inside a bare ``except`` would silently encode
    # every character as SYMBOL_UNKNOWN.
    char_to_index = getattr(lang, "int_encodding", None)
    if char_to_index is None:
        # Keep working with vocabulary objects that expose the table as ``char2index``.
        char_to_index = getattr(lang, "char2index", {})

    # Characters missing from the vocabulary map to the unknown symbol.
    indexes = [char_to_index.get(char, SYMBOL_UNKNOWN) for char in word]
    indexes.append(SYMBOL_END)
    # A non-positive count yields no padding, so over-long words stay intact.
    indexes.extend([SYMBOL_PADDING] * (max_length - len(indexes)))

    result = torch.LongTensor(indexes)
    return result.cuda() if is_gpu else result

def makeTensor(input_lang, output_lang, pairs, reach):
    """Encode each word pair as an ``(input_tensor, target_tensor)`` tuple.

    Args:
        input_lang: Vocabulary used for the first word of every pair.
        output_lang: Vocabulary used for the second word of every pair.
        pairs: Sequence of ``[input_word, target_word]`` items.
        reach: Length passed to ``helpTensor`` as ``max_length`` for both words.

    Returns:
        List with one ``(input_tensor, target_tensor)`` tuple per pair, in order.
    """
    encoded_pairs = []
    for pair in pairs:
        source_tensor = helpTensor(input_lang, pair[0], reach)
        target_tensor = helpTensor(output_lang, pair[1], reach)
        encoded_pairs.append((source_tensor, target_tensor))
    return encoded_pairs


def accuracy(encoder, decoder, loader, batch_size, criterion, cell_type, num_layers_enc, max_length, output_lang):
    """Return the exact-match word accuracy (in percent) of greedy decoding.

    Args:
        encoder: Object called as ``encoder(step_input, batch_size, hidden)``; item
            ``[1]`` of its result is used as the next hidden state. It must also
            provide ``initHidden(batch_size, num_layers)``.
        decoder: Object called as ``decoder(step_input, batch_size, hidden)`` and
            returning ``(scores, hidden)``.
        loader: Iterable of ``(batch_x, batch_y)`` index tensors with the batch
            dimension first.
        batch_size: Number of words per batch; the prediction buffer and the first
            decoder input are sized with it, so every batch must have this many rows.
        criterion: Unused; kept so existing callers keep working.
        cell_type: When equal to ``LSTM_KEY`` the hidden state is a
            ``(hidden, cell)`` tuple.
        num_layers_enc: Forwarded to ``encoder.initHidden``.
        max_length: Unused; kept so existing callers keep working.
        output_lang: Object whose ``str_encodding`` maps index -> character.

    Returns:
        Percentage (0-100) of words whose decoded characters, ignoring the begin,
        end and padding symbols, equal the target's. ``0.0`` if ``loader`` yields
        no words.
    """
    # Symbols that never take part in the comparison (hoisted out of the loops).
    ignored = {SYMBOL_BEGIN, SYMBOL_END, SYMBOL_PADDING}
    total = correct = 0

    with torch.no_grad():
        for batch_x, batch_y in loader:
            encoder_hidden = encoder.initHidden(batch_size, num_layers_enc)

            # Time-major views: one row per time step.
            input_steps = batch_x.transpose(0, 1)
            n_target_steps = batch_y.transpose(0, 1).size(0)

            if cell_type == LSTM_KEY:
                encoder_cell_state = encoder.initHidden(batch_size, num_layers_enc)
                encoder_hidden = (encoder_hidden, encoder_cell_state)

            for ei in range(input_steps.size(0)):
                encoder_hidden = encoder(input_steps[ei], batch_size, encoder_hidden)[1]

            decoder_input = torch.LongTensor([SYMBOL_BEGIN] * batch_size)
            if is_gpu:
                decoder_input = decoder_input.cuda()
            decoder_hidden = encoder_hidden

            # Zero-initialised (rather than uninitialised) CPU buffer of predictions.
            predictions = torch.zeros(n_target_steps, batch_size, dtype=torch.long)
            for di in range(n_target_steps):
                decoder_output, decoder_hidden = decoder(decoder_input, batch_size, decoder_hidden)
                # Greedy choice; it is also fed back as the next decoder input.
                decoder_input = decoder_output.topk(1)[1].squeeze(1)
                predictions[di] = decoder_input

            # Convert once to plain Python ints instead of comparing and calling
            # ``.item()`` on individual tensor elements.
            index_to_char = output_lang.str_encodding
            predicted_rows = predictions.transpose(0, 1).tolist()
            target_rows = batch_y.tolist()
            for predicted, expected in zip(predicted_rows, target_rows):
                decoded = [index_to_char[index] for index in predicted if index not in ignored]
                reference = [index_to_char[index] for index in expected if index not in ignored]
                if decoded == reference:
                    correct += 1
                total += 1

    # An empty loader has no meaningful accuracy; avoid a ZeroDivisionError.
    if total == 0:
        return 0.0
    return (correct / total) * 100


def calc_loss(encoder, decoder, input_tensor, target_tensor, batch_size, encoder_optimizer, decoder_optimizer, criterion, cell_type, num_layers_enc, max_length, is_training, teacher_forcing_ratio=0.5):
    """Run one batch through encoder and decoder and return its mean step loss.

    Args:
        encoder: Called as ``encoder(step_input, batch_size, state)``; item ``[1]``
            of its result becomes the next state. Provides ``initHidden``.
        decoder: Called as ``decoder(step_input, batch_size, state)`` and returning
            ``(scores, state)``.
        input_tensor: Source indices, indexed by time step along dimension 0.
        target_tensor: Target indices, indexed by time step along dimension 0.
        batch_size: Number of sequences in the batch.
        encoder_optimizer: Optimizer stepped for the encoder when training.
        decoder_optimizer: Optimizer stepped for the decoder when training.
        criterion: Loss callable applied as ``criterion(scores, target_step)``.
        cell_type: ``LSTM_KEY`` makes the state a ``(hidden, cell)`` tuple.
        num_layers_enc: Forwarded to ``encoder.initHidden``.
        max_length: Not used by this function.
        is_training: When true, back-propagate and step both optimizers; otherwise
            decode greedily under ``torch.no_grad()``.
        teacher_forcing_ratio: Probability of feeding the ground-truth symbol back
            into the decoder for the whole batch (training only).

    Returns:
        The summed per-step loss divided by the number of target steps, as a float.
    """
    # Fresh recurrent state; an LSTM needs a second tensor for the cell state.
    state = encoder.initHidden(batch_size, num_layers_enc)
    if cell_type == LSTM_KEY:
        state = (state, encoder.initHidden(batch_size, num_layers_enc))

    # Gradients are cleared on every call, evaluation included.
    for optimizer in (encoder_optimizer, decoder_optimizer):
        optimizer.zero_grad()

    # Encode the source one step at a time, keeping only the resulting state.
    for ei in range(input_tensor.size(0)):
        _, state = encoder(input_tensor[ei], batch_size, state)[:2]

    next_input = torch.LongTensor([SYMBOL_BEGIN] * batch_size)
    if is_gpu:
        next_input = next_input.cuda()

    # One draw per call decides teacher forcing for the entire batch; the draw is
    # made during evaluation as well, where its result is not used.
    teacher_forcing = random.random() < teacher_forcing_ratio

    n_target_steps = target_tensor.size(0)
    total_loss = 0

    if not is_training:
        # Evaluation: greedy decoding, no graph construction, no weight update.
        with torch.no_grad():
            for di in range(n_target_steps):
                scores, state = decoder(next_input, batch_size, state)
                total_loss += criterion(scores, target_tensor[di])
                next_input = scores.argmax(dim=1)
        return total_loss.item() / n_target_steps

    for di in range(n_target_steps):
        scores, state = decoder(next_input, batch_size, state)
        expected = target_tensor[di]
        if teacher_forcing:
            next_input = expected
        else:
            next_input = scores.argmax(dim=1)
        total_loss = criterion(scores, expected) + total_loss

    total_loss.backward()
    encoder_optimizer.step()
    decoder_optimizer.step()

    return total_loss.item() / n_target_steps


def seq2seq(encoder, decoder, train_loader, val_loader, test_loader, lr, optimizer, epochs, max_length_word, num_layers_enc, output_lang,batch_size,cell_type,is_wandb):
    """Train the encoder/decoder pair and report loss and accuracy every epoch.

    Args:
        encoder: Encoder model; its ``parameters()`` are optimised.
        decoder: Decoder model; its ``parameters()`` are optimised.
        train_loader: Iterable of ``(batch_x, batch_y)`` training batches.
        val_loader: Iterable of validation batches (loss and accuracy).
        test_loader: Iterable of test batches (accuracy only; no test loss is computed).
        lr: Learning rate for both optimizers.
        optimizer: Optimizer name. ``"nadam"`` in any letter case (including
            ``NADAM_KEY``) selects ``optim.NAdam``; anything else selects ``optim.Adam``.
        epochs: Number of passes over ``train_loader``.
        max_length_word: ``max_length_word - 1`` is forwarded to ``calc_loss`` and
            ``accuracy`` as ``max_length``.
        num_layers_enc: Forwarded to ``calc_loss`` and ``accuracy``.
        output_lang: Target vocabulary, forwarded to ``accuracy``.
        batch_size: Batch size, forwarded to ``calc_loss`` and ``accuracy``.
        cell_type: Recurrent cell identifier, forwarded to ``calc_loss`` and ``accuracy``.
        is_wandb: When true, log the epoch metrics with ``wandb.log``.
    """
    max_length = max_length_word - 1

    # Match the optimizer name case-insensitively: the file's own constant is
    # NADAM_KEY = "Nadam", which a case-sensitive comparison against "nadam"
    # would silently turn into Adam.
    use_nadam = str(optimizer).lower() == NADAM_KEY.lower()
    optimizer_cls = optim.NAdam if use_nadam else optim.Adam
    encoder_optimizer = optimizer_cls(encoder.parameters(), lr=lr)
    decoder_optimizer = optimizer_cls(decoder.parameters(), lr=lr)
    criterion = nn.NLLLoss()

    def _mean_loss(loader, is_training):
        """Average the per-batch loss over ``loader`` (updates weights when training)."""
        loss_total = 0
        for batch_x, batch_y in loader:
            # calc_loss expects time-major tensors.
            loss_total += calc_loss(
                encoder, decoder, batch_x.transpose(0, 1), batch_y.transpose(0, 1),
                batch_size, encoder_optimizer, decoder_optimizer, criterion,
                cell_type, num_layers_enc, max_length, is_training=is_training,
            )
        return loss_total / len(loader)

    def _accuracy_fraction(loader):
        """Exact-match accuracy over ``loader`` as a fraction in [0, 1]."""
        percent = accuracy(encoder, decoder, loader, batch_size, criterion, cell_type, num_layers_enc, max_length, output_lang)
        return percent / 100

    for epoch in range(epochs):
        train_loss_avg = _mean_loss(train_loader, is_training=True)
        print(f"Epoch: {epoch} | Train Loss: {train_loss_avg:.4f} |", end="")

        val_loss_avg = _mean_loss(val_loader, is_training=False)
        print(f"Val Loss: {val_loss_avg:.4f} |", end="")

        train_acc = _accuracy_fraction(train_loader)
        print(f"train Accuracy: {train_acc:.4%} |", end="")

        val_acc = _accuracy_fraction(val_loader)
        print(f"Val Accuracy: {val_acc:.4%} |", end="")

        test_acc = _accuracy_fraction(test_loader)
        print(f"Test Accuracy: {test_acc:.4%}")

        if is_wandb:
            wandb.log(
                {
                    TRAIN_ACCURACY_TITLE: train_acc,
                    VALIDATION_ACCURACY_TITLE: val_acc,
                    TEST_ACCURACY_TITLE: test_acc,
                    TRAIN_LOSS_TITLE: train_loss_avg,
                    VALIDATION_LOSS_TITLE: val_loss_avg,
                }
            )

            
