In [None]:
import nltk, re, pprint
import numpy as np
import pandas as pd
import requests
import matplotlib.pyplot as plt
import seaborn as sns
import pprint, time
import random
from sklearn.model_selection import train_test_split
from nltk.tokenize import word_tokenize
# Fetch the 'treebank' corpus package into the local NLTK data directory
# (the corpus is read below via nltk.corpus.treebank).
nltk.download('treebank')

[nltk_data] Downloading package treebank to /root/nltk_data...
[nltk_data]   Package treebank is already up-to-date!


True

In [None]:
# Read the Treebank tagged sentences into a list.
# Each sentence is a list of (word, tag) tuples.
wsj = list(nltk.corpus.treebank.tagged_sents())
# Preview the first 40 tagged sentences.
print(wsj[:40])

[[('Pierre', 'NNP'), ('Vinken', 'NNP'), (',', ','), ('61', 'CD'), ('years', 'NNS'), ('old', 'JJ'), (',', ','), ('will', 'MD'), ('join', 'VB'), ('the', 'DT'), ('board', 'NN'), ('as', 'IN'), ('a', 'DT'), ('nonexecutive', 'JJ'), ('director', 'NN'), ('Nov.', 'NNP'), ('29', 'CD'), ('.', '.')], [('Mr.', 'NNP'), ('Vinken', 'NNP'), ('is', 'VBZ'), ('chairman', 'NN'), ('of', 'IN'), ('Elsevier', 'NNP'), ('N.V.', 'NNP'), (',', ','), ('the', 'DT'), ('Dutch', 'NNP'), ('publishing', 'VBG'), ('group', 'NN'), ('.', '.')], [('Rudolph', 'NNP'), ('Agnew', 'NNP'), (',', ','), ('55', 'CD'), ('years', 'NNS'), ('old', 'JJ'), ('and', 'CC'), ('former', 'JJ'), ('chairman', 'NN'), ('of', 'IN'), ('Consolidated', 'NNP'), ('Gold', 'NNP'), ('Fields', 'NNP'), ('PLC', 'NNP'), (',', ','), ('was', 'VBD'), ('named', 'VBN'), ('*-1', '-NONE-'), ('a', 'DT'), ('nonexecutive', 'JJ'), ('director', 'NN'), ('of', 'IN'), ('this', 'DT'), ('British', 'JJ'), ('industrial', 'JJ'), ('conglomerate', 'NN'), ('.', '.')], [('A', 'DT'), ('f

In [None]:
# Splitting into train and test.
# random.seed() only seeds Python's `random` module (used further down to
# sample test sentences); train_test_split draws from NumPy's RNG, so the
# split has to be pinned through `random_state` to be reproducible.
random.seed(1234)
train_set, test_set = train_test_split(wsj, test_size=0.3, random_state=1234)
print(len(train_set))
print(len(test_set))

2739
1175


In [None]:
# Custom train set: four short hand-tagged sentences (22 tagged tokens).
# NOTE: this rebinds `train_set`, so the Treebank training split created by
# train_test_split is no longer used by the cells that follow.
train_set = [[('Martin', 'NNP'), (',', ','), ('Justin', 'NNP'), ('can', 'MD'), ('watch', 'VBN'), ('will', 'NNP'), ('.', '.')], [('Spot', 'NNP'), ('will', 'MD'), ('watch', 'VBN'), ('Martin', 'NNP'), ('.', '.')], [('Will', 'MD'), ('Justin', 'NNP'), ('spot', 'VBN'), ('Martin', 'NNP'), ('.', '.')], [('Martin', 'NNP'), ('will', 'MD'), ('pat', 'VBN'), ('Spot', 'NNP'), ('.', '.')]]

In [None]:
# Show only the first two test sentences; displaying the whole split floods
# the cell output with every tagged token of the test set.
test_set[:2]

[[('During', 'IN'),
  ('the', 'DT'),
  ('current', 'JJ'),
  ('crop', 'NN'),
  ('year', 'NN'),
  (',', ','),
  ('Brazil', 'NNP'),
  ('was', 'VBD'),
  ('expected', 'VBN'),
  ('*-1', '-NONE-'),
  ('to', 'TO'),
  ('produce', 'VB'),
  ('6.9', 'CD'),
  ('million', 'CD'),
  ('tons', 'NNS'),
  ('of', 'IN'),
  ('sugar', 'NN'),
  (',', ','),
  ('a', 'DT'),
  ('drop', 'NN'),
  ('from', 'IN'),
  ('8.1', 'CD'),
  ('million', 'CD'),
  ('tons', 'NNS'),
  ('in', 'IN'),
  ('1988-89', 'CD'),
  ('.', '.')],
 [('In', 'IN'),
  ('1966', 'CD'),
  (',', ','),
  ('on', 'IN'),
  ('route', 'NN'),
  ('to', 'TO'),
  ('a', 'DT'),
  ('re-election', 'NN'),
  ('rout', 'NN'),
  ('of', 'IN'),
  ('Democrat', 'NNP'),
  ('Frank', 'NNP'),
  ("O'Connor", 'NNP'),
  (',', ','),
  ('GOP', 'NNP'),
  ('Gov.', 'NNP'),
  ('Nelson', 'NNP'),
  ('Rockefeller', 'NNP'),
  ('of', 'IN'),
  ('New', 'NNP'),
  ('York', 'NNP'),
  ('appeared', 'VBD'),
  ('in', 'IN'),
  ('person', 'NN'),
  ('*-1', '-NONE-'),
  ('saying', 'VBG'),
  (',', ','),
 

In [None]:
# Flatten the training sentences into one list of (word, tag) pairs
train_tagged_words = [
    tagged_word
    for sentence in train_set
    for tagged_word in sentence
]
len(train_tagged_words)

22

In [None]:
# tokens: the word half of every (word, tag) pair, in corpus order
tokens = [word for word, _tag in train_tagged_words]
tokens[:10]

['Martin', ',', 'Justin', 'can', 'watch', 'will', '.', 'Spot', 'will', 'watch']

In [None]:
# vocabulary: the distinct word forms seen in training
V = set(tokens)
print(len(V))

# tag set: the distinct tags seen in training
T = {tag for _word, tag in train_tagged_words}
print(T)

11
{'NNP', '.', ',', 'VBN', 'MD'}


In [None]:
# Zero-initialised T x V matrix (one row per tag, one column per word)
# intended to hold P(w|t)
t, v = len(T), len(V)
w_given_t = np.zeros(shape=(t, v))

# **Emission Probability**

In [None]:
def word_given_tag(word, tag, train_bag = train_tagged_words):
    """Count the evidence for the emission probability P(word | tag).

    Args:
        word: the word form to look for.
        tag: the tag to condition on.
        train_bag: iterable of (word, tag) pairs to count over.

    Returns:
        A tuple ``(count_w_given_tag, count_tag)``: how many pairs carry
        ``tag`` together with ``word``, and how many pairs carry ``tag`` at
        all. The caller divides the first by the second.
    """
    count_tag = 0
    count_w_given_tag = 0
    # Single pass: tally the tag, and within it the matching word.
    for seen_word, seen_tag in train_bag:
        if seen_tag != tag:
            continue
        count_tag += 1
        if seen_word == word:
            count_w_given_tag += 1

    return (count_w_given_tag, count_tag)

# **Transition Probability**

In [None]:
def t2_given_t1(t2, t1, train_bag = train_tagged_words):
    """Count the evidence for the transition probability P(t2 | t1).

    Args:
        t2: the following tag.
        t1: the preceding tag.
        train_bag: sequence of (word, tag) pairs, read as one continuous
            tag stream.

    Returns:
        A tuple ``(count_t2_t1, count_t1)``: the number of adjacent
        positions where ``t1`` is immediately followed by ``t2``, and the
        total number of occurrences of ``t1``. Note that the final tag of
        the stream is included in ``count_t1`` even though nothing follows it.
    """
    tag_stream = [tagged[1] for tagged in train_bag]
    count_t1 = sum(1 for current in tag_stream if current == t1)
    # Walk consecutive (previous, next) tag pairs.
    count_t2_t1 = sum(
        1
        for previous, following in zip(tag_stream, tag_stream[1:])
        if previous == t1 and following == t2
    )
    return (count_t2_t1, count_t1)

# **Transition Probability Matrix**

In [None]:
# Use one ordering of the tag set for rows, columns and DataFrame labels.
tag_order = list(T)

# tags_matrix[i, j] = P(tag_order[j] | tag_order[i])
tags_matrix = np.zeros((len(tag_order), len(tag_order)), dtype='float32')
for i, t1 in enumerate(tag_order):
    for j, t2 in enumerate(tag_order):
        # One call per cell: each call scans the whole training bag, so
        # calling it separately for numerator and denominator doubles the work.
        count_t2_t1, count_t1 = t2_given_t1(t2, t1)
        tags_matrix[i, j] = count_t2_t1 / count_t1

# convert the matrix to a df for better readability
tags_df = pd.DataFrame(tags_matrix, columns=tag_order, index=tag_order)
tags_df

Unnamed: 0,NNP,.,",",VBN,MD
NNP,0.0,0.444444,0.111111,0.111111,0.333333
.,0.5,0.0,0.0,0.0,0.25
",",1.0,0.0,0.0,0.0,0.0
VBN,1.0,0.0,0.0,0.0,0.0
MD,0.25,0.0,0.0,0.75,0.0


In [None]:
# Transition probabilities out of '.', i.e. the row the tagger below uses
# for the first word of a sequence.
tags_df.loc['.', :]

Unnamed: 0,.
NNP,0.5
.,0.0
",",0.0
VBN,0.0
MD,0.25


# **Assigning Tags to the Test Data**

In [None]:
def Viterbi(words, train_bag = train_tagged_words):
    """Tag ``words`` left to right, picking the best tag for each word in turn.

    For every word the score of a candidate tag is
    ``P(word | tag) * P(tag | previous tag)``; the previous tag is the one
    already chosen for the preceding word, or '.' for the first word. Only
    the single best tag is kept at each step (there is no backtracking).

    Args:
        words: sequence of word strings to tag.
        train_bag: iterable of (word, tag) pairs used for the candidate tag
            set and the emission counts. Transition probabilities are read
            from the module-level ``tags_df``.

    Returns:
        A list of ``(word, tag)`` tuples, one per input word.
    """
    state = []
    # Sorted so that ties are broken identically on every run; the iteration
    # order of a set of strings can change between interpreter sessions.
    T = sorted(set(pair[1] for pair in train_bag))

    for key, word in enumerate(words):
        # '.' acts as the sentence-start state for the first word.
        previous_tag = '.' if key == 0 else state[-1]

        # probability column for this observation, plus the bare transition
        # probabilities kept as a fallback
        p = []
        p_transition = []
        for tag in T:
            transition_p = tags_df.loc[previous_tag, tag]

            # Forward train_bag so the emission counts come from the same
            # data as the tag set, and count only once per tag.
            count_w_given_tag, count_tag = word_given_tag(word, tag, train_bag)
            emission_p = count_w_given_tag / count_tag

            p.append(emission_p * transition_p)
            p_transition.append(transition_p)

        pmax = max(p)
        if pmax == 0:
            # No tag has a non-zero score (e.g. the word never occurs in
            # train_bag): choose by transition probability alone instead of
            # defaulting to whichever tag happens to be first.
            state_max = T[p_transition.index(max(p_transition))]
        else:
            # getting state for which probability is maximum
            state_max = T[p.index(pmax)]
        state.append(state_max)
    return list(zip(words, state))

In [None]:
# Five random sentence indices into the test set.
# randrange(n) yields 0..n-1; randint(1, n) could return n itself (an
# IndexError when indexing test_set below) and could never return 0.
rndom = [random.randrange(len(test_set)) for _ in range(5)]

# Hand-written sentence to tag (plain whitespace tokenisation)
corpus = "Jack will spot will"
test_tagged_words = corpus.split()

# Expected result
expected_result = [('Jack', 'NNP'), ('will', 'MD'), ('spot', 'VBN'), ('will', 'NNP')]

# list of sents
test_run = [test_set[i] for i in rndom]

# list of tagged words
test_run_base = [tup for sent in test_run for tup in sent]

# tag the hand-written sentence
tagged_seq = Viterbi(test_tagged_words)

# Calculating accuracy: share of positions where the predicted (word, tag)
# pair equals the expected one
check = [i for i, j in zip(tagged_seq, expected_result) if i == j]
accuracy = len(check)/len(tagged_seq)

print(tagged_seq)
print("\nAccuracy : ", accuracy * 100)

[('Jack', 'NNP'), ('will', 'MD'), ('spot', 'VBN'), ('will', 'NNP')]

Accuracy :  100.0
