In [None]:
import os

import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm

# TensorBoard event writer; scalars logged through it go to this run directory.
writer = SummaryWriter('runs/Luong_attention_LSTM')

# Use the GPU when CUDA is usable, otherwise fall back to the CPU.
cuda_available = torch.cuda.is_available()
device = torch.device("cuda") if cuda_available else torch.device("cpu")
device_name = torch.cuda.get_device_name(0) if cuda_available else 'CPU'
print(f"Using device: {device}")
print(f"Device name: {device_name}")

In [None]:
# Source corpus; the path is relative to the notebook's working directory.
# Later cells read the 'English' and 'Hindi' columns from this frame.
DATASET_CSV = "../../data/Dataset_English_Hindi.csv"
data = pd.read_csv(DATASET_CSV)
# Show a few random rows as a quick sanity check.
data.sample(5)

In [None]:
# Keep only rows where BOTH cells are real strings (e.g. NaN cells are dropped).
mask = data['English'].apply(lambda cell: isinstance(cell, str))
mask = mask & data['Hindi'].apply(lambda cell: isinstance(cell, str))
data = data.loc[mask].copy()

# Lower-case both language columns.
for text_column in ('English', 'Hindi'):
    data[text_column] = data[text_column].str.lower()

In [None]:
MAX_SENT_LEN = 15

# Whitespace-token count of every English sentence; only rows strictly below
# the cap are kept, and the index is renumbered from 0 afterwards.
english_word_counts = data['English'].str.split().apply(len)
data = data[english_word_counts < MAX_SENT_LEN].copy().reset_index(drop=True)
print(f"Rows after filtering: {len(data)}")

In [None]:
class Tokenizer:
    """Word-level tokenizer whose vocabulary is built from a corpus of strings.

    Text is normalised by deleting ``! . ( ) ?``, turning ``-`` into a space and
    splitting on whitespace.  Four special tokens always occupy ids 0-3
    (unknown, start, end, padding); corpus words receive the ids that follow,
    in order of first appearance, so every id lies in ``range(vocab_size)``.

    Attributes:
        word_index: dict mapping token string -> integer id.
        index_word: inverse mapping, integer id -> token string.
        vocab_size: number of entries in ``word_index``.
    """

    # Characters mapped to None are deleted; the hyphen becomes a space so
    # hyphenated compounds split into separate words.
    _NORMALISE = str.maketrans({'!': None, '.': None, '(': None, ')': None, '?': None, '-': ' '})

    def __init__(self, texts):
        """Build the vocabulary from ``texts``, an iterable of strings."""
        self.oov_token = "<|unknown|>"
        self.start_token = "<|startoftext|>"
        self.end_token = "<|endoftext|>"
        self.padding_token = "<|pad|>"
        self.word_index = {self.oov_token: 0, self.start_token: 1, self.end_token: 2, self.padding_token: 3}

        for text in texts:
            for word in self._words(text):
                if word not in self.word_index:
                    # The next free id equals the current size, which keeps
                    # ids contiguous (no gap after the special tokens).
                    self.word_index[word] = len(self.word_index)
        self.index_word = {idx: word for word, idx in self.word_index.items()}

        self.vocab_size = len(self.word_index)

    def _words(self, text):
        """Normalise ``text`` and split it into words (shared by __init__ and encode)."""
        return text.translate(self._NORMALISE).split()

    @staticmethod
    def _as_key(index):
        """Coerce scalar-like ids (e.g. 0-d tensors) to ``int`` for dict lookup.

        Objects that cannot be converted are returned unchanged, so they simply
        miss the lookup and decode to the unknown token.
        """
        try:
            return int(index)
        except (TypeError, ValueError):
            return index

    def encode(self, texts):
        """Convert an iterable of strings into a list of id lists.

        Words missing from the vocabulary map to the unknown-token id.
        """
        unknown_id = self.word_index[self.oov_token]
        return [
            [self.word_index.get(word, unknown_id) for word in self._words(text)]
            for text in texts
        ]

    def decode(self, sequences):
        """Convert an iterable of id sequences into space-joined strings.

        Ids that are not in the vocabulary decode to the unknown token.
        """
        return [
            ' '.join(self.index_word.get(self._as_key(index), self.oov_token) for index in sequence)
            for sequence in sequences
        ]

In [None]:
# One vocabulary per language, built from every sentence currently in `data`.
eng_tokenizer = Tokenizer(data['English'])
hin_tokenizer = Tokenizer(data['Hindi'])

for lang_label, lang_tokenizer in (("English", eng_tokenizer), ("Hindi", hin_tokenizer)):
    print(f"{lang_label} Vocabulary Size: {lang_tokenizer.vocab_size}")

