In [14]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import pickle
import torch 
import torch.nn as nn 
from torch import optim
import torch.nn.functional as F

def load_data(path):
    """Read a header-less, comma-separated file of (English, Hindi) word pairs.

    Parameters
    ----------
    path
        Passed straight to ``pandas.read_csv``.

    Returns
    -------
    pandas.DataFrame
        Columns ``en`` and ``hi`` (in that order), restricted to the rows in
        which neither field is missing.  Row labels of the surviving rows are
        left as read.
    """
    pairs = pd.read_csv(
        path,
        sep=',',
        header=None,
        names=["en", "hi"],
        skip_blank_lines=True,
        index_col=None,
    )
    # A row is usable only when both the source and the target word exist.
    has_both = pairs['hi'].notna() & pairs['en'].notna()
    return pairs.loc[has_both, ['en', 'hi']]

# Load the train / validation / test splits of the Hindi data.
# NOTE(review): these are absolute, machine-specific paths; consider a
# configurable data directory so the notebook runs elsewhere.
train, dev, test = (
    load_data(csv_path)
    for csv_path in (
        "/home/tenet/Documents/Deep_L_Ass/Assignment_03/aksharantar_sampled/hin/hin_train.csv",
        "/home/tenet/Documents/Deep_L_Ass/Assignment_03/aksharantar_sampled/hin/hin_valid.csv",
        "/home/tenet/Documents/Deep_L_Ass/Assignment_03/aksharantar_sampled/hin/hin_test.csv",
    )
)

# Raw word arrays: English (source) and Hindi (target) for each split.
x_train, y_train = train['en'].values, train['hi'].values
x_val, y_val = dev['en'].values, dev['hi'].values


# Character vocabularies, built from the training split.
# The earlier version iterated over `x` / `y`, names that are not defined in
# this cell (NameError on a fresh kernel); the training arrays are used instead.
# NOTE(review): characters that occur only in the validation/test splits are
# not in these vocabularies, and words longer than the training maxima do not
# fit the padded arrays -- confirm that the splits are compatible.
english_tokens = sorted({ch for word in x_train for ch in word})
hindi_tokens = sorted({ch for word in y_train for ch in word})

# Real characters get ids 1..N; id 0 is reserved for the padding symbol " ".
eng_token_map = {ch: idx for idx, ch in enumerate(english_tokens, start=1)}
hin_token_map = {ch: idx for idx, ch in enumerate(hindi_tokens, start=1)}

hin_token_map[" "] = 0
eng_token_map[" "] = 0

# Longest word per language; used as the padded sequence length.
max_eng_len = max(len(word) for word in x_train)
max_hin_len = max(len(word) for word in y_train)

def process(data, eng_map=None, hin_map=None, eng_len=None, hin_len=None, n_hin_classes=None):
    """Encode word pairs as padded id arrays plus a one-hot, shifted target.

    Parameters
    ----------
    data : pandas.DataFrame
        Must provide the columns ``en`` and ``hi`` holding strings.
    eng_map, hin_map : dict, optional
        Character -> id maps; each must contain the padding key ``" "``.
        Default: the notebook-level ``eng_token_map`` / ``hin_token_map``.
    eng_len, hin_len : int, optional
        Padded sequence lengths.  Default: ``max_eng_len`` / ``max_hin_len``.
    n_hin_classes : int, optional
        Size of the one-hot axis.  Default: ``len(hindi_tokens) + 1``.

    Returns
    -------
    a : numpy.ndarray, float32, shape (n_words, eng_len)
        English character ids, padded with the id of ``" "``.
    b : numpy.ndarray, float32, shape (n_words, hin_len)
        Hindi character ids, padded with the id of ``" "``.
    c : numpy.ndarray, int, shape (n_words, hin_len, n_hin_classes)
        One-hot targets: position ``t`` holds Hindi character ``t + 1``;
        every position from ``len(word) - 1`` onwards holds the padding id.

    Raises
    ------
    KeyError
        A character is missing from the corresponding map.
    IndexError
        A word is longer than ``eng_len`` / ``hin_len``.
    """
    # Fall back to the notebook-level vocabulary when nothing is passed in.
    if eng_map is None:
        eng_map = eng_token_map
    if hin_map is None:
        hin_map = hin_token_map
    if eng_len is None:
        eng_len = max_eng_len
    if hin_len is None:
        hin_len = max_hin_len
    if n_hin_classes is None:
        n_hin_classes = len(hindi_tokens) + 1

    x, y = data['en'].values, data['hi'].values
    eng_pad, hin_pad = eng_map[" "], hin_map[" "]

    a = np.zeros((len(x), eng_len), dtype="float32")
    b = np.zeros((len(y), hin_len), dtype="float32")
    c = np.zeros((len(y), hin_len, n_hin_classes), dtype="int")

    for i, (xx, yy) in enumerate(zip(x, y)):
        for j, ch in enumerate(xx):
            a[i, j] = eng_map[ch]
        # Pad by word length, not by the loop variable: for an empty word the
        # loop never runs and `j` would still hold a value from an earlier loop.
        a[i, len(xx):] = eng_pad

        for j, ch in enumerate(yy):
            token = hin_map[ch]
            b[i, j] = token
            if j > 0:
                # Target is the decoder input shifted left by one step.
                c[i, j - 1, token] = 1
        b[i, len(yy):] = hin_pad
        # After the shift the last real slot is len(yy) - 1; clamp at 0 so an
        # empty word yields an all-padding target row.
        c[i, max(len(yy) - 1, 0):, hin_pad] = 1

    return a, b, c

