In [1]:
import matplotlib.pyplot as plt
import numpy as np

from IPython.core.display import HTML
from itertools import chain
from collections import Counter, defaultdict
from helpers import show_model, Dataset
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution

In [13]:
data = Dataset("tags-universal.txt", "brown-universal.txt", train_test_split=0.8)

# how many times given tag is present
def get_tag_count(tags_sequences):
    tags_counter = Counter()
    for sentence_tags in tags_sequences:
        for tag in sentence_tags:
            tags_counter[tag] += 1
    return tags_counter

single_tag_count = get_tag_count(data.training_set.Y)

# tag -> tag transitions needed for transition prob
def get_tags_pairs(tags_sequences):
    dual_counter = Counter()
    for tags in tags_sequences:
        for i in range(len(tags[:-1])):
            dual_counter[(tags[i], tags[i+1])] += 1
    return dual_counter

paired_tags = get_tags_pairs(data.training_set.Y)

# start/end -> tag transitions needed for transition prob
def get_start_end_tags(tags_sequences):
    start_tags = Counter()
    end_tags = Counter()
    for tags in tags_sequences:
        start_tags[tags[0]] += 1
        end_tags[tags[len(tags)-1]] += 1
    return start_tags, end_tags

start_tags, end_tags = get_start_end_tags(data.training_set.Y)

start_tags_sum = sum(start_tags.values())
end_tags_sum = sum(end_tags.values())

# tag to words mapping needed for emission propability
def get_tags_to_words_mapping(tags_sequence, words_sequence):
    word_tag_counter = defaultdict(Counter)
    for i in range(len(tags_sequence)):
        zipped_data = zip(tags_sequence[i], words_sequence[i])
        for tag, word in zipped_data:
            word_tag_counter[tag][word] += 1
    return word_tag_counter

emission_counts = get_tags_to_words_mapping(data.training_set.Y, data.training_set.X)

In [14]:
model = HiddenMarkovModel(name="hmm-tagger")
# adding emission propabilities ( from states to words ) and start -> tag / tag -> end

state_holder = {}

for tag in emission_counts:
    tag_sum = sum(emission_counts[tag].values())
    tag_emission_prob = {}
    for word in emission_counts[tag]:
        tag_emission_prob[word] = emission_counts[tag][word]/tag_sum
    tag_distr = DiscreteDistribution(tag_emission_prob)
    tag_state = State(tag_distr, name = tag)
    state_holder[tag] = tag_state
    model.add_states(tag_state)
    
    transition_prob = start_tags[tag]/start_tags_sum
    model.add_transition(model.start, tag_state, transition_prob)
    
    transition_prob = end_tags[tag]/end_tags_sum
    model.add_transition(tag_state, model.end, transition_prob)
    
# adding transition propability ( between states )
for _, tag_pair in enumerate(paired_tags):
    transition_prob = paired_tags[tag_pair]/single_tag_count[tag_pair[0]]
    model.add_transition(state_holder[tag_pair[0]], state_holder[tag_pair[1]], transition_prob)

model.bake()

In [17]:
def replace_unknown(sequence):
    """Return a copy of the input sequence where each unknown word is replaced
    by the literal string value 'nan'. Pomegranate will ignore these values
    during computation.
    """
    return [w if w in data.training_set.vocab else 'nan' for w in sequence]

def simplify_decoding(X, model):
    _, state_path = model.viterbi(replace_unknown(X))
    return [state[1].name for state in state_path[1:-1]]  # do not show the start/end state predictions

def accuracy(X, Y, model):
    correct = total_predictions = 0
    for observations, actual_tags in zip(X, Y):
        try:
            most_likely_tags = simplify_decoding(observations, model)
            correct += sum(p == t for p, t in zip(most_likely_tags, actual_tags))
        except Exception as  e:
            print("except ", e)
            pass
        total_predictions += len(observations)
    return correct / total_predictions

In [18]:
hmm_training_acc = accuracy(data.training_set.X, data.training_set.Y, model)
print("training accuracy basic hmm model: {:.2f}%".format(100 * hmm_training_acc))

hmm_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, model)
print("testing accuracy basic hmm model: {:.2f}%".format(100 * hmm_testing_acc))

training accuracy basic hmm model: 97.54%
testing accuracy basic hmm model: 95.96%