In [None]:
# --- Model sizes ---
EMBED_DIM = 128     # width of the token embedding vectors
HIDDEN_DIM = 256    # LSTM hidden-state width
BATCH_SIZE = 128    # sentence pairs per DataLoader batch

# --- Optimisation ---
START_LR = 5e-3     # learning rate passed to the optimizer
END_LR = 1e-6       # NOTE(review): not referenced by any cell visible here; confirm whether an LR schedule was intended
TOTAL_EPOCHS = 20   # number of passes of the training loop

In [None]:
# Hold out 5% of the sentence pairs for validation.  The split is seeded so the
# same rows are held out on every Restart & Run All, which keeps validation
# losses comparable between runs.
# NOTE: `data` is rebound to the training part, so re-running only this cell
# splits the already-reduced frame again; re-run from the loading cell instead.
SPLIT_SEED = 42
data, test_data = train_test_split(data, test_size=0.05, random_state=SPLIT_SEED)
print(len(data), len(test_data))

In [None]:
class MyDataset(Dataset):
    """Serves fixed-length (English, Hindi) id tensors from a DataFrame.

    Each item is a dict with:
        'eng_input':  English ids, truncated to ``max_len`` and LEFT-padded
                      with the English padding id.
        'hin_target': start id + Hindi ids (truncated to ``max_len - 2``) +
                      end id, then RIGHT-padded with the Hindi padding id.
    Both tensors therefore have exactly ``max_len`` entries.

    Args:
        data: DataFrame with 'English' and 'Hindi' string columns.
        eng_tokenizer: tokenizer for the English column; must provide
            ``encode``, ``word_index`` and ``padding_token``.
        hin_tokenizer: tokenizer for the Hindi column; additionally must
            provide ``start_token`` and ``end_token``.
        max_len: fixed sequence length; defaults to the notebook-level
            ``MAX_SENT_LEN`` when omitted.
    """

    def __init__(self, data, eng_tokenizer, hin_tokenizer, max_len=None):
        self.data = data
        self.eng_tokenizer = eng_tokenizer
        self.hin_tokenizer = hin_tokenizer
        # Resolve the global only when no explicit length was supplied.
        self.max_len = MAX_SENT_LEN if max_len is None else max_len

    def __len__(self):
        """Number of sentence pairs."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the padded id tensors for the pair at positional index ``idx``."""
        row = self.data.iloc[idx]  # fetch the row once for both columns
        max_len = self.max_len

        # English side: truncate, then pad on the left.
        eng_vocab = self.eng_tokenizer.word_index
        eng_ids = self.eng_tokenizer.encode([row['English']])[0][:max_len]
        eng_pad_id = eng_vocab[self.eng_tokenizer.padding_token]
        eng_padded = [eng_pad_id] * (max_len - len(eng_ids)) + eng_ids

        # Hindi side: reserve two slots for the start/end markers, pad on the right.
        hin_vocab = self.hin_tokenizer.word_index
        hin_ids = self.hin_tokenizer.encode([row['Hindi']])[0][:max_len - 2]
        hin_padded = (
            [hin_vocab[self.hin_tokenizer.start_token]]
            + hin_ids
            + [hin_vocab[self.hin_tokenizer.end_token]]
            + [hin_vocab[self.hin_tokenizer.padding_token]] * (max_len - len(hin_ids) - 2)
        )

        return {
            'eng_input': torch.tensor(eng_padded),
            'hin_target': torch.tensor(hin_padded)
        }

# Training batches are reshuffled every epoch.
dataset = DataLoader(MyDataset(data, eng_tokenizer, hin_tokenizer), batch_size=BATCH_SIZE, shuffle=True)
# Held-out batches keep a fixed order: the validation loss is an average of
# per-batch means, so shuffling would change batch composition and add noise
# to the epoch-to-epoch comparison.
test_dataset = DataLoader(MyDataset(test_data, eng_tokenizer, hin_tokenizer), batch_size=BATCH_SIZE, shuffle=False)

In [None]:
class Encoder(nn.Module):
    """Embeds source token ids and runs them through a multi-layer LSTM.

    Args:
        input_dim: number of rows in the embedding table (source vocabulary).
        hidden_dim: LSTM hidden-state width.
        num_layers: number of stacked LSTM layers.
        embed_dim: embedding width; defaults to the notebook-level
            ``EMBED_DIM`` when omitted, mirroring ``LuongDecoder``'s
            explicit ``embed_dim`` argument.
    """

    def __init__(self, input_dim, hidden_dim, num_layers=2, embed_dim=None):
        super().__init__()
        if embed_dim is None:
            # Resolve the global only when no explicit width was supplied.
            embed_dim = EMBED_DIM
        self.embedding = nn.Embedding(input_dim, embed_dim)
        self.rnn = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True)

    def forward(self, src):
        """Encode ``src`` (batch-first token ids).

        Returns:
            outputs: per-position LSTM outputs, (batch, seq_len, hidden_dim).
            (h, c): final hidden and cell states, each
                (num_layers, batch, hidden_dim).
        """
        embedded = self.embedding(src)
        outputs, (h, c) = self.rnn(embedded)
        return outputs, (h, c)

