# first attempt at implementing the baum welch algorithm for learning a HMM

- In the 'make_data' notebook we made up some data from an unfair casino
- Now let's try to write the algorithm to estimate the transition matrix and the emission matrix from the data

In [1]:
import numpy as np

In [4]:
x = np.ones((3,2))
cs = x.sum(axis=1)
cst = np.tile(cs, (x.shape[1], 1)).T
x /= cst
x

array([[ 0.5,  0.5],
       [ 0.5,  0.5],
       [ 0.5,  0.5]])

In [None]:

def fowards_pass_algorithm(transition_matrix, emission_matrix, observations, init_state_distribution):
    """
    transition_matrix: n_state x n_state matrix
    emission_matrix: n_state x n_discrete_observations (unique types of observations, eg sides of a dice, H/T of a coin)
    observations: 1d vector of length n_samples
    """
    n_samples = len(observations)
    # keep a list of the scaling factors, this way we don't run into underflow errors
    scaling_factors = np.zeros(n_samples)
    
    n_states = transition_matrix.shape[0]
    
    forward_pass_probabilities = np.zeros((n_samples, n_states))
    
    # compute the initial forward probabilities
    forward_pass_probabilities[0, :] = init_state_distribution * emission_matrix[:, observations[0]]
    scaling_factors[0] = forward_pass_probabilities[0, :].sum()
    
    forward_pass_probabilities[0, :] /= scaling_factors[0]
    
    # now we compute the probabilities of stuff going forward, using the probabilites from the previous estimate
    for current_t in range(1, n_samples):
        # first we want to compute the probability of transitioning over all the previous states and now
        # t_steps-1 x n_state * n_state x n_state
        trans_prob = forward_pass_probabilities[0:current_t-1, :].dot(transition_matrix) 
        
        # compute the sum over the rows of the previous matrix multiplication and then multiply by the 
        # probabilities associated with the emission matrix
        current_state_estimate = trans_prob * emission_matrix[:, observations[current_t]]
        forward_pass_probabilities[current_t, :] = current_state_estimate
        
        # compute the scaling factor and normalize our running forward probabilities
        scaling_factors[current_t] = forward_pass_probabilities[current_t, :].sum()
        forward_pass_probabilities[current_t, :] /= scaling_factors[current_t]
        
    # we be done
    return forward_pass_probabilities, scaling_factors
 
    
def backwards_pass_algorithm(transition_matrix, emission_matrix, observations, scaling_factors):
    n_samples = len(observations)
    
    n_states = transition_matrix.shape[0]
    
    backward_pass_probabilities = np.zeros((n_samples, n_states))
    
    # initialize the last row of the backwards pass probabilities 
    backward_pass_probabilities[-1, :] = 1.0
    
    for current_t in range(n_samples-1, -1, -1):
        trans_prob = transition_matrix.dot(emission_matrix[:, observations[current_t]])
        current_state_estimate = backward_pass_probabilities[current_t+1, :] * trans_prob
        backward_pass_probabilities[current_t, :] = current_state_estimate
        backward_pass_probabilities[current_t] /= scaling_factors[current_t]
        
    return backward_pass_probabilities
    
    
    

def baum_welch(labeled_states, 
               observations, 
               init_state_distribution=None,
               init_transition_matrix=None, 
               init_emission_matrix=None):
    """
    Returns a 'learned' transition and emission matrix from the supplied data
    """
    
    if init_transition_matrix is None:
        n_states = len(set(labeled_states))
        init_transition_matrix = np.ones(shape=(n_states, n_states))
        col_sums = init_transition_matrix.sum(axis=1)
        col_sums = np.tile(col_sums, (n_states, 1)).T
        init_transition_matrix /= col_sums
        
    if init_emission_matrix is None:
        n_states = len(set(labeled_states))
        n_obs = len(set(observations))
        init_emission_matrix = np.ones((n_states, n_obs))
        col_sums = init_emission_matrix.sum(axis=1)
        col_sums = np.tile(col_sums, (init_transition_matrix.shape[1], 1)).T
        init_emission_matrix /= col_sums
        
    if init_state_distribution is None:
        n_states = len(set(labeled_states))
        init_state_distribution = np.ones(shape=(n_states,))
        init_state_distribution /= init_state_distribution.sum()
    
    
    # now we need to do the rest of this
    trans_mat = init_transition_matrix
    emit_mat = init_emission_matrix
    start_state = init_state_distribution
    while(True):
        forward_probs, scaling = forwards_pass_algorithm(trans_mat, emit_mat, observations, start_state)
        backward_prob = backwards_pass_algorithm(trans_mat, emit_mat, observations, scaling)
        
        #
    
    
    
    
    return None

In [None]:
		xi = np.zeros((nStates,nStates,nSamples-1));
		for t in range(nSamples-1):
			denom = np.dot(np.dot(alpha[:,t].T, self.A) * self.B[:,observations[t+1]].T,
						beta[:,t+1])
			for i in range(nStates):
				numer = alpha[i,t] * self.A[i,:] * self.B[:,observations[t+1]].T * \
						beta[:,t+1].T
				xi[i,:,t] = numer / denom

		# gamma_t(i) = P(q_t = S_i | O, hmm)
		gamma = np.squeeze(np.sum(xi,axis=1))
		# Need final gamma element for new B
		prod =  (alpha[:,nSamples-1] * beta[:,nSamples-1]).reshape((-1,1))
		gamma = np.hstack((gamma,  prod / np.sum(prod))) #append one more to gamma!!!

		newpi = gamma[:,0]
		newA = np.sum(xi,2) / np.sum(gamma[:,:-1],axis=1).reshape((-1,1))
		newB = copy(B)

		numLevels = self.B.shape[1]
		sumgamma = np.sum(gamma,axis=1)
		for lev in range(numLevels):
			mask = observations == lev
			newB[:,lev] = np.sum(gamma[:,mask],axis=1) / sumgamma

		if np.max(abs(pi - newpi)) < criterion and \
				np.max(abs(A - newA)) < criterion and \
				np.max(abs(B - newB)) < criterion:
			done = 1;

		A[:],B[:],pi[:] = newA,newB,newpi