In [None]:
# Importing packages and loading in the data set 
import pandas as pd
from collections import defaultdict
import math
import numpy as np
import string


In [None]:
class HMM:


  def __init__(self,training,voc, test_corpus, test):

    # Punctuation characters
    self.punct = set(string.punctuation)

    # Morphology rules used to assign unknown word tokens
    self.noun_suffix = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty"]
    self.verb_suffix = ["ate", "ify", "ise", "ize"]
    self.adj_suffix = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous"]
    self.adv_suffix = ["ward", "wards", "wise"]


    with open(training, 'r') as f:
      training_corpus = f.readlines()

    with open(voc, 'r') as f:
      voc_l = f.read().split('\n')

    # vocab: dictionary that has the index of the corresponding words
    self.vocab = {} 

    # Get the index of the corresponding words. 
    for i, word in enumerate(sorted(voc_l)): 
        self.vocab[word] = i   

    #Reading the test corpus
    with open(test_corpus, 'r') as f:
      self.y = f.readlines()



    #corpus without tags, preprocessed
    self._, self.prep = self.preprocess(self.vocab, test)




    
    self.cnt = 0
    for k,v in self.vocab.items():
        self.cnt += 1
        if self.cnt > 20:
            break

    #create dictionaries
    self.emission_counts, self.transition_counts, self.tag_counts = self.create_dictionaries(training_corpus, self.vocab)

    # get all the POS states
    self.states = sorted(self.tag_counts.keys())


    #create transition matrix
    alpha = 0.001
    self.A = self.create_transition_matrix(alpha, self.tag_counts, self.transition_counts)


    #creating emission matrix
    self.B = self.create_emission_matrix(alpha, self.tag_counts, self.emission_counts, list(self.vocab))

    #initilaize best_probs and best_paths matrices
    self.best_probs, self.best_paths = self.initialize(self.states, self.tag_counts, self.A, self.B, self.prep, self.vocab)

    # this will take a few minutes to run => processes ~ 30,000 words VITERBI FWD
    self.best_probs, self.best_paths = self.viterbi_forward(self.A, self.B, self.prep, self.best_probs, self.best_paths, self.vocab)



    #RUnning Viterbi Backward
    self.pred = self.viterbi_backward(self.best_probs, self.best_paths, self.prep, self.states)

  def get_word_tag(self,line, vocab): 
      if not line.split():
          word = "--n--"
          tag = "--s--"
          return word, tag
      else:
          word, tag = line.split()
          if word not in vocab: 
              word = self.assign_unk(word)
          return word, tag
      return None 


  def preprocess(self, vocab, data_fp):
      '''
      Input: 
          data_fp: file pointer to test data
          vocab: a dictionary where keys are words in vocabulary and value is an index
          
      Output: 
          orig: original data with words and the assigned POS tags
          prep: Data without the POS tags for testing
      '''
      orig = []
      prep = []

      with open(data_fp, "r") as data_file:

          for cnt, word in enumerate(data_file):
              if not word.split():
                  orig.append(word.strip())
                  word = "--n--"
                  prep.append(word)
                  continue
              elif word.strip() not in vocab:
                  orig.append(word.strip())
                  word = self.assign_unk(word)
                  prep.append(word)
                  continue
              else:
                  orig.append(word.strip())
                  prep.append(word.strip())

      assert(len(orig) == len(open(data_fp, "r").readlines()))
      assert(len(prep) == len(open(data_fp, "r").readlines()))

      return orig, prep


  def assign_unk(self, tok):
     #Assign unk tags
      if any(char.isdigit() for char in tok):
          return "--unk_digit--"
      elif any(char in self.punct for char in tok):
          return "--unk_punct--"
      elif any(char.isupper() for char in tok):
          return "--unk_upper--"
      elif any(tok.endswith(suffix) for suffix in self.noun_suffix):
          return "--unk_noun--"
      elif any(tok.endswith(suffix) for suffix in self.verb_suffix):
          return "--unk_verb--"
      elif any(tok.endswith(suffix) for suffix in self.adj_suffix):
          return "--unk_adj--"
      elif any(tok.endswith(suffix) for suffix in self.adv_suffix):
          return "--unk_adv--"
      return "--unk--"




  def create_dictionaries(self, training_corpus, vocab):
      '''
      Input: 
          prep: a preprocessed version of 'y'. A list with the 'word' component of the tuples.
          training_corpus: a corpus composed of a list of tuples where each tuple consists of (word, POS)
          vocab: a dictionary where keys are words in vocabulary and value is an index
          
      Output: 
          emission_counts: a dictionary where the keys are (tag,word) tuples and the value is the count
          transition_counts: a dictionary where the keys are (prev_tag,curr_tag) tuples and the value is the count
          tag_counts: a dictionary where the keys are tags and the value is the count
      '''    
      emission_counts = defaultdict(int)
      transition_counts = defaultdict(int)
      tag_counts = defaultdict(int)
      
      prev_tag = '--s--' 

      i = 0 

      for word_tag in training_corpus:

          i += 1

          if i % 50000 == 0:
              print(f"word count = {i}")
          word,tag = self.get_word_tag(word_tag,vocab)
          transition_counts[(prev_tag,tag)] += 1
          emission_counts[(tag,word)] += 1
          tag_counts[tag] += 1
          prev_tag = tag
          
      return emission_counts, transition_counts, tag_counts

  def create_transition_matrix(self, alpha, tag_counts, transition_counts):
      ''' 
      Input: 
          alpha: number used for smoothing
          tag_counts: a dictionary mapping each tag to its respective count
          transition_counts: transition count for the previous word and tag
      Output:
          A: matrix of dimension (num_tags,num_tags)
      '''
      # Write your code here
      tags=sorted(tag_counts.keys())
      number=len(tags)
      A=np.zeros((number,number))
      prev_keys=set(transition_counts.keys())
      for i in range(number):
        for j in range(number):
          c=0
          k=(tags[i],tags[j])
          if k in transition_counts:
            c=transition_counts[k]
          prev_tag_count=tag_counts[tags[i]]

          A[i,j]=(c + alpha) / (prev_tag_count + alpha * number)

      return A

  def create_emission_matrix(self, alpha, tag_counts, emission_counts, vocab):
      '''
      Input: 
          alpha: tuning parameter used in smoothing 
          tag_counts: a dictionary mapping each tag to its respective count
          emission_counts: a dictionary where the keys are (tag, word) and the values are the counts
          vocab: a dictionary where keys are words in vocabulary and value is an index
      Output:
          B: a matrix of dimension (num_tags, len(vocab))
      '''
      # Write your code here
      number=len(tag_counts)
      tags=sorted(tag_counts.keys())
      tot_word_ct=len(vocab)
      B = np.zeros((number, tot_word_ct))


      keys=set(list(emission_counts.keys()))

      for i in range(number):
        for j in range(tot_word_ct):
          c=0
          key=(tags[i],vocab[j])
          if key in emission_counts.keys():
            c=emission_counts[key]
          pos_tag_ct= tag_counts[tags[i]]

          B[i,j] = (c + alpha) / (pos_tag_ct + alpha * tot_word_ct)


      return B

  def initialize(self, states, tag_counts, A, B, corpus, vocab):
    #helper function to initialize the matrix
    no_of_tags = len(tag_counts)
    best_probs = np.zeros((len(tag_counts), len(corpus)))
    best_paths= np.zeros((len(tag_counts), len(corpus)))

    s_idx = states.index("--s--")
    for i in range(no_of_tags):
      if A[s_idx,i]==0:
        best_probs[i,0]=float('-inf')
      else:
        best_probs[i,0]=math.log(A[s_idx,i])+math.log(B[i,vocab[corpus[0]]])
    return best_probs, best_paths





  def viterbi_forward(self, A, B, test_corpus, best_probs, best_paths, vocab):
      '''
      Input: 
          A, B: The transiton and emission matrices respectively
          test_corpus: a list containing a preprocessed corpus
          best_probs: an initilized matrix of dimension (num_tags, len(corpus))
          best_paths: an initilized matrix of dimension (num_tags, len(corpus))
          vocab: a dictionary where keys are words in vocabulary and value is an index 
      Output: 
          best_probs: a completed matrix of dimension (num_tags, len(corpus))
          best_paths: a completed matrix of dimension (num_tags, len(corpus))
      '''
      # Write your code here
      num_tags = best_probs.shape[0]
      for i in range(1,len(test_corpus)):
        for j in range(num_tags):
          best_prob_i = float('-inf')
          best_path_i = None
          for k in range(num_tags):
            prob = best_probs[k,i-1] + math.log(A[k,j]) + math.log(B[j,vocab.get(test_corpus[i])])
            if prob>best_prob_i:
              best_prob_i = prob
              best_path_i = k
          best_probs[j,i] = best_prob_i
          best_paths[j,i] = best_path_i 
      return best_probs, best_paths



  def viterbi_backward(self, best_probs, best_paths, corpus, states):
    '''
      Input: 
          corpus: a list containing a preprocessed corpus
          best_probs: an initilized matrix of dimension (num_tags, len(corpus))
          best_paths: an initilized matrix of dimension (num_tags, len(corpus))
          states: a list of all possible POS tags
      Output: 
          best_probs: a completed matrix of dimension (num_tags, len(corpus))
          best_paths: a completed matrix of dimension (num_tags, len(corpus))
    '''
    m = best_paths.shape[1]
    tracker = [None] * m
    num_tags = best_probs.shape[0]
    best_p = -9999999999999 
    pos_pred = [None] * m
    for k in range(1,num_tags-1):
      if best_probs[k,m-1]<best_probs[k-1,m-1]:
        best_p=best_probs[k-1,m-1]
        tracker[m - 1] = int(k)
    pos_pred[m-1] = states[k]
    for i in range(m-1, 0, -1):
      tracker[i-1]=best_paths[int(tracker[i]), i]
      pos_pred[i-1] = states[int(tracker[i-1])]
    return pos_pred


