
# CO495 ASML Coursework 2 - Hidden Markov Models

In this coursework, you are asked to implement filtering, smoothing, and optionally Viterbi decoding for discrete and continuous valued HMMs. Input data and initialization is provided and should be used for reproducibility.

In [47]:
import numpy as np
from collections import namedtuple

The functions below are here to guide you in your implementation of the EM and Viterbi algorithms for Hidden Markov Models. We follow Section 17.4 of _Machine Learning: A Probabilistic Perspective_ by Kevin Murphy (2012).

You should write vectorized modular code to promote re-usability, efficiency, and readability.

Your task is to complete the implementation and to report the results obtained from the provided initialization. You are strongly encouraged to explore different initialization schemes for the algorithms.

## Helper functions and classes

You may use this function in your implementation.

In [48]:
def normalize(A, dim=None, precision=1e-9):
    """This function is adapted from Kevin Murphy's code for Machine Learning: a Probabilistic Perspective.

    Make the entries of a (multidimensional) array sum to 1
    A, z = normalize(A) normalize the whole array, where z is the normalizing constant
    A, z = normalize(A, dim)
    If dim is specified, we normalize the specified dimension only.
    dim=0 means each column sums to one
    dim=1 means each row sums to one


    Set any zeros to one before dividing.
    This is valid, since s=0 iff all A(i)=0, so
    we will get 0/1=0

    Adapted from https://github.com/probml/pmtk3"""
    
    if dim is not None and dim > 1:
        raise ValueError("Normalize doesn't support more than two dimensions.")
    
    z = A.sum(dim)
    # If z is a scalar, z.shape is an empty tuple and evaluates to False
    if z.shape:
        z[np.abs(z) < precision] = 1
    elif np.abs(z) < precision:
        return 0, 1
    
    if dim == 1:
        return np.transpose(A.T / z), z
    else:
        return A / z, z

The initial values are provided as namedtuples (initialization.A is the initial value for A)

In [49]:
InitGaussian = namedtuple('InitGaussian', ['A', 'Means', 'Variances', 'pi'])
InitMultinomial = namedtuple('InitMultinomial', ['A', 'B', 'pi'])

## Filtering and Smoothing

Break down your implementation according to the functions below. Feel free to create additional ones whenever you see fit, but the general flow of the algorithm should be made apparent

### Observation model

The core of EM estimation on HMM operates on vectors of probabilities, so the main difference between EM for Gaussian HMM and multinomial HMM is the computation of the observation probabilities and which parameters to estimate.

Complete the two functions below to compute the probabilities of the data for a given observation model and use them in the rest of the algorithm. Your filtering and smoothing steps should be model agnostic.

In [50]:
from scipy.stats import norm
def computeSmallB_Gaussian(Y, Means, Variances, Nhidden, T, seq):
    """Compute the probabilities for the data points Y for a Gaussian observation model 
        with parameters Means and Variances.
        
        Input parameters:
            - Y: the data
            - Means: vector of the current estimates of the means
            - Variances: vector of the current estimates of the variances
            - Nhidden: number of hidden states
            - T: length of the sequence
        Output:
            - b: vector of observation probabilities
    """

    b = np.zeros((Nhidden,T))
    for i in range(Nhidden):
        for j in range(T):
            b[i,j] = norm.pdf(Y[seq,j],Means[i],Variances[i])
    return b

In [51]:
def computeSmallB_Discrete(Y, B, Nhidden, T, seq):
    """Compute the probabilities for the data points Y for a multinomial observation model 
        with observation matrix B
        
        Input parameters:
            - Y: the data
            - B: matrix of observation probabilities
        Output:
            - b: vector of observation probabilities
            
    """
    
    b = np.zeros((Nhidden,T))
    for i in range(Nhidden):
        for j in range(T):
            b[i,j] = B[i,Y[seq,j]-1]
            
    return b

### Smoothing and filtering: Estimation step

The E step involves smoothing and filtering. Refer to the course notes and/or to the recommended readings to implement these steps in the functions below.

