POS Tagging using Viterbi Algorithm





Run the cell below to import everything needed to complete the rest of this notebook. 


In [None]:
import nltk
import pprint

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import math

nltk.download('brown')
nltk.download('universal_tagset')

## Section 1: POS Tagging with HMM

A Hidden Markov Model (HMM) is a probabilistic sequence model: given a sequence of words, it defines a probability distribution over the possible sequences of POS tags. Given a trained HMM, the forward algorithm computes the likelihood of a particular word sequence, and the Viterbi algorithm finds the single most likely tag sequence for it.
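
To make the forward algorithm concrete, here is a minimal sketch on a toy two-tag HMM. The function name `forward_likelihood`, the dictionary layout, and all probabilities are made up for illustration; none of them come from the Brown corpus or from the rest of this notebook.

```python
def forward_likelihood(words, states, start_p, trans_p, emit_p):
    """P(words) under the HMM, summed over every possible tag sequence."""
    # alpha[s]: probability of the words seen so far, with the last word tagged s
    alpha = {s: start_p[s] * emit_p[s][words[0]] for s in states}
    for word in words[1:]:
        alpha = {s: sum(alpha[p] * trans_p[p][s] for p in states) * emit_p[s][word]
                 for s in states}
    return sum(alpha.values())

# Toy model with made-up probabilities (assumption, for illustration only)
states = ["NOUN", "VERB"]
start_p = {"NOUN": 0.6, "VERB": 0.4}
trans_p = {"NOUN": {"NOUN": 0.3, "VERB": 0.7},
           "VERB": {"NOUN": 0.8, "VERB": 0.2}}
emit_p = {"NOUN": {"fish": 0.7, "swim": 0.3},
          "VERB": {"fish": 0.4, "swim": 0.6}}

likelihood = forward_likelihood(["fish", "swim"], states, start_p, trans_p, emit_p)
print(likelihood)
```

The loop keeps only one score per tag at each position, so the cost grows linearly with sentence length instead of exponentially with the number of tag sequences.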
 

### 1.1 Exploring the Dataset

We will be using the tagged sentences in the Brown corpus. Let's first take a look at the data we are working with.


In [None]:
corpus = nltk.corpus.brown.tagged_sents(tagset='universal')

In [None]:
# first two tagged sentences in the corpus
pp = pprint.PrettyPrinter()
pp.pprint(corpus[:2])

What are the unique tags that are in this data? Save this into a variable `tags`.

Also, what are the unique words that are in this data? Save this into a variable `words`.


In [None]:
# Wrap every sentence in explicit start/end markers so sentence boundaries get their own tags
new_corpus = []
for sent in corpus:
    new_corpus.append([('<s>', 'START')] + list(sent) + [('</s>', 'END')])
    

In [None]:
# Flatten the corpus into parallel lists of words and tags
tags_list = []
words_list = []
for sentence in new_corpus:
  for word, tag in sentence:
    words_list.append(word)
    tags_list.append(tag)

In [None]:
tags = set(tags_list)
words = set(words_list)

### 1.2 Evaluating Likelihood

Now that we've taken a look at the dataset, let's use the Hidden Markov Model to calculate the emission probabilities and transition probabilities. 

We'll start with the emission probabilities. Recall that the emission probability is the probability of a word given a tag, `P(w|t)`. It can be estimated by dividing the number of times the word appears with that tag by the total count of that tag.
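
As a small illustration of that ratio, the sketch below estimates `P(w|t)` from a handful of made-up tagged tokens. The toy data and the helper name `emission_prob` are assumptions for this example only.

```python
from collections import Counter

# Toy tagged tokens (made up, not taken from Brown)
tagged = [("the", "DET"), ("fish", "NOUN"), ("swim", "VERB"),
          ("the", "DET"), ("fish", "VERB"), ("fish", "NOUN")]

pair_counts = Counter(tagged)                     # count(word, tag)
tag_counts = Counter(tag for _, tag in tagged)    # count(tag)

def emission_prob(word, tag):
    """P(word | tag) = count(word, tag) / count(tag); 0.0 for an unseen tag."""
    if tag_counts[tag] == 0:
        return 0.0
    return pair_counts[(word, tag)] / tag_counts[tag]

print(emission_prob("fish", "NOUN"))
print(emission_prob("fish", "VERB"))
```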

To calculate the accuracy of the Viterbi algorithm that we will implement later, let's divide the corpus up into a training and a test set. 

In [None]:
# splitting into train and test sets
train_set, test_set = train_test_split(new_corpus,train_size = 0.95, test_size = 0.05, random_state = 123)

# putting those into their own lists
train_words = [tup for sent in train_set for tup in sent]
test_words = [tup[0] for sent in test_set for tup in sent]

# print out first five train set words
pp.pprint(train_words[:5])

In [None]:
word_list = []

for i in train_words:
  word_list.append(i[0])


Now let's calculate the emission and transition probabilities. We will use these to assign to each word `w` the tag `t` that maximizes `P(t|w)`. Recall from lecture that, by Bayes' rule, this can be computed from `P(w|t)` (emission probability) and `P(t|t-1)` (transition probability).
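
Under the usual bigram HMM assumptions, the score of a whole tag sequence is the product over positions of `P(w|t)` and `P(t|t-1)`. The sketch below scores two candidate tag sequences for the same sentence in log space. The probability tables and the name `sequence_log_score` are invented for illustration.

```python
import math

# Made-up probabilities for a two-word sentence; "START" marks the sentence start
emission_p = {("fish", "NOUN"): 0.7, ("fish", "VERB"): 0.4,
              ("swim", "NOUN"): 0.3, ("swim", "VERB"): 0.6}
transition_p = {("START", "NOUN"): 0.6, ("START", "VERB"): 0.4,
                ("NOUN", "NOUN"): 0.3, ("NOUN", "VERB"): 0.7,
                ("VERB", "NOUN"): 0.8, ("VERB", "VERB"): 0.2}

def sequence_log_score(words, tags):
    """Sum of log P(t_i | t_{i-1}) + log P(w_i | t_i) over the sentence."""
    score, prev = 0.0, "START"
    for word, tag in zip(words, tags):
        score += math.log(transition_p[(prev, tag)]) + math.log(emission_p[(word, tag)])
        prev = tag
    return score

words = ["fish", "swim"]
noun_verb = sequence_log_score(words, ["NOUN", "VERB"])
verb_noun = sequence_log_score(words, ["VERB", "NOUN"])
print(noun_verb > verb_noun)
```

Summing logs rather than multiplying probabilities avoids numeric underflow on long sentences and does not change which sequence scores highest.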

Create a dictionary `emissions` which, for each word as the key, stores a dictionary that maps each POS tag to the count of that word under that tag. Note that this stores counts, not probabilities; it will be used as a helper structure in our Viterbi algorithm. Be sure to use the words from the training set `train_words` to create it.

In [None]:
# Count every (tag, word) pair in the training set, keyed as "TAG*word"
pos_map = {}
for word, tag in train_words:
    key = tag + "*" + word
    pos_map[key] = pos_map.get(key, 0) + 1

In [None]:
print(pos_map)

In [None]:
# Regroup the flat counts as emissions[word][tag] = count
emissions = {}
for key, count in pos_map.items():
  # split only on the first "*" so that words containing "*" stay intact
  tag, word = key.split("*", 1)
  emissions.setdefault(word, {})[tag] = count
 

In [None]:
print(emissions)

Now, we will create the emission matrix using the emissions dictionary we created, and save it to a dataframe. This may be used in the implementation of the Viterbi algorithm. 

In [None]:
emissions_df = pd.DataFrame(emissions).transpose()
emissions_df
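
The table above holds raw counts. If probabilities are wanted, one possible approach is to divide each tag column by its total. This is a sketch on a toy table in the same word-by-tag layout; the names `toy_emissions`, `counts_df` and `probs_df` and the counts are assumptions, and filling unseen pairs with 0 is a simplification (no smoothing).

```python
import pandas as pd

# Toy counts in the same layout as `emissions`: {word: {tag: count}} (made-up numbers)
toy_emissions = {
    "fish": {"NOUN": 3, "VERB": 1},
    "swim": {"VERB": 3},
    "the":  {"DET": 5},
}
counts_df = pd.DataFrame(toy_emissions).transpose()   # rows = words, columns = tags

# P(word | tag): divide each tag column by that tag's total count
probs_df = counts_df / counts_df.sum(axis=0)

# Unseen (word, tag) pairs are NaN in the count table; treat them as probability 0
probs_df = probs_df.fillna(0.0)
print(probs_df)
```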

Likewise, build `transitions`, a dictionary keyed by POS tag. Each value is itself a dictionary that maps a previous POS tag to the number of times that previous tag is immediately followed by the key tag. The `START` and `END` tags are included.

In [None]:
# Count tag bigrams, keyed as "CURRENT*PREVIOUS"
# train_words is one flat list, so each sentence's END followed by the next START is counted too
trans_map = {}
print(len(train_words))
for index1 in range(1, len(train_words)):
    prev_tag = train_words[index1 - 1][1]
    cur_tag = train_words[index1][1]
    pos_key = cur_tag + '*' + prev_tag
    trans_map[pos_key] = trans_map.get(pos_key, 0) + 1


In [None]:
print(trans_map)

In [None]:
# Regroup the flat counts as transitions[current_tag][previous_tag] = count
transitions = {}
for key, count in trans_map.items():
  cur_tag, prev_tag = key.split("*")
  transitions.setdefault(cur_tag, {})[prev_tag] = count
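
Because `train_words` is one flat list, the counts above also include each `END` tag followed by the next sentence's `START` tag. As an alternative sketch, the same nested layout can be built sentence by sentence so that no pair crosses a sentence boundary. The toy sentences and the name `toy_transitions` are made up for illustration; whether the cross-sentence count is wanted depends on whether text is later decoded as one flat stream or one sentence at a time.

```python
from collections import Counter, defaultdict

# Toy tagged sentences with the same START/END markers (made-up data)
sentences = [
    [("<s>", "START"), ("the", "DET"), ("fish", "NOUN"), ("</s>", "END")],
    [("<s>", "START"), ("fish", "NOUN"), ("swim", "VERB"), ("</s>", "END")],
]

# toy_transitions[current_tag][previous_tag] = count, mirroring the layout of `transitions`
toy_transitions = defaultdict(Counter)
for sent in sentences:
    sent_tags = [tag for _, tag in sent]
    # zip pairs each tag with its successor inside one sentence only
    for prev_tag, cur_tag in zip(sent_tags, sent_tags[1:]):
        toy_transitions[cur_tag][prev_tag] += 1

print(dict(toy_transitions["NOUN"]))
print(toy_transitions["START"]["END"])
```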

Again, we will create the transition matrix (`t` x `t`) from the transition count dictionary we just built, and save it to a dataframe.

In [None]:
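transitions_df = pd.DataFrame(transitions).T  # rows = current tag, columns = previous tag
# This resets the END -> START count in the dictionary only;
# transitions_df was already built above and keeps the original count
transitions["START"]["END"] = 0
display(transitions_df)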
#transitions_df['ADJ']['NOUN']

### 1.3: Decoding The Input - Viterbi Algorithm 

The Viterbi Algorithm computes the **most likely sequence** of hidden states, given the evidence that is fed into an HMM.

Below is a function `localMaximizer` that greedily picks the best tag at each position, given only the tag it chose for the previous position. It therefore finds a sequence of local optima, not the global optimum. This is **NOT** what the Viterbi algorithm does: Viterbi instead keeps, for every state at every position, (1) the score of the best partial path ending in that state, and (2) a backpointer to the best predecessor.
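
To see why a greedy choice can miss the best sequence, here is a toy sketch that compares a greedy tagger with a brute-force search over all tag paths. The model, the placeholder words `w1` and `w2`, and the helper names `path_prob` and `greedy_path` are all invented for illustration.

```python
from itertools import product

# Toy HMM with made-up probabilities
states = ["NOUN", "VERB"]
start_p = {"NOUN": 0.6, "VERB": 0.4}
trans_p = {"NOUN": {"NOUN": 0.5, "VERB": 0.5},
           "VERB": {"NOUN": 0.1, "VERB": 0.9}}
emit_p = {"NOUN": {"w1": 0.5, "w2": 0.1},
          "VERB": {"w1": 0.5, "w2": 0.9}}

def path_prob(words, path):
    """Joint probability of a word sequence and one candidate tag path."""
    prob, prev = 1.0, None
    for word, tag in zip(words, path):
        step = start_p[tag] if prev is None else trans_p[prev][tag]
        prob *= step * emit_p[tag][word]
        prev = tag
    return prob

def greedy_path(words):
    """Commit to the locally best tag at every position."""
    path = []
    for word in words:
        prev = path[-1] if path else None
        def local(tag):
            step = start_p[tag] if prev is None else trans_p[prev][tag]
            return step * emit_p[tag][word]
        path.append(max(states, key=local))
    return tuple(path)

words = ["w1", "w2"]
greedy = greedy_path(words)
# Exhaustive search: only feasible for tiny examples
best = max(product(states, repeat=len(words)), key=lambda p: path_prob(words, p))
print(greedy, path_prob(words, greedy))
print(best, path_prob(words, best))
```

In this toy model the greedy tagger commits to `NOUN` for the first word because it looks slightly better locally, and that early commitment rules out the path that is best overall.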

In [None]:

def localMaximizer(words, train_on = train_words):
  tags = list(set([tup[1] for tup in train_on])) 
  states = []

  # go through the sequence of words
  for key, word in enumerate(words):
    # score every candidate tag for this word
    prob = []
    # the flattened text starts right after a sentence end, so the first predecessor is END
    prev = 'END' if key == 0 else states[-1]
    for tag in tags:
      # transitions_df rows are the current tag, columns are the previous tag
      transition = transitions_df.loc[tag, prev]
      if pd.isna(transition):
        transition = 0.0  # tag pair never seen in training

      # emission count of this word under this tag (0 if never seen)
      emission = emissions.get(word, {}).get(tag, 0.0)

      # unnormalized score: transition count * emission count
      prob.append(transition * emission)

    # greedily keep the highest-scoring tag (no backtrace)
    max_state = tags[prob.index(max(prob))]
    states.append(max_state)
  
  return list(zip(words, states))

Let's check out how this local maximizer does on our test set. Below, we create a list of test sentences with words that have tags, and another list of those same words but without tags. 

In [None]:
tests = test_set

# words with tags
test_with_tags = [tup for sent in tests for tup in sent]
# words without their tags
test_without_tags = [tup[0] for sent in tests for tup in sent]

find_sequence = localMaximizer(test_without_tags)

And below, we calculate the accuracy that was achieved based on this test set. 

In [None]:
correct = []
for i, j in zip(find_sequence, test_with_tags):
  if i == j:
    correct.append(i)

accuracy = len(correct)/len(find_sequence)
accuracy

Let's improve this now by implementing the Viterbi algorithm properly. 

We implement the [Viterbi Algorithm](https://en.wikipedia.org/wiki/Viterbi_algorithm#Pseudocode) using the pseudocode at the link. It will be helpful to use the helper functions above and both the `emissions_df` and `transitions_df` in our implementation.
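
For reference, here is a compact sketch of the same dynamic program on a toy HMM, using plain dictionaries and log probabilities. It is not the implementation used below: the function signature and the toy numbers are assumptions, and it presumes every probability it looks up is nonzero (otherwise `math.log` raises an error).

```python
import math

def viterbi(words, states, start_p, trans_p, emit_p):
    """Return the most probable state path for `words` (log-space Viterbi)."""
    # best[j][s]: log-probability of the best path that ends in state s at position j
    # back[j][s]: predecessor of s on that best path
    best = [{s: math.log(start_p[s]) + math.log(emit_p[s][words[0]]) for s in states}]
    back = [{}]
    for j in range(1, len(words)):
        best.append({})
        back.append({})
        for s in states:
            prev = max(states, key=lambda p: best[j - 1][p] + math.log(trans_p[p][s]))
            best[j][s] = (best[j - 1][prev] + math.log(trans_p[prev][s])
                          + math.log(emit_p[s][words[j]]))
            back[j][s] = prev
    # Follow the backpointers from the best final state
    last = max(states, key=lambda s: best[-1][s])
    path = [last]
    for j in range(len(words) - 1, 0, -1):
        path.append(back[j][path[-1]])
    return path[::-1]

# Toy model with made-up probabilities
states = ["NOUN", "VERB"]
start_p = {"NOUN": 0.6, "VERB": 0.4}
trans_p = {"NOUN": {"NOUN": 0.5, "VERB": 0.5},
           "VERB": {"NOUN": 0.1, "VERB": 0.9}}
emit_p = {"NOUN": {"w1": 0.5, "w2": 0.1},
          "VERB": {"w1": 0.5, "w2": 0.9}}

decoded = viterbi(["w1", "w2"], states, start_p, trans_p, emit_p)
print(decoded)
```

Each position costs one pass over all pairs of states, so the total work is proportional to sentence length times the square of the number of tags.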

In [None]:
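# Transpose in place so that rows = previous tag and columns = current tag
# (run this cell only once; re-running it flips the table back)
transitions_df=transitions_df.T
index_list=transitions_df.index.values.tolist()
temp1_df=transitions_df[index_list]
col_list=list(transitions_df.columns)  # tag order used as the Viterbi state space
temp2_df=emissions_df.T.reindex(index_list)
temp3_df=emissions_df.T.reindex(col_list)  # rows = tags in col_list order, columns = words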

In [None]:
class Viterbi():
  def __init__(self, Y):
    self.O = list(set([w_t[0] for w_t in train_words]))  # Observation space  
    self.S=  col_list  # State space
    self.Y = Y  # Sequence of observations
    self.A = np.log(transitions_df.T+1)  # Transition matrix
    #self.A= temp1_df
    self.B = np.log(temp3_df+1)   # Emission matrix
    self.N = len(self.O)
    self.K = len(self.S)
    self.T = len(Y)
    self.T1 = [[0] * self.T for i in range(self.K)]
    self.T2 = [[None] * self.T for i in range(self.K)]
    self.X = [None] * self.T # Predicted tags

    # Build the training vocabulary once, as a set, so membership tests in decode() are O(1)
    self.training_word_list = {w_t[0] for w_t in train_words}

  def decode(self):
    
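    # Initialization: score every state for the first observation
    for i in range(0,self.K):
      if self.Y[0] in self.training_word_list:
        self.T1[i][0] = self.A.iloc[0, i] + self.B.iloc[i][self.Y[0]]
      else:
        # Unseen first word: fall back to the emission scores of the second word (a heuristic)
        self.T1[i][0] = self.A.iloc[0, i] + self.B.iloc[i][self.Y[1]]
      self.T2[i][0] = 0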

    # Recursion: extend the best partial path into every state i at position j
    for j in range(1, self.T):
      known = self.Y[j] in self.training_word_list
      for i in range(0,self.K):
        argmax_k = 1  # default backpointer if no predecessor improves the score
        if not known:
          # Unseen word: give every state the same flat score (a heuristic, not a probability)
          self.T1[i][j] = max(self.T1[i][j],100)
        else:
          emission = self.B.iloc[i][self.Y[j]]
          for k in range(0,self.K):
            score = self.T1[k][j-1] + self.A[self.S[k]].iloc[i] + emission
            # NaN scores (pairs never seen in training) fail this comparison and are skipped
            if self.T1[i][j] < score:
              self.T1[i][j] = score
              argmax_k = k
        self.T2[i][j] = argmax_k
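    # Termination: pick the final state with the highest score (defaults to state 0)
    z = [0] * self.T
    max_val = 0
    for k in range(0,self.K):
      if max_val < self.T1[k][self.T-1]:
        max_val = self.T1[k][self.T-1]
        z[self.T-1] = k
    self.X[self.T-1] = self.S[z[self.T-1]]
    # Backtrace: follow the backpointers from the last position to the first
    for j in range(self.T-1, 0, -1):
        if self.T2[z[j]][j] is not None and self.Y[j] in self.training_word_list:
          z[j-1] = self.T2[z[j]][j]
          self.X[j-1] = self.S[z[j-1]]
        else:
          # Word at position j was unseen, so its backpointer is not meaningful:
          # take the best-scoring state at j-1 and continue the backtrace from it
          best_prev = int(np.argmax([row[j-1] for row in self.T1]))
          z[j-1] = best_prev
          self.X[j-1] = self.S[best_prev]
    return self.X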

**Since the Viterbi Algorithm takes a long time to run, we split the test set into batches of 500 words and currently run only on the first 2000 words of the test set.**

**Each batch of 500 words takes about 2 minutes to run, and the output of each batch is printed.**

**To run on the entire test set, change the variable `TEST_DATA_SIZE` from 2000 to [number of words in test data].**

**This cell will take about 10 minutes to run.**

In [None]:
tests = test_set

TEST_DATA_SIZE=2000
BATCH_SIZE=500
mislabels=[]

# Flatten the test sentences once instead of once per batch
all_with_tags = [tup for sent in tests for tup in sent]
all_without_tags = [tup[0] for tup in all_with_tags]

count=1
for start in range(0,TEST_DATA_SIZE,BATCH_SIZE):
  test_with_tags = all_with_tags[start:start+BATCH_SIZE]
  test_without_tags = all_without_tags[start:start+BATCH_SIZE]
  print("EXECUTING BATCH: ", count," from word index ", start, "till word index ", (start+BATCH_SIZE))
  decoder = Viterbi(test_without_tags)
  pred = decoder.decode()
  result = list(zip(test_without_tags, pred))
  print("VITERBI RESULT :", result)
  print("ACTUAL TAGS: " , test_with_tags)
  correct = []
  # compare each predicted (word, tag) pair with the actual pair
  for predicted, actual in zip(result, test_with_tags):
    if predicted == actual:
      correct.append(predicted)
    else:
      mislabels.append((predicted, actual))
  accuracy = len(correct)/len(result)
  print("ACCURACY: " , accuracy)
  count =  count+1

The cell above shows how the Viterbi implementation performs on sentences in our test set: for each batch, it prints the predicted tags, the actual tags, and the accuracy achieved.

What are some of the words that were tagged incorrectly?

In [None]:
for index, pair in enumerate(mislabels):
  print("Predicted word-tag : ", pair[0], " Actual word-tag : ", pair[1])
  if index > 15:
    break
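
Beyond listing individual errors, it can help to see which tag confusions are most frequent. The sketch below counts (predicted tag, actual tag) pairs. The list `example_mislabels` is hypothetical data in the same shape as `mislabels`, not real output from the tagger.

```python
from collections import Counter

# Hypothetical entries in the same shape as `mislabels`:
# ((word, predicted_tag), (word, actual_tag))
example_mislabels = [
    (("run", "NOUN"), ("run", "VERB")),
    (("that", "DET"), ("that", "ADP")),
    (("walk", "NOUN"), ("walk", "VERB")),
]

# Count how often each (predicted_tag, actual_tag) pair occurs
confusions = Counter((pred[1], actual[1]) for pred, actual in example_mislabels)

for (pred_tag, actual_tag), n in confusions.most_common(5):
    print(f"predicted {pred_tag}, actual {actual_tag}: {n}")
```

Passing the real `mislabels` list in place of `example_mislabels` would give the same summary for the tagger's actual errors.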