In [None]:
def compute_accuracy(pred, y):
    '''
    Input: 
        pred: a list of the predicted parts-of-speech 
        y: a list of lines where each word is separated by a '\t' (i.e. word \t tag)
    Output: 
        
    '''
    num_correct = 0
    total = 0
    
    # Zip together the prediction and the labels
    for prediction, y in zip(pred, y):
        # Split the label into the word and the POS tag
        word_tag_tuple = y.split()
        
        # Check that there is actually a word and a tag
        # no more and no less than 2 items
        if len(word_tag_tuple)!=2: # complete this line
            continue 

        # store the word and tag separately
        word, tag = word_tag_tuple
        
        # Check if the POS tag label matches the prediction
        if prediction == tag: # complete this line
            
            # count the number of times that the prediction
            # and label match
            num_correct += 1
            
        # keep track of the total number of examples (that have valid labels)
        total += 1
        
    p=(num_correct/total)*100
    return print("The accuracy of the Viterbi Algorithm on test dataset is "+str(np.round(p,3))+"%")

In [None]:
obj= HMM(training="WSJ-2_21.pos", voc= "hmm_vocab.txt", test_corpus="WSJ-24.pos", test="test.words.txt")

word count = 50000
word count = 100000
word count = 150000
word count = 200000
word count = 250000
word count = 300000
word count = 350000
word count = 400000
word count = 450000
word count = 500000
word count = 550000
word count = 600000
word count = 650000
word count = 700000
word count = 750000
word count = 800000
word count = 850000
word count = 900000
word count = 950000