# Seed NumPy's global random generator.
np.random.seed(42)

# Encode the training and validation splits as
# (encoder ids, decoder ids, one-hot targets).  The test split is not encoded here.
trainx, trainxx, trainy = process(train)
valx, valxx, valy = process(dev)

# id -> character lookups, the inverse of the token maps.
reverse_eng_map = {idx: ch for ch, idx in eng_token_map.items()}
reverse_hin_map = {idx: ch for ch, idx in hin_token_map.items()}



In [17]:
import torch
import torch.nn as nn

def build_model(cell="LSTM", nunits=32, enc_layers=1, dec_layers=1, embed_dim=32,
                dense_size=32, dropout=None, src_vocab_size=None, tgt_vocab_size=None):
    """Build a character-level encoder/decoder (seq2seq) network.

    Parameters
    ----------
    cell : {"LSTM", "Simple", "GRU"}
        Recurrent layer type ("Simple" maps to ``nn.RNN``).
    nunits : int
        Hidden size of every recurrent layer.
    enc_layers, dec_layers : int
        Number of stacked recurrent layers in the encoder / decoder (>= 1).
    embed_dim : int
        Embedding size for both encoder and decoder.
    dense_size : int
        Width of the intermediate linear layer.
    dropout : float or None
        Dropout probability applied to the input of every non-final encoder
        layer; ``None`` disables dropout.
    src_vocab_size, tgt_vocab_size : int, optional
        Embedding table sizes.  Default: ``len(english_tokens) + 1`` and
        ``len(hindi_tokens) + 1`` from the notebook namespace.

    Returns
    -------
    nn.Module
        ``model(encoder_inputs, decoder_inputs)`` returns unnormalised scores
        of shape (batch, decoder_len, tgt_vocab_size).  Inputs are id tensors
        laid out batch-first; they are cast to ``long`` before the embedding
        lookup.

    Raises
    ------
    ValueError
        Unknown ``cell`` or a layer count below 1.
    """
    rnn_types = {"LSTM": nn.LSTM, "Simple": nn.RNN, "GRU": nn.GRU}
    if cell not in rnn_types:
        raise ValueError(f"cell must be one of {sorted(rnn_types)}, got {cell!r}")
    if enc_layers < 1 or dec_layers < 1:
        raise ValueError("enc_layers and dec_layers must both be >= 1")
    rnn_cls = rnn_types[cell]

    if src_vocab_size is None:
        src_vocab_size = len(english_tokens) + 1
    if tgt_vocab_size is None:
        tgt_vocab_size = len(hindi_tokens) + 1

    def make_stack(n_layers):
        # One *distinct* module per layer (list multiplication would repeat a
        # single shared module); only the first layer consumes embeddings,
        # deeper layers consume the previous layer's `nunits`-wide output.
        return [
            rnn_cls(embed_dim if depth == 0 else nunits, nunits, batch_first=True)
            for depth in range(n_layers)
        ]

    class Encoder(nn.Module):
        def __init__(self):
            super(Encoder, self).__init__()
            self.embedding = nn.Embedding(num_embeddings=src_vocab_size, embedding_dim=embed_dim)
            stack = make_stack(enc_layers)
            self.rnn_layers = nn.ModuleList(stack[:-1])
            self.rnn_fin = stack[-1]
            # Registered as a sub-module so train()/eval() switch it on and off.
            self.dropout = nn.Dropout(dropout) if dropout is not None else None

        def forward(self, x):
            # Embedding lookups need integer ids.
            x = self.embedding(x.long())
            for rnn in self.rnn_layers:
                if self.dropout is not None:
                    x = self.dropout(x)
                x, _ = rnn(x)

            # Final state: an (h, c) tuple for LSTM, a single tensor otherwise.
            _, encoder_states = self.rnn_fin(x)
            return encoder_states

    class Decoder(nn.Module):
        def __init__(self):
            super(Decoder, self).__init__()
            self.embedding = nn.Embedding(num_embeddings=tgt_vocab_size, embedding_dim=embed_dim)
            self.rnn_layers = nn.ModuleList(make_stack(dec_layers))

        def forward(self, x, encoder_states):
            x = self.embedding(x.long())
            # Every decoder layer starts from the encoder's final state.
            for rnn in self.rnn_layers:
                x, _ = rnn(x, encoder_states)

            return x

    class Translator(nn.Module):
        def __init__(self):
            super(Translator, self).__init__()
            self.encoder = Encoder()
            self.decoder = Decoder()
            self.dense1 = nn.Linear(nunits, dense_size)
            self.dense2 = nn.Linear(dense_size, tgt_vocab_size)

        def forward(self, encoder_inputs, decoder_inputs):
            encoder_states = self.encoder(encoder_inputs)
            decoder_output = self.decoder(decoder_inputs, encoder_states)
            pre_output = self.dense1(decoder_output)
            final_output = self.dense2(pre_output)
            return final_output

    model = Translator()
    return model



