In [None]:
### Assignment : 2
### Mohammed Intekhab : 2018AD04076
### Ajay Sharma       : 2018AD04064
### Avinash Chandra   : 2018AD04061

# Artificial and Computational Intelligence (Assignment - 2)

## Problem Statement

As part of the 2nd Assignment, we'll try and predict the Part of Speech (POS) tag for each word in a provided sentence.

You are required to build a model using Hidden Markov Models which would help you predict the POS tags for all words in an utterance.

### What is a POS tag?

In corpus linguistics, part-of-speech tagging (POS tagging or PoS tagging or POST), also called grammatical tagging or word-category disambiguation, is the process of marking up a word in a text (corpus) as corresponding to a particular part of speech, based on both its definition and its context—i.e., its relationship with adjacent and related words in a phrase, sentence, or paragraph. A simplified form of this is commonly taught to school-age children, in the identification of words as nouns, verbs, adjectives, adverbs, etc.

### Dataset

The dataset can be downloaded from https://drive.google.com/open?id=1345iaxqTImJN6mKGh_c1T5n2OWumpyYz. You can access it only using your BITS IDs.

#### Dataset Description
##### Sample Tuple
b100-5507

Mr.	NOUN
<br>
Podger	NOUN
<br>
had	VERB
<br>
thanked	VERB
<br>
him	PRON
<br>
gravely	ADV
<br>
,	.
<br>
and	CONJ
<br>
now	ADV
<br>
he	PRON
<br>
made	VERB
<br>
use	NOUN
<br>
of	ADP
<br>
the	DET
<br>
advice	NOUN
<br>
.	.
<br>
##### Explanation
The first token "b100-5507" is just a key and acts like an identifier to indicate the beginning of a sentence.
<br>
The other tokens have a (Word, POS Tag) pairing.

__List of POS Tags are:__
.
<br>
ADJ
<br>
ADP
<br>
ADV
<br>
CONJ
<br>
DET
<br>
NOUN
<br>
NUM
<br>
PRON
<br>
PRT
<br>
VERB
<br>
X

__Note__
<br>
__.__ is used to indicate special characters such as '.', ','
<br>
__X__ is mostly used to indicate vocabulary that is not part of the English language.
The others are standard POS tags.

### Evaluation
We wish to evaluate based on 
- coding practices being followed
- commenting to explain the code and logic behind doing something
- your understanding and explanation of data
- how good the model would perform on unseen data.

### Train-Test Split
Let us use an 80-20 split of our data for training and evaluation purposes.



In [None]:
#Import libraries
import itertools
import os
import random
import warnings
from collections import Counter, defaultdict, namedtuple, OrderedDict
from itertools import chain

import matplotlib.image as mplimg
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
from IPython.core.display import HTML
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution

In [None]:
#functions to Read data
Sentence = namedtuple("Sentence", "words tags")

def read_data(filename):
    """Read tagged sentence data.

    The file is expected to hold blank-line-separated blocks. The first
    non-blank line of a block is the sentence key; every following non-blank
    line is a ``word<TAB>tag`` pair.

    Blank lines inside a block (for instance the one produced by a single
    trailing newline at the end of the file, or an extra empty line between
    two sentences) are ignored, and blocks that contain a key but no token
    lines are skipped, so such files no longer crash or lose sentences.

    Args:
        filename: path of the tagged corpus file.

    Returns:
        OrderedDict mapping each sentence key to a ``Sentence(words, tags)``
        made of two equal-length tuples, in file order.

    Raises:
        ValueError: if a token line does not consist of exactly two
            tab-separated fields.
    """
    with open(filename, 'r') as f:
        blocks = f.read().split("\n\n")

    sentences = OrderedDict()
    for block in blocks:
        lines = [line for line in block.split("\n") if line.strip()]
        if len(lines) < 2:
            # Empty block, or a key without any token: nothing to tag.
            continue
        key, token_lines = lines[0], lines[1:]
        pairs = []
        for line in token_lines:
            fields = line.strip().split("\t")
            if len(fields) != 2:
                # Fail loudly: silently zipping ragged rows would misalign
                # words and tags for the whole sentence.
                raise ValueError(
                    "Malformed token line {!r} in sentence {!r}: expected "
                    "'word<TAB>tag'".format(line, key))
            pairs.append(fields)
        words, tags = zip(*pairs)
        sentences[key] = Sentence(words, tags)
    return sentences