In [52]:
def BackwardFiltering(A, b, Nhidden, T):
    """Perform backward filtering.
        Input parameters:
            - A: estimated transition matrix (between states)
            - b: estimated observation probabilities (local evidence vector)
            - N: number of hidden states
            - T: length of the sequence
        Output:
            - beta: filtered probabilities
    """
    beta = np.zeros((Nhidden,T))
    # Initialization of the base cases
    beta[:,-1] = np.ones(Nhidden)
    for j in reversed(range(0,T-1)):
        beta[:,j], Zb = normalize(np.dot(A, np.multiply(b[:,j+1],beta[:,j+1])))
       
    return beta

In [53]:
def ForwardFiltering(A, b, pi, Nhidden, T, j):
    """Filtering using the forward algorithm (Section 17.4.2 of K. Murphy's book)
    Input:
      - A: estimated transition matrix
      - b: estimated observation probabilities (local evidence vector)
      - pi: initial state distribution pi(j) = p(z_1 = j)
    Output:
      - Filtered belief state at time t: alpha = p(z_t|x_1:t)
      - log p(x_1:T)
      - Z: normalization constant"""
    Z = np.zeros(T)
    alpha = np.zeros((Nhidden,T))
    (alpha[:, 0], Z[0]) = normalize(np.multiply(b[:,0],pi.T)) 
    for t in range(1,T):
        alpha[:,t], Z[t] = normalize(np.multiply(b[:,t],np.dot(A.T,alpha[:,t-1])))

    logProb = np.sum(np.log(Z))


    return alpha, logProb, Z

In [54]:
def ForwardBackwardSmoothing(A, b, pi, Nhidden, T, j):
    """Smoothing using the forward-backward algorithm.
    Input:
      - A: estimated transition matrix
      - b: local evidence vector (observation probabilities)
      - pi: initial distribution of states
      - N: number of hidden states
      - T: length of the sequence
    Output:
      - alpha: filtered belief state as defined in ForwardFiltering
      - beta: conditional likelihood of future evidence as defined in BackwardFiltering
      - gamma: gamma_t(j) proportional to alpha_t(j) * beta_t(j)
      - lp: log probability defined in ForwardFiltering
      - Z: constant defined in ForwardFiltering"""
    
    alpha, logProb, Z = ForwardFiltering(A, b, pi, Nhidden, T, j)

    beta = BackwardFiltering(A, b, Nhidden, T)
    gamma = np.zeros((Nhidden,T))
    for t in range(0,T):
        gamma[:,t], Zg =  normalize(np.multiply(alpha[:,t],beta[:,t])) 
    
    return alpha, beta, gamma, logProb, Z

Use the output of SmoothedMarginals in the maximization step for A.

In [55]:
def SmoothedMarginals(A, b, alpha, beta, T, Nhidden):
    "Two-sliced smoothed marginals p(z_t = i, z_t+1 = j | x_1:T)"
    
    marginal = np.zeros((Nhidden, Nhidden, T-1));

    for t in range(T-1):
        marginal[:, :, t] = normalize(A * np.outer(alpha[:, t], np.transpose( (b[:, t+1] * beta[:, t+1]) ) ))[0];
    
    return marginal

## EM estimation

Implement the main algorithm in the skeletons below.
How can you measure the performance of your model and choose an appropriate convergence criterion?
_Hint: the logProb returned by the ForwardBackwardSmoothing function can be used_.

### Gaussian observation model

