In [1]:
import copy
import html
import os
import random

import pandas as pd
import torch
import torch.nn.functional as F
import torch.optim as optim
import wandb
from torch import nn
from torch.utils.data import Dataset, DataLoader
%env WANDB_MODE = disabled

env: WANDB_MODE=disabled


In [2]:
# Never commit API keys: the key is read from the WANDB_API_KEY environment variable
# (e.g. a Kaggle secret). The key that used to be hardcoded in this cell is exposed in
# the notebook history and should be revoked.
wandb.login(key=os.environ.get("WANDB_API_KEY"))

True

In [3]:
# Use the GPU when one is available; the preprocessing functions below allocate their tensors on this device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
# File paths
# Train / test / validation CSVs of the Hindi Dakshina dataset, as mounted under /kaggle/input.
train_csv = "/kaggle/input/dakshina-dataset-hindi/DakshinaDataSet_Hindi/hindi_Train_dataset.csv"
test_csv = "/kaggle/input/dakshina-dataset-hindi/DakshinaDataSet_Hindi/hindi_Test_dataset.csv"
val_csv = "/kaggle/input/dakshina-dataset-hindi/DakshinaDataSet_Hindi/hindi_Validation_dataset.csv"

In [5]:
# The CSVs are read without a header row; column 0 is used as the model input
# and column 1 as the expected output for each split.
train_data, val_data, test_data = [
    pd.read_csv(csv_path, header=None) for csv_path in (train_csv, val_csv, test_csv)
]
train_input, train_output = (train_data[col].to_numpy() for col in (0, 1))
val_input, val_output = (val_data[col].to_numpy() for col in (0, 1))
test_input, test_output = (test_data[col].to_numpy() for col in (0, 1))

In [6]:
def pre_processing(train_input, train_output, max_source_len=30, max_target_len=23):
    """Build the character vocabularies and index tensors for the training pairs.

    Each source word is overwritten in place with ``"{" + word`` right-padded
    with ``"}"`` to ``max_source_len`` characters; each target word is treated
    the same way with ``max_target_len``. Characters receive consecutive
    indices in order of first appearance, separately for source and target.

    Args:
        train_input: Mutable sequence of source strings (modified in place).
        train_output: Mutable sequence of target strings (modified in place),
            index-aligned with ``train_input``.
        max_source_len: Padded source length, including the leading ``"{"``.
        max_target_len: Padded target length, including the leading ``"{"``.

    Returns:
        dict with the padded words (``source_data`` / ``target_data``), the
        vocabularies (``all_characters``, ``char_num_map``, ``num_char_map``
        for the source side and the ``*_2`` variants for the target side),
        the int tensors ``source_charToNum`` of shape
        ``(len(train_input), max_source_len)`` and ``val_charToNum`` of shape
        ``(len(train_output), max_target_len)`` on the global ``device``, and
        the vocabulary sizes ``source_len`` / ``target_len``.

    Raises:
        ValueError: If the two sequences differ in length or a word does not
            fit into its padded length.
    """
    if len(train_input) != len(train_output):
        raise ValueError(
            f"train_input ({len(train_input)}) and train_output ({len(train_output)}) must have the same length."
        )

    def encode(words, max_len, all_characters, char_num_map, num_char_map):
        """Pad every word in place and return the (len(words), max_len) index tensor."""
        rows = []
        for i in range(len(words)):
            if len(words[i]) > max_len - 1:
                raise ValueError(
                    f"Word {words[i]!r} has {len(words[i])} characters; at most {max_len - 1} fit."
                )
            words[i] = "{" + words[i] + "}" * (max_len - 1 - len(words[i]))
            row = []
            for char in words[i]:
                # Dict lookup instead of list membership + list.index keeps this O(1) per character.
                index = char_num_map.get(char)
                if index is None:
                    index = len(all_characters)
                    all_characters.append(char)
                    char_num_map[char] = index
                    num_char_map[index] = char
                row.append(index)
            rows.append(row)
        # One transfer to the device for the whole split; reshape keeps (0, max_len) for empty input.
        return torch.tensor(rows, dtype=torch.int, device=device).reshape(len(rows), max_len)

    data = {
        "all_characters": [],
        "char_num_map": {},
        "num_char_map": {},
        "source_charToNum": None,
        "source_data": train_input,
        "all_characters_2": [],
        "char_num_map_2": {},
        "num_char_map_2": {},
        "val_charToNum": None,
        "target_data": train_output,
        "source_len": 0,
        "target_len": 0
    }
    data["source_charToNum"] = encode(
        train_input, max_source_len,
        data["all_characters"], data["char_num_map"], data["num_char_map"],
    )
    data["val_charToNum"] = encode(
        train_output, max_target_len,
        data["all_characters_2"], data["char_num_map_2"], data["num_char_map_2"],
    )

    data["source_len"] = len(data["all_characters"])
    data["target_len"] = len(data["all_characters_2"])
    return data

In [7]:
# Build the training vocabularies and tensors. Copies are passed because
# pre_processing overwrites each word with its padded form.
data = pre_processing(copy.copy(train_input), copy.copy(train_output))

In [8]:
def pre_processing_validation(val_input, val_output, max_source_len=30, max_target_len=23):
    """Encode a held-out split with the vocabularies of the global training ``data``.

    Each word is overwritten in place with ``"{" + word`` right-padded with
    ``"}"`` (to ``max_source_len`` / ``max_target_len`` characters) and mapped
    to indices through ``data["char_num_map"]`` (source) and
    ``data["char_num_map_2"]`` (target).

    Args:
        val_input: Mutable sequence of source strings (modified in place).
        val_output: Mutable sequence of target strings (modified in place),
            index-aligned with ``val_input``.
        max_source_len: Padded source length, including the leading ``"{"``.
        max_target_len: Padded target length, including the leading ``"{"``.

    Returns:
        dict with the same keys as the result of ``pre_processing``. The
        vocabulary entries only contain the characters that occur in this
        split, but they carry the indices assigned by the training vocabulary.

    Raises:
        ValueError: If the two sequences differ in length or a word does not
            fit into its padded length.
        KeyError: If a character does not occur in the training vocabulary.
    """
    if len(val_input) != len(val_output):
        raise ValueError(
            f"val_input ({len(val_input)}) and val_output ({len(val_output)}) must have the same length."
        )

    def encode(words, max_len, train_map, all_characters, char_num_map, num_char_map, side):
        """Pad every word in place and return the (len(words), max_len) index tensor."""
        rows = []
        for i in range(len(words)):
            if len(words[i]) > max_len - 1:
                raise ValueError(
                    f"Word {words[i]!r} has {len(words[i])} characters; at most {max_len - 1} fit."
                )
            words[i] = "{" + words[i] + "}" * (max_len - 1 - len(words[i]))
            row = []
            for char in words[i]:
                try:
                    index = train_map[char]
                except KeyError:
                    raise KeyError(
                        f"{side} character {char!r} in {words[i]!r} is not in the training vocabulary"
                    ) from None
                # Dict membership replaces the O(V) scan of the character list.
                if char not in char_num_map:
                    all_characters.append(char)
                    char_num_map[char] = index
                    num_char_map[index] = char
                row.append(index)
            rows.append(row)
        # One transfer to the device for the whole split; reshape keeps (0, max_len) for empty input.
        return torch.tensor(rows, dtype=torch.int, device=device).reshape(len(rows), max_len)

    data2 = {
        "all_characters": [],
        "char_num_map": {},
        "num_char_map": {},
        "source_charToNum": None,
        "source_data": val_input,
        "all_characters_2": [],
        "char_num_map_2": {},
        "num_char_map_2": {},
        "val_charToNum": None,
        "target_data": val_output,
        "source_len": 0,
        "target_len": 0
    }
    data2["source_charToNum"] = encode(
        val_input, max_source_len, data["char_num_map"],
        data2["all_characters"], data2["char_num_map"], data2["num_char_map"], "source",
    )
    data2["val_charToNum"] = encode(
        val_output, max_target_len, data["char_num_map_2"],
        data2["all_characters_2"], data2["char_num_map_2"], data2["num_char_map_2"], "target",
    )

    data2["source_len"] = len(data2["all_characters"])
    data2["target_len"] = len(data2["all_characters_2"])
    return data2

