## Hidden Markov Model

### Introduction

**Hidden Markov Model** is a statistical model in which the system being modeled is assumed to be a Markov process with underlying states.

In simpler Markov models, the states is directly visible to the learner, and the transition probabilities are the only parameters, while in the hidden Markov models, the state is not directly visible, but the output, dependent on the state, is visible. Each state has a distribution over the outputs.

A hidden Markov model can be considered a generalization of a mixture model where the hidden variables, which control the mixture component to be select for each observation, are related through a Markov process rather than independent of each other.

### Notations

Consider discrete HMMs of length $T$(i.e. each observation sequence is $T$ observations long). Let the space of observations be $X=\{1,2,\dots,N\}$, and the space of states $Z=\{1,2,\dots,M\}$. 

Define a HMM model $\theta=(\pi,A,B)$, where $\pi$ is the initial state matrix(i.e. $\pi_i=\mathbb{P}(z_1=i)$), $A$ is the state transition matrix(i.e. $A_{ij}=\mathbb{P}(z_{t+1}=j\mid z_{t}=i)$), and $B$ is the emission matrix($B_i(j)=\mathbb{P}(x_t=j\mid z_t=i)$).

Assume we have $D$ iid observation sequences. Let $\mathcal{X}=\{X^{(1)},X^{(2)},\dots,X^{(D)}\}$. Often, the state set $\mathcal{Z}$ is unknown to the learner.

### Properties of HMMs

HMM has Markov Property:

*  $\mathbb{P}(z_{t+1}\mid z_1,\dots,z_t)=\mathbb{P}(z_{t+1}\mid z_t)$.
*  The Markov chain of hidden states is homogeneous(the transition matrix $A$ is time-invariant).
*  The observations only depend on the state at current time $t$.

### Fundamental questions of HMMs

#### Inference in HMMs

1.Given model $\theta=(\pi,A)$ without observation, how to compute $P(Z)$ and $\mathbb{P}(z_t = i)$?
 
 By induction: $$p_1(i)=\pi_i $$ 
 
 $$ p_t(i)=\sum_j\mathbb{P}(z_t=i\mid z_{t-1}=j)$$

2.Given observation set $X$, and model $\theta=(\pi,A,B)$,how to compute $P(Z\mid X)$ and $\mathbb{P}(z_t=i\mid X)$?

 Define forward probability $\alpha_t(i)=\mathbb{P}(x_1,x_2,\dots,x_t\wedge z_t=i)$, one can show that 
 
 $$\alpha_1(i)=\mathbb{P}(x_1 \wedge z_1=i)=\mathbb{P}(x_1 \mid z_1=i)\mathbb{P}(z_1=i)=B_i(x_1)\pi_i$$
 
 $$\alpha_{t+1}(i)=\sum_jB_i(x_{t+1})A_{ji}\alpha_t(j)$$
 
\begin{proof}

\begin{align*}
\sum_j\alpha_t(j)A_{ji}B_i(x_{t+1})&=\sum_j\mathbb{P}(x_1,x_2,\dots,x_t\wedge z_t=j)\mathbb{P}(z_{t+1}=i\mid z_t=j)\mathbb{P}(x_{t+1}\mid z_{t+1}=i)\\
&=\sum_j \mathbb{P}(x_1,x_2,\dots,x_t\wedge z_t=j)\mathbb{P}(z_{t+1}=i\mid z_t=j)\mathbb{P}(x_{t+1}\mid z_{t+1}=i,z_t=j)\\
&=\sum_j \mathbb{P}(x_1,x_2,\dots,x_t\wedge z_t=j)\mathbb{P}(x_{t+1},z_{t+1}=i\mid z_t=j)\\
&=\sum_j \mathbb{P}(x_1,x_2,\dots,x_t\wedge z_t=j)\mathbb{P}(x_{t+1},z_{t+1}=i\mid z_t=j,x_1,x_2,\dots,x_t)\\
&=\sum_j \mathbb{P}(x_1,x_2,\dots,x_{t+1},z_t=j,z_{t+1}=i)\\
&=\mathbb{P}(x_1,x_2,\dots,x_{t+1},z_{t+1}=i)\\
&=\alpha_{t+1}(i)
\end{align*}

\end{proof}

 We can use Bayes rule:
 
 $$P(z_t=i\mid X)=\frac{P(X\mid z_t=i)P(z_t=i)}{P(X)}=\frac{P(X\wedge z_t=i)}{P(X)}=\frac{\alpha_t(i)}{\sum_j\alpha_t(j)}$$

