# HMM
    Hidden Markov Model

## Reference
- https://www.cs.sjsu.edu/~stamp/RUA/HMM.pdf
- https://en.wikipedia.org/wiki/Baum%E2%80%93Welch_algorithm

## Notation
    - A: state transition matrix
    - B: observation probability / emission matrix
    - O: observation sequence
    - PI: initial state

In [27]:
import numpy as np

A = np.array([
    [0.7, 0.3], 
    [0.4, 0.6]])
B = np.array([
    [0.1, 0.4, 0.5], 
    [0.7, 0.2, 0.1]])

O = np.array([
    0,
    1,
    0,
    2])

PI = np.array([0.6, 0.4])

## Problem 1
    An Evaluation Problem

### Description
    given A,B,PI and O, compute p
    
### Solution
    forward algorithm

In [2]:
# 计算前向变量矩阵 alpha
def forward(a,b,pi,o):
    T = len(o)
    N = a.shape[0]
    alpha = np.zeros((T,N))
    alpha[0] = pi * b[:, o[0]]
    
    for t in range(1,T):
        alpha[t] = alpha[t-1].dot(a) * b[:, o[t]]
    return alpha

alpha = forward(A,B,PI,O)
forward_probability = alpha[-1].sum()
print('alpha \n', alpha, '\n')
print('forward probability', forward_probability)

alpha 
 [[0.06      0.28     ]
 [0.0616    0.0372   ]
 [0.0058    0.02856  ]
 [0.007742  0.0018876]] 

forward probability 0.009629599999999997


## Problem 2
    A Decoding / Prediction Problem

### Description
    given A,B,PI,O, find optimal state sequence
    
### Solution
    viterbi algorithm,  a DP algorithm
    should yield same result as that from brute force by compute p of all possible path with forward algorithm

In [3]:
def viterbi(a,b,pi,o):
    T = len(o)
    N = a.shape[0]
    delta = np.zeros((T,N))
    psi = np.zeros((T,N))
    delta[0] = pi * b[:, o[0]]

    for t in range(1,T):
        for i in range(N):
            delta[t,i] = np.max(delta[t-1]* a[:,i]) * b[i, o[t]] 
            psi[t,i] = np.argmax(delta[t-1]* a[:,i])
    
    # backtrack
    i_star = np.zeros(T,dtype=np.int32)
    i_star[T-1] = np.argmax(delta[T-1])
    for t in range(T-2,-1,-1): 
        i_star[t] = psi[t+1, i_star[t+1]]
    
    return i_star

v = viterbi(A,B,PI,O)
print('viterbi\n', v)

viterbi
 [1 1 1 0]


## Problem 3
    A Learning Problem

### Description
    given O, total number of distinct states(A.shape), total number of distinct observations(B.shape), find optimal state sequence
    
### Solution
    forward-backward algorithm, aka Baum-Welch algorithm, a particular case of EM(Expectation-Maximization) algorithm

```
"Unsupervised training of a HMM involves running forward and backward to estimate the *expected* probability of taking various state sequences, then updates the model to match these *expectations*. This repeats many times until things stabilise (covergence). Note that it's non-convex, so the starting point often affects the converged solution." --from https://people.eng.unimelb.edu.au/tcohn/comp90042/HMM.py
```

In [10]:
# 计算后向变量矩阵 beta
def backward(a,b,pi,o):
    T = len(o)
    N = a.shape[0]
    beta = np.zeros((T,N))
    beta[T-1] = np.ones((N))

    for t in range(T-2,-1,-1):
        for i in range(N):
            beta[t,i] = np.sum( a[i, :] * b[:, o[t+1]] * beta[t+1,:])

    # marginalized_likelihood = np.sum(pi * b[:,o[0]] * beta[0,:]) 
    return beta

beta = backward(A,B,PI,O)
print('beta\n', beta, '\n')
backward_probability = np.sum(PI * B[:,O[0]] * beta[0,:]) 
print('backward_probability\n',backward_probability)

beta
 [[0.0302  0.02792]
 [0.0812  0.1244 ]
 [0.38    0.26   ]
 [1.      1.     ]] 

backward_probability
 0.009629599999999999