In [9]:
# Encode the validation and test splits with the training vocabulary. Copies are
# passed because pre_processing_validation overwrites each word with its padded form.
data2 = pre_processing_validation(copy.copy(val_input), copy.copy(val_output))
data_test = pre_processing_validation(copy.copy(test_input), copy.copy(test_output))

In [10]:
# List the fields produced by pre_processing_validation for the test split.
data_test.keys()

dict_keys(['all_characters', 'char_num_map', 'num_char_map', 'source_charToNum', 'source_data', 'all_characters_2', 'char_num_map_2', 'num_char_map_2', 'val_charToNum', 'target_data', 'source_len', 'target_len'])

In [11]:
class MyDataset(Dataset):
    """Dataset over two index-aligned collections of source and target items."""

    def __init__(self, x, y):
        # x holds the source items, y the matching target items.
        self.source, self.target = x, y

    def __len__(self):
        # The source collection defines the number of samples.
        return len(self.source)

    def __getitem__(self, idx):
        # One sample is the (source, target) pair stored at position idx.
        return self.source[idx], self.target[idx]

def validationAccuracy(encoder, decoder, batchsize, tf_ratio, cellType, bidirection):
    """Evaluate the model on the validation split and log the metrics to wandb.

    Decoding is greedy and free-running: each step is fed the model's own
    previous prediction, starting from index 0 (the "{" start marker), so the
    reported numbers match inference-time behaviour.

    Args:
        encoder: Encoder providing ``initHidden``, ``bidirectional``,
            ``num_layers`` and ``hidden_size``.
        decoder: Attention decoder returning ``(log_probs, hidden, attn_weights)``.
        batchsize: Batch size of the validation loader.
        tf_ratio: Unused. Kept for interface compatibility; teacher forcing
            is deliberately not applied during evaluation.
        cellType: Unused. The initial state comes from ``encoder.initHidden``,
            which already handles the LSTM (h, c) tuple.
        bidirection: Unused. The encoder's own ``bidirectional`` flag is
            consulted instead.

    Returns:
        Fraction of validation sequences predicted exactly (all positions match).
    """
    dataLoader = dataLoaderFun("validation", batchsize)
    encoder.eval()
    decoder.eval()
    total_sequences = 0
    total_correct_sequences = 0
    total_char_matches = 0
    total_characters = 0
    total_loss = 0

    lossFunction = nn.NLLLoss()

    def fold_directions(state):
        # (layers * 2, batch, hidden) -> (layers, batch, hidden): sum forward and backward states.
        return state.reshape(encoder.num_layers, 2, state.size(1), encoder.hidden_size).sum(dim=1)

    # No gradients are needed for evaluation; this also keeps memory usage down.
    with torch.no_grad():
        for source_batch, target_batch in dataLoader:
            actual_batch_size = source_batch.shape[0]
            output_seq_len = target_batch.shape[1]
            total_sequences += actual_batch_size
            total_characters += target_batch.numel()

            # encoder_states: (source_seq_len, batch, hidden * directions)
            encoder_states, decoder_current_state = encoder(
                source_batch, encoder.initHidden(actual_batch_size, device)
            )
            if encoder.bidirectional:
                # The decoder and its attention expect `hidden` features and `layers` states,
                # so the two directions are summed.
                encoder_states = (
                    encoder_states[..., :encoder.hidden_size] + encoder_states[..., encoder.hidden_size:]
                )
                if isinstance(decoder_current_state, tuple):
                    decoder_current_state = tuple(fold_directions(s) for s in decoder_current_state)
                else:
                    decoder_current_state = fold_directions(decoder_current_state)

            loss = 0
            decoder_actual_output = []
            decoder_current_input = torch.zeros(actual_batch_size, 1, dtype=torch.long, device=device)

            for i in range(output_seq_len):
                decoder_output, decoder_current_state, _ = decoder(
                    decoder_current_input, decoder_current_state, encoder_states
                )
                # argmax keeps the (batch, 1) shape, so a batch of size 1 works as well.
                decoder_current_input = decoder_output.argmax(dim=2)
                decoder_actual_output.append(decoder_current_input)

                curr_target_chars = target_batch[:, i].long()
                loss += lossFunction(decoder_output[:, -1, :], curr_target_chars)

            total_loss += loss.item() / output_seq_len
            decoder_actual_output = torch.cat(decoder_actual_output, dim=1)  # (batch, output_seq_len)
            matches = decoder_actual_output == target_batch
            total_correct_sequences += matches.all(dim=1).sum().item()
            total_char_matches += matches.sum().item()

    encoder.train()
    decoder.train()

    wandb.log({
        'validation_loss': total_loss / len(dataLoader),
        'validation_accuracy': total_correct_sequences / total_sequences,
        'validation_char_accuracy': total_char_matches / total_characters
    })
    return total_correct_sequences / total_sequences

In [12]:
# class Attention(nn.Module):
#     def __init__(self, hidden_size):
#         super(Attention, self).__init__()
#         self.Wa = nn.Linear(hidden_size, hidden_size)
#         self.Ua = nn.Linear(hidden_size, hidden_size)
#         self.Va = nn.Linear(hidden_size, 1)

#     def forward(self, query, keys):
#         scores = self.Va(torch.tanh(self.Wa(query) + self.Ua(keys)))
#         scores = scores.squeeze().unsqueeze(1)
#         weights = F.softmax(scores, dim=0)
#         weights = weights.permute(2, 1, 0)
#         keys = keys.permute(1, 0, 2)
#         context = torch.bmm(weights, keys)
#         return context, weights

In [13]:
# class Encoder(nn.Module):
#     def __init__(self, inputDim, embSize, encoderLayers, hiddenLayerNuerons, cellType, batch_size):
#         super(Encoder, self).__init__()
#         self.embedding = nn.Embedding(inputDim, embSize)
#         self.encoderLayers = encoderLayers
#         self.hiddenLayerNuerons = hiddenLayerNuerons
#         self.cellType = cellType
#         self.num_directions = 2 if bidirection == "Yes" else 1
        
#         if cellType == 'GRU':
#             self.rnn = nn.GRU(embSize, hiddenLayerNuerons, num_layers=encoderLayers, batch_first=True)
#         elif cellType == 'RNN':
#             self.rnn = nn.RNN(embSize, hiddenLayerNuerons, num_layers=encoderLayers, batch_first=True)
#         else:
#             self.rnn = nn.LSTM(embSize, hiddenLayerNuerons, num_layers=encoderLayers, batch_first=True)

#     def forward(self, sourceBatch, encoderCurrState):
#         sequenceLength = sourceBatch.shape[1]
#         batch_size = sourceBatch.shape[0]
#         encoderStates = torch.zeros(sequenceLength, self.encoderLayers, batch_size, self.hiddenLayerNuerons, device=device)
#         for i in range(sequenceLength):
#             currInput = sourceBatch[:, i].reshape(batch_size, 1)
#             _, encoderCurrState = self.statesCalculation(currInput, encoderCurrState)
#             if self.cellType == 'LSTM':
#                 encoderStates[i] = encoderCurrState[1]
#             else:
#                 encoderStates[i] = encoderCurrState
#         return encoderStates, encoderCurrState
        