3.Given observation set $X$, and model $\theta=(\pi,A,B)$, how to find the most probable path such that $P(Z^*\mid X)=\arg\max_ZP(Z\mid X)$?  

Here we have:

$$\arg\max_ZP(Z\mid X)=\arg\max_Z\frac{P(X\mid Z)P(Z)}{P(X)}=\arg\max_ZP(X\mid Z)P(Z)=\arg\max_ZP(X,Z)$$

Viterbi Algorithm:

Given the defination that:

$$\delta_t(i)=\max_{z_1,\dots,z_{t-1}}\mathbb{P}(z_1,\dots,z_{t-1}\wedge z_t=i\wedge x_1,\dots,x_t)$$

In other words, we are interested in the most likely path from $1$ to $t$ that:

1.Ends in state $i$.

2.Produces observations $x_1,\dots,x_t$.

It is obvious that:

$$\delta_1(i) = \mathbb{P}(z_1=i\wedge x_1)=\mathbb{P}(z_1=i)\mathbb{P}(x_1\mid z_1=i)=\pi_iB_i(x_1)$$


\begin{align*}
\delta_{t+1}(i)&=\max_{z_1,\dots,z_{t}}\mathbb{P}(z_1,\dots,z_{t}\wedge z_{t+1}=i\wedge x_1,\dots,x_{t+1})\\
&= \max_j \mathbb{P}(z_1,\dots,z_{t-1}\wedge z_t=j\wedge x_1,\dots,x_t)\mathbb{P}(z_{t+1}=i\mid z_t=j)\mathbb{P}(x_{t+1}\mid z_{t+1}=i)\\
&= \max_j \delta_t(j)A_{ji}B_i(x_{t+1})
\end{align*}


We use dynamic programming for solving $\delta_t(i)$. And $P(Z^*\mid X)=\arg\max_ZP(X,Z)=\arg\max_j\delta_t(j)$ for $\forall t$.

#### Learning HMMs

1.Supervised learning framework

In this scenario, assume we can observe the set of states $\mathcal{Z}$, we using maximum likelihood estimation to learn the model.

Initial probabilities:

$$\pi^*=\arg\max_\pi\prod_{d=1}^D\pi_{z_1^{(d)}}B_{z_1^{(d)}}(x_1^{(d)})\prod_{t=2}^TP(z_t\mid z_{t-1})\propto \arg\max_\pi\prod_{k=1}^D\pi_{z_1^{(d)}}$$

Transition probabilities:

$$A_{ij}^*=\arg\max_{A_{ij}}\prod_{d=1}^D\pi_{z_1^{(d)}}B_{z_1^{(d)}}(x_1^{(d)})\prod_{t=2}^TP(z_t=j\mid z_{t-1}=i)\propto \arg\max_{A_{ij}}\prod_{t=2}^TP(z_t=j\mid z_{t-1}=i)$$

Emission probabilities:

$$B_i^*(j)=\hat{\mathbb{P}}(x_t=j\mid z_t=i)_{MLE}$$

2.Unsupervised learning framework

Define backward probability $\beta_t(i)=\mathbb{P}(x_{t+1},\dots,x_T\mid z_t=i)$, and it holds that

$$\beta_T(i)=1$$

$$\beta_t(i)=\sum_j A_{ij}B_j(x_{t+1})\beta_{t+1}(j)$$

\begin{proof}

\begin{align*}
\sum_j A_{ij}B_j(x_{t+1})\beta_{t+1}(j)&=\sum_j\mathbb{P}(z_{t+1}=j\mid z_t=i)\mathbb{P}(x_{t+1}\mid z_{t+1}=j)\mathbb{P}(x_{t+2},\dots,x_T\mid z_{t+1}=j)\\
&=\sum_j\mathbb{P}(z_{t+1}=j\mid z_t=i,z_{t+1}=j)\mathbb{P}(x_{t+1}\mid z_{t+1}=j)\mathbb{P}(x_{t+2},\dots,x_T\mid z_{t+1}=j)\\
&=\sum_j\mathbb{P}(x_{t+1},z_{t+1}=j\mid z_t=i)\mathbb{P}(x_{t+2},\dots,x_T\mid x_{t+1},z_{t+1}=j,z_t=i)\\
&=\sum_j\mathbb{P}(x_{t+1},x_{t+2},\dots,x_T,z_{t+1}=j\mid z_t=i)\\
&=\sum_j\mathbb{P}(x_{t+1},x_{t+2},\dots,x_T\mid z_t=i)\\
&=\beta_t(i)
\end{align*}

