In [1]:
# Ref: https://www.kaggle.com/code/amirtharajanpks/pos-tagging-with-hmm-and-modified-viterbi
# Different datasets

## Import package

In [2]:
import nltk
from nltk.corpus import treebank
from sklearn.model_selection import train_test_split
import pandas as pd
from collections import Counter
from nltk.probability import FreqDist
import numpy as np

## Load data

In [3]:
nltk.download('treebank')
nltk.download('universal_tagset')

[nltk_data] Downloading package treebank to /root/nltk_data...
[nltk_data]   Unzipping corpora/treebank.zip.
[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Unzipping taggers/universal_tagset.zip.


True

In [4]:
data = list(treebank.tagged_sents(tagset='universal'))

In [5]:
train_set, test_set = train_test_split(data,random_state=1,test_size=0.1)

len(train_set),len(test_set)

(3522, 392)

## Computation

In [6]:
train_words = []
train_tags = []
train_pairs = []
for sentence in train_set:
    for word, tag in sentence:
        train_words.append(word)
        train_tags.append(tag)
        train_pairs.append((word, tag))
        
test_words = []
test_tags = []
test_pairs = []
temp_set = []
for sentence in test_set:
    s = []
    for word, tag in sentence:
        if word in train_words:
            test_words.append(word)
            test_tags.append(tag)
            test_pairs.append((word, tag))
            s.append((word, tag))
        temp_set.append(s)
test_set = temp_set

In [7]:
for i in range(10):
    print(train_words[i], '-', train_tags[i])

Cray - NOUN
Research - NOUN
indicated - VERB
that - ADP
the - DET
survival - NOUN
of - ADP
a - DET
spinoff - NOUN
company - NOUN


In [8]:
vocabulary = set(train_words+test_words)
len(vocabulary)

11766

In [9]:
tag_set = sorted(set(train_tags+test_tags))
len(tag_set)

12

In [10]:
tags_counter = Counter(train_tags)
tags_counter

Counter({'.': 10623,
         'ADJ': 5730,
         'ADP': 8821,
         'ADV': 2844,
         'CONJ': 2049,
         'DET': 7858,
         'NOUN': 26119,
         'NUM': 3201,
         'PRON': 2478,
         'PRT': 2930,
         'VERB': 12181,
         'X': 5961})

In [11]:
# Emission probability p(word|tag)
word_given_tags = pd.Series(FreqDist(train_pairs)).reset_index()\
                                .rename(columns={'level_0':'vocabulary','level_1':'tag', 0:'count'})\
                                .reset_index(drop=True).sort_values(by='count',ascending=False)

alpha = 0
L = []
for i, test_word in enumerate(test_words):
    if test_word not in train_words:
        L.append([test_word, test_tags[i], alpha])
# a = pd.DataFrame(L, columns=['vocabulary', 'tag', 'count'])
# word_given_tags = word_given_tags.append(a, ignore_index=True)
word_given_tags

Unnamed: 0,vocabulary,tag,count
10,",",.,4418
4,the,DET,3648
26,.,.,3445
6,of,ADP,2090
68,to,PRT,1978
...,...,...,...
6890,condemning,VERB,1
6889,Delegates,NOUN,1
6887,panic,NOUN,1
6886,lessen,VERB,1


In [12]:
# method to compute word given tag: count
def word_given_tag(word, tag):
    word_given_tag_count = word_given_tags.loc[(word_given_tags['vocabulary'] == word) & (word_given_tags['tag'] == tag),'count']    
    count = word_given_tag_count.to_list()
    return word_given_tag_count.to_list()[0] if len(count) != 0 else 0

In [13]:
print("Count for `he`-`PRON`:", word_given_tag("he", "PRON"))

Count for `he`-`PRON`: 202


### Emission probability

In [14]:
emission_probability = pd.pivot_table(word_given_tags,columns='tag',index='vocabulary',values='count',fill_value=0.0)
emission_probability = emission_probability.apply(lambda x: x/x.sum(), axis=0)
# emission_probability = np.log(emission_probability)
# emission_probability = emission_probability.replace(-np.inf, -9999)
alpha = 1e-10
emission_probability = emission_probability + alpha
emission_probability

tag,.,ADJ,ADP,ADV,CONJ,DET,NOUN,NUM,PRON,PRT,VERB,X
vocabulary,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
!,5.648123e-04,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10
#,1.506166e-03,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10
$,6.184694e-02,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10
%,1.000000e-10,1.745202e-04,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.523795e-02,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10
&,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,3.709126e-02,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.677572e-04
...,...,...,...,...,...,...,...,...,...,...,...,...
zero,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,3.124025e-04,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10
zinc,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,3.828640e-05,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10
zip,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,8.209517e-05,1.000000e-10
zone,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,7.657271e-05,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10,1.000000e-10


In [15]:
print("p('he','PRON') =", word_given_tag('he', 'PRON'))
print("p('PRON') =", tags_counter['PRON'])
print("p('he'|'PRON') =", f"{word_given_tag('he', 'PRON')}/{tags_counter['PRON']} = {emission_probability['PRON']['he']}")

p('he','PRON') = 202
p('PRON') = 2478
p('he'|'PRON') = 202/2478 = 0.08151735280379338


### Transition probability

In [16]:
def t2_given_t1(t2, t1):
    
    # get tag count for t1
    count_t1 = tags_counter[t1]
    count_t2_t1 = 0
    
    # iterate through all the tags
    for index in range(len(train_tags) - 1):
        
        # check if t1 is followed by t2 in the on each iterated tag index
        if train_tags[index] == t1 and train_tags[index+1] == t2:
            count_t2_t1 += 1
            
    return (count_t2_t1, count_t1) 

In [17]:
tags_matrix = np.zeros((len(tag_set), len(tag_set)), dtype='float32')

for i, t1 in enumerate(list(tag_set)):
    for j, t2 in enumerate(list(tag_set)): 
        tags_matrix[i, j] = t2_given_t1(t2, t1)[0]/tags_counter[t2]
        # tags_matrix[i, j] = np.log(t2_given_t1(t2, t1)[0]/tags_counter[t2]) # (i,j) entry: p(t1 | t2)

In [18]:
transition_probability = pd.DataFrame(tags_matrix, columns = list(tag_set), index=list(tag_set))
transition_probability

Unnamed: 0,.,ADJ,ADP,ADV,CONJ,DET,NOUN,NUM,PRON,PRT,VERB,X
.,0.093006,0.080628,0.111552,0.193038,0.303563,0.232375,0.090432,0.273977,0.278047,0.00785,0.077416,0.049824
ADJ,0.035301,0.068237,0.049768,0.009494,0.050268,0.003691,0.152839,0.038113,0.001614,0.02116,0.0055,0.019963
ADP,0.033324,0.162478,0.016778,0.042897,0.003416,0.363069,0.110686,0.166198,0.24657,0.004096,0.005829,0.048482
ADV,0.037183,0.063002,0.037411,0.078411,0.008785,0.02507,0.003369,0.028741,0.017353,0.013993,0.08111,0.011407
CONJ,0.006778,0.042757,0.011563,0.040084,0.000488,0.031433,0.027643,0.025617,0.04883,0.003072,0.026106,0.002684
DET,0.01365,0.280803,0.007709,0.033404,0.001952,0.00509,0.192542,0.055295,0.011703,0.000683,0.025285,0.05905
NOUN,0.5923,0.054625,0.519782,0.155063,0.540264,0.043395,0.265094,0.077476,0.050444,0.39215,0.313849,0.129341
NUM,0.034924,0.01815,0.012357,0.003516,0.02001,0.0014,0.043378,0.184005,0.001211,0.031058,0.004433,0.114914
PRON,0.009131,0.031763,0.006689,0.030591,0.005857,0.003181,0.020368,0.005623,0.007264,0.011263,0.097447,0.038249
PRT,0.012049,0.042932,0.006235,0.010549,0.003416,0.037669,0.027375,0.052171,0.022195,0.002048,0.097118,0.007046


## Viterbi Algorithm

In [19]:
def dptable(V):
    # Print a table of steps from dictionary
    yield " " * 5 + "     ".join(("%3d" % i) for i in range(len(V)))
    for state in V[0]:
        yield "%.7s: " % state + " ".join("%.7s" % ("%lf" % v[state]["prob"]) for v in V)
        
def viterbi(obs, states, start_p, trans_p, emit_p):
    V = [{}]
    for st in states:
        V[0][st] = {"prob": start_p[st] * emit_p[st][obs[0]], "prev": None}
    # Run Viterbi when t > 0
    for t in range(1, len(obs)):
        V.append({})
        for st in states:
            max_tr_prob = V[t - 1][states[0]]["prob"] * trans_p[states[0]][st]
            prev_st_selected = states[0]
            for prev_st in states[1:]:
                tr_prob = V[t - 1][prev_st]["prob"] * trans_p[prev_st][st]
                if tr_prob > max_tr_prob:
                    max_tr_prob = tr_prob
                    prev_st_selected = prev_st
            # print(max_tr_prob,st,obs[t])
            max_prob = max_tr_prob * emit_p[st][obs[t]]
            V[t] [st] = {"prob": max_prob, "prev": prev_st_selected}

    # for line in dptable(V):
        # print(line)

    opt = []
    max_prob = 0.0
    best_st = None
    # Get most probable state and its backtrack
    for st, data in V[-1].items():
        if data["prob"] > max_prob:
            
            max_prob = data["prob"]
            best_st = st
    opt.append(best_st)
    previous = best_st

    # Follow the backtrack till the first observation
    for t in range(len(V) - 2, -1, -1):
        opt.insert(0, V[t + 1][previous]["prev"])
        previous = V[t + 1][previous]["prev"]

    # print ("The steps of states are " + " ".join(opt) + " with highest probability of %s" % max_prob)
    return opt

## Test

In [20]:
start_probability = {}
for tag in tag_set:
    start_probability[tag] = 1/len(tag_set)
start_probability

{'.': 0.08333333333333333,
 'ADJ': 0.08333333333333333,
 'ADP': 0.08333333333333333,
 'ADV': 0.08333333333333333,
 'CONJ': 0.08333333333333333,
 'DET': 0.08333333333333333,
 'NOUN': 0.08333333333333333,
 'NUM': 0.08333333333333333,
 'PRON': 0.08333333333333333,
 'PRT': 0.08333333333333333,
 'VERB': 0.08333333333333333,
 'X': 0.08333333333333333}

In [21]:
test_words[:5]

['While', 'the', 'new', 'proposal', 'might']

In [22]:
test_tags[:5]

['ADP', 'DET', 'ADJ', 'NOUN', 'VERB']

In [None]:
accuracy = np.array([0])
for sentence in test_set:
    words, tags = [], []
    for word, tag in sentence:
        words.append(word)
        tags.append(tag)
        tags_pred = viterbi(words, tag_set, start_probability, transition_probability, emission_probability)
    for tag, tag_pred in zip(tags, tags_pred):
        if tag == tag_pred:
            accuracy = np.vstack((accuracy,np.array([1])))
        else:
            accuracy = np.vstack((accuracy,np.array([0])))
accuracy = accuracy[1:] 
print("Accuracy:", np.sum(accuracy)/accuracy.shape[0])

Accuracy: 0.9431271102214014
