In [3]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import warnings
import conllu
from tqdm import tqdm

In [4]:
# Pick the compute device: use CUDA when PyTorch reports an available GPU,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Silence every Python warning for the rest of the session.
warnings.filterwarnings("ignore")

In [7]:
# Read the raw CoNLL-U text of the train and test splits. The files are opened
# through context managers so their handles are closed as soon as the text is read.
with open("UD_English-Atis/en_atis-ud-train.conllu", "r", encoding="utf-8") as trainFile:
    trainingData = trainFile.read()
with open("UD_English-Atis/en_atis-ud-test.conllu", "r", encoding="utf-8") as testFile:
    testingData = testFile.read()

# Parse each split into a list of sentences.
trainSentences = conllu.parse(trainingData)
testSentences = conllu.parse(testingData)

# Reduce every training sentence to a list of (word form, UPOS tag) pairs.
taggedTrainSentences = [
    [(token['form'], token['upos']) for token in sentence]
    for sentence in trainSentences
]

In [8]:
# Only `sequence2index` is defined as a helper. The names `word2index`,
# `char2index` and `tag2index` are bound to the vocabulary dictionaries built
# further down in this cell, so same-named functions defined here would be
# overwritten before they could ever be called.
def sequence2index(sequence, ix):
    """Map every item of `sequence` to its integer id and return the ids as a tensor.

    Args:
        sequence: iterable of hashable items, e.g. a list of words or tags, or a
            string (which is iterated character by character).
        ix: mapping from item to integer id.

    Returns:
        A 1-D `torch.long` tensor holding `ix[item]` for each item, in order.

    Raises:
        KeyError: if an item of `sequence` is not a key of `ix`.
    """
    return torch.tensor([ix[item] for item in sequence], dtype=torch.long)

# Build the word, tag and character vocabularies from the training sentences.
# Each previously unseen item receives the next free integer id, so ids follow
# first-seen order.
word2index, tag2index, char2index = {}, {}, {}
for sentence in taggedTrainSentences:
    for word, pos_tag in sentence:
        # setdefault only inserts when the key is absent, so existing ids are kept.
        word2index.setdefault(word, len(word2index))
        tag2index.setdefault(pos_tag, len(tag2index))
        for char in word:
            char2index.setdefault(char, len(char2index))

# Number of distinct words, tags and characters seen in the training data.
word_vocab_size, tag_vocab_size, char_vocab_size = map(len, (word2index, tag2index, char2index))

In [9]:
# Hyper-parameters handed to DualLSTMTagger and to the training loop.
# Size of each word-embedding vector.
WORD_EMBEDDING_DIM = 1024
# Size of each character-embedding vector.
CHAR_EMBEDDING_DIM = 128
# Hidden-state size of the word-level LSTM.
WORD_HIDDEN_DIM = 1024
# Hidden-state size of the character-level LSTM.
CHAR_HIDDEN_DIM = 1024
# Number of passes over the training sentences.
EPOCHS = 10

class DualLSTMTagger(nn.Module):
    """POS tagger that combines word embeddings with a character-level LSTM summary.

    For every word, a character LSTM reads the word's characters and its final
    hidden state is concatenated with the word's embedding. The concatenated
    vectors are fed through a word-level LSTM and a linear layer that scores tags.
    """

    def __init__(self, wordEmbeddingDimension, wordHiddenDimension, charEmbeddingDimension, charHiddenDimension,
                 wordVocabSize, charVocabSize, tagVocabSize):
        """Create the embedding tables, both LSTMs and the tag projection.

        Args:
            wordEmbeddingDimension: size of each word-embedding vector.
            wordHiddenDimension: hidden size of the word-level LSTM.
            charEmbeddingDimension: size of each character-embedding vector.
            charHiddenDimension: hidden size of the character-level LSTM.
            wordVocabSize: number of rows in the word-embedding table.
            charVocabSize: number of rows in the character-embedding table.
            tagVocabSize: number of output tags.
        """
        super().__init__()

        # Lookup tables for words and characters.
        self.wordEmbedding = nn.Embedding(wordVocabSize, wordEmbeddingDimension)
        self.charEmbedding = nn.Embedding(charVocabSize, charEmbeddingDimension)

        # Character-level encoder; its final hidden state summarises one word.
        self.charLSTM = nn.LSTM(charEmbeddingDimension, charHiddenDimension)

        # Word-level encoder over [word embedding ; character summary].
        self.lstm = nn.LSTM(wordEmbeddingDimension + charHiddenDimension, wordHiddenDimension)
        self.hidden2tag = nn.Linear(wordHiddenDimension, tagVocabSize)

    def forward(self, sentence, words):
        """Score every tag for every word of one sentence.

        Args:
            sentence: 1-D tensor of word indices, one per word.
            words: iterable with one 1-D tensor of character indices per word,
                in the same order as `sentence`.

        Returns:
            Tensor of shape (len(sentence), tagVocabSize) holding log-probabilities
            over tags for each word.
        """
        num_tokens = len(sentence)
        word_vectors = self.wordEmbedding(sentence)

        # Run the character LSTM over each word separately (batch size 1) and
        # keep only its final hidden state, flattened to a vector.
        char_summaries = []
        for char_ids in words:
            char_vectors = self.charEmbedding(char_ids).view(len(char_ids), 1, -1)
            _, (final_hidden, _final_cell) = self.charLSTM(char_vectors)
            char_summaries.append(final_hidden.view(-1))
        char_features = torch.stack(char_summaries)

        # One row per word: word embedding followed by the character summary.
        token_features = torch.cat([word_vectors, char_features], dim=1)

        # The word LSTM expects (sequence, batch, features); the batch size is 1.
        lstm_out, _ = self.lstm(token_features.view(num_tokens, 1, -1))
        tag_logits = self.hidden2tag(lstm_out.view(num_tokens, -1))

        return F.log_softmax(tag_logits, dim=1)

