<a href="https://colab.research.google.com/github/manish2021iitd/Deep-Learning/blob/main/DLassignment3/DLassignment3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
import wandb
from itertools import product


class Encoder(nn.Module):
    """Embed a batch of token indices and run it through a recurrent network.

    Args:
        input_size: Number of rows in the embedding table (source vocabulary size).
        hidden_size: Width of both the embedding vectors and the recurrent state.
        num_layers: Number of stacked recurrent layers.
        cell_type: One of ``'rnn'``, ``'lstm'`` or ``'gru'`` (case-insensitive).

    Raises:
        ValueError: If ``cell_type`` is not one of the supported names.
    """

    # Maps the lower-cased cell name to the recurrent layer class to build.
    _CELLS = {'rnn': nn.RNN, 'lstm': nn.LSTM, 'gru': nn.GRU}

    def __init__(self, input_size, hidden_size, num_layers=1, cell_type='rnn'):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # The embedding width equals hidden_size, so it feeds the RNN directly.
        self.embedding = nn.Embedding(input_size, hidden_size)

        try:
            cell = self._CELLS[cell_type.lower()]
        except KeyError:
            raise ValueError("Invalid cell type. Please choose from 'rnn', 'lstm', or 'gru'.") from None
        self.rnn = cell(hidden_size, hidden_size, num_layers, batch_first=True)

    def forward(self, x):
        """Encode ``x`` and return ``(output, hidden)`` from the recurrent layer.

        Args:
            x: Token indices, either a tensor or nested lists of ints, laid out
                batch-first (the recurrent layer is built with ``batch_first=True``).

        Returns:
            The per-step outputs and the final state exactly as the recurrent
            layer returns them (for an LSTM the state is an ``(h, c)`` tuple).
        """
        # as_tensor accepts lists and tensors alike without the copy/warning
        # that torch.tensor() produces for tensor input, and places the
        # indices on the same device as the embedding weights.
        x = torch.as_tensor(x, dtype=torch.long, device=self.embedding.weight.device)
        embedded = self.embedding(x)
        output, hidden = self.rnn(embedded)
        return output, hidden

class Decoder(nn.Module):
    """Embed target token indices, run a recurrent network, and project to logits.

    Args:
        output_size: Target vocabulary size; used for both the embedding table
            and the width of the final linear projection.
        hidden_size: Width of both the embedding vectors and the recurrent state.
        num_layers: Number of stacked recurrent layers.
        cell_type: One of ``'rnn'``, ``'lstm'`` or ``'gru'`` (case-insensitive).

    Raises:
        ValueError: If ``cell_type`` is not one of the supported names.
    """

    # Maps the lower-cased cell name to the recurrent layer class to build.
    _CELLS = {'rnn': nn.RNN, 'lstm': nn.LSTM, 'gru': nn.GRU}

    def __init__(self, output_size, hidden_size, num_layers=1, cell_type='rnn'):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.embedding = nn.Embedding(output_size, hidden_size)

        try:
            cell = self._CELLS[cell_type.lower()]
        except KeyError:
            raise ValueError("Invalid cell type. Please choose from 'rnn', 'lstm', or 'gru'.") from None
        self.rnn = cell(hidden_size, hidden_size, num_layers, batch_first=True)

        # Projects each recurrent output vector to one score per target token.
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden=None):
        """Decode ``x`` starting from ``hidden`` and return ``(logits, hidden)``.

        Args:
            x: Token indices, either a tensor or nested lists of ints, laid out
                batch-first.
            hidden: Optional initial recurrent state (for example the encoder's
                final state). ``None`` leaves the choice of initial state to
                the recurrent layer.

        Returns:
            ``logits`` with a last dimension of ``output_size`` and the final
            recurrent state as returned by the recurrent layer.
        """
        x = torch.as_tensor(x, dtype=torch.long, device=self.embedding.weight.device)
        embedded = self.embedding(x)
        output, hidden = self.rnn(embedded, hidden)
        # Without this projection the caller would receive hidden_size features
        # instead of per-token scores suitable for a classification loss.
        return self.fc(output), hidden

class Seq2Seq(nn.Module):
    """Chain an encoder and a decoder into a single module.

    The encoder's second return value is handed to the decoder as its second
    positional argument; only the decoder's first return value is returned.
    """

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder, self.decoder = encoder, decoder

    def forward(self, source, target):
        """Encode ``source``, decode ``target`` from that state, return the decoder output."""
        # The encoder's per-step outputs are not used here, only its state.
        _, context = self.encoder(source)
        predictions, _ = self.decoder(target, context)
        return predictions

# Demo: assemble a single-layer LSTM encoder/decoder pair and show its layers.
input_size = output_size = 100  # source and target vocabulary sizes
input_embedding_size = 128  # not read by any of the constructors below
hidden_size, num_layers, cell_type = 256, 1, 'lstm'

