In [26]:
# Jupyter "magic methods" run once per kernel restart
%load_ext autoreload
%aimport helpers, tests
%autoreload 1

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [27]:
# import python libraries 
import matplotlib.pyplot as plt
import numpy as np

from IPython.core.display import HTML
from itertools import chain, combinations
from collections import Counter, defaultdict
from helpers import show_model, Dataset

# This script uses Hidden Markov Model from pomegranate python library
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution

## Data Preparation

Data sets is from the Brown corpus and the unversal tagset originally from NLTK. The code below will split the data into two sets. %80 of the data will be used as a training set, while the remaining data is the test set. There are approximately 57k sentences in the corpus. 

In [30]:
# Split the data into test and training set
data = Dataset("tags-universal.txt", "brown-universal.txt", train_test_split=0.8)

# How to access the data
assert len(data) == len(data.training_set) + len(data.testing_set), \
       "The number of sentences in the training set + testing set should sum to the number of sentences in the corpus"

# access given sentence       
key = 'b100-50001'
print("Sentence: {}".format(key))
print("words:\n\t{!s}".format(data.sentences[key].words))
print("tags:\n\t{!s}".format(data.sentences[key].tags))

# access unique words
# count unique words
print("There are a total of {} samples of {} unique words in the corpus."
      .format(data.N, len(data.vocab)))
print("There are {} words in the test set that are missing in the training set."
      .format(len(data.testing_set.vocab - data.training_set.vocab)))

assert data.N == data.training_set.N + data.testing_set.N, \
       "The number of training + test samples should sum to the total number of samples"
       
# access words and tags
for i in range(2):    
    print("Sentence {}:".format(i + 1), data.X[i])
    print()
    print("Labels {}:".format(i + 1), data.Y[i])
    print()
    
# access word and tag
# use Dataset.stream() (word, tag) samples for the entire corpus
print("\nStream (word, tag) pairs:\n")
for i, pair in enumerate(data.stream()):
    print("\t", pair)
    if i > 5: break
    
    
print("Total number of sentences in the corpus: {}".format(len(data)))

Sentence: b100-50001
words:
	('Mmmm', ',', 'it', 'sure', 'itches', "''", '.')
tags:
	('PRT', '.', 'PRON', 'ADV', 'VERB', '.', '.')
There are a total of 1161192 samples of 56057 unique words in the corpus.
There are 5521 words in the test set that are missing in the training set.
Sentence 1: ('Mr.', 'Podger', 'had', 'thanked', 'him', 'gravely', ',', 'and', 'now', 'he', 'made', 'use', 'of', 'the', 'advice', '.')

Labels 1: ('NOUN', 'NOUN', 'VERB', 'VERB', 'PRON', 'ADV', '.', 'CONJ', 'ADV', 'PRON', 'VERB', 'NOUN', 'ADP', 'DET', 'NOUN', '.')

Sentence 2: ('But', 'there', 'seemed', 'to', 'be', 'some', 'difference', 'of', 'opinion', 'as', 'to', 'how', 'far', 'the', 'board', 'should', 'go', ',', 'and', 'whose', 'advice', 'it', 'should', 'follow', '.')

Labels 2: ('CONJ', 'PRT', 'VERB', 'PRT', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'ADP', 'ADV', 'ADV', 'DET', 'NOUN', 'VERB', 'VERB', '.', 'CONJ', 'DET', 'NOUN', 'PRON', 'VERB', 'VERB', '.')


Stream (word, tag) pairs:

	 ('Mr.', 'NOUN')
	 

# Baseline Tagger - Most Frequent Class

One of the simplest way to assign a tag to a word is to check how often certain word (e.g. Jane) is tagged with what lexical category, and assign the most frequent tag to the word. for example is the word *Jane* is mostly tagged as a noun, the 'Most Frequent Class' tagger will tag Jane as a noun. 
This is a simple but effective method. However, if the same word appears in text in more than one lexical category, the tagger perform poorly. For example, take these two sentences: 
 1- **It’s an interesting book. (noun)*
 2- **We ought to book a holiday soon. (verb)**

In [32]:
def pair_counts(sequences_A, sequences_B):
    """Return a dictionary keyed to each unique value in the first sequence list
    that counts the number of occurrences of the corresponding value from the
    second sequences list.
    """
    
    #initiate the counter
    most_freq_PoS = defaultdict(Counter)

    for i in range(len(sequences_A)):
        #combine words with tags per sentence & order, count how many times each tag occurs
        for tag, word in zip(sequences_A[i], sequences_B[i]):
            most_freq_PoS[tag][word] +=1
    
    return most_freq_PoS
    raise NotImplementedError

