In [1]:
import pandas as pd
from collections import defaultdict
import math
import numpy as np

In [2]:
# declaring a few util functions
import string

punct = set(string.punctuation)

noun_suffix = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty"]
verb_suffix = ["ate", "ify", "ise", "ize"]
adj_suffix = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous"]
adv_suffix = ["ward", "wards", "wise"]

def get_word_tag(line, vocab):
  """
  function to get the correct tag for a word
  """
  if not line.split():
    word = "--n--"
    tag = "--s--"
    return word, tag
  else:
    word, tag = line.split()
    if word not in vocab: 
      # Handle unknown words
      word = assign_unk(word)
    return word, tag
  return None 


def preprocess(vocab, data_fp):
  """
  Preprocess data
  """
  orig = []
  prep = []

  with open(data_fp, "r") as infile:
    for i, word in enumerate(infile):
      if not word.split():
        orig.append(word.strip())
        word = "--n--"
        prep.append(word)
        continue
      elif word.strip() not in vocab:
        orig.append(word.strip())
        word = assign_unk(word)
        prep.append(word)
        continue
      else:
        orig.append(word.strip())
        prep.append(word.strip())
    
    assert(len(orig) == len(open(data_fp, "r").readlines()))
    assert(len(orig) == len(open(data_fp, "r").readlines()))

    return orig, prep

def assign_unk(tok):
  """
  Assign unknown word tokens
  """
  # Digits
  if any(char.isdigit() for char in tok):
    return "--unk_digit--"

  # Punctuation
  elif any(char in punct for char in tok):
    return "--unk_punct--"

  # Upper-case
  elif any(char.isupper() for char in tok):
    return "--unk_upper--"

  # Nouns
  elif any(tok.endswith(suffix) for suffix in noun_suffix):
    return "--unk_noun--"

  # Verbs
  elif any(tok.endswith(suffix) for suffix in verb_suffix):
    return "--unk_verb--"

  # Adjectives
  elif any(tok.endswith(suffix) for suffix in adj_suffix):
    return "--unk_adj--"

  # Adverbs
  elif any(tok.endswith(suffix) for suffix in adv_suffix):
    return "--unk_adv--"

  return "--unk--"

In [3]:
# loading the training corpus
with open("WSJ_02-21.pos", "r") as infile:
  training = infile.readlines()

print(training[:5])

['In\tIN\n', 'an\tDT\n', 'Oct.\tNNP\n', '19\tCD\n', 'review\tNN\n']


In [4]:
# reading the vocab
with open("hmm_vocab.txt", 'r') as voc_infile:
  # reading lines of the vocab
  voc_lines = voc_infile.read().split("\n")
print(voc_lines[:5])
print(len(voc_lines))

#i = 0
#for k, v in vocab.items():
#  print(f'{k}:{v}')
#  i += 1
 # if i > 20:
  #  break

['!', '#', '$', '%', '&']
23777


In [5]:
voc_lines[-5:]

['zones', 'zoning', '{', '}', '']

In [6]:
# creating a dictionary with the index of the words in the vocab
vocab = {}
for i, word in enumerate(sorted(voc_lines)):
  vocab[word] = i

len(vocab)

23777

In [7]:
# loading the test corpus
with open("WSJ_24.pos", 'r') as test_infile:
  y = test_infile.readlines()

y[:10]

['The\tDT\n',
 'economy\tNN\n',
 "'s\tPOS\n",
 'temperature\tNN\n',
 'will\tMD\n',
 'be\tVB\n',
 'taken\tVBN\n',
 'from\tIN\n',
 'several\tJJ\n',
 'vantage\tNN\n']

In [8]:
_, prep = preprocess(vocab, "test.words")

print(f'Length of preprocessed test corpus = {len(prep)}')
prep[:10]

Length of preprocessed test corpus = 34199


['The',
 'economy',
 "'s",
 'temperature',
 'will',
 'be',
 'taken',
 'from',
 'several',
 '--unk--']

In [9]:
# creating the transition, emission and tag counts
def create_dictionaries(training_corpus, vocab):
  """
  Input: 
    training_corpus: a corpus where each line has a word followed by a tag
    vocab: a dictionary were keys are words and values are their frequencies
  Output:
    transition_counts: dictionary where keys are pairs of type (previous_tag, tag) and values are their counts
    emission_counts: dictionary where keys are pairs of type (tag, word) and values are their frequencies
    tag_counts: dictionary where keys are tags and values are their corresponding frequencies in the training corpus
  """

  # init dictionaries
  emission_counts = defaultdict(int)
  transition_counts = defaultdict(int)
  tag_counts = defaultdict(int)

  # init prev_tag with starting placeholder
  prev_tag = "--s--"

  i = 0 # counter to track the line number in training corpus

  # looping through all items in training corpus
  for word_tag in training_corpus: 
    i+=1
    #print(word_tag)
    # obtaining the word and tag
    word, tag = get_word_tag(word_tag, vocab)

    # updating transition_counts dict
    transition_counts[(prev_tag, tag)] += 1

    # updating emission_counts dict
    emission_counts[(tag, word)] += 1

    # updating tag_count
    tag_counts[tag] += 1

    prev_tag = tag
  
  return transition_counts, emission_counts, tag_counts


