In [1]:
import os
import random

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter  # to print to tensorboard
# Compute device used by the notebook: CUDA when torch reports it as available,
# otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
# Paths to the train, validation and test lexicon files.
# Forward slashes are used because they resolve on Windows, Linux and macOS;
# the former backslash-separated literals only worked on Windows and depended
# on invalid escape sequences (backslash-h, backslash-l) being left untouched.

train_path = 'dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.train.tsv'
val_path = 'dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.dev.tsv'
test_path = 'dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.test.tsv'


def load_pairs(path):
    """Read a tab-separated lexicon file into parallel input/target lists.

    Every non-empty line must hold exactly three tab-separated fields: the
    target text, the input text and a third field that is ignored.  Each
    target is wrapped with a tab character as the "start sequence" marker and
    a newline character as the "end sequence" marker.

    Args:
        path: Location of the UTF-8 encoded TSV file.

    Returns:
        A tuple ``(inputs, targets)`` of equally long lists of strings.

    Raises:
        ValueError: If a non-empty line does not contain exactly three fields.
    """
    inputs, targets = [], []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.rstrip("\n")
            # Skip blank lines (typically the one after a trailing newline)
            # instead of unconditionally discarding the last line, which lost
            # a sample whenever the file did not end with a newline.
            if not line:
                continue
            target_text, input_text, _ = line.split("\t")
            inputs.append(input_text)
            targets.append("\t" + target_text + "\n")
    return inputs, targets


# creating the corpus and vectorizing the data

train_X, train_Y = load_pairs(train_path)

input_corpus = set()
output_corpus = set()
for input_text in train_X:
    input_corpus.update(input_text)
for target_text in train_Y:
    output_corpus.update(target_text)

# ' ' is used to fill the empty spaces of shorter sequences
input_corpus.add(" ")
output_corpus.add(" ")
input_corpus = sorted(input_corpus)
output_corpus = sorted(output_corpus)
num_encoder_tokens = len(input_corpus)
num_decoder_tokens = len(output_corpus)
max_encoder_seq_length = max(len(txt) for txt in train_X)
max_decoder_seq_length = max(len(txt) for txt in train_Y)

val_X, val_Y = load_pairs(val_path)

In [3]:
# Report the size of the training set, both vocabularies and the longest
# input/output sequences.
dataset_summary = [
    ("Number of samples:", len(train_X)),
    ("Number of unique input tokens:", num_encoder_tokens),
    ("Number of unique output tokens:", num_decoder_tokens),
    ("Max sequence length for inputs:", max_encoder_seq_length),
    ("Max sequence length for outputs:", max_decoder_seq_length),
]
for label, value in dataset_summary:
    print(label, value)

Number of samples: 44204
Number of unique input tokens: 27
Number of unique output tokens: 66
Max sequence length for inputs: 20
Max sequence length for outputs: 21


In [4]:
# Character -> integer index lookups for both vocabularies.
input_char_index = {char: i for i, char in enumerate(input_corpus)}
output_char_index = {char: i for i, char in enumerate(output_corpus)}


def encode_sequences(texts, char_index, max_len):
    """Encode strings as a ``(max_len, len(texts))`` int64 index matrix.

    Column ``j`` holds the character indices of ``texts[j]`` from row 0
    downwards; every remaining row of that column holds the index of the
    padding character ``" "``.

    Args:
        texts: Sequence of strings to encode.
        char_index: Mapping from character to integer index; must contain
            ``" "`` and every character occurring in ``texts``.
        max_len: Number of rows of the result.

    Returns:
        A NumPy ``int64`` array of shape ``(max_len, len(texts))``.

    Raises:
        KeyError: If a character is missing from ``char_index``.
        IndexError: If a string is longer than ``max_len``.
    """
    # Pre-filling with the pad index means an empty string simply yields an
    # all-padding column.  The previous code padded from the loop variable
    # left over after the character loop, which is undefined (or stale from
    # the previous sample) when a string is empty.
    encoded = np.full((max_len, len(texts)), char_index[" "], dtype="int64")
    for col, text in enumerate(texts):
        for row, char in enumerate(text):
            encoded[row, col] = char_index[char]
    return encoded


input_data = encode_sequences(train_X, input_char_index, max_encoder_seq_length)
target_data = encode_sequences(train_Y, output_char_index, max_decoder_seq_length)

input_data_val = encode_sequences(val_X, input_char_index, max_encoder_seq_length)
target_data_val = encode_sequences(val_Y, output_char_index, max_decoder_seq_length)

