In [1]:
import numpy as np
import pandas as pd
import torch
import os
import sys
from torch.utils.data import Dataset, DataLoader
import wandb
import regex as re
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import wandb
import lightning as pl
from pytorch_lightning import LightningModule
from pytorch_lightning.loggers import WandbLogger

In [2]:
train_path = "/home/user/Documents/Courses/dakshina_dataset_v1.0/ta/lexicons/ta.translit.sampled.train.tsv"
valid_path = "/home/user/Documents/Courses/dakshina_dataset_v1.0/ta/lexicons/ta.translit.sampled.dev.tsv"
test_path = "/home/user/Documents/Courses/dakshina_dataset_v1.0/ta/lexicons/ta.translit.sampled.test.tsv"

train_df = pd.read_csv(train_path, sep="\t", header=None, names=["native", "latin", 'n_annot'], encoding='utf-8')
valid_df = pd.read_csv(valid_path, sep="\t", header=None, names=["native", "latin", 'n_annot'], encoding='utf-8')
test_df = pd.read_csv(test_path, sep="\t", header=None, names=["native", "latin", 'n_annot'], encoding='utf-8')

train_df.head()

Unnamed: 0,native,latin,n_annot
0,ஃபியட்,fiat,2
1,ஃபியட்,phiyat,1
2,ஃபியட்,piyat,1
3,ஃபிரான்ஸ்,firaans,1
4,ஃபிரான்ஸ்,france,2


In [4]:
class NativeTokenizer():
    def __init__(self, train_path, valid_path, test_path, special_tokens={'START': '<start>','END':'<end>', 'PAD':'<pad>'}):
        
        self.train_df = pd.read_csv(train_path, sep="\t", header=None, names=["native", "latin", 'n_annot'], encoding='utf-8')
        self.valid_df = pd.read_csv(valid_path, sep="\t", header=None, names=["native", "latin", 'n_annot'], encoding='utf-8')
        self.test_df = pd.read_csv(test_path, sep="\t", header=None, names=["native", "latin", 'n_annot'], encoding='utf-8')
        self.special_tokens = special_tokens
        # Build vocabulary
        self._build_vocab(add_special_tokens=True)
        
        # Id to token mapping
        self.id_to_latin = {i: char for i, char in enumerate(self.latin_vocab)}
        self.id_to_native = {i: char for i, char in enumerate(self.native_vocab)}

        self.latin_vocab_size = len(self.latin_vocab)
        self.nat_vocab_size = len(self.native_vocab)

    # Build vocabulary
    def _build_vocab(self, add_special_tokens=True):
        self.nat_set = set()
        self.latin_set = set()
        for lat, nat in zip(self.train_df['latin'], self.train_df['native']):
            nat_chars = re.findall(r'\X' , nat)
            try:
                lat_chars = list(lat)
            except:
                print(f"Invalid latin string: {lat}, skipping....")
            
            for char in nat_chars:
                self.nat_set.add(char)
            for char in lat_chars:
               self.latin_set.add(char.lower())
            
        self.nat_set = sorted(list(self.nat_set))
        self.latin_set = sorted(list(self.latin_set))
        
        if add_special_tokens:
            self.nat_set = list(self.special_tokens.values()) + self.nat_set
            self.latin_set = [self.special_tokens['PAD']] + self.latin_set   

        self.latin_vocab = {char: i for i, char in enumerate(self.latin_set)}
        self.native_vocab = {char: i for i, char in enumerate(self.nat_set)}

    def tokenize(self, text, lang='latin'):
        if type(text) != str:
            print("Invalid text:", text)
            print("Language must be a string, but got", type(text))
        if lang == 'latin':
            return [self.latin_vocab[char] for char in text]
        elif lang == 'native':
            return [self.native_vocab['<start>']] + [self.native_vocab[char] for char in re.findall('\X', text)] + [self.native_vocab['<end>']]
        else:
            raise ValueError("Language must be either 'latin' or 'native'.")




In [5]:
tokenizer = NativeTokenizer(train_path, valid_path, test_path)
print(f"Latin vocab size: {tokenizer.latin_vocab_size}")
print(f"Native vocab size: {tokenizer.nat_vocab_size}")

Invalid latin string: nan, skipping....
Invalid latin string: nan, skipping....
Invalid latin string: nan, skipping....
Latin vocab size: 27
Native vocab size: 253