In [10]:
transition_counts, emission_counts, tag_counts = create_dictionaries(training, vocab)

In [11]:
print(len(transition_counts))
print(len(emission_counts))
print(len(tag_counts))

1421
31140
46


A complete description of the tag set can be found [here](https://www.clips.uantwerpen.be/pages/mbsp-tags).

In [12]:
hidden_states = sorted(tag_counts.keys())
print(len(hidden_states))
print(hidden_states)

46
['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']


In [13]:
# checking examples of transitions, emissions and ambiguous words
print("A few transition examples:")
for ex in list(transition_counts.items())[:3]:
  print(ex)

print()

print("A few emission exampels:")
for ex in list(emission_counts.items())[:5]:
  print(ex)

print()

print("Examples of ambiguous words (words with more than one tag):")
for pair, cnt in emission_counts.items():
  if pair[1] == 'back' or pair[1] == 'well':
    print(pair, cnt)

A few transition examples:
(('--s--', 'IN'), 5050)
(('IN', 'DT'), 32364)
(('DT', 'NNP'), 9044)

A few emission exampels:
(('IN', 'In'), 1735)
(('DT', 'an'), 3142)
(('NNP', 'Oct.'), 317)
(('CD', '19'), 100)
(('NN', 'review'), 36)

Examples of ambiguous words (words with more than one tag):
('RB', 'well') 409
('RB', 'back') 304
('UH', 'well') 5
('VB', 'back') 20
('RP', 'back') 84
('JJ', 'back') 25
('NN', 'back') 29
('VBP', 'back') 4
('NN', 'well') 9
('JJ', 'well') 1


In [14]:
list(emission_counts.items())[200:203]

[(('DT', 'any'), 721), (('NN', 'decrease'), 7), (('NN', 'insider-trading'), 5)]

In [15]:
# testing using just the emission counts dictionary
# this will assign a tag to each word in the preprocessed testing corpus - prep
# this will also calculate the accuracy of the model using the ground truth tags in y

def predict_pos(prep, y, emission_counts, vocab, states):
  """
  Function to predict the POS tag for each word in the preprocessed test corpus
  Input:
    prep: a preprocessed version of the test corpus - a list containing all the words in the test corpus
    y: the original test corpus, with words and corresponding ground truth tags
    emission_counts: dictionary storing the frequency of (tag,word) occurrences
    vocab: dict where keys are words and values are indices
    states: sorted list of all possible tags / hidden states
  Output:
    accuracy: number of times the tagger predicts correctly
  """

  crct_cnt = 0

  # getting all possible unique pairs of (tag,words)
  all_pairs = set(emission_counts.keys())

  # count of tuples in the test corpus
  total = len(y)
  act_cnt = 0
  for word, y_tup in zip(prep, y):
    # splitting into a list of word and POS
    y_tup_l = y_tup.split()
    # checking if it contains both the word and POS
    if len(y_tup_l) == 2:
      true_lab = y_tup_l[1]
    else:
      continue
    
    largest_cnt = 0
    final_pos = '' 

    if word in vocab:
      act_cnt += 1
      for pos in states:
        key = (pos, word)

        if key in emission_counts:
          count = emission_counts[key]

          if count > largest_cnt:
            largest_cnt = count
            final_pos = pos
        
      if final_pos == true_lab:
        crct_cnt += 1
      
  accuracy = crct_cnt/act_cnt
  return accuracy      


In [16]:
accuracy = predict_pos(prep, y, emission_counts, vocab, hidden_states)
print(f"Accuracy of tagger using just emission counts = {accuracy}")

Accuracy of tagger using just emission counts = 0.9252731866191825


In [17]:
def create_transition_matrix(alpha, tag_counts, transition_counts):
  """
  Function to calculate the transition probabilities matrix
  Input: 
    alpha: Smoothing parameter
    tag_counts: dictionary storing the frequency of occurrence of each tag
    transition_counts: dictionary storing (prev_tag, tag) -> occurrence count key value pairs
  Output:
    A: a matrix of dim (num_tags, num_tags) storing smoothed probabilities of transitions between the different tags
  """

  all_tags = (sorted(tag_counts.keys()))
  num_tags = len(all_tags)

  # init matrix A
  A = np.zeros((num_tags, num_tags))

  # obtaining the unique transition pair keys
  trans_keys = set(transition_counts.keys())

  for i in range(num_tags):
    for j in range(num_tags):
      count = 0
      key = (all_tags[i], all_tags[j])

      if key in trans_keys:
        count = transition_counts[key]
      count_prev_tag = tag_counts[all_tags[i]]

      A[i,j] = (count + alpha)/((count_prev_tag) + (alpha*num_tags))
  return A



In [18]:
alpha = 0.001
A = create_transition_matrix(alpha, tag_counts, transition_counts)

print(f"A at row 0, col 0: {A[0,0]:.9f}")
print(f"A at row 3, col 1: {A[3,1]:.4f}")

print("A subset of the transition matrix A:")
print()
A_sub = pd.DataFrame(A[0:5, 30:35], index = hidden_states[0:5], columns = hidden_states[30:35])
print(A_sub)

A at row 0, col 0: 0.000007040
A at row 3, col 1: 0.1691
A subset of the transition matrix A:

             RBS            RP           SYM            TO            UH
#   7.039973e-06  7.039973e-06  7.039973e-06  7.039973e-06  7.039973e-06
$   1.356476e-07  1.356476e-07  1.356476e-07  1.356476e-07  1.356476e-07
''  1.445286e-07  1.445286e-07  1.445286e-07  1.604282e-02  1.445286e-07
(   7.320398e-07  7.320398e-07  7.320398e-07  5.125010e-03  3.660931e-03
)   7.267199e-07  7.267199e-07  7.267199e-07  2.034888e-02  7.267199e-07


In [19]:
def create_emission_matrix(alpha, tag_counts, emission_counts, vocab):
  """
  Function to calculate the emission probability matrix
  Input: 
    alpha: smoothing parameter
    tag_counts: frequency of occurrence of all the tags
    emission_counts: frequency of occurrence of (tag, word) pairs
    vocab: dictionary storing the index of words 
  Output:
    B: emission probability matrix
  """

  # get total number of tags
  num_tags = len(tag_counts)

  # getting list of all tags 
  all_tags = sorted(tag_counts.keys())

  # get total number of words
  num_words = len(vocab)

  # getting all words 
  #all_words = list(vocab.keys())

  # init emission probabilities matrix
  B = np.zeros((num_tags, num_words))

  # getting all the emission_counts keys
  emiss_keys = set(list(emission_counts.keys()))

  for i in range(num_tags):
    for j in range(num_words):
      count = 0
      key = (all_tags[i], vocab[j])

      if key in emiss_keys:
        count = emission_counts[key]
      
      count_tag = tag_counts[all_tags[i]]

      B[i,j] = (count + alpha)/(count_tag + (alpha*num_words))
  
  return B

In [20]:
# creating the emission probability matrix

B = create_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))

