In [17]:
import os
import nltk
import numpy as np
import pandas as pd
import random
from sklearn.model_selection import train_test_split
import pprint, time


# Project root: one level up when the working-directory path contains 'HMM',
# otherwise the working directory itself.
if 'HMM' in os.getcwd():
    ROOT_DIR = '../'
else:
    ROOT_DIR = os.getcwd()

# Directory holding the POS-tagged corpus, and the training file inside it.
POS_DIR = os.path.join(ROOT_DIR, 'dataset')
pos_train = os.path.join(POS_DIR, "train.txt")

In [18]:
def format_data(fname):
    """Read a blank-line-delimited ``word tag ...`` file into tagged sentences.

    Every non-blank line contributes one ``(word, tag)`` pair: the first
    whitespace-separated field, lower-cased, and the second field as the tag.
    Any further fields on the line are ignored. A blank or whitespace-only
    line ends the current sentence.

    Args:
        fname: Path of the file to read.

    Returns:
        A list of sentences, each a non-empty list of ``(word, tag)`` tuples.

    Raises:
        IndexError: If a non-blank line has fewer than two fields.
    """
    sentences = []  # master list of finished sentences
    sentence = []   # pairs of the sentence currently being read
    with open(fname) as f:
        for line in f:
            fields = line.split()
            if not fields:
                # Separator line. Only flush when something was collected, so
                # repeated or leading blank lines do not yield empty sentences.
                if sentence:
                    sentences.append(sentence)
                    sentence = []
                continue
            sentence.append((fields[0].lower(), fields[1]))

    # A file that does not end with a blank line still has a pending sentence.
    if sentence:
        sentences.append(sentence)
    return sentences

# Parse the training file into a list of sentences, each a list of (word, tag) pairs.
datalist = format_data(pos_train)

# Number of sentences read.
print(len(datalist))

8936


In [19]:
# Hold out 20% of the sentences for testing; the fixed random_state keeps the
# split identical from run to run.
train_set, test_set = train_test_split(
    datalist,
    train_size=0.80,
    test_size=0.20,
    random_state=101,
)
print(len(train_set), len(test_set))

7148 1788


In [20]:
# Flatten each split from a list of sentences into one list of (word, tag) pairs.
train_tagged_words = [pair for sentence in train_set for pair in sentence]
test_tagged_words = [pair for sentence in test_set for pair in sentence]
print(len(train_tagged_words), len(test_tagged_words))

170288 41439


In [21]:
# Distinct tags seen in the training pairs.
tags = {pos for _, pos in train_tagged_words}
print(tags, len(tags))
# Distinct (lower-cased) words seen in the training pairs.
vocab = {token for token, _ in train_tagged_words}

{'WRB', 'VB', '#', ',', 'POS', 'FW', 'SYM', "''", 'RB', '``', 'PRP$', 'RBS', 'NNP', 'NNS', 'UH', 'MD', 'WP$', 'JJ', 'IN', 'NNPS', 'CC', 'TO', 'VBZ', 'VBG', 'VBN', '(', 'JJS', ':', 'RP', 'PRP', 'NN', '.', 'WDT', 'WP', 'CD', 'VBD', 'PDT', 'DT', '$', 'RBR', ')', 'EX', 'JJR', 'VBP'} 44


In [27]:
def compute_emmision(word, tag, train_bag=None):
    """Estimate the emission probability P(word | tag) by relative frequency.

    Args:
        word: The word whose emission probability is wanted.
        tag: The tag being conditioned on.
        train_bag: Iterable of ``(word, tag)`` pairs to count over. When
            omitted, the module-level ``train_tagged_words`` is looked up at
            call time, so the function follows the current training data
            instead of whatever list existed when this cell was first run.

    Returns:
        ``count(word tagged as tag) / count(tag)`` as a float, or ``0.0`` when
        ``tag`` never occurs in ``train_bag`` (previously a ZeroDivisionError).
    """
    if train_bag is None:
        train_bag = train_tagged_words

    count_tag = 0          # occurrences of `tag`
    count_w_given_tag = 0  # occurrences of `word` carrying `tag`
    for pair in train_bag:
        if pair[1] == tag:
            count_tag += 1
            if pair[0] == word:
                count_w_given_tag += 1

    if count_tag == 0:
        return 0.0
    return count_w_given_tag / count_tag

# Tag of every training pair, in corpus order; compute_transition counts adjacent tags over this list.
transition_tags = [pair[1] for pair in train_tagged_words]


