In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader 

from tqdm import tqdm
import heapq
import csv

import numpy as np
import random
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.font_manager import FontProperties

import wandb
# Pick the compute device: the GPU when CUDA is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Fixed widths of the padded index sequences. Despite the "embedd_size"
# names, these values are used as sequence lengths when the index arrays
# are allocated by DataProcessor.
hin_embedd_size, eng_embedd_size = 29, 32
device.type

'cuda'

In [7]:
#ANOTHER STYLE BEGIN

import numpy as np

# Read the training pairs once so that every character they contain can be
# given an integer index.
arr = np.loadtxt(
    "/kaggle/input/aksharantar-sampled2/aksharantar_sampled/hin/hin_train.csv",
    delimiter=",",
    dtype=str,
)
num_sample = arr.shape[0]
x_train = arr[:, 0]
y_train = arr[:, 1]

# char -> index and index -> char tables for each side of the pair.
english_dict, hindi_dict = {}, {}
english_index_dict, hindi_index_dict = {}, {}


def _index_new_characters(words, char_to_index, index_to_char, next_index):
    """Give every not-yet-seen character in `words` the next free index.

    Both lookup tables are updated in place, in first-appearance order.
    Returns the first index that is still unused afterwards.
    """
    for word in words:
        for symbol in word:
            if symbol in char_to_index:
                continue
            char_to_index[symbol] = next_index
            index_to_char[next_index] = symbol
            next_index += 1
    return next_index


# Indices 0-2 are reserved for the special symbols below, so real characters
# start at 3.
# NOTE(review): the first table is built from BOTH columns, so it also
# receives every character of the second column - confirm this is intended.
english_index = _index_new_characters(
    np.concatenate((x_train, y_train)), english_dict, english_index_dict, 3)
hin_index = _index_new_characters(y_train, hindi_dict, hindi_index_dict, 3)

# Reserved symbols: 0 = padding, 1 = start of word, 2 = end of word.
padding_symbol, start_symbol, end_symbol = '<P>', '<S>', '<E>'
for reserved_index, reserved_symbol in enumerate((padding_symbol, start_symbol, end_symbol)):
    english_index_dict[reserved_index] = reserved_symbol
    hindi_index_dict[reserved_index] = reserved_symbol  #ANOTHER STYLE END


In [8]:
#ANOTHER STYLE BEGIN

import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

class DataProcessor:
    """Converts a two-column CSV of word pairs into fixed-width index arrays.

    Each word becomes one row: position 0 holds the start marker (1), the
    characters follow as integer indices starting at 3, the end marker (2)
    comes right after the last character, and the rest of the row stays 0.

    The character -> index tables (`english_dict` for column 0, `hindi_dict`
    for column 1) are filled lazily and are shared across calls, so a
    character keeps the index it received the first time it was seen.

    NOTE(review): the tables also grow when later files are processed, so a
    character that first appears in a later file receives a brand-new index -
    confirm downstream vocabulary sizes allow for that.
    """

    def __init__(self, eng_embedd_size, hin_embedd_size):
        """Store the row widths used for column 0 and column 1 respectively."""
        self.eng_embedd_size = eng_embedd_size
        self.hin_embedd_size = hin_embedd_size
        self.english_dict = {}
        self.hindi_dict = {}

    def process_data(self, path):
        """Read the CSV at `path` and return the encoded `(X, Y)` arrays.

        Returns:
            X: float array of shape (num_rows, eng_embedd_size) for column 0.
            Y: float array of shape (num_rows, hin_embedd_size) for column 1.

        Raises:
            IndexError: if a word plus its start/end markers does not fit in
                the configured row width.
        """
        # ndmin=2 keeps a file with a single row two-dimensional; without it
        # loadtxt returns a 1-D array and the column slicing below fails.
        arr = np.loadtxt(path, delimiter=",", dtype=str, ndmin=2)
        num_samples = arr.shape[0]
        x, y = arr[:, 0], arr[:, 1]

        X = np.zeros((num_samples, self.eng_embedd_size))  # input
        Y = np.zeros((num_samples, self.hin_embedd_size))  # target

        for i in range(num_samples):
            self._encode_into(X[i], x[i], self.english_dict)
            self._encode_into(Y[i], y[i], self.hindi_dict)

        return X, Y

    @staticmethod
    def _encode_into(row, word, vocab):
        """Write start marker, character indices and end marker into `row`."""
        row[0] = 1
        for j, char in enumerate(word):
            # Unseen characters get the next free index (0-2 are reserved).
            row[j + 1] = vocab.setdefault(char, len(vocab) + 3)
        row[len(word) + 1] = 2