In [None]:
class LuongAttention(nn.Module):
    """Luong 'general' attention: score(h_t, h_s) = h_t^T * W * h_s."""

    def __init__(self, hidden_dim):
        super().__init__()
        # Bias-free square projection W applied to the encoder states.
        self.W = nn.Linear(hidden_dim, hidden_dim, bias=False)

    def forward(self, decoder_hidden, encoder_outputs):
        """Attend over the encoder states with the current decoder state.

        Args:
            decoder_hidden: (B, H) query h_t for the current decoding step.
            encoder_outputs: (B, T, H) encoder states h_s.

        Returns:
            context: (B, 1, H) attention-weighted sum of the encoder states.
            attn_weights: (B, T, 1) weights, softmax-normalised over T.
        """
        query = decoder_hidden.unsqueeze(2)            # (B, H, 1)
        projected_states = self.W(encoder_outputs)     # (B, T, H)

        # h_t . W(h_s) for every source position.
        scores = torch.bmm(projected_states, query)    # (B, T, 1)
        attn_weights = F.softmax(scores, dim=1)        # normalise across T

        # (B, 1, T) @ (B, T, H) -> (B, 1, H)
        context = torch.bmm(attn_weights.transpose(1, 2), encoder_outputs)

        return context, attn_weights

In [None]:
class LuongDecoder(nn.Module):
    """One-step LSTM decoder: embedding -> LSTM -> attention -> vocabulary logits.

    No input feeding is used: the LSTM sees only the token embedding, and the
    attention context is combined with the LSTM output just before the final
    linear layer.
    """

    def __init__(self, output_dim, embed_dim, hidden_dim, num_layers=2):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, embed_dim)
        self.rnn = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True)
        self.attention = LuongAttention(hidden_dim)
        # Input is [LSTM output ; context], hence twice the hidden width.
        self.fc = nn.Linear(2 * hidden_dim, output_dim)

    def forward(self, input_token, hidden, encoder_outputs):
        """Decode a single time step.

        Args:
            input_token: (B,) ids of the previous target token.
            hidden: (h, c) LSTM state carried over from the previous step.
            encoder_outputs: encoder states handed to the attention module.

        Returns:
            prediction: (B, output_dim) unnormalised vocabulary scores.
            (h, c): updated LSTM state.
            weights: attention weights returned by the attention module.
        """
        step_embedding = self.embedding(input_token.unsqueeze(1))   # (B, 1, Emb)
        rnn_output, (h, c) = self.rnn(step_embedding, hidden)       # (B, 1, H)

        # The current LSTM output is the attention query.
        query = rnn_output.squeeze(1)                               # (B, H)
        context, weights = self.attention(query, encoder_outputs)   # context: (B, 1, H)

        # (B, H) cat (B, H) -> (B, 2H), then project to the vocabulary.
        features = torch.cat((query, context.squeeze(1)), dim=1)
        prediction = self.fc(features)

        return prediction, (h, c), weights

In [None]:
class Seq2Seq(nn.Module):
    """Encoder/decoder wrapper that decodes step by step with teacher forcing.

    Args:
        encoder: module returning ``(encoder_outputs, hidden)`` for a source batch.
        decoder: module called as ``decoder(token, hidden, encoder_outputs)`` and
            returning ``(logits, hidden, attention_weights)``; it must expose
            ``fc.out_features`` (the target vocabulary size).
        device: stored on the instance as ``self.device``; ``forward`` itself
            allocates on the device of its input.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Run the encoder once, then decode ``trg_len - 1`` steps.

        Args:
            src: (B, src_len) source token ids.
            trg: (B, trg_len) target token ids; column 0 is the start token.
            teacher_forcing_ratio: per-step probability of feeding the gold
                token instead of the model's own argmax (0 disables it).

        Returns:
            (B, trg_len, vocab) logits.  Position 0 is left as zeros because
            decoding starts at t = 1.
        """
        batch_size = src.size(0)
        trg_len = trg.size(1)
        vocab_size = self.decoder.fc.out_features

        # Allocate directly on the input's device: avoids a CPU allocation plus
        # copy, and stays correct even if self.device no longer matches where
        # the model and data live.
        outputs = torch.zeros(batch_size, trg_len, vocab_size, device=src.device)

        encoder_outputs, hidden = self.encoder(src)

        input_token = trg[:, 0]  # <start> token

        for t in range(1, trg_len):
            output, hidden, _ = self.decoder(input_token, hidden, encoder_outputs)
            outputs[:, t] = output

            # One coin flip per step decides gold token vs. model prediction.
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            input_token = trg[:, t] if teacher_force else output.argmax(1)

        return outputs

