In [2]:
import re
import os
import math
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
import conllu
from tqdm import tqdm, notebook
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score, classification_report
from nltk import word_tokenize

In [3]:
# Seed every RNG this notebook imports (torch, random, numpy) from a single
# constant so that shuffling and weight initialisation are repeatable.
SEED = 123
torch.manual_seed(SEED)
random.seed(SEED)
np.random.seed(SEED)

# Pick the compute device once; later cells move the model and batches to it.
if torch.cuda.is_available():
    print('Used device: GPU')
    device = torch.device("cuda")
else:
    print('Used device: CPU')
    device = torch.device("cpu")

Used device: GPU


In [4]:
# Locations of the CoNLL-U splits, relative to the notebook's working directory.
trainFile = "dataset/train.conllu"
devFile = "dataset/dev.conllu"
testFile = "dataset/test.conllu"


def loadConllu(path):
    """Read the UTF-8 CoNLL-U file at ``path`` and return ``conllu.parse`` of its text."""
    # The context manager closes the handle even when parsing raises; the
    # previous bare ``open`` calls left both files open.
    with open(path, "r", encoding="utf-8") as handle:
        return conllu.parse(handle.read())


trainData = loadConllu(trainFile)
devData = loadConllu(devFile)


In [5]:
class obsDataset(Dataset):
    """Word (observation) vocabulary plus one word-id tensor per sentence.

    Each sentence of ``inpData`` is an iterable of tokens that support
    ``token["form"]``. Index 0 is reserved for ``"UNK"``; every form seen
    fewer than ``minFreq`` times is redirected to that index.

    Attributes:
        word2idx: form -> id (rare forms map to 0).
        idx2word: id -> form. Rare forms keep their slot here, so
            ``vocabSize`` still counts them although their id is never
            emitted in ``idTensor``.
        idTensor: list of 1-D ``torch.long`` tensors, one per sentence.
        vocabSize: ``len(idx2word)``.
        unkownWords: rare forms, in order of first appearance.
        data: the sentences passed to the constructor.
    """

    def __init__(self, inpData, minFreq=3):
        self.word2idx = {"UNK" : 0}
        self.idx2word = ["UNK"]
        self.idTensor = []
        self.vocabSize = 0
        self.unkownWords = []
        self.minFreq = minFreq
        self.data = self.makeDict(inpData)

    def __len__(self):
        # Size of the vocabulary, not the number of sentences.
        return len(self.idx2word)

    def addWord(self, word):
        """Register ``word`` if it is new and return its current id."""
        if word not in self.word2idx:
            self.word2idx[word] = len(self.idx2word)
            self.idx2word.append(word)
        return self.word2idx[word]

    def getWord(self, idx):
        """Return the form stored at position ``idx`` of the vocabulary."""
        return self.idx2word[idx]

    def makeDict(self, data):
        """Build the vocabulary and the per-sentence id tensors.

        Returns ``data`` so that ``self.data`` keeps the sentences instead of
        ``None``.
        """
        # One pass both registers every form and counts its occurrences.
        wordFreq = {}
        for sent in data:
            for word in sent:
                form = word["form"]
                self.addWord(form)
                wordFreq[form] = wordFreq.get(form, 0) + 1

        # Make words unknown if their frequency is below the threshold.
        for form, freq in wordFreq.items():
            if freq < self.minFreq:
                self.word2idx[form] = 0
                self.unkownWords.append(form)

        # Encode only after the remapping so rare forms come out as id 0.
        for sent in data:
            ids = [self.word2idx[word["form"]] for word in sent]
            self.idTensor.append(torch.tensor(ids, dtype=torch.long))

        self.vocabSize = len(self.idx2word)
        return data
        