# Calculate C(t_i, w_i)
emission_counts = pair_counts(data.training_set.Y, data.training_set.X) #returns dict per PoS and corrosponding word + count

assert len(emission_counts) == 12, \
       "Uh oh. There should be 12 tags in your dictionary."
assert max(emission_counts["NOUN"], key=emission_counts["NOUN"].get) == 'time', \
       "Hmmm...'time' is expected to be the most common NOUN."
HTML('<div class="alert alert-block alert-success">Emission counts are calculated!</div>')

In [33]:
# Create a lookup table simple_table where simple_table[word] contains the tag label most frequently assigned to that word
from collections import namedtuple

FakeState = namedtuple("FakeState", "name")

class SimpleTagger:
    # NOTE: You should not need to modify this class or any of its methods
    missing = FakeState(name="<MISSING>")
    
    def __init__(self, table):
        self.table = defaultdict(lambda: SimpleTagger.missing)
        self.table.update({word: FakeState(name=tag) for word, tag in table.items()})
        
    def viterbi(self, seq):
        """This method simplifies predictions by matching the Pomegranate viterbi() interface"""
        return 0., list(enumerate(["<start>"] + [self.table[w] for w in seq] + ["<end>"]))


# calculate the frequency of each tag being assigned to each word and fill the simple_tagger_table

# returns PoS counts for each word
word_counts = pair_counts(data.training_set.X, data.training_set.Y)

#initiate the table
simple_tagger_table = defaultdict()

for word, tag_group in word_counts.items():
    #for each word, record only the highest occuring PoS
    most_common_word = [word for word, count in Counter(tag_group).most_common(1)]
    simple_tagger_table[word] = ''.join(most_common_word)

# Create a Most Frequent Class tagger instance
simple_tagger_model = SimpleTagger(simple_tagger_table) 

assert len(simple_tagger_table) == len(data.training_set.vocab), ""
assert all(k in data.training_set.vocab for k in simple_tagger_table.keys()), ""
assert sum(int(k not in simple_tagger_table) for k in data.testing_set.vocab) == 5521, ""
HTML('<div class="alert alert-block alert-success">Lookup table with words and corresponsing coulds</div>')

## Making predictions with the most frequent class tagger

In [16]:
def replace_unknown(sequence):
    """Return a copy of the input sequence where each unknown word is replaced
    by the literal string value 'nan'. Pomegranate will ignore these values
    during computation.
    """
    return [w if w in data.training_set.vocab else 'nan' for w in sequence]

def simplify_decoding(X, model):
    """X should be a 1-D sequence of observations for the model to predict"""
    _, state_path = model.viterbi(replace_unknown(X))
    return [state[1].name for state in state_path[1:-1]]  # do not show the start/end state predictions

for key in data.testing_set.keys[:2]:
    print("Sentence Key: {}\n".format(key))
    print("Predicted labels:\n-----------------")
    print(simplify_decoding(data.sentences[key].words, simple_tagger_model))
    print()
    print("Actual labels:\n--------------")
    print(data.sentences[key].tags)
    print("\n")

Sentence Key: b100-28144

Predicted labels:
-----------------
['CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.']

Actual labels:
--------------
('CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.')


Sentence Key: b100-23146

