# DS-GA 3001.001 Special Topics in Data Science: Probabilistic Time Series Analysis

# Week 6 Hidden Markov Model

In [None]:
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
import pandas as pd
import math
import random
import pylab
from collections import Counter
import time

##  Load Data

In [None]:
data_df = pd.read_csv("./sp500w.csv")
data = data_df.to_numpy()

In [None]:
# plot & visualize the data
print(data_df.head())
print("Shape of the data", data.shape)
plt.figure(figsize=(8,6))
plt.plot(data[:, 0], data[:, 1], '*b', label="S&P 500")
plt.xlabel("Week Index", fontsize=12)
plt.ylabel("Weekly Return", fontsize=12)
plt.legend(fontsize=12)
plt.grid()

In [None]:
def percentage_agree(x, z):
    """
    Function that shows the % of agreement among two list
    """
    assert len(x)==len(z)
    return float(np.sum(np.array(x)==np.array(z)))/len(x)

class MyHMM:
    def __init__(self, num_states, num_observations):
        """
        Constructor
        @param num_unique_states: # of unique states
        @param num_observations: # of unique observations
        """
        self.num_states = num_states
        self.num_observations = num_observations
        self.transition_matrix = np.zeros((num_states, num_states))
        self.initial_states_vector = np.zeros(num_states)
        self.mu = None
        self.sigma = None
        
    def initialize_em(self, A, pi, mu, sigma):
        print("Initializing EM... \n")
        
        #  initialize state transition matrix (KxK matrix)
        self.transition_matrix = A
        print("Initial transition matrix: \n", self.transition_matrix)
        
        # initialize the initial prob vector (K-dim vector)
        self.initial_states_vector = pi
        print("Initial states vector: ", self.initial_states_vector)
    
        # initialize the emission probabilities (assuming Gaussian distributions)
        self.mu = mu
        self.sigma = sigma
        print("Initial mu:", self.mu)
        print("Initial sigma", self.sigma)
        print("\n")
    
    def _one_normal_pdf(self, x, mu, sigma):
        # prob = stats.norm.pdf(x, mu, sigma)
        dx = 1e-4
        cdf_2 = stats.norm.cdf(x+dx, mu, np.sqrt(sigma))
        cdf_1 = stats.norm.cdf(x-dx, mu, np.sqrt(sigma))
        return cdf_2 - cdf_1
        
    def _compute_alpha(self, X):
        N = X.shape[0]
        alpha = np.zeros((self.num_states, N)) # (K, N)
        c = np.zeros(N)  # normalization constant
        
        for i in range(self.num_states):
            x_pdf_v = self._one_normal_pdf(X[0], self.mu[i], self.sigma[i])
            alpha[i, 0] = self.initial_states_vector[i] * x_pdf_v
        c[0] = np.sum(alpha[:, 0])
        alpha[:, 0] /= c[0]

        for t in range(1, N):
            for i in range(self.num_states):
                for j in range(self.num_states):
                    alpha[i, t] += alpha[j, t-1] * self.transition_matrix[j, i]
                alpha[i, t] *= self._one_normal_pdf(X[t], self.mu[i], self.sigma[i])
            c[t] = np.sum(alpha[:, t])
            alpha[:,t] /= c[t]
        
        return alpha, c
    
    def _compute_beta(self, X, c):
        N = c.shape[0]
        beta = np.zeros((self.num_states, N)) # (K, N)
        for k in range(self.num_states):
            beta[k, N-1] = 1.

        for t in range(N-2, -1, -1):
            for i in range(self.num_states):
                for j in range(self.num_states):
                    beta[i,t] += beta[j, t+1] * self.transition_matrix[i, j] * self._one_normal_pdf(X[t+1], self.mu[j], self.sigma[j])
            beta[:,t] /= c[t+1]
        return beta
    
    def fit(self, X, tot_it=2):
        """
        Method that fits the model.
        @param X: array-like with dimension [# of examples, # of length]
        """
        
        it = 0
        prev_logl = -1e6
        self.log_lh_it = []
        while True:
            it += 1
            print("\n")
            print("Starting iteration # ", it)
            #####################################################################################################
            
            # EXPECTATION STEP
            alpha, c = self._compute_alpha(X) # compute alpha
            beta = self._compute_beta(X, c) # compute beta
            gamma = alpha * beta
            
            N = X.shape[0]
            Xi = np.zeros((self.num_states, self.num_states, N))
            for t in range(1, N):
                Xi[:,:,t] = (1/c[t])*alpha[:,t-1][None].T.dot(beta[:,t][None])*self.transition_matrix
                for l in range(self.num_states):
                    Xi[:,l,t] *= self._one_normal_pdf(X[t], self.mu[l], self.sigma[l])
            
            # MAXIMIZATION STEP
            self.initial_states_vector = (gamma[:, 0] / np.sum(gamma[:, 0])).T  # update initial state
            temp = np.sum(Xi[:, :, 1:], axis=2)
            self.transition_matrix = temp / np.sum(temp, axis=1)[None].T # update transition matrix
            
            print("new initial conditions: ", self.initial_states_vector)
            print("new transition matrix: \n", self.transition_matrix)
            print("\n")

            gamma_sum = np.sum(gamma, axis=1)[None].T
            for k in range(self.num_states):  
                self.mu[k] = np.sum(gamma[k, :] * X) / gamma_sum[k]
                self.sigma[k] = np.sum(gamma[k, :] * X**2) / gamma_sum[k] - self.mu[k]**2
            
            print("new mu: ", self.mu)
            print("new sigma: ", self.sigma)
            
            #####################################################################################################
            
            train_logl = np.sum(np.log(c))
            self.log_lh_it.append(train_logl)
            # need some stopping condition
            if it >= tot_it:
                break
            print (f"Completed iteration # {it}: log-likelihood diff = {np.abs(train_logl - prev_logl)}")
            prev_logl = train_logl
        
    
    def decode_single_chain(self, X):
        """
        Auxiliary method that uses Viterbi on single chain
        @param X: array-like with dimension [ # of length]
        @return z: array-like with dimension [# of length]
        """
        # init holders
        z = []
        N = X.shape[0]
        V = np.zeros((N, self.num_states))
        best_states = np.zeros((N, self.num_states))
        
        #########################################
        # TODO: implement the Viterbi algorithm #
        #########################################
        
        for n in range(N):
            if n == 0:
                v_i_m1 = np.log(self.initial_states_vector)
            else:
                v_i_m1 = V[n-1, :]
                
            em_prob = np.zeros(self.num_states)
            for k in range(self.num_states):
                em_prob[k] = stats.norm.pdf(X[n], self.mu[k], np.sqrt(self.sigma[k]))
                
            v_i_all = v_i_m1[:, None] + np.log(self.transition_matrix) + np.log(em_prob)
            best_idx = np.argmax(v_i_all, axis=0)
                
            V[n, :] = np.amax(v_i_all, axis=0)
            best_states[n, :] = best_idx
        
        for n in range(N, 0, -1):
            if n == N:
                j = int(np.argmax(V[n-1, :]))
                z.append(j)
            else:
                j = int(best_states[n, j])
                z.append(j)

        return list(reversed(z))
        
    def decode(self, X):
        """
        Method that performs the Viterbi the model.
        @param X: array-like with dimension [# of examples, # of length]
        @return z: array-like with dimension [# of examples, # of length]
        """
        return [self.decode_single_chain(sample) for sample in X]

