### POS Tagging can be difficult as a single word can represent various parts of speech at different times. 

In [1]:
#Import the libraries
import pandas as pd
import numpy as np
import math
from collections import defaultdict
from utils_pos import get_word_tag, preprocess

### Data Sources: We use training data and test data along with vocabulary. Training set will be used to create the emission, transmission and tag counts

In [2]:
#Load the training corpus
with open('WSJ_02-21.pos', 'r') as f:
    training_corpus = f.readlines()

print("Some examples from training corpus: ")
print(training_corpus[:5])

Some examples from training corpus: 
['In\tIN\n', 'an\tDT\n', 'Oct.\tNNP\n', '19\tCD\n', 'review\tNN\n']


In [3]:
#Get the vocabulary from text file
with open('hmm_vocab.txt', 'r') as f:
    voc_l = f.read().split('\n')

print("Length of vocabulary: ", len(voc_l))
print("Few examples of vacabulary: ")
print(voc_l[:50])
print(voc_l[-50:])


Length of vocabulary:  23777
Few examples of vacabulary: 
['!', '#', '$', '%', '&', "'", "''", "'40s", "'60s", "'70s", "'80s", "'86", "'90s", "'N", "'S", "'d", "'em", "'ll", "'m", "'n'", "'re", "'s", "'til", "'ve", '(', ')', ',', '-', '--', '--n--', '--unk--', '--unk_adj--', '--unk_adv--', '--unk_digit--', '--unk_noun--', '--unk_punct--', '--unk_upper--', '--unk_verb--', '.', '...', '0.01', '0.0108', '0.02', '0.03', '0.05', '0.1', '0.10', '0.12', '0.13', '0.15']
['yards', 'yardstick', 'year', 'year-ago', 'year-before', 'year-earlier', 'year-end', 'year-on-year', 'year-round', 'year-to-date', 'year-to-year', 'yearlong', 'yearly', 'years', 'yeast', 'yelled', 'yelling', 'yellow', 'yen', 'yes', 'yesterday', 'yet', 'yield', 'yielded', 'yielding', 'yields', 'you', 'young', 'younger', 'youngest', 'youngsters', 'your', 'yourself', 'youth', 'youthful', 'yuppie', 'yuppies', 'zero', 'zero-coupon', 'zeroing', 'zeros', 'zinc', 'zip', 'zombie', 'zone', 'zones', 'zoning', '{', '}', '']


In [4]:
#Create a vocabulary dictionary with word as key and index as value after sorting
vocab = {}
for i, word in enumerate(sorted(voc_l)):
    vocab[word] = i

#Print first 10 elements of the dictionary
print("First 10 elements of vocab dictionary")
i = 0
for key, val in vocab.items():
    print(f"{key}:{val}")
    i += 1
    if i == 9:
        break

First 10 elements of vocab dictionary
:0
!:1
#:2
$:3
%:4
&:5
':6
'':7
'40s:8


In [5]:
#Load the test corpus
with open("WSJ_24.pos", 'r') as f:
    y = f.readlines()

print("Some examples of testing data")
print(y[:5])

Some examples of testing data
['The\tDT\n', 'economy\tNN\n', "'s\tPOS\n", 'temperature\tNN\n', 'will\tMD\n']


In [6]:
#Get the preprocessed corpus (without tags)
_, prep = preprocess(vocab, 'test.words')

print('The length of the preprocessed test corpus: ', len(prep))
print('This is a sample of the test_corpus: ')
print(prep[0:10])

The length of the preprocessed test corpus:  34199
This is a sample of the test_corpus: 
['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken', 'from', 'several', '--unk--']


## Prediction based on Start of the Art method (We will word that are not ambiguous).

#### We will create 3 dictionaries: Transition counts: no of times a tag is next to other tag, Emission counts: Probability of occurence of a word given a tag, and Tag counts: counts of a tag