#     def initHidden(self, batch_size=1):
#         h0 = torch.zeros(self.encoderLayers * self.num_directions,
#                          batch_size,
#                          self.hiddenLayerNuerons,
#                          device=device)
#         if isinstance(self.rnn, nn.LSTM):
#             c0 = torch.zeros_like(h0)
#             return (h0, c0)
#         else:
#             return h0
#     def statesCalculation(self, currentInput, prevState):
#         embdInput = self.embedding(currentInput)
#         output, prev_state = self.rnn(embdInput, prevState)
#         return output, prev_state

#     def getInitialState(self, batch_size):
#         return torch.zeros(self.encoderLayers, batch_size, self.hiddenLayerNuerons, device=device)

In [14]:
# class Decoder(nn.Module):
#     def __init__(self, outputDim, embSize, hiddenLayerNuerons, decoderLayers, cellType, dropout_p):
#         super(Decoder, self).__init__()
#         self.embedding = nn.Embedding(outputDim, embSize)
#         self.cellType = cellType
#         if cellType == 'GRU':
#             self.rnn = nn.GRU(embSize + hiddenLayerNuerons, hiddenLayerNuerons, num_layers=decoderLayers, batch_first=True)
#         elif cellType == 'RNN':
#             self.rnn = nn.RNN(embSize + hiddenLayerNuerons, hiddenLayerNuerons, num_layers=decoderLayers, batch_first=True)
#         else:
#             self.rnn = nn.LSTM(embSize + hiddenLayerNuerons, hiddenLayerNuerons, num_layers=decoderLayers, batch_first=True)
#         self.fc = nn.Linear(hiddenLayerNuerons, outputDim)
#         self.softmax = nn.LogSoftmax(dim=2)
#         self.dropout = nn.Dropout(dropout_p)
#         self.attention = Attention(hiddenLayerNuerons).to(device)

#     def forward(self, current_input, prev_state, encoder_final_layers):
#         if self.cellType == 'LSTM':
#             context, attn_weights = self.attention(prev_state[1][-1, :, :], encoder_final_layers)
#         else:
#             context, attn_weights = self.attention(prev_state[-1, :, :], encoder_final_layers)
#         embd_input = self.embedding(current_input)
#         curr_embd = F.relu(embd_input)
#         input_gru = torch.cat((curr_embd, context), dim=2)
#         output, prev_state = self.rnn(input_gru, prev_state)
#         output = self.dropout(output)
#         output = self.softmax(self.fc(output))
#         return output, prev_state, attn_weights


In [15]:
#From ChatGPT
import torch.nn.functional as F

class Attention(nn.Module):
    """Additive attention: score = Va(tanh(Wa(query) + Ua(key))) for every key."""

    def __init__(self, hidden_size):
        super().__init__()
        self.Wa = nn.Linear(hidden_size, hidden_size)
        self.Ua = nn.Linear(hidden_size, hidden_size)
        self.Va = nn.Linear(hidden_size, 1)

    def forward(self, query, keys):
        """Attend over ``keys`` with a single ``query`` per batch element.

        Args:
            query: (batch, hidden_size)
            keys:  (seq_len, batch, hidden_size)

        Returns:
            context: (batch, 1, hidden_size), the weighted sum of the keys.
            weights: (batch, 1, seq_len), softmax-normalized over seq_len.
        """
        batch_major_keys = keys.permute(1, 0, 2)                     # (batch, seq_len, hidden)
        seq_len = batch_major_keys.size(1)

        # Give every key position its own copy of the query.
        tiled_query = query.unsqueeze(1).repeat(1, seq_len, 1)       # (batch, seq_len, hidden)

        energy = torch.tanh(self.Wa(tiled_query) + self.Ua(batch_major_keys))
        scores = self.Va(energy).squeeze(2)                          # (batch, seq_len)

        weights = F.softmax(scores.unsqueeze(1), dim=2)              # (batch, 1, seq_len)
        context = torch.bmm(weights, batch_major_keys)               # (batch, 1, hidden)
        return context, weights

class Encoder(nn.Module):
    """Embedding followed by a (possibly stacked / bidirectional) GRU, RNN or LSTM."""

    def __init__(self,
                 inputDim: int,
                 embSize: int,
                 encoderLayers: int,
                 hiddenLayerNuerons: int,
                 cellType: str = 'LSTM',
                 bidirectional: bool = False,
                 dropout: float = 0.0):
        super().__init__()
        self.embedding = nn.Embedding(inputDim, embSize)
        self.cell_type = cellType.upper()
        self.num_layers = encoderLayers
        self.hidden_size = hiddenLayerNuerons
        self.bidirectional = bidirectional
        self.num_directions = 2 if bidirectional else 1

        # 'GRU' and 'RNN' are matched explicitly; any other cell type yields an LSTM.
        rnn_class = {'GRU': nn.GRU, 'RNN': nn.RNN}.get(self.cell_type, nn.LSTM)
        self.rnn = rnn_class(
            input_size=embSize,
            hidden_size=hiddenLayerNuerons,
            num_layers=encoderLayers,
            batch_first=True,
            # Inter-layer dropout is only passed on when there is more than one layer.
            dropout=dropout if encoderLayers > 1 else 0.0,
            bidirectional=bidirectional,
        )

    def forward(self, src, hidden):
        """Encode a batch of index sequences.

        Args:
            src: (batch, seq_len) token indices.
            hidden: initial state as produced by ``initHidden``.

        Returns:
            encoder_states: (seq_len, batch, hidden * directions), the
                per-step outputs in sequence-major layout for the attention.
            hidden: final state returned by the recurrent layer.
        """
        outputs, hidden = self.rnn(self.embedding(src), hidden)  # (batch, seq_len, hidden * directions)
        return outputs.permute(1, 0, 2), hidden

    def initHidden(self, batch_size: int, device: torch.device):
        """Return an all-zero initial state; an (h0, c0) pair when the cell is an LSTM."""
        h0 = torch.zeros(
            self.num_layers * self.num_directions, batch_size, self.hidden_size, device=device
        )
        return (h0, torch.zeros_like(h0)) if isinstance(self.rnn, nn.LSTM) else h0

class Decoder(nn.Module):
    """Single-step attention decoder built on a GRU, RNN or LSTM."""

    def __init__(self,
                 outputDim: int,
                 embSize: int,
                 hiddenLayerNuerons: int,
                 decoderLayers: int,
                 cellType: str = 'LSTM',
                 dropout_p: float = 0.0):
        super().__init__()
        self.embedding = nn.Embedding(outputDim, embSize)
        self.cell_type = cellType.upper()
        self.hidden_size = hiddenLayerNuerons
        self.num_layers = decoderLayers

        # 'GRU' and 'RNN' are matched explicitly; any other cell type yields an LSTM.
        rnn_class = {'GRU': nn.GRU, 'RNN': nn.RNN}.get(self.cell_type, nn.LSTM)
        self.rnn = rnn_class(
            # The recurrent input is the embedded token concatenated with the attention context.
            input_size=embSize + hiddenLayerNuerons,
            hidden_size=hiddenLayerNuerons,
            num_layers=decoderLayers,
            batch_first=True,
            dropout=dropout_p if decoderLayers > 1 else 0.0,
        )

        self.attention = Attention(hiddenLayerNuerons)
        self.fc = nn.Linear(hiddenLayerNuerons, outputDim)
        self.softmax = nn.LogSoftmax(dim=2)
        self.dropout = nn.Dropout(dropout_p)

    def forward(self, input_step, hidden, encoder_states):
        """Run one decoding step.

        Args:
            input_step: (batch, 1) indices of the current input tokens.
            hidden: recurrent state, an (h, c) tuple for LSTM or a single tensor otherwise.
            encoder_states: (seq_len, batch, hidden) encoder outputs to attend over.

        Returns:
            prediction: (batch, 1, output_dim) log-probabilities.
            hidden: updated recurrent state.
            attn_weights: attention weights returned by the attention module.
        """
        # The attention query is the hidden state of the last layer (h, not c, for LSTM).
        top_layer_state = (hidden[0] if isinstance(hidden, tuple) else hidden)[-1]
        context, attn_weights = self.attention(top_layer_state, encoder_states)

        embedded = F.relu(self.embedding(input_step))                 # (batch, 1, emb_size)
        step_input = torch.cat((embedded, context), dim=2)            # (batch, 1, emb_size + hidden)

        output, hidden = self.rnn(step_input, hidden)
        prediction = self.softmax(self.fc(self.dropout(output)))      # (batch, 1, output_dim)
        return prediction, hidden, attn_weights


