<a href="https://colab.research.google.com/github/masa512/hidden_markov_model/blob/main/hmm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import matplotlib.pyplot as plt

# Hidden Markov Model

This codebook intends to dig into implementation of single-observation Markov Model for a given time sequence data.

We have the following quantities

$A : \{a_{ij}\} = P\{q_{t+1}=S_j | q_{t}=S_i\}$ : (N,N)

$B : \{b_j(O_{t})\} = P\{O_{t} = v_k | q_{t} = S_j\}$ : (M,N)

$P_0 : \{\pi_i\} = P(q_1 = S_i)$ 

We finally refer to $\lambda$ as the model parameter set $\{A,B,P_0\}$










---



---


# The Forward Algorithm

The forward algorithm uses dynamic programming to evaluate the following quantity

$$\alpha_t(i) = P\{\{ O_n\}_0^t,q_t=S_i | \lambda\}$$

We will process the update using the following steps : 



1. Initial :
> $\alpha_1(i) = \pi_i b_i(O_1)$

2. Updating :
> $\alpha_{t+1}(j) = [\sum_{i=1}^N \alpha_t(i)a_{ij}]b_j(O_{t+1})$

3. Terminal :
> $P\{O|\lambda\} = \sum_{i=1}^{N} \alpha_{T}(i)$




In [2]:
# Lets implement the initialization process of a HMM model

def init_hmm(N,M):
  """
  Input
  N (Scalar): Number of states
  M (Scalar): Number of possible emissions

  Output
  A (N,N): Transition matrix from state i to state j
  B (M,N): Emission at state i
  P (N,) : Priori probability for each state
  """

  # Initialize P
  P = np.random.rand(N,)
  P = P/np.sum(P)

  # Initialize A
  A = np.random.rand(N,N)
  A = A/np.sum(A,axis=0,keepdims=True)
  # Initialize B
  B = np.random.rand(M,N)
  B = B/np.sum(B,axis=0,keepdims= True)

  return (A,B,P)


# Lets implement the forward algorithm of HMM model


In [3]:
def forward_alg(V,A,B,P):
  """
  Forward probability evaluation using Observations V

  Input
  V (T,) : Observation sequence of length T (V[t] is t-th observation value given)
  A (N,N): Transition matrix
  B (M,N): Emission matrix at given state
  P (N,) : Priori state probability

  Output
  Alpha (T,N): Forward probability
  out_prob (Scalar): The terminal probability of output sequence for model
  """

  Alpha = np.zeros((V.shape[0],P.shape[0]))

  # initial condition
  Alpha[0,:] = (B[V[0]] * P).reshape(-1,)
  
  # Loop the rest
  for t in range(1,V.shape[0]):
    prev_alp = Alpha[t-1,:] # Length N
    Alpha[t,:] = (B[V[t]] * (A @ prev_alp)).reshape(-1,)
  
  # Terminal Condition
  out_prob = np.sum(Alpha[-1,:],keepdims=False)

  return out_prob, Alpha
  

In [4]:
# Test the forward operation

(N,M) = (3,4)
A,B,P = init_hmm(N,M)

V = np.array([3,1,1,0]) # My observation

out_prob, Alpha = forward_alg(V,A,B,P)


# Backward Probability

The Backward probability evaluates the following: 

$$\beta_{t}(j) = P\{ \{O_n\}_{n=t+1}^T | q_{t} = S_j\}$$

In the implementation, we use the following methods to calculate $\beta$



1.   Initial:
> $\beta_T(j) = 1$
2.   Inductive:
> $\beta_t(j) = \sum_{i = 1}^{N} a_{ij}b_j(O_{t+1})\beta_{t+1}(i)$



In [5]:
# We will furthermore define the backward operation as well
def Backward_alg(V,A,B,P):
  """
  Forward probability evaluation using Observations V

  Input
  V (T,) : Observation sequence of length T (V[t] is t-th observation value given)
  A (N,N): Transition matrix
  B (M,N): Emission matrix at given state
  P (N,) : Priori state probability

  Output
  Beta (T,N): Backward probability
  """

  Beta = np.zeros((V.shape[0],A.shape[0]))

  # Initial condition
  Beta[-1,:] = np.ones((A.shape[0],))

  # Iteration
  for t in range(V.shape[0]-2,-1,-1):
    prev_beta = Beta[t+1,:]
    Beta[t,:] = B[V[t+1],:]*(A @ prev_beta)
  
  return Beta

In [6]:
# Test the backward operation

(N,M) = (3,4)
A,B,P = init_hmm(N,M)

V = np.array([3,1,1,0]) # My observations

Beta = Backward_alg(V,A,B,P)

# Forward-Backward Variable and Adjacence probability

First, we define $\gamma_t(i)$ as the following:
$$\gamma_t(i) = P\{q_t=S_i | \{O_i\}, \lambda\}$$

which can be evaluated using the forward and backward parameters:

$$=\frac{\alpha_t(i)\beta_{t}(i)}{P\{\{O\}|\lambda\}} = \frac{\alpha_t(i)\beta_{t}(i)}{\sum_{i=1}^N \alpha_t(i)\beta_{t}(i)} $$

Lastly, we define the adjacence probability $\zeta_t(i,j)$ with the following:
$$\zeta_t(i,j) = P\{q_t = S_i, q_{t+1} = S_j | \{O\},\lambda\}$$