class stateDataset(Dataset):
    """Tag (state) vocabulary plus one tag-id tensor per sentence.

    Each sentence of ``inpData`` is an iterable of tokens that support
    ``token["form"]`` and ``token["upos"]``. Index 0 is reserved for
    ``"UNK"``; a token whose form is listed in ``unkownWords`` gets tag id 0
    regardless of its real tag.

    Attributes:
        word2idx: tag -> id.
        idx2word: id -> tag.
        idTensor: list of 1-D ``torch.long`` tensors, one per sentence.
        vocabSize: ``len(idx2word)``.
        unkownWords: the collection of unknown forms passed in by the caller.
        data: the sentences passed to the constructor.
    """

    def __init__(self, inpData, unkownWords):
        self.word2idx = {"UNK" : 0}
        self.idx2word = ["UNK"]
        self.idTensor = []
        self.vocabSize = 0
        self.unkownWords = unkownWords
        self.data = self.makeDict(inpData)

    def __len__(self):
        # Size of the tag vocabulary, not the number of sentences.
        return len(self.idx2word)

    def addWord(self, word):
        """Register tag ``word`` if it is new and return its id."""
        if word not in self.word2idx:
            self.word2idx[word] = len(self.idx2word)
            self.idx2word.append(word)
        return self.word2idx[word]

    def getWord(self, idx):
        """Return the tag stored at position ``idx`` of the vocabulary."""
        return self.idx2word[idx]

    def makeDict(self, data):
        """Build the tag vocabulary and the per-sentence tag-id tensors.

        Returns ``data`` so that ``self.data`` keeps the sentences instead of
        ``None``.
        """
        # A set turns the per-token membership test into a constant-time
        # lookup; scanning the caller's list made encoding quadratic.
        unknownForms = set(self.unkownWords)
        unkId = self.word2idx["UNK"]

        for sent in data:
            ids = []
            for word in sent:
                # Every tag is registered, even on unknown words, so the tag
                # ids do not depend on which forms are rare.
                tagId = self.addWord(word["upos"])
                # If the word is unknown, its POS tag is treated as unknown too.
                ids.append(unkId if word["form"] in unknownForms else tagId)
            self.idTensor.append(torch.tensor(ids, dtype=torch.long))

        self.vocabSize = len(self.idx2word)
        return data

In [6]:
def padSent(currBatch):
    """Collate ``(wordIds, tagIds)`` pairs into two zero-padded 2-D tensors.

    Args:
        currBatch: non-empty list of pairs of 1-D integer tensors; within a
            pair both tensors are expected to have the same length.

    Returns:
        ``(inputs, targets)``, both ``torch.long`` of shape
        ``(len(currBatch), longest sentence)``, with rows ordered from the
        longest to the shortest sentence and padded on the right with 0.

    The batch and its items are left untouched. Writing the padded tensors
    back into the items (as was done before) changed the samples stored in
    the dataset, so sentences stayed padded and kept growing across epochs.
    """
    ordered = sorted(currBatch, key=lambda pair: len(pair[0]), reverse=True)
    maxLen = len(ordered[0][0])
    inputs = torch.zeros(len(ordered), maxLen, dtype=torch.long)
    targets = torch.zeros(len(ordered), maxLen, dtype=torch.long)
    for row, (wordIds, tagIds) in enumerate(ordered):
        # Copy each sentence into its row; the tail keeps the zero padding.
        inputs[row, :len(wordIds)] = wordIds
        targets[row, :len(tagIds)] = tagIds
    return inputs, targets

def createLoaders(trainData, devData, batchSize=None):
    """Build the vocabularies and the train/dev ``DataLoader`` objects.

    Args:
        trainData: training sentences; tokens support ``["form"]`` and
            ``["upos"]``.
        devData: development sentences in the same format.
        batchSize: batch size for both loaders; when omitted, the notebook
            level ``batch_size`` is used.

    Returns:
        ``(trainObj1, trainObj2, trainLoader, devLoader, combined)`` where
        ``trainObj1`` / ``trainObj2`` are the word and tag datasets built
        from ``trainData`` and ``combined`` is the list of
        ``[wordIds, tagIds]`` pairs behind ``trainLoader``.
    """
    if batchSize is None:
        batchSize = batch_size

    trainObj1 = obsDataset(trainData)
    trainObj2 = stateDataset(trainData, trainObj1.unkownWords)
    combined = [[wordIds, tagIds] for wordIds, tagIds in zip(trainObj1.idTensor, trainObj2.idTensor)]
    trainLoader = DataLoader(combined, batchSize, shuffle=True, collate_fn=padSent)

    # Encode the dev split with the TRAINING vocabularies. Building separate
    # dev vocabularies gives ids that do not line up with the ids the model
    # is trained on, which makes the dev loss meaningless.
    unkWord = trainObj1.word2idx["UNK"]
    unkTag = trainObj2.word2idx["UNK"]
    combined2 = []
    for sent in devData:
        wordIds = [trainObj1.word2idx.get(tok["form"], unkWord) for tok in sent]
        # Same rule as for training data: an unknown word has an unknown tag.
        tagIds = [
            unkTag if wordId == unkWord else trainObj2.word2idx.get(tok["upos"], unkTag)
            for wordId, tok in zip(wordIds, sent)
        ]
        combined2.append([
            torch.tensor(wordIds, dtype=torch.long),
            torch.tensor(tagIds, dtype=torch.long),
        ])
    # The dev loss is averaged over all batches, so shuffling adds nothing.
    devLoader = DataLoader(combined2, batchSize, shuffle=False, collate_fn=padSent)
    return trainObj1, trainObj2, trainLoader, devLoader, combined