In [16]:
def dataLoaderFun(dataName, batch_size):
    """Return a shuffling DataLoader over the encoded training or validation pairs.

    ``dataName == 'train'`` selects the global ``data``; every other value
    selects the global validation dict ``data2``.
    """
    split = data if dataName == 'train' else data2
    dataset = MyDataset(split["source_charToNum"], split['val_charToNum'])
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)

def train(embSize, encoderLayers, decoderLayers, hiddenLayerNuerons, cellType, bidirection, dropout, epochs, batchsize, learningRate, optimizer, tf_ratio):
    """Train the attention encoder/decoder and checkpoint the best validation model.

    Per batch, teacher forcing is switched on with probability ``tf_ratio``.
    When it is on, step ``i`` is fed the ground-truth character of step
    ``i - 1``; otherwise it is fed the model's own previous prediction. Step 0
    is always fed index 0 (the "{" start marker). After every epoch the train
    metrics are logged to wandb, ``validationAccuracy`` is run, and the weights
    are written to ``best_model.pth`` whenever the validation accuracy improves.

    Args:
        embSize: Embedding size used by encoder and decoder.
        encoderLayers: Number of recurrent layers in the encoder.
        decoderLayers: Number of recurrent layers in the decoder; must equal
            ``encoderLayers`` because the encoder's final state seeds the decoder.
        hiddenLayerNuerons: Hidden size of the recurrent layers.
        cellType: 'GRU', 'RNN', or anything else for LSTM.
        bidirection: "Yes" builds a bidirectional encoder whose two directions
            are summed before being handed to the decoder; any other value
            builds a unidirectional encoder.
        dropout: Dropout probability for the decoder.
        epochs: Number of passes over the training data.
        batchsize: Batch size for the training and validation loaders.
        learningRate: Learning rate for both optimizers.
        optimizer: 'Adam' selects Adam; any other value selects NAdam.
        tf_ratio: Probability of teacher forcing for a batch.

    Raises:
        ValueError: If ``encoderLayers != decoderLayers``.
    """
    if encoderLayers != decoderLayers:
        # The decoder is initialised with the encoder's final state, whose first
        # dimension is the layer count; fail early instead of inside the RNN call.
        raise ValueError(
            f"encoderLayers ({encoderLayers}) and decoderLayers ({decoderLayers}) must be equal."
        )
    use_bidirectional = bidirection == "Yes"

    dataLoader = dataLoaderFun("train", batchsize)
    # Keyword arguments on purpose: the sixth positional parameter of Encoder is `bidirectional`.
    encoder = Encoder(
        inputDim=data["source_len"],
        embSize=embSize,
        encoderLayers=encoderLayers,
        hiddenLayerNuerons=hiddenLayerNuerons,
        cellType=cellType,
        bidirectional=use_bidirectional,
    ).to(device)
    decoder = Decoder(data["target_len"], embSize, hiddenLayerNuerons, decoderLayers, cellType, dropout).to(device)
    if optimizer == 'Adam':
        encoderOptimizer = optim.Adam(encoder.parameters(), lr=learningRate)
        decoderOptimizer = optim.Adam(decoder.parameters(), lr=learningRate)
    else:
        encoderOptimizer = optim.NAdam(encoder.parameters(), lr=learningRate)
        decoderOptimizer = optim.NAdam(decoder.parameters(), lr=learningRate)
    lossFunction = nn.NLLLoss()

    def fold_directions(state):
        # (layers * 2, batch, hidden) -> (layers, batch, hidden): sum forward and backward states.
        return state.reshape(encoderLayers, 2, state.size(1), hiddenLayerNuerons).sum(dim=1)

    best_val_acc = 0.0
    for epoch in range(epochs):
        train_accuracy = 0
        train_loss = 0
        for batch_num, (source_batch, target_batch) in enumerate(dataLoader):
            actual_batch_size = source_batch.shape[0]
            output_seq_len = target_batch.shape[1]

            # encoder_states: (source_seq_len, batch, hidden * directions)
            encoder_states, decoder_current_state = encoder(
                source_batch, encoder.initHidden(actual_batch_size, device)
            )
            if use_bidirectional:
                # The decoder and its attention expect `hidden` features and `layers` states.
                encoder_states = (
                    encoder_states[..., :hiddenLayerNuerons] + encoder_states[..., hiddenLayerNuerons:]
                )
                if isinstance(decoder_current_state, tuple):
                    decoder_current_state = tuple(fold_directions(s) for s in decoder_current_state)
                else:
                    decoder_current_state = fold_directions(decoder_current_state)

            loss = 0
            decoder_actual_output = []
            use_teacher_forcing = random.random() < tf_ratio
            decoder_current_input = torch.zeros(actual_batch_size, 1, dtype=torch.long, device=device)
            for i in range(output_seq_len):
                if i > 0 and use_teacher_forcing:
                    # Feed the previous ground-truth character; feeding target[:, i]
                    # would hand the model the answer it is asked to predict.
                    decoder_current_input = target_batch[:, i - 1].reshape(actual_batch_size, 1).long()
                decoder_output, decoder_current_state, _ = decoder(
                    decoder_current_input, decoder_current_state, encoder_states
                )
                # argmax keeps the (batch, 1) shape, so a final batch of size 1 works as well.
                decoder_current_input = decoder_output.argmax(dim=2).detach()
                decoder_actual_output.append(decoder_current_input)
                curr_target_chars = target_batch[:, i].type(dtype=torch.long)
                loss += lossFunction(decoder_output[:, -1, :], curr_target_chars)

            decoder_actual_output = torch.cat(decoder_actual_output, dim=1)  # (batch, output_seq_len)
            train_accuracy += (decoder_actual_output == target_batch).all(dim=1).sum().item()
            train_loss += (loss.item() / output_seq_len)
            encoderOptimizer.zero_grad()
            decoderOptimizer.zero_grad()
            loss.backward()
            encoderOptimizer.step()
            decoderOptimizer.step()

        #Logging train metrics here
        wandb.log({'train_accuracy': train_accuracy / len(data["source_charToNum"])})
        wandb.log({'train_loss': train_loss / len(dataLoader)})

        val_acc = validationAccuracy(encoder, decoder, batchsize, tf_ratio, cellType, bidirection)
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save({
                'encoder_state_dict': encoder.state_dict(),
                'decoder_state_dict': decoder.state_dict(),
                }, "best_model.pth")
            print(f"New best model saved with accuracy: { best_val_acc:.4f}")

def numToCharConverter(inputArray, outputArray, data):
    """Decode two batches of target-index rows back into strings.

    Both arrays are decoded with ``data['num_char_map_2']`` (the target
    index -> character map). The decoded strings used to be built and then
    discarded; they are now returned.

    Args:
        inputArray: Iterable of rows whose elements expose ``.item()``
            (e.g. a 2-D tensor of target indices).
        outputArray: Iterable of rows of the same kind, paired with ``inputArray``.
        data: Dict containing ``'num_char_map_2'``.

    Returns:
        List of ``(input_string, output_string)`` tuples, one per row pair.
    """
    mp = data['num_char_map_2']

    def decode(row):
        return ''.join(mp[element.item()] for element in row)

    return [(decode(row1), decode(row2)) for row1, row2 in zip(inputArray, outputArray)]

In [17]:
# #Train Model
# def train_model():
#     # Initialize wandb run first
#     with wandb.init(project='CS23S025-Assignment-3-DL') as run:
#         config = wandb.config

