# Assignment 11  
Implement Hidden Markov Model with supervised training and Viterbi algorithm for finding the most probable sequence of hidden states. Use [Laplace smoothing](https://en.wikipedia.org/wiki/Additive_smoothing) for estimation of probabilities. Apply the developed model to the problems:
* part of speech tagging
* spelling correction

In [None]:
import numpy as np
from scipy import sparse
from tqdm import tqdm
from IPython.display import Image
from collections import defaultdict

HMM model for 3-grams:

$P(x_1, .., x_T, y_1, .., y_T, y_{T+1}) = \prod_{t=1}^{T+1} q(y_t | y_{t-2}, y_{t-1}) \prod_{t=1}^T e(x_t | y_t)$

$x_1, .., x_T$ - sequence of observed states of length T  
$y_1, .., y_T$ - sequence of hidden states of length T  
$q(i | u, v) = \frac {count(u, v, i)} {count(u, v)} $ - transition probability   
$e(x_k | i) = \frac {count(i, x_k)} {count(i)}$ - emission probability  
$A_{i,j} = A_{(u,v), j} = q(i | u, v)$ - transition matrix  
$B_{j,k} = e(x_k | j)$ - emission matrix  

We assume, that $y_{T+1} = TERM$, and $y_0 = y_{-1} = START$

In [None]:
class HMM:
    START = '*'
    TERM = '$'
    REST = '$REST$' # to deal with observed states who have never appeared in train dataset.
        
    def cond_idx(self, u, v):
        return u + v*self.h_dim
        
    def fit(self, X, y):
        """
        X - list of lists, observed states
        y - list of lists, hidden states
        estimate elements of A, B matrices from train sequence. 
        """
        
        #######################
        # YOUR CODE HERE
        y_arr = [START + i + TERM for i in y]
        self.hidden_idx2state = np.array(set(y_arr))  # lisf of unique hidden states in train sequence + [START, TERM]
        self.hidden_states = range(len(self.hidden_idx2state))#? from state name to state index in hidden_idx2state
        self.h_dim = len(self.hidden_states) #number of hidden states
        
        self.observed_idx2state = np.array(list(set(X)) + REST) # lisf of unique observed states in train sequence + [REST]
        self.observed_states = range(len(self.observed_idx2state))#? from state name to state index in hidden_idx2state
        self.o_dim = len(self.observed_states) #number of observed states
        
        #######################       
        
        
        #######################
        # estimate emission matrix
        # YOUR CODE HERE
        self.B = sparse.csr_matrix((self.h_dim, self.o_dim)) #sparse csr matrix of shape (h_dim, o_dim)
        #######################
        
        self.B_rowsum = np.ravel(self.B.sum(axis=1))
        
        
        ########################
        # transition matrix
        # YOUR CODE HERE
        self.A = sparse.csr_matrix((self.h_dim**2, self.o_dim)) #dense matrix of shape (h_dim **2, h_dim)
        # ???remember about padding for sequence of hidden states, eg {a, b} -> {START, START, a, b, TERM}
        ########################
        
        self.A_rowsum = np.ravel(self.A.sum(axis=1))
        
        return self
    
    def tr_prob(self, i, j):
        """
        A_ij = q(j | i) = q(j| u, v) with Laplace smoothing
        """
        A_ij = j/(i[0]+i[1])
        ########################
        # A_qs = (# transitions from state q to state s)/(# transitions from state q)
        # result = smoothed probability
        ########################
        return result
    
    def em_prob(self, j, k):
        """
        B_jk = e(x_k| j) with Laplace smoothing
        """
        B_ij = k/j
        ########################
        #B_ij = (# of transitions from state j to observation i)/(# of states j)
        #result = smoothed probability
        ########################
        return result
        
    def o_state(self, x):
        """
        return index of obseved state
        """
        return self.observed_states.get(x, self.observed_states[self.REST])
    
    
    def predict(self, X):
        """
        Predict the most probable sequence of hidden states for every sequence of observed states
        X - list of lists
        """
        y_pred = [self._viterbi(seq) for seq in tqdm(X)]
        return y_pred 
            
    def _viterbi(self, X):
        """
        X - list of observables
        product of probabilities usually is not numerically stable
        remember, that log(ab) = log(a) + log(b) and argmax[log(f(x))] = argmax[f(x)]
        
        """   
        T = len(X)
        
        # pi[t, u, v] - max probability for any state sequence ending with x_t = v and x_{t-1} = u.
        pi = np.zeros((T + 1, self.h_dim, self.h_dim))
        # backpointers, bp[t, u, v] = argmax probability for any state sequence ending with x_t = v and x_{t-1} = u.
        bp = np.zeros((T + 1, self.h_dim, self.h_dim), dtype=np.int)
        
        ###################
        # fill tables pi and bp
        #pi[t, u, v] = max_{w} [ pi[t-1, w, u] * q(v| w, u) * e(x_k| v) ]
        #bp[t, u, v] = argmax_{w} [ pi[t-1, w, u] * q(v| w, u) * e(x_k| v) ]
        # YOUR CODE HERE 
        for k in range(1, T + 1):
            xk = self.o_state(X[k-1]) #return index of obseved state
            for v in range(self.h_dim): #start
                log_b_smoothed = self.em_prob(v, xk)
                for u in range(self.h_dim): #next
                    r = np.zeros(self.h_dim)
                    for w in range(self.h_dim): #current
                        log_a_smoothed = self.tr_prob([u,v], w) 
                        r[w] = [(pi[t-1, u, v]*w[v,u]*u[xk], u) for u in self.hidden_idx2state(t-2)]
                        bp[k, u, v] = np.argmax(r)
                        pi[k, u, v] = np.max(r)
        ###################
                    
        term_idx = self.hidden_states[self.TERM]
        
        ###################
        # r(u,v) = pi[T, u, v] * q(TERM | u, v)
        # find argmax_{u, v} r(u, v)
        # YOUR CODE HERE
        # u, v = 
        ###################

        h_states = [v, u]
        ###################
        # rollback backpointers
        # y_{t-2} = bp[t, y_{t-1}, y_t]
        # h_states is a reversed sequence of hidden states
        # YOUR CODE HERE
        # h_states = 
            
        ###################        
            
        return [self.hidden_idx2state[i] for i in reversed(h_states[:T])]

## Problem 1. Part of speech tagging

Build Part-of-Speech tagging model using HMM. Estimate accuracy on test dataset.

In [None]:
import nltk
#nltk.download('treebank')
from nltk.corpus import treebank
from sklearn import metrics

In [None]:
data = treebank.tagged_sents()[:3000]
test_data = treebank.tagged_sents()[3000:3010]

X_train = [[x[0] for x in y] for y in data]
y_train = [[x[1] for x in y] for y in data]

X_test = [[x[0] for x in y] for y in test_data]
y_test = [[x[1] for x in y] for y in test_data]

print('sentence: ', " ".join(X_train[0]))
print('tags: ', " ".join(y_train[0]))

In [None]:
def accuracy(y_true, y_pred):       
    y_true = np.concatenate(y_true)
    y_pred = np.concatenate(y_pred)
    
    return np.mean(y_true == y_pred)

In [None]:
%%time

hh = HMM().fit(X_train, y_train)
y_pred = hh.predict(X_test)
print(accuracy(y_test, y_pred))

Your accuracy must be > 0.74

## Problem 1.2 Vectorized viterbi
Our currrent implementation of Viterbi is too slow. Let's make it vectorized. 

In [None]:
class HmmVectorized(HMM):
    
    def _viterbi(self, X):
        """
        Vectorized version of Viterbi. Let's speed up!
        X - list of observables
        """   
        T = len(X)
        
        # One may notice, at every step t we only need pi[t-1, u, v] = pi_prev[u,v] to compute pi[t, u, v] = pi_curr[u,v]
        pi_prev = np.zeros((self.h_dim, self.h_dim))
        
        # backpointers
        bp = np.zeros((T + 1, self.h_dim, self.h_dim), dtype=np.int)
        
        a_rowsum = self.A_rowsum.reshape(self.h_dim, self.h_dim)
        
        ###################
        # fill pi and bp
        # pi_curr[u, v] = max_{w} [ pi_prev[w, u] * q(v| w, u) * e(x_k| v) ]
        # bp[t, u, v] = argmax_{w} [ pi_prev[w, u] * q(v| w, u) * e(x_k| v) ]
        # don't use tr_prob() and em_prob(), apply laplace smoothing directly here
        # YOUR CODE HERE
#         for k in range(1, T + 1):            
#             xk = self.o_state(X[k-1])
#             pi_curr = np.zeros_like(pi_prev)
            
#             for v in range(self.h_dim):
#                 log_b_smoothed = 
#                 a = self.A[:, v].reshape(self.h_dim, self.h_dim)
#                 log_a_smoothed = 
#                 r =  
#                 bp[k, :, v] = np.argmax(r, axis=1)
#                 pi_curr[:, v] = np.max(r, axis=1)
                    
#             pi_prev = pi_curr
        ###################
        
        term_idx = self.hidden_states[self.TERM]
        
        ###################
        # r(u,v) = pi[T, u, v] * q(TERM | u, v)
        # find argmax_{u, v} r(u, v)
        # express r(u,v) as matrix additions
        # YOUR CODE HERE
        # u, v = 
        ###################
        
        h_states = [v, u]
        ###################
        # rollback backpointers
        # y_{t-2} = bp[t, y_{t-1}, y_t]
        # h_states is a reversed sequence of hidden states
        # YOUR CODE HERE
        # h_states = 
            
        ###################
        
        return [self.hidden_idx2state[i] for i in reversed(h_states[:T])]

Let's take a larger test subset

In [None]:
data = treebank.tagged_sents()[:3000]
test_data = treebank.tagged_sents()[3000:3300]

X_train = [[x[0] for x in y] for y in data]
y_train = [[x[1] for x in y] for y in data]

X_test = [[x[0] for x in y] for y in test_data]
y_test = [[x[1] for x in y] for y in test_data]

print('sentence: ', " ".join(X_train[0]))
print('tags: ', " ".join(y_train[0]))

In [None]:
%%time

hh = HmmVectorized().fit(X_train, y_train)
y_pred = hh.predict(X_test)
print(accuracy(y_test, y_pred))

Your accuracy must be > 0.84 

## Problem 2. Spelling correction

Given data of true_char corrupted\_char build spelling correction model using HMM.    
There are 2 datatsets (spelling10.txt, spelling20.txt) with 10% and 20% corruption probability respectively.  
Each dataset contains 30556 words. Use first 28000 for training and the rest for testing. Estimate accuracy on the test subset.  


In [None]:
# YOUR CODE HERE
f1 = open('./data/spelling10.txt', 'r', encoding="utf-8")
f2 = open('./data/spelling20.txt', 'r', encoding="utf-8")
spell_data_1 = f1.read()
spell_data_2 = f2.read()
f1.close()
f2.close()

In [None]:
def preprocess(data):
    raw = []
    data = spell_data_1.split('_ _')
    data = [i.split('\n') for i in data]
    for word in data:
        word = [x for x in word if x]
        raw.append(word)
    return(raw)

def create_data(data):
    X = []#observed
    y = []#hidden
    raw = preprocess(data)
    for word in raw:
        letter = [i.split() for i in word]
        obs = [i[1] for i in letter]
        hid = [i[0] for i in letter]
        X.append(obs)
        y.append(hid)
    return X,y

In [None]:
# 10% corruption probability
X_1, y_1 = create_data(spell_data_1)
X_train = X_1[:28000]
y_train = y_1[:28000]

X_test = X_1[28000:]
y_test = y_1[28000:]

In [None]:
%%time

hh = HMM().fit(X_train, y_train)
y_pred = hh.predict(X_test)
print(accuracy(y_test, y_pred))

You should get > 93% accuracy (+3%) on spelling10 dataset

In [None]:
# 20% corruption probability
X_2, y_2 = create_data(spell_data_2)
X_train = X_2[:28000]
y_train = y_2[:28000]

X_test = X_2[28000:]
y_test = y_2[28000:]

In [None]:
%%time

hh = HMM().fit(X_train, y_train)
y_pred = hh.predict(X_test)
print(accuracy(y_test, y_pred))

You should get > 89% accuracy (+9%) on spelling20 dataset