In [5]:
# Converting the NumPy index matrices to int64 tensors.
input_data, target_data, input_data_val, target_data_val = (
    torch.tensor(array, dtype=torch.int64)
    for array in (input_data, target_data, input_data_val, target_data_val)
)

In [6]:
class Encoder(nn.Module):
    """Embeds a batch of index sequences and runs it through a stacked LSTM.

    Only the final hidden and cell states of the LSTM are returned; the
    per-step outputs are discarded.
    """

    def __init__(self, input_size, embedding_size, hidden_size, num_layers, dropout):
        """Build the embedding, dropout and LSTM layers.

        Args:
            input_size: Number of rows of the embedding table.
            embedding_size: Width of each embedding vector.
            hidden_size: Hidden size of the LSTM.
            num_layers: Number of stacked LSTM layers.
            dropout: Dropout probability, applied to the embeddings and
                passed to the LSTM as its ``dropout`` argument.
        """
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(
            embedding_size,
            hidden_size,
            num_layers,
            dropout=dropout,
        )

    def forward(self, x):
        """Encode ``x`` and return the LSTM's final ``(hidden, cell)`` states.

        Args:
            x: Index tensor of shape (seq_length, N), N being the batch size.

        Returns:
            The ``(hidden, cell)`` pair produced by the LSTM.
        """
        # (seq_length, N) -> (seq_length, N, embedding_size)
        embedded = self.embedding(x)
        embedded = self.dropout(embedded)

        # The per-step outputs, (seq_length, N, hidden_size), are not needed.
        _, final_state = self.rnn(embedded)
        final_hidden, final_cell = final_state

        return final_hidden, final_cell

In [7]:
class Decoder(nn.Module):
    """Single-step LSTM decoder that maps one token per sample to vocabulary scores."""

    def __init__(self, input_size, embedding_size, hidden_size, output_size, num_layers, dropout):
        """Build the embedding, dropout, LSTM and output projection layers.

        Args:
            input_size: Number of rows of the embedding table.
            embedding_size: Width of each embedding vector.
            hidden_size: Hidden size of the LSTM.
            output_size: Number of scores produced per sample.
            num_layers: Number of stacked LSTM layers.
            dropout: Dropout probability, applied to the embeddings and
                passed to the LSTM as its ``dropout`` argument.
        """
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(
            embedding_size,
            hidden_size,
            num_layers,
            dropout=dropout,
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden, cell):
        """Run one decoding step.

        Args:
            x: Index tensor of shape (N), one token per sample in the batch.
            hidden: LSTM hidden state from the previous step (or the encoder).
            cell: LSTM cell state from the previous step (or the encoder).

        Returns:
            ``(predictions, hidden, cell)`` where ``predictions`` has shape
            (N, output_size) and the states are the updated LSTM states.
        """
        # The LSTM expects a leading sequence dimension; one step => length 1.
        step_input = x.unsqueeze(0)  # (1, N)

        embedded = self.embedding(step_input)  # (1, N, embedding_size)
        embedded = self.dropout(embedded)

        rnn_out, (hidden, cell) = self.rnn(embedded, (hidden, cell))  # (1, N, hidden_size)

        # Project to vocabulary scores and drop the length-1 sequence dimension
        # so the result can be fed to the loss as (N, output_size).
        predictions = self.fc(rnn_out).squeeze(0)

        return predictions, hidden, cell