encoder = Encoder(input_size, hidden_size, num_layers=num_layers, cell_type=cell_type)
decoder = Decoder(output_size, hidden_size, num_layers=num_layers, cell_type=cell_type)
model = Seq2Seq(encoder=encoder, decoder=decoder)

# Printing an nn.Module lists its registered sub-modules.
print(model)

class CustomDataset(Dataset):
    """Paired source/target strings encoded as fixed-length index tensors.

    Each string is encoded one character at a time, which matches the
    character-level vocabularies produced by ``create_vocab`` in this file.

    Args:
        input_data: Sequence of source strings.
        target_data: Sequence of target strings, aligned with ``input_data``.
        input_vocab: Mapping from source token to index; must contain
            ``'<unk>'`` and ``'<pad>'``.
        target_vocab: Mapping from target token to index; must contain
            ``'<unk>'`` and ``'<pad>'``.
        max_length: If truthy, every sequence is truncated or padded to exactly
            this many tokens so that batches can be stacked.
    """

    def __init__(self, input_data, target_data, input_vocab, target_vocab, max_length=None):
        self.input_data = input_data
        self.target_data = target_data
        self.input_vocab = input_vocab
        self.target_vocab = target_vocab
        self.max_length = max_length

    def __len__(self):
        return len(self.input_data)

    def __getitem__(self, idx):
        """Return ``(input_tensor, target_tensor)`` of dtype ``torch.long`` for item ``idx``."""
        input_sequence = self.tokenize(self.input_data[idx], self.input_vocab)
        target_sequence = self.tokenize(self.target_data[idx], self.target_vocab)

        if self.max_length:
            # Each side is padded with the pad index of its OWN vocabulary;
            # the two vocabularies are independent and may number '<pad>'
            # differently.
            input_sequence = self.pad_sequence(
                input_sequence, self.max_length, self.input_vocab['<pad>'])
            target_sequence = self.pad_sequence(
                target_sequence, self.max_length, self.target_vocab['<pad>'])

        # Returning tensors lets the DataLoader stack items into one
        # (batch, length) tensor per side.
        return (torch.tensor(input_sequence, dtype=torch.long),
                torch.tensor(target_sequence, dtype=torch.long))

    def tokenize(self, sequence, vocab):
        """Map each character of ``sequence`` to its index, using ``'<unk>'`` for misses."""
        unknown = vocab['<unk>']
        return [vocab.get(char, unknown) for char in sequence]

    def pad_sequence(self, sequence, max_length, pad_value=None):
        """Return a new list of exactly ``max_length`` indices.

        Longer sequences are truncated; shorter ones are right-padded with
        ``pad_value``, which defaults to the input vocabulary's ``'<pad>'`` index.
        """
        if pad_value is None:
            pad_value = self.input_vocab['<pad>']
        trimmed = list(sequence[:max_length])
        return trimmed + [pad_value] * (max_length - len(trimmed))



import pandas as pd

def load_data(path):
    """Read a header-less two-column CSV and return its columns as two lists.

    Args:
        path: Anything ``pandas.read_csv`` accepts (file path or file-like object).

    Returns:
        ``(english, hindi)``: the first and second column as lists of ``str``.
    """
    # dtype=str and keep_default_na=False keep every cell as literal text.
    # With pandas' defaults, words such as "null", "nan" or "NA" are parsed
    # as missing values and come back as float NaN instead of strings.
    df = pd.read_csv(
        path,
        header=None,
        names=['English', 'Hindi'],
        dtype=str,
        keep_default_na=False,
    )
    return df['English'].tolist(), df['Hindi'].tolist()

def create_vocab(text):
    """Build a character-level token-to-index mapping for ``text``.

    The four special tokens always occupy indices 0-3 in the order
    ``<pad>, <sos>, <eos>, <unk>``; the distinct characters of ``text`` follow
    in sorted order. Iterating a ``set`` directly would give an ordering that
    can differ from one interpreter run to the next, which would make saved
    indices (and any model trained on them) unusable after a restart.

    Args:
        text: Iterable of strings.

    Returns:
        ``dict`` mapping each token to a unique integer index.
    """
    special_tokens = ['<pad>', '<sos>', '<eos>', '<unk>']
    characters = sorted({char for sentence in text for char in sentence})
    tokens = special_tokens + [char for char in characters if char not in special_tokens]
    return {token: idx for idx, token in enumerate(tokens)}



# Define the paths to your CSV files
train_data_file = '/kaggle/input/akasharantar/aksharantar_sampled/hin/hin_train.csv'
val_data_file = '/kaggle/input/akasharantar/aksharantar_sampled/hin/hin_valid.csv'
test_data_file = '/kaggle/input/akasharantar/aksharantar_sampled/hin/hin_test.csv'