In [None]:
# Embedding / output tables are sized vocab_size + 1 so that every token id
# produced by the tokenizers is a valid row index.
src_vocab_rows = eng_tokenizer.vocab_size + 1
trg_vocab_rows = hin_tokenizer.vocab_size + 1

encoder = Encoder(input_dim=src_vocab_rows, hidden_dim=HIDDEN_DIM)
decoder = LuongDecoder(output_dim=trg_vocab_rows, embed_dim=EMBED_DIM, hidden_dim=HIDDEN_DIM)

# Assemble the full model and move all parameters to the selected device.
model = Seq2Seq(encoder, decoder, device).to(device)

In [None]:
# Target positions holding the Hindi padding id are excluded from the loss.
hin_pad_id = hin_tokenizer.word_index[hin_tokenizer.padding_token]
criterion = nn.CrossEntropyLoss(ignore_index=hin_pad_id)

# Adam over every model parameter, starting at START_LR.
optimizer = torch.optim.Adam(model.parameters(), lr=START_LR)

In [None]:
def train_epoch(model, iterator, optimizer, criterion, clip=1.0):
    """Train ``model`` for one pass over ``iterator``.

    Args:
        model: seq2seq model called as ``model(src, trg)``.
        iterator: yields dict batches with 'eng_input' and 'hin_target' tensors.
        optimizer: optimizer stepping the model's parameters.
        criterion: loss taking (N, vocab) logits and (N,) target ids.
        clip: maximum gradient norm applied before each optimizer step.

    Returns:
        Mean of the per-batch losses (sum of batch losses / number of batches).
    """
    model.train()
    batch_losses = []

    for batch in tqdm(iterator):
        src, trg = batch['eng_input'].to(device), batch['hin_target'].to(device)

        optimizer.zero_grad()
        logits = model(src, trg)

        # Drop position 0 (the start token) and flatten batch and time:
        #   logits: [B, T, V] -> [B * (T - 1), V]
        #   trg:    [B, T]    -> [B * (T - 1)]
        vocab = logits.shape[-1]
        flat_logits = logits[:, 1:].reshape(-1, vocab)
        flat_targets = trg[:, 1:].reshape(-1)

        loss = criterion(flat_logits, flat_targets)
        loss.backward()

        # Clip the global gradient norm before the parameter update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        batch_losses.append(loss.item())

    return sum(batch_losses) / len(iterator)

def evaluate(model, iterator, criterion):
    """Compute the mean per-batch loss over ``iterator`` without gradients.

    The model is put in eval mode and called with a teacher-forcing ratio of
    0, so the decoder always consumes its own predictions.

    Args:
        model: seq2seq model called as ``model(src, trg, teacher_forcing_ratio)``.
        iterator: yields dict batches with 'eng_input' and 'hin_target' tensors.
        criterion: loss taking (N, vocab) logits and (N,) target ids.

    Returns:
        Sum of the batch losses divided by the number of batches.
    """
    model.eval()
    running_loss = 0

    with torch.no_grad():
        for batch in iterator:
            src, trg = batch['eng_input'].to(device), batch['hin_target'].to(device)

            logits = model(src, trg, 0)  # teacher forcing off

            # Skip the start-token position and flatten batch and time.
            vocab = logits.shape[-1]
            flat_logits = logits[:, 1:].reshape(-1, vocab)
            flat_targets = trg[:, 1:].reshape(-1)

            running_loss += criterion(flat_logits, flat_targets).item()

    return running_loss / len(iterator)

In [None]:
# Checkpoints go into this folder; create it up front so the first torch.save
# cannot fail after a whole epoch of training has already been spent.
os.makedirs('model', exist_ok=True)

for epoch in range(TOTAL_EPOCHS):
    train_loss = train_epoch(model, dataset, optimizer, criterion)
    valid_loss = evaluate(model, test_dataset, criterion)

    print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f} | Val. Loss: {valid_loss:.3f}')
    writer.add_scalar('Training Loss', train_loss, epoch)
    writer.add_scalar('Validation Loss', valid_loss, epoch)

    # One checkpoint per epoch; the file name uses the zero-based epoch index.
    torch.save(model.state_dict(), f'model/Luong_attention_model_epoch_{epoch}.pth')

# Push any buffered scalars to disk so TensorBoard shows the full run.
writer.flush()