In [1]:
import string
import numpy as np
import pandas as pd
from functions import get_word_tag,preprocess
from collections import defaultdict
import math

In [2]:
with open('train.pos','r') as f:
    train = f.readlines()
    
train[0:5]

['In\tIN\n', 'an\tDT\n', 'Oct.\tNNP\n', '19\tCD\n', 'review\tNN\n']

In [3]:
with open('vocab.txt','r') as f:
    vocab = f.read().split('\n')

print(vocab[0:50]) # first 50 words
print('\n',vocab[-50:]) # last 50  words

['!', '#', '$', '%', '&', "'", "''", "'40s", "'60s", "'70s", "'80s", "'86", "'90s", "'N", "'S", "'d", "'em", "'ll", "'m", "'n'", "'re", "'s", "'til", "'ve", '(', ')', ',', '-', '--', '--n--', '--unk--', '--unk_adj--', '--unk_adv--', '--unk_digit--', '--unk_noun--', '--unk_punct--', '--unk_upper--', '--unk_verb--', '.', '...', '0.01', '0.0108', '0.02', '0.03', '0.05', '0.1', '0.10', '0.12', '0.13', '0.15']

 ['yards', 'yardstick', 'year', 'year-ago', 'year-before', 'year-earlier', 'year-end', 'year-on-year', 'year-round', 'year-to-date', 'year-to-year', 'yearlong', 'yearly', 'years', 'yeast', 'yelled', 'yelling', 'yellow', 'yen', 'yes', 'yesterday', 'yet', 'yield', 'yielded', 'yielding', 'yields', 'you', 'young', 'younger', 'youngest', 'youngsters', 'your', 'yourself', 'youth', 'youthful', 'yuppie', 'yuppies', 'zero', 'zero-coupon', 'zeroing', 'zeros', 'zinc', 'zip', 'zombie', 'zone', 'zones', 'zoning', '{', '}', '']


In [4]:
vocab_dict = {}
for i,word in enumerate(sorted(vocab)):
    vocab_dict[word] = i
    
c=0
for a,b in vocab_dict.items():
    print(f"{a}:{b}")
    c=c+1
    if(c>10):
        break

:0
!:1
#:2
$:3
%:4
&:5
':6
'':7
'40s:8
'60s:9
'70s:10


In [5]:
with open('test.pos','r') as f:
    y = f.readlines()

print(y[0:10])

['The\tDT\n', 'economy\tNN\n', "'s\tPOS\n", 'temperature\tNN\n', 'will\tMD\n', 'be\tVB\n', 'taken\tVBN\n', 'from\tIN\n', 'several\tJJ\n', 'vantage\tNN\n']


In [6]:
_,prep = preprocess(vocab,'test.words')
print(len(prep))
print(prep[0:10])
# 10th word is 'vantage' which is not in vocab therefore --unk--

34199
['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken', 'from', 'several', '--unk--']


In [7]:
def matrices(training,vocab):
    emission_counts = defaultdict(int)
    transistion_counts = defaultdict(int)
    tag_counts = defaultdict(int)
    prev_tag = '--s--'
    i=0
    for word_tag in training:
        
        i+=1
        if(i%50000==0):
            print(f"Iteration: {i}")
        
        word,tag = get_word_tag(word_tag,vocab)
        
        emission_counts[(tag,word)]+=1
        transistion_counts[(prev_tag,tag)]+=1
        tag_counts[tag]+=1
        
        prev_tag = tag
    
    return emission_counts,transistion_counts,tag_counts

In [8]:
emission_counts,transistion_counts,tag_counts = matrices(train,vocab)

Iteration: 50000
Iteration: 100000
Iteration: 150000
Iteration: 200000
Iteration: 250000
Iteration: 300000
Iteration: 350000
Iteration: 400000
Iteration: 450000
Iteration: 500000
Iteration: 550000
Iteration: 600000
Iteration: 650000
Iteration: 700000
Iteration: 750000
Iteration: 800000
Iteration: 850000
Iteration: 900000
Iteration: 950000


In [9]:
hidden_states = sorted(tag_counts.keys())
print(len(hidden_states))
print(hidden_states)

46
['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']


In [10]:
def predict(prep,vocab,emission_counts,states,y):
    correct = 0
    total = len(prep)
    
    for word,y_tup in zip(prep,y):
        y_tup_l = y_tup.split()
        if len(y_tup_l) == 2:
            true_label = y_tup_l[1]
        else:
            continue
        
        if word in vocab:
            count = 0
            pos_pred = '' 
            for pos in states:
                aa = emission_counts[(pos,word)]
                if(aa>count):
                    count = aa
                    pos_pred = pos
            
            if(pos_pred == true_label):
                correct = correct + 1
    accuracy = correct/total
    return accuracy

In [11]:
predict(prep,vocab,emission_counts,hidden_states,y)*100