def read_tags(filename):
    """Read the set of word tag classes, one tag per line.

    Surrounding whitespace is stripped and empty lines are ignored, so a
    trailing newline at the end of the file does not add a bogus empty-string
    tag to the tagset.

    Args:
        filename: path of the file listing the tags.

    Returns:
        frozenset of the tag strings found in the file.
    """
    with open(filename, 'r') as f:
        return frozenset(line.strip() for line in f if line.strip())

class Subset(namedtuple("BaseSet", "sentences keys vocab X tagset Y N stream")):
    """Immutable view over the sentences selected by ``keys``.

    Fields:
        sentences: dict of the selected key -> sentence entries.
        keys: the keys object that was passed in.
        vocab: frozenset of every word in the selected sentences.
        X: tuple with one word sequence per key.
        tagset: frozenset of every tag in the selected sentences.
        Y: tuple with one tag sequence per key.
        N: total number of words across the selected sentences.
        stream: zero-argument callable returning a fresh iterator over the
            flattened (word, tag) pairs.

    ``len()`` and iteration are overridden to operate on ``sentences``
    rather than on the underlying tuple of fields.
    """

    def __new__(cls, sentences, keys):
        word_sequences = tuple(sentences[key].words for key in keys)
        tag_sequences = tuple(sentences[key].tags for key in keys)
        flat_words = tuple(chain.from_iterable(word_sequences))
        flat_tags = tuple(chain.from_iterable(tag_sequences))
        pairs = tuple(zip(flat_words, flat_tags))
        return super().__new__(
            cls,
            {key: sentences[key] for key in keys},
            keys,
            frozenset(flat_words),
            word_sequences,
            frozenset(flat_tags),
            tag_sequences,
            len(flat_words),
            pairs.__iter__,
        )

    def __len__(self):
        # Number of sentences, not number of namedtuple fields.
        return len(self.sentences)

    def __iter__(self):
        # Iterate over (key, sentence) pairs.
        return iter(self.sentences.items())


class Dataset(namedtuple("_Dataset", "sentences keys vocab X tagset Y training_set testing_set N stream")):
    """Whole tagged corpus plus a shuffled train/test partition of it.

    Fields:
        sentences: dict of every key -> sentence entry read from ``datafile``.
        keys: tuple of the sentence keys in file order.
        vocab: frozenset of every word in the corpus.
        X / Y: tuples with one word / tag sequence per key.
        tagset: frozenset of tags read from ``tagfile``.
        training_set / testing_set: ``Subset`` objects for the two splits.
        N: total number of words in the corpus.
        stream: zero-argument callable returning a fresh iterator over the
            flattened (word, tag) pairs of the whole corpus.

    Args:
        tagfile: path of the file listing the tags.
        datafile: path of the tagged corpus file.
        train_test_split: fraction of sentences assigned to the training set.
        seed: value passed to ``random.seed`` before shuffling the keys;
            ``None`` leaves the global random state untouched before the
            shuffle.
    """

    def __new__(cls, tagfile, datafile, train_test_split=0.8, seed=46675):
        tagset = read_tags(tagfile)
        sentences = read_data(datafile)
        keys = tuple(sentences.keys())
        word_sequences = tuple(sentences[key].words for key in keys)
        tag_sequences = tuple(sentences[key].tags for key in keys)
        flat_words = tuple(chain.from_iterable(word_sequences))
        pairs = tuple(zip(flat_words, chain.from_iterable(tag_sequences)))

        # split data into train/test sets: shuffle a copy of the keys, then cut
        shuffled_keys = list(keys)
        if seed is not None:
            random.seed(seed)
        random.shuffle(shuffled_keys)
        cut = int(train_test_split * len(shuffled_keys))
        training_data = Subset(sentences, shuffled_keys[:cut])
        testing_data = Subset(sentences, shuffled_keys[cut:])

        return super().__new__(
            cls,
            dict(sentences),
            keys,
            frozenset(flat_words),
            word_sequences,
            tagset,
            tag_sequences,
            training_data,
            testing_data,
            len(flat_words),
            pairs.__iter__,
        )

    def __len__(self):
        # Number of sentences, not number of namedtuple fields.
        return len(self.sentences)

    def __iter__(self):
        # Iterate over (key, sentence) pairs.
        return iter(self.sentences.items())

### Data Description

