## Custom Tokenizer

In [131]:
import copy
import pickle
import torch
class CharVocab:
  def __init__(self):
      # English
      chars = '0123456789'+ \
        '०१२३४५६७८९' + \
        'abcdefghijklmnopqrstuvwxyz' + \
        'ँंॉॆॊॏऺऻॎःािीुूेैोौअआइईउऊएऐओऔकखगघचछजझटठडढणतथदधनपफबभमयरलवशषसहज्ञक्षश्रज़रफ़ड़ढ़ख़क़ग़ळृृ़़ऑ' # + \
        # 'äöüß' + \
        # 'àâæçéèêëîïôœùûüÿ'

      self.len = 0
      # Codes
      self.special_tokens = {'<pad>': 0, '<eos>': 1, '<unk>': 2, '<sos>': 3}

      # Language Tags
      self.lang_tags = {'<en>':4, '<hi>':5, '<de>':6, '<fr>':7 }

      # Alphabets
      self.alpha = chars

      # Vocabulary
      self.vocab = self.build()

  def info(self):
    return {
        'special_tokens':self.special_tokens,
        'lang_tags':self.lang_tags,
        'alpha':self.alpha
    }
  def __len__(self):
    return self.len

  def build(self):
    class Vocab:
      def __init__(self, ctoi, itoc):
        self.ctoi = ctoi
        self.itoc = itoc

    ctoi = {}
    self.len = 0
    for k in self.special_tokens:
      ctoi[k] = self.special_tokens[k]
      self.len += 1
    for i, k in enumerate(self.lang_tags.keys(), len(ctoi)):
      ctoi[k] = i
      self.len += 1
    for i, k in enumerate(self.alpha, len(ctoi)):
      ctoi[k] = i
      self.len += 1
    itoc = {v:k for k,v in ctoi.items()}

    return Vocab(ctoi, itoc)

  def encode(self, inp, src_lang=None, tgt_lang=None):
    if not src_lang and not tgt_lang:
      raise Exception('Language not specified')
    inp_tokens = []
    if isinstance(inp, str):
      inp = inp.strip().split()
      prefix = [self.lang_tags[src_lang], self.lang_tags[tgt_lang]] # self.lang_tags[tgt_lang]
      for word in inp:
        tokens = []
        tokens.extend(list(word))
        tokens = [self.vocab.ctoi[char] if char in self.vocab.ctoi else self.vocab.ctoi['<unk>'] for char in tokens]
        inp_tokens.append(prefix+tokens)
    elif isinstance(inp, list) and isinstance(inp[0], tuple):
      src_word, tgt_word = zip(*inp)
      src_prefix = [self.lang_tags[src_lang], self.lang_tags[tgt_lang]] # self.lang_tags[tgt_lang]
      src_tokens = []
      for w in src_word:
        tokens = []
        tokens.extend(list(w))
        tokens = [self.vocab.ctoi[char] if char in self.vocab.ctoi else self.vocab.ctoi['<unk>'] for char in tokens]
        src_tokens.append(src_prefix+tokens)

      tgt_tokens = []
      tgt_prefix = [self.special_tokens['<sos>'], self.lang_tags[tgt_lang]] # self.lang_tags[tgt_lang]
      for w in tgt_word:
        tokens = []
        tokens.extend(list(w))
        tokens = [self.vocab.ctoi[char] if char in self.vocab.ctoi else self.vocab.ctoi['<unk>'] for char in tokens]
        tokens.append(self.special_tokens['<eos>'])
        tgt_tokens.append(tgt_prefix+tokens)
      inp_tokens = (src_tokens, tgt_tokens)
    return inp_tokens

  def decode(self, ids):
    if isinstance(ids, torch.Tensor):
      ids = ids.tolist()
      
    if isinstance(ids, list):
      decoded_word = []
      for id in ids:
        if id not in self.special_tokens.values() and id not in self.lang_tags.values():
          if id in self.vocab.itoc.keys():
            decoded_word.append(self.vocab.itoc[id])
          # else:
          #   decoded_word.append(self.vocab.itoc[2])
    return ''.join(decoded_word)

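# encode : 'hello world', '<en>', '<hi>' -> [[4, 5, ...], [4, 5, ...]]   (one id list per word)
# encode : [("hello", "हेलो")], '<en>', '<hi>' -> ([[4, 5, ...]], [[3, 5, ..., 1]])   (source id lists, target id lists)
# decode : [3, 4, ...] -> 'word'   (special tokens and language tags are dropped)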

# Test 1: a plain string is tokenised word by word.
inp = "hello world"
vocab = CharVocab()
encoded = vocab.encode(inp, src_lang='<en>', tgt_lang='<hi>')
print(f"Original: {inp}")
print(f"Encoded: {encoded}")
print("Decoded: " + ''.join(f"{vocab.decode(word_ids)}, " for word_ids in encoded))

# Test 2: (source, target) pairs give separate source and target id lists.
pairs = [
    ("hello", "हेलो"),
    ("computer", "कम्प्यूटर"),
    ("mobile", "मोबाइल"),
    ("doctor", "डॉक्टर"),
    ("school", "स्कूल"),
    ("radio", "रेडियो"),
    ("hello", "bonjour"),
    ("computer", "ordinateur"),
    ("mobile", "mobile"),
    ("doctor", "docteur"),
    ("school", "école"),
    ("radio", "radio"),
    ("hello", "hallo"),
    ("computer", "computer"),
    ("mobile", "handy"),
    ("doctor", "arzt"),
    ("school", "schule"),
    ("radio", "radio"),
]

print(f"Original: {pairs}")
src_encoded, tgt_encoded = vocab.encode(pairs, src_lang='<en>', tgt_lang='<hi>')
for label, id_lists in (("Source", src_encoded), ("Target", tgt_encoded)):
  print(f"{label} Encoded: {id_lists}")
  print("Decoded: " + ''.join(f"{vocab.decode(ids)}, " for ids in id_lists))

Original: hello world
Encoded: [[4, 5, 35, 32, 39, 39, 42], [4, 5, 50, 42, 45, 39, 31]]
Decoded: hello, world, 
Original: [('hello', 'हेलो'), ('computer', 'कम्प्यूटर'), ('mobile', 'मोबाइल'), ('doctor', 'डॉक्टर'), ('school', 'स्कूल'), ('radio', 'रेडियो'), ('hello', 'bonjour'), ('computer', 'ordinateur'), ('mobile', 'mobile'), ('doctor', 'docteur'), ('school', 'école'), ('radio', 'radio'), ('hello', 'hallo'), ('computer', 'computer'), ('mobile', 'handy'), ('doctor', 'arzt'), ('school', 'schule'), ('radio', 'radio')]
Source Encoded: [[4, 5, 35, 32, 39, 39, 42], [4, 5, 30, 42, 40, 43, 48, 47, 32, 45], [4, 5, 40, 42, 29, 36, 39, 32], [4, 5, 31, 42, 30, 47, 42, 45], [4, 5, 46, 30, 35, 42, 42, 39], [4, 5, 45, 28, 31, 36, 42], [4, 5, 35, 32, 39, 39, 42], [4, 5, 30, 42, 40, 43, 48, 47, 32, 45], [4, 5, 40, 42, 29, 36, 39, 32], [4, 5, 31, 42, 30, 47, 42, 45], [4, 5, 46, 30, 35, 42, 42, 39], [4, 5, 45, 28, 31, 36, 42], [4, 5, 35, 32, 39, 39, 42], [4, 5, 30, 42, 40, 43, 48, 47, 32, 45], [4, 5, 40, 

In [132]:
# Show both lookup tables of a fresh vocabulary, then its size as the cell result.
dummy = CharVocab()
print(dummy.vocab.ctoi, dummy.vocab.itoc, sep='\n')
len(dummy)

{'<pad>': 0, '<eos>': 1, '<unk>': 2, '<sos>': 3, '<en>': 4, '<hi>': 5, '<de>': 6, '<fr>': 7, '0': 8, '1': 9, '2': 10, '3': 11, '4': 12, '5': 13, '6': 14, '7': 15, '8': 16, '9': 17, '०': 18, '१': 19, '२': 20, '३': 21, '४': 22, '५': 23, '६': 24, '७': 25, '८': 26, '९': 27, 'a': 28, 'b': 29, 'c': 30, 'd': 31, 'e': 32, 'f': 33, 'g': 34, 'h': 35, 'i': 36, 'j': 37, 'k': 38, 'l': 39, 'm': 40, 'n': 41, 'o': 42, 'p': 43, 'q': 44, 'r': 45, 's': 46, 't': 47, 'u': 48, 'v': 49, 'w': 50, 'x': 51, 'y': 52, 'z': 53, 'ँ': 54, 'ं': 55, 'ॉ': 56, 'ॆ': 57, 'ॊ': 58, 'ॏ': 59, 'ऺ': 60, 'ऻ': 61, 'ॎ': 62, 'ः': 63, 'ा': 64, 'ि': 65, 'ी': 66, 'ु': 67, 'ू': 68, 'े': 69, 'ै': 70, 'ो': 71, 'ौ': 72, 'अ': 73, 'आ': 74, 'इ': 75, 'ई': 76, 'उ': 77, 'ऊ': 78, 'ए': 79, 'ऐ': 80, 'ओ': 81, 'औ': 82, 'क': 117, 'ख': 84, 'ग': 85, 'घ': 86, 'च': 87, 'छ': 88, 'ज': 114, 'झ': 90, 'ट': 91, 'ठ': 92, 'ड': 93, 'ढ': 94, 'ण': 95, 'त': 96, 'थ': 97, 'द': 98, 'ध': 99, 'न': 100, 'प': 101, 'फ': 102, 'ब': 103, 'भ': 104, 'म': 105, 'य': 106, 'र': 124,

137

In [62]:
import torch
from torch.utils.data import DataLoader

class TransliterationDataset:
  """Map-style dataset over ``(source_word, target_word, (src_tag, tgt_tag))`` records.

  Each item is encoded on access with ``tokenizer.encode``; ``collate_fn`` pads
  a batch with the tokenizer's '<pad>' id.
  """

  def __init__(self, pairs, tokenizer):
    self.tokenizer = tokenizer
    self.pairs = pairs
    # Id used to pad shorter sequences inside a batch.
    self.padding_value = tokenizer.vocab.ctoi['<pad>']

  def __len__(self):
    """Number of records."""
    return len(self.pairs)

  def __getitem__(self, idx):
    """Return ``(src_ids, tgt_ids)`` as two 1-D long tensors for record ``idx``."""
    word_pair = self.pairs[idx][:2]
    tags = self.pairs[idx][2]
    encoded_src, encoded_tgt = self.tokenizer.encode(
        [word_pair], src_lang=tags[0], tgt_lang=tags[1]
    )
    src_tensor = torch.tensor(encoded_src[0], dtype=torch.long)
    tgt_tensor = torch.tensor(encoded_tgt[0], dtype=torch.long)
    return src_tensor, tgt_tensor

  def collate_fn(self, batch):
    """Pad a list of ``(src_ids, tgt_ids)`` items into batch-first tensors.

    Returns:
      ``(src_padded, tgt_padded, src_lengths)`` where ``src_lengths`` counts the
      non-padding entries of every source row.
    """
    sources, targets = zip(*batch)
    pad = self.padding_value
    pad_batch = torch.nn.utils.rnn.pad_sequence
    src_padded = pad_batch(sources, batch_first=True, padding_value=pad)
    tgt_padded = pad_batch(targets, batch_first=True, padding_value=pad)
    src_lengths = src_padded.ne(pad).sum(dim=1)
    return src_padded, tgt_padded, src_lengths

  def get_dataloader(self, batch_size, shuffle=True):
    """Wrap this dataset in a ``DataLoader`` that uses ``collate_fn``."""
    return DataLoader(
        self,
        batch_size=batch_size,
        shuffle=shuffle,
        collate_fn=self.collate_fn,
    )

# Test: the same six English words paired with Hindi, French and German targets.
pairs = [
    (src_word, tgt_word, ("<en>", tgt_tag))
    for tgt_tag, word_pairs in [
        ("<hi>", [("hello", "हेलो"), ("computer", "कम्प्यूटर"), ("mobile", "मोबाइल"),
                  ("doctor", "डॉक्टर"), ("school", "स्कूल"), ("radio", "रेडियो")]),
        ("<fr>", [("hello", "bonjour"), ("computer", "ordinateur"), ("mobile", "mobile"),
                  ("doctor", "docteur"), ("school", "école"), ("radio", "radio")]),
        ("<de>", [("hello", "hallo"), ("computer", "computer"), ("mobile", "handy"),
                  ("doctor", "arzt"), ("school", "schule"), ("radio", "radio")]),
    ]
    for src_word, tgt_word in word_pairs
]

dataset = TransliterationDataset(pairs, tokenizer=vocab)
dataloader = dataset.get_dataloader(batch_size=2)

# Only the first (shuffled) batch is inspected.
src, tgt, src_len = next(iter(dataloader))
print(f"src: {src}")
print(f"tgt: {tgt}")
print(f"src_len: {src_len}")
print(src.shape, tgt.shape, src_len)

src: tensor([[ 4,  7, 40, 42, 29, 36, 39, 32],
        [ 4,  5, 45, 28, 31, 36, 42,  0]])
tgt: tensor([[  3,   7,  40,  42,  29,  36,  39,  32,   1],
        [  3,   5, 124,  69,  93,  65, 106,  71,   1]])
src_len: tensor([8, 7])
torch.Size([2, 8]) torch.Size([2, 9]) tensor([8, 7])


## Networks

In [63]:
import torch.nn as nn
import torch

In [64]:
class Encoder(nn.Module):
  """Embedding + (bi)directional LSTM/GRU encoder over padded id sequences.

  Args:
    input_size: vocabulary size of the embedding table.
    embedding_size: embedding dimension.
    hidden_size: RNN hidden size; also the size of the projected outputs.
    num_layers: number of stacked RNN layers.
    dropout: dropout applied to the embeddings and, when ``num_layers > 1``,
      between RNN layers.
    bidirectional: whether the RNN runs in both directions.
    arch: "lstm" or "gru".
    batch_first: stored and forwarded to the RNN constructor. ``forward``
      always packs its input as batch-first, so inputs must be
      ``(batch, seq)`` regardless of this flag.

  Raises:
    ValueError: if ``arch`` is neither "lstm" nor "gru".
  """

  def __init__(self,
      input_size, embedding_size, hidden_size,
      num_layers=1,
      dropout=0.01,
      bidirectional=False,
      arch = "gru",
      batch_first=True
    ):
    super(Encoder, self).__init__()
    self.input_size = input_size
    self.embedding_size = embedding_size
    self.hidden_size = hidden_size
    self.num_layers = num_layers
    self.dropout_rate = dropout
    self.directions = 2 if bidirectional else 1
    self.arch = arch
    self.batch_first = batch_first

    self.embedding = nn.Embedding(input_size, embedding_size)
    if self.arch == "lstm":
      RNN = nn.LSTM
    elif self.arch == "gru":
      RNN = nn.GRU
    else:
      raise ValueError("Invalid Architecture")
    self.rnn = RNN(
        input_size=embedding_size,
        hidden_size=hidden_size,
        num_layers=num_layers,
        # Inter-layer dropout does nothing for a single layer and only makes
        # PyTorch emit a warning, so it is disabled in that case.
        dropout=dropout if num_layers > 1 else 0.0,
        bidirectional=bidirectional,
        batch_first=batch_first
    )
    # Projects the concatenated directions back to hidden_size.
    self.fc = nn.Linear(self.hidden_size*self.directions, hidden_size)
    self.dropout = nn.Dropout(dropout)

  def forward(self, src_padded, src_lengths):
    """Encode a padded batch.

    Args:
      src_padded: ``(batch, max_length)`` tensor of token ids.
      src_lengths: true length of every row (tensor or list).

    Returns:
      ``(outputs, hidden)`` where ``outputs`` is
      ``(max_length, batch, hidden_size)`` and ``hidden`` is the RNN's final
      state: ``(h_n, c_n)`` for LSTM, ``h_n`` for GRU, each
      ``(num_layers * directions, batch, hidden_size)``.
    """
    embedded = self.dropout(self.embedding(src_padded))
    # pack_padded_sequence requires the lengths to live on the CPU.
    lengths = src_lengths.cpu() if isinstance(src_lengths, torch.Tensor) else src_lengths
    packed = nn.utils.rnn.pack_padded_sequence(embedded, lengths, batch_first=True, enforce_sorted=False)
    packed_outputs, hidden = self.rnn(packed)
    # outputs: (batch_size, max_length, hidden_dim * directions)
    outputs, _ = torch.nn.utils.rnn.pad_packed_sequence(packed_outputs, batch_first=True)
    # outputs: (batch_size, max_length, hidden_dim)
    outputs = self.fc(outputs)
    outputs = outputs.permute(1, 0, 2)  # (max_length, batch_size, hidden_dim)
    return outputs, hidden
  

# Test: push one batch through a 2-layer bidirectional LSTM encoder.
enc_vocab = CharVocab()
enc_pairs = [
  ("hello", "हेलो", ("<en>", "<hi>")), ("computer", "कम्प्यूटर", ("<en>", "<hi>")),
    ("mobile", "मोबाइल", ("<en>", "<hi>")), ("doctor", "डॉक्टर", ("<en>", "<hi>")),
    ("school", "स्कूल", ("<en>", "<hi>")), ("radio", "रेडियो", ("<en>", "<hi>"))
]
# Use this cell's own vocabulary so the cell does not depend on a `vocab`
# variable left over from an earlier cell.
enc_dataset = TransliterationDataset(enc_pairs, enc_vocab)
enc = Encoder(
      input_size = len(enc_vocab), embedding_size = 256, hidden_size = 512,
      num_layers=2,
      dropout=0.03,
      bidirectional=True,
      arch = "lstm",
      batch_first=True
)

device = torch.device('cuda' if torch.cuda.is_available() else "cpu")
print(f"Using {device}")
enc.to(device)

for src_padded, tgt_padded, src_len in enc_dataset.get_dataloader(batch_size=1):
  src_padded = src_padded.to(device)
  print(f"Input Shape: {src_padded.shape}, Input Len: {src_len.shape}, ")
  outputs, hidden = enc(src_padded, src_len)
  # arch is "lstm", so hidden is the (h_n, c_n) pair.
  print(f"Output Shape: {outputs.shape}, Hidden State Shape: {hidden[0].shape}, Cell State Shape: {hidden[1].shape}")
  break


Using cuda
Input Shape: torch.Size([1, 8]), Input Len: torch.Size([1]), 
Output Shape: torch.Size([8, 1, 512]), Hidden State Shape: torch.Size([4, 1, 512]), Cell State Shape: torch.Size([4, 1, 512])


In [65]:
class LuongAttention(nn.Module):
    """Luong-style attention with 'dot', 'general' or 'concat' scoring.

    Args:
        method: one of 'dot', 'general', 'concat'.
        hidden_size: size of the query and of the encoder outputs.

    Raises:
        ValueError: if ``method`` is not one of the supported names.
    """

    METHODS = ('dot', 'general', 'concat')

    def __init__(self, method, hidden_size):
        super(LuongAttention, self).__init__()
        # Fail at construction time instead of with an UnboundLocalError in forward().
        if method not in self.METHODS:
            raise ValueError(f"Unknown attention method {method!r}; expected one of {self.METHODS}")
        self.method = method
        self.hidden_size = hidden_size

        if self.method == 'general':
            self.attn = nn.Linear(self.hidden_size, hidden_size)
        elif self.method == 'concat':
            self.attn = nn.Linear(self.hidden_size * 2, hidden_size)
            # torch.FloatTensor(1, hidden_size) would hold uninitialised memory
            # (possibly NaN/inf), so the scoring vector is initialised explicitly.
            self.v = nn.Parameter(torch.empty(1, hidden_size))
            nn.init.xavier_uniform_(self.v)

    def dot_score(self, hidden, encoder_output):
        """Score = <hidden, encoder_output>; returns (seq_len, batch)."""
        return torch.sum(hidden * encoder_output, dim=2)

    def general_score(self, hidden, encoder_output):
        """Score = <hidden, W * encoder_output>; returns (seq_len, batch)."""
        energy = self.attn(encoder_output)
        return torch.sum(hidden * energy, dim=2)

    def concat_score(self, hidden, encoder_output):
        """Score = <v, tanh(W [hidden; encoder_output])>; returns (seq_len, batch)."""
        # Broadcast the single query step over every encoder position.
        hidden = hidden.expand(encoder_output.size(0), -1, -1)
        energy = self.attn(torch.cat([hidden, encoder_output], 2)).tanh()
        return torch.sum(self.v * energy, dim=2)

    def forward(self, hidden, encoder_outputs):
        """Compute the attention context.

        Args:
            hidden: query of shape (1, batch, hidden_size).
            encoder_outputs: (seq_len, batch, hidden_size).

        Returns:
            ``(context, attn_weights)`` with shapes (batch, 1, hidden_size) and
            (batch, 1, seq_len). Padded encoder positions are not masked.
        """
        if self.method == 'general':
            attn_energies = self.general_score(hidden, encoder_outputs)
        elif self.method == 'concat':
            attn_energies = self.concat_score(hidden, encoder_outputs)
        else:  # 'dot' (validated in __init__)
            attn_energies = self.dot_score(hidden, encoder_outputs)

        attn_energies = attn_energies.t()  # (batch, seq_len)
        # attn_weights: (batch_size, 1, seq_len)
        attn_weights = nn.functional.softmax(attn_energies, dim=1).unsqueeze(1)
        # context: (batch_size, 1, hidden_size)
        context = torch.bmm(attn_weights, encoder_outputs.transpose(0, 1))
        return context, attn_weights
    
# Test


In [66]:
class Decoder(nn.Module):
  """Attention decoder: embedding + Luong attention context fed into an LSTM/GRU.

  Args:
    vocab_size: size of the embedding table and of the output logits.
    embedding_size: embedding dimension.
    hidden_size: RNN hidden size (must match the encoder output size).
    num_layers: number of stacked RNN layers.
    dropout: dropout applied to the embeddings and, when ``num_layers > 1``,
      between RNN layers.
    bidirectional: whether the RNN runs in both directions.
    batch_first: forwarded to the RNN constructor.
    arch: "lstm" or "gru".
    attn: scoring method passed to ``LuongAttention``.

  Raises:
    ValueError: if ``arch`` is neither "lstm" nor "gru".
  """

  def __init__(self,
      vocab_size, embedding_size, hidden_size,
      num_layers=1,
      dropout=0.01,
      bidirectional=False,
      batch_first=True,
      arch = "lstm",
      attn = "dot",
    ):
    super(Decoder, self).__init__()
    self.input_size = vocab_size
    self.embedding_size = embedding_size
    self.hidden_size = hidden_size
    self.output_size = vocab_size
    self.num_layers = num_layers
    self.dropout_rate = dropout
    self.directions = 2 if bidirectional else 1
    self.batch_first = batch_first
    self.arch = arch
    self.attn_method = attn

    self.embedding = nn.Embedding(self.input_size, embedding_size)
    self.attention = LuongAttention(attn, hidden_size)
    if self.arch == "lstm":
      RNN = nn.LSTM
    elif self.arch == "gru":
      RNN = nn.GRU
    else:
      raise ValueError("Invalid Architecture")
    self.rnn = RNN(
        # Each step receives the token embedding concatenated with the context.
        input_size=embedding_size+hidden_size,
        hidden_size=hidden_size,
        num_layers=num_layers,
        # Inter-layer dropout does nothing for a single layer and only makes
        # PyTorch emit a warning, so it is disabled in that case.
        dropout=dropout if num_layers > 1 else 0.0,
        bidirectional=bidirectional,
        batch_first=batch_first
    )
    # Collapses the (num_layers * directions) hidden states into one query.
    self.hid_attn = nn.Linear(self.num_layers * self.directions, 1)
    self.fc = nn.Sequential(
            nn.Linear(self.hidden_size*self.directions, self.embedding_size), nn.LeakyReLU(),
            nn.Linear(self.embedding_size, self.output_size),
    )
    self.dropout = nn.Dropout(dropout)

  def forward(self, tgt_padded, hidden, enc_out):
    """Run the decoder over ``tgt_padded`` using one attention context.

    Args:
      tgt_padded: (batch, steps) tensor of input token ids.
      hidden: previous RNN state, ``(h, c)`` for LSTM or ``h`` for GRU, each
        (num_layers * directions, batch, hidden_size); ``None`` starts from zeros.
      enc_out: encoder outputs, (src_len, batch, hidden_size).

    Returns:
      ``(output, hidden)``: logits of shape (batch, steps, vocab_size) and the
      updated RNN state. The attention weights are computed but not returned.
    """
    if hidden is None:
       h_0 = torch.zeros(
          self.num_layers*self.directions,
          tgt_padded.size(0),
          self.hidden_size,
          device=tgt_padded.device
       )
       if self.arch =="lstm":
          c_0 = torch.zeros_like(h_0)
          hidden = (h_0, c_0)
       else:
          hidden = h_0

    # Attention is queried with the hidden state only (not the LSTM cell state).
    if self.arch == "lstm" and isinstance(hidden,tuple):
      hidden_ = hidden[0]
    else:
      hidden_ = hidden

    # [L*D, B, H]
    hidden_ = hidden_.permute(1, 2, 0)           # [B, H, L*D]
    attn_inp = self.hid_attn(hidden_)            # [B, H, 1]
    attn_inp = attn_inp.permute(2, 0, 1)         # [1, B, H]

    context, attn_weights = self.attention(attn_inp, enc_out)
    embedded = self.dropout(self.embedding(tgt_padded))
    # The same context vector is attached to every input step.
    context = context.repeat(1, embedded.size(1), 1)
    rnn_input = torch.cat([embedded, context], dim=2)
    # hidden: (h_n, c_n) if LSTM else h_n, each (num_layers * num_directions, batch_size, hidden_dim)
    outputs, hidden = self.rnn(rnn_input, hidden)
    output = self.fc(outputs)
    return output, hidden
  

# Test: one encoder pass followed by a single decoder step on the first target token.
test_vocab = CharVocab()
test_pairs = [
    (en_word, hi_word, ("<en>", "<hi>"))
    for en_word, hi_word in [
        ("hello", "हेलो"), ("computer", "कम्प्यूटर"), ("mobile", "मोबाइल"),
        ("doctor", "डॉक्टर"), ("school", "स्कूल"), ("radio", "रेडियो"),
    ]
]
test_dataset = TransliterationDataset(test_pairs, test_vocab)
enc = Encoder(
    input_size=len(test_vocab),
    embedding_size=256,
    hidden_size=512,
    num_layers=2,
    dropout=0.03,
    bidirectional=True,
    arch="lstm",
    batch_first=True,
)

device = 'cuda' if torch.cuda.is_available() else "cpu"
print(f"Using {device}")
enc.to(device)

dec = Decoder(
    vocab_size=len(test_vocab),
    embedding_size=256,
    hidden_size=512,
    num_layers=2,
    dropout=0.01,
    bidirectional=True,
    batch_first=True,
    arch="lstm",
    attn="dot",
)
dec.to(device)

# Only the first batch is inspected.
src_padded, tgt_padded, src_len = next(iter(test_dataset.get_dataloader(batch_size=4)))
src_padded = src_padded.to(device)
print(f"Input Shape: {src_padded.shape}, Input Len: {src_len.shape}, ")
outputs, hidden = enc(src_padded, src_len)
print(f"Output Shape: {outputs.shape}, Hidden State Shape: {hidden[0].shape}, Cell State Shape: {hidden[1].shape}")
dec_in = tgt_padded[:, :1].to(device)  # the <sos> column
prediction, hidden = dec(dec_in, hidden, outputs)
print(f"Prediction Shape: {prediction.shape}, Hidden State Shape: {hidden[0].shape}, Cell State Shape: {hidden[1].shape}")
  

Using cuda
Input Shape: torch.Size([4, 10]), Input Len: torch.Size([4]), 
Output Shape: torch.Size([10, 4, 512]), Hidden State Shape: torch.Size([4, 4, 512]), Cell State Shape: torch.Size([4, 4, 512])
Prediction Shape: torch.Size([4, 1, 137]), Hidden State Shape: torch.Size([4, 4, 512]), Cell State Shape: torch.Size([4, 4, 512])


In [150]:
import re
class TransliterationModel(nn.Module):
    """Encoder-decoder wrapper for character-level transliteration.

    Args:
        enc_class: encoder class, instantiated with keyword arguments
            ``input_size, embedding_size, hidden_size, num_layers, dropout,
            bidirectional, arch, batch_first``.
        dec_class: decoder class, instantiated with keyword arguments
            ``vocab_size, embedding_size, hidden_size, num_layers, dropout,
            bidirectional, batch_first, arch, attn``.
        vocab_sz: vocabulary size shared by encoder and decoder.
        embed_sz, hidden_sz, num_layers, bidir, dropout, arch, attn: forwarded
            to both sub-modules.
    """

    def __init__(self, enc_class, dec_class, vocab_sz, embed_sz=256, hidden_sz=512, num_layers=2, bidir=True, dropout=0.03, arch='lstm', attn='dot'):
        super(TransliterationModel, self).__init__()
        self.batch_first = True
        self.dirs = 2 if bidir else 1
        self.attn = attn
        self.arch = arch
        self.dropout = dropout
        self.enc = enc_class(
            input_size = vocab_sz, embedding_size = embed_sz, hidden_size = hidden_sz,
            num_layers=num_layers,
            dropout=dropout,
            bidirectional=bidir,
            arch = arch,
            batch_first=True
            )
        self.dec = dec_class(
            vocab_size = vocab_sz, embedding_size=embed_sz, hidden_size=hidden_sz,
            num_layers=num_layers,
            dropout=dropout,
            bidirectional=bidir,
            batch_first=True,
            arch = arch,
            attn = attn,
        )

    def forward(self, src, src_len, tgt, tfr=0.7):
        """Decode ``tgt`` step by step with teacher forcing.

        Args:
            src: (batch, src_len) source ids.
            src_len: true source lengths, passed to the encoder.
            tgt: (batch, tgt_len) target ids; column 0 is the first decoder input.
            tfr: probability, drawn once per step for the whole batch, of
                feeding the ground-truth token instead of the model's own
                prediction. The draw happens in eval mode as well.

        Returns:
            The per-step decoder outputs concatenated along dim 1, i.e.
            ``tgt_len - 1`` steps, aligned with ``tgt[:, 1:]``.
        """
        tgt_len = tgt.shape[1]
        outputs = []
        _enc_outs, _enc_hidd = self.enc(src, src_len)
        _dec_hidd = _enc_hidd
        _dec_in = tgt[:,:1]

        for t in range(tgt_len-1):
            pred, _dec_hidd = self.dec(_dec_in, _dec_hidd, _enc_outs)
            pred = pred[:, -1:, :] # [batch, 1, vocab]
            outputs.append(pred)

            use_teacher = torch.rand(1).item() < tfr
            if use_teacher:
                # use the target token as the next input
                _dec_in = tgt[:, t+1:t+2]
            else:
                # use the predicted token as the next input
                _dec_in = pred.argmax(dim=2)

        outputs = torch.cat(outputs, dim=1)
        return outputs

    def generate(self, input_text: str, vocab: "CharVocab", src_lang, tgt_lang, device, maxlen=25)-> str:
        """Greedily transliterate ``input_text`` word by word.

        The text is lower-cased and split on whitespace; every word is encoded,
        decoded greedily for at most ``maxlen`` steps (stopping at '<eos>') and
        turned back into text with ``vocab.decode``.

        The model is put into eval mode for the duration of the call (so
        dropout cannot perturb the output) and its previous mode is restored
        afterwards.

        Returns:
            The decoded words joined by single spaces.
        """
        pad_id = vocab.special_tokens['<pad>']
        sos_id = vocab.special_tokens['<sos>']
        eos_id = vocab.special_tokens['<eos>']

        input_text = input_text.strip().lower()
        input_text = re.sub(r'\s+', ' ', input_text)
        input_tokens = input_text.split()

        outs = []
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                for token in input_tokens:
                    encoded = vocab.encode(token, src_lang=src_lang, tgt_lang=tgt_lang)
                    encoded = [torch.tensor(encoded[0], dtype=torch.long)]
                    encoded_padded = torch.nn.utils.rnn.pad_sequence(encoded, batch_first=True, padding_value=pad_id).to(device)
                    src_lengths = (encoded_padded != pad_id).sum(dim=1).cpu()
                    _enc_outs, _enc_hidd = self.enc(encoded_padded, src_lengths)
                    _dec_hidd = _enc_hidd

                    _dec_in = torch.full((1, 1), sos_id, dtype=torch.long, device=device)
                    # Keep plain ints only, so vocab.decode receives a homogeneous list.
                    pred_ids = [sos_id]
                    for _ in range(maxlen):
                        pred, _dec_hidd = self.dec(_dec_in, _dec_hidd, _enc_outs)
                        # [batch, steps, vocab] -> id of the last step's best token
                        next_token = pred[:, -1:, :].argmax(dim=-1)[0][0].item()
                        pred_ids.append(next_token)
                        _dec_in = torch.full((1, 1), next_token, dtype=torch.long, device=device)

                        if next_token == eos_id:
                            break
                    outs.append(vocab.decode(pred_ids))
        finally:
            self.train(was_training)

        return ' '.join(outs)
                     
    
# Test: smoke-test an untrained model (forward pass and greedy generation).
test_vocab = CharVocab()
test_pairs = [
    (en_word, hi_word, ("<en>", "<hi>"))
    for en_word, hi_word in [
        ("hello", "हेलो"), ("computer", "कम्प्यूटर"), ("mobile", "मोबाइल"),
        ("doctor", "डॉक्टर"), ("school", "स्कूल"), ("radio", "रेडियो"),
    ]
]
test_dataset = TransliterationDataset(test_pairs, test_vocab)

device = 'cuda' if torch.cuda.is_available() else "cpu"
print(f"Using {device}")

test_model = TransliterationModel(Encoder, Decoder, vocab_sz=len(test_vocab))
test_model.to(device)

test_model.eval()
# Test 1: forward pass on the first batch only.
src_padded, tgt_padded, src_len = next(iter(test_dataset.get_dataloader(batch_size=64)))
src_padded = src_padded.to(device)
tgt_padded = tgt_padded.to(device)
print(f"Input Shape: {src_padded.shape}, Input Len: {src_len.shape}")
outputs = test_model(src_padded, src_len, tgt_padded)
print(f"Output Shape: {outputs.shape}")

print('Done')

# Test 2: generation from raw text (weights are untrained, so the output is noise).
test_input = "Hy this is test case."
test_output = test_model.generate(
    test_input,
    vocab=test_vocab,
    src_lang='<en>',
    tgt_lang='<hi>',
    device=device,
    maxlen=10,
)
print(f"Test Input: {test_input}")
print(f"Test Output: {test_output}")

Using cuda
Input Shape: torch.Size([6, 10]), Input Len: torch.Size([6])
Output Shape: torch.Size([6, 11, 137])
Done
Test Input: Hy this is test case.
Test Output: ंःःःःःzzzछ ः8औछछछ8छछछ ः8छछछछछछछछ ः8औछछछछछछछ छछछछz8छछछछ


## Train

## Training

In [68]:
import json
import pandas as pd

# The files are JSON Lines (one JSON object per line) and contain Devanagari
# text, so the encoding is fixed to UTF-8 instead of the platform default.
# Blank lines (e.g. a trailing newline) are skipped rather than crashing json.loads.
with open("hin_train.json", "r", encoding="utf-8") as f:
  hin_train = [json.loads(line) for line in f if line.strip()]
with open("hin_valid.json", "r", encoding="utf-8") as f:
  hin_valid = [json.loads(line) for line in f if line.strip()]

# Convert the train and validation splits to pandas DataFrames
train_ = pd.DataFrame(hin_train)
valid_ = pd.DataFrame(hin_valid)

In [None]:
# Build (source word, target word, (source tag, target tag)) records.
# Zipping the two columns avoids the per-row Series that iterrows() creates.
en_hin_train = [(en, hi, ("<en>", "<hi>")) for en, hi in zip(train_["english word"], train_["native word"])]
en_hin_valid = [(en, hi, ("<en>", "<hi>")) for en, hi in zip(valid_["english word"], valid_["native word"])]
# hin_en_train = [(hi, en, ("<hi>", "<en>")) for en, hi in zip(train_["english word"], train_["native word"])]
# hin_en_valid = [(hi, en, ("<hi>", "<en>")) for en, hi in zip(valid_["english word"], valid_["native word"])]

# train_pairs = en_hin_train + hin_en_train
# valid_pairs = en_hin_valid + hin_en_valid

# Comment out the two lines below and uncomment the lines above for bidirectional training
train_pairs = en_hin_train
valid_pairs = en_hin_valid

In [70]:
# import pickle

# with open("train_pairs.pkl", 'wb') as f:
#     pickle.dump(train_pairs, f)

# with open("valid_pairs.pkl", 'wb') as f:
#     pickle.dump(valid_pairs, f)

In [71]:
# import pickle

# with open("train_pairs.pkl", 'rb') as f:
#     train_pairs = pickle.load(f)

# with open("valid_pairs.pkl", 'rb') as f:
#     valid_pairs = pickle.load(f)

In [None]:
# Training setup: datasets/loaders, model, loss and optimiser.
vocab = CharVocab()
# Only the first 32000 training pairs are used.
train_df = TransliterationDataset(train_pairs[:32000], vocab)
train_loader = train_df.get_dataloader(batch_size=128)

val_df = TransliterationDataset(valid_pairs, vocab)
# Validation does not benefit from shuffling; a fixed order keeps runs comparable.
eval_loader = val_df.get_dataloader(batch_size=128, shuffle=False)

device = 'cuda' if torch.cuda.is_available() else "cpu"
print(f"Using {device}")

main_model = TransliterationModel(Encoder, Decoder, vocab_sz=len(vocab))
main_model.to(device)

# Padding positions do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=vocab.special_tokens['<pad>'])
optimizer = torch.optim.AdamW(main_model.parameters(), lr=1e-3, weight_decay=5e-3)

epochs = 1

Using cuda


In [117]:
import tqdm
def train(model, trainloader, optimizer, criterion, device, epochs=1, evaloader=None, checkpoint_prefix="en_hi_tlite_lstm2"):
    """Train ``model`` and write one checkpoint per epoch.

    Args:
        model: module called as ``model(src, src_len, tgt)`` that returns
            logits aligned with ``tgt[:, 1:]``.
        trainloader: iterable of ``(src, tgt, src_len)`` batches.
        optimizer: optimiser over ``model``'s parameters.
        criterion: loss taking ``(N, vocab)`` logits and ``(N,)`` targets.
        device: device the batches are moved to.
        epochs: number of passes over ``trainloader``.
        evaloader: optional validation loader; validation is skipped when None.
        checkpoint_prefix: checkpoints are written to
            ``f"{checkpoint_prefix}_{epoch}.pth"``.

    Each checkpoint holds ``model.state_dict()`` (of the ``model`` argument, not
    of any global), so it can be restored with ``load_state_dict``.
    """
    train_loss = 0
    for e in range(epochs):
        epoch_loss = 0
        model.train()
        for src_tagged, tgt_tagged, src_len in tqdm.tqdm(trainloader, desc="Training"):
            src_padded = src_tagged.to(device)
            tgt_padded = tgt_tagged.to(device)
            optimizer.zero_grad()
            output = model(src_padded, src_len, tgt_padded)
            # compute the loss: flatten 3D logits and 2D targets (first target column is the decoder input)
            loss = criterion(output.view(-1, output.shape[-1]), tgt_padded[:, 1:].reshape(-1))
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        # The last epoch's loss is only reported through the final average below.
        if e < epochs-1:
          print(f"Epoch {e+1}/{epochs} completed with epoch loss of {epoch_loss/len(trainloader):.4f}")
        train_loss += epoch_loss/len(trainloader)

        if evaloader is not None:
            eval_loss = 0
            model.eval()
            with torch.no_grad():
                for src_tagged, tgt_tagged, src_len in tqdm.tqdm(evaloader, desc="Validation"):
                    src_padded = src_tagged.to(device)
                    tgt_padded = tgt_tagged.to(device)
                    output = model(src_padded, src_len, tgt_padded)
                    loss = criterion(output.view(-1, output.shape[-1]), tgt_padded[:, 1:].reshape(-1))
                    eval_loss += loss.item()
            print(f"Evaluation completed with epoch loss of {eval_loss/len(evaloader):.4f}")

        torch.save(model.state_dict(), f"{checkpoint_prefix}_{e+1}.pth")
    # max(..., 1) keeps epochs=0 from dividing by zero.
    print(f"\nTraining completed with training loss of {train_loss/max(epochs, 1):.4f}")

In [118]:
# Run the training loop with the objects prepared in the setup cell.
train(main_model, train_loader, optimizer, criterion, device, epochs=epochs, evaloader=eval_loader)

Training: 100%|██████████| 1000/1000 [02:08<00:00,  7.76it/s]
Validation: 100%|██████████| 199/199 [00:04<00:00, 40.40it/s]


Evaluation completed with epoch loss of 0.7270

Training completed with training loss of 1.2725


In [121]:
# Persist only the parameters (state_dict), not the pickled module.
torch.save(obj=main_model.state_dict(), f='model_weights.pth')

## Predicting

In [151]:
def predict(model, tokenizer, input_seq, src_lang='<en>', tgt_lang='<hi>', device="cpu"):
    """Transliterate ``input_seq`` by delegating to ``model.generate``.

    ``tokenizer`` is passed as the vocabulary argument; the language tags and
    the device are forwarded as keyword arguments. Returns whatever
    ``model.generate`` returns.
    """
    generation_options = {'src_lang': src_lang, 'tgt_lang': tgt_lang, 'device': device}
    return model.generate(input_seq, tokenizer, **generation_options)

In [170]:
inp_pred = "Hello World!"
pred_vocab = CharVocab()
# Pick the device first so nothing below depends on a `device` from an earlier cell.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Size the model from the vocabulary that is actually used for prediction.
model = TransliterationModel(Encoder, Decoder, vocab_sz=len(pred_vocab))
# 'model_weights.pth' is the state_dict saved at the end of training; it is the
# format load_state_dict expects (a pickled nn.Module would be rejected).
model.load_state_dict(torch.load("model_weights.pth", map_location=device))
model.to(device)
model.eval()  # disable dropout for inference
print(predict(model, pred_vocab, inp_pred, device=device))

हेलो वोर्लो
