# HMM Tutorial
### Wed Jan 25 | Tianjian Li | tli104@jhu.edu

This notebook contains three things:

a) An step by step implementation and explaination of the **Foward-Backward Algorithm** in Python

b) An step by step implementation of the **Baum-Welch Algorithm** in Python


## The Forward-Backward Algorithm

Hidden Random Variables $Z = \{z_1, z_2, ..., z_n\}, z_i \in \{1,2,...,m\}$
 
Observed Variables $X = {x_1, ..., x_n}$

The foward-backward algorithm computes the probability of $p(z_k|x)$.

The foward algorithm computes $p(z_k ,x_{1:k})$

The backward algorithm computes $ p(x_{k+1:n} | z_k)$

$p(z_k | x) \propto p(z_k, x) = p(x_{k+1:n}|z_k, x_{1:k}) \cdot p(z_k, x_{1:k})$


In [None]:
import numpy as np

def forward(transition, emission, initial, X):

  """
  The forward algorithm
  
  Parameters:
  emission: the emission matrix defining p(x_t|z_t) = emission[z_t, x_t]
  transition: the transition matrix defining p(z_t|z_{t-1}) = transition[z_{t-1}, z_t]
  initial: the initial probability defining p(z_1)
  X: the observations x_1, .... , x_T

  Returns:
  A matrix alpha such that
    alpha[z_t, t] = p(z_t, x_{1:t})

  """

  # m is the number of states
  m = transition.shape[0]
  # n is the sequence length
  T = len(X)

  alpha = np.zeros((m, T))
  alpha_debug = np.zeros((m, T))
  
  #initialization, the more efficient version
  alpha[:,0] = initial * emission[:, X[0]]

  #and the more understandable version
  for z_0 in range(m):
    alpha_debug[z_0, 0] = initial[z_0] * emission[z_0, X[0]]
  
  for t in range(1, T):
    for z_t in range(m):
      # For efficient computation, use
      alpha[z_t, t] = np.dot(alpha[:, t-1], transition[:, z_t]) * emission[z_t, X[t]]
      
      # The more understandable version if we set z_t_prev = z_{t-1}
      for z_t_prev in range(m):
        alpha_debug[z_t, t] += alpha_debug[z_t_prev, t-1] * transition[z_t_prev, z_t] * emission[z_t, X[t]]
  
  assert alpha.all() == alpha_debug.all(), "mismatch in alpha"
  return alpha

In [None]:
# This cell is a sanity check 

emis = np.array([[0.1, 0.9],
                [0.4, 0.6]])

trans = np.array([[0.7, 0.3],
                  [0.4, 0.6]])

init = np.array([0.5, 0.5])

obs = np.array([0, 1, 1, 0, 1])

_ = forward(trans, emis, init, obs)

In [None]:
def backward(transition, emission, X):

  """
  The backward algorithm
  
  Parameters:
  emission: the emission matrix defining p(x_t|z_t) = emission[z_t, x_t]
  transition: the transition matrix defining p(z_t|z_{t-1}) = transition[z_{t-1}, z_t]
  X: the observations x_1, .... , x_T

  Returns:
  A matrix beta such that
    beta[z_t, t] = p(x_{t:n} | z_t)
  """
  # m is the number of states
  m = transition.shape[0]
  # n is the sequence length
  T = len(X)

  beta = np.zeros((m, T))
  beta_debug = np.zeros((m, T))
  
  #initialization, the more efficient version
  beta[:,T-1] = 1
  beta_debug[:, T-1] = 1

  for t in range(T-2, -1, -1):
    for z_t in range(m):
      # Efficient Computation
      beta[z_t, t] = np.dot(beta[:, t+1] * emission[:, X[t+1]], transition[z_t, :])

      # More understandably, we denote z_{t+1} as z_t_next
      for z_t_next in range(m):
        beta_debug[z_t, t] += beta_debug[z_t_next, t+1] * emission[z_t_next, X[t+1]] * transition[z_t, z_t_next]
        
  assert beta.all() == beta_debug.all()
  return beta

Again we check if our implementation is correct

In [None]:
emis = np.array([[0.1, 0.9],
                [0.4, 0.6]])

trans = np.array([[0.7, 0.3],
                  [0.4, 0.6]])

init = np.array([0.5, 0.5])

obs = np.array([0, 1, 1, 0, 1])

_ = backward(trans, emis, obs)

Remember that 

$p(z_k | x) \propto p(z_k, x) = p(x_{k+1:n}|z_k, x_{1:k}) \cdot p(z_k, x_{1:k})$

We combine our forward algorithm and backward algorithm and normalize to get the final probabilty vector of $p(z_k, x)$

In [None]:
def forward_backward(transition, emission, initial, X):
  alpha = forward(transition, emission, initial, X)
  beta = backward(transition, emission, X)
  
  probs = alpha * beta 
  # This is a matrix of m * T, inidicating the unmarginalized probability of p(z_k, X)
  probs /= np.sum(probs, axis=0)
  return alpha, beta, probs

In [None]:
emis = np.array([[0.1, 0.9],
                [0.4, 0.6]])

trans = np.array([[0.7, 0.3],
                  [0.4, 0.6]])

init = np.array([0.5, 0.5])

obs = np.array([0, 1, 1, 0, 1])

_, _, p = forward_backward(trans, emis, init, obs)
p

array([[0.22015293, 0.56429141, 0.5793029 , 0.29596097, 0.58221138],
       [0.77984707, 0.43570859, 0.4206971 , 0.70403903, 0.41778862]])

## The Baum-Welch Algorithm

The Baum-Welch Algorithm is an example of the **Expectation-Maximization algorithm** for finding the paramteres of an Hidden Markov Model. The following function is a single iteration of the Baum-Welch Algorithm.

In [None]:
def baum_welch(observations, states, start_prob, trans_prob, emit_prob, forward_backward):
    
    """
    Implements one pass of the Baum-Welch Algorithm for estimating the parameters of a Hidden Markov Model.
    """
    
    # Initialize variables
    T = len(observations)
    m = len(states)
    A = np.zeros((m, m))
    B = np.zeros((m, T))
    alpha, beta, prob = forward_backward(observations, states, start_prob, trans_prob, emit_prob)
    # Compute the expected state transition counts
    for t in range(T - 1):
        for i in range(m):
            for j in range(m):
                A[i][j] += alpha[t][i] * trans_prob[i][j] * emit_prob[j][observations[t + 1]] * beta[t + 1][j] / prob
    
    # Compute the expected observation symbol counts
    for t in range(T):
        for i in range(m):
            B[i][observations[t]] += alpha[t][i] * beta[t][i] / prob
    
    # Normalize the expected state transition and observation symbol counts
    for i in range(m):
        A[i] /= np.sum(A[i])
        B[i] /= np.sum(B[i])
    
    # Return the estimated parameters
    return A, B