In [None]:
compute_accuracy(obj.pred, obj.y)

The accuracy of the Viterbi Algorithm on test dataset is 95.306%


**Matrix Visualization:**

In [None]:
# Try viewing emissions using sample words
cidx  = ['725','adroitly','engineers', 'promoted', 'synergy']

# Get the integer ID for each word
cols = [obj.vocab[a] for a in cidx]

# Choose POS tags to show in a sample dataframe
rvals =['CD','NN','NNS', 'VB','RB','RP']

# For each POS tag, get the row number from the 'states' list
rows = [obj.states.index(a) for a in rvals]

# Get the emissions for the sample of words, and the sample of POS tags
B_sub = pd.DataFrame(obj.B[np.ix_(rows,cols)], index=rvals, columns = cidx )
print(B_sub)

              725      adroitly     engineers      promoted       synergy
CD   8.201296e-05  2.732854e-08  2.732854e-08  2.732854e-08  2.732854e-08
NN   7.521128e-09  7.521128e-09  7.521128e-09  7.521128e-09  2.257091e-05
NNS  1.670013e-08  1.670013e-08  4.676203e-04  1.670013e-08  1.670013e-08
VB   3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08
RB   3.226454e-08  6.456135e-05  3.226454e-08  3.226454e-08  3.226454e-08
RP   3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07