class CustomDataset(Dataset):
    """Map-style dataset pairing each encoded input row with its target row.

    Both arrays are copied into int64 tensors on construction.
    """

    def __init__(self, X, Y):
        # Number of samples is taken from the first axis of the input array.
        self.length = X.shape[0]
        self.X, self.Y = (torch.tensor(rows, dtype=torch.int64) for rows in (X, Y))

    def __len__(self):
        return self.length

    def __getitem__(self, index):
        source_row = self.X[index]
        target_row = self.Y[index]
        return source_row, target_row

# One processor for all three splits so they share the same character tables.
data_processor = DataProcessor(eng_embedd_size, hin_embedd_size)

# Order matters: the processor assigns indices on first sight, so the
# training file is encoded first, then validation, then test.
X_train, y_train = data_processor.process_data(
    "/kaggle/input/aksharantar-sampled2/aksharantar_sampled/hin/hin_train.csv")
X_val, y_val = data_processor.process_data(
    "/kaggle/input/aksharantar-sampled2/aksharantar_sampled/hin/hin_valid.csv")
X_test, y_test = data_processor.process_data(
    "/kaggle/input/aksharantar-sampled2/aksharantar_sampled/hin/hin_test.csv")

# Wrap the encoded arrays as tensor datasets.
train_dataset, val_dataset, test_dataset = (
    CustomDataset(X_train, y_train),
    CustomDataset(X_val, y_val),
    CustomDataset(X_test, y_test),
)

# Batches of 256; only the test loader keeps the file order.
# NOTE(review): the validation loader is shuffled too - confirm nothing
# downstream relies on validation rows arriving in file order.
train_loader = DataLoader(dataset=train_dataset, batch_size=256, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=256, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=256, shuffle=False) #ANOTHER STYLE END

In [9]:
#  ANOTHER STYLE BEGIN
import torch
import torch.nn as nn