def compute_transition(t2, t1, tag_sequence=None):
    """Estimate the transition probability P(t2 | t1) by relative frequency.

    Args:
        t2: The tag that follows.
        t1: The tag that precedes.
        tag_sequence: Iterable of tags in corpus order. Defaults to the
            module-level ``transition_tags`` (looked up at call time).

    Returns:
        ``count(t1 immediately followed by t2) / count(t1)`` as a float, or
        ``0.0`` when ``t1`` never occurs (previously a ZeroDivisionError).
        ``count(t1)`` includes an occurrence in the last position, which has
        no successor.
    """
    if tag_sequence is None:
        tag_sequence = transition_tags

    count_t1 = 0
    count_t2_t1 = 0
    previous_was_t1 = False
    # One pass: remember whether the previous tag was t1 while counting both
    # t1 occurrences and t1 -> t2 adjacencies.
    for current in tag_sequence:
        if previous_was_t1 and current == t2:
            count_t2_t1 += 1
        previous_was_t1 = current == t1
        if previous_was_t1:
            count_t1 += 1

    if count_t1 == 0:
        return 0.0
    return count_t2_t1 / count_t1

In [23]:
# Build the tag-to-tag transition matrix in a single pass over the training
# tags. Cell [i, j] is count(tag i immediately followed by tag j) / count(tag i),
# the same quantity compute_transition(t2, t1) returns, but without rescanning
# the whole corpus for each of the len(tags)**2 cells.
tag_order = list(tags)  # row/column order of the matrix
tag_index = {tag: position for position, tag in enumerate(tag_order)}

pair_counts = np.zeros((len(tag_order), len(tag_order)), dtype='int64')
tag_counts = np.zeros(len(tag_order), dtype='int64')
for position, current_tag in enumerate(transition_tags):
    tag_counts[tag_index[current_tag]] += 1
    if position > 0:
        previous_tag = transition_tags[position - 1]
        pair_counts[tag_index[previous_tag], tag_index[current_tag]] += 1

# Divide every row by the total count of its (preceding) tag.
tags_matrix = (pair_counts / tag_counts[:, np.newaxis]).astype('float32')
print(tags_matrix)

[[0.         0.00263158 0.         ... 0.00789474 0.00263158 0.00789474]
 [0.00519103 0.00685216 0.00020764 ... 0.00103821 0.01349668 0.00020764]
 [0.         0.         0.         ... 0.         0.         0.        ]
 ...
 [0.         0.         0.         ... 0.         0.         0.1875    ]
 [0.         0.01023392 0.         ... 0.         0.         0.        ]
 [0.00346771 0.00433463 0.         ... 0.00173385 0.00953619 0.00130039]]


In [25]:
# Wrap the transition matrix in a DataFrame labelled by tag on both axes so
# entries can be looked up as tags_df.loc[previous_tag, next_tag].
tag_labels = list(tags)
tags_df = pd.DataFrame(tags_matrix, index=tag_labels, columns=tag_labels)
display(tags_df)

Unnamed: 0,WRB,VB,#,",",POS,FW,SYM,'',RB,``,...,CD,VBD,PDT,DT,$,RBR,),EX,JJR,VBP
WRB,0.0,0.002632,0.0,0.002632,0.0,0.0,0.0,0.0,0.034211,0.0,...,0.010526,0.0,0.0,0.260526,0.0,0.0,0.0,0.007895,0.002632,0.007895
VB,0.005191,0.006852,0.000208,0.016819,0.000208,0.0,0.0,0.00353,0.046719,0.005399,...,0.021179,0.001453,0.000415,0.214909,0.007475,0.002284,0.0,0.001038,0.013497,0.000208
#,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
",",0.006132,0.004628,0.0,0.0,0.000231,0.000116,0.0,0.056925,0.04628,0.013074,...,0.019206,0.052412,0.0,0.13294,0.00162,0.000579,0.0,0.002777,0.00243,0.007636
POS,0.000705,0.0,0.001409,0.004228,0.0,0.000705,0.0,0.0,0.007047,0.007047,...,0.030303,0.006342,0.0,0.0,0.008457,0.0,0.0,0.0,0.001409,0.0
FW,0.0,0.0,0.0,0.035714,0.0,0.035714,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
SYM,0.0,0.0,0.0,0.0,0.0,0.2,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
'',0.005853,0.004181,0.0,0.000836,0.0,0.0,0.0,0.005853,0.0301,0.035953,...,0.010033,0.076923,0.0,0.096154,0.000836,0.000836,0.000836,0.000836,0.0,0.003344
RB,0.003398,0.109475,0.000378,0.09211,0.000566,0.000189,0.0,0.001133,0.059456,0.001133,...,0.022273,0.048509,0.000566,0.051529,0.0151,0.004908,0.000189,0.000755,0.017743,0.030389
``,0.011457,0.022095,0.0,0.0,0.0,0.0,0.0,0.0,0.051555,0.0,...,0.007365,0.011457,0.000818,0.153028,0.0,0.000818,0.0,0.01964,0.005728,0.017185


