# TC4 - Lab exercises 2

In this notebook, we are going to improve the POS tagger of last week. Instead of using a naive Bayes classifier, we will rely on a HMM where:
- the hidden states are POS tags
- the observations are words

It is a first order HMM where probabilities are defined as follows:
$$
p(y_1...y_n, x_1...x_n) = p(y_1) \prod_{i=2}^n p(y_i | y_{i-1}) \prod_{i=1}^n p(x_i | y_i)
$$

In [41]:
import nltk
import numpy as np
import sys
import math
from collections import defaultdict
from sklearn.model_selection import train_test_split

# 1. Data

We first need to load and split the data between train and test data. You need to report the code from last week with the same split (90% train / 10% test).

In [2]:
# import dataset
data = nltk.corpus.brown.tagged_sents(tagset='universal')

In [4]:
train_data, test_data = train_test_split(data, test_size = 0.1)

In [5]:
len(train_data), len(test_data)

(51606, 5734)

We will store the parameters of the HMM in numpy arrays. Therefore, to simplify the model, we rely on a dictionnary that maps words to tokens:
- the constructor takes as argument an iteratable over strings (e.g. list of strings) containing the vocabulary to store in the dictionnary
- you can set unk="\*UNK\*" to add entry for unknown strings (do not do it for POS tags!)
- len(dict) gives you the numbers of entry in the dict
- str_to_id maps a string to an index
- id_to_str gives you the string stored at a given index

In [6]:
class Dict:
    def __init__(self, words, unk=None):
        self._unk = unk
        self._word_to_id = dict()
        self._id_to_word = list()

        if unk in words:
            raise RuntimeError("UNK word exists in vocabulary")

        if unk is not None:
            self.unk_index = self._add_word(unk)

        for word in words:
            self._add_word(word)

    # for internal use only!
    def _add_word(self, word):
        if word not in self._word_to_id:
            id = len(self._id_to_word)
            self._word_to_id[word] = id
            self._id_to_word.append(word)
            return id
        else:
            return self._word_to_id[word]

    def str_to_id(self, word):
        if self._unk is not None:
            return self._word_to_id.get(word, self.unk_index)
        else:
            return self._word_to_id[word]

    def id_to_str(self, id):
        return self._id_to_word[id]

    def __len__(self):
        return len(self._word_to_id)

    def has_unk(self):
        return self._unk is not None
    
    def unk(self):
        return self.unk_index

Example:

In [216]:
test_dict = Dict(["a", "b", "c"], unk="*UNK*")
print("N. entry: ", len(test_dict))
print("Index of \"b\":", test_dict.str_to_id("a"))
# the following line does not throw an error because we gave a unk word
print("Index of \"e\":", test_dict.str_to_id("e"))

N. entry:  4
Index of "b": 1
Index of "e": 0


We now build the dictionnary of words and tags. We will restrict the dictionnary of words to contain only words that appears 10 or more times in the training data (use the code of last time).

For the dictionnary of words, set the unk parameters to any string you want. For the dictionnary of POS tags, do not set an unk word!

In [22]:
distribution_per_word = {}
distribution_per_word_correct = {}

for s in train_data:
    for w, tag in s:
        if w in distribution_per_word : 
            if tag in distribution_per_word[w]:
                distribution_per_word[w][tag]+=1
            else:
                distribution_per_word[w][tag] = 1
        else:
            distribution_per_word[w]={}
            distribution_per_word[w][tag] = 1

for w in distribution_per_word:
    total = sum(distribution_per_word[w].values())
    if (total>=10): 
        distribution_per_word_correct[w] = distribution_per_word[w]
        

In [34]:
tmp = [list(x.keys()) for x in distribution_per_word_correct.values()]

tags = [item for sublist in tmp for item in sublist]

word_dict = Dict(distribution_per_word_correct.keys(), unk = "**UNK**")
tags_dict = Dict(tags)


# 2. Hidden Markov Model