In [None]:

import torch

def accuracy1(real, pred):
    """Fraction of sequences predicted exactly, ignoring padded positions.

    Both ``real`` and ``pred`` are reduced with an argmax over dimension 2.
    Positions whose true class is 0 are excluded; a sequence counts as
    correct when every remaining position matches.  Returns a scalar float32
    tensor.
    """
    true_ids = real.argmax(dim=2)
    pred_ids = pred.argmax(dim=2)

    # 1 where the true class is not 0, else 0.
    keep = true_ids.ne(0).to(torch.int32)
    hits = true_ids.eq(pred_ids).to(torch.int32) * keep

    # A sequence is right when its hit count equals its number of kept slots.
    whole_word_ok = hits.sum(dim=1).eq(keep.sum(dim=1))
    return whole_word_ok.to(torch.float32).mean()


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

class Model(nn.Module):
    """Recurrent encoder followed by a linear projection (example network).

    Parameters
    ----------
    nunits : int
        Hidden size of the recurrent encoder.
    dense_size : int
        Output width of the linear layer.
    enc_layers : int
        Number of stacked recurrent layers.
    dec_layers : int
        Accepted for signature compatibility; not used by this class.
    cell : {"LSTM", "Simple", "GRU"}
        Recurrent layer type ("Simple" maps to ``nn.RNN``).
    dropout : float or None
        Inter-layer dropout of the recurrent stack; ``None`` means none.
    embed_dim : int
        Feature size of the input expected by the encoder.

    Raises
    ------
    ValueError
        ``cell`` is not one of the supported names.
    """

    def __init__(self, nunits, dense_size, enc_layers, dec_layers, cell, dropout, embed_dim):
        super(Model, self).__init__()
        rnn_types = {"LSTM": nn.LSTM, "Simple": nn.RNN, "GRU": nn.GRU}
        if cell not in rnn_types:
            raise ValueError(f"cell must be one of {sorted(rnn_types)}, got {cell!r}")

        # The recurrent layers only apply dropout *between* stacked layers, so
        # with a single layer it has no effect (and triggers a warning);
        # `None` is mapped to 0.0 because the layer requires a number.
        rnn_dropout = dropout if (dropout is not None and enc_layers > 1) else 0.0

        self.encoder = rnn_types[cell](input_size=embed_dim, hidden_size=nunits,
                                       num_layers=enc_layers, dropout=rnn_dropout)
        self.decoder = nn.Linear(nunits, dense_size)

    def forward(self, x):
        # The recurrent layer returns (output, state); only the per-step
        # output is projected.
        x, _ = self.encoder(x)
        x = self.decoder(x)
        return x