In [56]:
def EM_estimate_gaussian(Y, Nhidden, Niter, epsilon, init):
    
    print("Estimating Gaussian...")
    print("\n")
    # Dimensions of the data
    N, T = Y.shape
    
    # Initialization
    
    # Initial transition matrix should be stochastic (rows sum to 1)
    A = init.A
    
    # Initial means and variances of the emission probabilities
    Means = init.Means
    Variances = init.Variances;
    
    # Class prior
    pi = init.pi
    
    ###############################################
    # EM algorithm
    
    i = 0
    # Initialize convergence criterion here
    E_mean = np.zeros(2)
    E_var = np.zeros(2)
    E_ztk = np.zeros((N, Nhidden, T))
    E_zt1 = np.zeros((N, Nhidden))
    E_a = np.zeros((N, Nhidden, Nhidden, T-1))
    av_log = 10
    log_prev = epsilon 
    while i<Niter and abs(av_log - log_prev) > epsilon: # and condition on criterion and precision epsilon    
        s_log = 0
        for j in range(N):   # For each observation
            # E-step
            b = computeSmallB_Gaussian(Y, Means, Variances, Nhidden, T, j)
            alpha, beta, gamma, logProb, Z = ForwardBackwardSmoothing(A, b, pi, Nhidden, T, j)
            s_log += logProb
            E_ztk[j,:,:] = gamma 
            E_zt1[j] = gamma[:,1]
            E_a[j] = SmoothedMarginals(A, b, alpha, beta, T, Nhidden)

            
        # M-Step
        A = np.sum(E_a, (0,3))/np.sum(E_a, (0,1,3))
        pi = np.sum(E_zt1,0)/ np.sum(E_zt1)  

        m = np.zeros(Nhidden)
        v = np.zeros(Nhidden)
        
        for h in range(Nhidden):
            for n in range(N):
                for t in range(T):
                    m[h] += E_ztk[n,h,t] * Y[n,t]
                    
        for k in range(Nhidden):
            Means[k] = m[k] / np.sum(E_ztk[:,k,:])

        for h in range(Nhidden):
            for n in range(N):
                for t in range(T):
                    v[h] += E_ztk[n,h,t] * (Y[n,t] - Means[h]) * (Y[n,t] - Means[h])
        
        for ii in range(Nhidden):
            Variances[ii] = v[ii] / np.sum(E_ztk[:,ii,:])
              
        i = i+1
        log_prev = av_log
        av_log = s_log/N
    print("Number of iterations: ", i) 
    print("A:",A)
    print("pi:", pi)
    print("Means:", Means)
    print("Variances:", Variances)    
    return A, Means, Variances, pi

### Multinomial observation model

In the maximization step for B you will have to compute a quantity involving indicators on the values of Y. One efficient way to do it is to pre-compute a representation of Y using _one-hot encoding_. In MATLAB:

```% X sparse coding
Nv = length(unique(Y));
X = zeros(T, Nv);
for i=1:T
    X(i, Y(i)) = 1;
end
% Maximization: emission matrix
B1 = B1 + gamma * X;```

In [57]:
def EM_estimate_multinomial(Y, Nhidden, Niter, epsilon, init):
    
    # Dimensions of the data
    N, T = Y.shape
    # Initialization
    
    # Initial transition matrix should be stochastic (rows sum to 1)
    A = init.A
    
    # Observation matrix B
    B = init.B
    # Class prior
    pi = init.pi
    
    ###############################################
    # EM algorithm
    
    i = 0
    # Initialize convergence criterion here
    E_ztk = np.zeros((N, Nhidden, T))
    E_zt1 = np.zeros((N, Nhidden))
    E_a = np.zeros((N, Nhidden, Nhidden, T-1))
    av_log = 10
    log_prev = epsilon 
    while i<Niter and abs(av_log-log_prev) > epsilon: # and condition on criterion and precision epsilon
        Nv = len(np.unique(Y))
        B1 = np.zeros((Nhidden, Nv))
        s_log=0
        for j in range(N):   # For each observation
            
            # E-step
            b = computeSmallB_Discrete(Y, B, Nhidden, T, j)
            alpha, beta, gamma, logProb, Z = ForwardBackwardSmoothing(A, b, pi, Nhidden, T,j)
            s_log += logProb
            E_ztk[j,:,:] = gamma 
            X = np.zeros((T, Nv))
            for k in range(T):
                X[k, Y[j,k]-1] = 1
            B1 = B1 + np.dot(gamma,X)
            
            E_zt1[j] = gamma[:,1]
            E_a[j] = SmoothedMarginals(A, b, alpha, beta, T, Nhidden)
            
        # M-Step
        A = np.sum(E_a, (0,3))/np.sum(E_a, (0,1,3))
        pi = np.sum(E_zt1,0)/ np.sum(E_zt1)   
        for h in range(Nhidden):
            B[h] = B1[h]/np.sum(E_ztk[:,h,:])
       # print("New B is: ", B)
        i += 1
        log_prev = av_log
        av_log = s_log/N
    print("Number of iterations: ", i)    
    print("B is:", B)
    print("pi is: ", pi)
    print("A is:", A)
    return A, B, pi