In [18]:
def baum_welch(init_a, init_b, init_pi, observations, iterations):
    a = init_a
    b = init_b
    pi = init_pi
    T = len(observations)
    N = init_a.shape[1]
    M = init_b.shape[1]
    
    last_p = 0
    last_i = 0
    
    for iteration in range(iterations):

        # E-step
        alpha = forward(a, b, pi, observations)
        beta = backward(a, b, pi, observations)
        gamma = np.zeros((T,N))
        for t in range(T):
            gamma[t,:] = np.multiply(alpha[t, :], beta[t,:]) / alpha[-1].sum()
        ksi = np.zeros([T-1,N,N])
        for t in range(T-1):
            for i in range(N):
                for j in range(N):                   
                    ksi[t,i,j] = alpha[t,i] * a[i,j] * beta[t+1,j] * b[j,observations[t+1]] / alpha[-1].sum()

        p = alpha[-1].sum()
        if p > last_p:
            last_p = p
            last_i = iteration
        else:
            break
            
        # M-step
        new_a = np.zeros((N,N))
        for i in range(N):
            for j in range(N):
                new_a[i,j] = np.sum(ksi[:,i,j]) / np.sum(gamma[0:-1,i])
        # same as
        # new_a = np.sum(ksi,axis=0) / np.sum(gamma,axis=0).reshape((-1,1)))

        new_b = np.zeros((N,M))
        for k in range(M):
            for j in range(N):
                new_b[j,k] = np.sum(gamma[observations == k, j]) / np.sum(gamma[:,j])
        
        new_pi = gamma[0]

        a = new_a
        b = new_b
        pi = new_pi
                     
    return a,b,pi,last_p, last_i+1



In [29]:
# generate data

def generate_training_data(a, b, pi, T):
    
    gen = lambda dist: np.where(np.random.multinomial(1,dist) == 1)[0][0]
    
    states = np.zeros(T, dtype=np.int32)
    observations = np.zeros(T, dtype=np.int32)
    states[0] = gen(pi)
    observations[0] = gen(b[states[0],:])
    
    for t in range(1,T):
        states[t] = gen(a[states[t-1],:])
        observations[t] = gen(b[states[t],:])
        
    return observations


# generate data
true_a = np.array([
    [0.5, 0.5], 
    [0.3, 0.7]])
true_b = np.array([
    [0.3, 0.7], 
    [0.8, 0.2]])
true_pi = np.array([0.2, 0.8])

T = 100

observations = generate_training_data(true_a, true_b, true_pi, T)

true_p = forward(true_a, true_b, true_pi, observations)[-1].sum()

print('observations',observations, '\n')

observations [0 0 0 0 0 1 1 1 0 1 0 1 0 0 1 1 1 0 1 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 1 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 1 1 0 0 1 0 0 1 0 1 1 1 0 0 0 1 1 1
 0 0 1 0 0 1 0 0 0 1 1 1 0 0 0 1 1 0 0 0 0 0 0 0 0 0] 



In [31]:
# train
init_a = np.array([
    [0.9, 0.1], 
    [0.1, 0.9]])
init_b = np.array([
    [0.8, 0.2], 
    [0.2, 0.8]])
init_pi = np.array([0.5, 0.5])

train_a, train_b, train_pi, train_p, train_i = baum_welch(init_a, init_b, init_pi, observations, 100)

print('trained a\n',train_a)
print('True A\n', true_a, '\n')
print('trained b\n',train_b)
print('True B\n', true_b, '\n')
print('trained pi\n',train_pi)
print('True PI\n', true_pi, '\n')
print('trained forward probability\n',train_p, '\n')
print('trained forward probability2\n',train_p2, '\n')
print('True forward probability\n', true_p, '\n')
print('trained iteration', train_i, '\n')

trained a
 [[0.91425981 0.08574019]
 [0.05378814 0.94621186]]
True A
 [[0.5 0.5]
 [0.3 0.7]] 

trained b
 [[0.94798207 0.05201793]
 [0.53430714 0.46569286]]
True B
 [[0.3 0.7]
 [0.8 0.2]] 

trained pi
 [1.00000000e+00 9.53128917e-39]
True PI
 [0.2 0.8] 

trained forward probability
 6.535518988634797e-26 

trained forward probability2
 6.535518988634767e-26 

True forward probability
 1.5000935489976903e-27 

trained iteration 77 