# Create the network with the chosen hyper-parameters.
# NOTE(review): this rebinds `train`, which the first cell uses for the
# training DataFrame; re-running `process(train)` after this cell would
# receive the network instead of the data.
train = Model(
    cell="LSTM",
    embed_dim=256,
    nunits=256,
    enc_layers=3,
    dec_layers=1,
    dense_size=512,
    dropout=0.2,
)

# Loss and optimiser used by the training loop below.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(train.parameters(), lr=0.001)

# Define the accuracy function (equivalent to accuracy1)
def accuracy1(real, pred):
    """Fraction of sequences predicted exactly, ignoring padded positions.

    Parameters
    ----------
    real : torch.Tensor
        True classes, either as ids of shape (batch, steps) or as one-hot /
        score vectors of shape (batch, steps, classes).  The 3-D form is
        reduced with an argmax so that one-hot targets are accepted as well.
    pred : torch.Tensor
        Predicted scores of shape (batch, steps, classes).

    Returns
    -------
    torch.Tensor
        Scalar float32: share of sequences in which every position whose true
        class is not 0 was predicted correctly.
    """
    if real.dim() == 3:
        real = real.argmax(dim=2)
    pred = pred.argmax(dim=2)

    # Positions with true class 0 are treated as padding and never counted.
    mask = real != 0
    correct = (real == pred) & mask

    # A sequence is right when all of its unmasked positions are right.
    whole_word_ok = correct.sum(dim=1) == mask.sum(dim=1)
    return whole_word_ok.to(torch.float32).mean()

# PyTorch has no compile step: the optimiser and the loss are invoked by hand
# inside the loop.
# NOTE(review): `num_epochs`, `inputs` and `targets` are not defined in the
# visible cells; they must exist before this cell is run.

# Example training loop (one full pass over `inputs` per epoch)
for epoch in range(num_epochs):
    # Drop gradients left over from the previous step.
    optimizer.zero_grad()

    # Forward pass and loss.
    outputs = train(inputs)
    loss = criterion(outputs, targets)

    # Backward pass and parameter update.
    loss.backward()
    optimizer.step()

    # Exact-match accuracy of this step's predictions.
    acc = accuracy1(targets, outputs)

    # Progress report.
    print('Epoch [{}/{}], Loss: {:.4f}, Accuracy: {:.2f}%'.format(
        epoch + 1, num_epochs, loss.item(), acc.item() * 100))


In [None]:
import torch

# Highest validation accuracy seen so far; a checkpoint is written only when
# an epoch beats this value.
best_val_accuracy = 0.0

# ...

# Skeleton of the per-epoch validation / checkpoint step; the training and
# evaluation code itself is elided ("# ...").
# NOTE(review): `num_epochs`, `val_targets` and `val_outputs` are not defined
# in the visible cells -- they must be produced by the elided steps.
for epoch in range(num_epochs):
    # Perform the training steps
    # ...

    # Evaluate the model on the validation set
    # Compute the validation accuracy using the accuracy function (accuracy1)
    val_accuracy = accuracy1(val_targets, val_outputs)

    # Strict '>': an epoch that merely ties the best score does not overwrite
    # the existing checkpoint.
    if val_accuracy > best_val_accuracy:
        # Update the best validation accuracy
        best_val_accuracy = val_accuracy

        # Save only the parameters (state_dict) to 'best_model.pth'
        torch.save(train.state_dict(), 'best_model.pth')
        print('Saved the best model!')

    # ...

# ...


