In [None]:
from __future__ import unicode_literals, print_function, division
from io import open
import unicodedata
import string
import re
import random
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
import time
import math
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import copy

import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
'''The model was adopted from a machine translation task, found in: https://pytorch.org/tutorials/intermediate/seq2seq_translation_tutorial.html'''
'''The term *word* (in the code) represents a *character* (in the data) while the term *sentence* (in the code) represents a *word* (in the data)'''
'''It was decided to retain the original terminology of the code'''

# Reserved vocabulary indices shared by every Lang instance: index 0 marks the
# start of a sequence and index 1 marks its end (Lang.index2word maps them to
# "<" and ">" respectively).
SOS_token = 0
EOS_token = 1


class Lang:
    """Symbol vocabulary for one side of the data (input or output).

    Keeps three tables in sync: symbol -> index, symbol -> occurrence count and
    index -> symbol. Indices 0 and 1 are reserved for the start ("<") and end
    (">") markers, so the first real symbol receives index 2.
    """

    def __init__(self, name):
        self.name = name
        self.word2index = {}
        self.word2count = {}
        self.index2word = {0: "<", 1: ">"}
        self.n_words = 2  # the two reserved markers are already counted

    def addSentence(self, sentence):
        """Register every symbol of ``sentence`` (each character, for a string)."""
        for symbol in sentence:
            self.addWord(symbol)

    def addWord(self, word):
        """Count one occurrence of ``word``, assigning the next free index if it is new."""
        if word in self.word2index:
            self.word2count[word] += 1
            return

        new_index = self.n_words
        self.word2index[word] = new_index
        self.word2count[word] = 1
        self.index2word[new_index] = word
        self.n_words = new_index + 1

In [None]:
'''Reading Data'''

def read_data(lang):
    """Load the medium-sized training file of a language as a DataFrame.

    The file name is ``<lang lower-cased>-train-medium``, resolved relative to
    the current working directory. The file is read as tab-separated text
    without a header row, so the columns are labelled 0, 1, 2, ...

    Every field is kept verbatim as a string. Without this, pandas would turn
    word forms such as "null", "nan" or "NA" into missing values and would
    treat a double quote inside a field as CSV quoting.

    Args:
        lang: Language name; converted with ``str(lang).lower()``.

    Returns:
        pandas.DataFrame with one row per line of the file, all columns as str.
    """
    import csv  # local import: only needed for the quoting constant

    return pd.read_csv(
        str(lang).lower() + "-train-medium",
        sep="\t",
        header=None,
        dtype=str,
        keep_default_na=False,   # "null", "NA", "nan", ... are ordinary words here
        quoting=csv.QUOTE_NONE,  # quote characters are literal text, not delimiters
    )

# 'English' is set as the default selection out of this model's available languages.
# To change the language, pass one of the following instead: Dutch, North-Frisian, Kannada or Polish.
# read_data lower-cases the name and opens "<name>-train-medium", so that file must exist in the working directory.
data = read_data("English")

In [None]:
'''Extracting and Preprocessing Data'''
'''It was initially thought that only verbs and conjugations would be dealt with'''
'''Later in the process it became clear that other lexical categories are also found in the data'''
'''The term *verb* (in the code) could also refer to other lexical categories while the term *conjugation(s)* (in the code) refers to *inflection(s)*'''
'''It was decided to retain the original terminology of the code'''

# Give every distinct morphological tag (column 2) an integer id; sorting the
# tags first keeps the ids identical from run to run.
V_morph = sorted(set(data[2]))
morph2idx = {morph: idx for idx, morph in enumerate(V_morph)}


def morph2idx_fun(el):
    """Return the integer id assigned to the morphological tag ``el``."""
    return morph2idx[el]


data[3] = data[2].map(morph2idx_fun)
X = data[[0, 3]]
y = data[1]

# The model input is "<column 0>+<tag id>"; the target is column 1.
keys = [lemma + "+" + str(tag_id) for lemma, tag_id in zip(X[0], X[3])]
values = list(y)

