    Author: Roman Makarov
    e-mail: mcronomus@gmail.com

# Hidden Markov Models for POS Tagging

##Data preparation

In [None]:
!wget https://raw.githubusercontent.com/Gci04/AML-DS-2021/main/data/PosTagging/train_pos.txt
!wget https://raw.githubusercontent.com/Gci04/AML-DS-2021/main/data/PosTagging/test_pos.txt

--2023-05-29 14:26:01--  https://raw.githubusercontent.com/Gci04/AML-DS-2021/main/data/PosTagging/train_pos.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.111.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1855828 (1.8M) [text/plain]
Saving to: ‘train_pos.txt’


2023-05-29 14:26:02 (31.8 MB/s) - ‘train_pos.txt’ saved [1855828/1855828]

--2023-05-29 14:26:02--  https://raw.githubusercontent.com/Gci04/AML-DS-2021/main/data/PosTagging/test_pos.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 418682 (409K) [text/plain]
Saving to: ‘test_pos.txt’


2023-05-29 14:26:02 (11.3 MB/s

In [None]:
import nltk
from nltk.tokenize import word_tokenize

tags = []
words = []
tagged_sentences = []

for line in open('train_pos.txt', 'r').readlines():
    vals = tuple(line.strip().split())
    try:
        words.append(vals[0])
        tags.append(vals[1])
        tagged_sentences.append(vals)
    except:
        pass

In [None]:
train_sentences = [[]]
for entry in tagged_sentences:
    train_sentences[-1].append(entry)
    if entry[0] == '.':
        train_sentences.append([])

In [None]:
import nltk
from nltk.tokenize import word_tokenize

test_sentences = [[]]

for line in open('test_pos.txt', 'r').readlines():
    if line.strip() == '':
        continue

    vals = tuple(line.strip().split())
    try:
        test_sentences[-1].append(vals)
        if vals[0] == '.':
            test_sentences.append([])
    except:
        pass

test_sentences.pop()

[]

In [None]:
# Map tag to some number
tag_to_num = {}
for tag in tags:
    if tag not in tag_to_num:
        tag_to_num[tag] = len(tag_to_num)

# Map word to some number
word_to_num = {'<UNK>': 0}
for word in words:
    if word not in word_to_num:
        word_to_num[word] = len(word_to_num)

##Viterbi algorithnm

In [None]:
import numpy as np

def viterbi(y, A, B, Pi=None):
    # Cardinality of the state space
    K = A.shape[0]
    # Initialize the priors with default (uniform dist) if not given by caller
    Pi = Pi if Pi is not None else np.full(K, 1 / K)
    T = len(y)
    T1 = np.empty((K, T), 'd')
    T2 = np.empty((K, T), 'B')

    # Initilaize the tracking tables from first observation
    T1[:, 0] = Pi * B[:, y[0]]
    T2[:, 0] = 0

    # Iterate throught the observations updating the tracking tables
    for i in range(1, T):
        T1[:, i] = np.max(T1[:, i - 1] * A.T * B[np.newaxis, :, y[i]].T, 1)
        T2[:, i] = np.argmax(T1[:, i - 1] * A.T, 1)

    # Build the output, optimal model trajectory
    x = np.empty(T, 'B')
    x[-1] = np.argmax(T1[:, T - 1])
    for i in reversed(range(1, T)):
        x[i - 1] = T2[x[i], i]

    return x, T1, T2

##Computing matrices

In [None]:
A = np.ones((len(tag_to_num), len(tag_to_num)))
B = np.ones((len(tag_to_num), len(word_to_num)))
Pi = np.zeros(len(tag_to_num))

for i in range(len(tags)):
    if i == 0 or tags[i - 1] == '.':
        Pi[tag_to_num[tags[i]]] += 1

    B[tag_to_num[tags[i]]][word_to_num[words[i]]] += 1

    if i != len(tags) - 1:
        A[tag_to_num[tags[i]]][tag_to_num[tags[i + 1]]] += 1

for i in range(len(tag_to_num)):
    A[i] /= A[i].sum()
    B[i] /= B[i].sum()

Pi /= Pi.sum()

##Pretesting the model

In [None]:
import nltk
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


True

In [None]:
from nltk import pos_tag, word_tokenize

def attempt(_sent, A, B, Pi, _print_end=True, _to_print=False):
    sent = []
    sent_words = []

    for word in word_tokenize(_sent):
        sent.append(word_to_num[word])
        sent_words.append(word)

    sent = np.array(sent)
    x, t1, t2 = viterbi(sent, A, B, Pi)

    correct = 0
    all = 0

    correct_answer = pos_tag(word_tokenize(_sent))
    for y_pred, y_test in zip(x, correct_answer):
        if y_pred == tag_to_num[y_test[1]]:
            correct += 1
        all += 1

    if _to_print:
        print('Prediction:\n  ', end='')
        for i, num in enumerate(x):
            print(f"('{sent_words[i]}', '{num_to_tag[num]}')", end=' ')

        print('\nCorrect answer:\n  ', end='')
        print(*pos_tag(word_tokenize(_sent)), end='\n\n' if _print_end else '\n')

    return correct * 100. / all


def multiple_attempts(_sents, A, B, Pi):
    total_accuracy = 0
    total_attempts = 0

    for i, _sent in enumerate(_sents):
        total_accuracy += attempt(_sent, A, B, Pi, i != len(_sents) - 1)
        total_attempts += 1

    return total_accuracy / total_attempts

In [None]:
from nltk import pos_tag, word_tokenize

sents = ['This is bad phone', 'There is a rain outside', 'This country is a home to many animals']
multiple_attempts(sents, A, B, Pi)

100.0

# LSTM for POS Tagging

##Data preparation

In [None]:
import nltk
from nltk.tokenize import word_tokenize

tags_lstm = set()
words_lstm = set()
tagged_sentences_lstm = [[]]

for line in open('train_pos.txt', 'r').readlines():
    vals = tuple(line.strip().split())
    try:
        words_lstm.add(vals[0])
        tags_lstm.add(vals[1])
        tagged_sentences_lstm[-1].append(vals)
        if vals[0] == '.':
            tagged_sentences_lstm.append([])
    except:
        pass

tagged_sentences_lstm.pop()

[]

In [None]:
import nltk
from nltk.tokenize import word_tokenize

test_sentences_lstm = [[]]

for line in open('test_pos.txt', 'r').readlines():
    if not line.strip():
        continue

    vals = tuple(line.strip().split())
    try:
        test_sentences_lstm[-1].append(vals)
        if vals[0] == '.':
            test_sentences_lstm.append([])
    except:
        pass

test_sentences_lstm.pop()

[]

In [None]:
# Map tag to some number
tag_to_num_lstm = {}
for tag in tags:
    if tag not in tag_to_num_lstm:
        tag_to_num_lstm[tag] = len(tag_to_num_lstm)

# Map word to some number
word_to_num_lstm = {}
for word in words:
    if word not in word_to_num_lstm:
        word_to_num_lstm[word] = len(word_to_num_lstm)

In [None]:
vocab = {"<PAD>": 0, "<UNK>": 1}
vocab.update(word_to_num_lstm)

tags_vocab = {"<PAD>": 0}
tags_vocab.update(tag_to_num_lstm)

In [None]:
tagged_sentences_lstm[0][:3], test_sentences_lstm[0][:3]

([('Confidence', 'NN'), ('in', 'IN'), ('the', 'DT')],
 [('Rockwell', 'NNP'), ('International', 'NNP'), ('Corp.', 'NNP')])

In [None]:
import torch

from torch.utils.data import DataLoader

batch_size = 64
SEQUENCE_LENGTH = max([len(L) for L in tagged_sentences_lstm])

def collate_fn(batch):
    batch_input, batch_output = [], []

    for sent in batch:
        sentence = [x[0] for x in sent]
        labels   = [x[1] for x in sent]

        if len(sentence) > SEQUENCE_LENGTH:
            sentence = sentence[:SEQUENCE_LENGTH]
            labels = labels[:SEQUENCE_LENGTH]
        elif len(sentence) < SEQUENCE_LENGTH:
            sentence.extend("<PAD>" for _ in range(SEQUENCE_LENGTH - len(sentence)))
            labels.extend("<PAD>" for _ in range(SEQUENCE_LENGTH - len(labels)))

        for i in range(len(sentence)):
            sentence[i] = vocab[sentence[i]] if sentence[i] in vocab else vocab['<UNK>']
            labels[i] = tags_vocab[labels[i]]

        batch_input.append(sentence)
        batch_output.append(labels)

    batch_input = torch.tensor(batch_input, dtype=torch.long)
    batch_output = torch.tensor(batch_output, dtype=torch.long)
    return batch_input, batch_output

dataloader      = DataLoader(tagged_sentences_lstm, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
test_dataloader = DataLoader(test_sentences_lstm, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

In [None]:
print(next(iter(dataloader)))

(tensor([[  977,     2,  2646,  ...,     0,     0,     0],
        [ 3527,   263,   281,  ...,     0,     0,     0],
        [15605, 15516,    33,  ...,     0,     0,     0],
        ...,
        [   94,  1194,    35,  ...,     0,     0,     0],
        [  419,   266,    17,  ...,     0,     0,     0],
        [  403,   404,   686,  ...,     0,     0,     0]]), tensor([[ 1,  2,  0,  ...,  0,  0,  0],
        [18,  9,  4,  ...,  0,  0,  0],
        [18,  4, 14,  ...,  0,  0,  0],
        ...,
        [ 2,  0,  1,  ...,  0,  0,  0],
        [ 1, 10, 11,  ...,  0,  0,  0],
        [10, 10,  8,  ...,  0,  0,  0]]))


In [None]:
DATASET_SIZE_TRAIN = len(dataloader.dataset)
DATASET_SIZE_TEST = len(test_dataloader.dataset)

##LSTM model

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


class LSTMTagger(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
        super(LSTMTagger, self).__init__()
        self.hidden_dim = hidden_dim

        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)

        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            batch_first=True,
            num_layers=2,
            bidirectional=True
        )

        self.dropout = nn.Dropout(0.1)

        self.hidden2tag = nn.Linear(hidden_dim * 2, tagset_size)

    def forward(self, sentence):
        embeds = self.word_embeddings(sentence)
        embeds = self.dropout(embeds)
        lstm_out, _ = self.lstm(embeds)
        lstm_out = self.dropout(lstm_out)
        tag_space = self.hidden2tag(lstm_out)
        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores

In [None]:
EMBEDDING_DIM = 512
HIDDEN_DIM = 512
VOCAB_SIZE = len(vocab)
TARGET_SIZE = SEQUENCE_LENGTH

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = LSTMTagger(EMBEDDING_DIM, HIDDEN_DIM, VOCAB_SIZE, TARGET_SIZE)

optimizer = optim.Adam(model.parameters())
criterion = nn.NLLLoss()

model = model.to(device)
criterion = criterion.to(device)

##Training and testing the model

In [None]:
def accuracy_calculator(preds, y):
    return (preds == y).sum() / (y.shape[0] * y.shape[1])

def train(model, dataloader, optimizer, criterion, device):
    epoch_loss = 0
    epoch_acc = 0

    model.train()

    for text, label in dataloader:
        text = text.to(device)
        label = label.to(device)

        optimizer.zero_grad()

        predictions = model(text)
        loss = criterion(predictions, label)

        acc = accuracy_calculator(
            torch.argmax(predictions, dim=1), label
        )

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item() * label.shape[0]
        epoch_acc += acc.item() * label.shape[0]

    return epoch_loss / DATASET_SIZE_TRAIN, epoch_acc / DATASET_SIZE_TRAIN

In [None]:
def evaluate_model(model, data_batches, criterion, device):
    eval_loss = 0
    eval_acc = 0

    model.eval()

    with torch.no_grad():
        for text, tags in data_batches:

            tags = tags.type(torch.LongTensor)

            text = text.to(device)
            tags = tags.to(device)

            predictions = model(text)

            loss = criterion(predictions, tags)

            acc = accuracy_calculator(
                torch.argmax(predictions, dim=1), tags
            )

            eval_loss += loss.item() * tags.shape[0]
            eval_acc += acc.item()  * tags.shape[0]

    return eval_loss / DATASET_SIZE_TEST, eval_acc / DATASET_SIZE_TEST

Training the model with the most optimal hyperparameters and layer structure found

In [None]:
epochs = 20

for epoch in range(epochs):
    train_loss, train_acc = train(model, dataloader, optimizer, criterion, device)
    test_loss, test_acc = evaluate_model(model, test_dataloader, criterion, device)
    print(f'Epoch: {epoch+1}, Train [Loss:  {train_loss:.3f}  Acc: {train_acc*100:.2f}, Test_Acc: {test_acc*100.:.2f}]')

Epoch: 1, Train [Loss:  0.873  Acc: 81.00, Test_Acc: 83.37]
Epoch: 2, Train [Loss:  0.609  Acc: 83.54, Test_Acc: 83.60]
Epoch: 3, Train [Loss:  0.574  Acc: 83.96, Test_Acc: 84.06]
Epoch: 4, Train [Loss:  0.539  Acc: 84.57, Test_Acc: 84.60]
Epoch: 5, Train [Loss:  0.504  Acc: 85.29, Test_Acc: 84.95]
Epoch: 6, Train [Loss:  0.475  Acc: 85.96, Test_Acc: 85.66]
Epoch: 7, Train [Loss:  0.445  Acc: 86.65, Test_Acc: 86.13]
Epoch: 8, Train [Loss:  0.415  Acc: 87.37, Test_Acc: 86.28]
Epoch: 9, Train [Loss:  0.384  Acc: 88.20, Test_Acc: 86.75]
Epoch: 10, Train [Loss:  0.356  Acc: 88.94, Test_Acc: 87.02]
Epoch: 11, Train [Loss:  0.323  Acc: 89.86, Test_Acc: 87.40]
Epoch: 12, Train [Loss:  0.290  Acc: 90.83, Test_Acc: 87.75]
Epoch: 13, Train [Loss:  0.259  Acc: 91.79, Test_Acc: 88.00]
Epoch: 14, Train [Loss:  0.231  Acc: 92.65, Test_Acc: 88.08]
Epoch: 15, Train [Loss:  0.204  Acc: 93.52, Test_Acc: 88.17]
Epoch: 16, Train [Loss:  0.183  Acc: 94.20, Test_Acc: 88.26]
Epoch: 17, Train [Loss:  0.159  A

# Evaluation

##Evaluating HMM

In [None]:
def multiple_test(_sents, A, B, Pi):
    correct = 0
    all = 0

    for i, sent in enumerate(_sents):
        sent_words = []
        sent_tags = []
        for x in sent:
            if x[0] not in word_to_num:
                x = ("<UNK>", x[1])

            sent_words.append(word_to_num[x[0]])
            sent_tags.append(tag_to_num[x[1]])

        x, t1, t2 = viterbi(sent_words, A, B, Pi)

        correct += np.sum(x == sent_tags)
        all += len(x)

    return correct * 100. / all

In [None]:
hmm_acc = multiple_test(test_sentences, A, B, Pi)
print(f'Accuracy of HMM on test data: {hmm_acc:.2f}%')

Accuracy of HMM on test data: 89.44%


##Evaluating LSTM

In [None]:
test_loss, test_acc = evaluate_model(model, test_dataloader, criterion, device)
print(f'Accuracy of LSTM on test data: {test_acc*100:.2f}%')

Accuracy of LSTM on test data: 88.46%


##Printing samples where HMM made mistake

In [None]:
num_to_word = {j: i for i, j in word_to_num.items()}
num_to_tag = {j: i for i, j in tag_to_num.items()}

In [None]:
def find_mistakes_hmm(_sents, A, B, Pi):
    mistakes = []

    for i, sent in enumerate(_sents):
        sent_words = []
        sent_tags = []
        for x in sent:
            if x[0] not in word_to_num:
                x = ("<UNK>", x[1])

            sent_words.append(word_to_num[x[0]])
            sent_tags.append(tag_to_num[x[1]])

        x, t1, t2 = viterbi(sent_words, A, B, Pi)

        for word, pred, true in zip(sent, x, sent_tags):
            if pred != true:
                mistakes.append((word[0], num_to_tag[pred], num_to_tag[true]))

        if len(mistakes) > 10:
            break

    return mistakes

In [None]:
mistakes = find_mistakes_hmm(test_sentences, A, B, Pi)

In [None]:
print(f'Word                   Predicted label         True label       In Train')
print(f'------------------------------------------------------------------------')
for word, predicted, true in mistakes[:10]:
    print(f'{word:<28} {predicted:<20} {true:<15} {word in word_to_num}')

Word                   Predicted label         True label       In Train
------------------------------------------------------------------------
Tulsa                        JJ                   NNP             True
extending                    IN                   VBG             False
747                          JJ                   CD              True
jetliners                    NN                   NNS             False
Rockwell                     PRP                  NNP             False
calls                        NNS                  VBZ             True
supply                       $                    VB              True
shipsets                     NN                   NNS             False
include                      NN                   VBP             True
bulkheads                    NN                   NNS             False


##Printing samples where LSTM made mistakes

In [None]:
inv_vocab = {j: i for i, j in vocab.items()}
inv_tags_vocab = {j: i for i, j in tags_vocab.items()}

In [None]:
def find_mistakes(model, data_batches, criterion, device):
    model.eval()

    mistakes = []

    with torch.no_grad():
        for text, tags in data_batches:
            tags = tags.type(torch.LongTensor)

            text = text.to(device)
            tags = tags.to(device)

            predictions = model(text)

            loss = criterion(predictions, tags)

            acc = accuracy_calculator(
                torch.argmax(predictions, dim=1), tags
            )

            pred = torch.argmax(predictions, dim=1).cpu().detach().numpy()
            true = tags.cpu().detach().numpy()
            word = text.cpu().detach().numpy()

            for pred_i, true_i, words_i in zip(pred, true, word):
                for i in range(len(pred_i)):
                    if pred_i[i] != true_i[i]:
                        mistakes.append((words_i[i], pred_i[i], true_i[i]))
                        break

            if len(mistakes) >= 10:
                break

    return mistakes

In [None]:
mistakes = find_mistakes(model, test_dataloader, criterion, device)

In [None]:
print(f'Word                   Predicted label         True label       In Train')
print(f'------------------------------------------------------------------------')
for word, predicted, true in mistakes[:10]:
    print(f'{inv_vocab[word]:<28} {inv_tags_vocab[predicted]:<20} {inv_tags_vocab[true]:<15} {word in vocab}')

Word                   Predicted label         True label       In Train
------------------------------------------------------------------------
in                           IN                   NNP             False
unexpectedly                 NN                   RB              False
latest                       NN                   JJS             False
,                            POS                  ,               False
Small-business               IN                   NN              False
prices                       VBD                  NNS             False
was                          NN                   VBD             False
although                     ,                    IN              False
often                        VBZ                  RB              False
.                            VBN                  .               False


# Conclusion

HMM:
* Matrices computation and prediction are fast (computation <1s, prediction ~2s)
* The resulting accuracy on test dataset is **89.44%**
* HMM sometimes makes mistakes in the words that it did not see yet (explained in the pros and cons part)

LSTM:
* Training takes 6 minutes (for the hyperparameters that I chose in the end)
* The resulting accuracy on test dataset is **88.14%**
* LSTM mostly makes mistakes in the words that it did not see before (which receive an \<UNK\> tag during testing)