# verifying part of the emission matrix

print(f"B[0,0] = {B[0,0]:.9f}")
print(f"B[3,1] = {B[3,1]:.9f}")

B[0,0] = 0.000006032
B[3,1] = 0.000000720


Viterbi Algorithm:

In [21]:
# initializing the best_paths and best_probabilities matrices
def initialize(states, tag_counts, A, B, corpus, vocab):
  """
  Function to initialize the best_probs and best_path matrices
  Input: 
    states: list of all hidden states (tags)
    tag_counts: frequency of occurrence of all the tags
    A: transition probability matrix
    B: emission probability matrix
    corpus: sequence of words whose POS is to be identified
    vocab: dictionary of (word, index) pair
  Output:
    best_probs: (num_tags, len(corpus)) matrix to identify max prob of following a transition sequence
    best_paths: (num_tags, len(corpus)) matrix of ints to store labels for all states traversed
  """

  num_tags = len(tag_counts)

  best_probs = np.zeros((num_tags, len(corpus)), dtype=float)
  best_paths = np.zeros((num_tags, len(corpus)), dtype=int)

  s_idx = states.index("--s--")

  for i in range(num_tags):
    if A[s_idx, i] == 0:
    # special case when transition probability from --s-- to tag is 0
      best_probs[i,0] = float('-inf')
    else:
      best_probs[i, 0] = (math.log(A[s_idx, i])) + math.log(B[i,vocab[corpus[0]]])
  
  return best_probs, best_paths

In [22]:
best_probs, best_paths = initialize(hidden_states, tag_counts, A, B, prep, vocab)