In [8]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with teacher forcing."""

    def __init__(self, encoder, decoder):
        """Store the two sub-networks.

        Args:
            encoder: Module called as ``encoder(source)`` that returns the
                initial ``(hidden, cell)`` states for the decoder.
            decoder: Module called as ``decoder(x, hidden, cell)`` that
                returns ``(predictions, hidden, cell)``.  It must expose its
                output projection as ``decoder.fc`` (an ``nn.Linear``).
        """
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, teacher_force_ratio=0.5):
        """Decode ``target.shape[0] - 1`` steps conditioned on ``source``.

        Args:
            source: Index tensor of shape (source_len, N).
            target: Index tensor of shape (target_len, N); row 0 is fed to
                the decoder as the first input.
            teacher_force_ratio: Probability, drawn independently at every
                step, of feeding the ground-truth token ``target[t]`` to the
                next step instead of the decoder's own best guess.

        Returns:
            Tensor of shape (target_len, N, vocab_size) holding the decoder
            scores of every step; row 0 is never written and stays zero.
        """
        batch_size = source.shape[1]
        target_len = target.shape[0]
        # Read the vocabulary size from the decoder's output layer and the
        # device from the input, rather than from notebook-level globals, so
        # the module works on its own and always agrees with the decoder.
        target_vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(
            target_len, batch_size, target_vocab_size, device=source.device
        )

        hidden, cell = self.encoder(source)

        # Grab the first input to the Decoder which will be <SOS> token
        x = target[0]

        for t in range(1, target_len):
            # Use previous hidden, cell as context from encoder at start
            output, hidden, cell = self.decoder(x, hidden, cell)

            # Store next output prediction
            outputs[t] = output

            # Get the best token the Decoder predicted (index in the vocabulary)
            best_guess = output.argmax(1)
            x = target[t] if random.random() < teacher_force_ratio else best_guess

        return outputs

In [9]:
# Training hyperparameters.
batch_size = 32        # samples per mini-batch
learning_rate = 1e-3   # learning rate handed to the optimizer
num_epochs = 100       # passes over the training data

In [10]:
# Run-mode flags.
load_model = False
training = False

# Vocabulary-dependent sizes: the decoder reads and predicts the same
# (output) vocabulary.
input_size_encoder = num_encoder_tokens
input_size_decoder = output_size = num_decoder_tokens

# Architecture: encoder and decoder are configured symmetrically.
encoder_embedding_size = decoder_embedding_size = 64
hidden_size = 128  # Needs to be the same for both RNN's
num_enc_layers = num_dec_layers = 3
enc_dropout = dec_dropout = 0.1

In [11]:
# Instantiate the encoder first and the decoder second, then wrap both in the
# Seq2Seq model; every module is moved to the selected device.
encoder_net = Encoder(
    input_size=input_size_encoder,
    embedding_size=encoder_embedding_size,
    hidden_size=hidden_size,
    num_layers=num_enc_layers,
    dropout=enc_dropout,
)
encoder_net = encoder_net.to(device)

decoder_net = Decoder(
    input_size=input_size_decoder,
    embedding_size=decoder_embedding_size,
    hidden_size=hidden_size,
    output_size=output_size,
    num_layers=num_dec_layers,
    dropout=dec_dropout,
)
decoder_net = decoder_net.to(device)

model = Seq2Seq(encoder=encoder_net, decoder=decoder_net)
model = model.to(device)

In [None]:
# Training cell.  The loop only runs when the `training` flag from the
# configuration cell is True, so that running the whole notebook does not
# retrain for `num_epochs` epochs and overwrite the checkpoint saved earlier.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

# Mini-batches are slices along dim 1 (the sample dimension).
train_ds_x = torch.split(input_data, batch_size, dim=1)
train_ds_y = torch.split(target_data, batch_size, dim=1)
input_data_val = input_data_val.to(device)
target_data_val = target_data_val.to(device)
target_val = target_data_val[1:].reshape(-1)

if training:
    # Enable dropout.  (A stray `model.eval()` used to precede this call and
    # was immediately overridden by it.)
    model.train()

    for epoch in range(num_epochs):
        print(f"[Epoch {epoch} / {num_epochs}]")

        for x, y in zip(train_ds_x, train_ds_y):
            # Get input and targets and move them to the compute device
            inp_data = x.to(device)
            target = y.to(device)

            # Forward prop
            output = model(inp_data, target)

            # Output has shape (trg_len, batch_size, output_dim) but
            # CrossEntropyLoss expects (N, output_dim) scores and (N) targets,
            # so both are flattened over time steps and samples.  Row 0 (the
            # start token position) is dropped while doing so.
            output = output[1:].reshape(-1, output.shape[2])
            target = target[1:].reshape(-1)

            optimizer.zero_grad()
            loss = criterion(output, target)

            # Back prop
            loss.backward()

            # Clip to avoid exploding gradient issues, makes sure grads are
            # within a healthy range
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)

            # Gradient descent step
            optimizer.step()

    # Forward slashes resolve on every platform (the former backslash literals
    # relied on invalid escape sequences), and the directory is created first
    # so a finished training run cannot fail at the save step.
    os.makedirs('models', exist_ok=True)
    torch.save(model.state_dict(), 'models/model_pytorch_noAT_state.pt')
    torch.save(model, 'models/model_pytorch_noAT.pt')
else:
    print("Skipping training: set `training = True` to train and save the model.")

In [13]:
# Restore the trained weights and switch to inference mode.
# `map_location=device` lets a checkpoint that was saved from a GPU be loaded
# on a CPU-only machine; the forward-slash path resolves on every platform
# (the former backslash literal relied on an invalid escape sequence).
model.load_state_dict(
    torch.load('models/model_pytorch_noAT_state.pt', map_location=device)
)
model.eval()

Seq2Seq(
  (encoder): Encoder(
    (dropout): Dropout(p=0.1, inplace=False)
    (embedding): Embedding(27, 64)
    (rnn): LSTM(64, 128, num_layers=3, dropout=0.1)
  )
  (decoder): Decoder(
    (dropout): Dropout(p=0.1, inplace=False)
    (embedding): Embedding(66, 64)
    (rnn): LSTM(64, 128, num_layers=3, dropout=0.1)
    (fc): Linear(in_features=128, out_features=66, bias=True)
  )
)

In [14]:
# Inverted vocabularies (index -> character), used to turn predicted token
# indices back into readable text.
reverse_input_char_index = {index: char for char, index in input_char_index.items()}
reverse_target_char_index = {index: char for char, index in output_char_index.items()}

In [15]:
def translate(model, word, input_char_index, output_char_index, reverse_input_char_index,
              reverse_target_char_index, max_encoder_seq_length, max_decoder_seq_length,
              num_encoder_tokens, num_decoder_tokens, device):
    """Greedily decode the model's output string for a single input word.

    The word is encoded as a column of character indices padded with the
    index of ``" "``, passed through ``model.encoder``, and then
    ``model.decoder`` is stepped from the tab start marker until it predicts
    the newline end marker or ``max_decoder_seq_length - 1`` steps have run.

    Args:
        model: Object exposing ``encoder(data)`` and
            ``decoder(x, hidden, cell)``.  It is not switched to eval mode
            here; the caller is responsible for that.
        word: Input string; may be empty.
        input_char_index: Mapping from input character to index; must
            contain ``" "`` and every character of ``word``.
        output_char_index: Mapping from output character to index; must
            contain the tab start marker.
        reverse_input_char_index: Unused; kept for interface compatibility.
        reverse_target_char_index: Mapping from output index to character.
        max_encoder_seq_length: Length the input is padded to.  Longer words
            are encoded at their full length instead of failing.
        max_decoder_seq_length: Upper bound on decoding; at most
            ``max_decoder_seq_length - 1`` characters are produced.
        num_encoder_tokens: Unused; kept for interface compatibility.
        num_decoder_tokens: Unused; kept for interface compatibility.
        device: Device the input tensors are moved to.

    Returns:
        The decoded string, without the start and end markers.

    Raises:
        KeyError: If ``word`` contains a character missing from
            ``input_char_index``.
    """
    # Pre-fill with the pad index so an empty word becomes an all-padding
    # column; padding from the loop variable after the character loop raised
    # UnboundLocalError for an empty word.
    seq_len = max(len(word), max_encoder_seq_length)
    data = np.full((seq_len, 1), input_char_index[" "], dtype="int64")
    for t, char in enumerate(word):
        data[t, 0] = input_char_index[char]

    data = torch.tensor(data, dtype=torch.int64).to(device)

    decoded_chars = []
    # The whole inference pass runs without autograd; the decoding loop used
    # to sit outside the no_grad block and recorded a graph on every step.
    with torch.no_grad():
        hidden, cell = model.encoder(data)

        x = torch.tensor([output_char_index['\t']], dtype=torch.int64).to(device)

        for _ in range(1, max_decoder_seq_length):
            output, hidden, cell = model.decoder(x, hidden, cell)
            # Greedy decoding: the best guess becomes the next input.
            x = output.argmax(1)
            ch = reverse_target_char_index[x.item()]
            if ch == '\n':
                break
            decoded_chars.append(ch)

    return ''.join(decoded_chars)

In [16]:
# Print the reference target followed by the model's prediction for the
# first 20 training samples.
for sample_idx in range(20):
    reference = train_Y[sample_idx]
    print(reference)
    prediction = translate(
        model,
        train_X[sample_idx],
        input_char_index,
        output_char_index,
        reverse_input_char_index,
        reverse_target_char_index,
        max_encoder_seq_length,
        max_decoder_seq_length,
        num_encoder_tokens,
        num_decoder_tokens,
        device,
    )
    print(prediction)

	अं

अं
	अंकगणित

अंकगणित
	अंकल

अंकल
	अंकुर

अंकुर
	अंकुरण

अंकुरण
	अंकुरित

अंकुरित
	अंकुश

आंकुश
	अंकुश

अंकुश
	अंग

आंग
	अंग

अंगा
	अंगद

अगंध
	अंगद

अंगद
	अंगने

अंगने
	अंगभंग

अंगभंग
	अंगरक्षक

अंगरक्षक
	अंगरक्षक

अंगरक्षक
	अंगारा

अंगारा
	अंगारे

अंगारे
	अंगारे

अंगारे
	अंगी

अंगी