In [7]:
#Define a function to create all the three dictionaries
def create_dictionary(training_corpus, vocab):
    
    #Initialize the three dictionaries
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    tag_counts = defaultdict(int)
    
    #Initialize the previous tag with start as --s--
    prev_tag = '--s--'
    
    #Iterator to track the line number in corpus
    i = 0
    
    #Iterate through the training corpus
    for word_tag in training_corpus:
        
        i += 1
        if i % 50000 == 0:
            print(f"{i} Words processed")
        
        #Get the word and its tag from vocab
        word, tag = get_word_tag(word_tag, vocab)
        
        #Increase the counter of transition, emission and tags dictionaries
        transition_counts[(prev_tag, tag)] += 1
        emission_counts[(tag, word)] += 1
        tag_counts[tag] += 1
        
        #Set the current tag to prev_tag for next iteration
        prev_tag = tag
    
    #Return all the three dictionaries
    return emission_counts, transition_counts, tag_counts

In [8]:
#Populate the dictionaries using out training corpus and vocab
emission_counts, transition_counts, tag_counts = create_dictionary(training_corpus, vocab)

50000 Words processed
100000 Words processed
150000 Words processed
200000 Words processed
250000 Words processed
300000 Words processed
350000 Words processed
400000 Words processed
450000 Words processed
500000 Words processed
550000 Words processed
600000 Words processed
650000 Words processed
700000 Words processed
750000 Words processed
800000 Words processed
850000 Words processed
900000 Words processed
950000 Words processed


In [9]:
#Get all the POS states
states = sorted(tag_counts.keys())
print("Number of POS states: ", len(states))
print("POS states: ", states)

Number of POS states:  46
POS states:  ['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']


In [10]:
#View the transition and emission dictionaries sample
print("Transition dictionary examples")
for ex in list(transition_counts.items())[:3]:
    print(ex)

print("Emission dictionary examples")
for ex in list(emission_counts.items())[:3]:
    print(ex)

#Print some of the ambiguous words
print("Ambiguous words examples")
for tupple, count in emission_counts.items():
    if tupple[1] == 'back':
        print(tupple, count)

Transition dictionary examples
(('--s--', 'IN'), 5050)
(('IN', 'DT'), 32364)
(('DT', 'NNP'), 9044)
Emission dictionary examples
(('IN', 'In'), 1735)
(('DT', 'an'), 3142)
(('NNP', 'Oct.'), 317)
Ambiguous words examples
('RB', 'back') 304
('VB', 'back') 20
('RP', 'back') 84
('JJ', 'back') 25
('NN', 'back') 29
('VBP', 'back') 4


#### Testing the accuracy of POS tagger using emission_counts dictionary. We will assign the POS tag to each word in test corpus

In [11]:
#Define the function to predict the POS tag
def predict_tags(prep, y, emission_counts, vocab, states):
    
    #y is test data and prep is the preprocessed version of y
    #Set the initial number of correct predictions to 0
    num_corrects = 0
    
    #Get all the tuples of emssion_dictionary
    all_words = set(emission_counts.keys())
    
    #Get the length of test data
    total = len(y)
    
    #Iterate over all the test data
    for word, y_tup in zip(prep, y):
        
        #Split the tuple to get word and POS
        y_tup_ = y_tup.split()
        
        #Check if both word and POS is present, if so get the correct POS label
        if len(y_tup_) == 2:
            correct_label = y_tup_[1]
        
        else:
            continue
        
        #Initialize the pos_tag and count to 0 get the prediction
        pos_tag = ''
        count_max = 0
        
        #Check if word in vocab
        if word in vocab:
            
            #Iterate over all states
            for pos in states:
                
                #Get the tupple 
                key = (pos, word)
                
                #Check if key is in emission_counts and its count
                if key in emission_counts:
                    
                    counts = emission_counts[key]
                    
                    #Update predicted pos if count is greater than max_count
                    if counts > count_max:
                        count_max = counts
                        pos_tag = pos
        
        #Check if pos_tag is same as correct_label
        if correct_label == pos_tag:
            num_corrects += 1
        
    #Return the accuracy
    accuracy = num_corrects / total
    return accuracy

