In [30]:
from utils import get_word_tag, preprocess  
import pandas as pd
from collections import defaultdict
import math
import numpy as np

In [52]:
# Read the training corpus as a list of raw lines; readlines() keeps each
# line's trailing newline. The lines are parsed later by get_word_tag.
with open("WSJ_02-21.pos", 'r') as f:
    training_corpus = f.readlines()

In [56]:
import pickle

In [58]:
# Load the word -> index vocabulary.
# NOTE(security): pickle.load can execute arbitrary code while unpickling;
# only load vocab.pkl when it comes from a trusted source.
# The context manager closes the file even if unpickling raises.
with open('vocab.pkl', 'rb') as file:
    vocab = pickle.load(file)

# Preview the first 21 entries (positions 0..20) of the vocabulary.
for cnt, (k, v) in enumerate(vocab.items()):
    if cnt > 20:
        break
    print(f"{k}:{v}")

 :0
!:1
#:2
$:3
%:4
&:5
':6
'':7
'40s:8
'60s:9
'70s:10
'80s:11
'86:12
'90s:13
'N:14
'S:15
'd:16
'em:17
'll:18
'm:19
'n':20


In [59]:
# Read the tagged test corpus as raw lines. Per the sample printed below,
# each line has the form "word\ttag\n".
with open("WSJ_24.pos", 'r') as f:
    y = f.readlines()

print("A sample of the test corpus")
print(y[0:10])

A sample of the test corpus
['The\tDT\n', 'economy\tNN\n', "'s\tPOS\n", 'temperature\tNN\n', 'will\tMD\n', 'be\tVB\n', 'taken\tVBN\n', 'from\tIN\n', 'several\tJJ\n', 'vantage\tNN\n']


In [60]:
# Keep only the word (the text before the first tab) of every test line.
tokens = [line.split('\t')[0] for line in y]
# preprocess comes from utils; only its second return value is kept.
# Judging by the recorded output, words missing from vocab are replaced by
# unknown-word tokens (e.g. 'vantage' -> '--unk_noun--') — confirm in utils.
_,prep = preprocess(vocab, tokens)  

print('The length of the preprocessed test corpus: ', len(prep))
print('This is a sample of the test_corpus: ')
print(prep[0:10])

The length of the preprocessed test corpus:  34199
This is a sample of the test_corpus: 
['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken', 'from', 'several', '--unk_noun--']


In [61]:
def create_dictionaries(training_corpus, vocab):
    """Count emissions, transitions and tag occurrences over a tagged corpus.

    Every corpus line is converted to a ``(word, tag)`` pair by
    ``get_word_tag``. The tag preceding the first line is taken to be the
    sentinel tag ``'--s--'``.

    Args:
        training_corpus: iterable of raw corpus lines.
        vocab: vocabulary object, forwarded unchanged to ``get_word_tag``.

    Returns:
        Tuple ``(emission_counts, transition_counts, tag_counts)`` of
        ``defaultdict(int)`` objects keyed by ``(tag, word)``,
        ``(prev_tag, tag)`` and ``tag`` respectively.
    """
    emission_counts, transition_counts, tag_counts = (
        defaultdict(int) for _ in range(3)
    )

    previous = '--s--'
    for line in training_corpus:
        token, current = get_word_tag(line, vocab)
        tag_counts[current] += 1
        emission_counts[(current, token)] += 1
        transition_counts[(previous, current)] += 1
        # The tag just seen becomes the "previous" tag for the next line.
        previous = current

    return emission_counts, transition_counts, tag_counts

In [62]:
# Build the count dictionaries that both probability matrices are derived from.
emission_counts, transition_counts, tag_counts = create_dictionaries(training_corpus, vocab)

In [63]:
# The HMM states are the distinct tags, sorted. create_transition_matrix and
# create_emission_matrix sort tag_counts' keys the same way, so an index into
# `states` is also a row index into A and B.
states = sorted(tag_counts.keys())
print(f"Number of POS tags (number of 'states'): {len(states)}")
print("View these POS tags (states)")
print(states)

Number of POS tags (number of 'states'): 46
View these POS tags (states)
['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']


In [64]:
def create_transition_matrix(alpha, tag_counts, transition_counts):
    """Build the smoothed tag-to-tag transition probability matrix.

    Rows and columns follow ``sorted(tag_counts.keys())`` and

        A[i, j] = (count(tag_i -> tag_j) + alpha)
                  / (count(tag_i) + alpha * num_tags)

    Args:
        alpha: additive smoothing constant.
        tag_counts: dict mapping tag -> number of occurrences.
        transition_counts: dict mapping (prev_tag, tag) -> count.

    Returns:
        ``numpy.ndarray`` of shape ``(num_tags, num_tags)``.
    """
    all_tags = sorted(tag_counts.keys())
    num_tags = len(all_tags)
    A = np.zeros((num_tags, num_tags))

    for i, prev_tag in enumerate(all_tags):
        # The denominator depends only on the previous tag: once per row.
        denominator = tag_counts[prev_tag] + alpha * num_tags
        for j, tag in enumerate(all_tags):
            # .get() reads the count without inserting the key when
            # transition_counts is a defaultdict.
            count = transition_counts.get((prev_tag, tag), 0)
            A[i, j] = (count + alpha) / denominator
    return A

