### Required Imports

In [1]:
import json
import string
import numpy as np
from tqdm import tqdm
from nltk.util import ngrams
from tabulate import tabulate
from collections import defaultdict, Counter
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

### Preparing the dataset

In [2]:
def remove_punctuations(data):
    result = [line.translate(str.maketrans('', '', string.punctuation)) for line in data]
    return result

def load_dataset():
    data_file = open('penn-data.json')
    dataset = np.array(json.load(data_file), dtype=object)
    x, y = remove_punctuations(dataset[:, 0]), dataset[:, 1]
    return [[(word, tag) for word, tag in zip(sentence.split(), tag_seq)] for sentence, tag_seq in zip(x, y)]

In [3]:
dataset = load_dataset()
train, test = train_test_split(dataset, test_size=0.2)

### Hidden Markov Model

In [4]:
class HMM:
    LAPLACE = 0.0000001
    def __init__(self, train, ngram=2, with_context=False):
        self.state_list = ['*'] + list(set([token[1] for sequence in train for token in sequence])) + ['STOP']
        self.transition = self.generate_transition_matrix(train, ngram=ngram)
        self.emission = self.generate_emission_matrix(train, with_context=with_context)
    
    def generate_transition_matrix(self, train, ngram=2, laplace_factor=LAPLACE):
        y = [[token[1] for token in sequence] for sequence in train]
        ngram_tags = []
        for tag_list in y:
            tag_list = ["*"] * (ngram - 1) + tag_list + ["STOP"]
            ngram_tags.extend(ngrams(tag_list, ngram))
        ngram_count = dict(Counter(ngram_tags))

        n_minus_1_gram_tags = []
        for tag_list in y:
            tag_list = ["*"] * (ngram - 1) + tag_list + ["STOP"]
            n_minus_1_gram_tags.extend(ngrams(tag_list, ngram - 1))
        n_minus_1_gram_count = dict(Counter(n_minus_1_gram_tags))

        transition_matrix = defaultdict(lambda: laplace_factor)

        for ngram_tuple in ngram_count:
            n_minus_1_gram_tuple = ngram_tuple[:-1]
            transition_matrix[ngram_tuple] = ngram_count[ngram_tuple] / n_minus_1_gram_count[n_minus_1_gram_tuple]

        return transition_matrix

    def generate_emission_matrix(self, train, with_context=False, laplace_factor=LAPLACE):
        x = [[token[0] for token in sequence] for sequence in train]
        y = [[token[1] for token in sequence] for sequence in train]
        word_tag_count = defaultdict(lambda: 0)
        tag_count = defaultdict(lambda: 0)

        for line, tags in zip(x, y):
            prev_tag = '*'
            for word, tag in zip(line, tags):
                if with_context:
                    tag_count[(tag, prev_tag)] += 1
                    word_tag_count[(word, tag, prev_tag)] += 1
                else:
                    tag_count[(tag,)] += 1
                    word_tag_count[(word, tag)] += 1
                prev_tag = tag
                
        
        emission_matrix = defaultdict(lambda: laplace_factor)
        
        for word_tags in word_tag_count.keys():
            tags = word_tags[1:]
            emission_matrix[word_tags] = word_tag_count[word_tags] / tag_count[tags]

        return emission_matrix

### The Viterbi Algorithm

In [5]:
def kappa(position, state_list):
    return state_list if position not in [0, -1] else ['*']

def viterbi_trigram(sentence, hmm, with_context=False):
    pi = defaultdict(lambda: 0)
    bp = defaultdict(lambda: "OTH")
    pi[(0, '*', '*')] = 1.0
    A = hmm.transition
    B = hmm.emission
    state_list = hmm.state_list

    n = len(sentence)

    for k in range(1, n + 1):
        u_set = kappa(k - 1, state_list)
        v_set = kappa(k, state_list)
        w_set = kappa(k - 2, state_list)

        for v in v_set:
            for u in u_set:
                for w in w_set:
                    if with_context:
                        emission_tuple = (sentence[k - 1], v, u)
                    else:
                        emission_tuple = (sentence[k - 1], v)
                    reach_prob = pi[(k - 1, w, u)] * A[(w, u, v)] * B[emission_tuple]
                    if reach_prob > pi[(k, u, v)]:
                        pi[(k, u, v)] = reach_prob
                        bp[(k, u, v)] = w
    
    u_set = kappa(n - 1, state_list)
    v_set = kappa(n, state_list)
    result_tags = []
    for u in u_set:
        for v in v_set:
            if len(result_tags) == 0:
                result_tags = [v, u]
            if pi[(n, u, v)] * A[(u, v, 'STOP')] > \
            pi[(n, result_tags[1], result_tags[0])] * A[result_tags[1], result_tags[0], 'STOP']:
                result_tags = [v, u]
    
    if n == 0:
        return []
    elif n == 1:
        return [result_tags[0]]
    
    for k in range(n - 2, 0, -1):
        result_tags.append(bp[(k + 2, result_tags[-1], result_tags[-2])])
    
    result_tags.reverse()

    return result_tags