# Build the tagger from the hyper-parameters and vocabulary sizes, then move it
# to the selected device.
model = DualLSTMTagger(
    wordEmbeddingDimension=WORD_EMBEDDING_DIM,
    wordHiddenDimension=WORD_HIDDEN_DIM,
    charEmbeddingDimension=CHAR_EMBEDDING_DIM,
    charHiddenDimension=CHAR_HIDDEN_DIM,
    wordVocabSize=word_vocab_size,
    charVocabSize=char_vocab_size,
    tagVocabSize=tag_vocab_size,
).to(device)

# The model emits log-probabilities, so negative log-likelihood is the loss.
criterion = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

In [10]:
# Mean training loss of every completed epoch.
lossList = []

# NOTE(review): `interval` is not read anywhere in this cell; it is kept in case
# another cell depends on it.
interval = round(len(taggedTrainSentences) / 100.)
# Report progress twice per run. Clamped to 1 because round(EPOCHS / 2.) is 0
# for EPOCHS == 1, which would make the modulo check below divide by zero.
epochInterval = max(1, round(EPOCHS / 2.))


def encodeTaggedSentence(taggedSentence):
    """Convert one list of (word, tag) pairs into the tensors used for training.

    Returns:
        A `(sentence, words, targets)` tuple placed on `device`: `sentence` holds
        the word indices, `words` is a list with one tensor of character indices
        per word, and `targets` holds the tag indices.
    """
    forms = [pair[0] for pair in taggedSentence]
    tags = [pair[1] for pair in taggedSentence]
    # sequence2index already returns long tensors, so they are used directly
    # instead of being copied again through torch.tensor(...).
    charIndices = [sequence2index(form, char2index).to(device) for form in forms]
    wordIndices = sequence2index(forms, word2index).to(device)
    tagIndices = sequence2index(tags, tag2index).to(device)
    return wordIndices, charIndices, tagIndices


# The encoded tensors are the same in every epoch, so they are built once here
# rather than once per sentence per epoch.
encodedTrainSentences = [encodeTaggedSentence(taggedSentence) for taggedSentence in taggedTrainSentences]

# Make sure the model is in training mode before back-propagating through it.
model.train()

for epoch in tqdm(range(EPOCHS)):
    epochLoss = 0

    # One optimiser step per sentence.
    for sentence, words, targets in encodedTrainSentences:
        model.zero_grad()

        tagScores = model(sentence, words)

        loss = criterion(tagScores, targets)
        loss.backward()
        optimizer.step()
        epochLoss += loss.item()

    # Average the per-sentence losses to get the epoch loss.
    epochLoss = epochLoss / len(taggedTrainSentences)
    lossList.append(float(epochLoss))

    # Print the mean loss over the epochs since the previous report.
    if (epoch + 1) % epochInterval == 0:
        print(f"Epoch {epoch+1} Completed,\tLoss {np.mean(lossList[-epochInterval:])}")


 50%|█████     | 5/10 [09:19<09:18, 111.74s/it]

Epoch 5 Completed,	Loss 0.2811941445917349


100%|██████████| 10/10 [18:37<00:00, 111.70s/it]

Epoch 10 Completed,	Loss 0.28733548180116397





In [18]:
# Read a sentence from the user and split it into whitespace-separated words.
testSentence = input("Please enter sentence:")
testSequence = testSentence.split()

# The vocabularies contain only items seen during training. Report every
# unknown word/character at once instead of failing on the first bare key.
unknownWords = [word for word in testSequence if word not in word2index]
unknownChars = sorted({char for word in testSequence for char in word if char not in char2index})
if unknownWords or unknownChars:
    raise KeyError(
        f"Input contains items missing from the training vocabulary: "
        f"words={unknownWords}, characters={unknownChars}"
    )

# Reverse lookup from tag id to tag name, built once instead of scanning
# tag2index for every predicted id.
index2tag = {index: tag for tag, index in tag2index.items()}

with torch.no_grad():
    if testSequence:
        # Encode ALL characters of each word, as the training loop does; encoding
        # only word[0] would feed just the first character to the character LSTM.
        words = [sequence2index(word, char2index).to(device) for word in testSequence]
        sentence = sequence2index(testSequence, word2index).to(device)

        tagScores = model(sentence, words)
        # The highest-scoring tag id for each word.
        _, indices = torch.max(tagScores, 1)
        ans = [(word, index2tag[index]) for word, index in zip(testSequence, indices.tolist())]
    else:
        # Nothing to tag: the model cannot process an empty sentence.
        ans = []
    print(ans)

Please enter sentence:airplanes can fly
[('airplanes', 'NOUN'), ('can', 'AUX'), ('fly', 'VERB')]
