# Hidden Markov model


Best performance: accuracy 94.63% on corpus $fr.ftb$

In [25]:
import json
import numpy as np
import matplotlib.pyplot as plt
import time
from collections import Counter,defaultdict
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution

Import the data sets and choose one as our corpus.

In [26]:
def _load_corpus(path):
    """Parse one JSON corpus file and return its content.

    The file is read as UTF-8 inside a ``with`` block so the handle is closed
    as soon as parsing finishes instead of staying open for the whole session.
    """
    with open(path, encoding='utf-8') as corpus_file:
        return json.load(corpus_file)


# Corpora that come with both a train and a test split.
train_set_ftb = _load_corpus('./corpus/fr/fr.ftb.train.json')
test_set_ftb = _load_corpus('./corpus/fr/fr.ftb.test.json')
train_set_gsd = _load_corpus('./corpus/fr/fr.gsd.train.json')
test_set_gsd = _load_corpus('./corpus/fr/fr.gsd.test.json')
train_set_partut = _load_corpus('./corpus/fr/fr.partut.train.json')
test_set_partut = _load_corpus('./corpus/fr/fr.partut.test.json')
train_set_pud = _load_corpus('./corpus/fr/fr.pud.train.json')
test_set_pud = _load_corpus('./corpus/fr/fr.pud.test.json')
train_set_sequoia = _load_corpus('./corpus/fr/fr.sequoia.train.json')
test_set_sequoia = _load_corpus('./corpus/fr/fr.sequoia.test.json')
train_set_spoken = _load_corpus('./corpus/fr/fr.spoken.train.json')
test_set_spoken = _load_corpus('./corpus/fr/fr.spoken.test.json')

# Only a test split is loaded for these two corpora.
test_set_foot = _load_corpus('./corpus/fr/fr.foot.test.json')
test_set_natdis = _load_corpus('./corpus/fr/fr.natdis.test.json')

**words_and_labels**: Separate the words and the labels of the corpus into two flat, parallel lists.

In [27]:
def words_and_labels(data_set):
    """Flatten a corpus into two parallel, token-level lists.

    ``data_set`` is an iterable of ``(sentence, labels)`` pairs. Words and
    labels are paired position by position within each sentence; if the two
    sequences differ in length, the extra items of the longer one are dropped.

    Returns a ``(words, labels)`` tuple of lists in corpus order.
    """
    words, labels = [], []
    for tokens, tags in data_set:
        aligned = list(zip(tokens, tags))
        words.extend(token for token, _ in aligned)
        labels.extend(tag for _, tag in aligned)
    return words, labels


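**ambiguous_words**: Collect the ambiguous words from the train set, i.e. the words that were seen with more than one label. The result contains one entry per occurrence, so an ambiguous word appears as many times as it occurs in the train set.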

In [28]:
def ambiguous_words(train_data, train_label):
    """Return every occurrence of a word that was seen with several labels.

    ``train_data`` and ``train_label`` are parallel sequences of words and
    their labels. A word is ambiguous when it is paired with more than one
    distinct label. The result keeps the order of ``train_data`` and has one
    entry per occurrence, so it may contain duplicates.
    """
    tags_seen = defaultdict(set)
    for token, tag in zip(train_data, train_label):
        tags_seen[token].add(tag)

    return [
        token
        for token, _ in zip(train_data, train_label)
        if len(tags_seen[token]) > 1
    ]


**replace_unknown**: Return a copy of the input sequence where each unknown word is replaced by the literal string value 'nan'. Pomegranate will ignore these values during computation.

**simplify_decoding**: Use the Viterbi algorithm to decode the input sentence. Return the predicted PoS tags of all the words, together with the predictions and the positions of the OOV words and of the ambiguous words.
    

In [29]:
def replace_unknown(sequence):
    """Return a copy of ``sequence`` with unseen words replaced by ``'nan'``.

    A word is kept when it is found in the module-level ``train_data``
    collection; any other word is replaced by the literal string ``'nan'``.
    """
    cleaned = []
    for token in sequence:
        cleaned.append(token if token in train_data else 'nan')
    return cleaned

def simplify_decoding(X, model):
    """Decode one sentence with Viterbi and split out OOV / ambiguous words.

    Parameters
    ----------
    X : sequence of words forming one sentence.
    model : object exposing ``viterbi(sequence)`` that returns a
        ``(score, state_path)`` pair, where each path element is indexable
        and element ``[1]`` has a ``name`` attribute.

    Returns
    -------
    tuple ``(state, state_oov, id_oov, state_amb, id_amb)``:
        state     -- predicted tag name for every decoded position
        state_oov -- predicted tags at the out-of-vocabulary positions
        id_oov    -- positions whose word was replaced by ``'nan'``
        state_amb -- predicted tags at the ambiguous positions
        id_amb    -- positions whose word is in the module-level
                     ``ambiguous_word`` collection

    Raises
    ------
    TypeError
        If ``model.viterbi`` yields a path that cannot be sliced (for
        instance ``None``); the caller is expected to handle it.
    """
    w_list = replace_unknown(X)

    _, state_path = model.viterbi(w_list)

    # The first and last path elements are skipped: only the inner elements
    # line up one-to-one with the words of the sentence.
    state = [step[1].name for step in state_path[1:-1]]

    id_oov = [i for i, word in enumerate(w_list) if word == 'nan']
    id_amb = [i for i, word in enumerate(w_list) if word in ambiguous_word]

    # Look the tags up by position instead of testing every position against
    # the index lists; the bound check keeps the result identical when the
    # decoded path is shorter than the sentence.
    decoded = len(state)
    state_oov = [state[i] for i in id_oov if i < decoded]
    state_amb = [state[i] for i in id_amb if i < decoded]

    return state, state_oov, id_oov, state_amb, id_amb

**accuracy**: Calculate the prediction accuracy by using the model to decode each sentence and comparing the prediction with the true labels.

In [30]:
def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator``, or 0.0 when nothing was counted."""
    return numerator / denominator if denominator else 0.0


def accuracy(X, Y, model):
    """Compute tagging accuracy overall, on OOV words and on ambiguous words.

    Parameters
    ----------
    X : iterable of sentences (each a sequence of words).
    Y : iterable of label sequences, parallel to ``X``.
    model : model handed to ``simplify_decoding`` for every sentence.

    Returns
    -------
    tuple ``(overall, oov, ambiguous)`` of accuracies in ``[0, 1]``. A
    category without any counted token yields ``0.0`` instead of raising
    ``ZeroDivisionError``.

    Notes
    -----
    When decoding a sentence fails, every token of that sentence is counted
    as a wrong overall prediction, which makes the overall figure a
    conservative estimate. The OOV and ambiguous tallies are left untouched
    for such a sentence because their positions are only known after a
    successful decoding.
    """
    correct = total_predictions = 0
    correct_oov = total_predictions_oov = 0
    correct_amb = total_predictions_amb = 0

    for observations, actual_tags in zip(X, Y):
        try:
            (most_likely_tags, most_likely_tags_oov, id_oov,
             most_likely_tags_amb, id_amb) = simplify_decoding(observations, model)
            actual_tags_oov = [actual_tags[i] for i in id_oov]
            actual_tags_amb = [actual_tags[i] for i in id_amb]
        except Exception:
            # Decoding failed: count the whole sentence as mispredicted
            # rather than silently dropping it from the statistics.
            total_predictions += len(observations)
            continue

        correct_oov += sum(p == t for p, t in zip(most_likely_tags_oov, actual_tags_oov))
        total_predictions_oov += len(most_likely_tags_oov)

        correct_amb += sum(p == t for p, t in zip(most_likely_tags_amb, actual_tags_amb))
        total_predictions_amb += len(most_likely_tags_amb)

        correct += sum(p == t for p, t in zip(most_likely_tags, actual_tags))
        total_predictions += len(most_likely_tags)

    return (
        _safe_ratio(correct, total_predictions),
        _safe_ratio(correct_oov, total_predictions_oov),
        _safe_ratio(correct_amb, total_predictions_amb),
    )

**pair_counts**: Return a dictionary keyed by each unique label of the data set; the value for a label is a `Counter` recording how many times each word was tagged with that label.

In [31]:
def pair_counts(data_set):
    """Count, for every label, how often each word carries that label.

    ``data_set`` is an iterable of ``(sentence, labels)`` pairs. The result
    maps each label to a ``Counter`` of the words tagged with it. The
    returned mapping is a ``defaultdict(list)``, so looking up a label that
    never occurred yields an empty list.
    """
    words_by_label = defaultdict(list)

    for tokens, tags in data_set:
        for token, tag in zip(tokens, tags):
            # Create the per-label counter on first sight of the label.
            if tag not in words_by_label:
                words_by_label[tag] = Counter()
            words_by_label[tag][token] += 1

    return words_by_label

**starting_counts**: Return a dictionary keyed to each unique value in the input sequences list that counts the number of occurrences where that value is at the beginning of a sequence.

In [32]:
def starting_counts(data_set):
    """Count how many sentences start with each label.

    ``data_set`` is an iterable of ``(sentence, labels)`` pairs. Every label
    that occurs anywhere in the data gets an entry, with a count of 0 when
    it never opens a sentence. Sentences without labels are ignored instead
    of raising ``IndexError``.

    Returns a ``defaultdict(int)`` mapping label -> number of sentences whose
    first label is that label; unknown labels read as 0.
    """
    dict_starting = defaultdict(int)

    # A single pass over the corpus: register every label, then credit the
    # first label of each non-empty sentence.
    for _, labels in data_set:
        for label in labels:
            dict_starting.setdefault(label, 0)
        if len(labels) > 0:
            dict_starting[labels[0]] += 1

    return dict_starting

**ending_counts**: Return a dictionary keyed to each unique value in the input sequences list that counts the number of occurrences where that value is at the end of a sequence.

In [33]:
def ending_counts(data_set):
    """Count how many sentences end with each label.

    ``data_set`` is an iterable of ``(sentence, labels)`` pairs. Every label
    that occurs anywhere in the data gets an entry, with a count of 0 when
    it never closes a sentence. Sentences without labels are ignored instead
    of raising ``IndexError``.

    Returns a ``defaultdict(int)`` mapping label -> number of sentences whose
    last label is that label; unknown labels read as 0.
    """
    dict_ending = defaultdict(int)

    # A single pass over the corpus: register every label, then credit the
    # last label of each non-empty sentence.
    for _, labels in data_set:
        for label in labels:
            dict_ending.setdefault(label, 0)
        if len(labels) > 0:
            dict_ending[labels[-1]] += 1

    return dict_ending

**unigram_counts**: Return a dictionary keyed by each unique label that counts the number of occurrences of that label in the whole data set. The input is the corpus itself: a list of (sentence, labels) pairs.

In [34]:
def unigram_counts(data_set):
    """Count the occurrences of every label in the corpus.

    ``data_set`` is an iterable of ``(sentence, labels)`` pairs. Returns a
    ``defaultdict(int)`` mapping label -> number of tokens carrying it, so
    labels that never occur read as 0.
    """
    every_label = (tag for _, tags in data_set for tag in tags)
    return defaultdict(int, Counter(every_label))

**bigram_counts**: Return a dictionary keyed by each unique PAIR of consecutive labels that counts the number of occurrences of that pair within the sentences of the data set. The input is a list of (sentence, labels) pairs.

In [36]:
def bigram_counts(data_set):
    """Count every pair of consecutive labels inside the sentences.

    ``data_set`` is an iterable of ``(sentence, labels)`` pairs. Pairs never
    span two sentences. Returns a ``defaultdict(int)`` keyed by
    ``(previous_label, next_label)`` tuples, so unseen pairs read as 0.
    """
    pair_totals = defaultdict(int)
    for _, tags in data_set:
        # Pair each label with its successor in the same sentence.
        for pair in zip(tags, tags[1:]):
            pair_totals[pair] += 1

    return pair_totals

Extract the data and construct the HMM model:

In [37]:
# Corpus used for the rest of the notebook: the ftb train/test splits.
# Point both names at another loaded corpus to run the experiment on it.
train_set, test_set = train_set_ftb, test_set_ftb

In [38]:
# Empty model; its states and transitions are added in the cells below.
hmm = HiddenMarkovModel(name="base-hmm-tagger")

In [39]:
# Label statistics of the train set, used below to derive the HMM parameters.
label_starts = starting_counts(train_set)    # label -> sentences starting with it
label_ends = ending_counts(train_set)        # label -> sentences ending with it
label_unigrams = unigram_counts(train_set)   # label -> total occurrences
label_bigrams = bigram_counts(train_set)     # (label, next label) -> occurrences

In [40]:
# Flatten the train set into parallel word / label lists.
train_data,train_label = words_and_labels(train_set)
# One entry per occurrence of a word seen with more than one label, so the
# length printed below is a token count rather than a count of distinct words.
ambiguous_word = ambiguous_words(train_data,train_label)
print(len(ambiguous_word))

212787


In [41]:
# One HMM state per label; its emission distribution is P(word | label),
# estimated as count(label, word) / count(label) on the train set.
count_label_and_word = pair_counts(train_set)
states = {}
for label, word_dict in count_label_and_word.items():
    label_total = label_unigrams[label]
    p_words_given_label_state = {
        word: occurrences / label_total
        for word, occurrences in word_dict.items()
    }
    emission = DiscreteDistribution(p_words_given_label_state)
    states[label] = State(emission, name=label)

hmm.add_states(list(states.values()))

In [42]:
# Set of all distinct labels present in the train set.
label_list = {label for _, labels in train_set for label in labels}

In [43]:
# Adding start states
for label in label_list:        
    state = states[label]
    hmm.add_transition(hmm.start, state, label_starts[label]/len(label_list))

# Adding end states
for label in label_list: 
    state = states[label]
    hmm.add_transition(state, hmm.end, label_ends[label]/label_unigrams[label])

# Adding pairs
for label1 in label_list: 
    state1 = states[label1]
    for label2 in label_list: 
        state2 = states[label2]
        hmm.add_transition(state1, state2, label_bigrams[(label1, label2)]/label_unigrams[label1])

In [44]:
# Finalize the model once all states and transitions have been added
# (pomegranate's bake step); the Viterbi decoding below runs on this model.
hmm.bake()

Collect the data from test set and prediction:

In [22]:
def sentences_and_labels(data_set):
    """Split a corpus into a list of sentences and a list of label sequences.

    ``data_set`` is an iterable of ``(sentence, labels)`` pairs. Unlike a
    token-level flattening, the sentence structure is preserved: element
    ``i`` of each returned list belongs to the ``i``-th pair.

    Returns a ``(sentences, labels)`` tuple of lists.
    """
    # Materialise once so a one-shot iterable is only consumed a single time.
    pairs = [(sentence, label) for sentence, label in data_set]
    sentences = [sentence for sentence, _ in pairs]
    labels = [label for _, label in pairs]
    return sentences, labels

# Sentence-level view of the test split: parallel lists of sentences and labels.
test_data,test_label = sentences_and_labels(test_set)

In [23]:
# Despite its name, this holds the accuracies measured on the TEST split:
# a tuple (overall, OOV words, ambiguous words).
hmm_training_acc = accuracy(test_data,test_label,hmm)

In [45]:
# Report the three accuracies in the order they are stored:
# overall, out-of-vocabulary words, ambiguous words.
for position, caption in enumerate(('test accuracy:', 'oov accuracy:', 'amb accuracy:')):
    print(caption, hmm_training_acc[position])

test accuracy: 0.9462762554643352
oov accuracy: 0.46537396121883656
amb accuracy: 0.9329699059909885