### My HMM: Latent Space dimension = 3

In [None]:
num_states = 3
num_obs = 1
total_iter = 1000
A = np.array([[.1, .7, .2],[.3, .2, .5],[.4, .4, .2]])
PI = np.array([.2, .3, .5])
mu = np.zeros(num_states)
sigma = np.array([.1, .2, .3])
my_hmm = MyHMM(num_states, num_obs)
my_hmm.initialize_em(A, PI, mu, sigma)
my_hmm.fit(data[:, 1], tot_it=total_iter)

In [None]:
# print the log-likelihood during training
plt.figure(figsize=(8, 6))
log_l = my_hmm.log_lh_it
plt.plot(range(1,len(log_l)+1), log_l, "-b")
plt.xlabel("Iteration #", fontsize=12)
plt.ylabel("Log-likelihood", fontsize=12)
plt.grid()
plt.title("EM with Latent space dimension = "+str(my_hmm.num_states), fontsize=12)
print(f"Final Log-likelihood value after {total_iter} iterations and latent dim {my_hmm.num_states} is: {log_l[-1]}")

#### Use Viterbi to decode the data, k = 3

In [None]:
def plot_labels(hmm_model, data_):
    # plot labels of latent variables we found by viterbi
    data = data_[:, 1]
    N = data.shape[0]
    z_seq = np.array(hmm_model.decode_single_chain(data))+1
    plt.figure(figsize=(12,8))
    plt.plot(np.arange(N), data, 'k-.', linewidth = 0.5)
    plt.grid()
    colors = ['k', 'r', 'b']
    plt.title("Latent space dimension = "+str(my_hmm.num_states), fontsize=12)
    for i in range(N):
        plt.text(i, data[i], z_seq[i], c=colors[z_seq[i]-1])