#         # Dynamically name the run after initialization
#         run.name = f"embedding{config.embSize}_cellType{config.cellType}_batchSize{config.batchsize}"
#         # Call your training logic
#         train(
#              embSize=config.embSize,
#             encoderLayers=config.encoderLayers,
#             decoderLayers=config.decoderLayers,
#             hiddenLayerNuerons=config.hiddenLayerNuerons,
#             cellType=config.cellType,
#             bidirection=config.bidirection,
#             dropout=config.dropout,
#             epochs=config.epochs,
#             batchsize=config.batchsize,
#             learningRate=config.learningRate,
#             optimizer=config.optimizer,
#             tf_ratio=config.tf_ratio
#         )

# # Define sweep configuration
# sweep_config = {
#     'method': 'bayes',
#     'name': 'BestModel_WithAttntion',
#     'metric': {
#         'goal': 'maximize',
#         'name': 'validation_accuracy',
#     },
#     'parameters': {
#         'embSize': {'values': [256]},
#         'encoderLayers': {'values': [1]},
#         'decoderLayers': {'values': [15]},
#         'hiddenLayerNuerons': {'values': [512]},
#         'cellType': {'values': ['GRU']},
#         'bidirection': {'values': ['no']},
#         'dropout': {'values': [0.1]},
#         'epochs': {'values': [1]},
#         'batchsize': {'values': [32]},
#         'learningRate': {'values': [1e-4]},
#         'optimizer': {'values': ['Nadam']},
#         'tf_ratio': {'values': [0.7]}
#     }
# }

# # Create the sweep
# sweep_id = wandb.sweep(sweep=sweep_config, project='CS23S025-Assignment-3-DL')

# # Launch the sweep agent (make sure to update the entity if needed)
# wandb.agent(sweep_id=sweep_id,
#     function=train_model,
#     count=1,  # or however many runs you want
#     entity="cs23s025-indian-institute-of-technology-madras",
#     project="CS23S025-Assignment-3-DL"
# )


In [18]:
# Vocabulary lookups built from the training split.
char_to_num_target = data['char_num_map_2']
num_to_char_target = data['num_char_map_2']
char_to_num_source = data['char_num_map']
num_to_char_source = data['num_char_map']


# Define model parameters (the architecture values must match the checkpoint loaded below)
embSize = 256
encoderLayers = 1
decoderLayers = 1
hiddenLayerNuerons = 512
cellType = "GRU"
bidirection = 'no'
dropout = 0.1
epochs = 25
batchsize = 32
learningRate = 0.0001
optimizer = 'Nadam'
tf_ratio = 0.7

# Define the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize models
encoder = Encoder(
    inputDim=data["source_len"],
    embSize=embSize,
    encoderLayers=encoderLayers,
    hiddenLayerNuerons=hiddenLayerNuerons,
    cellType=cellType
).to(device)

decoder = Decoder(
    outputDim=data["target_len"],
    embSize=embSize,
    hiddenLayerNuerons=hiddenLayerNuerons,
    decoderLayers=decoderLayers,
    cellType=cellType,
    dropout_p=dropout
).to(device)

# Load the best model.
# map_location remaps the stored tensors onto the current device, so a checkpoint
# saved on a GPU can also be loaded on a CPU-only machine.
checkpoint = torch.load(
    "/kaggle/input/attention_bestmodel/pytorch/default/1/best_model_attn.pth",
    map_location=device,
)
print(checkpoint['decoder_state_dict'].keys())
encoder.load_state_dict(checkpoint['encoder_state_dict'])
decoder.load_state_dict(checkpoint['decoder_state_dict'])
# Inference only from here on: eval() disables dropout.
encoder.eval()
decoder.eval()

test_dataset = MyDataset(data_test["source_charToNum"], data_test["val_charToNum"])
test_loader  = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Padded target length used by the preprocessing (23 columns in val_charToNum).
max_target_len = 23  

correct = 0
total   = 0

odict_keys(['embedding.weight', 'rnn.weight_ih_l0', 'rnn.weight_hh_l0', 'rnn.bias_ih_l0', 'rnn.bias_hh_l0', 'fc.weight', 'fc.bias', 'attention.Wa.weight', 'attention.Wa.bias', 'attention.Ua.weight', 'attention.Ua.bias', 'attention.Va.weight', 'attention.Va.bias'])


In [19]:
# Short aliases for the training vocabularies:
# "stoi" maps character -> index, "itos" maps index -> character.
src_stoi = data['char_num_map']
src_itos = data['num_char_map']
tgt_stoi = data['char_num_map_2']
tgt_itos = data['num_char_map_2']

In [96]:
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import numpy as np
import torch # Assuming your weights might be torch tensors

# Helper function to determine text color (black or white) based on background brightness
def get_text_color(bg_hex_color):
    """
    Pick a readable text color for a given background.

    Args:
        bg_hex_color (str): Background color as a HEX string such as "#RRGGBB"
            (leading '#' characters are stripped).
    Returns:
        str: "black" when the background luminance is above 0.5, otherwise "white".
    """
    digits = bg_hex_color.lstrip('#')
    # Decode the three two-digit channels in R, G, B order.
    r, g, b = [int(digits[start:start + 2], 16) for start in (0, 2, 4)]
    # Weighted channel sum, scaled to the [0, 1] range.
    luminance = (0.299 * r + 0.587 * g + 0.114 * b) / 255.0
    if luminance > 0.5:
        return 'black'
    return 'white'

def visualize_attention_to_html(word_tokens, attention_weights, cmap_name='YlOrRd'):
    """
    Generates an HTML string to visualize attention weights on a sequence of word tokens.

    Args:
        word_tokens (list of str or str): The input tokens (e.g., characters of an English word).
        attention_weights (list, np.array, or torch.Tensor):
            One attention weight per token. Any shape is accepted as long as it
            flattens to ``len(word_tokens)`` values (e.g. ``(n,)`` or ``(1, n)``).
            Values are expected to be in the [0, 1] range (e.g., output from a softmax).
        cmap_name (str): Name of the matplotlib colormap to use (e.g., 'viridis', 'YlOrRd', 'Blues').

    Returns:
        str: An HTML string with one colored <span> per token.

    Raises:
        ValueError: If the number of weights differs from the number of tokens.
    """
    # Normalize to a flat float vector BEFORE validating, so that (1, n) inputs,
    # single-token inputs and tensors that still require grad are all handled.
    # Floats are required: matplotlib colormaps interpret integers as lookup-table indices.
    if isinstance(attention_weights, torch.Tensor):
        weights = attention_weights.detach().cpu().numpy().astype(float).reshape(-1)
    else:
        weights = np.asarray(attention_weights, dtype=float).reshape(-1)

    if len(word_tokens) != len(weights):
        raise ValueError(f"Length of word_tokens ({len(word_tokens)}) and attention_weights ({len(weights)}) must be the same.")

    # Get the chosen colormap
    cmap = plt.get_cmap(cmap_name)

    html_parts = []
    for token, weight in zip(word_tokens, weights):
        # The colormap maps a value in [0, 1] to an RGBA tuple; only RGB is used for the background.
        bg_hex_color = mcolors.rgb2hex(cmap(weight)[:3])

        # Determine appropriate text color for readability
        text_color = get_text_color(bg_hex_color)

        # Escape &, < and > so tokens cannot break the generated markup.
        safe_token = html.escape(token, quote=False)

        html_parts.append(
            f'<span style="background-color: {bg_hex_color}; color: {text_color}; padding: 3px 2px; margin: 1px; border-radius: 3px; font-family: monospace;">{safe_token}</span>'
        )

    return "".join(html_parts)

In [97]:
# Inspect one preprocessed training source word ("{"-prefixed and "}"-padded).
data['source_data'][28]

'{anguthiyon}}}}}}}}}}}}}}}}}}}'