1. Training Data = 80% , Test Data = 20%
2. List of POS Tags are present in the file `all-tags.txt`.
3. The input file has been downloaded and saved as `data.txt`.
4. Basic stats about input file data.txt

In [None]:
# Load the corpus, split it 80/20 by sentence, and report the size of each part.
data = Dataset("all-tags.txt", "data.txt", train_test_split=0.8)
print(
    "There are {} sentences in the corpus.".format(len(data)),
    "There are {} sentences in the training set.".format(len(data.training_set)),
    "There are {} sentences in the testing set.".format(len(data.testing_set)),
    sep="\n",
)

In [None]:
# Token-level statistics: number of samples and vocabulary size for the corpus
# and both splits, plus how many test-set words never occur in the training set.
print(
    "There are a total of {} samples of {} unique words in the corpus.".format(data.N, len(data.vocab)),
    "There are {} samples of {} unique words in the training set.".format(data.training_set.N, len(data.training_set.vocab)),
    "There are {} samples of {} unique words in the testing set.".format(data.testing_set.N, len(data.testing_set.vocab)),
    "There are {} words in the test set that are missing in the training set.".format(len(data.testing_set.vocab - data.training_set.vocab)),
    sep="\n",
)

## Sentences
- the input file is read into a dictionary whose keys are the sentence identifiers, e.g. `b100-38532`
- for each key, the value is a `Sentence(words, tags)` tuple for that sentence. For example, the value for key `b100-38532` is
- (words=('Perhaps', 'it', 'was', 'right', ';', ';'), tags=('ADV', 'PRON', 'VERB', 'ADJ', '.', '.'))
- `Dataset.sentences` is a dictionary of all sentences in the corpus read from `data.txt`; the training and testing splits are available as `Dataset.training_set` and `Dataset.testing_set`


In [None]:
# Example: show the words and tags stored for one sentence key
key = 'b100-5507'
print(
    f"Sentence: {key}",
    f"words:\n\t{data.sentences[key].words}",
    f"tags:\n\t{data.sentences[key].tags}",
    sep="\n",
)

### Helper functions 
- calculate unigram , bigram count
- Emission Probability
- Transition Probability

In [None]:
def pair_counts(sequences_A, sequences_B):
    """Count how often each aligned (a, b) pair occurs.

    The two inputs are walked in lockstep (stopping at the shorter one).

    Returns:
        defaultdict(dict) such that ``result[a][b]`` is the number of
        positions where ``a`` from ``sequences_A`` lines up with ``b`` from
        ``sequences_B``.
    """
    nested_counts = defaultdict(dict)
    for first, second in zip(sequences_A, sequences_B):
        bucket = nested_counts[first]
        bucket[second] = bucket.get(second, 0) + 1
    return nested_counts

# Flatten the corpus stream of (word, tag) pairs into two parallel lists.
words = [token for token, _ in data.stream()]
tags = [label for _, label in data.stream()]
## emission_counts maps every POS tag to a dictionary of {word: count}
## Example {"VERB" : {'had': 5101,'thanked': 6} ...}
## These counts feed the emission probabilities, e.g. how likely "had" is for the tag "VERB"
emission_counts = pair_counts(tags, words)

## Unigram Count
def unigram_counts(sequences):
    """Return a plain dict mapping each item of ``sequences`` to its number of occurrences."""
    tally = Counter(sequences)
    return {item: occurrences for item, occurrences in tally.items()}

# Number of occurrences of each POS tag in the flattened `tags` list.
tag_unigrams = unigram_counts(tags)

## Bigram Count
def pairwise(iterable):
    """Return an iterator over the overlapping pairs (s0, s1), (s1, s2), ...

    Works on any iterable, including one-shot iterators; inputs with fewer
    than two items yield nothing.
    """
    t, t_1 = itertools.tee(iterable)
    # Advance the second copy by one item; the default only prevents
    # StopIteration on an empty input and is never yielded.
    next(t_1, None)
    return zip(t, t_1)

def bigram_counts(sequences):
    """Count adjacent pairs in a flat sequence.

    Args:
        sequences: a single flat iterable of hashable items.

    Returns:
        dict mapping each (item, next_item) tuple to its number of
        occurrences.
    """
    return dict(Counter(pairwise(sequences)))

