In [None]:
%config IPCompleter.greedy = True

In [None]:
import warnings
warnings.filterwarnings('ignore')
import numpy as np
import pandas as pd
import random
import pprint

import nltk
from nltk.tokenize import word_tokenize

from sklearn.model_selection import train_test_split

from collections import defaultdict

**Load NLTK and Test Dataset</font>**

In [None]:
import nltk
nltk.download('universal_tagset')

[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Unzipping taggers/universal_tagset.zip.


True

In [None]:
from nltk.corpus import treebank
nltk_data = list(nltk.corpus.treebank.tagged_sents(tagset='universal'))

In [None]:
# observe a few tagged sentences from the corpora
print(nltk_data[:2])

[[('Pierre', 'NOUN'), ('Vinken', 'NOUN'), (',', '.'), ('61', 'NUM'), ('years', 'NOUN'), ('old', 'ADJ'), (',', '.'), ('will', 'VERB'), ('join', 'VERB'), ('the', 'DET'), ('board', 'NOUN'), ('as', 'ADP'), ('a', 'DET'), ('nonexecutive', 'ADJ'), ('director', 'NOUN'), ('Nov.', 'NOUN'), ('29', 'NUM'), ('.', '.')], [('Mr.', 'NOUN'), ('Vinken', 'NOUN'), ('is', 'VERB'), ('chairman', 'NOUN'), ('of', 'ADP'), ('Elsevier', 'NOUN'), ('N.V.', 'NOUN'), (',', '.'), ('the', 'DET'), ('Dutch', 'NOUN'), ('publishing', 'VERB'), ('group', 'NOUN'), ('.', '.')]]


**Loading Test Data**

In [None]:
file_object = open(r"/content/Training set_HMM.txt","r")
test_data = file_object.read()
test_data

"i\tPRP\n'd\tMD\nlike\tVB\nto\tTO\ngo\tVB\nto\tIN\na\tDT\nfancy\tJJ\nrestaurant\tNN\n.\t.\n\ni\tPRP\n'd\tMD\nlike\tVB\nfrench\tJJ\nfood\tNN\n.\t.\n\nnext\tJJ\nthursday\tNN\n.\t.\n\nnext\tJJ\nthursday\tNN\n.\t.\n\ndinner\tNN\n.\t.\n\ni\tPRP\nwant\tVBP\nto\tTO\neat\tVB\nfrench\tJJ\nfood\tNN\n.\t.\n\ni\tPRP\nwant\tVBP\nto\tTO\nhave\tVB\ndinner\tNN\n.\t.\n\nit\tPRP\ncan\tMD\nbe\tVB\nreally\tRB\nexpensive\tJJ\n.\t.\n\nas\tRB\nfar\tRB\naway\tRB\nas\tIN\nwe\tPRP\ncan\tMD\nget\tVB\n.\t.\n\nas\tRB\nfar\tRB\naway\tRB\nas\tIN\nwe\tPRP\ncan\tMD\nget\tVB\n.\t.\n\nas\tRB\nfar\tRB\naway\tRB\nas\tIN\nwe\tPRP\ncan\tMD\nget\tVB\n.\t.\n\ni\tPRP\nwant\tVBP\nto\tTO\nbe\tVB\nfar\tRB\naway\tRB\nfrom\tIN\nicsi\tNN\n.\t.\n\ntell\tVB\nme\tPRP\nabout\tIN\nle\tFW\nbateau\tFW\nivre\tFW\n.\t.\n\ntell\tVB\nme\tPRP\nabout\tIN\nle\tFW\nbateau\tFW\nivre\tFW\n.\t.\n\ntell\tVB\nme\tPRP\nabout\tIN\nle\tFW\nbateau\tFW\nivre\tFW\n.\t.\n\ntell\tVB\nme\tPRP\nabout\tIN\nle\tFW\nbateau\tFW\nivre\tFW\n.\t.\n\ntell\tVB\nme\tPRP\n

In [None]:
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [None]:
# number of words in the test dataset
test_data_words = nltk.word_tokenize(test_data)
len(test_data_words)

300526

**Tagging Test Dataset With NLTK POS Tagger**

In [None]:
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


True

In [None]:
test_tagged_words = {}
test_tagged = nltk.pos_tag(test_data_words, tagset='universal')
universal_tagset = [
    'VERB', 'NOUN', 'PRON', 'ADJ', 'ADV', 
    'ADP', 'CONJ', 'DET', 'NUM', 'PRT', 'X', '.'
]

for utag in universal_tagset:
    test_tagged_words[utag] = sorted(
        set([word for (word, tag) in test_tagged if tag == utag]))

i = random.randrange(len(universal_tagset))

pprint.pprint('words with tagged with {}'.format(universal_tagset[i]))
pprint.pprint(test_tagged_words[universal_tagset[i]])

'words with tagged with CONJ'
['NN',
 'and',
 'but',
 'christopher',
 'easier',
 'eighty',
 'either',
 'everett',
 'kosher',
 'less',
 'let',
 'lococo',
 'luther',
 'nakapan',
 'norteno',
 'or',
 'ten',
 'thousand',
 'twelve',
 'yangtze',
 'yeah']


**Split data into train and validation datasets**

In [None]:
train_set, validation_set = train_test_split(nltk_data,
                                             test_size=0.05,
                                             random_state=1234)

print('Number of sentences in train dataset : {0}'.format(len(train_set)))
print('Number of sentences in validation dataset : {0}'.format(len(validation_set)))

Number of sentences in train dataset : 3718
Number of sentences in validation dataset : 196


In [None]:
train_tagged_words = [tup for sent in train_set for tup in sent]

In [None]:
tokens = [pair[0] for pair in train_tagged_words]
print('total number of words in the training set : {0}'.format(len(tokens)))

vocabulary = set(tokens)
print('total number of unique words in the training set: {0}'.format(len(vocabulary)))

total number of words in the training set : 95799
total number of unique words in the training set: 12073


In [None]:
all_tags = [pair[1] for pair in train_tagged_words]
unique_tags = sorted(set(all_tags))

print('number of tags in the universal tagset : {}'.format(len(unique_tags)))
print(unique_tags)

number of tags in the universal tagset : 12
['.', 'ADJ', 'ADP', 'ADV', 'CONJ', 'DET', 'NOUN', 'NUM', 'PRON', 'PRT', 'VERB', 'X']


**Store number of times a tag 'T' appears in the training dataset**

In [None]:
tag_count_dict = dict()

for utag in unique_tags:
    tag_list = [pair[1] for pair in train_tagged_words if pair[1] == utag]
    tag_count_dict[utag] = len(tag_list)
    
print(tag_count_dict)

{'.': 11130, 'ADJ': 6063, 'ADP': 9387, 'ADV': 3052, 'CONJ': 2144, 'DET': 8269, 'NOUN': 27471, 'NUM': 3364, 'PRON': 2619, 'PRT': 3070, 'VERB': 12910, 'X': 6320}


**List of Unknown Words in Validation Dataset**

In [None]:
val_data_unknown_words = [word for sent in validation_set for (word, tag) in sent if word not in vocabulary]
print('number of unknown words in validation data set : {0}'.format(len(set(val_data_unknown_words))))

number of unknown words in validation data set : 335


**List of Unknown Words in Test Dataset**

In [None]:
test_data_unknown_words = [word for word in test_data_words if word not in vocabulary]
print('number of unknown words in test data set : {0}'.format(len(set(test_data_unknown_words))))

number of unknown words in test data set : 678


**Calculate Number of Words correctly tagged in Test Dataset**

In [None]:
def calc_test_dataset_accuracy(tagged_test_set):
    total_words = 0
    correct_tagged_words = 0

    for word, tag in tagged_test_set:
        try:
            list_for_tag = test_tagged_words[tag]
        except KeyError:
            list_for_tag = []

        total_words += 1

        if word in list_for_tag:
            correct_tagged_words += 1

    print('total words - {0}. correctly tagged words - {1}. accuracy - {2}'.
          format(total_words, correct_tagged_words,
                 correct_tagged_words / total_words))

# **HMM Model Parameters**

**Emission Probabilities**

In [None]:
def word_given_tag(word, tag, train_bag=train_tagged_words):

    w_given_tag_list = [
        pair[0] for pair in train_bag if pair[0] == word and pair[1] == tag
    ]
    count_w_given_tag = len(w_given_tag_list)

    return count_w_given_tag

**Transition Probabilities**

In [None]:
def t2_given_t1(t2, t1, train_bag=train_tagged_words):
    
    count_t2_t1 = 0

    for index in range(len(all_tags) - 1):
        if all_tags[index] == t1 and all_tags[index + 1] == t2:
            count_t2_t1 += 1

    return count_t2_t1

In [None]:
tags_matrix = np.zeros((len(unique_tags), len(unique_tags)), dtype='float32')

for i, t1 in enumerate(list(unique_tags)):
    for j, t2 in enumerate(list(unique_tags)):
        count_t1 = tag_count_dict[t1]
        tags_matrix[i, j] = t2_given_t1(t2, t1) / count_t1

In [None]:
df_tag = pd.DataFrame(tags_matrix,
                      columns=list(unique_tags),
                      index=list(unique_tags))

df_tag

Unnamed: 0,.,ADJ,ADP,ADV,CONJ,DET,NOUN,NUM,PRON,PRT,VERB,X
.,0.09407,0.044654,0.090386,0.051932,0.057772,0.173226,0.223091,0.080593,0.065768,0.002336,0.088769,0.027314
ADJ,0.065809,0.065314,0.077519,0.004948,0.016658,0.004948,0.698499,0.021112,0.00066,0.010886,0.012205,0.021442
ADP,0.039842,0.105785,0.016512,0.013849,0.000959,0.322893,0.322893,0.062001,0.070203,0.001491,0.008522,0.035048
ADV,0.134666,0.129751,0.118611,0.081258,0.006881,0.06848,0.031127,0.031455,0.0154,0.014744,0.344364,0.023263
CONJ,0.033116,0.118937,0.052705,0.05597,0.000466,0.11847,0.348881,0.041511,0.057369,0.005131,0.158582,0.008862
DET,0.017777,0.203652,0.009191,0.012698,0.000484,0.005442,0.63865,0.022373,0.003749,0.000242,0.039545,0.046197
NOUN,0.239307,0.012231,0.177023,0.017182,0.042263,0.01325,0.264898,0.009537,0.004769,0.043974,0.146336,0.029231
NUM,0.115933,0.032402,0.035672,0.002973,0.013377,0.002973,0.354637,0.184899,0.001486,0.027051,0.018133,0.210464
PRON,0.040473,0.073692,0.023291,0.032837,0.004582,0.009164,0.207331,0.007255,0.007637,0.011837,0.487972,0.093929
PRT,0.041694,0.084039,0.021173,0.009772,0.00228,0.099674,0.247883,0.056678,0.017915,0.001954,0.402932,0.014007


**Start Probabilities**

In [None]:
df_tag.loc['.', :]

.       0.094070
ADJ     0.044654
ADP     0.090386
ADV     0.051932
CONJ    0.057772
DET     0.173226
NOUN    0.223091
NUM     0.080593
PRON    0.065768
PRT     0.002336
VERB    0.088769
X       0.027314
Name: ., dtype: float32

**Vanilla Viterbi Based POS Tagger**

In [None]:
def Viterbi_Vanilla(words, train_bag=train_tagged_words):
    state = []
    
    for key, word in enumerate(words):
        #initialise list of probability column for a given observation
        p = []
        for tag in unique_tags:
            if key == 0:
                transition_p = df_tag.loc['.', tag]
            else:
                transition_p = df_tag.loc[state[-1], tag]

            # compute emission and state probabilities
            emission_p = word_given_tag(words[key], tag) / tag_count_dict[tag]
            state_probability = emission_p * transition_p
            
            p.append(state_probability)

        pmax = max(p)
        
        # getting state for which probability is maximum
        # tagging unknown words as 'X' to mark those as foreign words
        if pmax == 0:
            state_max = 'X'
        else:    
            state_max = unique_tags[p.index(pmax)]

        state.append(state_max)

    return list(zip(words, state))

**Running Algorithm On Validation Dataseta**

In [None]:
random.seed(1234)

random_indices = [random.randint(1, len(validation_set)) for x in range(5)]

validation_run = [validation_set[i] for i in random_indices]

validation_run_base = [tup for sent in validation_run for tup in sent]

validation_untagged_words = [tup[0] for tup in validation_run_base]

In [None]:
print('number of words in selected validation set : {0}'.format(len(validation_untagged_words)))

number of words in selected validation set : 166


In [None]:
%%time

validation_tagged_sent = Viterbi_Vanilla(validation_untagged_words)

CPU times: user 20.2 s, sys: 6.59 ms, total: 20.2 s
Wall time: 20.2 s


**Model Validation**

In [None]:
correct_tags = [i for i, j in zip(validation_run_base, validation_tagged_sent) if i == j]

accuracy = len(correct_tags) / len(validation_run_base)

accuracy

0.8674698795180723

In [None]:
validation_incorrect_tagged_words = [(i, j) for i, j in zip(validation_run_base, validation_tagged_sent) if i != j]

print(len(validation_incorrect_tagged_words))
validation_incorrect_tagged_words

22


[(('sell', 'NOUN'), ('sell', 'VERB')),
 (('printers', 'NOUN'), ('printers', 'X')),
 (('there', 'ADV'), ('there', 'DET')),
 (('Gunmen', 'NOUN'), ('Gunmen', 'X')),
 (('Lebanon', 'NOUN'), ('Lebanon', 'X')),
 (('assassinated', 'VERB'), ('assassinated', 'X')),
 (('Arabian', 'NOUN'), ('Arabian', 'X')),
 (('pro-Iranian', 'ADJ'), ('pro-Iranian', 'X')),
 (('Islamic', 'NOUN'), ('Islamic', 'X')),
 (('slaying', 'NOUN'), ('slaying', 'X')),
 (('avenge', 'VERB'), ('avenge', 'X')),
 (('beheading', 'NOUN'), ('beheading', 'X')),
 (('terrorists', 'NOUN'), ('terrorists', 'X')),
 (('Riyadh', 'NOUN'), ('Riyadh', 'X')),
 (('Card', 'NOUN'), ('Card', 'X')),
 (('sweepstakes', 'NOUN'), ('sweepstakes', 'X')),
 (('forthcoming', 'ADJ'), ('forthcoming', 'X')),
 (('10-year', 'NUM'), ('10-year', 'ADJ')),
 (('yen-denominated', 'ADJ'), ('yen-denominated', 'X')),
 (('about', 'ADV'), ('about', 'ADP')),
 (('redeeming', 'VERB'), ('redeeming', 'X')),
 (('convert', 'VERB'), ('convert', 'X'))]

**Running Algorithm On Test Dataset**