class Encoder(nn.Module):
    """Embeds a sequence of token indices and runs it through an RNN, GRU or LSTM.

    Args:
        input_dimension: number of rows in the embedding table.
        embed_dimension: size of each embedding vector.
        hidden_dimension: hidden size of the recurrent layer.
        cell_type: one of 'rnn', 'gru' or 'lstm'.
        layers: number of stacked recurrent layers.
        bidirectional: whether the recurrent layer runs in both directions.
        dropout: dropout probability, applied to the embeddings and passed on
            to the recurrent layer.
        device: device on which `init_hidden` allocates its tensor.

    Raises:
        ValueError: if `cell_type` is not one of the supported values.
    """

    def __init__(self,
                 input_dimension=72,
                 embed_dimension=64,
                 hidden_dimension=256,
                 cell_type='gru',
                 layers=2,
                 bidirectional=True,
                 dropout=0,
                 device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
                ):
        super(Encoder, self).__init__()

        # Fail fast: an unknown cell type used to leave `encoder_type`
        # undefined and only surfaced as an AttributeError inside forward().
        if cell_type not in ('rnn', 'gru', 'lstm'):
            raise ValueError(
                f"cell_type must be 'rnn', 'gru' or 'lstm', got {cell_type!r}")

        # 2 when bidirectional, otherwise 1; used to size the hidden state.
        self.direction_value = 2 if bidirectional else 1

        # Snapshot of the constructor arguments, returned by getParams().
        self.detail_parameters = {
            'input_dimension': input_dimension,
            'embed_dimension': embed_dimension,
            'hidden_dimension': hidden_dimension,
            'cell_type': cell_type,
            'dropout': dropout,
            'layers': layers,
            'direction_value': self.direction_value,
            'device': device.type,
        }

        self.input_dimension = input_dimension
        self.embed_dimension = embed_dimension
        self.hidden_dimension = hidden_dimension
        self.cell_type = cell_type
        self.layers = layers
        self.dropout = dropout
        self.device = device

        self.embedding = nn.Embedding(self.input_dimension, self.embed_dimension)
        self.dropout_layer = nn.Dropout(dropout)

        # Select the recurrent wrapper that matches the requested cell type.
        if self.cell_type == 'rnn':
            self.encoder_type = RNNLayer(self.embed_dimension, self.hidden_dimension, self.layers, bidirectional, dropout)
        elif self.cell_type == 'gru':
            self.encoder_type = GRULayer(self.embed_dimension, self.hidden_dimension, self.layers, bidirectional, dropout)
        else:
            self.encoder_type = LSTMLayer(self.embed_dimension, self.hidden_dimension, self.layers, bidirectional, dropout)

    def forward(self, input, hidden, cell=None):
        """Encode `input` starting from the given recurrent state.

        Args:
            input: integer tensor of token indices.
            hidden: initial hidden state (see `init_hidden` for its shape).
            cell: initial LSTM cell state; only used when cell_type is
                'lstm', where a missing value defaults to zeros shaped like
                `hidden`.

        Returns:
            `(output, hidden, cell)`; `cell` is None unless cell_type is 'lstm'.
        """
        embedded = self.dropout_layer(self.embedding(input))

        if self.cell_type == 'lstm':
            if cell is None:
                # An LSTM needs both states; start the cell state at zero.
                cell = torch.zeros_like(hidden)
            output, (hidden, cell) = self.encoder_type(embedded, (hidden, cell))
            return output, hidden, cell

        output, hidden = self.encoder_type(embedded, hidden)
        return output, hidden, None

    def getParams(self):
        """Return the dictionary of constructor settings."""
        return self.detail_parameters

    def init_hidden(self, batch):
        """Return a zero state of shape (directions * layers, batch, hidden_dimension)."""
        return torch.zeros(self.direction_value * self.layers, batch, self.hidden_dimension, device=self.device)

# Define RNN layer as a subclass of nn.Module
class RNNLayer(nn.Module):
    """Thin wrapper around `nn.RNN` taking (input_size, hidden_size, num_layers, bidirectional, dropout)."""

    def __init__(self, input_size, hidden_size, num_layers, bidirectional, dropout):
        super(RNNLayer, self).__init__()
        # nn.RNN only applies dropout between stacked layers. With a single
        # layer a non-zero value has no effect and just emits a UserWarning,
        # so pass 0 in that case.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.rnn = nn.RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers,
                          bidirectional=bidirectional, dropout=inter_layer_dropout)

    def forward(self, input, hidden):
        """Return `(output, hidden)` exactly as produced by the wrapped `nn.RNN`."""
        return self.rnn(input, hidden)

# Define GRU layer as a subclass of nn.Module
class GRULayer(nn.Module):
    """Thin wrapper around `nn.GRU` taking (input_size, hidden_size, num_layers, bidirectional, dropout)."""

    def __init__(self, input_size, hidden_size, num_layers, bidirectional, dropout):
        super(GRULayer, self).__init__()
        # nn.GRU only applies dropout between stacked layers. With a single
        # layer a non-zero value has no effect and just emits a UserWarning,
        # so pass 0 in that case.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.gru = nn.GRU(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers,
                          bidirectional=bidirectional, dropout=inter_layer_dropout)

    def forward(self, input, hidden):
        """Return `(output, hidden)` exactly as produced by the wrapped `nn.GRU`."""
        return self.gru(input, hidden)

# Define LSTM layer as a subclass of nn.Module
class LSTMLayer(nn.Module):
    """Thin wrapper around `nn.LSTM` taking (input_size, hidden_size, num_layers, bidirectional, dropout)."""

    def __init__(self, input_size, hidden_size, num_layers, bidirectional, dropout):
        super(LSTMLayer, self).__init__()
        # nn.LSTM only applies dropout between stacked layers. With a single
        # layer a non-zero value has no effect and just emits a UserWarning,
        # so pass 0 in that case.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers,
                            bidirectional=bidirectional, dropout=inter_layer_dropout)

    def forward(self, input, hidden):
        """Return `(output, (hidden, cell))` as produced by the wrapped `nn.LSTM`.

        `hidden` is the `(hidden_state, cell_state)` tuple expected by `nn.LSTM`.
        """
        return self.lstm(input, hidden)  #ANOTHER STYLE END