The HMM class is a simple container for:
- the dictionnary of hidden states y_dict (i.e. dictionnary of tags)
- the dictionnary of observations x_dict (i.e. dictionnary of words)
- the parameters of the HMM:
    * init_prob $\in \mathbb R^{|Y|}$: initial tag probabilities $p(y_0) = init\_prob[y_0]$
    * transition_prob $\in \mathbb R^{|Y| \times |Y|}$: tag transition probabilities $p(y_i | y_{i - 1}) = transition\_prob[y_{i - 1}, y_i]$
    * observation_prob $\in \mathbb R^{|Y| \times |X|}$: observation probabilities $p(x_i | y_i) = observation\_prob[y_i, x_i]$

In [35]:
class HMM:
    def __init__(self, y_dict, x_dict):
        if not isinstance(y_dict, Dict) or not isinstance(x_dict, Dict):
            raise RuntimeError("Arguments must be of type Dict")

        self.y_dict = y_dict
        self.x_dict = x_dict

        n_y = len(y_dict)
        n_x = len(x_dict)
        self.init_prob = np.zeros((n_y,), float) 
        self.transition_prob = np.zeros((n_y, n_y), float) 
        self.observation_prob = np.zeros((n_y, n_x), float) 

## 2.1 Learning

Compute the matrices of probabilities hmm.init_prob, hmm.observation_prob and hmm.transition_prob from the data.

You **must** smooth the distributions!

In [66]:
hmm = HMM(tags_dict, word_dict)

###init prob###
for sent in train_data:
    tag = sent[0][1]
    id_tag = tags_dict.str_to_id(tag)
    hmm.init_prob[id_tag]+=1
hmm.init_prob+=1
hmm.init_prob/=(len(tags_dict)+len(train_data))


###transition prob###
d_tag = defaultdict(int)
for sent in train_data:
    for i in range(1,len(sent)):
        cur_tag = sent[i][1]
        pred_tag = sent[i-1][1]
        id_cur_tag = tags_dict.str_to_id(cur_tag)
        id_pred_tag = tags_dict.str_to_id(pred_tag)
        hmm.transition_prob[id_pred_tag][id_cur_tag]+=1
        d_tag[id_pred_tag]+=1
        
hmm.transition_prob+=1
for id_tag in d_tag:
    hmm.transition_prob[id_tag,:]/=(d_tag[id_tag]+len(tags_dict))   
    
###observation prob###
d_tag = defaultdict(int)
for sent in train_data:
    for i in range(len(sent)):
        cur_tag = sent[i][1]
        cur_w = sent[i][0]
        id_cur_tag = tags_dict.str_to_id(cur_tag)
        id_cur_w = word_dict.str_to_id(cur_w)
        hmm.observation_prob[id_cur_tag][id_cur_w]+=1
        d_tag[id_cur_tag]+=1
        
hmm.observation_prob+=1
for id_tag in d_tag:
    hmm.observation_prob[id_tag,:]/=(d_tag[id_tag]+len(word_dict))   

The following three cells check that the distribution you computed correctly sum to one. The first cell should output 1.0, the two others should output array containing twelve times the number 1.

In [40]:
hmm.init_prob.sum()

1.0

In [68]:
hmm.transition_prob.sum(1)

array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])

In [67]:
hmm.observation_prob.sum(1)

array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])

# 2.2. Viterbi

Implement the viterbi **without** computing in the log domain. What tagging accuracy do you achieve? How is it compared to the naive Bayes model of last week?

In [265]:
def viterbi(hmm, words):
    """
    Input:
    - hmm: an HMM object
    - words: a list of words (ie a sentence)
    Return:
    - a list of POS tags
    """

    chart = np.zeros((len(hmm.y_dict), len(words)), float)
    backpointer = np.zeros((len(hmm.y_dict), len(words)), float)
    for i in range(len(hmm.y_dict)):
        id_w0 = word_dict.str_to_id(words[0])
        chart[i,0] = hmm.init_prob[i]*hmm.observation_prob[i,id_w0]
        
    for i in range(1,len(words)):
        for j in range(len(hmm.y_dict)):
            b_score = -1.0
            for k in range(len(hmm.y_dict)):
                score = hmm.transition_prob[j,k]*hmm.observation_prob[j,i]*chart[k,i-1]
                if(score>b_score):
                    chart[j,i] = score
                    b_score = score
                    backpointer[j,i] = k
    #print(chart)           
    y = np.zeros(len(words))
    y[len(words)-1] = np.argmax(chart[:,len(words)-1], axis=0)
    
    print(chart)
    print(y)
    for j in range(1,len(words))[::-1]:
        y[j-1] = backpointer[int(y[j-2]),j]
    pred = [tags_dict.id_to_str(int(i)) for i in (y)]
    print(y)
    return pred