In [12]:
#Get the accuracy of our data
accuracy = predict_tags(prep, y, emission_counts, vocab, states)
print("Accuracy based on emission tags: ", accuracy)

Accuracy based on emission tags:  0.8888563993099213


#### We got 88.89% accuracy by State of Art Method, we will now try with Hidden Markov Models

## Hidden Markov Models for context based prediction

#### We will use transition matrix (A), emission matrix (B) and states (POS tag of word) in Hidden Markov Models

In [13]:
# Define a function to generate matrices transition matrix (A) (alpha is used for smoothing)

def create_transition_matrix(alpha, tag_counts, transition_counts):
    
    #Get the sorted order of POS tags
    all_tags = sorted(tag_counts.keys())
    #Get the length of the unique tags
    num_tags = len(all_tags)
    
    #Initialize transition matrix with zeros
    A = np.zeros((num_tags, num_tags))
    
    #Get the unique transition tuples (prev POS, cur POS)
    transition_keys = set(transition_counts.keys())
    
    #Iterate over all rows and columns of transition matrix
    for i in range(num_tags):
        for j in range(num_tags):
            
            #Initialize the counts to 0
            counts = 0
            
            #Get the tuple of row, col as key
            key = (all_tags[i], all_tags[j])
            
            #Check if key is in transition keys
            if key in transition_keys:
                
                #Get the count from transition counts
                counts = transition_counts[key]
            
            #Get the total counts of previous tags
            prev_tags_count = tag_counts[all_tags[i]]
            
            #Populate the transition matrix 
            A[i, j] = (counts + alpha) / (prev_tags_count + alpha * num_tags)
    return A

In [14]:
#Create a transition matrix with alpha = 0.001
alpha = 0.001
A = create_transition_matrix(alpha, tag_counts, transition_counts)

print("View a subset of transition matrix A")
A_sub = pd.DataFrame(A[30:35,30:35], index=states[30:35], columns = states[30:35] )
print(A_sub)

View a subset of transition matrix A
              RBS            RP           SYM        TO            UH
RBS  2.217069e-06  2.217069e-06  2.217069e-06  0.008870  2.217069e-06
RP   3.756509e-07  7.516775e-04  3.756509e-07  0.051089  3.756509e-07
SYM  1.722772e-05  1.722772e-05  1.722772e-05  0.000017  1.722772e-05
TO   4.477336e-05  4.472863e-08  4.472863e-08  0.000090  4.477336e-05
UH   1.030439e-05  1.030439e-05  1.030439e-05  0.061837  3.092348e-02


In [15]:
#Define a function to create emission matrix (occurence of tag and word pair)
def create_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    
    #Get the number of POS tags
    num_tags = len(tag_counts)
    #Get the list of all tags
    all_tags = sorted(tag_counts.keys())
    #Get the total number of words in vocabulory
    num_words = len(vocab)
    
    #Initialize the emission matrix
    B = np.zeros((num_tags, num_words))
    
    #Get the set of (POS, word) tuples from emission_counts
    emission_keys = set(list(emission_counts.keys()))
    
    #Iterate through all elements of Emission matrix and populate them
    for i in range(num_tags):
        for j in range(num_words):
            
            count = 0
            
            key = (all_tags[i], vocab[j])
            if key in emission_keys:
                count = emission_counts[key]
            
            tag_count = tag_counts[all_tags[i]]
            
            B[i, j] = (count + alpha) / (tag_count + alpha * num_words)
    
    return B

In [16]:
#Create emission matrix with alpha as 0.001
alpha = 0.001
B = create_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))