In [6]:
class LatNatDataset(Dataset):
    def __init__(self, df, tokenizer):
        self.df = df
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        entry = self.df.iloc[idx]
        latin_word = entry['latin']
        native_word = entry['native']
               
        # Tokenize and convert to IDs
        #latin_ids = [self.tokenizer.latin_vocab[i] for i in latin_word]
        #native_ids = [self.tokenizer.native_vocab[i] for i in re.findall(r'\X' , native_word)]
        latin_ids = self.tokenizer.tokenize(latin_word, lang='latin')
        native_ids = self.tokenizer.tokenize(native_word, lang='native')


        return (torch.tensor(latin_ids),
            torch.tensor(native_ids))

    def collate_fn(self, batch):
        x,y = zip(*batch)
        x_len = [len(seq) for seq in x]
        y_len = [len(seq) for seq in y]

        padded_x = pad_sequence(x, batch_first=True, padding_value=self.tokenizer.latin_vocab['<pad>'])
        padded_y = pad_sequence(y, batch_first=True, padding_value=self.tokenizer.native_vocab['<pad>'])
        
        x_len, perm_idx = torch.tensor(x_len).sort(0, descending=True)
        padded_x = padded_x[perm_idx]

        y_len = torch.tensor(y_len).sort(0, descending=True)
        padded_y = padded_y[perm_idx]

        return padded_x, x_len, padded_y, y_len



In [27]:
train_dataset = LatNatDataset(train_df, tokenizer)
valid_dataset = LatNatDataset(valid_df, tokenizer)
test_dataset = LatNatDataset(test_df, tokenizer)

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=train_dataset.collate_fn)
valid_dataloader = DataLoader(valid_dataset, batch_size=32, shuffle=False, collate_fn=valid_dataset.collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=test_dataset.collate_fn)

In [287]:
class Encoder(torch.nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = torch.nn.Embedding(num_embeddings=input_size, embedding_dim=embedding_size, )
        #self.rnn = torch.nn.RNN(input_size=embedding_size, hidden_size=hidden_size, batch_first=True, num_layers=3, nonlinearity='relu')
        self.rnn = torch.nn.GRU(input_size=embedding_size, hidden_size=hidden_size, batch_first=True, num_layers=3,dropout=0.1)
    
    def forward(self, seq, seq_len):
        embedding = self.embedding(input=seq)
        packed = pack_padded_sequence(input=embedding, lengths=seq_len, batch_first=True, enforce_sorted=True)
        output, hidden = self.rnn(packed)
        output, _ = pad_packed_sequence(output, batch_first=True)
        return output, hidden

In [286]:
class Decoder(torch.nn.Module):
    def __init__(self, output_size, embedding_size, hidden_size):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = torch.nn.Embedding(num_embeddings=output_size, embedding_dim=embedding_size)
        #self.rnn = torch.nn.RNN(input_size=embedding_size, hidden_size=hidden_size, batch_first=True, num_layers=3, nonlinearity='relu')
        self.rnn = torch.nn.GRU(input_size=embedding_size, hidden_size=hidden_size, batch_first=True, num_layers=3, dropout=0.1)

        self.out = torch.nn.Linear(hidden_size, output_size)
        self.softmax = torch.nn.LogSoftmax(dim=2)  

    def forward(self, input_step, hidden):
        # input_step: (batch_size, 1) [a single timestep]
        embedded = self.embedding(input_step)  # (batch_size, 1, hidden_size)
        rnn_output, hidden = self.rnn(embedded, hidden)  # output: (batch_size, 1, hidden_size)
        output = self.out(rnn_output)  # (batch_size, 1, output_size)
        return output, hidden


In [309]:
from torch import nn
def train(input_tensor, input_lengths, target_tensor, target_lengths, encoder, decoder, 
          encoder_optimizer, decoder_optimizer, criterion, max_target_len, teacher_forcing_ratio=0.5):
    
    encoder.train()
    decoder.train()
    
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    batch_size = input_tensor.size(0)
    loss = 0

    encoder_outputs, encoder_hidden = encoder(input_tensor, input_lengths)

    # Prepare decoder input and hidden state

    #decoder_input = target_tensor[:, :-1].detach().clone()
    decoder_target = target_tensor[:, 1:].detach().clone()

    decoder_input = torch.tensor(([SOS_token]*batch_size)).unsqueeze(1)
    
    print(decoder_input.shape)

    
    decoder_hidden = encoder_hidden # directly use last hidden state from encoder
    # Feed the target as the next input
    for i in range(target_tensor.shape[1]-1):

        #decoder_output, decoder_hidden = decoder(decoder_input[:, i].unsqueeze(1), decoder_hidden)
        decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)

        decoder_input = decoder_output.argmax(dim=2)
        loss += criterion(decoder_output.squeeze(1), decoder_target[:, i])
        # Append the new column
        if i == 0:
            preds = decoder_output.argmax(dim=2).cpu().numpy()
        else:
            preds = np.hstack((preds, decoder_output.argmax(dim=2).cpu().numpy()))

    mask = (decoder_target[:,:-1] != SOS_token ) & (decoder_target[:, :-1]!= PAD_TOKEN) & (decoder_target[:, :-1]!=EOS_token)
    exact_matches = (preds[:, :-1] == decoder_target[:, :-1])  & mask
    accuracy = exact_matches.float().mean()

    loss.backward()
    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / max_target_len, accuracy