In [65]:
# Additive smoothing constant; the same value is used for both A and B.
alpha = 0.001
A = create_transition_matrix(alpha, tag_counts, transition_counts)

In [66]:
def create_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    """Build the smoothed tag-to-word emission probability matrix.

    Rows follow ``sorted(tag_counts.keys())``; column ``j`` belongs to the
    word ``vocab[j]``, and

        B[i, j] = (count(tag_i, vocab[j]) + alpha)
                  / (count(tag_i) + alpha * num_words)

    Args:
        alpha: additive smoothing constant.
        tag_counts: dict mapping tag -> number of occurrences.
        emission_counts: dict mapping (tag, word) -> count.
        vocab: container indexable by the integers ``0 .. len(vocab) - 1``
            that yields the word for each column (the notebook passes
            ``list(vocab)``).

    Returns:
        ``numpy.ndarray`` of shape ``(num_tags, num_words)``.
    """
    all_tags = sorted(tag_counts.keys())
    num_tags = len(all_tags)
    num_words = len(vocab)
    B = np.zeros((num_tags, num_words))

    for i, tag in enumerate(all_tags):
        # The denominator depends only on the tag: once per row.
        denominator = tag_counts[tag] + alpha * num_words
        for j in range(num_words):
            # .get() reads the count without inserting the key when
            # emission_counts is a defaultdict.
            count = emission_counts.get((tag, vocab[j]), 0)
            B[i, j] = (count + alpha) / denominator
    return B

In [67]:
# list(vocab) turns the word -> index dict into a positional list of words,
# which is what create_emission_matrix indexes with vocab[j].
# Assumes dict iteration order matches the stored indices — TODO confirm.
B = create_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))

In [68]:

def initialize(states, tag_counts, A, B, corpus, vocab):
    """Allocate the Viterbi tables and fill in the column for the first word.

    For every tag ``i`` the first column of ``best_probs`` is

        log A[start, i] + log B[i, vocab[corpus[0]]]

    where ``start`` is the position of ``"--s--"`` in ``states``. If either
    probability is zero the entry is ``-inf`` instead of an error.

    Args:
        states: list of tags; must contain ``"--s--"``.
        tag_counts: dict of tags; only its length (number of tags) is used.
        A: transition matrix, indexed ``A[prev_tag, tag]``.
        B: emission matrix, indexed ``B[tag, word_index]``.
        corpus: sequence of words; only ``corpus[0]`` and its length are used.
        vocab: dict mapping word -> column index of ``B``.

    Returns:
        ``(best_probs, best_paths)``: a float array and an int array, both of
        shape ``(num_tags, len(corpus))`` and zero everywhere except the
        first column of ``best_probs``.
    """
    num_tags = len(tag_counts)
    best_probs = np.zeros((num_tags, len(corpus)))
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)

    s_idx = states.index("--s--")
    first_word_idx = vocab[corpus[0]]

    for i in range(num_tags):
        transition = A[s_idx, i]
        emission = B[i, first_word_idx]
        # math.log(0) raises ValueError, so an impossible start (zero
        # transition OR zero emission) is encoded directly as -inf.
        if transition == 0 or emission == 0:
            best_probs[i, 0] = float('-inf')
        else:
            best_probs[i, 0] = math.log(transition) + math.log(emission)
    return best_probs, best_paths

In [69]:
# Allocate the Viterbi tables for the preprocessed test corpus and fill the
# first column of best_probs.
best_probs, best_paths = initialize(states, tag_counts, A, B, prep, vocab)

In [70]:

def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab):
    """Run the Viterbi forward pass, filling columns 1.. of both tables.

    For word ``i`` and tag ``j``:

        best_probs[j, i] = max_k  best_probs[k, i-1] + log A[k, j]
                                  + log B[j, vocab[test_corpus[i]]]
        best_paths[j, i] = the k attaining that maximum (lowest k on ties)

    Column 0 must already be filled in by the caller. Zero probabilities in
    ``A`` or ``B`` contribute ``-inf`` rather than raising, in line with how
    ``initialize`` treats zero transitions out of the start state.

    Args:
        A: transition matrix (NumPy array), indexed ``A[prev_tag, tag]``.
        B: emission matrix (NumPy array), indexed ``B[tag, word_index]``.
        test_corpus: sequence of words.
        best_probs: float array ``(num_tags, len(test_corpus))``; modified
            in place.
        best_paths: int array of the same shape; modified in place.
        vocab: dict mapping word -> column index of ``B``.

    Returns:
        The same ``(best_probs, best_paths)`` objects that were passed in.
    """
    # log A does not depend on the word, so take it once instead of once per
    # (word, tag, previous-tag) triple.
    with np.errstate(divide='ignore'):
        log_A = np.log(np.asarray(A, dtype=float))

    for i in range(1, len(test_corpus)):
        if i % 5000 == 0:
            print("Words processed: {:>8}".format(i))

        word_idx = vocab[test_corpus[i]]
        with np.errstate(divide='ignore'):
            log_emission = np.log(np.asarray(B[:, word_idx], dtype=float))

        # scores[k, j]: best log-probability of reaching tag k at word i-1,
        # then moving to tag j and emitting word i.
        scores = (best_probs[:, i - 1][:, np.newaxis] + log_A) + log_emission[np.newaxis, :]

        # argmax returns the first maximal k, the same tie-break as scanning
        # k in increasing order with a strict '>' comparison.
        best_paths[:, i] = np.argmax(scores, axis=0)
        best_probs[:, i] = np.max(scores, axis=0)
    return best_probs, best_paths