# Going through a dict drops repeated inputs (the last target seen for a
# repeated key is the one that is kept).
dicts = dict(zip(keys, values))
X1 = pd.DataFrame(list(dicts.items()))
X1, X2 = train_test_split(X1, test_size = 0.2, random_state = 42) # X1 would be used for training while X2 would be used for testing.

# MAX_LENGTH sizes the attention layer and the encoder_outputs buffer, which
# train() and evaluate() index by *input* position (input characters plus the
# end marker). It therefore has to cover the longest input as well as the
# longest target; measuring the targets alone lets a long input overflow the
# buffer with an IndexError.
longest_input = max(max(X1[0].map(len)), max(X2[0].map(len)))
longest_target = max(max(X1[1].map(len)), max(X2[1].map(len)))
MAX_LENGTH = int(max(longest_input, longest_target)) + 2

X1 = X1.reset_index()[[0, 1]]
X2 = X2.reset_index()[[0, 1]]
X1 = X1.rename(columns={0: 'Verb', 1: 'Conjugation'})
X2 = X2.rename(columns={0: 'Verb', 1: 'Conjugation'})

In [None]:
'''One-hot-encoding the Data'''

input_texts = []
target_texts = []

for input_text, target_text in zip(X1['Verb'], X1['Conjugation']):
    # "\t" is used as the "start sequence" character for the targets, and "\n" as the "end sequence" character.
    input_texts.append(input_text)
    target_texts.append("\t" + target_text + "\n")

# Sorted inventories of the distinct characters on each side. Building them
# from a set avoids a linear list scan for every character.
input_characters = sorted(set("".join(input_texts)))
target_characters = sorted(set("".join(target_texts)))
num_encoder_tokens = len(input_characters)
num_decoder_tokens = len(target_characters)

# Longest sequence on each side, in characters (targets include the "\t" and
# "\n" markers). These previously held the number of rows, so the two
# "Max sequence length" lines below reported the sample count.
max_encoder_seq_length = max(map(len, input_texts), default=0)
max_decoder_seq_length = max(map(len, target_texts), default=0)

print("Number of samples:", len(input_texts))
print("Number of unique input tokens:", num_encoder_tokens)
print("Number of unique output tokens:", num_decoder_tokens)
print("Max sequence length for inputs:", max_encoder_seq_length)
print("Max sequence length for outputs:", max_decoder_seq_length)

# Character -> position in the sorted inventory.
input_token_index = {char: i for i, char in enumerate(input_characters)}
target_token_index = {char: i for i, char in enumerate(target_characters)}

val_list = list(input_token_index.values())


In [None]:
def readLangs(input_data, reverse=False):
    """Turn a two-column frame into raw pairs plus two empty vocabularies.

    Args:
        input_data: Object exposing ``.values.tolist()`` (a DataFrame here);
            each row becomes one pair.
        reverse: When true, every pair is flipped and the vocabulary names are
            swapped, so the model would map surface forms back to inputs.

    Returns:
        ``(input_lang, output_lang, pairs)``; both Lang objects are still empty.
    """
    print("Reading lines...")

    source_name, target_name = "lemma_morph", "surface"
    pairs = input_data.values.tolist()

    if not reverse:
        return Lang(source_name), Lang(target_name), pairs

    # Reversed direction: flip each pair and swap the roles of the two names.
    flipped_pairs = [pair[::-1] for pair in pairs]
    return Lang(target_name), Lang(source_name), flipped_pairs

In [None]:
def prepareData(input_data, reverse=False):
    """Read the pairs from ``input_data`` and populate both vocabularies.

    Args:
        input_data: Frame handed on to ``readLangs``.
        reverse: Forwarded to ``readLangs`` (swaps input and output sides).

    Returns:
        ``(input_lang, output_lang, pairs)`` with every symbol of every pair
        registered in the matching Lang.
    """
    source_vocab, target_vocab, pairs = readLangs(input_data, reverse)
    print("Read %s sentence pairs" % len(pairs))

    print("Counting words...")
    for pair in pairs:
        source_text, target_text = pair[0], pair[1]
        source_vocab.addSentence(source_text)
        target_vocab.addSentence(target_text)

    print("Counted words:")
    for vocab in (source_vocab, target_vocab):
        print(vocab.name, vocab.n_words)

    return source_vocab, target_vocab, pairs