\end{proof}


$$S_t(i)\equiv\mathbb{P}(z_t=i\mid x_1,\dots,x_T)=\frac{\alpha_t(i)\beta_t(i)}{\sum_j\alpha_t(j)\beta_t(j)}=\frac{\mathbb{P}(x_1,\dots,x_t,z_t=i)\mathbb{P}(x_{t+1},\dots,x_T\mid z_t=i)}{\sum_j\mathbb{P}(x_1,\dots,x_t,z_t=j)\mathbb{P}(x_{t+1},\dots,x_T\mid z_t=j)}=\frac{\mathbb{P}(x_1,\dots,x_T,z_t=i)}{\sum_k\mathbb{P}(x_1,\dots,x_T,z_t=k)}$$

\begin{align*}
S_t(i,j)&\equiv \mathbb{P}(z_t=i,z_{t+1}=j\mid x_1,\dots,x_T)\\
&=\frac{\alpha_t(i)A_{ij}B_j(x_{t+1})\beta_{t+1}(j)}{\sum_j\alpha_t(j)\beta_t(j)}\\
&=\frac{\mathbb{P}(x_1,\dots,x_t,z_t=i)\mathbb{P}(z_{t+1}=j\mid z_t=i)\mathbb{P}(x_{t+1}\mid z_{t+1}=j)\mathbb{P}(x_{t+2},\dots,x_T\mid z_{t+1}=j)}{\sum_k\mathbb{P}(x_1,\dots,x_T,z_t=k)}\\
&=\frac{\mathbb{P}(x_1,\dots,x_T,z_t=i,z_{t+1}=j)}{\sum_k\mathbb{P}(x_1,\dots,x_T,z_t=k)}
\end{align*}

We can estimate parameters by $S_t(i)$ and $S_t(i,j)$:

$$A_{ij}=\mathbb{P}(z_{t+1}=j\mid z_{t}=i)\approx\frac{\sum_tS_t(i,j)}{\sum_k\sum_tS_t(i,k)}$$

$$B_i(j)=\mathbb{P}(x_t=j\mid z_t=i)\approx\frac{\sum_{t\mid x_t=j}S_t(i)}{\sum_k\sum_{t\mid x_t=j}S_t(k)}$$

## Baum-Welch Algorithm

Input: Observation $x_1,x_2,\dots,x_T$ and the number of states $N$.

1.Guess model parameters $A$, $\pi$ and $B$.

2.E-step: Compute $S_t(i)$ and $S_t(i,j)$.

3.M-step: Use $S$ to compute $A$ and $B$ (MLE).

4.If converge, to 5. Else to 2.

5.Model output

Baum-Welch is an iterative procedure for estimating $\theta^*$ from only $\mathcal{X}$. It works by maximizing log-likelihood, and updating the current model to be closer to the optimal model. Each step will increase the log-likelihood.

We can explain the algorithm as:

1.Compute $Q(\theta,\theta^s)=\sum_{z\in\mathcal{Z}}\log\[P(\mathcal{X},z;\theta)\]P(z\mid\mathcal{X};\theta^s)$.

2.Set $\theta^{s+1}=\arg\max_\theta Q(\theta,\theta^s)$

Where $$P(z,\mathcal{X};\theta)=\prod_{d=1}^D\big(\pi_{z_1^{(d)}}B_{z_1^{(d)}}(x_1^{(d)})\prod_{t=2}^TA_{z_{t-1}^{(d)}z_t^{(d)}}B_{z_t^{(d)}}(x_t^{(d)})\big)$$

Take logarithm we have:

$$\log P(z,\mathcal{X};\theta)=\sum_{d=1}^D\big[\log{\pi_{z_1^{(d)}}}+\sum_{t=2}^T\log{A_{z_{t-1}^{(d)}z_t^{(d)}}}+\sum_{t=1}^T\log{B_{z_t^{(d)}}(x_t^{(d)})}\big]$$

Note that $P(z,\mathcal{X})=P(z\mid\mathcal{X})P(\mathcal{X})$ and $P(\mathcal{X})$ is a constant (not affected by $\theta$).Hence,