In [None]:
# Train for 20 epochs in batches of 64 on the encoded training arrays, with
# the encoded validation arrays as validation data.
# NOTE(review): an earlier cell binds `train` to a `Model(nn.Module)` instance
# that defines only __init__/forward, and torch.nn.Module offers no
# `.fit(...)`/callbacks API -- this looks like a leftover Keras call.
# `model_cb` is also not defined in the visible cells.  Confirm and replace
# with an explicit PyTorch training loop.
train.fit([trainx,trainxx],trainy,
             batch_size=64,
             validation_data = ([valx,valxx],valy),
             epochs=20,
             callbacks = [model_cb])

In [None]:
import torch
import torch.nn as nn

class _InferenceEncoder(nn.Module):
    """Runs a trained encoder and returns its final state(s) as a flat list."""

    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder

    def forward(self, encoder_inputs):
        states = self.encoder(encoder_inputs)
        # LSTM encoders hand back (h, c); GRU / simple RNN a single tensor.
        if isinstance(states, (tuple, list)):
            return list(states)
        return [states]


class _InferenceDecoder(nn.Module):
    """Runs the decoder stack with an explicit state for every layer."""

    def __init__(self, decoder, dense1, dense2):
        super().__init__()
        self.decoder = decoder
        self.dense1 = dense1
        self.dense2 = dense2

    def forward(self, decoder_inputs, *states):
        layers = list(self.decoder.rnn_layers)
        # An LSTM layer consumes two state tensors (h, c), other cells one.
        widths = [2 if isinstance(layer, nn.LSTM) else 1 for layer in layers]
        if len(states) != sum(widths):
            raise ValueError(
                f"expected {sum(widths)} state tensors for {len(layers)} decoder "
                f"layer(s), got {len(states)}"
            )

        x = self.decoder.embedding(decoder_inputs.long())
        new_states = []
        pos = 0
        for rnn, width in zip(layers, widths):
            if width == 2:
                x, (state_h, state_c) = rnn(x, (states[pos], states[pos + 1]))
                new_states += [state_h, state_c]
            else:
                x, state = rnn(x, states[pos])
                new_states.append(state)
            pos += width

        final_output = self.dense2(self.dense1(x))
        return (final_output, *new_states)


def inference_models(model, nunits=32, enc_layers=1, dec_layers=1, cell='LSTM', dropout=None):
    """Split a trained seq2seq network into step-wise inference modules.

    The previous body used Keras-only APIs (``model.input``, ``get_layer``,
    ``nn.Input``, ``initial_state=``) that do not exist on ``torch.nn``; this
    version wraps the sub-modules of the trained network, sharing its weights.

    Parameters
    ----------
    model : nn.Module
        Must expose ``encoder`` (callable on the encoder ids, returning the
        final state), ``decoder`` (with ``embedding`` and ``rnn_layers``),
        ``dense1`` and ``dense2``.
    nunits, enc_layers, dec_layers, cell, dropout
        Kept for call compatibility.  The layer structure and cell type are
        read from ``model`` itself, so these values are not consulted.

    Returns
    -------
    encoder_model : nn.Module
        ``encoder_model(encoder_inputs)`` -> list of state tensors
        (``[h, c]`` for LSTM, ``[h]`` otherwise).
    decoder_model : nn.Module
        ``decoder_model(decoder_inputs, *states)`` ->
        ``(final_output, *new_states)``.  ``states`` lists the state tensors
        of each decoder layer in order (``h, c`` per LSTM layer, ``h`` per
        GRU / simple layer), each in the layout the recurrent layer expects.
        For the first step pass the encoder states once per decoder layer;
        afterwards feed back ``new_states``.

    Notes
    -----
    The wrappers do not change the train/eval mode of ``model``; call
    ``model.eval()`` (and use ``torch.no_grad()``) before decoding.
    """
    encoder_model = _InferenceEncoder(model.encoder)
    decoder_model = _InferenceDecoder(model.decoder, model.dense1, model.dense2)

    return encoder_model, decoder_model


In [None]:
# Split the trained network into separate encoder / decoder inference models.
# NOTE(review): `model` is not assigned in any visible cell (the network built
# above is bound to `train`), and `dropout` is passed the string 'yes' rather
# than a rate -- confirm both before running.
enc, dec = inference_models(
    model,
    nunits=256,
    enc_layers=3,
    dec_layers=1,
    cell="LSTM",
    dropout='yes',
)