In [1]:
# Importing libraries
import nltk
import time
import random
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
 
# download the treebank corpus from nltk
nltk.download('treebank')
 
# download the universal tagset from nltk
nltk.download('universal_tagset')
 
# reading the Treebank tagged sentences
nltk_data = list(nltk.corpus.treebank.tagged_sents(tagset='universal'))
 
# print the first two sentences along with tags
print(nltk_data[:2])

[nltk_data] Downloading package treebank to /root/nltk_data...
[nltk_data]   Unzipping corpora/treebank.zip.
[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Unzipping taggers/universal_tagset.zip.


[[('Pierre', 'NOUN'), ('Vinken', 'NOUN'), (',', '.'), ('61', 'NUM'), ('years', 'NOUN'), ('old', 'ADJ'), (',', '.'), ('will', 'VERB'), ('join', 'VERB'), ('the', 'DET'), ('board', 'NOUN'), ('as', 'ADP'), ('a', 'DET'), ('nonexecutive', 'ADJ'), ('director', 'NOUN'), ('Nov.', 'NOUN'), ('29', 'NUM'), ('.', '.')], [('Mr.', 'NOUN'), ('Vinken', 'NOUN'), ('is', 'VERB'), ('chairman', 'NOUN'), ('of', 'ADP'), ('Elsevier', 'NOUN'), ('N.V.', 'NOUN'), (',', '.'), ('the', 'DET'), ('Dutch', 'NOUN'), ('publishing', 'VERB'), ('group', 'NOUN'), ('.', '.')]]


In [2]:
# print each word with its respective tag for first two sentences
for sent in nltk_data[:2]:
    print('==============')
    for tuple in sent:
        print(tuple,)

('Pierre', 'NOUN')
('Vinken', 'NOUN')
(',', '.')
('61', 'NUM')
('years', 'NOUN')
('old', 'ADJ')
(',', '.')
('will', 'VERB')
('join', 'VERB')
('the', 'DET')
('board', 'NOUN')
('as', 'ADP')
('a', 'DET')
('nonexecutive', 'ADJ')
('director', 'NOUN')
('Nov.', 'NOUN')
('29', 'NUM')
('.', '.')
('Mr.', 'NOUN')
('Vinken', 'NOUN')
('is', 'VERB')
('chairman', 'NOUN')
('of', 'ADP')
('Elsevier', 'NOUN')
('N.V.', 'NOUN')
(',', '.')
('the', 'DET')
('Dutch', 'NOUN')
('publishing', 'VERB')
('group', 'NOUN')
('.', '.')


In [3]:
# Experimental: Try removing the noise(X) and punctuations(.) tags to check for better accuracy
'''
new_nltk = [[] for i in range(len(nltk_data))]
i=0
for sent in nltk_data:
  for tup in sent:
    if tup[1] == 'X' or tup[1] == '.':
      continue
    else:
      print(i)
      new_nltk[i].append(tup)
  i+=1
nltk_data = new_nltk
'''

"\nnew_nltk = [[] for i in range(len(nltk_data))]\ni=0\nfor sent in nltk_data:\n  for tup in sent:\n    if tup[1] == 'X' or tup[1] == '.':\n      continue\n    else:\n      print(i)\n      new_nltk[i].append(tup)\n  i+=1\nnltk_data = new_nltk\n"

In [4]:
# split data into training and validation set in the ratio 80:20
train_set, test_set = train_test_split(nltk_data, train_size = 0.80, test_size = 0.20, random_state = 101)

In [5]:
# create list of train and test tagged words
train_tagged_words = [ tup for sent in train_set for tup in sent ]
test_tagged_words = [ tup for sent in test_set for tup in sent ]

print(len(train_tagged_words))
print(len(test_tagged_words))

80310
20366


In [6]:
# check some of the tagged words.
train_tagged_words[:5]

[('Drink', 'NOUN'),
 ('Carrier', 'NOUN'),
 ('Competes', 'VERB'),
 ('With', 'ADP'),
 ('Cartons', 'NOUN')]

In [7]:
# use set datatype to check how many unique tags are present in training data
tags = {tag for word, tag in train_tagged_words}
print('Number of tags:', len(tags))
print(tags)
 
# check total words in vocabulary
vocab = {word for word, tag in train_tagged_words}
print('Number of words in vocabulary:', len(vocab))

Number of tags: 12
{'PRT', 'NUM', 'X', 'DET', 'CONJ', 'ADJ', 'VERB', '.', 'ADV', 'NOUN', 'PRON', 'ADP'}
Number of words in vocabulary: 11052


In [8]:
# compute Emission Probability
def word_given_tag(words, tags, train_bag = train_tagged_words):
    # your code here

    # Create a dictionary of dictionaries to store the emission proabilities and dictionary of tags to store tag counts
    emission_matrix = {}
    tag_count = {}
    
    # Initialization
    for tag in tags:
      tag_count[tag] = 0.0

    # Calculate emission count and tag count by iterating over train dataset
    for word,tag in train_bag:
      if word not in emission_matrix:
        emission_matrix[word] = {tg1:0.0 for tg1 in tags}  # Intitalize all tag values to zero for each new word found in dataset
      emission_matrix[word][tag]+=1
      tag_count[tag]+=1

    # Calculate emission probabilities (emission_probability[word][tag] = (count of word,tag occurence in train_bag)/(count of tag occurence in train_bag))
    for word_tag_count_dict in emission_matrix.values():
      for tag in word_tag_count_dict.keys():
        word_tag_count_dict[tag] = word_tag_count_dict[tag]/tag_count[tag]
    
    # Initialize the prorabilities for unknown words in the test dataset to zero
    for word in words:
      if word not in emission_matrix:
        emission_matrix[word] = {tg1:1.0 for tg1 in tags}
    
    return emission_matrix


# compute Transition Probability
def t2_given_t1(tags, train_bag = train_tagged_words):
    # your code here

    # Create a dictionary of dictionaries to store the transition probabilities and dictionary of tags to store tag counts
    transition_matrix = {}
    tag_count = {}

    # Create a list of train dataset from 2nd word to the end to calculate transitions
    train_bag_next = train_bag[1:]
    
    # Inititalize tag counts and transition proability dictionary(matrix)
    for tag in tags:
      tag_count[tag] = 0.0
    
    for tag1 in tags:
      transition_matrix[tag1] = {}
      for tag2 in tags:
        transition_matrix[tag1][tag2] = 0.0

    # Calculate transition counts and tag counts by iterating over train dataset
    for tup1,tup2 in list(zip(train_bag[:-1],train_bag_next)):
      tag_count[tup1[1]]+=1
      transition_matrix[tup1[1]][tup2[1]]+=1
    
    # Calculate transition probabilities(transition_probability[tag1][tag2] = (count of tag2 after tag1 occurence in train_bag)/(total count of all tags after tag1 occurence in train_bag))
    for tag1,next_tag_count in transition_matrix.items():
      for tag2 in next_tag_count.keys():
        next_tag_count[tag2] = next_tag_count[tag2]/tag_count[tag1]

    return transition_matrix



# create t x t transition matrix of tags, t = no of tags
# Matrix(i, j) represents P(jth tag after the ith tag)
 
#word_given_tag(vocab, tags, train_tagged_words).shape
#t2_given_t1(tags, train_tagged_words)

In [9]:
# you can also convert the matrix to a pandas dataframe for better readability

In [10]:
def Viterbi(words, train_bag = train_tagged_words):
    # your code here

    # Create a list of tags
    tags = list({tag for word, tag in train_bag})

    # Calculate emission and transition probabilities
    emission_matrix = word_given_tag(words, tags, train_bag)
    transition_matrix = t2_given_t1(tags, train_bag)
    
    # Intitalize the probability matrix and the bakctrack matrix
    tag_given_word_max_probability_matrix = np.array([[0.0 for i in range(len(words))] for j in range(len(tags))]) 
    backtrack_best_path_matrix = np.array([['00000' for i in range(len(words))] for j in range(len(tags))])  # Take a string of 5 zeros as numpy stores unicode string of length <= initialized value

    #  Calculate the initial probabilities for tags(hidden states) occuring in the train dataset
    tag_prob = {}
    for word,tag in train_bag:
      if tag in tag_prob:
        tag_prob[tag]+=1/len(train_bag)
      else:
        tag_prob[tag]=1/len(train_bag)

    # Populate the probability matrix with prior probabilites in the 1st column (π vector)
    for i in range(len(tags)):
      tag_given_word_max_probability_matrix[i,0] = emission_matrix[words[0]][tags[i]]*tag_prob[tags[i]]
    
    # Normalize the probabilities (Since the values will keep on decreasing in each time step (i.e., with each word in sequence))
    tag_given_word_max_probability_matrix[:,0] = np.array([prob for prob in tag_given_word_max_probability_matrix[:,0]])
    tag_given_word_max_probability_matrix[:,0] = np.array([exp/np.sum(tag_given_word_max_probability_matrix[:,0]) for exp in tag_given_word_max_probability_matrix[:,0]])

    # Calculate forward pass probabilities - O(N^2 * T)
    for i in range(1, tag_given_word_max_probability_matrix.shape[1]):
      for j in range(len(tags)):
        probability_of_tagj_given_prev_tag = [0.0]*len(tags)    # Initialize a list which will store the probability of transition to tag 'j' in layer 'i' from each element in layer 'i-1'
        for k in range(len(tags)):
          probability_of_tagj_given_prev_tag[k] = tag_given_word_max_probability_matrix[k,i-1]*transition_matrix[tags[k]][tags[j]]*emission_matrix[words[i]][tags[j]]

        # Choose the max transition probability and argmax for tag 'j' from each tag in the previous layer
        tag_given_word_max_probability_matrix[j,i] = max(probability_of_tagj_given_prev_tag)
        backtrack_best_path_matrix[j,i] = tags[max(range(len(tags)), key=lambda x:probability_of_tagj_given_prev_tag[x])]
      
      # Normalize the probabilities (Since the values will keep on decreasing in each time step (i.e., with each word in sequence))
      tag_given_word_max_probability_matrix[:,i] = np.array([prob for prob in tag_given_word_max_probability_matrix[:,i]])
      tag_given_word_max_probability_matrix[:,i] = np.array([exp/np.sum(tag_given_word_max_probability_matrix[:,i]) for exp in tag_given_word_max_probability_matrix[:,i]])
    
    # Get the probability values from the backtrack matrix
    tags1 = {tags[i]:i for i in range(len(tags))}     # Create a mapping from tag to numeric index
    x = tags[np.argmax(tag_given_word_max_probability_matrix[:,-1])]  # Choose the 1st tag from argmax of probabilities of last layer
    res = [(words[backtrack_best_path_matrix.shape[1]-1], x)]

    for i in range(backtrack_best_path_matrix.shape[1]-1,0,-1):       # Backtrack from the last element to 1st and pick the most likely sequence
      x = backtrack_best_path_matrix[:,i][tags1[x]]
      res.insert(0, (words[i-1], x))

    return res

#Viterbi(test_tagged_words)

In [11]:
'''
tgs = ['X', '.', 'ADP', 'PRT', 'NUM', 'ADV', 'ADJ', 'CONJ', 'DET', 'NOUN', 'PRON', 'VERB']
tgs1 = {tgs[i]:i for i in range(len(tgs))}
x = tgs[np.argmax(pr[:,-1])]
res = [(test_run_base[btr.shape[1]-1][0], x)]

for i in range(btr.shape[1]-1,0,-1):
  x = btr[:,i][tgs1[x]]
  res.insert(0, (test_run_base[i-1][0], x))

res
'''

"\ntgs = ['X', '.', 'ADP', 'PRT', 'NUM', 'ADV', 'ADJ', 'CONJ', 'DET', 'NOUN', 'PRON', 'VERB']\ntgs1 = {tgs[i]:i for i in range(len(tgs))}\nx = tgs[np.argmax(pr[:,-1])]\nres = [(test_run_base[btr.shape[1]-1][0], x)]\n\nfor i in range(btr.shape[1]-1,0,-1):\n  x = btr[:,i][tgs1[x]]\n  res.insert(0, (test_run_base[i-1][0], x))\n\nres\n"

In [12]:
# test the Viterbi algorithm on a few sample sentences of test dataset
random.seed(1234)      # define a random seed to get same sentences when run multiple times
 
# choose random 10 numbers
rndom = [random.randint(1, len(test_set)) for x in range(10)]
 
# list of 10 sentencess on which to test the model
test_run = [test_set[i] for i in rndom]
 
# list of tagged words
test_run_base = [tup for sent in test_run for tup in sent]
 
# list of untagged words
test_tagged_words = [tup[0] for sent in test_run for tup in sent]

# testing 10 sentences to check the accuracy
start = time.time()
tagged_seq = Viterbi(test_tagged_words)
end = time.time()
difference = end - start
 
print("Time taken in seconds:", difference)
 
# accuracy should be good enough (> 90%) to be a satisfactory model
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
 
accuracy = len(check) / len(tagged_seq)
print('Viterbi Algorithm Accuracy:', accuracy * 100)

Time taken in seconds: 0.27846264839172363
Viterbi Algorithm Accuracy: 95.21531100478468


In [13]:
# test the Viterbi algorithm on the entire test dataset
# list of tagged words
test_run_base = [tup for sent in test_set for tup in sent]
 
# list of untagged words
test_tagged_words = [tup[0] for sent in test_set for tup in sent]

# testing 10 sentences to check the accuracy
start = time.time()
tagged_seq = Viterbi(test_tagged_words)
end = time.time()
difference = end - start
 
print("Time taken in seconds:", difference)
 
# accuracy should be good enough (> 90%) to be a satisfactory model
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
 
accuracy = len(check) / len(tagged_seq)
print('Viterbi Algorithm Accuracy:', accuracy * 100)

Time taken in seconds: 10.212006568908691
Viterbi Algorithm Accuracy: 93.9359717175685