In [10]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Decoder(nn.Module):
    """Recurrent decoder with an optional additive attention step.

    Args:
        input_dimension: number of rows in the embedding table; also the
            width of the scores produced by the output layer.
        embed_dimension: size of each embedding vector.
        hidden_dimension: hidden size of the recurrent layer.
        cell_type: one of 'rnn', 'gru' or 'lstm'.
        layers: number of stacked recurrent layers.
        use_attention: when True, a context vector computed from
            `encoder_outputs` is concatenated to the embedded input.
        attention_dimension: width added to the recurrent input for the
            context vector; required when `use_attention` is True.
        dropout: dropout probability, applied to the embeddings and between
            stacked recurrent layers.
        bidirectional: whether the recurrent layer runs in both directions.
        device: stored on the instance and reported by `getParams`.

    Raises:
        ValueError: if `cell_type` is unsupported, or if `use_attention` is
            True while `attention_dimension` is None.
    """

    def __init__(self,
                 input_dimension=26,
                 embed_dimension=64,
                 hidden_dimension=256,
                 cell_type='lstm',
                 layers=2,
                 use_attention=False,
                 attention_dimension=None,
                 dropout=0,
                 bidirectional=True,
                 device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
                ):
        super(Decoder, self).__init__()

        # Fail fast: an unknown cell type used to leave `decoder_type`
        # undefined and only surfaced as an AttributeError inside forward().
        if cell_type not in ('rnn', 'gru', 'lstm'):
            raise ValueError(
                f"cell_type must be 'rnn', 'gru' or 'lstm', got {cell_type!r}")
        # Previously this combination failed with an opaque TypeError when
        # None was added to the embedding size.
        if use_attention and attention_dimension is None:
            raise ValueError("attention_dimension is required when use_attention=True")

        self.input_dimension = input_dimension
        self.embed_dimension = embed_dimension
        self.hidden_dimension = hidden_dimension
        self.cell_type = cell_type
        self.layers = layers
        self.use_attention = use_attention
        self.attention_dimension = attention_dimension
        self.dropout = dropout
        self.device = device

        # Embedding layer
        self.embedding = nn.Embedding(input_dimension, embed_dimension)
        self.dropout_layer = nn.Dropout(dropout)

        # Recurrent input = embedding (+ context vector when attention is on).
        # NOTE(review): the context vector has the same last dimension as
        # `encoder_outputs`, so attention_dimension presumably has to match
        # that width - confirm against the caller.
        self.input_size = embed_dimension
        if use_attention:
            self.input_size += attention_dimension

        # PyTorch only applies recurrent dropout between stacked layers; with
        # a single layer a non-zero value does nothing except emit a
        # UserWarning, so pass 0 in that case.
        recurrent_dropout = dropout if layers > 1 else 0
        recurrent_classes = {'rnn': nn.RNN, 'gru': nn.GRU, 'lstm': nn.LSTM}
        self.decoder_type = recurrent_classes[cell_type](
            input_size=self.input_size, hidden_size=hidden_dimension,
            num_layers=layers, bidirectional=bidirectional, dropout=recurrent_dropout)

        # Attention mechanism components
        if use_attention:
            self.U = nn.Linear(hidden_dimension, hidden_dimension)
            self.W = nn.Linear(hidden_dimension, hidden_dimension)
            self.V = nn.Linear(hidden_dimension, 1)

        # Output layer: maps the recurrent output to `input_dimension` scores.
        self.W1 = nn.Linear(hidden_dimension * (2 if bidirectional else 1), input_dimension)

    def forward(self, input, hidden, cell=None, encoder_outputs=None):
        """Run one decoder call.

        Args:
            input: integer tensor of token indices.
            hidden: recurrent hidden state.
            cell: LSTM cell state; only used when cell_type is 'lstm', where
                a missing value defaults to zeros shaped like `hidden`.
            encoder_outputs: encoder outputs, only read when attention is on.

        Returns:
            `(output, hidden, cell, attention_weights)`. `attention_weights`
            is None when attention is disabled; for non-LSTM cells `cell` is
            returned exactly as it was passed in.
        """
        embedded = self.dropout_layer(self.embedding(input))

        # Apply attention mechanism if enabled
        attention_weights = None
        if self.use_attention:
            context, attention_weights = self.apply_attention(hidden, encoder_outputs)
            embedded = torch.cat((embedded, context), 2)

        # Pass through the recurrent layer
        if self.cell_type == 'lstm':
            if cell is None:
                # An LSTM needs both states; start the cell state at zero.
                cell = torch.zeros_like(hidden)
            output, (hidden, cell) = self.decoder_type(embedded, (hidden, cell))
        else:
            output, hidden = self.decoder_type(embedded, hidden)

        # Project to one score per embedding-table entry
        output = self.W1(output)

        return output, hidden, cell, attention_weights

    def apply_attention(self, hidden, encoder_outputs):
        """Compute an additive-attention context vector.

        Scores are `V(tanh(W(encoder_outputs) + U(hidden)))`, normalised with
        a softmax over dim 1; the context is the weighted sum of
        `encoder_outputs` over dim 1, returned with a new leading axis.

        NOTE(review): softmax and sum both run over dim 1, so this presumably
        expects `encoder_outputs` as (batch, seq, hidden) and a `hidden` that
        broadcasts against it - confirm with the caller.

        Returns:
            `(context, attention_weights)`.
        """
        # Project encoder outputs and hidden state, then combine them
        encoder_transform = self.W(encoder_outputs)
        hidden_transform = self.U(hidden)
        concat_transform = torch.tanh(encoder_transform + hidden_transform)

        # One score per position, normalised into attention weights
        score = self.V(concat_transform)
        attention_weights = F.softmax(score, dim=1)

        # Weighted sum of encoder outputs, with a leading axis added so it
        # can be concatenated to the embedded input
        context_vector = torch.sum(attention_weights * encoder_outputs, dim=1)
        normalized_context_vector = context_vector.unsqueeze(0)

        return normalized_context_vector, attention_weights

    def getParams(self):
        """Return the constructor settings stored on this instance."""
        return {
            'input_dimension': self.input_dimension,
            'embed_dimension': self.embed_dimension,
            'hidden_dimension': self.hidden_dimension,
            'attention_dimension': self.attention_dimension,
            'cell_type': self.cell_type,
            'layers': self.layers,
            'device': self.device.type,
            'dropout': self.dropout,
            'use_attention': self.use_attention,
        }