Predicted labels:
-----------------
['PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.', 'ADP', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', '.']

Actual labels:
--------------
('PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.', 'ADP', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', '.')




## Model Evaluation

Evaluate the accuracy of the model on the full corpus

In [17]:
def accuracy(X, Y, model):
    """Calculate the prediction accuracy by using the model to decode each sequence
    in the input X and comparing the prediction with the true labels in Y.
    
    The X should be an array whose first dimension is the number of sentences to test,
    and each element of the array should be an iterable of the words in the sequence.
    The arrays X and Y should have the exact same shape.
    
    X = [("See", "Spot", "run"), ("Run", "Spot", "run", "fast"), ...]
    Y = [(), (), ...]
    """
    correct = total_predictions = 0
    for observations, actual_tags in zip(X, Y):
        
        # The model.viterbi call in simplify_decoding will return None if the HMM
        # raises an error (for example, if a test sentence contains a word that
        # is out of vocabulary for the training set). Any exception counts the
        # full sentence as an error (which makes this a conservative estimate).
        try:
            most_likely_tags = simplify_decoding(observations, model)
            correct += sum(p == t for p, t in zip(most_likely_tags, actual_tags))
        except:
            pass
        total_predictions += len(observations)
    return correct / total_predictions

simple_tagger_training_acc = accuracy(data.training_set.X, data.training_set.Y, simple_tagger_model)
print("training accuracy simple_model: {:.2f}%".format(100 * simple_tagger_training_acc))

simple_tagger_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, simple_tagger_model)
print("testing accuracy simple_tagger: {:.2f}%".format(100 * simple_tagger_testing_acc))

assert simple_tagger_training_acc >= 0.955, "Most frequent count tagger accuracy on the training set doesn't look right."
assert simple_tagger_testing_acc >= 0.925, "Most frequent count tagger accuracy on the testing set doesn't look right."

training accuracy simple_model: 95.72%
testing accuracy simple_tagger: 93.01%


## Hidden Markov Model Tagger 

In [18]:
def unigram_counts(sequences):
    """Return a dictionary keyed to each unique value in the input sequence list that
    counts the number of occurrences of the value in the sequences list. The sequences
    collection should be a 2-dimensional array.
    
    For example, if the tag NOUN appears 275558 times over all the input sequences,
    then you should return a dictionary such that your_unigram_counts[NOUN] == 275558.
    """
    #initiate a table
    unigram_table = defaultdict(int)
    
    #iterate over each sentence of tags
    for sentence in sequences:
        #count # of occurances of each tag in a sentence
        for tag in sentence:
            unigram_table[tag] +=1
            
    return unigram_table
    raise NotImplementedError

# Call unigram counts on training set
tag_unigrams = unigram_counts(data.training_set.Y)

# Check if tags are assigned correctly
assert set(tag_unigrams.keys()) == data.training_set.tagset, \
       "Tag counts doesn't include all the tags!"
assert min(tag_unigrams, key=tag_unigrams.get) == 'X', \
       "Classes arent assigned accurately. 'X' is expected to be the least common class"
assert max(tag_unigrams, key=tag_unigrams.get) == 'NOUN', \
       "Classes arent assigned accurately.'NOUN' is expected to be the most common class"
HTML('<div class="alert alert-block alert-success">Unigrams correct!</div>')

In [19]:
def bigram_counts(sequences):
    """Return a dictionary keyed to each unique PAIR of values in the input sequences
    list that counts the number of occurrences of pair in the sequences list. The input
    should be a 2-dimensional array.
    
    For example, if the pair of tags (NOUN, VERB) appear 61582 times, then you should
    return a dictionary such that your_bigram_counts[(NOUN, VERB)] == 61582
    """
    pair = []

    # iterate over tags for all sentences
    for i in range(len(sequences)):
        sentence = sequences[i]
        #iterate over each sentence
        for j in range(len(sentence)-1):
            #create a pair combination for tags in each sentece
            pair.append((sentence[j],sentence[j+1]))

    return Counter(pair)
    raise NotImplementedError

# Call bigram counts on training set
tag_bigrams = bigram_counts(data.training_set.Y)

# Check if tags are assigned correctly
assert len(tag_bigrams) == 144, \
       "Uh oh. There should be 144 pairs of bigrams (12 tags x 12 tags)"
assert min(tag_bigrams, key=tag_bigrams.get) in [('X', 'NUM'), ('PRON', 'X')], \
       "Classes arent assigned accurately. The least common bigram should be one of ('X', 'NUM') or ('PRON', 'X')."
assert max(tag_bigrams, key=tag_bigrams.get) in [('DET', 'NOUN')], \
       "Classes arent assigned accurately. ('DET', 'NOUN') is expected to be the most common bigram."
HTML('<div class="alert alert-block alert-success">Bigrams are correct!</div>')

Calculate the bigram probability of sequence starting with each tag

In [20]:
def starting_counts(sequences):
    """Return a dictionary keyed to each unique value in the input sequences list
    that counts the number of occurrences where that value is at the beginning of
    a sequence.
    
    For example, if 8093 sequences start with NOUN, then you should return a
    dictionary such that your_starting_counts[NOUN] == 8093
    """
    # TODO: Finish this function!
    first_tag = []

    for sentence in sequences:
        first_tag.append(sentence[0])
        
    return(Counter(first_tag))

    raise NotImplementedError

def ending_counts(sequences):
    """Return a dictionary keyed to each unique value in the input sequences list
    that counts the number of occurrences where that value is at the end of
    a sequence.
    
    For example, if 18 sequences end with DET, then you should return a
    dictionary such that your_starting_counts[DET] == 18
    """
    # TODO: Finish this function!
    last_tag = []

    for sentence in sequences:
        last_tag.append(sentence[len(sentence)-1])
        
    return(Counter(last_tag))
    raise NotImplementedError

# Calculate the count of each tag starting a sequence
tag_starts = starting_counts(data.training_set.Y)
# Check if tags are assigned correctly
assert len(tag_starts) == 12, "Uh oh. There should be 12 tags in your dictionary."
assert min(tag_starts, key=tag_starts.get) == 'X', "Classes arent assigned accurately.'X' is expected to be the least common starting bigram."
assert max(tag_starts, key=tag_starts.get) == 'DET', "Classes arent assigned accurately.'DET' is expected to be the most common starting bigram."
HTML('<div class="alert alert-block alert-success">Starting tag counts are accurate!</div>')

# Calculate the count of each tag ending a sequence
tag_ends = ending_counts(data.training_set.Y)

assert len(tag_ends) == 12, "Uh oh. There should be 12 tags in your dictionary."
assert min(tag_ends, key=tag_ends.get) in ['X', 'CONJ'], "Classes arent assigned accurately.'X' or 'CONJ' should be the least common ending bigram."
assert max(tag_ends, key=tag_ends.get) == '.', "Classes arent assigned accurately.'.' is expected to be the most common ending bigram."
HTML('<div class="alert alert-block alert-success">Ending tag counts are accurate!</div>')



In [21]:
basic_model = HiddenMarkovModel(name="base-hmm-tagger")

unique_tags = data.training_set.tagset # get unique tags
tag_unigrams = unigram_counts(data.training_set.Y) # get tag counts
emission_counts = pair_counts(data.training_set.Y, data.training_set.X) # get possible words per tag
tag_bigrams = bigram_counts(data.training_set.Y) # count of two PoS together
states = dict()

# calculate emission probabilities
for tag in unique_tags:

    emission_prob_table = dict()
    # emission prob for each word per tag
    for observation, count in emission_counts[tag].items():
            emission_prob_table[observation] = count / tag_unigrams[tag]
       
    # emission probability distributions
    word_tag_emissions = DiscreteDistribution(emission_prob_table)
    word_tag_state = State(word_tag_emissions, name=tag)
    states[tag] = word_tag_state
       
    # add one stage per tag
    basic_model.add_states(word_tag_state)       

# add edges between states for the observed transition frequencies P(tag_i | tag_i-1)

# calculate start and end state probabilities
start_probs_table = dict()
for tag in unique_tags:
    # calculate the probabilities for starting state
    start_probs_table = tag_starts[tag]/ len(tag_starts)
       
    # add all possible states from the start
    basic_model.add_transition(basic_model.start, states[tag] ,start_probs_table)
       
    # calculate probability for end state
    end_probs_table = tag_ends[tag] / tag_unigrams[tag]
    basic_model.add_transition(states[tag], basic_model.end, end_probs_table)

# calculate transitional probabilities
transition_probs_table = dict()
for tag1, tag2 in tag_bigrams:
    transition_probs_table =  tag_bigrams[tag1, tag2] / tag_unigrams[tag1]
    basic_model.add_transition(states[tag1], states[tag2], transition_probs_table)
      
# finalize the model
basic_model.bake()

assert all(tag in set(s.name for s in basic_model.states) for tag in data.training_set.tagset), \
       "Every state in your network should use the name of the associated tag, which must be one of the training set tags."
assert basic_model.edge_count() == 168, \
       ("Your network should have an edge from the start node to each state, one edge between every " +
        "pair of tags (states), and an edge from each state to the end node.")
HTML('<div class="alert alert-block alert-success">HMM network topology is good!</div>')

## Evaluate the accuracy of the HMM model

In [22]:
hmm_training_acc = accuracy(data.training_set.X, data.training_set.Y, basic_model)
print("training accuracy basic hmm model: {:.2f}%".format(100 * hmm_training_acc))

hmm_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, basic_model)
print("testing accuracy basic hmm model: {:.2f}%".format(100 * hmm_testing_acc))

assert hmm_training_acc > 0.97, "HMM accuracy on the training set is not correct."
assert hmm_testing_acc > 0.955, "HMM accuracy on the testing set is not correct."
HTML('<div class="alert alert-block alert-success">HMM tagger accuracy looks correct!</div>')

training accuracy basic hmm model: 97.54%
testing accuracy basic hmm model: 95.95%


Check the assign tag vs the actual tag

In [23]:
for key in data.testing_set.keys[:2]:
    print("Sentence Key: {}\n".format(key))
    print("Predicted labels:\n-----------------")
    print(simplify_decoding(data.sentences[key].words, basic_model))
    print()
    print("Actual labels:\n--------------")
    print(data.sentences[key].tags)
    print("\n")

Sentence Key: b100-28144

Predicted labels:
-----------------
['CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.']

Actual labels:
--------------
('CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.')


Sentence Key: b100-23146

Predicted labels:
-----------------
['PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.', 'ADP', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', '.']

Actual labels:
--------------
('PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.', 'ADP', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', '.')