In [None]:
# Scratch notes (other candidate words, presumably for the visualization below): ishaara, ishita


In [157]:
# Source word to visualize and its row index in the test split. The slice skips the
# leading "{" start marker so the stored (padded) word can be compared with the string above.
english_word_str = "ishat"
idx = 431
data_test['source_data'][idx][1:8]

'ishat}}'

In [158]:
# Greedy decoding of one test word while recording the attention weights of every step.
input_sequence = data_test['source_charToNum'][idx].to(device)
input_sequence = input_sequence.unsqueeze(0)  # (1, source_seq_len)

# Look the boundary symbols up in the target vocabulary instead of hard-coding their indices.
sos_idx = data['char_num_map_2']['{']
eos_idx = data['char_num_map_2']['}']

# List to store attention weights for each decoder step
all_attention_weights = []
decoded_words = []

# Inference only: no autograd graph is needed (and the stored weights carry no grad_fn).
with torch.no_grad():
    # Initialize encoder hidden state
    encoder_hidden = encoder.initHidden(1, device)

    # Encode the input sequence
    encoder_states, encoder_hidden = encoder(input_sequence, encoder_hidden)

    # Prepare for decoding
    decoder_input = torch.tensor([[sos_idx]], device=device)  # Start with the "{" start marker
    decoder_hidden = encoder_hidden  # Initial decoder hidden state is the final encoder hidden state

    # Decoding loop: at most max_target_len steps, stopping early at the "}" end marker
    for di in range(max_target_len):
        # decoder_output: (1, 1, output_dim)
        # attn_weights:   (1, 1, source_seq_len)
        decoder_output, decoder_hidden, attn_weights = decoder(
            decoder_input, decoder_hidden, encoder_states
        )

        # Drop the batch dimension: (1, source_seq_len) per step
        all_attention_weights.append(attn_weights.squeeze(0))

        # Get the most likely next token and feed it back in
        topv, topi = decoder_output.topk(1)
        decoder_input = topi.squeeze(2).detach()

        predicted_token = topi.item()
        decoded_words.append(predicted_token)

        if predicted_token == eos_idx:
            break

print(f"Decoded tokens: {decoded_words}")
print(f"Number of attention steps: {len(all_attention_weights)}")

# Stack the per-step rows: rows are decoder steps (target characters),
# columns are positions of the padded source sequence.
final_attention_matrix = torch.cat(all_attention_weights, dim=0)  # (num_decoded_steps, source_seq_len)
print(f"Final attention weights shape: {final_attention_matrix.shape}")

# Map the decoded indices back to target characters.
tgt = ''.join(data['num_char_map_2'][d] for d in decoded_words)

Decoded tokens: [0, 54, 12, 17, 28, 3]
Number of attention steps: 6
Final attention weights shape: torch.Size([6, 30])


In [160]:
# Show, for every decoded target character, how strongly it attended to each source character.
# HTML is not available by default in a notebook namespace, so it is imported explicitly.
from IPython.display import HTML, display

input_tokens = list(english_word_str)
source_word_tokens = english_word_str
num_decoding_steps = 4

# Step 0 is skipped (range starts at 1); the loop never runs past the decoded sequence.
for t_step in range(1, min(max_target_len - 2, len(tgt))):
    if tgt[t_step] == '}':
        break
    # Column 0 of the attention matrix belongs to the "{" start marker of the encoded
    # source, so the word's characters occupy columns 1 .. len(word).
    attn_weights = final_attention_matrix[t_step][1:len(source_word_tokens) + 1].detach()
    html_step_output = visualize_attention_to_html(source_word_tokens, attn_weights, cmap_name='Oranges')
    print(f"Attention visualization for the character: {tgt[t_step]}")
    display(HTML(html_step_output))

Attention visualization for the character: इ


Attention visualization for the character: श


Attention visualization for the character: ्


Attention visualization for the character: ट


In [147]:
# Attention distribution over the source positions for the first decoding step.
final_attention_matrix[0]

tensor([3.7762e-05, 2.8499e-04, 9.0696e-04, 4.9548e-02, 2.8051e-01, 4.0142e-02,
        3.7268e-01, 1.5843e-01, 4.7167e-02, 1.1484e-02, 3.8858e-03, 2.2402e-03,
        1.8458e-03, 1.7505e-03, 1.7393e-03, 1.7517e-03, 1.7686e-03, 1.7844e-03,
        1.7979e-03, 1.8092e-03, 1.8187e-03, 1.8267e-03, 1.8336e-03, 1.8395e-03,
        1.8446e-03, 1.8491e-03, 1.8530e-03, 1.8564e-03, 1.8593e-03, 1.8619e-03],
       device='cuda:0', grad_fn=<SelectBackward0>)

In [None]:
import torch
import torch.nn.functional as F

def transliterate(word: torch.Tensor,
                  encoder: nn.Module,
                  decoder: nn.Module,
                  src_stoi,
                  src_itos,
                  tgt_stoi,
                  tgt_itos,
                  device: torch.device,
                  max_len: int = 30) -> str:
    """Greedily decode one encoded source word into target-script characters.

    Decoding runs with autograd disabled and with both modules temporarily in
    eval mode, so dropout cannot perturb the result; the modules' previous
    train/eval modes are restored before returning.

    Args:
        word: Source word already converted to token indices. It is handed to
            ``encoder`` unchanged (no string-to-index conversion happens here).
        encoder: Module exposing ``initHidden(batch_size=..., device=...)`` and
            returning ``(encoder_states, encoder_hidden)`` when called with
            ``(word, hidden)``.
        decoder: Module returning ``(scores, hidden, attn_weights)`` when
            called with ``(dec_input, dec_hidden, encoder_states)``.
        src_stoi: Unused; kept so existing callers keep working.
        src_itos: Unused; kept so existing callers keep working.
        tgt_stoi: Maps target characters to indices; must contain the start
            marker '{' and the end marker '}'.
        tgt_itos: Maps target indices back to characters.
        device: Device used for the initial hidden state and decoder input.
        max_len: Maximum number of decoding steps.

    Returns:
        The decoded characters joined into one string. Decoding stops at the
        first '}' prediction (which is not included) or after ``max_len`` steps.
    """
    encoder_was_training = encoder.training
    decoder_was_training = decoder.training
    encoder.eval()
    decoder.eval()

    output_chars = []
    try:
        with torch.no_grad():
            # Run the encoder once over the whole source word.
            encoder_hidden = encoder.initHidden(batch_size=1, device=device)
            encoder_states, encoder_hidden = encoder(word, encoder_hidden)

            # The decoder starts from the start marker and the encoder's
            # final hidden state.
            sos_idx = tgt_stoi['{']
            eos_idx = tgt_stoi['}']
            dec_input = torch.tensor([[sos_idx]], dtype=torch.long, device=device)  # (1, 1)
            dec_hidden = encoder_hidden

            for _ in range(max_len):
                dec_out, dec_hidden, _attn_weights = decoder(dec_input, dec_hidden, encoder_states)

                # Greedy choice: the highest-scoring token along dim 2.
                top1 = dec_out.argmax(2)
                token_idx = top1.item()
                if token_idx == eos_idx:
                    break
                output_chars.append(tgt_itos[token_idx])

                # Feed the prediction back in as the next decoder input.
                dec_input = top1
    finally:
        # Leave the modules in the mode the caller had them in.
        encoder.train(encoder_was_training)
        decoder.train(decoder_was_training)

    return ''.join(output_chars)

# Example usage: move both networks to the active device, take the third
# encoded training word as a batch of one, and print its transliteration.
encoder.to(device)
decoder.to(device)
word = data['source_charToNum'][2].unsqueeze(dim=0)
print(transliterate(
    word,
    encoder,
    decoder,
    src_stoi=src_stoi,
    src_itos=src_itos,
    tgt_stoi=tgt_stoi,
    tgt_itos=tgt_itos,
    device=device,
))