In [None]:
# Visualizing transition matrix
print(f"A at row 0, col 0: {obj.A[0,0]:.9f}")
print(f"A at row 3, col 1: {obj.A[3,1]:.4f}")

print("View a subset of transition matrix A")
A_sub = pd.DataFrame(obj.A[30:35,30:35], index=obj.states[30:35], columns = obj.states[30:35] )
print(A_sub)

A at row 0, col 0: 0.000007040
A at row 3, col 1: 0.1691
View a subset of transition matrix A
              RBS            RP           SYM        TO            UH
RBS  2.217069e-06  2.217069e-06  2.217069e-06  0.008870  2.217069e-06
RP   3.756509e-07  7.516775e-04  3.756509e-07  0.051089  3.756509e-07
SYM  1.722772e-05  1.722772e-05  1.722772e-05  0.000017  1.722772e-05
TO   4.477336e-05  4.472863e-08  4.472863e-08  0.000090  4.477336e-05
UH   1.030439e-05  1.030439e-05  1.030439e-05  0.061837  3.092348e-02


In [None]:
#Visualizing a subset of the best_probs matrix
print("View a subset of best_prob matrix A")
A_sub = pd.DataFrame(obj.best_probs[30:35,30:35], index=obj.states[30:35], columns = cidx )
print(A_sub)

View a subset of best_prob matrix A
            725    adroitly   engineers    promoted     synergy
RBS -202.756188 -208.388385 -210.469384 -210.159431 -223.792237
RP  -202.582976 -217.722668 -207.237257 -215.529735 -224.139572
SYM -202.008781 -214.230938 -217.410216 -220.737687 -222.033388
TO  -200.440161 -209.469378 -209.069517 -216.222978 -221.096697
UH  -208.741895 -214.620888 -209.793465 -213.526235 -228.704175


In [None]:
#Visualizing a subset of the best_paths matrix
print("View a subset of best_prob matrix A")
A_sub = pd.DataFrame(obj.best_paths[30:35,30:35], index=obj.states[30:35], columns = cidx )
print(A_sub)

View a subset of best_prob matrix A
      725  adroitly  engineers  promoted  synergy
RBS  20.0      19.0       35.0      11.0     21.0
RP   20.0      19.0       35.0      11.0     21.0
SYM  20.0      19.0       35.0      11.0     21.0
TO   20.0      19.0       35.0      11.0     21.0
UH   35.0      19.0       35.0      11.0     34.0


In [None]:
#Checking true and predicted POS tag values for first test sentence
print("Predicted POS sequence: ",obj.pred[0:13])
print("True POS sequence: ",["".join(reversed(i[-2:-4:-1])) for i in obj.y[0:13]])

Predicted POS sequence:  ['DT', 'NN', 'POS', 'NN', 'MD', 'VB', 'VBN', 'IN', 'JJ', 'NN', 'VBZ', 'DT', 'NN']
True POS sequence:  ['DT', 'NN', 'OS', 'NN', 'MD', 'VB', 'BN', 'IN', 'JJ', 'NN', 'NS', 'DT', 'NN']
