# CS6910 Assignment 3 (RNN Frameworks for transliteration) - Without attention

In [8]:
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import tqdm
import wandb
import numpy as np
import unicodedata
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import random

In [20]:
device = ('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
TARGET = 'tam'
SOURCE = 'eng'
SOS_SYM = '@'
EOS_SYM = '$'
UNK_SYM = '!'
PAD_SYM = '%'

unicode_ranges = {'tam' : [0x0B80, 0x0BFF], 
                  'eng' : [0x0061, 0x007A],
                  'hin' : [0x0900, 0x097F]}

cuda


## Preprocessing Functions and Helpers

In [18]:
# function to load the 'cat' (= train/val/test) data of language 'lang'
def load_data(lang, cat):
    fcontents = open(f'aksharantar_sampled/{lang}/{lang}_{cat}.csv','r', encoding='utf-8').readlines()
    pairs = [tuple(l.strip().split(',')) for l in fcontents]
    x_data, y_data = list(map(list,zip(*pairs)))
    return x_data, y_data

class Language:
    def __init__(self, name):
        self.lname = name
    
    # function to create the vocabulary using the words in 'data'
    def create_vocabulary(self, *data):
        symbols = set()
        for wd in data:
            for c in wd:
                symbols.add(c)
        self.symbols = symbols

    # function to use unicode ranges for creating the character set
    def create_vocabulary_range(self):
        symbols = set()
        begin, end = unicode_ranges[self.lname]
        for i in range(begin, end+1):
            if (unicodedata.category(chr(i)) != 'Cn'):
                symbols.add(chr(i))
        self.symbols = symbols
    
    def generate_mappings(self):
        self.index2sym = {0: SOS_SYM, 1 : EOS_SYM, 2 : UNK_SYM, 3 : PAD_SYM}
        self.sym2index = {SOS_SYM : 0, EOS_SYM : 1, UNK_SYM : 2, PAD_SYM : 3}
        self.symbols = list(self.symbols)
        self.symbols.sort()

        for i, sym in enumerate(self.symbols):
            self.sym2index[sym] = i + 3
            self.index2sym[i+3] = sym
        
        self.num_tokens = len(self.index2sym.keys())
    
    def convert_to_numbers(self, word):
        enc = [self.sym2index[SOS_SYM]]
        for ch in word:
            if ch in self.sym2index.keys():
                enc.append(self.sym2index[ch])
            else:
                enc.append(self.sym2index[UNK_SYM])
        enc.append(self.sym2index[EOS_SYM])
        return enc

    def get_index(self, sym):
        return self.sym2index[sym]
    
    def get_size(self):
        return self.num_tokens

In [12]:
x_train, y_train = load_data(TARGET, 'train')
x_valid, y_valid = load_data(TARGET, 'valid')
x_test, y_test = load_data(TARGET, 'test')

print(f'Number of train samples = {len(x_train)}')
print(f'Number of valid samples = {len(x_valid)}')
print(f'Number of test samples = {len(x_test)}')

Number of train samples = 51200
Number of valid samples = 4096
Number of test samples = 4096


In [22]:
# create language objects for storing vocabulary, index2sym and sym2index
SRC_LANG = Language(SOURCE)
TAR_LANG = Language(TARGET)

# creating vocabulary using train data only
SRC_LANG.create_vocabulary(*(x_train))
TAR_LANG.create_vocabulary(*(y_train))

# otherwise, use unicode characters (assigned codepoints) in the script's range
# src_lang.create_vocabulary_range()
# tar_lang.create_vocabulary_range()

# generate mappings from characters to numbers and vice versa
SRC_LANG.generate_mappings()
TAR_LANG.generate_mappings()

print(f'Source Vocabulary Size = {len(SRC_LANG.symbols)}')
print(f'Source Vocabulary = {SRC_LANG.symbols}')
print(f'Source Mapping {SRC_LANG.index2sym}')
print(f'Target Vocabulary Size = {len(TAR_LANG.symbols)}')
print(f'Target Vocabulary = {TAR_LANG.symbols}')
print(f'Target Mapping {TAR_LANG.index2sym}')

Source Vocabulary Size = 26
Source Vocabulary = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
Source Mapping {0: '@', 1: '$', 2: 'a', 3: 'b', 4: 'c', 5: 'd', 6: 'e', 7: 'f', 8: 'g', 9: 'h', 10: 'i', 11: 'j', 12: 'k', 13: 'l', 14: 'm', 15: 'n', 16: 'o', 17: 'p', 18: 'q', 19: 'r', 20: 's', 21: 't', 22: 'u', 23: 'v', 24: 'w', 25: 'x', 26: 'y', 27: 'z'}
Target Vocabulary Size = 72
Target Vocabulary = ['ஂ', 'ஃ', 'அ', 'ஆ', 'இ', 'ஈ', 'உ', 'ஊ', 'எ', 'ஏ', 'ஐ', 'ஒ', 'ஓ', 'ஔ', 'க', 'ங', 'ச', 'ஜ', 'ஞ', 'ட', 'ண', 'த', 'ந', 'ன', 'ப', 'ம', 'ய', 'ர', 'ற', 'ல', 'ள', 'ழ', 'வ', 'ஶ', 'ஷ', 'ஸ', 'ஹ', 'ா', 'ி', 'ீ', 'ு', 'ூ', 'ெ', 'ே', 'ை', 'ொ', 'ோ', 'ௌ', '்', 'ௐ', 'ௗ', '௦', '௧', '௨', '௩', '௪', '௫', '௬', '௭', '௮', '௯', '௰', '௱', '௲', '௳', '௴', '௵', '௶', '௷', '௸', '௹', '௺']
Target Mapping {0: '@', 1: '$', 2: 'ஂ', 3: 'ஃ', 4: 'அ', 5: 'ஆ', 6: 'இ', 7: 'ஈ', 8: 'உ', 9: 'ஊ', 10: 'எ', 11: 'ஏ', 12: 'ஐ', 13: 'ஒ', 14: 'ஓ', 15: 'ஔ', 16: 

In [None]:
class TransliterateDataset(Dataset):
    def __init__(self, x_data, y_data, src_lang : Language, tar_lang : Language):
        self.x_data = x_data
        self.y_data = y_data
        self.src_lang = src_lang
        self.tar_lang = tar_lang
        
    def __len__(self):
        return len(self.y_data)

    def __getitem__(self, idx):
        x, y = self.x_data[idx], self.y_data[idx]
        x = self.src_lang.convert_to_numbers(x)
        y = self.tar_lang.convert_to_numbers(y) 
        return torch.Tensor(x), torch.Tensor(y)

class CollationFunction:
    def __init__(self, src_lang : Language, tar_lang : Language):
        self.src_lang = src_lang
        self.tar_lang = tar_lang
    
    def __call__(self, batch):
        # sorting is to save encoder computation. 
        # reasoning : https://stackoverflow.com/questions/51030782/why-do-we-pack-the-sequences-in-pytorch
        batch = batch.sort(key = lambda x : len(x[0]))
        src, tar = zip(*batch)
        src_lens = torch.tensor([len(x) for x in src])
        src = nn.utils.rnn.pad_sequence(list(src), batch_first=True, padding_value=src_lang.get_index(PAD_SYM))
        tar = nn.utils.rnn.pad_sequence(list(tar), batch_first=True, padding_value=tar_lang.get_index(PAD_SYM))
        return src, tar, src_lens

## Encoder Network

In [None]:
class EncoderNet(nn.Module):
    def __init__(self, vocab_size, embed_size, num_layers, hid_size, cell_type, 
                 bidirect=False, dropout=0):
        super(EncoderNet, self).__init__()
        self.hidden_size = hid_size
        self.embed_size = embed_size
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.dropout = nn.Dropout(p=dropout)
        self.num_layers = num_layers

        # we create the required architecture using the received parameters
        if cell_type == 'RNN':
            self.network = nn.RNN(input_size=embed_size, hidden_size=hid_size, num_layers=num_layers, 
                               dropout=dropout, bidirectional=bidirect, batch_first=True)
        elif cell_type == 'LSTM':
            self.network = nn.LSTM(input_size=embed_size, hidden_size=hid_size, num_layers=num_layers, 
                               dropout=dropout, bidirectional=bidirect, batch_first=True)
        else:
            self.network = nn.GRU(input_size=embed_size, hidden_size=hid_size, num_layers=num_layers, 
                               dropout=dropout, bidirectional=bidirect, batch_first=True)
        
        self.cell_type = cell_type
        self.bidirect = bidirect

        # for combining the final layer's forward and reverse directions' final hidden state
        if (self.bidirect):
            self.combine_forward_backward = nn.Linear(2 * hid_size, hid_size)

        self.init_hidden_state()

    def init_states(self):
        num_dir = 2 if self.bidirect else 1
        self.init_hidden_state = torch.zeros(num_dir * self.num_layers, self.hidden_size,requires_grad=True)

        if self.cell_type == 'LSTM':
            self.init_cell_state = torch.zeros(num_dir * self.num_layers, self.hidden_size, requires_grad=True)

    def forward(self, batch_x, batch_lens):
        batch_x = self.embedding(batch_x)
        batch_x = self.dropout(batch_x)

        ## IMPORTANT - Try sorted=False as well here
        packed_batch_x = nn.utils.rnn.pack_padded_sequence(batch_x, lengths=batch_lens, batch_first=True, 
                                                           enforce_sorted=True)

        hidden_states_stack = torch.stack([self.init_hidden_state for _ in range(self.num_layers)], dim=1)
        if self.cell_type == 'LSTM':
            cell_states_stack = torch.stack([self.init_cell_state for _ in range(self.num_layers)], dim=1)
            packed_outputs, (hidden_outputs, cell_outputs) = self.network(packed_batch_x, hidden_states_stack, cell_states_stack)
        else:
            packed_outputs, hidden_outputs = self.network(packed_batch_x, hidden_states_stack)
        
        outputs, output_lengths = nn.utils.rnn.pad_packed_sequence(packed_outputs, batch_first=True)

        if self.bidirect:
            # remember 2nd dim in hidden_outputs; so, we have to concatenate forward and backward
            # final hidden states along **1st dimension**
            concat_hidden_state = torch.cat((hidden_outputs[-2,:,:], hidden_outputs[-1,:,:]), dim=1)
            hidden_state = self.combine_forward_backward(concat_hidden_state)
            hidden_state = torch.tanh(hidden_state)
        # hidden_state = (batch_size, hid_size); outputs = (batch_size, max_seq_len_batch, D * hid_size)
        # d = 2 if bidirectional; else d = 1
        return outputs, hidden_state

## Decoder Network


In [None]:
class DecoderNet(nn.Module):
    def __init__(self, vocab_size, embed_size, num_layers, hid_size, cell_type, 
                 dropout=0):
        super(EncoderNet, self).__init__()
        self.hidden_size = hid_size
        self.embed_size = embed_size
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.dropout = nn.Dropout(p=dropout)
        self.num_layers = num_layers
        self.vocab_size = vocab_size

        # we create the required architecture using the received parameters
        if cell_type == 'RNN':
            self.network = nn.RNN(input_size=embed_size, hidden_size=hid_size, num_layers=num_layers, 
                               dropout=dropout, batch_first=True)
        elif cell_type == 'LSTM':
            self.network = nn.LSTM(input_size=embed_size, hidden_size=hid_size, num_layers=num_layers, 
                               dropout=dropout, batch_first=True)
        else:
            self.network = nn.GRU(input_size=embed_size, hidden_size=hid_size, num_layers=num_layers, 
                               dropout=dropout, batch_first=True)
        
        self.cell_type = cell_type
        self.out_layer = nn.Linear(hid_size, vocab_size)

    # will always go 1 step forward in time (seqlen = L = 1)
    # previous decoder state shape = [num_layers, batch_size, hid_size]
    def forward(self, batch_y, prev_decoder_state):
        batch_y = batch_y.unsqueeze(1) # batch_size is first dim
        batch_y = self.embedding(batch_y)
        batch_y = self.dropout(batch_y)

        if self.cell_type == 'LSTM':
            outputs, (decoder_hidden_state, _) = self.network(batch_y, prev_decoder_state)
        else:
            outputs, decoder_hidden_state = self.network(batch_y, prev_decoder_state)
        
        outputs = outputs.squeeze(1) # remove seqlen dimension
        logits = self.out_layer(outputs)
        return logits, decoder_hidden_state

## Seq2Seq(Encoder-Decoder) Model

In [None]:
# explore changing teacher forcing ratio to something epoch-based as sir suggested
class EncoderDecoder(nn.Module):
    def __init__(self, encoder :EncoderNet, decoder : DecoderNet, src_lang, tar_lang, tf_ratio) -> None:
        super(EncoderDecoder, self).__init__()
        self.enc_model = encoder
        self.dec_model = decoder
        self.src_lang = src_lang
        self.tar_lang = tar_lang
        self.tf_ratio = tf_ratio
    
    def forward(self, batch_X, X_lens, batch_Y):
        batch_size = batch_X.size(0)
        _, final_hidden_states = self.enc_model.forward(batch_X, X_lens)
        
        num_dec_layers = self.dec_model.num_layers
        hidden_dec = torch.stack([final_hidden_states for _ in num_dec_layers], dim=0)
        # we will feed the encoder output into each decoder layer's initial hidden state
        # hidden_dec = (num_layers, batch_size, hid_size)

        tarlength = batch_Y.size(1)
        outlogits = torch.zeros(batch_size, tarlength, self.dec_model.vocab_size).to(device)
        dec_input = batch_Y[:,0]

        for tstep in range(1, tarlength):
            curlogits, hidden_dec = self.dec_model.forward(dec_input, hidden_dec)
            dec_input = batch_Y[:, tstep]
            pred = torch.argmax(curlogits, dim=1)
            rand_numbers = torch.randn(batch_size)
            dec_input = torch.where(rand_numbers > self.tf_ratio, dec_input, pred)
            outlogits[:, tstep, :] = curlogits 

        return outlogits  

## Train/Evaluation/Inference Class

In [None]:
class Runner():
    def __init__(self, src_lang : Language, tar_lang : Language, common_embed_size, num_enc_layers, num_dec_layers, 
                 common_hidden_size, common_cell_type, enc_bidirect, dropout, tf_ratio):
        
        encoder = EncoderNet(vocab_size=src_lang.get_size(), embed_size=common_embed_size,
                             num_layers=num_enc_layers, hid_size=common_hidden_size,
                             cell_type= common_cell_type, bidirect=enc_bidirect, dropout=dropout)
        decoder = DecoderNet(vocab_size=tar_lang.get_size(), embed_size=common_embed_size,
                             num_layers=num_dec_layers, hid_size=common_hidden_size,
                             cell_type=common_cell_type, dropout=dropout)
        model = EncoderDecoder(encoder=encoder, decoder=decoder, src_lang=src_lang, tar_lang=tar_lang,
                               tf_ratio=0.6)
        model.to(device)
        # for reproducibility - seed everything with 42
        torch.manual_seed(42); torch.cuda.manual_seed(42); np.random.seed(42); random.seed(42)
        model.apply(self.init_weights) # initialize model weights

    @staticmethod
    def init_weights(model : nn.Module, a=-0.08, b=0.08):
        for _, param in model.named_parameters():
            nn.init.uniform_(param.data, -0.0)
    
    def generate_data_loaders(self, train=True, data_X, data_y, batch_size):
        dataset = TransliterateDataset(data_X, data_y, src_lang=SRC_LANG, tar_lang=TAR_LANG)
        dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                                shuffle=True, collate_fn=CollationFunction(SRC_LANG, TAR_LANG))
        return dataloader

    def train(self, ):
        pass
    def evaluate(self, ):
        pass
    def inference(self, ):
        pass

    def beam_search_inference(self, ):
        pass