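This notebook is adapted from the Week 2 assignment of Coursera's "NLP with Probabilistic Models" course.
Here, we implement a unidirectional part-of-speech (POS) tagger, using a hidden Markov model decoded with the Viterbi algorithm, that reaches roughly 95% accuracy on the Penn Treebank II tag set.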

In [1]:
from utils_pos import get_word_tag, preprocess  
import pandas as pd
from collections import defaultdict
import math
import numpy as np

In [2]:
# Read the tagged training corpus into a list with one entry per line of the file.
with open("WSJ_02-21.pos", 'r') as f:
    training_corpus = list(f)

# Show the first 50 raw lines.
print(training_corpus[:50])

['In\tIN\n', 'an\tDT\n', 'Oct.\tNNP\n', '19\tCD\n', 'review\tNN\n', 'of\tIN\n', '``\t``\n', 'The\tDT\n', 'Misanthrope\tNN\n', "''\t''\n", 'at\tIN\n', 'Chicago\tNNP\n', "'s\tPOS\n", 'Goodman\tNNP\n', 'Theatre\tNNP\n', '(\t(\n', '``\t``\n', 'Revitalized\tVBN\n', 'Classics\tNNS\n', 'Take\tVBP\n', 'the\tDT\n', 'Stage\tNN\n', 'in\tIN\n', 'Windy\tNNP\n', 'City\tNNP\n', ',\t,\n', "''\t''\n", 'Leisure\tNN\n', '&\tCC\n', 'Arts\tNNS\n', ')\t)\n', ',\t,\n', 'the\tDT\n', 'role\tNN\n', 'of\tIN\n', 'Celimene\tNNP\n', ',\t,\n', 'played\tVBN\n', 'by\tIN\n', 'Kim\tNNP\n', 'Cattrall\tNNP\n', ',\t,\n', 'was\tVBD\n', 'mistakenly\tRB\n', 'attributed\tVBN\n', 'to\tTO\n', 'Christina\tNNP\n', 'Haag\tNNP\n', '.\t.\n', '\n']


In [3]:
#check how many tags there are 
def spli_t(string):
    """Return the tag field of a ``word<TAB>tag`` corpus line.

    The second tab-separated field is returned exactly as it appears,
    including any trailing newline. A line that contains no tab (for example
    a bare newline entry) is returned unchanged.
    """
    try:
        return string.split('\t')[1]
    except IndexError:
        # No tab on this line, so there is no tag field to extract.
        return string

# Distinct tag fields across the corpus (lines without a tab contribute themselves).
tags = {spli_t(line) for line in training_corpus}
print(tags)
print(len(tags))
print(len(training_corpus))


{'``\n', 'VBP\n', 'IN\n', 'CC\n', 'PRP\n', 'NN\n', 'JJR\n', "''\n", 'NNPS\n', ':\n', 'TO\n', 'WP\n', 'SYM\n', 'WRB\n', 'PDT\n', '.\n', 'NNS\n', 'PRP$\n', 'NNP\n', 'VBG\n', 'JJ\n', ',\n', 'LS\n', 'CD\n', 'POS\n', '\n', 'VBZ\n', 'MD\n', 'VB\n', 'JJS\n', ')\n', 'EX\n', '(\n', 'DT\n', 'RP\n', 'WP$\n', 'FW\n', 'VBN\n', '$\n', 'WDT\n', 'RB\n', 'UH\n', 'RBS\n', 'VBD\n', '#\n', 'RBR\n'}
46
989860


In [4]:
# Build the vocabulary lookup: word -> its position in the sorted word list.
with open("hmm_vocab.txt", 'r') as f:
    voc_l = f.read().split('\n')

# Peek at both ends of the list exactly as read from disk (before sorting).
print(voc_l[:50])
print(voc_l[-50:])

voc_l.sort()
vocab = {word: index for index, word in enumerate(voc_l)}

['!', '#', '$', '%', '&', "'", "''", "'40s", "'60s", "'70s", "'80s", "'86", "'90s", "'N", "'S", "'d", "'em", "'ll", "'m", "'n'", "'re", "'s", "'til", "'ve", '(', ')', ',', '-', '--', '--n--', '--unk--', '--unk_adj--', '--unk_adv--', '--unk_digit--', '--unk_noun--', '--unk_punct--', '--unk_upper--', '--unk_verb--', '.', '...', '0.01', '0.0108', '0.02', '0.03', '0.05', '0.1', '0.10', '0.12', '0.13', '0.15']
['yards', 'yardstick', 'year', 'year-ago', 'year-before', 'year-earlier', 'year-end', 'year-on-year', 'year-round', 'year-to-date', 'year-to-year', 'yearlong', 'yearly', 'years', 'yeast', 'yelled', 'yelling', 'yellow', 'yen', 'yes', 'yesterday', 'yet', 'yield', 'yielded', 'yielding', 'yields', 'you', 'young', 'younger', 'youngest', 'youngsters', 'your', 'yourself', 'youth', 'youthful', 'yuppie', 'yuppies', 'zero', 'zero-coupon', 'zeroing', 'zeros', 'zinc', 'zip', 'zombie', 'zone', 'zones', 'zoning', '{', '}', '']


In [6]:
# Test corpus: tagged lines, later passed to compute_accuracy as the reference tags.
with open("WSJ_24.pos", 'r') as f:
    y = list(f)

y[:20]

['The\tDT\n',
 'economy\tNN\n',
 "'s\tPOS\n",
 'temperature\tNN\n',
 'will\tMD\n',
 'be\tVB\n',
 'taken\tVBN\n',
 'from\tIN\n',
 'several\tJJ\n',
 'vantage\tNN\n',
 'points\tNNS\n',
 'this\tDT\n',
 'week\tNN\n',
 ',\t,\n',
 'with\tIN\n',
 'readings\tNNS\n',
 'on\tIN\n',
 'trade\tNN\n',
 ',\t,\n',
 'output\tNN\n']

In [7]:
# preprocessing removes all tags from the test corpus
# NOTE(review): `preprocess` comes from utils_pos and is not shown here. Judging by the
# sample printed below, the second return value is the list of words alone, with some
# words replaced by '--unk--'-style tokens — confirm against utils_pos.
# The first return value is discarded.
_, prep = preprocess(vocab, "test.words")
print(prep[:10])
print(len(prep))


['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken', 'from', 'several', '--unk--']
34199


Time to get training

In [8]:
# Build the emission, transition and tag count tables from the tagged corpus
def create_dictionaries(training_corpus, vocab):
    """Count tag occurrences, tag-to-tag transitions and tag-word emissions.

    Args:
        training_corpus: sequence of raw corpus lines; each one is handed to
            ``get_word_tag`` together with ``vocab``.
        vocab: vocabulary passed through to ``get_word_tag``.

    Returns:
        ``(emission_counts, transition_counts, tag_counts)``, each a
        ``defaultdict(int)`` keyed by ``(tag, word)``, ``(prev_tag, tag)`` and
        ``tag`` respectively.
    """
    tag_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    emission_counts = defaultdict(int)

    previous = '--s--'  # the walk begins in the start state

    for position, line in enumerate(training_corpus):
        # Progress report every 50,000 lines.
        if position % 50000 == 0:
            print('Word count: ', position)

        word, tag = get_word_tag(line, vocab)

        transition_counts[(previous, tag)] += 1
        emission_counts[(tag, word)] += 1
        tag_counts[tag] += 1

        previous = tag

    return emission_counts, transition_counts, tag_counts
    

In [9]:
# Count emissions, transitions and tags over the full training corpus
# (create_dictionaries prints a progress line every 50,000 entries).
emission_counts, transition_counts, tag_counts = create_dictionaries(training_corpus, vocab)

Word count:  0
Word count:  50000
Word count:  100000
Word count:  150000
Word count:  200000
Word count:  250000
Word count:  300000
Word count:  350000
Word count:  400000
Word count:  450000
Word count:  500000
Word count:  550000
Word count:  600000
Word count:  650000
Word count:  700000
Word count:  750000
Word count:  800000
Word count:  850000
Word count:  900000
Word count:  950000


In [10]:
# states: every tag that received a count during training, in sorted order
states = sorted(tag_counts)
print(f"Number of POS tags (number of 'states'): {len(states)}")
print("View these POS tags (states)")
print(states)

Number of POS tags (number of 'states'): 46
View these POS tags (states)
['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']


In [11]:
# Show the first three (prev_tag, tag) -> count entries in insertion order.
print("transition examples: ")
for ex in list(transition_counts.items())[:3]:
    print(ex)
print()

# Show three (tag, word) -> count entries taken from positions 200-202.
print("emission examples: ")
for ex in list(emission_counts.items())[200:203]:
    print (ex)
print()

# List every tag that the word 'back' was counted with, to illustrate tag ambiguity.
print("ambiguous word example: ")
for tup,cnt in emission_counts.items():
    if tup[1] == 'back': print (tup, cnt)

transition examples: 
(('--s--', 'IN'), 5050)
(('IN', 'DT'), 32364)
(('DT', 'NNP'), 9044)

emission examples: 
(('DT', 'any'), 721)
(('NN', 'decrease'), 7)
(('NN', 'insider-trading'), 5)

ambiguous word example: 
('RB', 'back') 304
('VB', 'back') 20
('RP', 'back') 84
('JJ', 'back') 25
('NN', 'back') 29
('VBP', 'back') 4


Hidden Markov Model for PoS tagging

In [89]:
# A is the transition matrix giving probabilities of transitioning from tag i to tag j
def create_transition_matrix(alpha, tag_counts, transition_counts):
    """Build the additively smoothed tag-transition matrix.

    ``A[i, j] = (count(tag_i -> tag_j) + alpha) / (count(tag_i) + n * alpha)``
    where ``n`` is the number of tags and rows/columns follow the sorted
    order of the ``tag_counts`` keys.

    Args:
        alpha: smoothing constant added to every transition count.
        tag_counts: mapping tag -> number of occurrences.
        transition_counts: mapping ``(prev_tag, tag)`` -> number of
            occurrences; pairs that never occurred may be absent.

    Returns:
        ``numpy.ndarray`` of shape ``(n, n)``.
    """
    all_tags = sorted(tag_counts.keys())
    n = len(all_tags)
    A = np.zeros((n, n))

    for i, prev_tag in enumerate(all_tags):
        denominator = tag_counts[prev_tag] + n * alpha
        for j, tag in enumerate(all_tags):
            # Unseen transitions count as 0. Without the explicit default,
            # .get() returns None and the addition below raises TypeError.
            count = transition_counts.get((prev_tag, tag), 0)
            A[i, j] = (count + alpha) / denominator

    return A

#B is the emission matrix giving probabilities of tag i emitting word j
def create_emission_matrix(alpha, tag_counts, emission_counts,vocab):
    """Build the additively smoothed emission matrix.

    ``B[i, j] = (count(tag_i emits word_j) + alpha) / (count(tag_i) + m * alpha)``
    where ``m`` is the vocabulary size. Rows follow the sorted order of the
    ``tag_counts`` keys; columns follow the order of ``vocab``.

    Args:
        alpha: smoothing constant added to every emission count.
        tag_counts: mapping tag -> number of occurrences.
        emission_counts: mapping ``(tag, word)`` -> number of occurrences;
            pairs that never occurred may be absent.
        vocab: sequence of words, indexed by integer position.

    Returns:
        ``numpy.ndarray`` of shape ``(n_tags, m)``.
    """
    all_tags = sorted(tag_counts.keys())
    n = len(all_tags)
    m = len(vocab)
    B = np.zeros((n, m))

    for i, tag in enumerate(all_tags):
        denominator = tag_counts[tag] + m * alpha
        for j in range(m):
            # Unseen (tag, word) pairs count as 0. Without the explicit
            # default, .get() returns None and the addition raises TypeError.
            count = emission_counts.get((tag, vocab[j]), 0)
            B[i, j] = (count + alpha) / denominator

    return B

In [88]:
# alpha is the additive smoothing constant; the same value is later passed to
# create_emission_matrix as well.
alpha = 0.001
A = create_transition_matrix(alpha, tag_counts, transition_counts)
# Spot-check two individual entries of A
print(f"A at row 0, col 0: {A[0,0]:.9f}")
print(f"A at row 3, col 1: {A[3,1]:.4f}")

# Label rows/columns 30-34 with their tags from `states` for a readable view
print("View a subset of transition matrix A")
A_sub = pd.DataFrame(A[30:35,30:35], index=states[30:35], columns = states[30:35] )
print(A_sub)

A at row 0, col 0: 0.000007040
A at row 3, col 1: 0.1691
View a subset of transition matrix A
              RBS            RP           SYM        TO            UH
RBS  2.217069e-06  2.217069e-06  2.217069e-06  0.008870  2.217069e-06
RP   3.756509e-07  7.516775e-04  3.756509e-07  0.051089  3.756509e-07
SYM  1.722772e-05  1.722772e-05  1.722772e-05  0.000017  1.722772e-05
TO   4.477336e-05  4.472863e-08  4.472863e-08  0.000090  4.477336e-05
UH   1.030439e-05  1.030439e-05  1.030439e-05  0.061837  3.092348e-02


In [90]:
# list(vocab) passes the vocabulary words as a positional list (dict iteration order).
# NOTE(review): this lines up with the indices stored in `vocab` only if the word list
# it was built from has no duplicates — confirm.
B = create_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))

print(f"View Matrix position at row 0, column 0: {B[0,0]:.9f}")
print(f"View Matrix position at row 3, column 1: {B[3,1]:.9f}")

# Try viewing emissions for a few words in a sample dataframe
cidx  = ['725','adroitly','engineers', 'promoted', 'synergy']

# Get the integer ID for each word
cols = [vocab[a] for a in cidx]

# Choose POS tags to show in a sample dataframe
rvals =['CD','NN','NNS', 'VB','RB','RP']

# For each POS tag, get the row number from the 'states' list
rows = [states.index(a) for a in rvals]

# Get the emissions for the sample of words, and the sample of POS tags
# (np.ix_ selects the full rows x cols sub-grid rather than pairing them element-wise)
B_sub = pd.DataFrame(B[np.ix_(rows,cols)], index=rvals, columns = cidx )
print(B_sub)

View Matrix position at row 0, column 0: 0.000006032
View Matrix position at row 3, column 1: 0.000000720
              725      adroitly     engineers      promoted       synergy
CD   8.201296e-05  2.732854e-08  2.732854e-08  2.732854e-08  2.732854e-08
NN   7.521128e-09  7.521128e-09  7.521128e-09  7.521128e-09  2.257091e-05
NNS  1.670013e-08  1.670013e-08  4.676203e-04  1.670013e-08  1.670013e-08
VB   3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08
RB   3.226454e-08  6.456135e-05  3.226454e-08  3.226454e-08  3.226454e-08
RP   3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07


The Viterbi Algorithm

In [115]:
# initialization
from math import log
def initialize(states, tag_counts, A, B, corpus, vocab):
    """Allocate the Viterbi tables and fill in the column for the first word.

    Args:
        states: list of tags; must contain the start tag ``"--s--"``.
        tag_counts: mapping whose length gives the number of tags.
        A: transition matrix, indexed ``[from_tag, to_tag]``.
        B: emission matrix, indexed ``[tag, word_index]``.
        corpus: sequence of words to tag.
        vocab: mapping word -> column index in ``B``.

    Returns:
        ``(C, D)``: two ``(num_tags, num_words)`` arrays of zeros, except
        that ``C[:, 0]`` holds the log-probability of leaving the start state
        for each tag and emitting the first word (``-inf`` where the start
        transition probability is exactly 0).
    """
    num_tags = len(tag_counts)
    num_words = len(corpus)
    best_probs = np.zeros((num_tags, num_words))
    best_paths = np.zeros((num_tags, num_words))
    start_row = states.index("--s--")

    for tag_idx in range(num_tags):
        start_prob = A[start_row, tag_idx]
        if start_prob != 0:
            best_probs[tag_idx, 0] = log(start_prob) + log(B[tag_idx, vocab[corpus[0]]])
        else:
            # log(0) is undefined; mark this tag as an impossible first tag.
            best_probs[tag_idx, 0] = float('-inf')

    return best_probs, best_paths

# C is the best-probabilities matrix: C[i, j] is the highest log-probability of any tag sequence for words 0..j that ends with tag i at word j.
# D is the best-paths matrix: for j >= 1, D[i, j] is the index of the tag at word j-1 on the path that achieves C[i, j].

#forward step

def viterbi_forward(A, B, corpus, C, D, vocab):
    """Fill the Viterbi tables for every word after the first.

    For each word ``j >= 1`` and each tag ``i``::

        C[i, j] = max_k (C[k, j-1] + log A[k, i]) + log B[i, vocab[corpus[j]]]
        D[i, j] = the k that attains that maximum (lowest k on ties)

    Zero probabilities in ``A`` or ``B`` are treated as log-probability
    ``-inf`` (an impossible step) instead of raising a math domain error.

    Args:
        A: transition matrix, indexed ``[from_tag, to_tag]``.
        B: emission matrix, indexed ``[tag, word_index]``.
        corpus: sequence of words to tag.
        C: best-probabilities table with column 0 already initialised;
            updated in place.
        D: best-paths table; updated in place.
        vocab: mapping word -> column index in ``B``.

    Returns:
        The same ``(C, D)`` objects that were passed in.
    """
    m = C.shape[0]
    n = len(corpus)

    # The transition log-probabilities do not depend on the word, so take them once.
    with np.errstate(divide='ignore'):
        log_A = np.log(A[:m, :m])

    for j in range(1, n):
        if j%5000==0:
            print(f'Processing {j}th word . . .')

        # The emission term depends only on the current tag, not on the
        # previous one, so it is looked up once per word.
        with np.errstate(divide='ignore'):
            log_emit = np.log(B[:m, vocab[corpus[j]]])

        # scores[k, i]: best log-prob of reaching tag k at word j-1, then stepping to tag i.
        scores = C[:, j-1][:, np.newaxis] + log_A

        D[:, j] = np.argmax(scores, axis=0)
        C[:, j] = np.max(scores, axis=0) + log_emit

    return C, D

#backwards step

def viterbi_backward(C, D, corpus, states):
    """Recover the most likely tag sequence from the filled Viterbi tables.

    The tag of the last word is the row with the highest value in the last
    used column of ``C``. Each earlier tag is then read from ``D`` by
    following the backpointer of the tag already chosen for the next word.

    Args:
        C: best-probabilities table, indexed ``[tag, word]``.
        D: best-paths table; ``D[i, j]`` is the index of the tag at word
            ``j-1`` on the best path ending with tag ``i`` at word ``j``.
        corpus: the sequence of words that was tagged (only its length is used).
        states: list mapping tag index -> tag name.

    Returns:
        List of tag names, one per word in ``corpus`` (empty for an empty corpus).
    """
    t = len(corpus)
    if t == 0:
        return []

    pred = [None]*t

    # Best final tag (lowest index on ties).
    tag_idx = int(np.argmax(C[:, t-1]))
    pred[t-1] = states[tag_idx]

    # Walk the backpointer chain. The tag for word j-1 is determined by the
    # tag already chosen for word j, not by the column-wise maximum of C.
    for j in range(t-1, 0, -1):
        tag_idx = int(D[tag_idx, j])
        pred[j-1] = states[tag_idx]

    return pred


In [92]:
# Allocate the Viterbi tables for the test words and fill in the first column of C
C,D = initialize(states, tag_counts, A, B, prep, vocab)
# Spot-check one entry of each table
print(f"best_probs[0,0]: {C[0,0]:.4f}") 
print(f"best_paths[2,3]: {D[2,3]:.4f}")

best_probs[0,0]: -22.6098
best_paths[2,3]: 0.0000


In [113]:
# this will take a few minutes to run => processes ~ 30,000 words
# viterbi_forward writes into the C and D arrays passed in and returns those same arrays.
C, D = viterbi_forward(A, B, prep, C, D, vocab)

Processing 5000th word . . .
Processing 10000th word . . .
Processing 15000th word . . .
Processing 20000th word . . .
Processing 25000th word . . .
Processing 30000th word . . .


In [105]:
# Spot-check two entries of the best-probabilities table after the forward pass
print(f"best_probs[0,1]: {C[0,1]:.4f}") 
print(f"best_probs[0,4]: {C[0,4]:.4f}") 

best_probs[0,1]: -24.7822
best_probs[0,4]: -49.5601


In [117]:
# Decode the tag sequence, then spot-check both ends of it against the input words
pred = viterbi_backward(C, D, prep, states)
m=len(pred)
print('The prediction for pred[-7:m-1] is: \n', prep[-7:m-1], "\n", pred[-7:m-1], "\n")
# The label now matches the slice that is actually printed (it used to say pred[0:8]).
print('The prediction for pred[0:7] is: \n', pred[0:7], "\n", prep[0:7])

The prediction for pred[-7:m-1] is: 
 ['see', 'them', 'here', 'with', 'us', '.'] 
 ['VB', 'PRP', 'RB', 'IN', 'PRP', '.'] 

The prediction for pred[0:8] is: 
 ['DT', 'NN', 'POS', 'NN', 'MD', 'VB', 'VBN'] 
 ['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken']


Computing the Accuracy

In [121]:
def compute_accuracy(pred,y):
    """Return the fraction of predicted tags that match the reference tags.

    ``pred`` and ``y`` are walked in parallel. Each entry of ``y`` is split
    on whitespace; entries that do not yield exactly two fields (word and
    tag) are skipped together with the prediction paired with them.

    Args:
        pred: sequence of predicted tags.
        y: sequence of reference lines of the form ``"word<whitespace>tag"``.

    Returns:
        ``correct / total`` over the scorable entries, or ``0.0`` when there
        are none (instead of raising ``ZeroDivisionError``).
    """
    correct=0
    total=0
    for prediction, true_val in zip(pred,y):

        word_tag = true_val.split()

        # Not a "word tag" pair (e.g. a blank line), so nothing to score.
        if len(word_tag)!=2:
            continue

        if prediction==word_tag[1]:
            correct+=1
        total+=1

    if total==0:
        return 0.0

    return correct/total


In [122]:
# Score the decoded tags against the tags in the test corpus lines `y`
print(f"Accuracy of the Viterbi algorithm is {compute_accuracy(pred, y):.4f}")

Accuracy of the Viterbi algorithm is 0.9528
