# Assignment 11  
Implement Hidden Markov Model with supervised training and Viterbi algorithm for finding the most probable sequence of hidden states. Use [Laplace smoothing](https://en.wikipedia.org/wiki/Additive_smoothing) for estimation of probabilities. Apply the developed model to the problems:
* part of speech tagging
* spelling correction

In [1]:
import numpy as np
from scipy import sparse
from tqdm import tqdm
from IPython.display import Image
from collections import defaultdict
from itertools import product

HMM model for 3-grams:

$P(x_1, .., x_T, y_1, .., y_T, y_{T+1}) = \prod_{t=1}^{T+1} q(y_t | y_{t-2}, y_{t-1}) \prod_{t=1}^T e(x_t | y_t)$

$x_1, .., x_T$ - sequence of observed states of length T  
$y_1, .., y_T$ - sequence of hidden states of length T  
$q(i | u, v) = \frac {count(u, v, i)} {count(u, v)} $ - transition probability   
$e(x_k | i) = \frac {count(i, x_k)} {count(i)}$ - emission probability  
$A_{i,j} = A_{(u,v), j} = q(i | u, v)$ - transition matrix  
$B_{j,k} = e(x_k | j)$ - emission matrix  

We assume, that $y_{T+1} = TERM$, and $y_0 = y_{-1} = START$

In [35]:
class HMM:
    START = '*'
    TERM = '$'
    REST = '$REST$' # to deal with observed states who have never appeared in train dataset.
        
    def cond_idx(self, u, v):
        return u + v*self.h_dim
        
    def fit(self, X, y):
        """
        X - list of lists, observed states
        y - list of lists, hidden states
        estimate elements of A, B matrices from train sequence. 
        """
        
        X_flat = [xx for yy in X for xx in yy]
        y_flat = [xx for yy in y for xx in yy]
        
        self.hidden_idx2state = list(set(y_flat)) + [self.START, self.TERM]
        self.hidden_states = {x:i for i,x in enumerate(self.hidden_idx2state)}
        self.h_dim = len(self.hidden_idx2state)

        self.observed_idx2state = list(set(X_flat)) + [self.REST]
        self.observed_states = {x:i for i,x in enumerate(self.observed_idx2state)}
        self.o_dim = len(self.observed_idx2state)
        
        #######################
        # YOUR CODE HERE
        
        # self.hidden_idx2state = # lisf of unique hidden states in train sequence + [START, TERM]
        # self.hidden_states = # from state name to state index in hidden_idx2state
        # self.h_dim = number of hidden states
        
        # self.observed_idx2state = # lisf of unique observed states in train sequence + [REST]
        # self.observed_states = # from state name to state index in hidden_idx2state
        # self.o_dim = number of observed states
        
        #######################       
        
        ########################
        # transition matrix
        # YOUR CODE HERE
        # self.A = dense matrix of shape (h_dim **2, h_dim)
        # remember about padding for sequence of hidden states, eg {a, b} -> {START, START, a, b, TERM}
        ########################        
        
        #######################
        # estimate emission matrix
        # YOUR CODE HERE
        # self.B = sparse csr matrix of shape (h_dim, o_dim)
        #######################
        
        self.A = sparse.csr_matrix((self.h_dim**2, self.h_dim))
        self.B = np.zeros(((self.h_dim, self.o_dim)))
        
        for h_el, o_el in zip(y,X):
            row = [self.START, self.START] + h_el
            for i in range(2,len(row)):
                prev = self.cond_idx(self.hidden_states[row[i-2]],self.hidden_states[row[i-1]])
                curr_h = self.hidden_states[row[i]]
                curr_o = self.observed_states[o_el[i-2]]
                self.A[prev,curr_h] += 1
                self.B[curr_h,curr_o] += 1
            prev = self.cond_idx(self.hidden_states[row[i-1]],self.hidden_states[row[i]])
            curr_h = self.hidden_states[self.TERM]
            self.A[prev,curr_h] += 1
                
        self.A_rowsum = np.ravel(self.A.sum(axis=1))
        self.B_rowsum = np.ravel(self.B.sum(axis=1))
        
        
        return self
    
    def tr_prob(self, i, j): # i prev, j curr
        """
        A_ij = q(j | i) = q(j| u, v) with Laplace smoothing
        """
        result = (self.A[i,j] + 1) / (self.A_rowsum[i] + self.h_dim)
        ########################
        # YOUR CODE HERE
        # result = smoothed probability
        ########################
        return result
    
    def em_prob(self, i, j): # i hidden, j observed
        """
        B_jk = e(x_k| j) with Laplace smoothing
        """
        result = (self.B[i,j] + 1) / (self.B_rowsum[i] + self.o_dim)
        ########################
        # YOUR CODE HERE
        # result = smoothed probability
        ########################
        return result
        
    def o_state(self, x):
        """
        return index of obseved state
        """
        return self.observed_states.get(x, self.observed_states[self.REST])
    
    
    def predict(self, X):
        """
        Predict the most probable sequence of hidden states for every sequence of observed states
        X - list of lists
        """
        y_pred = [self._viterbi(seq) for seq in tqdm(X)]
        return y_pred 
            
    def _viterbi(self, X):
        """
        X - list of observables
        product of probabilities usually is not numerically stable
        remember, that log(ab) = log(a) + log(b) and argmax[log(f(x))] = argmax[f(x)]
        
        """   
        T = len(X)
        
        # pi[t, u, v] - max probability for any state sequence ending with x_t = v and x_{t-1} = u.
        pi = np.zeros((T + 1, self.h_dim, self.h_dim))
        # backpointers, bp[t, u, v] = argmax probability for any state sequence ending with x_t = v and x_{t-1} = u.
        bp = np.zeros((T + 1, self.h_dim, self.h_dim), dtype=np.int)
        
        ###################
        # fill tables pi and bp
        # pi[t, u, v] = max_{w} [ pi[t-1, w, u] * q(v| w, u) * e(x_k| v) ]
        # bp[t, u, v] = argmax_{w} [ pi[t-1, w, u] * q(v| w, u) * e(x_k| v) ]
        # YOUR CODE HERE 
        x0 = self.o_state(X[0])
        start_idx = self.hidden_states[self.START]
        for v in range(self.h_dim):
            log_b_smoothed = self.em_prob(v,x0)
            log_a_smoothed = self.tr_prob(self.cond_idx(start_idx,start_idx),v)
            pi[0,start_idx,v] = log_a_smoothed * log_b_smoothed
        
        for k in range(1, T):
            xk = self.o_state(X[k])

            for v in range(self.h_dim):
                log_b_smoothed = self.em_prob(v,xk)
                for u in range(self.h_dim): 
                    r = np.zeros(self.h_dim)
                    for w in range(self.h_dim):
                        log_a_smoothed = self.tr_prob(self.cond_idx(w,u),v)
                        r[w] = pi[k-1, w, u] * log_a_smoothed * log_b_smoothed
                    bp[k, u, v] = np.argmax(r)
                    pi[k, u, v] = np.max(r)
        ###################
                    
        term_idx = self.hidden_states[self.TERM]
        
        ###################
        # r(u,v) = pi[T, u, v] * q(TERM | u, v)
        # find argmax_{u, v} r(u, v)
        # YOUR CODE HERE
        r = np.zeros((self.h_dim,self.h_dim))
        for u in range(self.h_dim):
            for v in range(self.h_dim):
                r[u,v] = pi[k,u,v] * self.tr_prob(self.cond_idx(u,v),term_idx)
        u, v = np.unravel_index(np.argmax(r), r.shape)
        ###################
        
        h_states = [v, u]
        ###################
        # rollback backpointers
        # y_{t-2} = bp[t, y_{t-1}, y_t]
        # h_states is a reversed sequence of hidden states
        # YOUR CODE HERE
        for k in range(T-1,1,-1):
            h_states.append(bp[k, h_states[-1], h_states[-2]]) 
        ###################
            
        return [self.hidden_idx2state[i] for i in reversed(h_states[:T])]

## Problem 1. Part of speech tagging

Build Part-of-Speech tagging model using HMM. Estimate accuracy on test dataset.

In [3]:
import nltk
nltk.download('treebank')
from nltk.corpus import treebank
from sklearn import metrics

[nltk_data] Downloading package treebank to D:\nltk_data...
[nltk_data]   Package treebank is already up-to-date!


In [34]:
data = treebank.tagged_sents()[:3000]
test_data = treebank.tagged_sents()[3000:3010]

X_train = [[x[0] for x in y] for y in data]
y_train = [[x[1] for x in y] for y in data]

X_test = [[x[0] for x in y] for y in test_data]
y_test = [[x[1] for x in y] for y in test_data]

print('sentence: ', " ".join(X_train[0]))
print('tags: ', " ".join(y_train[0]))

sentence:  Pierre Vinken , 61 years old , will join the board as a nonexecutive director Nov. 29 .
tags:  NNP NNP , CD NNS JJ , MD VB DT NN IN DT JJ NN NNP CD .


In [5]:
def accuracy(y_true, y_pred):       
    y_true = np.concatenate(y_true)
    y_pred = np.concatenate(y_pred)
    
    return np.mean(y_true == y_pred)

In [36]:
%%time

hh = HMM().fit(X_train, y_train)
y_pred = hh.predict(X_test)
print(accuracy(y_test, y_pred))


  0%|                                                   | 0/10 [00:00<?, ?it/s]
 10%|████▎                                      | 1/10 [01:33<14:05, 93.91s/it]
 20%|████████▌                                  | 2/10 [02:41<10:47, 80.94s/it]
 30%|████████████▉                              | 3/10 [04:03<09:27, 81.11s/it]
 40%|█████████████████▏                         | 4/10 [04:38<06:57, 69.59s/it]
 50%|█████████████████████▌                     | 5/10 [06:10<06:10, 74.07s/it]
 60%|█████████████████████████▊                 | 6/10 [07:28<04:59, 74.78s/it]
 70%|██████████████████████████████             | 7/10 [09:05<03:53, 77.91s/it]
 80%|██████████████████████████████████▍        | 8/10 [10:09<02:32, 76.20s/it]
 90%|██████████████████████████████████████▋    | 9/10 [11:05<01:13, 73.92s/it]
100%|██████████████████████████████████████████| 10/10 [13:16<00:00, 79.62s/it]


0.757847533632
Wall time: 13min 29s


Your accuracy must be > 0.74

## Problem 1.2 Vectorized viterbi
Our currrent implementation of Viterbi is too slow. Let's make it vectorized. 

In [7]:
class HmmVectorized(HMM):
    
    def _viterbi(self, X):
        """
        Vectorized version of Viterbi. Let's speed up!
        X - list of observables
        """   
        T = len(X)
        
        # One may notice, at every step t we only need pi[t-1, u, v] = pi_prev[u,v] to compute pi[t, u, v] = pi_curr[u,v]
        pi_prev = np.zeros((self.h_dim, self.h_dim))
        
        # backpointers
        bp = np.zeros((T + 1, self.h_dim, self.h_dim), dtype=np.int)
        
        #a_rowsum = self.A_rowsum.reshape(self.h_dim, self.h_dim)
        
        ###################
        # fill pi and bp
        # pi_curr[u, v] = max_{w} [ pi_prev[w, u] * q(v| w, u) * e(x_k| v) ]
        # bp[t, u, v] = argmax_{w} [ pi_prev[w, u] * q(v| w, u) * e(x_k| v) ]
        # don't use tr_prob() and em_prob(), apply laplace smoothing directly here
        # YOUR CODE HERE
        
        x0 = self.o_state(X[0])
        start_idx = self.hidden_states[self.START]
        for v in range(self.h_dim):
            log_b_smoothed = self.em_prob(v,x0)
            log_a_smoothed = self.tr_prob(self.cond_idx(start_idx,start_idx),v)
            pi_prev[start_idx,v] = log_a_smoothed * log_b_smoothed

        for k in range(1, T):            
            xk = self.o_state(X[k])
            pi_curr = np.zeros_like(pi_prev)
            
            for v in range(self.h_dim):
                log_b_smoothed = (self.B[:, xk] + 1) / (self.B_rowsum + self.o_dim)
                #a = self.A[:, v].reshape(self.h_dim, self.h_dim)
                log_a_smoothed = (self.A[:, v] + 1) / (self.A_rowsum + self.h_dim)
                r = log_b_smoothed * log_a_smoothed * pi_prev # переделать
                bp[k, :, v] = np.argmax(r, axis=1)
                pi_curr[:, v] = np.max(r, axis=1)
                    
            pi_prev = pi_curr
        ###################
        
        term_idx = self.hidden_states[self.TERM]
        
        ###################
        # r(u,v) = pi[T, u, v] * q(TERM | u, v)
        # find argmax_{u, v} r(u, v)
        # express r(u,v) as matrix additions
        # YOUR CODE HERE
        # u, v = 
        ###################
        
        h_states = [v, u]
        ###################
        # rollback backpointers
        # y_{t-2} = bp[t, y_{t-1}, y_t]
        # h_states is a reversed sequence of hidden states
        # YOUR CODE HERE
        # h_states = 
            
        ###################
        
        return [self.hidden_idx2state[i] for i in reversed(h_states[:T])]

Let's take a larger test subset

In [13]:
data = treebank.tagged_sents()[:3000]
test_data = treebank.tagged_sents()[3000:3300]

X_train = [[x[0] for x in y] for y in data]
y_train = [[x[1] for x in y] for y in data]

X_test = [[x[0] for x in y] for y in test_data]
y_test = [[x[1] for x in y] for y in test_data]

print('sentence: ', " ".join(X_train[0]))
print('tags: ', " ".join(y_train[0]))

sentence:  Pierre Vinken , 61 years old , will join the board as a nonexecutive director Nov. 29 .
tags:  NNP NNP , CD NNS JJ , MD VB DT NN IN DT JJ NN NNP CD .


In [14]:
%%time

hh = HmmVectorized().fit(X_train, y_train)
y_pred = hh.predict(X_test)
print(accuracy(y_test, y_pred))

100%|██████████| 300/300 [00:46<00:00,  6.45it/s]

0.8424766977363515
CPU times: user 46.7 s, sys: 104 ms, total: 46.8 s
Wall time: 46.7 s





Your accuracy must be > 0.84 

In [38]:
hh.A_rowsum[:, 5].reshape(hh.h_dim, hh.h_dim)

IndexError: too many indices for array

## Problem 2. Spelling correction

Given data of true_char corrupted\_char build spelling correction model using HMM.    
There are 2 datatsets (spelling10.txt, spelling20.txt) with 10% and 20% corruption probability respectively.  
Each dataset contains 30556 words. Use first 28000 for training and the rest for testing. Estimate accuracy on the test subset.  


In [None]:
def get_states(filename):
    with open(filename,'r',encoding='utf-8') as f:
        data = f.read()
    words = data.split('\n_ _\n')

    o_data = []
    h_data = []

    for word in words:
        letters = word.split('\n')
        h_data.append([x.split(' ')[0] for x in letters])
        o_data.append([x.split(' ')[1] for x in letters])

    o_train, h_train = o_data[:28000], h_data[:28000]
    o_test, h_test = o_data[28000:], h_data[28000:]
    
    return o_train,h_train,o_test,h_test

In [None]:
%%time

o_train,h_train,o_test,h_test = get_states('spelling10.txt')
hh10 = HmmVectorized().fit(o_train, h_train)
h_pred = hh10.predict(o_test)
print(accuracy(h_test, h_pred))

You should get > 93% accuracy (+3%) on spelling10 dataset

In [19]:
%%time

o_train,h_train,o_test,h_test = get_states('spelling20.txt')
hh20 = HmmVectorized().fit(o_train, h_train)
h_pred = hh20.predict(o_test)
print(accuracy(h_test, h_pred))

You should get > 89% accuracy (+9%) on spelling20 dataset