$$\arg\max_\theta \sum_{z\in\mathcal{Z}}\log{[P(\mathcal{X},z;\theta)]}P(z\mid\mathcal{X};\theta^s)=\arg\max_\theta \sum_{z\in\mathcal{Z}}\log{[P(\mathcal{X},z;\theta)]}P(z,\mathcal{X};\theta^s)=\arg\max_\theta \hat{Q}(\theta,\theta^s)$$

Plugging $\log P(z,\mathcal{X};\theta)$ into $\hat{Q}(\theta,\theta^s)$:

$$\hat{Q}(\theta,\theta^s)=\sum_{z\in\mathcal{Z}}\sum_{d=1}^D\big[\log{\pi_{z_1^{(d)}}}+\sum_{t=2}^T\log{A_{z_{t-1}^{(d)}z_t^{(d)}}}+\sum_{t=1}^T\log{B_{z_t^{(d)}}(x_t^{(d)})}\big]P(z,\mathcal{X};\theta^s)$$

Let $\hat{L}(\theta,\theta^s)$ be the Lagrangian:

$$\hat{L}(\theta,\theta^s)=\hat{Q}(\theta,\theta^s)-\lambda_\pi\big(\sum_{i=1}^M\pi_i-1\big)-\sum_{i=1}^M\lambda_{A_i}\big(\sum_{i=j}^MA_{ij}-1\big)-\sum_{i=1}^M\lambda_{B_i}\big(\sum_{j=1}^NB_i(j)-1\big)$$

Solve it we have:

$$\pi_i^{(s+1)}=\frac{1}{D}\sum_{d=1}^DP(z_1^{(d)}=i\mid X^{(d)};\theta^s)$$

$$A_{ij}^{(s+1)}=\frac{\sum_{d=1}^D\sum_{t=2}^TP(z_{t-1}^{(d)}=i,z_t^{(d)}=j\mid X^{(d)};\theta^s)}{\sum_{d=1}^D\sum_{t=2}^TP(z_{t-1}^{(d)}=i\mid X^{(d)};\theta^s)}$$

$$B_i^{(s+1)}(j)=\frac{\sum_{d=1}^D\sum_{t=1}^TP(z_t^{(d)}=i\mid X^{(d)};\theta^s)\mathbb{I}\{x_t^{(d)}=j\}}{\sum_{d=1}^D\sum_{t=1}^TP(z_t^{(d)}=i\mid X^{(d)};\theta^s)}$$

In [1]:
import numpy as np