## Viterbi decoding

Viterbi decoding should be performed on the smoothed data and most of the algorithm doesn't depend on the output model. To help you, we identified the steps that are model specific. Implement Viterbi decoding by completing the skeleton below. 'smallB' is a function and should be used in the standard way: smallB(x).

In [58]:
def ViterbiDecode(Y, Nhidden, outModel, init):
    
    if outModel == 'gauss':
        A, Mu, Sigma, Pi = EM_estimate_gaussian(Y, Nhidden, 100, 1e-6, init)
        smallB = lambda X : computeSmallB_Gaussian(X, Mu, Sigma, Nhidden, len(X))
    elif outModel == 'multinomial':
        A, B, Pi = EM_estimate_multinomial(Y, Nhidden, 100, 1e-6, init)
        smallB = lambda X : computeSmallB_Discrete(X, B)
    else:
        raise ValueError('Invalid observation model: must be either "gauss" or "multinomial"')
        
    # Implement Viterbi decoding here.
    
    return S

## Demo code

In [59]:
with np.load('init_gaussian.npz') as f:
    init_g = InitGaussian(f['arr_0'], f['arr_1'], f['arr_2'], f['arr_3'])

with np.load('init_multinomial.npz') as f:
    init_m = InitMultinomial(f['arr_0'], f['arr_1'], f['arr_2'])

with np.load('data_gaussian.npz') as f:
    Y_c, S_c = f['arr_0'], f['arr_1']


with np.load('data_multinomial.npz') as f:
    Y_d, S_d = f['arr_0'], f['arr_1']

    
A_g, Means_g, Variances_g, Pi_g = EM_estimate_gaussian(Y_c, 2, 100, 1e-6, init_g)
print("\n")
print("Estimating Multinomial")
print("\n")
A_m, B_m, Pi_m = EM_estimate_multinomial(Y_d, 2, 100, 1e-6, init_m)

# S_g = ViterbiDecode(Y_c, 2, 'gauss', init_g)
# S_m = ViterbiDecode(Y_d, 2, 'multinomial', init_m)

#print('*** Viterbi decoding accuracy (Gaussian): {}'.format( (S_c == S_g).sum() / S_c.size ))
#print('*** Viterbi decoding accuracy (Multinomial): {}'.format( (S_d == S_m).sum() / S_d.size ))

Estimating Gaussian...
Initialised Means:  [[0.93504614]
 [5.60985797]]
Initialised Variances:  [[0.54688152]
 [0.95750684]]
Number of iterations:  51
A: [[0.38832547 0.38608991]
 [0.61167453 0.61391009]]
pi: [0.33392947 0.66607053]
Means: [[0.06975463]
 [4.97302249]]
Variances: [[0.37220458]
 [0.91312145]]
Estimating Multinomial
B is:  [[0.25796198 0.32504732 0.04640567 0.03135054 0.17721777 0.16201672]
 [0.25904333 0.05332419 0.19334562 0.14589911 0.23912919 0.10925857]]
Number of iterations:  100
B is: [[0.10979842 0.30528693 0.03692295 0.02734931 0.0801274  0.44051499]
 [0.13567859 0.0709345  0.15867929 0.15413939 0.14505279 0.33551544]]
pi is:  [0.23097902 0.76902098]
A is: [[0.26247072 0.24292709]
 [0.73752928 0.75707291]]