In [None]:
plot_labels(my_hmm, data)

In [None]:
print(f"Final results with k = {num_states}: \n")
print("Transition matrix: \n", my_hmm.transition_matrix)
print("Initial state vector: ", my_hmm.initial_states_vector)
print("MU: ", my_hmm.mu)
print("Sigma: ", np.sqrt(my_hmm.sigma))

As we can see from the picture above and the final parameters after 1000 iterations of EM for the case k=3, the results are very similar to the ones reported in the book. In addition, we can see that our EM implementation converges to very similar values for the HMM's parameters (A,PI,mu,sigma). Finally, the maximum log-likelihood value is -3098.

### My HMM: Latent Space dimension = 2

In [None]:
num_states = 2
num_obs = 1
total_iter = 1000
A = np.array([[.4, .6], [.2, .8]])
PI = np.array([.4, .6])
mu = np.zeros(num_states)
sigma = np.array([.1, .2])
my_hmm_2 = MyHMM(num_states, num_obs)
my_hmm_2.initialize_em(A, PI, mu, sigma)
my_hmm_2.fit(data[:, 1], tot_it=total_iter)

In [None]:
# print the log-likelihood during training
plt.figure(figsize=(8, 6))
log_l = my_hmm_2.log_lh_it
plt.plot(range(1,len(log_l)+1), log_l, "-b")
plt.xlabel("Iteration #", fontsize=12)
plt.ylabel("Log-likelihood", fontsize=12)
plt.grid()
plt.title("EM with Latent space dimension = "+str(my_hmm_2.num_states), fontsize=12)
print(f"Final Log-likelihood value after {total_iter} iterations and latent dim {my_hmm_2.num_states} is: {log_l[-1]}")

#### Use Viterbi to decode the data, k = 2

In [None]:
plot_labels(my_hmm_2, data)

In [None]:
print(f"Final results with k = {num_states}: \n")
print("Transition matrix: \n", my_hmm_2.transition_matrix)
print("Initial state vector: ", my_hmm_2.initial_states_vector)
print("MU: ", my_hmm_2.mu)
print("Sigma: ", np.sqrt(my_hmm_2.sigma))

From results with latent space dimension k=2 after running 1000 iterations of EM, we reach the maximum log-likelihood values of -3106. In the case of k=3, the maximum log-likelihood values is -3098, which is greater than the value for k=2. Hence, we can conlcude that the HMM with latent dimension k=3 performs better than the case k=2 (although the the gap among the two models in terms of log-likelihood is not that high)