print(f"View Matrix position at row 0, column 0: {B[0,0]:.9f}")
print(f"View Matrix position at row 3, column 1: {B[3,1]:.9f}")

# Try viewing emissions for a few words in a sample dataframe
cidx  = ['725','adroitly','engineers', 'promoted', 'synergy']

# Get the integer ID for each word
cols = [vocab[a] for a in cidx]

# Choose POS tags to show in a sample dataframe
rvals =['CD','NN','NNS', 'VB','RB','RP']

# For each POS tag, get the row number from the 'states' list
rows = [states.index(a) for a in rvals]

# Get the emissions for the sample of words, and the sample of POS tags
B_sub = pd.DataFrame(B[np.ix_(rows,cols)], index=rvals, columns = cidx )
print(B_sub)


View Matrix position at row 0, column 0: 0.000006032
View Matrix position at row 3, column 1: 0.000000720
              725      adroitly     engineers      promoted       synergy
CD   8.201296e-05  2.732854e-08  2.732854e-08  2.732854e-08  2.732854e-08
NN   7.521128e-09  7.521128e-09  7.521128e-09  7.521128e-09  2.257091e-05
NNS  1.670013e-08  1.670013e-08  4.676203e-04  1.670013e-08  1.670013e-08
VB   3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08
RB   3.226454e-08  6.456135e-05  3.226454e-08  3.226454e-08  3.226454e-08
RP   3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07


## Viterbi Algorithm and Dynamic Programming for POS tag prediction

#### Initialization phase: Initialize the best_paths and best_probabilities matrices for forward step

In [17]:
#Define a function to initialize the best_prob and best_path matrices 
def viterbi_initialize(states, tag_counts, A, B, corpus, vocab):     # Corpus: sequence of words whose POS is to be predicted
    
    #Count the number of possible tags
    num_tags = len(tag_counts)
    
    #Initialize best_prob and best_path matrices
    best_prob = np.zeros((num_tags, len(corpus)))
    best_paths = np.zeros((num_tags, len(corpus)), dtype = int)
    
    #Define the start token
    s_idx = states.index('--s--')
    
    #Iterate through all the tags
    for i in range(num_tags):
        
        #Set the probability from start to -inf if there is no transition from start to POS index 'i'
        if A[s_idx, i] == 0:
            best_prob[i, 0] = float('-inf')
        
        #Else use log summation (prob of reaching A to POS i and prob of POS i to word)
        else:
            best_prob[i, 0] = math.log(A[s_idx, i]) + math.log(B[i, vocab[corpus[0]]])
    return best_prob, best_paths

In [18]:
#Initialize best_probs and best_paths using function
best_prob, best_paths = viterbi_initialize(states, tag_counts, A, B, prep, vocab)
print(best_prob)

[[-22.60982633   0.           0.         ...   0.           0.
    0.        ]
 [-23.07660654   0.           0.         ...   0.           0.
    0.        ]
 [-23.57298822   0.           0.         ...   0.           0.
    0.        ]
 ...
 [-22.75551606   0.           0.         ...   0.           0.
    0.        ]
 [-19.6637215    0.           0.         ...   0.           0.
    0.        ]
 [-18.36288463   0.           0.         ...   0.           0.
    0.        ]]


In [19]:
# Test the function
print(f"best_prob[0,0]: {best_prob[0,0]:.4f}") 
print(f"best_paths[2,3]: {best_paths[2,3]:.4f}")

best_prob[0,0]: -22.6098
best_paths[2,3]: 0.0000


#### Forward phase: Populate best_prob and best_path by forward walk