In [7]:
class GRU(nn.Module):
    """Embedding -> multi-layer GRU -> linear layer -> log-softmax tagger.

    ``loss`` is stored on the module so that callers can reach it as
    ``model.loss``.
    """

    def __init__(self, embedding_size, hidden_size, inputVocabSize, outputVocabSize, num_layers):
        super().__init__()
        self.encoding = nn.Embedding(num_embeddings=inputVocabSize, embedding_dim=embedding_size)
        self.gru = nn.GRU(
            input_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.4,
        )
        self.decoding = nn.Linear(in_features=hidden_size, out_features=outputVocabSize)
        self.loss = nn.CrossEntropyLoss()
        self.activation = nn.LogSoftmax(dim=2)

    def forward(self, input, hidden):
        """Score every position of a batch of id sequences.

        Args:
            input: 2-D tensor of word ids, ``(batch, sequence)``.
            hidden: initial hidden state handed to the GRU.

        Returns:
            ``(output, hidden)``: log-softmax scores over the last dimension
            for each position, and the hidden state returned by the GRU.
        """
        num_sents, num_steps = input.size()
        embedded = self.encoding(input)
        # Every row is declared to have the full padded length, so the
        # packing step does not skip any position.
        lengths = torch.tensor([num_steps] * num_sents, dtype=torch.long)
        packed_in = pack_padded_sequence(embedded, lengths, batch_first=True)
        packed_out, hidden = self.gru(packed_in, hidden)
        unpacked, _ = pad_packed_sequence(packed_out, batch_first=True)
        scores = self.decoding(unpacked)
        return self.activation(scores), hidden

In [13]:
def trainModel(model, trainLoader, num_epochs, learning_rate, devLoader, outputVocabSize):
    """Train ``model`` with Adam and print train/dev loss every 20 steps.

    Args:
        model: module exposing ``gru`` (an ``nn.GRU``), ``loss`` and a
            ``forward(inputs, hidden)`` that returns ``(output, hidden)``.
        trainLoader: iterable of ``(inputs, targets)`` batches.
        num_epochs: number of passes over ``trainLoader``.
        learning_rate: Adam learning rate.
        devLoader: iterable of ``(inputs, targets)`` batches used for the
            periodic dev loss.
        outputVocabSize: size of the last dimension of the model output.

    Returns:
        The same ``model`` object, left in training mode.
    """
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    # Take the device and hidden-state shape from the model itself so the
    # function does not depend on notebook globals.
    modelDevice = next(model.parameters()).device
    numLayers = model.gru.num_layers
    hiddenSize = model.gru.hidden_size

    def batchLoss(inputs, targets):
        """Loss of one batch, starting from a zero hidden state."""
        inputs = inputs.to(modelDevice)
        targets = targets.to(modelDevice)
        hidden = torch.zeros(numLayers, inputs.size(0), hiddenSize, device=modelDevice)
        output, _ = model(inputs, hidden)
        return model.loss(output.view(-1, outputVocabSize), targets.view(-1))

    counter = 0
    lossArray = []
    model.train()
    for epoch in range(num_epochs):
        for inputs, targets in trainLoader:
            loss = batchLoss(inputs, targets)
            lossArray.append(loss.item())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if counter % 20 == 0:
                # Evaluate with dropout off and without building a graph.
                model.eval()
                with torch.no_grad():
                    devLossArray = [
                        batchLoss(devInputs, devTargets).item()
                        for devInputs, devTargets in devLoader
                    ]
                model.train()
                avgTrainLoss = sum(lossArray) / len(lossArray)
                # An empty dev loader reports NaN instead of dividing by zero.
                avgDevLoss = sum(devLossArray) / len(devLossArray) if devLossArray else float("nan")
                print("Epoch: ", epoch, " Train Loss: ", avgTrainLoss, " Dev Loss: ", avgDevLoss)
                lossArray = []
            counter += 1

    return model