Which can be explicitly evaluated by:

$$ = \frac{\alpha_t(i)a_{ij}b_j(O_{t+1})\beta_{t+1}(j)}{\sum_{i=1}^{N}\sum_{j=1}^{N}\alpha_t(i)a_{ij}b_j(O_{t+1})\beta_{t+1}(j)} $$

**NOTE:** $$\gamma_t(i) = \sum_{i=1}^{N} \zeta_t(i,j)$$

In [35]:
def eval_zeta(V,A,B,alpha,beta):

  """
  The expectation step for HMM prediction

  Input:
  V (T,) : Vector of inputs
  A (N,N) : Transition matrix
  B (M,N): Emission matrixx
  P (N,) : Priori state

  Alpha (T,N): Forward probability
  Beta (T,N): Backward probability
  """

  T = alpha.shape[0]
  N = A.shape[0]
  M = B.shape[0]

  zeta = (T-1)*[0]
  den = (T-1)*[0]
  for t in range(T-1):
    for i in range(N):
      for j in range(N):
        num = alpha[t,i]*A[i,j]*B[V[t+1],j]*beta[t+1,j]
        zeta[t] = num
        den[t] = den[t]+num
    zeta[t] = zeta[t]/den[t]
  return zeta


def eval_gamma(V,A,B,alpha,beta):

  """
  The expectation step for HMM prediction

  Input:
  V (T,) : Vector of inputs
  A (N,N) : Transition matrix
  B (M,N): Emission matrixx
  P (N,) : Priori state

  Alpha (T,N): Forward probability
  Beta (T,N): Backward probability
  """

  T = alpha.shape[0]
  N = A.shape[0]
  M = B.shape[0]
  print(alpha.shape)
  print(beta.shape)
  gamma = (T)*[0]
  den = (T)*[0]

  for t in range(T):
    gamma_t = np.zeros((N,))
    for i in range(N):
      print(t)
      gamma_t[i] = alpha[t,i]*beta[t,i]
      den[t] = alpha[t,i]*beta[t,i]
    gamma[t] = gamma_t/den[t]

  return gamma

def E_step(V,A,B,P,alpha,beta):
  """
  The expectation step for HMM prediction

  Input:
  V (T,) : Vector of inputs
  A (N,N) : Transition matrix
  B (M,N): Emission matrixx
  P (N,) : Priori state
  """
  zeta = eval_zeta(V,A,B,alpha,beta)
  gamma = eval_gamma(V,A,B,alpha,beta)
  return adj, gamma


# Maximization step

We will now update the model paramters using Baum-Welch with following formula

$$a_{ij} = \frac{\sum_{t=1}^{T-1} \zeta_{ij}(t)}{\sum_{i=1}^{T-1}\sum_{j=1}^{N} \zeta_{ij}(t)}$$

$$b_j(V(k)) = \frac{\sum_{t=1}^{T}\gamma_t(i)1(v(t)=k)}{\sum_{t=1}^{T}\gamma_t(i)}$$

$$\pi_i = \gamma_i(0)$$

In [36]:
def M_step(adj,gamma,V,M):
  """
  The maximization step
  """
  A = sum(adj)/(sum([np.sum(a,axis=0).reshape(1,-1) for a in adj]))
  print(gamma)
  N = gamma[0].shape[0]
  # Updating b a bit complicated
  B = np.zeros((M,N))
  T = V.shape[0]
  for k in range(M):
    mask = 1*(V == k)
    S = 0
    for t in range(T):
      S += mask[t]*gamma[t]
    B[k,:] = S/sum(gamma)
  
  # Updating pi
  P = gamma[0]
  return A,B,P

# Training

In [37]:
# Data

(N,M) = (3,4)
A,B,P = init_hmm(N,M)
n_epochs = 2

T = 10
V = np.random.randint(0,M,size=(T,))

for n in range(n_epochs):
  print(f'Epoch: {n+1}\n')
  print(f'Ashape = {A.shape}')
  print(f'Bshape = {B.shape}')
  print(f'Pshape = {P.shape}')
  
  # Forward step & Backward Step
  _, alpha = forward_alg(V,A,B,P)
  beta = Backward_alg(V,A,B,P)


  # Expectation Step
  adj, gamma = E_step(V,A,B,P,alpha,beta)
  
  # Maximization
  A,B,P = M_step(adj,gamma,V,M)

  


Epoch: 1

Ashape = (3, 3)
Bshape = (4, 3)
Pshape = (3,)
(10, 3)
(10, 3)
0
0
0
1
1
1
2
2
2
3
3
3
4
4
4
5
5
5
6
6
6
7
7
7
8
8
8
9
9
9
[array([0.19982146, 2.48412761, 1.        ]), array([0.75008704, 1.89840304, 1.        ]), array([0.41571744, 1.47380234, 1.        ]), array([0.77970047, 1.11980968, 1.        ]), array([0.84710936, 2.72845308, 1.        ]), array([0.66747067, 4.03867055, 1.        ]), array([0.53200064, 1.54896558, 1.        ]), array([1.28389804, 0.99797166, 1.        ]), array([0.77328807, 0.84657273, 1.        ]), array([0.4951953 , 0.57951168, 1.        ])]
Epoch: 2

Ashape = (1, 3)
Bshape = (4, 3)
Pshape = (3,)


ValueError: ignored