In [2]:
class HMM:
    
    def __init__(self, state_num):
        
        self.M = state_num
       
    
    # Encode for variable inputs
    def encode(self, seq):
        
        self.encode_dict = dict(zip(set(seq),range(len(set(seq)))))
        encode_seq = []
        for i in seq:
            encode_seq.append(self.encode_dict[i])
        return encode_seq   
        
        
    # Decode after encode
    def decode(self, seq):
        
        self.decode_dict = dict(zip(self.encode_dict.values(), self.encode_dict.keys()))
        decode_seq = []
        for i in seq:
            decode_seq.append(self.decode_dict[i])
        return decode_seq
        
        
    def learning(self, sequence, max_iteration):
        
        self.obs = np.array(self.encode(sequence))#np.array(sequence)[np.newaxis,:]
        # Number of observations
        self.N = len(np.unique(self.obs))
        # Sequence length
        self.T = self.obs.shape[0]
        # Number of sequences
        #self.D = self.obs.shape[0]
        # Initialize transition matrix
        A_unscaled = np.random.rand(self.M, self.M)
        self.A = (A_unscaled/A_unscaled.sum(axis=0)) 
        # Initialize emission matrix
        self.B = np.random.rand(self.M, self.N)
        for i in range(self.M):
            self.B[i] = 1.0 * self.B[i]/self.B.sum(axis = 1)[i]
        # Assign equal probabilities for each state
        self.pi = np.repeat(1.0/self.M, self.M)
        
        
        # Return matrix S_i at time t
        def compute_S_i(t):
            
            # Initialize forward probability
            alpha = self.pi * self.B[:,self.obs[0]]
            # Initialize backward probability
            beta = np.ones(self.M)
            beta_t_plus_one = beta
            # Forward algorithm
            for i in range(0, t):
                alpha = np.dot(alpha, self.A) * self.B[:, self.obs[i + 1]]
            # Backward algorithm
            for i in range(self.T - 2, t - 1, -1):
                if i == t:
                    beta_t_plus_one = beta
                beta = np.dot(self.A, beta * self.B[:, self.obs[i + 1]])
            return 1.0 * alpha * beta / (alpha * beta).sum()

        
        # Return matrix S_ij at time t    
        def compute_S_ij(t):
            
            # Initialize forward probability
            alpha = self.pi * self.B[:,self.obs[0]]
            # Initialize backward probability
            beta = np.ones(self.M)
            beta_t_plus_one = beta
            # Forward algorithm
            for i in range(0, t):
                alpha = np.dot(alpha, self.A) * self.B[:, self.obs[i + 1]]
            # Backward algorithm
            for i in range(self.T - 2, t - 1, -1):
                if i == t:
                    beta_t_plus_one = beta
                beta = np.dot(self.A, beta * self.B[:, self.obs[i + 1]])
            return self.A.T * np.repeat(beta_t_plus_one, self.M).reshape(self.M, self.M)\
                   * (self.B[:,self.obs[t + 1]].reshape(self.M, 1) * alpha.reshape(1, self.M)) / (alpha * beta).sum()
        
        
        # Update transition matrix and emission matrix
        def parameters_updating():
            
            # Set of S_i & S_ij at all time
            list_S_i = []
            list_S_ij = []
            for t in range(0, self.T):
                list_S_i.append(compute_S_i(t))
            for t in range(0, self.T - 1):
                list_S_ij.append(compute_S_ij(t)) 
            # Record the S_i matrix loop all observations at all time
            list_final = []
            for j in range(0, self.N):
                # Record the S_i matrix at all time at which observations is j 
                list_partial = []
                for t in range(0, self.T):
                    if self.obs[t] == j:
                        list_partial.append(list_S_i[t])
                list_final.append(list_partial)
            for i in range(self.N):
                self.B[:,i] = np.array(sum(list_final[i]))
            return (list_S_i[0], sum(list_S_ij)/sum(list_S_i[: -1]), (self.B.T/sum(list_S_i)).T)
        
        
        # Baum-Welch Algorithm
        for i in range(max_iteration):
            self.pi, self.A, self.B = parameters_updating()
            
        return self.pi, self.A, self.B
    
    
    # Viterbi Algorithm
    def inference(self, sequence, initial_matrix, transition_matrix, emission_matrix):
        
        self.pi = initial_matrix
        self.A = transition_matrix
        self.B = emission_matrix
        self.obs = np.array(self.encode(sequence))
        self.T = self.obs.shape[0]
        # delta[j,t] records the probability of end with state j at time t 
        self.delta = np.zeros((self.M, self.T))
        # psi[j,t] records current state is j at time t - 1
        self.psi = np.zeros((self.M, self.T)) 
        # Initialize delta & psi
        self.delta[:,0] = self.pi * self.B[:,self.obs[0]]
        self.psi[:,0] = np.zeros(self.M) - 1
        result = np.zeros(self.T)
        for t in range(1, self.T):
            # Update delta
            self.delta[:, t] = np.max(self.A.T * np.dot(self.B[:, self.obs[t]].reshape(self.M, 1), \
                                      self.delta[:, t - 1].reshape(1, self.M)), axis = 1)
            # Update psi
            self.psi[:, t] = np.argmax(np.repeat(self.delta[:, t - 1], self.M).reshape(self.M, self.M).T * A.T, axis = 1) 
        # Find the states that maximize the delta at each time t
        index = np.argmax(self.delta, axis=0)
        for i in range(len(index)):
                result[i] = int(self.psi[:, i][index[i]])
        # Correct the first max_probability
        result[0] = np.argmax(self.delta[:,0])
        return result

In [3]:
A = np.array([[0.5,0.2,0.3],[0.3,0.5,0.2],[0.2,0.3,0.5]])
B = np.array([[0.5,0.5],[0.4,0.6],[0.7,0.3]])
pi = np.array([0.2,0.4,0.4])
obs = [1,2,1]

In [4]:
h = HMM(3)
parameters = h.learning(obs, 50)
print "pi:", parameters[0], "\n"

print "A:", parameters[1], "\n"

print "B:", parameters[2]

pi: [ 0.01719123  0.98280877  0.        ] 

A: [[ 0.   0.   0.5]
 [ 0.   0.   0.5]
 [ 1.   1.   0. ]] 

B: [[ 1.  0.]
 [ 1.  0.]
 [ 0.  1.]]


In [5]:
h.inference(obs, pi, A, B)

array([ 2.,  2.,  2.])