## Problem Statement

we'll have to predict the Part of Speech (POS) tag for each word in a provided sentence in this work.

You are required to build a model using Hidden Markov Models which would help you predict the POS tags for all words in an utterance.

### What is a POS tag?

In corpus linguistics, part-of-speech tagging (POS tagging or PoS tagging or POST), also called grammatical tagging or word-category disambiguation, is the process of marking up a word in a text (corpus) as corresponding to a particular part of speech, based on both its definition and its context—i.e., its relationship with adjacent and related words in a phrase, sentence, or paragraph. A simplified form of this is commonly taught to school-age children, in the identification of words as nouns, verbs, adjectives, adverbs, etc.


#### Dataset Description
##### Sample Tuple
b100-5507

Mr.	NOUN
<br>
Podger	NOUN
<br>
had	VERB
<br>
thanked	VERB
<br>
him	PRON
<br>
gravely	ADV
<br>
,	.
<br>
and	CONJ
<br>
now	ADV
<br>
he	PRON
<br>
made	VERB
<br>
use	NOUN
<br>
of	ADP
<br>
the	DET
<br>
advice	NOUN
<br>
.	.
<br>
##### Explanation
The first token "b100-5507" is just a key and acts like an identifier to indicate the beginning of a sentence.
<br>
The other tokens have a (Word, POS Tag) pairing.

__List of POS Tags are:__
.
<br>
ADJ
<br>
ADP
<br>
ADV
<br>
CONJ
<br>
DET
<br>
NOUN
<br>
NUM
<br>
PRON
<br>
PRT
<br>
VERB
<br>
X

__Note__
<br>
__.__ is used to indicate special characters such as '.', ','
<br>
__X__ is used to indicate vocab not part of Enlish Language mostly.
Others are Standard POS tags.


### Train-Test Split
Let us use a 80-20 split of our data for training and evaluation purpose.



In [53]:
#Import libraries
import pandas as pd
import numpy as np
from collections import Counter, defaultdict, namedtuple, OrderedDict
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution
from itertools import chain
import random

In [60]:
Sentence = namedtuple("Sentence", "words tags")

def read_data(filename):
    """This is to read tagged sentence data"""
    with open(filename, 'r') as f:
        sentence_lines = [l.split("\n") for l in f.read().split("\n\n")]
    return OrderedDict(((s[0], Sentence(*zip(*[l.strip().split("\t")
                        for l in s[1:]]))) for s in sentence_lines if s[0]))


def read_tags(filename):
    """This is to read list of word tag classes"""
    with open(filename, 'r') as f:
        tags = f.read().split("\n")
    return frozenset(tags)