### Evaluation

In [6]:
def evaluate_metrics(true, pred):
    true = [ch for word in true for ch in word]
    pred = [ch for word in pred for ch in word]
    classes = list(set(true))
    classes.sort()
    # accuracy: (tp + tn) / (p + n)
    acc = accuracy_score(true, pred)
    # precision tp / (tp + fp)
    precision = precision_score(true, pred, average=None)
    # recall: tp / (tp + fn)
    recall = recall_score(true, pred, average=None)
    # f1: 2 tp / (2 tp + fp + fn)
    f1 = f1_score(true, pred, average=None)

    return acc, precision, recall, f1, classes

def print_metrics(test_acc, precision, recall, f1, classes):
    print(f"Accuracy of the model: {test_acc}")
    print(tabulate(zip(classes, precision, recall, f1),
                   headers=['Class (Alphabet)', 'Precision', 'Recall', 'F1'],
                   tablefmt='orgtbl'))

def test_and_evaluate(hmm, test_data):
    test = [[token[0] for token in sequence] for sequence in test_data]
    true = [[token[1] for token in sequence] for sequence in test_data]
    pred = []

    for sentence in tqdm(test, total=len(test)):
        pred.append(viterbi_trigram(sentence, hmm, with_context=True))

    accuracy, precision, recall, f1, classes = evaluate_metrics(true, pred)
    print_metrics(accuracy, precision, recall, f1, classes)        


In [7]:
hmm = HMM(train, ngram=3, with_context=True)
test_and_evaluate(hmm, test)

100%|██████████| 783/783 [19:58<00:00,  1.53s/it]
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


Accuracy of the model: 0.7687766892740517
| Class (Alphabet)   |   Precision |    Recall |       F1 |
|--------------------+-------------+-----------+----------|
| ''                 |   0         | 0         | 0        |
| -LRB-              |   0         | 0         | 0        |
| -RRB-              |   0.652174  | 0.535714  | 0.588235 |
| :                  |   0.619048  | 0.393939  | 0.481481 |
| CC                 |   0.0357143 | 0.0192308 | 0.025    |
| CD                 |   0.794805  | 0.72      | 0.755556 |
| DT                 |   0.813031  | 0.790634  | 0.801676 |
| EX                 |   0.789119  | 0.881027  | 0.832544 |
| FW                 |   0.909091  | 0.526316  | 0.666667 |
| IN                 |   0         | 0         | 0        |
| JJ                 |   0.774687  | 0.870313  | 0.81972  |
| JJR                |   0.677768  | 0.683303  | 0.680524 |
| JJS                |   0.661538  | 0.544304  | 0.597222 |
| LS                 |   0.814815  | 0.578947  | 0.676923 

### Working on the given case of POS Tagging
We run the viterbi algorithm on the given sentence using the parameters learned through the Hidden Markov Model. 

In [11]:
sentence = "That former Sri Lanka skipper and ace batsman Aravinda De Silva is a man of few\
    words was very much evident on Wednesday when the legendary batsman , who has\
        always let his bat talk , struggled to answer a barrage of questions at a function to_F\
            promote".split()

predicted_tags = viterbi_trigram(sentence, hmm, with_context=True)
for word, tag in zip(sentence, predicted_tags):
    print(f'{word}_/{tag}', end=' ')

That_/DT former_/JJ Sri_/NN Lanka_/IN skipper_/NNP and_/CC ace_/NNP batsman_/NNP Aravinda_/NNP De_/NNP Silva_/NNP is_/VBZ a_/DT man_/NN of_/JJ few_/JJ words_/NN was_/NN very_/RB much_/JJ evident_/NN on_/IN Wednesday_/NNP when_/WRB the_/DT legendary_/NNP batsman_/NNP ,_/NNP who_/WP has_/VBZ always_/RB let_/VBN his_/PRP$ bat_/JJ talk_/NN ,_/CC struggled_/VBD to_/TO answer_/VB a_/DT barrage_/JJ of_/NN questions_/NNS at_/IN a_/DT function_/NN to_F_/TO promote_/VB 