# Load the training split and build the vocabularies from it
train_input_data, train_target_data = load_data(train_data_file)
print(type(train_input_data))
train_input_vocab = create_vocab(train_input_data)
train_target_vocab = create_vocab(train_target_data)

# Validation and test data must be encoded with the TRAINING vocabularies.
# A vocabulary built per split assigns its own indices, so the same character
# would map to different embedding rows than the ones the model was trained on.
# The per-split names are kept as aliases so existing references still resolve.
val_input_data, val_target_data = load_data(val_data_file)
val_input_vocab = train_input_vocab
val_target_vocab = train_target_vocab

test_input_data, test_target_data = load_data(test_data_file)
test_input_vocab = train_input_vocab
test_target_vocab = train_target_vocab

max_sequence_length = 50  # Example value, adjust as needed
train_dataset = CustomDataset(train_input_data, train_target_data, train_input_vocab, train_target_vocab, max_length=max_sequence_length)
val_dataset = CustomDataset(val_input_data, val_target_data, val_input_vocab, val_target_vocab, max_length=max_sequence_length)
test_dataset = CustomDataset(test_input_data, test_target_data, test_input_vocab, test_target_vocab, max_length=max_sequence_length)

# Create data loaders for each dataset; only the training split is shuffled
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Define your training function
def _as_index_batch(batch, device):
    """Return ``batch`` as a ``torch.long`` tensor located on ``device``."""
    if isinstance(batch, (list, tuple)) and batch and all(torch.is_tensor(step) for step in batch):
        # When dataset items are plain Python lists, the DataLoader's default
        # collation yields one tensor per sequence position; stack them back
        # into a single (batch, position) tensor.
        batch = torch.stack(list(batch), dim=1)
    return torch.as_tensor(batch, dtype=torch.long, device=device)


def _batch_loss(model, criterion, batch, device):
    """Run one ``(input, target)`` batch through ``model`` and return its loss."""
    input_data, target_data = batch
    input_data = _as_index_batch(input_data, device)
    target_data = _as_index_batch(target_data, device)
    # NOTE(review): the decoder is fed the same target sequence it is scored
    # against, with no one-step shift - confirm whether a shifted
    # teacher-forcing setup was intended.
    output = model(input_data, target_data)
    # reshape (unlike view) also works when the tensors are not contiguous.
    return criterion(output.reshape(-1, output.shape[-1]), target_data.reshape(-1))


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    """Train ``model`` for ``num_epochs`` epochs, printing train/validation loss.

    Args:
        model: Module called as ``model(input_batch, target_batch)`` whose
            output has one score per class in its last dimension.
        train_loader: Iterable of ``(input_batch, target_batch)`` pairs.
        val_loader: Iterable of ``(input_batch, target_batch)`` pairs.
        criterion: Loss called as ``criterion(scores_2d, targets_1d)``.
        optimizer: Optimizer over the model's parameters.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        None. The average losses are printed once per epoch; an empty loader
        is reported as ``nan`` instead of raising ``ZeroDivisionError``.
    """
    # Batches are moved to wherever the model's parameters live, so a model
    # placed on the GPU does not receive CPU tensors.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        train_batches = 0
        for batch in train_loader:
            optimizer.zero_grad()
            loss = _batch_loss(model, criterion, batch, device)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            train_batches += 1

        avg_train_loss = total_loss / train_batches if train_batches else float('nan')

        # Validation
        model.eval()
        val_loss = 0.0
        val_batches = 0
        with torch.no_grad():
            for batch in val_loader:
                val_loss += _batch_loss(model, criterion, batch, device).item()
                val_batches += 1

        avg_val_loss = val_loss / val_batches if val_batches else float('nan')

        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}')

        # Log metrics to wandb
        # wandb.log({"epoch": epoch+1, "train_loss": avg_train_loss, "val_loss": avg_val_loss})

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)


# Size the embedding tables from the vocabularies the datasets are encoded
# with, so every index has a row and no rows are wasted on a guessed constant.
input_size = len(train_input_vocab)  # Size of input vocabulary
output_size = len(train_target_vocab)  # Size of output vocabulary
input_embedding_size = 128  # not read by any of the constructors below
hidden_size = 256
num_layers = 1
cell_type = 'lstm'

encoder = Encoder(input_size, hidden_size, num_layers, cell_type)
decoder = Decoder(output_size, hidden_size, num_layers, cell_type)
model = Seq2Seq(encoder, decoder).to(device)
# Positions labelled with the target '<pad>' index are excluded from the loss;
# otherwise padding would dominate the average for short words.
criterion = nn.CrossEntropyLoss(ignore_index=train_target_vocab['<pad>'])
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
# Train the model
train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10)