## Used to calculate transition probabilities
## gives count of sequences like (NOUN,NOUN) , (NOUN,VERB) etc
## NOTE: `tags` is the flattened tag list of the whole stream, so the pair made of
## the last tag of one sentence and the first tag of the next one is counted too.
tag_bigrams = bigram_counts(tags)

## Count of each sentence starting with a POS (Calculating Transition Probability for (<start>,POS))
def starting_counts(sequences):
    """Count how many sequences begin with each item.

    Args:
        sequences: iterable of indexable sequences (one tag sequence per
            sentence).

    Returns:
        dict mapping a first item to the number of sequences starting with
        it. Empty sequences have no first item and are ignored instead of
        raising IndexError.
    """
    return dict(Counter(seq[0] for seq in sequences if len(seq) > 0))
# data.Y holds one tag sequence per sentence of the whole corpus.
tags_label= data.Y
# Number of sentences that start with each tag.
tag_starts = starting_counts(tags_label)

## Count of each sentence ending with a POS (Calculating Transition Probability for (POS,<END>))
def ending_counts(sequences):
    """Count how many sequences end with each item.

    Args:
        sequences: iterable of indexable sequences (one tag sequence per
            sentence).

    Returns:
        dict mapping a last item to the number of sequences ending with it.
        Empty sequences have no last item and are ignored instead of raising
        IndexError.
    """
    return dict(Counter(seq[-1] for seq in sequences if len(seq) > 0))

# Number of sentences that end with each tag (tags_label is data.Y).
tag_ends = ending_counts(tags_label)

In [None]:
## Word Count for each POS
# One line per tag, in a fixed order; the '.' tag is intentionally not listed here.
print(
    *(
        "There are {} words as {} ".format(tag_unigrams[pos_name], pos_name)
        for pos_name in ('NOUN', 'ADJ', 'ADV', 'ADP', 'CONJ', 'DET',
                         'NUM', 'PRON', 'PRT', 'VERB', 'X')
    ),
    sep="\n",
)

### HMM Model 

In [None]:
basic_model = HiddenMarkovModel(name="base-hmm-tagger")

# Estimate the emission distributions from the TRAINING split only.
# Using the full corpus here would let the held-out test sentences leak into
# the model, and it would also disagree with the evaluation code, which treats
# every word outside data.training_set.vocab as unknown.
# The counts are kept in cell-local names so the corpus-level statistics
# computed earlier (tags, words, emission_counts) are left untouched.
train_tags = [tag for _, tag in data.training_set.stream()]
train_words = [word for word, _ in data.training_set.stream()]
train_emission_counts = pair_counts(train_tags, train_words)
states = {}

for pos_tag in data.tagset:
    # .get() avoids inserting an empty entry into the defaultdict for a tag
    # that never occurs in the training split.
    word_counts = train_emission_counts.get(pos_tag, {})
    tag_total = sum(word_counts.values())

    # P(word | tag) = C(tag, word) / C(tag); normalising by the sum of this
    # tag's own counts guarantees the distribution sums to 1.
    emission_probabilities = {
        word: occurrence / tag_total
        for word, occurrence in word_counts.items()
    }

    # NOTE(review): a tag with no training examples yields an empty
    # distribution here - confirm how pomegranate handles that case.
    tag_distribution = DiscreteDistribution(emission_probabilities)
    state = State(tag_distribution, name=pos_tag)
    states[pos_tag] = state
    basic_model.add_state(state)

In [None]:
# Estimate every transition from the TRAINING split, sentence by sentence.
#  - counting inside each sentence keeps the artificial pair (last tag of one
#    sentence, first tag of the next) out of the bigram counts;
#  - the counts are computed once, before the loops, instead of once per tag;
#  - Counter returns 0 for unseen events, so a missing start/end/bigram no
#    longer raises KeyError.
train_tag_sequences = data.training_set.Y
train_tag_totals = Counter(chain.from_iterable(train_tag_sequences))
train_tag_starts = Counter(seq[0] for seq in train_tag_sequences if len(seq) > 0)
train_tag_ends = Counter(seq[-1] for seq in train_tag_sequences if len(seq) > 0)
train_tag_bigrams = Counter(
    pair for seq in train_tag_sequences for pair in zip(seq, seq[1:])
)
n_train_sentences = sum(train_tag_starts.values())
if n_train_sentences == 0:
    raise ValueError("The training set contains no tagged sentences.")