# Build the training vocabularies and the (input, target) pairs from the training split X1.
input_lang, output_lang, pairs = prepareData(X1, False)
# Print one randomly chosen training pair as a quick sanity check.
print(random.choice(pairs))

In [None]:
class EncoderRNN(nn.Module):
    """Single-layer GRU encoder that consumes one symbol index per call.

    Args:
        input_size: Number of rows in the embedding table (vocabulary size).
        hidden_size: Width of both the embedding and the GRU state.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, input, hidden):
        """Embed ``input``, run one GRU step and return ``(output, hidden)``."""
        # The embedding is reshaped to (1, 1, hidden_size) before the GRU step.
        step_input = self.embedding(input).view(1, 1, -1)
        return self.gru(step_input, hidden)

    def initHidden(self):
        """Return an all-zero initial state of shape (1, 1, hidden_size)."""
        return torch.zeros((1, 1, self.hidden_size), device=device)

In [None]:
class DecoderRNN(nn.Module):
    """Plain (attention-free) single-layer GRU decoder, one symbol per call.

    Args:
        hidden_size: Width of the embedding and the GRU state.
        output_size: Number of output symbols (size of the log-probability row).
    """

    def __init__(self, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        """Run one decoding step and return ``(log_probabilities, hidden)``."""
        embedded = F.relu(self.embedding(input).view(1, 1, -1))
        gru_output, hidden = self.gru(embedded, hidden)
        # gru_output[0] has shape (1, hidden_size); project it onto the symbols.
        log_probs = self.softmax(self.out(gru_output[0]))
        return log_probs, hidden

    def initHidden(self):
        """Return an all-zero initial state of shape (1, 1, hidden_size)."""
        return torch.zeros((1, 1, self.hidden_size), device=device)

In [None]:
class AttnDecoderRNN(nn.Module):
    """Single-layer GRU decoder with attention over a fixed-size encoder buffer.

    Args:
        hidden_size: Width of the embedding and the GRU state.
        output_size: Number of output symbols.
        dropout_p: Dropout probability applied to the embedded input symbol.
        max_length: Number of attention slots; must equal the number of rows of
            the ``encoder_outputs`` passed to ``forward``.
    """

    def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH):
        super().__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length

        # Layers are created in the same order as before so that parameter
        # initialisation consumes the random generator identically.
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.attn = nn.Linear(2 * hidden_size, max_length)
        self.attn_combine = nn.Linear(2 * hidden_size, hidden_size)
        self.dropout = nn.Dropout(dropout_p)
        self.gru = nn.GRU(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, input, hidden, encoder_outputs):
        """Run one decoding step.

        Returns:
            ``(log_probabilities, hidden, attn_weights)`` where ``attn_weights``
            has one column per attention slot.
        """
        embedded = self.dropout(self.embedding(input).view(1, 1, -1))

        # Attention weights come from the current input symbol and the state.
        attn_scores = self.attn(torch.cat((embedded[0], hidden[0]), 1))
        attn_weights = F.softmax(attn_scores, dim=1)

        # Weighted sum of the encoder outputs.
        context = torch.bmm(attn_weights.unsqueeze(0),
                            encoder_outputs.unsqueeze(0))

        combined = self.attn_combine(torch.cat((embedded[0], context[0]), 1))
        gru_input = F.relu(combined.unsqueeze(0))
        gru_output, hidden = self.gru(gru_input, hidden)

        log_probs = F.log_softmax(self.out(gru_output[0]), dim=1)
        return log_probs, hidden, attn_weights

    def initHidden(self):
        """Return an all-zero initial state of shape (1, 1, hidden_size)."""
        return torch.zeros((1, 1, self.hidden_size), device=device)

In [None]:
def indexesFromSentence(lang, sentence):
    """Map every symbol of ``sentence`` to its index in ``lang.word2index``.

    Raises:
        KeyError: If a symbol is missing from the vocabulary.
    """
    lookup = lang.word2index
    indexes = []
    for symbol in sentence:
        indexes.append(lookup[symbol])
    return indexes


def tensorFromSentence(lang, sentence):
    """Encode ``sentence`` as a column tensor of indices ending with EOS_token.

    Returns:
        A ``torch.long`` tensor of shape (len(sentence) + 1, 1) on ``device``.
    """
    token_ids = indexesFromSentence(lang, sentence) + [EOS_token]
    flat = torch.tensor(token_ids, dtype=torch.long, device=device)
    return flat.view(-1, 1)


def tensorsFromPair(pair):
    """Encode an (input, target) pair with the module-level vocabularies.

    ``pair[0]`` is encoded with ``input_lang`` and ``pair[1]`` with
    ``output_lang``; the result is the tuple ``(input_tensor, target_tensor)``.
    """
    return (
        tensorFromSentence(input_lang, pair[0]),
        tensorFromSentence(output_lang, pair[1]),
    )

In [None]:
def MED(sent_01, sent_02):
    """Minimum edit distance between two sequences.

    Insertions, deletions and substitutions each cost 1. Only two rows of the
    dynamic-programming table are kept at a time.
    """
    # Row 0: turning the empty prefix into the first j symbols takes j insertions.
    previous_row = list(range(len(sent_02) + 1))

    for row_number, left in enumerate(sent_01, start=1):
        # Column 0: deleting the first row_number symbols.
        current_row = [row_number]
        for col_number, right in enumerate(sent_02, start=1):
            substitution = previous_row[col_number - 1] + (0 if left == right else 1)
            deletion = previous_row[col_number] + 1
            insertion = current_row[col_number - 1] + 1
            current_row.append(min(deletion, insertion, substitution))
        previous_row = current_row

    return previous_row[-1]

In [None]:
# Probability that a training example is decoded with teacher forcing.
teacher_forcing_ratio = 0.5


def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=MAX_LENGTH):
    """Run one optimisation step on a single (input, target) example.

    The input is encoded symbol by symbol into a ``max_length``-row buffer, the
    target is decoded step by step, the per-step losses are summed and
    back-propagated, and both optimizers take one step.

    With probability ``teacher_forcing_ratio`` the gold target symbol is fed as
    the next decoder input; otherwise the decoder's own top prediction is used.

    Returns:
        The summed loss divided by the target length, as a Python float.
    """
    encoder_hidden = encoder.initHidden()

    for optimizer in (encoder_optimizer, decoder_optimizer):
        optimizer.zero_grad()

    target_length = target_tensor.size(0)

    # One row per input position; rows past the input length stay zero.
    encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)
    for position in range(input_tensor.size(0)):
        encoder_output, encoder_hidden = encoder(
            input_tensor[position], encoder_hidden)
        encoder_outputs[position] = encoder_output[0, 0]

    decoder_input = torch.tensor([[SOS_token]], device=device)
    decoder_hidden = encoder_hidden

    # Decided once per example, not once per step.
    use_teacher_forcing = random.random() < teacher_forcing_ratio

    loss = 0
    for step in range(target_length):
        decoder_output, decoder_hidden, _ = decoder(
            decoder_input, decoder_hidden, encoder_outputs)
        loss += criterion(decoder_output, target_tensor[step])

        if use_teacher_forcing:
            # Teacher forcing: feed the target as the next input
            decoder_input = target_tensor[step]
        else:
            # Otherwise feed the prediction, detached from the history
            _, best_index = decoder_output.topk(1)
            decoder_input = best_index.squeeze().detach()

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length

In [None]:
def asMinutes(s):
    """Format a duration of ``s`` seconds as ``'<minutes>m <seconds>s'``."""
    whole_minutes = math.floor(s / 60)
    leftover_seconds = s - whole_minutes * 60
    return '%dm %ds' % (whole_minutes, leftover_seconds)


def timeSince(since, percent):
    """Return elapsed time and estimated remaining time as one string.

    Args:
        since: Start timestamp as returned by ``time.time()``.
        percent: Fraction of the work already done; must be non-zero.

    Returns:
        ``'<elapsed> (- <remaining>)'`` with both parts formatted by asMinutes.
    """
    elapsed = time.time() - since
    estimated_total = elapsed / (percent)
    remaining = estimated_total - elapsed
    return '%s (- %s)' % (asMinutes(elapsed), asMinutes(remaining))

In [None]:
def trainIters(encoder, decoder, n_iters, print_every=10000, plot_every=100, learning_rate=0.001):
    """Train both networks for ``n_iters`` single-example SGD steps.

    Training examples are drawn at random (with replacement) from the
    module-level ``pairs`` and converted to tensors up front. The average loss
    is printed every ``print_every`` steps and recorded every ``plot_every``
    steps; the recorded averages are plotted at the end.
    """
    start = time.time()
    plot_losses = []
    print_loss_total = 0  # reset after every progress line
    plot_loss_total = 0  # reset after every recorded plot point

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
    training_pairs = [tensorsFromPair(random.choice(pairs))
                      for _ in range(n_iters)]
    criterion = nn.NLLLoss()

    for step, training_pair in enumerate(training_pairs, start=1):
        input_tensor, target_tensor = training_pair[0], training_pair[1]

        loss = train(input_tensor, target_tensor, encoder,
                     decoder, encoder_optimizer, decoder_optimizer, criterion)
        print_loss_total += loss
        plot_loss_total += loss

        if step % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            progress = step / n_iters
            print('%s (%d %d%%) %.4f' % (timeSince(start, progress),
                                         step, progress * 100, print_loss_avg))

        if step % plot_every == 0:
            plot_losses.append(plot_loss_total / plot_every)
            plot_loss_total = 0

    showPlot(plot_losses)

In [None]:
def showPlot(points):
    """Plot ``points`` as a line chart with y-axis ticks every 0.2 units.

    A single figure is created; the earlier extra ``plt.figure()`` call left a
    second, empty figure behind on every invocation.
    """
    fig, ax = plt.subplots()
    # This locator puts ticks at regular intervals
    ax.yaxis.set_major_locator(ticker.MultipleLocator(base=0.2))
    ax.plot(points)
    plt.show()

In [None]:
def evaluate(encoder, decoder, sentence, max_length=MAX_LENGTH):
    """Greedily decode ``sentence`` and return the prediction with its attention.

    Both networks are switched to evaluation mode for the duration of the call
    so that the decoder's dropout layer is inactive; otherwise predictions are
    perturbed at random and differ between calls. The previous training/eval
    mode of each network is restored afterwards.

    Args:
        encoder: Encoder network (provides ``initHidden`` and ``hidden_size``).
        decoder: Attention decoder returning ``(output, hidden, attention)``.
        sentence: Input string; every character must be known to ``input_lang``.
        max_length: Size of the encoder buffer and maximum number of decoding
            steps.

    Returns:
        ``(decoded_words, attentions)``: the predicted symbols (ending with
        ``'>'`` when the end marker was produced) and one attention row per
        decoding step.
    """
    # Remember the current modes so training can continue unchanged afterwards.
    encoder_was_training = encoder.training
    decoder_was_training = decoder.training
    encoder.eval()
    decoder.eval()

    try:
        with torch.no_grad():
            input_tensor = tensorFromSentence(input_lang, sentence)
            input_length = input_tensor.size()[0]
            encoder_hidden = encoder.initHidden()

            encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)

            for ei in range(input_length):
                encoder_output, encoder_hidden = encoder(input_tensor[ei],
                                                         encoder_hidden)
                encoder_outputs[ei] += encoder_output[0, 0]

            decoder_input = torch.tensor([[SOS_token]], device=device)  # SOS

            decoder_hidden = encoder_hidden

            decoded_words = []
            decoder_attentions = torch.zeros(max_length, max_length)

            for di in range(max_length):
                decoder_output, decoder_hidden, decoder_attention = decoder(
                    decoder_input, decoder_hidden, encoder_outputs)
                decoder_attentions[di] = decoder_attention.data
                topv, topi = decoder_output.data.topk(1)
                if topi.item() == EOS_token:
                    decoded_words.append('>')
                    break
                else:
                    decoded_words.append(output_lang.index2word[topi.item()])

                # Greedy decoding: the best symbol becomes the next input.
                decoder_input = topi.squeeze().detach()

            # Keep only the attention rows of the steps that were actually run.
            return decoded_words, decoder_attentions[:di + 1]
    finally:
        encoder.train(encoder_was_training)
        decoder.train(decoder_was_training)

In [None]:
def evaluateRandomly(encoder, decoder, n=10):
    """Decode ``n`` randomly chosen entries of ``pairs`` and print the results.

    For each pair the input ('>'), the gold form ('='), the prediction ('<')
    and the edit distance between gold form and prediction are printed. Spaces
    are removed from both sides, and '>' from the prediction, before the
    distance is computed.
    """
    for _ in range(n):
        pair = random.choice(pairs)
        source_text, gold_text = pair[0], pair[1]
        print('>', source_text)
        print('=', gold_text)

        output_words, _ = evaluate(encoder, decoder, source_text)
        output_sentence = ' '.join(output_words)
        print('<', output_sentence)

        gold_compact = str(gold_text).replace(" ", "")
        predicted_compact = str(output_sentence.replace(" ", "").replace(">", ""))
        print('MED = ', MED(gold_compact, predicted_compact))
        print('')

In [None]:
# The hyperparameters hidden_size, dropout_p, num_iters, print_every and l_rate
# are set to default values and may be changed.
hidden_size = 512
num_iters = 100000
l_rate = 0.001

# Seed Python's and PyTorch's random generators so that weight initialisation,
# the sampling of training pairs, the teacher-forcing decision and dropout are
# repeatable from one "Restart & Run All" to the next.
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

encoder1 = EncoderRNN(input_lang.n_words, hidden_size).to(device)
attn_decoder1 = AttnDecoderRNN(hidden_size, output_lang.n_words, dropout_p=0.1).to(device)

trainIters(encoder1, attn_decoder1, num_iters, print_every=10000, learning_rate=l_rate)

In [None]:
# Decode 10 (the default n) randomly chosen entries of `pairs` and print input, gold form, prediction and edit distance.
evaluateRandomly(encoder1, attn_decoder1)

In [None]:
def showAttention(input_sentence, output_words, attentions):
    """Draw the attention matrix of one decoded example as a heat map.

    Args:
        input_sentence: Input string; each character labels one column,
            followed by an '<EOS>' column.
        output_words: Decoded symbols; each one labels one row.
        attentions: Tensor with one row per decoding step (converted with
            ``.numpy()``).

    Tick positions are fixed explicitly before the labels are assigned. The
    earlier version set the labels first and depended on the tick locator
    producing an extra off-screen tick (hence its leading '' label), which is
    an implementation detail of the locator and triggers matplotlib warnings.
    """
    # Set up figure with colorbar
    fig = plt.figure()
    ax = fig.add_subplot(111)
    cax = ax.matshow(attentions.numpy(), cmap='bone')
    fig.colorbar(cax)

    # One labelled tick per input symbol (plus the end marker) and per output symbol.
    x_labels = list(input_sentence) + ['<EOS>']
    y_labels = list(output_words)

    ax.set_xticks(list(range(len(x_labels))))
    ax.set_xticklabels(x_labels, rotation=90)
    ax.set_yticks(list(range(len(y_labels))))
    ax.set_yticklabels(y_labels)

    plt.show()


def evaluateAndShowAttention(input_sentence, gold_sent):
    """Decode one input with the trained models, print a report and plot attention.

    Uses the module-level ``encoder1`` and ``attn_decoder1``. The edit distance
    is computed after removing spaces from both strings and '>' from the
    prediction.
    """
    output_words, attentions = evaluate(
        encoder1, attn_decoder1, input_sentence)
    decoded = ' '.join(output_words)

    print('input =', input_sentence)
    print('output =', decoded)

    gold_compact = gold_sent.replace(" ", "")
    predicted_compact = decoded.replace(" ", "").replace(">", "")
    print('MED = ', MED(gold_compact, predicted_compact))

    showAttention(input_sentence, output_words, attentions)

In [None]:
'''Preparing the testing data including preprocessing'''

def has_any(word, presence_dict):
    """Report, for every key of ``presence_dict``, whether it occurs in ``word``.

    Args:
        word: Container tested with ``in`` (a string here).
        presence_dict: Mapping whose keys are the items to look for; its values
            are ignored and the mapping itself is left untouched.

    Returns:
        A new dict with the same keys, each mapped to True or False.
    """
    # Build a fresh dict: the earlier version only aliased the argument and
    # therefore overwrote the caller's values.
    return {key: (key in word) for key in presence_dict.keys()}

# Build a vocabulary over the held-out split X2; only its input-side Lang is kept.
input_lang2, _, _ = prepareData(X2, False)
# Independent copies of the character -> index tables of the training (x1) and held-out (x2) inputs.
x1_dict = copy.deepcopy(input_lang.word2index)
x2_dict = copy.deepcopy(input_lang2.word2index)
# Union of both tables; where a character is in both, the x2 index overwrites the x1 one.
comb_dict = {**x1_dict, **x2_dict}

In [None]:
def return_false_keys1(presence_dict):
    """List the keys of the module-level ``comb_dict`` missing from ``presence_dict``.

    The result keeps the iteration order of ``comb_dict``.
    """
    known_keys = presence_dict.keys()
    return [key for key in comb_dict if key not in known_keys]

In [None]:
# comb_dict holds every input character seen in either split, so filtering it
# against one split's table leaves the characters found only in the other split.
keys_not_in_x1 = return_false_keys1(x1_dict)
keys_not_in_x2 = return_false_keys1(x2_dict)

print("The following characters appear in the test set but not in the training set:")
print(keys_not_in_x1)
print()
print("The following characters appear in the training set but not in the test set:")
print(keys_not_in_x2)

In [None]:
'''Finding unknown verb conjugations (or inflections to be exact)'''

def check_false_chars(word):
    """Return None if ``word`` contains any entry of ``keys_not_in_x1``, else 1.

    The None result marks a row for removal by a later ``dropna``.
    """
    contains_unseen = any(char in word for char in keys_not_in_x1)
    return None if contains_unseen else 1

# Flag rows whose input (UNKVC) or gold form (UNKCC) contains a character
# listed in keys_not_in_x1: the flag is missing for such rows and 1 otherwise.
X2['UNKVC'] = X2.Verb.apply(check_false_chars)
X2['UNKCC'] = X2.Conjugation.apply(check_false_chars)

# Count the rows that are about to be dropped from the data itself instead of
# assuming that the test split always starts with exactly 200 rows.
n_removed = int(X2['UNKVC'].isna().sum())

# Only rows with an unknown character in the *input* are removed.
X2.dropna(subset = ['UNKVC'], inplace=True)
print("A total of ", n_removed, " entries have been removed from the original test-set.")

In [None]:
'''Preparing the final version of the test-set after preprocessing'''

# Shuffle the remaining test rows (no seed is passed, so the order differs between runs) and renumber them from 0.
X2 = X2.sample(frac=1).reset_index(drop=True)

# Model inputs and gold forms of the test set, aligned by position.
x_test = X2['Verb']
y_test = X2['Conjugation']

In [None]:
'''Uncomment the following to see attention alignment visualisations'''

# for idx, el in enumerate(x_test):
#     evaluateAndShowAttention(x_test[idx], y_test[idx])

In [None]:
sum_med = 0
acc_count = 0

for source_text, gold_text in zip(x_test, y_test):
    out, _ = evaluate(encoder1, attn_decoder1, source_text)
    # Drop the end marker and all spaces from the prediction ...
    out = ''.join(out).replace(" ", "").replace(">", "")
    # ... and compare against the gold form normalised the same way. Accuracy
    # used to be measured against the un-normalised gold form, so a gold form
    # containing a space could never count as correct even at distance 0.
    gold = gold_text.replace(" ", "")

    distance = MED(out, gold)  # computed once and reused for both metrics
    sum_med += distance
    if distance == 0:
        acc_count += 1

n_test = len(x_test)
if n_test:
    print("The measured average MED score is: ", sum_med/n_test)
    print("The measured accuracy score is: ", acc_count/n_test)
else:
    # Every test row was filtered out; avoid dividing by zero.
    print("The test set is empty; no scores can be computed.")