In [31]:
print(f"best_probs[0,0]:{best_probs[0,0]:.4f}")
print(f"best_probs[2,3]:{best_probs[2,3]:.4f}")
best_probs[2,0]

best_probs[0,0]:-22.6098
best_probs[2,3]:0.0000


-23.572988219423895

In [46]:
def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab):
    '''
    Input: 
        A, B: The transiton and emission matrices respectively
        test_corpus: a list containing a preprocessed corpus
        best_probs: an initilized matrix of dimension (num_tags, len(corpus))
        best_paths: an initilized matrix of dimension (num_tags, len(corpus))
        vocab: a dictionary where keys are words in vocabulary and value is an index 
    Output: 
        best_probs: a completed matrix of dimension (num_tags, len(corpus))
        best_paths: a completed matrix of dimension (num_tags, len(corpus))
    '''
    num_tags = best_probs.shape[0]
    
    for i in range(1, len(test_corpus)): 
        
        if i % 5000 == 0:
            print("Words processed: {:>8}".format(i))
            
        for j in range(num_tags): 
            
            # Initialize best_prob for word i to negative infinity
            best_prob_i = float("-inf")
            
            # Initialize best_path for current word i to None
            best_path_i = None

            
            for k in range(num_tags): 
            
                prob = best_probs[k,i-1]+math.log(A[k,j]) +math.log(B[j,vocab[test_corpus[i]]])

                if prob > best_prob_i: 
                    
                    # Keep track of the best probability
                    best_prob_i = prob
                    
                    # keep track of the POS tag of the previous word
                    best_path_i = k

            
            best_probs[j,i] = best_prob_i
            best_paths[j,i] = best_path_i

    return best_probs, best_paths

In [47]:
best_probs, best_paths = viterbi_forward(A, B, prep, best_probs, best_paths, vocab)

Words processed:     5000
Words processed:    10000
Words processed:    15000
Words processed:    20000
Words processed:    25000
Words processed:    30000


In [48]:
print(f"best_probs[0,1] = {best_probs[0,1]:.4f}")

best_probs[0,1] = -24.7822


In [49]:
best_probs[0,4]

-49.560126133711904

In [50]:
# viterbi backward pass
def viterbi_backward(best_probs, best_paths, corpus, states):
  """
  Function to assign tags by backtracking using matrices populated in the forward pass
  Output: 
    z: An array having same size as test_corpus storing corresponding states
  """
  # obtain size of corpus
  m = best_probs.shape[1]

  # obtaining number of tags
  num_tags = best_probs.shape[0]

  # initialize result array
  z = [None] * m
  pred = [None] * m

  # find max probability tag for last word
  best_lw_prob = float('-inf')
  for k in range(num_tags):
    if best_probs[k,m-1] > best_lw_prob:
      best_lw_prob = best_probs[k,m-1]
      z[m-1] = k
  
  pred[m-1] = states[z[m-1]]

  # finding POS tags for previous words by backtracking

  for i in range(m-1, 1, -1):
    pos_tag_i = z[i]

    z[i-1] = best_paths[pos_tag_i, i]

    pred[i-1] = states[z[i-1]]
  
  return pred




In [51]:
pred = viterbi_backward(best_probs, best_paths, prep, hidden_states)

m = len(pred)
print('The prediction for pred[-7:m-1] is: \n', prep[-7:m-1], "\n", pred[-7:m-1], "\n")
print('The prediction for pred[0:8] is: \n', pred[0:7], "\n", prep[0:7])

The prediction for pred[-7:m-1] is: 
 ['see', 'them', 'here', 'with', 'us', '.'] 
 ['VB', 'PRP', 'RB', 'IN', 'PRP', '.'] 

The prediction for pred[0:8] is: 
 [None, 'NN', 'POS', 'NN', 'MD', 'VB', 'VBN'] 
 ['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken']


In [52]:
# predicting on preprocessed test corpus

print(prep[2])
print(pred[2])
print(y[2])

's
POS
's	POS



In [53]:
def get_accuracy(pred, y):
  """
  Function to compute accuracy of tags assigned using Viterbi algo
  Input:
    pred: predicted POS tags
    y: list of lines containing the words and corresponding ground truth POS tags
  Output:
    accuracy score
  """

  num_correct = 0
  total = 0

  for prediction, word_tag in zip(pred, y):
    wt_tuple = word_tag.split()

    if len(wt_tuple) < 2:
      continue
    
    word, tag = wt_tuple[0], wt_tuple[1]
    if prediction == tag:
      num_correct+=1
    total+=1

  return num_correct/total




In [54]:
print(f"Final accuracy using the Viterbi Algorithm = {get_accuracy(pred, y):.5f}")

Final accuracy using the Viterbi Algorithm = 0.95303