In [14]:
def reportTrainingAccuracy(model, trainLoader, outputVocabSize, tagNames=None):
    """Print accuracy, macro F1 and a per-tag report over ``trainLoader``.

    Args:
        model: module exposing ``gru`` (an ``nn.GRU``) and a
            ``forward(inputs, hidden)`` that returns ``(output, hidden)``.
        trainLoader: iterable of ``(inputs, targets)`` batches.
        outputVocabSize: size of the last dimension of the model output.
        tagNames: display name for each tag id; when omitted, the notebook
            level ``trainObj2.idx2word`` is used.

    NOTE(review): every position of each batch tensor is scored, so any
    padding the loader adds is counted as well.
    """
    if tagNames is None:
        tagNames = trainObj2.idx2word
    modelDevice = next(model.parameters()).device

    yTrue = []
    yPred = []
    wasTraining = model.training
    # Dropout must be off while measuring; gradients are not needed.
    model.eval()
    with torch.no_grad():
        for x, y in trainLoader:
            x = x.to(modelDevice)
            hidden = torch.zeros(model.gru.num_layers, x.size(0), model.gru.hidden_size, device=modelDevice)
            output, _ = model(x, hidden)
            _, predicted = torch.max(output.view(-1, outputVocabSize), 1)
            yTrue.extend(y.view(-1).tolist())
            yPred.extend(predicted.tolist())
    # Put the model back in the mode the caller had it in.
    model.train(wasTraining)

    y_true = np.array(yTrue)
    y_pred = np.array(yPred)
    print("Training Accuracy: ", accuracy_score(y_true, y_pred))
    f1scores  = f1_score(y_true, y_pred, average="macro")
    print("Training Classification Report: ")
    print("Training F1 Macro Score: ", f1scores)
    # Passing ``labels`` keeps the report aligned with ``tagNames`` even when
    # some tag id never occurs in the data or the predictions.
    print(classification_report(yTrue, yPred, labels=list(range(outputVocabSize)), target_names=tagNames))

In [17]:
def testData(model, testFile, outputVocabSize, trainingVocab, tagw2i, tagNames=None):
    """Evaluate ``model`` on a CoNLL-U file and print accuracy, macro F1 and a report.

    Args:
        model: module exposing ``gru`` (an ``nn.GRU``) and a
            ``forward(inputs, hidden)`` that returns ``(output, hidden)``.
        testFile: path of the CoNLL-U file to evaluate on.
        outputVocabSize: size of the last dimension of the model output.
        trainingVocab: form -> id mapping containing ``"UNK"``.
        tagw2i: tag -> id mapping containing ``"UNK"``.
        tagNames: display name for each tag id; when omitted, the notebook
            level ``trainObj2.idx2word`` is used.
    """
    if tagNames is None:
        tagNames = trainObj2.idx2word
    # Close the file as soon as it has been parsed.
    with open(testFile, "r", encoding="utf-8") as f:
        sentences = conllu.parse(f.read())

    modelDevice = next(model.parameters()).device
    unkWord = trainingVocab["UNK"]
    unkTag = tagw2i["UNK"]
    yTrue = []
    yPred = []
    wasTraining = model.training
    # Dropout must be off while measuring; gradients are not needed.
    model.eval()
    with torch.no_grad():
        for sent in sentences:
            if len(sent) == 0:
                # A zero-length sequence cannot be packed by the model.
                continue
            wordIds = [trainingVocab.get(word["form"], unkWord) for word in sent]
            # Any word the model sees as UNK is scored against the UNK tag,
            # whether it is absent from the vocabulary or mapped to id 0 in it.
            # Tags missing from ``tagw2i`` also fall back to UNK.
            goldIds = [
                unkTag if wordId == unkWord else tagw2i.get(word["upos"], unkTag)
                for wordId, word in zip(wordIds, sent)
            ]
            idTensor = torch.tensor(wordIds, dtype=torch.long).to(modelDevice)
            hidden = torch.zeros(model.gru.num_layers, 1, model.gru.hidden_size, device=modelDevice)
            output, _ = model(idTensor.view(1, -1), hidden)
            _, predicted = torch.max(output.view(-1, outputVocabSize), 1)
            yPred.extend(predicted.tolist())
            yTrue.extend(goldIds)
    # Put the model back in the mode the caller had it in.
    model.train(wasTraining)

    yTrue = np.array(yTrue)
    yPred = np.array(yPred)
    # use classification report to report accuracy, precision and f1 score
    print("Testing Accuracy: ", accuracy_score(yTrue, yPred))
    f1scores  = f1_score(yTrue, yPred, average="macro")
    print("Testing F1 Macro Score: ", f1scores)
    print("Testing Classification Report: ")
    # Passing ``labels`` keeps the report aligned with ``tagNames`` even when
    # some tag id never occurs in the gold tags or the predictions.
    print(classification_report(yTrue, yPred, labels=list(range(outputVocabSize)), target_names=tagNames))