In [71]:
# Fill the remaining columns of both tables; progress is printed every
# 5000 words.
best_probs, best_paths = viterbi_forward(A, B, prep, best_probs, best_paths, vocab)

Words processed:     5000
Words processed:    10000
Words processed:    15000
Words processed:    20000
Words processed:    25000
Words processed:    30000


In [72]:
def viterbi_backward(best_probs, best_paths, corpus, states):
    """Recover the most likely tag sequence from the filled Viterbi tables.

    The tag of the last word is the row with the highest value in the last
    column of ``best_probs`` (lowest row on ties). Each earlier tag is then
    read from ``best_paths``: the tag of word ``i - 1`` is
    ``best_paths[tag_of_word_i, i]``.

    Args:
        best_probs: array ``(num_tags, m)`` of best log-probabilities.
        best_paths: int array ``(num_tags, m)`` of back-pointers.
        corpus: unused; kept so existing callers keep working.
        states: list mapping tag index -> tag string.

    Returns:
        List of ``m`` predicted tag strings, one per word (empty if ``m`` is 0).
    """
    m = best_paths.shape[1]
    if m == 0:
        return []

    z = [None] * m
    pred = [None] * m

    # argmax returns the first maximal row, the same tie-break as scanning
    # rows in increasing order with a strict '>' comparison.
    z[m - 1] = int(np.argmax(best_probs[:, m - 1]))
    pred[m - 1] = states[z[m - 1]]

    # Stop at i == 1. Running the step for i == 0 would write index -1,
    # i.e. overwrite the LAST word's tag with the unused back-pointer stored
    # in column 0.
    for i in range(m - 1, 0, -1):
        z[i - 1] = int(best_paths[z[i], i])
        pred[i - 1] = states[z[i - 1]]
    return pred

In [73]:
# Decode the tag sequence, then print words and predicted tags side by side.
pred = viterbi_backward(best_probs, best_paths, prep, states)
m=len(pred)
# The slice [-7:m-1] stops before index m-1, so the final word and its
# prediction are not shown; this line prints words first, then tags.
print('The prediction for pred[-7:m-1] is: \n', prep[-7:m-1], "\n", pred[-7:m-1], "\n")
# NOTE(review): the label says pred[0:8] but the slice printed is [0:7], and
# this line prints tags first, then words (the reverse of the line above).
print('The prediction for pred[0:8] is: \n', pred[0:7], "\n", prep[0:7])

The prediction for pred[-7:m-1] is: 
 ['see', 'them', 'here', 'with', 'us', '.'] 
 ['VB', 'PRP', 'RB', 'IN', 'PRP', '.'] 

The prediction for pred[0:8] is: 
 ['DT', 'NN', 'POS', 'NN', 'MD', 'VB', 'VBN'] 
 ['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken']


In [74]:
# Spot-check one position: the word, its predicted tag and the raw label line.
# NOTE(review): index 3 is the fourth element (0-based), although the label
# says "third".
print('The third word is:', prep[3])
print('Your prediction is:', pred[3])
print('Your corresponding label y is: ', y[3])

The third word is: temperature
Your prediction is: NN
Your corresponding label y is:  temperature	NN



In [77]:
def compute_accuracy(pred, y):
    """Return the fraction of scorable lines whose predicted tag is correct.

    ``pred`` and ``y`` are walked in parallel. A label line counts only if
    it splits on whitespace into exactly two fields ``(word, tag)``; other
    lines (e.g. blank lines) are skipped together with their prediction.

    Args:
        pred: sequence of predicted tag strings.
        y: sequence of raw label lines such as ``"word\\ttag\\n"``.

    Returns:
        ``num_correct / total`` as a float, or ``0.0`` when no line could be
        scored (instead of raising ``ZeroDivisionError``).
    """
    num_correct = 0
    total = 0
    # The loop variable gets its own name so it does not rebind the `y`
    # parameter while it is being iterated.
    for prediction, label_line in zip(pred, y):
        fields = label_line.split()
        if len(fields) != 2:
            continue
        _word, tag = fields
        if prediction == tag:
            num_correct += 1
        total += 1

    if total == 0:
        return 0.0
    return num_correct / total

In [78]:
# Score the predictions against the raw test label lines, shown to 4 decimals.
print(f"Accuracy of the Viterbi algorithm is {compute_accuracy(pred, y):.4f}")

Accuracy of the Viterbi algorithm is 0.9545