88.88563993099213

In [12]:
# 88.88% Accuracy
# Increasing the Accuracy using the Hidden Markov Models and the Viterbi Algorithm

In [13]:
def transistion_matrix(epsilon,transistion_counts,tag_counts):
    all_tags = sorted(tag_counts.keys())
    N = len(all_tags)
    
    t_matrix = np.zeros((N,N))
    
    for i in range(N):
        for j in range(N):
            key = (all_tags[i],all_tags[j])
            count = tag_counts[(all_tags[i])]
            t_matrix[i][j] = (transistion_counts[(key)]+epsilon)/(count+(N*epsilon))
    
    return t_matrix

In [14]:
def emission_matrix(epsilon,tag_counts,emission_counts,vocab):
    all_tags = sorted(tag_counts.keys())
    N = len(all_tags)
    m = len(vocab)
    e_matrix = np.zeros((N,m))
    for i in range(N):
        for j in range(m):
            count = tag_counts[all_tags[i]]
            key = (all_tags[i],vocab[j])
            e_matrix[i][j] = (emission_counts[key]+epsilon)/(count+(m*epsilon))
    return e_matrix

In [15]:
t_matrix = transistion_matrix(0.001,transistion_counts,tag_counts)
e_matrix = emission_matrix(0.001,tag_counts,emission_counts,list(vocab_dict))

In [16]:
def initialize(states,A,B,tag_counts,vocab,prep):
    N = len(tag_counts)
    m = len(prep)
    
    best_probs = np.zeros((N,m))
    best_paths = np.zeros((N,m),dtype=int)
    s_idx = states.index('--s--')
    
    for i in range(N):
        if A[s_idx][i]==0:
            best_probs[i][0] = float('-inf')
        else:
            best_probs[i][0] = math.log(A[s_idx][i]) + math.log(B[i][vocab[prep[0]]])
            # vocab[corpus[0]] gives you the index of the first test word in the matrix B(e_matrix) 
    return best_probs,best_paths

In [17]:
best_probs,best_paths = initialize(hidden_states,t_matrix,e_matrix,tag_counts,vocab_dict,prep)

In [18]:
def viterbi_forward(A, B, prep, best_probs, best_paths, vocab):
    N = best_probs.shape[0]

    for i in range(1, len(prep)): # first array for words in the corpus
        if i % 5000 == 0:
            print("Words processed: {:>8}".format(i))

        for j in range(N): # second array for present word pos tag
            best_prob_i =  float("-inf")
            best_path_i = None

            for k in range(N): # third array for previous word pos tag
                prob = best_probs[k,i-1]+math.log(A[k,j]) +math.log(B[j,vocab[prep[i]]])
                if prob > best_prob_i: # calculating max prob happens from which pos tag
                    best_prob_i = prob 
                    best_path_i = k 

            best_probs[j,i] = best_prob_i # storing max prob in best_prob
            best_paths[j,i] = best_path_i # storing index of prev pos tag in best_path

    return best_probs, best_paths

In [19]:
best_probs, best_paths = viterbi_forward(t_matrix, e_matrix, prep, best_probs, best_paths, vocab_dict)

Words processed:     5000
Words processed:    10000
Words processed:    15000
Words processed:    20000
Words processed:    25000
Words processed:    30000


In [20]:
def viterbi_backward(best_probs, best_paths, prep, states):
    m = best_paths.shape[1] 
    z = [None] * m
    N = best_probs.shape[0]
    best_prob_for_last_word = float('-inf')
    pred = [None] * m
    
    for k in range(N): 
        if best_probs[k,-1]>best_prob_for_last_word: 
            best_prob_for_last_word = best_probs[k,-1]
            z[m-1]=k
            
    pred[m-1] = states[z[m-1]]
    
    for i in range(len(prep)-1, -1, -1): # to go from len(prep)-1 to 0 with i=i-1 at each iteration
        pos_tag_for_word_i = best_paths[np.argmax(best_probs[:,i]),i]
        z[i-1] = best_paths[pos_tag_for_word_i,i]
        pred[i-1] = states[pos_tag_for_word_i]
    
    return pred

In [21]:
pred = viterbi_backward(best_probs, best_paths, prep, hidden_states)

In [22]:
def compute_accuracy(pred, y):
    num_correct = 0
    total = 0
    
    for prediction, y in zip(pred, y):
        word_tag_tuple = y.split()
        if len(word_tag_tuple)!=2:
            continue 
        word, tag = word_tag_tuple
        if prediction == tag: 
            num_correct += 1
        total += 1
    
    return (num_correct/total)

In [23]:
print(f"Accuracy is {compute_accuracy(pred, y)*100}")
# Thus a 7% rise in accuracy using  the Hidden Markov Model and Viterbi Algorithm

Accuracy is 95.27592609502938
