## POS tagging using modified Viterbi

### Data Preparation

In [1]:
#Importing libraries
import nltk, re, pprint
import numpy as np
import pandas as pd
import requests
import matplotlib.pyplot as plt
import seaborn as sns
import pprint, time
import random
from sklearn.model_selection import train_test_split
from nltk.tokenize import word_tokenize

In [2]:
# reading the Treebank tagged sentences
nltk_data = list(nltk.corpus.treebank.tagged_sents(tagset='universal'))

In [3]:
# first four tagged sentences
print(nltk_data[:4])

[[('Pierre', 'NOUN'), ('Vinken', 'NOUN'), (',', '.'), ('61', 'NUM'), ('years', 'NOUN'), ('old', 'ADJ'), (',', '.'), ('will', 'VERB'), ('join', 'VERB'), ('the', 'DET'), ('board', 'NOUN'), ('as', 'ADP'), ('a', 'DET'), ('nonexecutive', 'ADJ'), ('director', 'NOUN'), ('Nov.', 'NOUN'), ('29', 'NUM'), ('.', '.')], [('Mr.', 'NOUN'), ('Vinken', 'NOUN'), ('is', 'VERB'), ('chairman', 'NOUN'), ('of', 'ADP'), ('Elsevier', 'NOUN'), ('N.V.', 'NOUN'), (',', '.'), ('the', 'DET'), ('Dutch', 'NOUN'), ('publishing', 'VERB'), ('group', 'NOUN'), ('.', '.')], [('Rudolph', 'NOUN'), ('Agnew', 'NOUN'), (',', '.'), ('55', 'NUM'), ('years', 'NOUN'), ('old', 'ADJ'), ('and', 'CONJ'), ('former', 'ADJ'), ('chairman', 'NOUN'), ('of', 'ADP'), ('Consolidated', 'NOUN'), ('Gold', 'NOUN'), ('Fields', 'NOUN'), ('PLC', 'NOUN'), (',', '.'), ('was', 'VERB'), ('named', 'VERB'), ('*-1', 'X'), ('a', 'DET'), ('nonexecutive', 'ADJ'), ('director', 'NOUN'), ('of', 'ADP'), ('this', 'DET'), ('British', 'ADJ'), ('industrial', 'ADJ'), ('

In [4]:
# Splitting into train and test
random.seed(1234)
train_set, test_set = train_test_split(nltk_data,test_size=0.5)

print(len(train_set))
print(len(test_set))
print(train_set[:5])

1957
1957
[[('It', 'PRON'), ('*EXP*-2', 'X'), ('was', 'VERB'), ('just', 'ADV'), ('a', 'DET'), ('stupid', 'ADJ'), ('mistake', 'NOUN'), ('*', 'X'), ('to', 'PRT'), ('get', 'VERB'), ('the', 'DET'), ('license', 'NOUN'), (',', '.'), ("''", '.'), ('he', 'PRON'), ('said', 'VERB'), ('0', 'X'), ('*T*-3', 'X'), (',', '.'), ('*-1', 'X'), ('adding', 'VERB'), (',', '.'), ('``', '.'), ('I', 'PRON'), ("'d", 'VERB'), ('just', 'ADV'), ('as', 'ADV'), ('soon', 'ADV'), ('not', 'ADV'), ('get', 'VERB'), ('into', 'ADP'), ("''", '.'), ('details', 'NOUN'), ('of', 'ADP'), ('the', 'DET'), ('settlement', 'NOUN'), ('.', '.')], [('Separately', 'ADV'), (',', '.'), ('Reuter', 'NOUN'), ('reported', 'VERB'), ('that', 'ADP'), ('the', 'DET'), ('Papua-New', 'NOUN'), ('Guinea', 'NOUN'), ('government', 'NOUN'), ('urged', 'VERB'), ('its', 'PRON'), ('Parliament', 'NOUN'), ('*-1', 'X'), ('to', 'PRT'), ('extend', 'VERB'), ('a', 'DET'), ('state', 'NOUN'), ('of', 'ADP'), ('emergency', 'NOUN'), ('in', 'ADP'), ('copper-rich', 'ADJ')

In [5]:
# Getting list of tagged words
train_tagged_words = [tup for sent in train_set for tup in sent]
len(train_tagged_words)

50467

In [6]:
# tokens 
tokens = [pair[0] for pair in train_tagged_words]
tokens[:10]

['It', '*EXP*-2', 'was', 'just', 'a', 'stupid', 'mistake', '*', 'to', 'get']

In [7]:
# vocabulary
V = set(tokens)
print(len(V))

8540


In [8]:
# number of tags
T = set([pair[1] for pair in train_tagged_words])
print(T)
len(T)

{'VERB', 'PRON', 'CONJ', 'ADP', 'ADV', 'NOUN', 'NUM', '.', 'X', 'PRT', 'ADJ', 'DET'}


12

In [9]:
# computing P(w/t) and storing in T x V matrix
t = len(T)
v = len(V)
w_given_t = np.zeros((t, v))

### POS Tagging Algorithm - HMM
We'll use the HMM algorithm to tag the words. Given a sequence of words to be tagged, the task is to assign the most probable tag to the word.

In other words, to every word w, assign the tag t that maximises the likelihood P(t/w). Since P(t/w) = P(w/t). P(t) / P(w), after ignoring P(w), we have to compute P(w/t) and P(t).

P(w/t) is basically the probability that given a tag (say NN), what is the probability of it being w (say 'building'). This can be computed by computing the fraction of all NNs which are equal to w, i.e.

P(w/t) = count(w, t) / count(t).

The term P(t) is the probability of tag t, and in a tagging task, we assume that a tag will depend only on the previous tag. In other words, the probability of a tag being NN will depend only on the previous tag t(n-1). So for e.g. if t(n-1) is a JJ, then t(n) is likely to be an NN since adjectives often precede a noun (blue coat, tall building etc.).

Given the penn treebank tagged dataset, we can compute the two terms P(w/t) and P(t) and store them in two large matrices. The matrix of P(w/t) will be sparse, since each word will not be seen with most tags ever, and those terms will thus be zero.

### Emission Probabilities

In [10]:
# compute word given tag: Emission Probability
def word_given_tag(word, tag, train_bag = train_tagged_words):
    tag_list = [pair for pair in train_bag if pair[1]==tag]
    count_tag = len(tag_list)
    w_given_tag_list = [pair[0] for pair in tag_list if pair[0]==word]
    count_w_given_tag = len(w_given_tag_list)
    
    return (count_w_given_tag, count_tag)

### Transition Probabilities

In [11]:
# compute tag given tag: tag2(t2) given tag1 (t1), i.e. Transition Probability

def t2_given_t1(t2, t1, train_bag = train_tagged_words):
    tags = [pair[1] for pair in train_bag]
    count_t1 = len([t for t in tags if t==t1])
    count_t2_t1 = 0
    for index in range(len(tags)-1):
        if tags[index]==t1 and tags[index+1] == t2:
            count_t2_t1 += 1
    return (count_t2_t1, count_t1)

In [12]:
# creating t x t transition matrix of tags
# each column is t2, each row is t1
# thus M(i, j) represents P(tj given ti)

tags_matrix = np.zeros((len(T), len(T)), dtype='float32')
for i, t1 in enumerate(list(T)):
    for j, t2 in enumerate(list(T)): 
        tags_matrix[i, j] = t2_given_t1(t2, t1)[0]/t2_given_t1(t2, t1)[1]

In [13]:
# convert the matrix to a df for better readability
tags_df = pd.DataFrame(tags_matrix, columns = list(T), index=list(T))
tags_df

Unnamed: 0,VERB,PRON,CONJ,ADP,ADV,NOUN,NUM,.,X,PRT,ADJ,DET
VERB,0.168923,0.033402,0.005739,0.092996,0.082549,0.109182,0.021189,0.036345,0.219688,0.028546,0.066804,0.134638
PRON,0.49562,0.00438,0.006569,0.021168,0.034307,0.210949,0.006569,0.032117,0.09562,0.010949,0.073723,0.008029
CONJ,0.143993,0.05742,0.000883,0.059187,0.05212,0.349823,0.04417,0.026502,0.008834,0.0053,0.115724,0.136042
ADP,0.008286,0.06831,0.000808,0.017987,0.01536,0.324576,0.062854,0.037389,0.033953,0.001011,0.107922,0.321544
ADV,0.354997,0.017167,0.007357,0.123237,0.082771,0.030043,0.02943,0.136726,0.016554,0.015328,0.122624,0.063765
NOUN,0.145238,0.005602,0.041842,0.174908,0.017982,0.262881,0.010512,0.241441,0.029255,0.043502,0.012864,0.013971
NUM,0.018857,0.001664,0.013311,0.033278,0.004437,0.36162,0.184138,0.113145,0.207987,0.024404,0.032169,0.004992
.,0.086034,0.067368,0.058035,0.091804,0.052096,0.226201,0.083659,0.092822,0.028339,0.002036,0.044799,0.166638
X,0.21162,0.054485,0.009633,0.145996,0.02649,0.060205,0.003913,0.165864,0.073149,0.17941,0.019868,0.049368
PRT,0.402431,0.017274,0.002559,0.020473,0.008317,0.247601,0.057582,0.040947,0.008957,0.003199,0.09469,0.095969


### Build the vanilla Viterbi based POS tagger

In [14]:
# Viterbi Heuristic
def Viterbi(words, train_bag = train_tagged_words):
    state = []
    T = list(set([pair[1] for pair in train_bag]))
    
    for key, word in enumerate(words):
        #initialise list of probability column for a given observation
        p = [] 
        for tag in T:
            if key == 0:
                transition_p = tags_df.loc['.', tag]
            else:
                transition_p = tags_df.loc[state[-1], tag]
                
            # compute emission and state probabilities
            emission_p = word_given_tag(words[key], tag)[0]/word_given_tag(words[key], tag)[1]
            state_probability = emission_p * transition_p    
            p.append(state_probability)
            
        pmax = max(p)
        # getting state for which probability is maximum
        state_max = T[p.index(pmax)] 
        state.append(state_max)
    return list(zip(words, state))


In [15]:
# Running on entire test dataset would take more than 3-4hrs. 
# Let's test our Viterbi algorithm on a few sample sentences of test dataset

random.seed(1234)

# choose random 5 sents
rndom = [random.randint(1,len(test_set)) for x in range(5)]

# list of sents
test_run = [test_set[i] for i in rndom]

# list of tagged words
test_run_base = [tup for sent in test_run for tup in sent]

# list of untagged words
test_tagged_words = [tup[0] for sent in test_run for tup in sent]


#### Evaluating tagging accuracy

In [16]:
# tagging the test sentences
start = time.time()
tagged_seq = Viterbi(test_tagged_words)
end = time.time()
difference = end-start

print("Time taken in seconds: ", difference)
# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
accuracy = len(check)/len(tagged_seq)
print("Vanilla Viterbi Algorithm Accuracy:", accuracy)


Time taken in seconds:  11.360621929168701
Vanilla Viterbi Algorithm Accuracy: 0.9115646258503401


In [17]:
# let's check the incorrectly tagged words
[j for i, j in enumerate(zip(tagged_seq, test_run_base)) if j[0]!=j[1]]


[(('deterioration', 'VERB'), ('deterioration', 'NOUN')),
 (('persons', 'VERB'), ('persons', 'NOUN')),
 (('*-35', 'VERB'), ('*-35', 'X')),
 (('notwithstanding', 'VERB'), ('notwithstanding', 'ADP')),
 (('statutory', 'VERB'), ('statutory', 'ADJ')),
 (('designations', 'VERB'), ('designations', 'NOUN')),
 (('credentials', 'VERB'), ('credentials', 'NOUN')),
 (('benefit', 'NOUN'), ('benefit', 'VERB')),
 (('roll', 'VERB'), ('roll', 'NOUN')),
 (('Chinese', 'ADJ'), ('Chinese', 'NOUN')),
 (('undiplomatic', 'VERB'), ('undiplomatic', 'ADJ')),
 (('fashion', 'VERB'), ('fashion', 'NOUN')),
 (('boosters', 'VERB'), ('boosters', 'NOUN'))]

### Solve the problem of unknown words

We can see that all of unknown words have been tagged as 'VERB' as 'VERB' is the first tag in tag list and is assigned if unknown word is encountered (emission probability =0).
 
### First solution for unknown words : 
### Viterbi Modification-Technique I
assign based on transition probabilities only in case of unknown words as emission probability for unknown word is zero.

In [18]:
# use transition probability of tags when emission probability is zero (in case of unknown words)

def ModifiedViterbi_1(words, train_bag = train_tagged_words):
    state = []
    T = list(set([pair[1] for pair in train_bag]))
    
    for key, word in enumerate(words):
        #initialise list of probability column for a given observation
        p = [] 
        p_transition =[] # list for storing transition probabilities
        for tag in T:
            if key == 0:
                transition_p = tags_df.loc['.', tag]
            else:
                transition_p = tags_df.loc[state[-1], tag]
                
            # compute emission and state probabilities
            emission_p = word_given_tag(words[key], tag)[0]/word_given_tag(words[key], tag)[1]
            state_probability = emission_p * transition_p    
            p.append(state_probability)
            p_transition.append(transition_p)
            
        pmax = max(p)
        state_max = T[p.index(pmax)] 
        
      
        # if probability is zero (unknown word) then use transition probability
        if(pmax==0):
            pmax = max(p_transition)
            state_max = T[p_transition.index(pmax)]
                        
        state.append(state_max)
    return list(zip(words, state))

#### Evaluating tagging accuracy

In [19]:

# tagging the test sentences
start = time.time()
tagged_seq = ModifiedViterbi_1(test_tagged_words)
end = time.time()
difference = end-start

print("Time taken in seconds: ", difference)

# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
accuracy = len(check)/len(tagged_seq)
print('Modified Viterbi_1 Accuracy: ',accuracy*100)

Time taken in seconds:  10.19871711730957
Modified Viterbi_1 Accuracy:  93.19727891156462


In [20]:
# let's check the incorrectly tagged words
[j for i, j in enumerate(zip(tagged_seq, test_run_base)) if j[0] != j[1]]

[(('deterioration', 'X'), ('deterioration', 'NOUN')),
 (('notwithstanding', 'NOUN'), ('notwithstanding', 'ADP')),
 (('statutory', 'NOUN'), ('statutory', 'ADJ')),
 (('appearing', 'NOUN'), ('appearing', 'VERB')),
 (('requesting', 'NOUN'), ('requesting', 'VERB')),
 (('benefit', 'NOUN'), ('benefit', 'VERB')),
 (('roll', 'VERB'), ('roll', 'NOUN')),
 (('Chinese', 'ADJ'), ('Chinese', 'NOUN')),
 (('undiplomatic', 'VERB'), ('undiplomatic', 'ADJ')),
 (('fashion', 'X'), ('fashion', 'NOUN'))]

### Viterbi Modification-Technique II
second solution for unknown words:

backoff to rule based tagger in case of unknown words.
we further observe that POS tag 'X' can be easily encapsulated in regex rule, so we extract it only based on ruled based tagged.
Let's define a rule based tagger as below:m

In [21]:
# specify patterns for tagging
patterns = [
    (r'.*ing$', 'VERB'),              # gerund
    (r'.*ed$', 'VERB'),               # past tense 
    (r'.*es$', 'VERB'),               # verb    
    (r'.*\'s$', 'NOUN'),              # possessive nouns
    (r'.*s$', 'NOUN'),                # plural nouns
    (r'\*T?\*?-[0-9]+$', 'X'),        # X
    (r'^-?[0-9]+(.[0-9]+)?$', 'NUM'), # cardinal numbers
    (r'.*', 'NOUN')                   # nouns
]

# rule based tagger
rule_based_tagger = nltk.RegexpTagger(patterns)

In [22]:
# Modification in Viterbi Algorithm : Backoff to rule based tagger in case unknown word is encountered.
def ModifiedViterbi_2(words, train_bag = train_tagged_words):
    state = []
    T = list(set([pair[1] for pair in train_bag]))
    
    for key, word in enumerate(words):
        #initialise list of probability column for a given observation
        p = [] 
        for tag in T:
            if key == 0:
                transition_p = tags_df.loc['.', tag]
            else:
                transition_p = tags_df.loc[state[-1], tag]
                
            # compute emission and state probabilities
            emission_p = word_given_tag(words[key], tag)[0]/word_given_tag(words[key], tag)[1]
            state_probability = emission_p * transition_p    
            p.append(state_probability)
            
        pmax = max(p)
        state_max = rule_based_tagger.tag([word])[0][1]       
       
        
        if(pmax==0):
            state_max = rule_based_tagger.tag([word])[0][1] # assign based on rule based tagger
        else:
            if state_max != 'X':
                # getting state for which probability is maximum
                state_max = T[p.index(pmax)]                
            
        
        state.append(state_max)
    return list(zip(words, state))

#### Evaluating tagging accuracy

In [23]:
# tagging the test sentences
start = time.time()
tagged_seq = ModifiedViterbi_2(test_tagged_words)
end = time.time()
difference = end-start

print("Time taken in seconds: ", difference)


# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
accuracy = len(check)/len(tagged_seq)
print('Modified Viterbi_2 Accuracy: ',accuracy*100)

Time taken in seconds:  10.337356328964233
Modified Viterbi_2 Accuracy:  95.23809523809523


In [24]:

# let's check the incorrectly tagged words
[j for i, j in enumerate(zip(tagged_seq, test_run_base)) if j[0] != j[1]]

[(('conclude', 'NOUN'), ('conclude', 'VERB')),
 (('notwithstanding', 'VERB'), ('notwithstanding', 'ADP')),
 (('statutory', 'NOUN'), ('statutory', 'ADJ')),
 (('benefit', 'NOUN'), ('benefit', 'VERB')),
 (('roll', 'VERB'), ('roll', 'NOUN')),
 (('Chinese', 'ADJ'), ('Chinese', 'NOUN')),
 (('undiplomatic', 'NOUN'), ('undiplomatic', 'ADJ'))]

### Compare the tagging accuracies of the modifications with the vanilla Viterbi algorithm


In [25]:

# Vanilla Viterbi Algorithm
start = time.time()
tagged_seq = Viterbi(test_tagged_words)
end = time.time()
difference = end-start

print("Time taken in seconds: ", difference)

# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
accuracy = len(check)/len(tagged_seq)
print('vanilla Viterbi Algorithm Accuracy: ',accuracy*100)

Time taken in seconds:  10.634554624557495
vanilla Viterbi Algorithm Accuracy:  91.15646258503402


In [26]:
# Modified Viterbi Algorithm by using transition probability when emission probability is zero
start = time.time()
tagged_seq = ModifiedViterbi_1(test_tagged_words)
end = time.time()
difference = end-start

print("Time taken in seconds: ", difference)

# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
accuracy = len(check)/len(tagged_seq)
print('Modified Viterbi Algorithm Accuracy: ',accuracy*100)

Time taken in seconds:  10.441089868545532
Modified Viterbi Algorithm Accuracy:  93.19727891156462


In [27]:
# Modified Viterbi Algorith by using applying rule based 
start = time.time()
tagged_seq = ModifiedViterbi_2(test_tagged_words)
end = time.time()
difference = end-start

print("Time taken in seconds: ", difference)

# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
accuracy = len(check)/len(tagged_seq)
print('Modified Viterbi Algorithm Accuracy: ',accuracy*100)

Time taken in seconds:  11.14821982383728
Modified Viterbi Algorithm Accuracy:  95.23809523809523


### List down cases which were incorrectly tagged by original POS tagger and got corrected by your modifications


The following list of words have been correctly POS tagged by ModifiedViterbi_1 as compared to vanilla Viterbi Algorithm:

person: correctly tagged as NOUN

designations: correctly tagged as NOUN

credentials: correctly tagged as NOUN

boosters: correctly tagged as NOUN 

*-35: correctly tagged as 'X'

The following list of words have been correctly POS tagged by ModifiedViterbi_2 as compared to  Viterbi_1 and vanilla Viterbi Algorithm:
 
deterioration: correctly tagged as NOUN

appearing: correctly tagged as VERB

requesting: correctly tagged as VERB

fashion: correctly tagged as NOUN