In [11]:
import torch
import heapq

class BeamNode:
    """One step on a candidate decoding path.

    Attributes:
        index: token index stored for this step.
        path_probability: probability accumulated along the path so far.
        hidden_state / cell_state: decoder state stored with this step.
        parent: previous node on the path, or None for the root.
        length: number of steps taken from the root; starts at 0 and is set
            by the code that creates child nodes.

    Nodes are orderable so they can be stored in a `heapq` priority queue:
    the node with the HIGHER path probability compares as smaller, so
    `heapq.heappop` returns the most probable candidate first.
    """

    def __init__(self, index, path_probability, hidden_state, cell_state, parent=None):
        self.index = index
        self.path_probability = path_probability
        self.hidden_state = hidden_state
        self.cell_state = cell_state
        self.parent = parent
        self.length = 0

    @property
    def hidden(self):
        """Read-only alias of `hidden_state`."""
        return self.hidden_state

    @property
    def cell(self):
        """Read-only alias of `cell_state`."""
        return self.cell_state

    def __lt__(self, other):
        # heapq compares items with `<`; without this, pushing a second node
        # raises TypeError.
        if not isinstance(other, BeamNode):
            return NotImplemented
        return self.path_probability > other.path_probability

def expand_node(model, node):
    """Run the decoder on `node` and return its `model.beam_width` best continuations.

    Args:
        model: object exposing `decoder`, `softmax` and `beam_width`.
        node: beam node providing `index`, `hidden_state` and `cell_state`.

    Returns:
        `(topk_output, topk_index, dec_hidden, cell)`: the top-k probabilities
        and their indices along dim 2, plus the decoder state after this step.
    """
    # BeamNode stores the decoder state as `hidden_state` / `cell_state`.
    output, dec_hidden, cell, _ = model.decoder.forward(
        node.index, node.hidden_state, node.cell_state, None)
    output = model.softmax(output, dim=2)
    topk_output, topk_index = torch.topk(output, model.beam_width, dim=2)
    return topk_output, topk_index, dec_hidden, cell