In [26]:
def Viterbi(words, train_bag=None, transition_df=None):
    """Tag ``words`` left to right using emission and transition estimates.

    For each word, every candidate tag is scored as
    ``P(word | tag) * P(tag | previous tag)`` and the best-scoring tag is
    kept. The previous tag is the one already chosen for the preceding word
    ('.' for the first word), so this is a greedy, one-step decoder rather
    than a full dynamic-programming search over tag sequences.

    Args:
        words: Sequence of (hashable) words to tag.
        train_bag: ``(word, tag)`` pairs used for the candidate tag set and
            the emission counts. Defaults to the module-level
            ``train_tagged_words`` (looked up at call time). A bag passed
            here is now used for emissions as well as for the tag set.
        transition_df: DataFrame of transition probabilities indexed as
            ``transition_df.loc[previous_tag, tag]``. Defaults to the
            module-level ``tags_df``.

    Returns:
        A list of ``(word, tag)`` tuples, one per input word.
    """
    if train_bag is None:
        train_bag = train_tagged_words
    if transition_df is None:
        transition_df = tags_df

    T = list(set([pair[1] for pair in train_bag]))

    # Count tags and (word, tag) pairs once, instead of rescanning the whole
    # training bag for every (word, candidate tag) combination.
    tag_counts = {}
    word_tag_counts = {}
    for pair in train_bag:
        bag_word, bag_tag = pair[0], pair[1]
        tag_counts[bag_tag] = tag_counts.get(bag_tag, 0) + 1
        word_tag_counts[(bag_word, bag_tag)] = word_tag_counts.get((bag_word, bag_tag), 0) + 1

    state = []
    for key, word in enumerate(words):
        # The first word is scored as if it followed a sentence-final '.'.
        previous_tag = '.' if key == 0 else state[-1]
        transition_row = transition_df.loc[previous_tag]

        # probability column for this observation, aligned with T
        p = []
        for tag in T:
            emission_p = word_tag_counts.get((word, tag), 0) / tag_counts[tag]
            transition_p = transition_row[tag]
            p.append(emission_p * transition_p)

        pmax = max(p)
        # first tag (in T order) reaching the maximum score
        state.append(T[p.index(pmax)])
    return list(zip(words, state))

In [None]:
# Let's test our Viterbi algorithm on a few sample sentences of test dataset
random.seed(1234)      #define a random seed to get same sentences when run multiple times

# choose 10 random sentence indices (with replacement).
# randrange(n) yields 0..n-1; the earlier randint(1, n) could return n, which
# is out of range for test_set, and could never return index 0.
rndom = [random.randrange(len(test_set)) for x in range(10)]

# list of 10 sents on which we test the model
test_run = [test_set[i] for i in rndom]

# list of tagged words
test_run_base = [tup for sent in test_run for tup in sent]

# list of untagged words
# NOTE: despite its name, from here on test_tagged_words holds bare words,
# not (word, tag) pairs; the evaluation cells pass it to the taggers.
test_tagged_words = [tup[0] for sent in test_run for tup in sent]


In [None]:
# Evaluate on the 10 sampled sentences only:
# tagging the whole test set takes a huge amount of time.
start = time.time()
tagged_seq = Viterbi(test_tagged_words)
end = time.time()
difference = end - start

print("Time taken in seconds: ", difference)

# accuracy: share of predicted (word, tag) pairs equal to the gold pair
check = [predicted for predicted, gold in zip(tagged_seq, test_run_base) if predicted == gold]

accuracy = len(check) / len(tagged_seq)
print('Viterbi Algorithm Accuracy: ', accuracy * 100)

In [None]:
#Code to test all the test sentences
#(takes a lot of time to run, so we won't run it here)
# NOTE: this rebinds test_tagged_words to the gold (word, tag) pairs of the
# whole test set, replacing the sampled word list stored under that name.
test_tagged_words = [tup for sent in test_set for tup in sent]
test_untagged_words = [tup[0] for sent in test_set for tup in sent]

start = time.time()
tagged_seq = Viterbi(test_untagged_words)
end = time.time()
difference = end-start

print("Time taken in seconds: ", difference)

# accuracy: compare each predicted (word, tag) pair with the gold pair.
# (Zipping the gold pairs against the bare words, as before, never matches
# and always reports 0.)
check = [i for i, j in zip(tagged_seq, test_tagged_words) if i == j]

accuracy = len(check)/len(tagged_seq)
print('Viterbi Algorithm Accuracy: ',accuracy*100)