class Subset(namedtuple("BaseSet", "sentences keys vocab X tagset Y N stream")):
    def __new__(cls, sentences, keys):
        word_sequences = tuple([sentences[k].words for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        wordset = frozenset(chain(*word_sequences))
        tagset = frozenset(chain(*tag_sequences))
        N = sum(1 for _ in chain(*(sentences[k].words for k in keys)))
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        return super().__new__(cls, {k: sentences[k] for k in keys}, keys, wordset, word_sequences,
                               tagset, tag_sequences, N, stream.__iter__)

    def __len__(self):
        return len(self.sentences)

    def __iter__(self):
        return iter(self.sentences.items())


class Dataset(namedtuple("_Dataset", "sentences keys vocab X tagset Y training_set testing_set N stream")):
    def __new__(cls, tagfile, datafile, train_test_split=0.8, seed=112890):
        tagset = read_tags(tagfile)
        sentences = read_data(datafile)
        keys = tuple(sentences.keys())
        wordset = frozenset(chain(*[s.words for s in sentences.values()]))
        word_sequences = tuple([sentences[k].words for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        N = sum(1 for _ in chain(*(s.words for s in sentences.values())))
        
        # split data into train/test sets
        _keys = list(keys)
        if seed is not None: random.seed(seed)
        random.shuffle(_keys)
        split = int(train_test_split * len(_keys))
        training_data = Subset(sentences, _keys[:split])
        testing_data = Subset(sentences, _keys[split:])
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        return super().__new__(cls, dict(sentences), keys, wordset, word_sequences, tagset,
                               tag_sequences, training_data, testing_data, N, stream.__iter__)

    def __len__(self):
        return len(self.sentences)

    def __iter__(self):
        return iter(self.sentences.items())

In [61]:
#Read data
 
data = Dataset("D:\\M.Tech\\Sem 2\\AI\\tags.txt", "D:\\M.Tech\\Sem 2\\AI\\data.txt", train_test_split=0.8)

print("{} sentences are there in the corpus.".format(len(data)))
print("{} sentences are there in the training set.".format(len(data.training_set)))
print("{} sentences are there in the testing set.".format(len(data.testing_set)))

57340 sentences are there in the corpus.
45872 sentences are there in the training set.
11468 sentences are there in the testing set.


In [38]:
print("There are a total of {} samples of {} unique words in the corpus."
      .format(data.N, len(data.vocab)))
print("There are {} samples of {} unique words in the training set."
      .format(data.training_set.N, len(data.training_set.vocab)))
print("There are {} samples of {} unique words in the testing set."
      .format(data.testing_set.N, len(data.testing_set.vocab)))
print("There are {} words in the test set that are missing in the training set."
      .format(len(data.testing_set.vocab - data.training_set.vocab)))

assert data.N == data.training_set.N + data.testing_set.N, \
       "The number of training + test samples should be equal to the total number of samples"

There are a total of 1161192 samples of 56057 unique words in the corpus.
There are 928458 samples of 50536 unique words in the training set.
There are 232734 samples of 25112 unique words in the testing set.
There are 5521 words in the test set that are missing in the training set.


In [39]:
# Dataset.stream() usage in getting (word, tag) samples for the entire corpus
print("\n\tStream (word, tag) pairs:\n")
for i, pair in enumerate(data.stream()):
    print("\t", pair)
    if i > 3: break


	Stream (word, tag) pairs:

	 ('Mr.', 'NOUN')
	 ('Podger', 'NOUN')
	 ('had', 'VERB')
	 ('thanked', 'VERB')
	 ('him', 'PRON')


In [40]:
def tagwordpair_counts(tags, words):
    d = defaultdict(lambda: defaultdict(int))
    for tag, word in zip(tags, words):
        d[tag][word] += 1
        
    return d
tags = [tag for i, (word, tag) in enumerate(data.training_set.stream())]
words = [word for i, (word, tag) in enumerate(data.training_set.stream())]

In [41]:

def unknown_replace(sequence):
    
    return [w if w in data.training_set.vocab else 'nan' for w in sequence]

def simplification_decoding(X, model):
    
    _, state_path = model.viterbi(unknown_replace(X))
    return [state[1].name for state in state_path[1:-1]]

In [42]:
def unigram_counts(sequences):

    return Counter(sequences)

tags = [tag for i, (word, tag) in enumerate(data.training_set.stream())]
tag_unigrams = unigram_counts(tags)


In [43]:
def bigram_counts(sequences):

    d = Counter(sequences)
    return d

tags = [tag for i, (word, tag) in enumerate(data.stream())]
o = [(tags[i],tags[i+1]) for i in range(0,len(tags)-2,2)]
tag_bigrams = bigram_counts(o)

In [44]:
def starting_counts(sequences):
    
    d = Counter(sequences)
    return d

tags = [tag for i, (word, tag) in enumerate(data.stream())]
starts_tag = [i[0] for i in data.Y]
tag_starts = starting_counts(starts_tag)

In [45]:
def ending_counts(sequences):
    
    d = Counter(sequences)
    return d

end_tag = [i[len(i)-1] for i in data.Y]
tag_ends = ending_counts(end_tag)

In [46]:

def modelaccuracy(X, Y, model):
    
    correct = total_predictions = 0
    for observations, actual_tags in zip(X, Y):
        try:
            most_likely_tags = simplification_decoding(observations, model)
            correct += sum(p == t for p, t in zip(most_likely_tags, actual_tags))
        except:
            pass
        total_predictions += len(observations)
    return correct / total_predictions

In [47]:
#HMM Model Goes Here
basic_model = HiddenMarkovModel(name="base-hmm-tagger")

tags = [tag for i, (word, tag) in enumerate(data.stream())]
words = [word for i, (word, tag) in enumerate(data.stream())]

tags_count=unigram_counts(tags)
tag_words_count=tagwordpair_counts(tags,words)

starting_tag_list=[i[0] for i in data.Y]
ending_tag_list=[i[-1] for i in data.Y]

starting_tag_count=starting_counts(starting_tag_list)#the number of times a tag occured at the start
ending_tag_count=ending_counts(ending_tag_list)      #the number of times a tag occured at the end



to_pass_states = []
for tag, words_dict in tag_words_count.items():
    total = float(sum(words_dict.values()))
    distribution = {word: count/total for word, count in words_dict.items()}
    tag_emissions = DiscreteDistribution(distribution)
    tag_state = State(tag_emissions, name=tag)
    to_pass_states.append(tag_state)


basic_model.add_states()    
    

start_prob={}

for tag in tags:
    start_prob[tag]=starting_tag_count[tag]/tags_count[tag]

for tag_state in to_pass_states :
    basic_model.add_transition(basic_model.start,tag_state,start_prob[tag_state.name])    

end_prob={}

for tag in tags:
    end_prob[tag]=ending_tag_count[tag]/tags_count[tag]
for tag_state in to_pass_states :
    basic_model.add_transition(tag_state,basic_model.end,end_prob[tag_state.name])
    


transition_prob_pair={}

for key in tag_bigrams.keys():
    transition_prob_pair[key]=tag_bigrams.get(key)/tags_count[key[0]]
for tag_state in to_pass_states :
    for next_tag_state in to_pass_states :
        basic_model.add_transition(tag_state,next_tag_state,transition_prob_pair[(tag_state.name,next_tag_state.name)])

basic_model.bake()

In [48]:
#Model Accuracy Evaluation

hmm_training_acc = modelaccuracy(data.training_set.X, data.training_set.Y, basic_model)
print("Basic HMM Model Training Accuracy is: {:.2f}%".format(100 * hmm_training_acc))

hmm_testing_acc = modelaccuracy(data.testing_set.X, data.testing_set.Y, basic_model)
print("Basic HMM Model Testing Accuracy is: {:.2f}%".format(100 * hmm_testing_acc))

Basic HMM Model Training Accuracy is: 97.49%
Basic HMM Model Testing Accuracy is: 96.09%


In [49]:
#Adds code blocks wherever you feel necessary
for key in data.testing_set.keys[:5]:
    print("Sentence Key: {}\n".format(key))
    print("Here are predicted labels:\n")
    print(simplification_decoding(data.sentences[key].words, basic_model))
    print("\n")
    print("Here are Actual labels:\n")
    print(data.sentences[key].tags)
    print("\n")

Sentence Key: b100-28144

Here are predicted labels:

['CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.']


Here are Actual labels:

('CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.')


Sentence Key: b100-23146

Here are predicted labels:

['PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.', 'ADP', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', '.']


Here are Actual labels:

('PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.', 'ADP', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', '.')


Sentence Key: b100-35462

Here are predicted labels:

['DET', 'ADJ', 'NOUN', 'VERB', 'VERB', 'VERB', 'ADP', 'DET', 'ADJ', 'ADJ', 'NOUN', 'ADP', 'DET', 'ADJ', 'NOUN', '.', 'ADP', 'ADJ', 'NOUN', '.', 'CONJ', 'ADP', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', '.', 'ADJ', '.', 'CONJ', 'ADJ', 'NOUN

### Happy Coding!