In [310]:
INPUT_SIZE = tokenizer.latin_vocab_size
OUTPUT_SIZE = tokenizer.nat_vocab_size
EMBEDDING_SIZE = 128
HIDDEN_SIZE = 256
MAX_TARGET_LEN = 28  # Set this to the maximum length of your target sequences
SOS_token = tokenizer.native_vocab['<start>']
PAD_TOKEN = tokenizer.native_vocab['<pad>']
EOS_token = tokenizer.native_vocab['<end>']
encoder = Encoder(input_size=INPUT_SIZE, embedding_size= EMBEDDING_SIZE,hidden_size=HIDDEN_SIZE)
decoder = Decoder(output_size=OUTPUT_SIZE, embedding_size= EMBEDDING_SIZE,hidden_size=HIDDEN_SIZE)

# Optimizers and loss
encoder_optimizer = torch.optim.Adam(encoder.parameters(), lr=0.001)
decoder_optimizer = torch.optim.Adam(decoder.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.native_vocab['<pad>'])  # Assume PAD token is 0



def train_iters(encoder, decoder, n_epochs, training_data, encoder_optimizer, decoder_optimizer, 
                criterion, max_target_len, batch_size=32, teacher_forcing_ratio=0.5, print_every=1):
    
    for epoch in range(1, n_epochs + 1):
        print(f"Epoch {epoch}/{n_epochs}")
        total_loss = 0
        accuracy = []
        # Assuming training_data is a list of (input_tensor, input_length, target_tensor, target_length)
        for batch in training_data:
            input_tensor, input_lengths, target_tensor, target_lengths = batch
            loss, acc = train(input_tensor, input_lengths, target_tensor, target_lengths,
                         encoder, decoder, encoder_optimizer, decoder_optimizer,
                         criterion, max_target_len=max_target_len,
                         teacher_forcing_ratio=teacher_forcing_ratio)
            total_loss += loss
            accuracy.append(acc)

        if epoch % print_every == 0:
            avg_loss = total_loss / len(training_data)
            print(f"Epoch {epoch}/{n_epochs}, Loss: {avg_loss:.4f}")
            print(f"Epoch {epoch}/{n_epochs}, avg acc: {np.mean(accuracy):.4f}")
    



In [311]:
train_iters(
    encoder=encoder,
    decoder=decoder,
    n_epochs=10,
    training_data=train_dataloader,
    encoder_optimizer=encoder_optimizer,
    decoder_optimizer=decoder_optimizer,
    criterion=criterion,
    max_target_len=MAX_TARGET_LEN,
    batch_size=32,
    teacher_forcing_ratio=1,
    print_every=1
)

Epoch 1/10
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Size([32, 1])
torch.Siz

KeyboardInterrupt: 

In [None]:
class RNN_light(pl.LightningModule):
    def __init__(self, input_size, embedding_size, hidden_size, cell, layers, lr):
        super().__init__()
        self.optim = optim
        self.save_hyperparameters()
        self.model = CNN(input=3, filters=filters, kernel=kernel, pool_kernel=pool_kernel, pool_stride=pool_stride, batchnorm=batchnorm, activation=activation, dropout=dropout, ffn_size=ffn_size, num_classes=n_classes)
        self.train_accuracy = Accuracy(task='multiclass', num_classes=n_classes)
        self.val_accuracy = Accuracy(task='multiclass', num_classes=n_classes)
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y)
        acc = self.train_accuracy(logits, y)
        self.log("train loss", loss, on_step = False, on_epoch = True)
        self.log("train accuracy", acc, on_step = False, on_epoch = True)

        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y)
        acc = self.val_accuracy(logits, y)
        self.log("val loss", loss, on_step = False, on_epoch = True)
        self.log("val accuracy", acc, on_step = False, on_epoch = True)

        return loss

    def test_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y)
        acc = self.val_accuracy(logits, y)
        self.log('test loss', loss, on_step = False, on_epoch = True)
        self.log('test accuracy', acc, on_step = False, on_epoch = True)

    def predict_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        return logits

    def configure_optimizers(self):
        if self.optim == 'sgd':
            optimizer = torch.optim.SGD(self.parameters(), lr=self.hparams.lr, momentum=0.9)
        elif self.optim == 'adam':
            optimizer = torch.optim.Adam(self.parameters(), lr=self.hparams.lr)
        return optimizer

In [None]:
model = CNN_light(filters = filters, kernel=kernel, pool_kernel=pool_kernel, pool_stride=pool_stride, batchnorm=batchnorm, activation=activation, dropout=dropout, ffn_size=ffn_size, optim=optim, n_classes=n_classes, lr =lr)
logger= WandbLogger(project= 'final_result', name = "best_model",resume="never")
trainer = pl.Trainer(max_epochs=epochs,  accelerator="auto",logger=logger, profiler='simple',  precision="16-mixed",)
trainer.fit(model, train_dataloader, val_dataloader)
trainer.test(model, dataloaders=test_dataloader)
trainer.save_checkpoint("trained_model.ckpt")