In [None]:
#New
def get_connectivity_fn(encoder, decoder, word, tgt_stoi, device, max_len = 30, tgt_itos = None):
    """Greedily decode ``word`` and record input-embedding gradients per step.

    For every emitted target character, the gradient of the (summed) decoder
    RNN output with respect to the encoder's input embeddings is stored. The
    caller can turn these gradients into a per-source-character connectivity
    score.

    Args:
        encoder: Module exposing ``initHidden(batch_size, device)``,
            ``embedding`` and ``rnn``.
        decoder: Module exposing ``attention``, ``embedding``, ``rnn``,
            ``dropout``, ``fc`` and ``softmax``.
        word: Source token indices for a single word (batch size 1 is assumed,
            because the prediction is read with ``.item()``).
        tgt_stoi: Maps target characters to indices; must contain the start
            marker '{' and the end marker '}'.
        device: Device for the initial hidden state and decoder input.
        max_len: Maximum number of decoding steps.
        tgt_itos: Optional index-to-character map. When omitted it is derived
            by inverting ``tgt_stoi``, so the function no longer depends on a
            notebook-level global.

    Returns:
        ``(decoded_string, gradient_list)`` where ``gradient_list[i]`` is a
        tensor shaped like the embedded input and belongs to the i-th
        character of ``decoded_string``.

    Raises:
        RuntimeError: from ``torch.autograd.grad`` if the decoder output is not
            connected to the input embeddings (``allow_unused=False``).
    """
    if tgt_itos is None:
        tgt_itos = {index: char for char, index in tgt_stoi.items()}

    gradient_list = []
    batch_size = 1

    encoder_hidden = encoder.initHidden(batch_size, device)
    if isinstance(encoder_hidden, tuple):  # LSTM: (hidden, cell)
        encoder_hidden = tuple(state.to(device) for state in encoder_hidden)
    else:  # GRU/RNN
        encoder_hidden = encoder_hidden.to(device)

    # Embed the source indices; gradients are taken with respect to this tensor.
    embedded_in = encoder.embedding(word)
    embedded_in.requires_grad_(True)

    outputs, encoder_hidden = encoder.rnn(embedded_in, encoder_hidden)

    # Swap the first two axes for the attention module.
    # NOTE(review): assumes encoder.rnn is batch_first, so this yields
    # (seq_len, batch, features) — confirm against the Encoder definition.
    encoder_states = outputs.permute(1, 0, 2)

    sos_idx = tgt_stoi['{']
    eos_idx = tgt_stoi['}']
    dec_input = torch.tensor([[sos_idx]], dtype=torch.long, device=device)  # (1, 1)
    dec_hidden = encoder_hidden  # seed the decoder with the encoder's final state

    output_chars = []
    for _ in range(max_len):
        # The attention query is the last layer's hidden state.
        if isinstance(dec_hidden, tuple):  # LSTM
            query = dec_hidden[0][-1]
        else:
            query = dec_hidden[-1]
        context, _attn_weights = decoder.attention(query, encoder_states)

        # Embed the previous token and concatenate it with the context vector.
        emb = F.relu(decoder.embedding(dec_input))
        rnn_input = torch.cat((emb, context), dim=2)

        # Carry the updated hidden state into the next step; the original
        # discarded it, so every step restarted from the encoder state.
        output, dec_hidden = decoder.rnn(rnn_input, dec_hidden)
        bef_output = decoder.dropout(output)
        # NOTE(review): the scores are computed from `output`, not from the
        # dropout-applied `bef_output`; kept as in the original — confirm that
        # this matches Decoder.forward.
        dec_out = decoder.softmax(decoder.fc(output))

        # Greedy choice of the next token.
        top1 = dec_out.argmax(2)  # (1, 1)
        token_idx = top1.item()
        if token_idx == eos_idx:
            break
        output_chars.append(tgt_itos[token_idx])

        # The next decoder input is the current prediction.
        dec_input = top1

        # torch.autograd.grad returns the gradient directly (nothing is
        # accumulated in embedded_in.grad). retain_graph keeps the encoder
        # graph alive for the following steps. With allow_unused=False the
        # result is never None: a disconnected graph raises instead.
        grad = torch.autograd.grad(
            outputs=bef_output,
            inputs=embedded_in,
            grad_outputs=torch.ones_like(bef_output),
            retain_graph=True,
            allow_unused=False
        )[0]
        gradient_list.append(grad)

    return ''.join(output_chars), gradient_list

In [None]:
import numpy as np
import torch
import torch.nn.functional as F
from sklearn.preprocessing import MinMaxScaler
from IPython.display import HTML as html_print, display

# Optional: (re)define the compute device — CUDA when available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# HTML formatting helpers
def cstr(s, color='black'):
    """Wrap ``s`` in an HTML ``<span>`` whose text colour is ``color``.

    A single space is wrapped in exactly the same markup as any other text.
    """
    opening_tag = f"<span style='color:{color}'>"
    return opening_tag + f"{s}" + "</span>"

def print_color(t):
    """Display an iterable of ``(text, colour)`` pairs as one line of coloured HTML."""
    spans = []
    for text, colour in t:
        spans.append(cstr(text, color=colour))
    markup = ''.join(spans)
    display(html_print(markup))

def get_clr(value):
    """Map a score to a hex colour on a 20-step blue-to-red palette.

    Args:
        value: Score, nominally in ``[0, 1]``. 0 maps to the first (blue)
            entry and 1 to the last (red) entry.

    Returns:
        A hex colour string. Scores outside ``[0, 1]`` are clamped to the
        nearest end of the palette; previously a negative score produced a
        negative index and silently wrapped around to the red end.
    """
    colors = [
        '#85c2e1', '#89c4e2', '#95cae5', '#99cce6', '#a1d0e8',
        '#b2d9ec', '#baddee', '#c2e1f0', '#eff7fb', '#f9e8e8',
        '#f9e8e8', '#f9d4d4', '#f9bdbd', '#f8a8a8', '#f68f8f',
        '#f47676', '#f45f5f', '#f34343', '#f33b3b', '#f42e2e'
    ]
    last = len(colors) - 1
    idx = int(value * last)
    # Clamp on both sides so the index can never wrap or overflow.
    return colors[max(0, min(idx, last))]