In [266]:
tags_dict.id_to_str(0)

'NOUN'

In [267]:
viterbi(hmm,['the', 'cat', 'is', 'black'])

[[5.48827851e-07 7.31243131e-08 2.89123138e-13 4.94855334e-16]
 [8.83851784e-07 3.02494725e-07 5.17741780e-10 7.56597151e-17]
 [2.63409992e-07 8.67638411e-08 2.96406369e-13 4.57502182e-12]
 [2.43788845e-07 5.28413238e-08 2.82911163e-12 5.94298264e-15]
 [9.20687901e-02 4.11947683e-09 3.47470702e-13 3.58938928e-17]
 [1.04005494e-06 2.21016739e-07 1.53197410e-12 1.33457129e-15]
 [4.06692240e-07 6.49123430e-09 5.70877953e-13 5.47216258e-16]
 [6.25914760e-07 7.20940783e-08 2.59895280e-13 3.88644654e-16]
 [1.14848182e-06 3.26308767e-07 5.22778481e-13 8.94773494e-16]
 [1.54934759e-06 1.14779890e-07 7.33280342e-13 1.25506278e-15]
 [3.05186913e-06 3.02629370e-08 1.16686735e-12 5.51200807e-16]
 [7.71170388e-07 5.69367132e-08 1.82886976e-12 3.13024397e-15]]
[0. 0. 0. 2.]
[4. 1. 1. 2.]


['DET', 'ADP', 'ADP', 'VERB']

In [243]:
# Evaluate the HMM using the viterbi
n_tags = 0
n_correct_tags = 0
for sentence in test_data:
    words = [w for w, t in sentence]
    pred = viterbi(hmm, words)
    n_tags += len(sentence)
    n_correct_tags += sum(1 for w in range(len(sentence))  if sentence[w][1] == pred[w])

print("Tagging accuract: %.2f" % (100 * n_correct_tags / n_tags))

Tagging accuract: 20.93


# 2.3. Viterbi in the log domain

Copy/paste you code from the previous cell and change it to compute in the log domain. What tagging accuracy do you achieve? How is it compared to the naive Bayes model of last week and to the previous implementation of the viterbi?

In [None]:
def viterbi_log(hmm, words):
    """
    Input:
    - hmm: an HMM object
    - words: a list of words (ie a sentence)
    Return:
    - a list of POS tags
    """

    return pred

In [None]:
# Evaluate the HMM using the viterbi in the log domain

n_tags = 0
n_correct_tags = 0
for sentence in test_data:
    words = [w for w, t in sentence]
    pred = viterbi_log(hmm, words)
    n_tags += len(sentence)
    n_correct_tags += sum(1 for w in range(len(sentence))  if sentence[w][1] == pred[w])

print("Tagging accuract: %.2f" % (100 * n_correct_tags / n_tags))

# 3. Marginalization

As a last exercise, implement function that evaluate the probability of a sequence of words and a sequence of hidden states given a HMM.

In [None]:
def probabilit_y(hmm, tags):
    #TODO...

In [None]:
tags = "DET NOUN VERB DET ADJ NOUN .".split()
print(probabilit_y(hmm, tags))
random.shuffle(tags)
print(probabilit_y(hmm, tags))

In [None]:
def probabilit_x(hmm, words):
    # TODO

In [None]:
sentence = "This is a sentence .".split()
print(probabilit_x(hmm, sentence))
random.shuffle(sentence)
print(probabilit_x(hmm, sentence))