In [None]:
#To improve the performance, we specify a rule based tagger for unknown words
# specify patterns for tagging
# The tags emitted here must come from the tagset of the training corpus
# (Penn Treebank style: 'NN', 'VBG', 'CD', ...). Otherwise they can never
# equal a gold tag and are not valid rows of the transition table tags_df.
# Rules are tried in order; the first regex that matches wins.
patterns = [
    (r'.*ing$', 'VBG'),                # gerund / present participle
    (r'.*ed$', 'VBD'),                 # past tense
    (r'.*es$', 'VBZ'),                 # verb, 3rd person singular present
    (r'.*\'s$', 'NN'),                 # possessive nouns
    (r'.*s$', 'NNS'),                  # plural nouns
    (r'^-?[0-9]+(\.[0-9]+)?$', 'CD'),  # cardinal numbers (literal decimal point)
    (r'.*', 'NN')                      # default: noun
]
# NOTE: there is no rule for trace tokens ('X'); the corpus tagset has no such tag.

# rule based tagger
rule_based_tagger = nltk.RegexpTagger(patterns)

In [None]:
#modified Viterbi to include rule based tagger in it
def Viterbi_rule_based(words, train_bag=None, transition_df=None, tagger=None):
    """Greedy emission/transition tagger with a rule-based fallback.

    Each word is scored per candidate tag as
    ``P(word | tag) * P(tag | previous tag)``, where the previous tag is the
    one already chosen for the preceding word ('.' for the first word). When
    every score is 0 (for example, a word absent from the training bag) the
    tag proposed by the rule-based tagger is used instead. A rule tag of 'X'
    is always kept.

    Args:
        words: Sequence of (hashable) words to tag.
        train_bag: ``(word, tag)`` pairs used for the candidate tag set and
            emission counts. Defaults to the module-level
            ``train_tagged_words`` (looked up at call time).
        transition_df: DataFrame of transition probabilities indexed as
            ``transition_df.loc[previous_tag, tag]``. Defaults to the
            module-level ``tags_df``.
        tagger: Object whose ``tag([word])`` returns ``[(word, tag)]``.
            Defaults to the module-level ``rule_based_tagger``.

    Returns:
        A list of ``(word, tag)`` tuples, one per input word.
    """
    if train_bag is None:
        train_bag = train_tagged_words
    if transition_df is None:
        transition_df = tags_df
    if tagger is None:
        tagger = rule_based_tagger

    T = list(set([pair[1] for pair in train_bag]))

    # Count tags and (word, tag) pairs once up front. This replaces the calls
    # to word_given_tag, which is not defined anywhere in this notebook.
    tag_counts = {}
    word_tag_counts = {}
    for pair in train_bag:
        bag_word, bag_tag = pair[0], pair[1]
        tag_counts[bag_tag] = tag_counts.get(bag_tag, 0) + 1
        word_tag_counts[(bag_word, bag_tag)] = word_tag_counts.get((bag_word, bag_tag), 0) + 1

    state = []
    for key, word in enumerate(words):
        previous_tag = '.' if key == 0 else state[-1]
        # The fallback tagger may have produced a tag with no row in the
        # transition table. In that case score by emission alone instead of
        # failing with a KeyError.
        if previous_tag in transition_df.index:
            transition_row = transition_df.loc[previous_tag]
        else:
            transition_row = None

        # probability column for this observation, aligned with T
        p = []
        for tag in T:
            transition_p = 1.0 if transition_row is None else transition_row[tag]
            emission_p = word_tag_counts.get((word, tag), 0) / tag_counts[tag]
            p.append(emission_p * transition_p)

        pmax = max(p)
        rule_tag = tagger.tag([word])[0][1]

        if pmax == 0 or rule_tag == 'X':
            # no evidence from the model (or an 'X' rule hit): trust the rules
            state_max = rule_tag
        else:
            # first tag (in T order) reaching the maximum score
            state_max = T[p.index(pmax)]

        state.append(state_max)
    return list(zip(words, state))

In [None]:
#test accuracy on subset of test data
# Rebuild the untagged word list from test_run_base rather than reading
# test_tagged_words: the full-test-set cell rebinds that name to
# (word, tag) tuples, which would then be fed to the tagger as "words".
test_run_words = [tup[0] for tup in test_run_base]

start = time.time()
tagged_seq = Viterbi_rule_based(test_run_words)
end = time.time()
difference = end-start

print("Time taken in seconds: ", difference)

# accuracy: share of predicted (word, tag) pairs equal to the gold pair
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j]

accuracy = len(check)/len(tagged_seq)
print('Viterbi Algorithm Accuracy: ',accuracy*100)