def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` of ``x`` (scalar or array)."""
    decay = np.exp(-x)
    return 1 / (1 + decay)

def softmax(x):
    """Return the softmax of ``x`` over all of its elements as a numpy array.

    Args:
        x: A torch tensor, or a numpy array / sequence of numbers. The
            original only accepted tensors, but ``get_gradient_norms`` calls
            this with a numpy array, which raised AttributeError.

    Returns:
        A numpy array of the same shape whose entries sum to 1.
    """
    if hasattr(x, "detach"):
        # Torch tensor: drop the autograd link and move it to host memory.
        x = x.detach().cpu().numpy()
    else:
        x = np.asarray(x)
    # Subtracting the maximum keeps exp() from overflowing.
    e_x = np.exp(x - np.max(x))
    return e_x / e_x.sum()

# Main function to compute scaled gradient norms
def get_gradient_norms(grad_list, word, activation="sigmoid"):
    """Turn per-step embedding gradients into per-source-position scores.

    Args:
        grad_list: One gradient tensor per decoded character, shaped either
            ``(seq_len, emb_dim)`` or ``(1, seq_len, emb_dim)`` (the latter is
            what ``get_connectivity_fn`` produces for a batch of one).
        word: The source word. For a string or 1-D sequence its length is
            used; for an array/tensor with a ``shape`` the size of the last
            axis is used, so a ``(1, seq_len)`` batch is handled too.
        activation: ``"softmax"``, ``"scaler"`` (min-max scaling) or anything
            else for the default sigmoid.

    Returns:
        A list with one numpy array of scaled gradient norms per decoding step.
    """
    # Number of source positions to keep. The original used len(word), which
    # is 1 for a (1, seq_len) tensor.
    seq_len = word.shape[-1] if hasattr(word, "shape") else len(word)

    grad_norms = []
    for grad_tensor in grad_list:
        grad_tensor = grad_tensor.detach().cpu()
        if grad_tensor.dim() == 3:
            # Drop the leading batch axis of size 1.
            grad_tensor = grad_tensor.squeeze(0)
        # Norm over the embedding axis gives one magnitude per source position.
        # The original reduced over dim=1, which is the sequence axis when a
        # batch axis is present.
        position_norms = torch.norm(grad_tensor, dim=-1)[:seq_len]

        if activation == "softmax":
            # Pass the tensor itself: the softmax helper handles the numpy conversion.
            grad_scaled = softmax(position_norms)
        elif activation == "scaler":
            scaler = MinMaxScaler()
            grad_scaled = scaler.fit_transform(position_norms.numpy().reshape(-1, 1)).flatten()
        else:  # Default: sigmoid
            grad_scaled = sigmoid(position_norms.numpy())
        grad_norms.append(grad_scaled)
    return grad_norms

def visualize(grad_norms, word, translated_word):
    """Print the source word once per decoded character, coloured by score.

    ``grad_norms[i]`` holds the scores used to colour ``word`` for the i-th
    character of ``translated_word``.
    """
    print("Original Word:", word)
    print("Transliterated Word:", translated_word)
    for i, target_char in enumerate(translated_word):
        print("Connectivity Visualization for", target_char, ":")
        step_scores = grad_norms[i]
        text_colours = []
        for j in range(len(step_scores)):
            text_colours.append((word[j], get_clr(step_scores[j])))
        print_color(text_colours)

# Wrapper that integrates everything
def visualise_connectivity(encoder, decoder, word, tgt_stoi, get_connectivity_fn, device, activation="sigmoid"):
    """Decode ``word``, scale the recorded gradients and display the result.

    ``get_connectivity_fn`` is called as
    ``get_connectivity_fn(encoder, decoder, word, tgt_stoi, device)`` and must
    return ``(decoded_string, gradient_list)``. The same ``word`` object is
    then passed on to ``get_gradient_norms`` and ``visualize``.
    """
    decoded, gradients = get_connectivity_fn(encoder, decoder, word, tgt_stoi, device)
    visualize(get_gradient_norms(gradients, word, activation), word, decoded)

# Put both networks in training mode and pick the third encoded training word
# (as a batch of one) for the connectivity demo.
# NOTE(review): presumably training mode is needed so the RNN backward pass used
# for the gradients works on GPU — confirm.
encoder.train(True)
decoder.train(True)
word = data['source_charToNum'][2].unsqueeze(dim=0)
# visualise_connectivity(encoder, decoder, word,tgt_stoi, get_connectivity_fn, device, activation="sigmoid")

In [None]:
import editdistance
import csv

# Short aliases for the character<->index lookup tables stored in `data`
# (the `_2` keys are used for the target side below).
char_to_num_target = data['char_num_map_2']
num_to_char_target = data['num_char_map_2']
char_to_num_source = data['char_num_map']
num_to_char_source = data['num_char_map']

# Model hyperparameters passed to the Encoder / Decoder constructors below.
# NOTE(review): presumably these must match the configuration that produced
# best_model.pth, otherwise load_state_dict would reject the checkpoint — confirm.
embSize = 32
encoderLayers = 3
decoderLayers = 3
hiddenLayerNuerons = 512
cellType = "GRU"
bidirection = 'no'
dropout = 0.3
# The remaining settings are not referenced anywhere else in this cell.
epochs = 15
batchsize = 64
learningRate = 0.001
optimizer = 'Nadam'
tf_ratio = 1.0

# Select the compute device: CUDA when available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the encoder and decoder from the hyperparameters above, then move each
# one to the selected device.
encoder = Encoder(
    inputDim=data["source_len"],
    embSize=embSize,
    encoderLayers=encoderLayers,
    hiddenLayerNuerons=hiddenLayerNuerons,
    cellType=cellType,
    bidirection=bidirection,
)
encoder = encoder.to(device)

decoder = Decoder(
    outputDim=data["target_len"],
    embSize=embSize,
    hiddenLayerNuerons=hiddenLayerNuerons,
    decoderLayers=decoderLayers,
    cellType=cellType,
    dropout_p=dropout,
)
decoder = decoder.to(device)

# Load the best model. map_location remaps the stored tensors onto the active
# device, so a checkpoint saved on a GPU also loads on a CPU-only machine.
checkpoint = torch.load("best_model.pth", map_location=device)
encoder.load_state_dict(checkpoint['encoder_state_dict'])
decoder.load_state_dict(checkpoint['decoder_state_dict'])
# Evaluation mode for inference.
encoder.eval()
decoder.eval()

# Iterate over the test set one word at a time, in file order.
test_dataset = MyDataset(data_test["source_charToNum"], data_test["val_charToNum"])
test_loader  = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Maximum number of decoding steps per word.
max_target_len = 23  

# Counters for exact-match accuracy. `correct` was never initialised, so the
# evaluation loop raised NameError on a fresh kernel.
correct = 0
total = 0
# Number of leading samples that are printed in full.
print_limit = 10

def clean(seq):
    """Join the items of ``seq`` into one string, dropping every '{' and '}' item."""
    kept = []
    for c in seq:
        if c == '{' or c == '}':
            continue
        kept.append(c)
    return ''.join(kept)

# Greedy decoding over the test set with exact-match scoring.
with torch.no_grad():
    for source_tensor, target_tensor in test_loader:
        source_tensor = source_tensor.to(device)
        target_tensor = target_tensor.to(device)

        # Encoder
        enc_hidden = encoder.initHidden(batch_size=1)
        encoder_outputs, enc_hidden = encoder(source_tensor, enc_hidden)

        # Decoder init: start from the '{' marker and the adapted encoder state.
        dec_hidden = init_decoder_state(enc_hidden, encoderLayers, decoderLayers, cellType)
        decoder_input = torch.tensor([[char_to_num_target["{"]]], device=device)

        decoded_output = []

        for _ in range(max_target_len):
            decoder_output, dec_hidden = decoder(decoder_input, dec_hidden)
            # Greedy choice: index of the single highest-scoring entry.
            topv, topi = decoder_output.topk(1)
            next_index = topi.view(-1).item()
            next_char = num_to_char_target[next_index]

            # '}' marks the end of the word.
            if next_char == "}":
                break

            decoded_output.append(next_char)
            decoder_input = torch.tensor([[next_index]], device=device)

        # Get input/output strings, remove {} padding
        input_seq = clean([num_to_char_source[i.item()] for i in source_tensor[0]])
        target_seq = clean([num_to_char_target[i.item()] for i in target_tensor[0]])
        predicted_seq = clean(decoded_output)

        is_match = predicted_seq == target_seq
        if is_match:
            correct += 1
        total += 1

        if total <= print_limit:
            print(f"Input:     {input_seq}")
            print(f"Target:    {target_seq}")
            print(f"Predicted: {predicted_seq}\n")

        # One-line summary for the first 20 samples. The label now always
        # reflects the comparison; the original printed "MATCH!" for the first
        # three samples even when the prediction was wrong.
        if total <= 20:
            label = "MATCH!" if is_match else "DIFF! "
            print(f"{label} Input: {input_seq} | Target: {target_seq} | Predicted: {predicted_seq}")


# Exact-match accuracy in percent. An empty test loader would otherwise raise
# ZeroDivisionError, so that case is reported explicitly.
if total == 0:
    accuracy = 0.0
    print("Test Accuracy: n/a (no test samples were evaluated)")
else:
    accuracy = correct / total * 100
    print(f"Test Accuracy: {accuracy:.2f}%")