for pos_tag in data.tagset:
    state = states[pos_tag]
    # P(tag | <start>) = C(sentences starting with tag) / C(sentences)
    start_probability = train_tag_starts[pos_tag] / n_train_sentences
    if start_probability > 0:
        basic_model.add_transition(basic_model.start, state, start_probability)
    # P(<end> | tag) = C(sentences ending with tag) / C(tag).
    # The denominator is the tag count, not the number of sentences: together
    # with the tag-to-tag transitions below, the outgoing probabilities of
    # each state then sum to exactly 1.
    tag_total = train_tag_totals[pos_tag]
    if tag_total and train_tag_ends[pos_tag]:
        end_probability = train_tag_ends[pos_tag] / tag_total
        basic_model.add_transition(state, basic_model.end, end_probability)

for tag_1 in data.tagset:
    state_1 = states[tag_1]
    tag_total = train_tag_totals[tag_1]
    if tag_total == 0:
        # Tag never seen in training: no outgoing transition can be estimated.
        continue
    for tag_2 in data.tagset:
        state_2 = states[tag_2]
        bigram = (tag_1, tag_2)
        # P(tag_2 | tag_1) = C(tag_1 followed by tag_2) / C(tag_1)
        transition_probability = train_tag_bigrams[bigram] / tag_total
        if transition_probability > 0:
            # An absent edge already means probability 0, so unseen bigrams
            # are simply not added.
            basic_model.add_transition(state_1, state_2, transition_probability)

basic_model.bake()
print("Number of nodes or states: ", basic_model.node_count())
print("Number of edges: ", basic_model.edge_count())

### Model Accuracy Evaluation

In [None]:
def replace_unknown(sequence, vocab=None):
    """Replace every word that is not in ``vocab`` with the string 'nan'.

    Args:
        sequence: iterable of words.
        vocab: container of known words; defaults to the training-set
            vocabulary (``data.training_set.vocab``).

    Returns:
        list with the same length as ``sequence``.
    """
    if vocab is None:
        vocab = data.training_set.vocab
    # NOTE(review): 'nan' is presumably the value pomegranate treats as a
    # missing observation - confirm against the installed version.
    return [w if w in vocab else 'nan' for w in sequence]

def simplify_decoding(X, model, vocab=None):
    """Return the most likely tag name for each word of ``X``.

    Args:
        X: sequence of words.
        model: object exposing ``viterbi(sequence)`` that returns a
            ``(score, state_path)`` pair.
        vocab: forwarded to ``replace_unknown``.

    Returns:
        list of state names, one per word.
    """
    _, state_path = model.viterbi(replace_unknown(X, vocab))
    return [state[1].name for state in state_path[1:-1]]  # do not show the start/end state predictions

def accuracy(X, Y, model, vocab=None):
    """Fraction of words whose predicted tag matches the true tag.

    Sentences that cannot be decoded count as entirely wrong (they add to
    the denominator only); a single warning reports how many there were
    instead of swallowing the errors silently.

    Args:
        X: iterable of word sequences.
        Y: iterable of tag sequences aligned with ``X``.
        model: tagger passed to ``simplify_decoding``.
        vocab: forwarded to ``replace_unknown``.

    Returns:
        float in [0, 1]; 0.0 when there is nothing to predict.
    """
    correct = total_predictions = 0
    n_sentences = n_failed = 0
    for observations, actual_tags in zip(X, Y):
        n_sentences += 1
        total_predictions += len(observations)
        try:
            most_likely_tags = simplify_decoding(observations, model, vocab)
        except Exception:
            # Best effort: keep scoring the remaining sentences, but do not
            # swallow KeyboardInterrupt/SystemExit as a bare except would.
            n_failed += 1
            continue
        correct += sum(p == t for p, t in zip(most_likely_tags, actual_tags))
    if n_failed:
        warnings.warn(
            "{} of {} sentences could not be decoded and were scored as "
            "incorrect.".format(n_failed, n_sentences))
    if total_predictions == 0:
        return 0.0
    return correct / total_predictions

In [None]:
# Score the tagger on both splits, then report the two accuracies as percentages.
hmm_training_acc = accuracy(data.training_set.X, data.training_set.Y, basic_model)
hmm_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, basic_model)
print(
    "training accuracy basic hmm model: {:.2f}%".format(100 * hmm_training_acc),
    "testing accuracy basic hmm model: {:.2f}%".format(100 * hmm_testing_acc),
    sep="\n",
)