def create_child_nodes(model, topk_output, topk_index, dec_hidden, cell, curr_node,
                       min_probability=0.001):
    """Build one child BeamNode per surviving top-k continuation of `curr_node`.

    Args:
        model: object exposing `beam_width`.
        topk_output: top-k step probabilities, indexed along dim 2.
        topk_index: token indices matching `topk_output`.
        dec_hidden: decoder hidden state to store on every child.
        cell: decoder cell state to store on every child (may be None).
        curr_node: node being expanded.
        min_probability: children whose accumulated path probability falls
            below this value are pruned.

    Returns:
        List of child nodes, each one step longer than `curr_node`.
    """
    child_nodes = []
    for j in range(model.beam_width):
        step_probability = topk_output[:, :, j].item()
        path_probability = curr_node.path_probability * step_probability
        if path_probability < min_probability:
            continue
        # Arguments follow BeamNode's constructor order:
        # (index, path_probability, hidden_state, cell_state, parent).
        child_node = BeamNode(topk_index[:, :, j], path_probability, dec_hidden, cell, curr_node)
        child_node.length = curr_node.length + 1
        child_nodes.append(child_node)
    return child_nodes

def traverse_path(model, path, predicted):
    """Walk from `path` back through its `parent` links, re-running the decoder at each node.

    For every node on the chain the decoder output is written into
    `predicted`, which is modified in place; nothing is returned.

    NOTE(review): `i` is neither a parameter nor a local of this function,
    so it is looked up as a global when the function runs - confirm which
    batch column is meant to be written.
    """
    while path is not None:
        # NOTE(review): relies on the node exposing `hidden` and `cell`
        # attributes - verify against the BeamNode definition.
        output, _, _, _ = model.decoder.forward(path.index, path.hidden, path.cell, None)
        # Row is counted back from the end: a node with length 0 maps to row
        # `output_seq_length` - TODO confirm that row exists in `predicted`.
        predicted[model.output_seq_length - path.length, i:i+1] = output
        path = path.parent

def beam_search(model, outputs, dec_hiddens, cells, predicted):
    batch_size = outputs.shape[1]
    paths = []

    for i in range(batch_size):
        with torch.no_grad():
            model.eval()
            output = outputs[:, i:i+1].contiguous()
            index = output.contiguous()
            dec_hidden = dec_hiddens[:, i:i+1, :].contiguous()
            cell = cells[:, i:i+1, :].contiguous() if cells is not None else None
            
            open_list = []
            heapq.heapify(open_list)
            
            root_node = BeamNode(1, 1, index, dec_hidden, cell, None)
            heapq.heappush(open_list, root_node)

            while len(open_list) > 0:
                curr_node = heapq.heappop(open_list)
                
                if curr_node.length == model.output_seq_length - 1:
                    paths.append(curr_node)
                    continue

                topk_output, topk_index, dec_hidden, cell = expand_node(model, curr_node)
                child_nodes = create_child_nodes(model, topk_output, topk_index, dec_hidden, cell, curr_node)
                for node in child_nodes:
                    heapq.heappush(open_list, node)

            if len(paths) > 0:
                best_path = min(paths, key=lambda x: x.path_probability)
                traverse_path(model, best_path, predicted)
            else:
                for t in range(1, model.output_seq_length):
                    output, _, _, _ = model.decoder.forward(index, dec_hidden, cell, None)
                    predicted[t, i:i+1] = output
                    output = model.softmax(output, dim=2)
                    output = torch.argmax(output, dim=2)