In [20]:
#Define a function for forward pass of Viterbi algorithm
def viterbi_forward(A, B, test_corpus, best_prob, best_paths, vocab):
    
    #Get the no of unique POS tags
    num_tags = best_prob.shape[0]
    
    #Iterate through all the words in test_corpus starting from 1 (starting word set in initialization phase)
    for i in range(1, len(test_corpus)):
        
        if i % 5000 == 0:
            print("Words processed: ", i)
            
        #Iterate through all the tags
        for j in range(num_tags):
            
            #Initialize the best_prob and best_path of word i to -inf and None
            best_prob_i = float('-inf')
            best_path_i = None
            
            #Iterate for each tag a previous tag can be
            for k in range(num_tags):
                
                #Get the probability for k from i - 1
                prob = best_prob[k, i - 1] + math.log(A[k, j]) + math.log(B[j, vocab[test_corpus[i]]])
                
                if prob > best_prob_i:
                    best_prob_i = prob
                    best_path_i = k
            best_prob[j, i] = best_prob_i
            best_paths[j, i] = best_path_i
    return best_prob, best_paths

In [None]:
#Populate the best_prob and best_paths using function
best_prob, best_paths = viterbi_forward(A, B, prep, best_prob, best_paths, vocab)

Words processed:  5000
Words processed:  10000
Words processed:  15000
Words processed:  20000
Words processed:  25000


In [None]:
# Test this function 
print(f"best_prob[0,1]: {best_prob[0,1]:.4f}") 
print(f"best_prob[0,4]: {best_prob[0,4]:.4f}") 

#### Backward phase: Get prediction of POS tag for each word using best_prob and best_path

In [None]:
#Define a function to perform backward phase of Viterbi algorithm
def viterbi_backward(best_prob, best_paths, corpus, states):
    
    #Get the number of words in corpus
    num_words = best_paths.shape[1]
    
    #Initialize array with same length to store the result (POS number)
    z = [None] * num_words
    
    #Get the number of unique POS tags
    num_tags = best_prob.shape[0]
    
    #Initialize the best probability for last word to -inf
    best_prob_last_word = float('-inf')
    
    #Initialize array to store results (POS tag)
    pred = [None] * num_words
    
    #Iterate through each tag for last word
    for i in range(num_tags):
        
        #Check the probability for each tag from best_prob matrix
        if best_prob[i, num_words - 1] > best_prob_last_word:
            best_prob_last_word = best_prob[i, num_words - 1]
            
            #Set the result to index of POS tag
            z[num_words - 1] = i
            
    #Set the result to state of best POS tag
    pred[num_words - 1] = states[z[num_words - 1]]
    
    #Iterate backwards from last word to first word
    for i in range(num_words - 1, -1, -1):
        
        #Get the result POS tag for word i
        pos_tag_i = z[i]
        
        #Get the best path for i - 1 word
        z[i - 1] = best_paths[pos_tag_i, i]
        
        #Get the corresponding state
        pred[i - 1] = states[z[i - 1]]
        
    return pred

In [None]:
# Run and test your function
pred = viterbi_backward(best_prob, best_paths, prep, states)
m=len(pred)
print('The prediction for pred[-7:m-1] is: \n', prep[-7:m-1], "\n", pred[-7:m-1], "\n")
print('The prediction for pred[0:8] is: \n', pred[0:7], "\n", prep[0:7])

## Predicting on test set

In [None]:
print('The third word is:', prep[3])
print('Your prediction is:', pred[3])
print('Your corresponding label y is: ', y[3])

In [None]:
#Define the function to predict using viterbi algorithm and calculate the accuracy
def calculate_accuracy(pred, y):
    
    num_correct = 0
    total = 0
    
    for y_pred, y_test in zip(pred, y):
        word_tag_tuple = y_test.split()
        
        if len(word_tag_tuple) != 2:
            continue
        word, tag = word_tag_tuple
        
        if tag == y_pred:
            num_correct += 1
        total += 1
    accuracy = num_correct / total
    return accuracy

In [None]:
print(f"Accuracy of the Viterbi algorithm is {calculate_accuracy(pred, y):.4f}")

#### Accuracy of SOTA: 88.89% and Viterbi algorithm: 95.31%