In [18]:
# Hyperparameters. Several of these are read as globals by the functions
# defined in earlier cells, so this cell must run before those are called.
embedding_size = 28    # embedding dimension passed to GRU
hidden_size = 256      # GRU hidden width; also sizes the zero initial hidden state
num_layers = 2         # stacked GRU layers; also sizes the zero initial hidden state
batch_size = 32        # read by createLoaders for both DataLoaders
num_epochs = 15        # passed to trainModel
learning_rate = 0.001  # Adam learning rate passed to trainModel


In [19]:
# Build the vocabularies and loaders, then size the tagger from the two vocabularies.
(trainObj1, trainObj2, trainLoader, devLoader, combined) = createLoaders(trainData, devData)
model = GRU(
    embedding_size=embedding_size,
    hidden_size=hidden_size,
    inputVocabSize=trainObj1.vocabSize,
    outputVocabSize=trainObj2.vocabSize,
    num_layers=num_layers,
)
model = model.to(device)

In [None]:
# Fit the tagger, then persist only its weights.
model = trainModel(
    model=model,
    trainLoader=trainLoader,
    num_epochs=num_epochs,
    learning_rate=learning_rate,
    devLoader=devLoader,
    outputVocabSize=trainObj2.vocabSize,
)
trainedWeights = model.state_dict()
torch.save(trainedWeights, "model.pt")

In [None]:
# Score the trained tagger on the batches it was trained on.
reportTrainingAccuracy(
    model=model,
    trainLoader=trainLoader,
    outputVocabSize=trainObj2.vocabSize,
)

In [None]:
# Score the trained tagger on the held-out test file, using the training vocabularies.
testData(
    model=model,
    testFile=testFile,
    outputVocabSize=trainObj2.vocabSize,
    trainingVocab=trainObj1.word2idx,
    tagw2i=trainObj2.word2idx,
)

In [None]:
### Generate POS Tags for any input sentence

# NOTE(review): the training cell saves to "model.pt" while this cell loads
# "trainedModel/model.pt" -- confirm the checkpoint is moved there before a
# Restart & Run All.
# map_location lets a checkpoint saved on GPU load on a CPU-only machine.
model.load_state_dict(torch.load("trainedModel/model.pt", map_location=device))
# Switch dropout off so the same sentence always gets the same tags.
model.eval()

sent = input ("Enter a sentence: ")
# NOTE(review): the vocabulary is built from the raw token forms, so
# lower-casing can push capitalised words to UNK -- confirm this is intended.
sent = sent.lower()
tokens = word_tokenize(sent)
unkIdx = trainObj1.word2idx["UNK"]
idTensor = [trainObj1.word2idx.get(token, unkIdx) for token in tokens]

if idTensor:
    with torch.no_grad():
        output, hidden = model(
            torch.tensor(idTensor, dtype=torch.long).to(device).view(1, -1),
            torch.zeros(num_layers, 1, hidden_size).to(device),
        )
    output = output.view(-1, trainObj2.vocabSize)
    _, predicted = torch.max(output.data, 1)
    predicted = predicted.tolist()
    for i in range(len(tokens)):
        print(tokens[i], "  ", trainObj2.idx2word[predicted[i]])
else:
    # An empty token list cannot be packed by the model, so report it instead.
    print("No tokens to tag.")