In [3]:
import numpy as np 
import pandas as pd 

In [39]:
class ModelHMM:
    
    """ A class used to learn parameters of a Hidden Markov Model using Baum Welch algorithm """
    
    def __init__(self, A, B, data, freeze_B): 
        
        """ Instantiate a model instance with: 
        
        A: (num_hidden*num_hidden) array representing transition probabilities between hidden states  
        B: (num_hidden*num_obs) array representing emission probabilities conditioned on each hidden state 
        data: list of observations 
        freeze_B: list of Booleans of len=num_hidden indicating whether emission probabilities should be fixed 
            (e.g. [True, False, False] means to freeze only the first hidden state's)
                
        """
        
        # initialize HMM parameters 
        self.A = A 
        self.B = B
        self.num_hidden, self.num_obs = B.shape
        assert len(freeze_B) == self.num_hidden
        self.freeze_B = freeze_B
        
        # initialize data parameters 
        self.data = data 
        self.T = len(data)
        
        # initialize additional attributes used in learning the parameters of an HMM 
        self.forwards = np.zeros((self.num_hidden, self.T))
        self.backwards = np.zeros((self.num_hidden, self.T))
        self.coefs = np.zeros(self.T) # coefs used to rescale forward/backward probas for numerical stability 
        self.gammas = np.zeros((self.num_hidden, self.T))
        self.ixs = np.zeros((self.num_hidden, self.num_hidden, self.T-1))
        
    def forward_pass(self, pi):
        
        """ Implements the forward algorithm to compute forward probabilities 
        (note: to prevent numerical underflow probabilities are normalized with scaling factors 'coefs') """
        
        # define initial probabilities pi if not provided 
        if pi is None: 
            if not np.any(self.gammas):
                # use gammas at t=0 if estimates already exist 
                pi = self.gammas[:,0]
            else:
                # in case of cold start, assume all hidden states are equally probable 
                pi = np.full(shape=(self.num_hidden), fill_value=1./self.num_hidden)
        
        # compute first time step 
        self.forwards[:,0] = np.multiply(pi, self.B[:, self.data[0]-1]) # because 0-index 
        self.coefs[0] = 1 / self.forwards[:,0].sum()
        self.forwards[:,0] = self.coefs[0] * self.forwards[:,0]

        # iterate over rest of time steps 
        for t in range(1, self.T): 
            self.forwards[:,t] = np.multiply(self.A @ self.forwards[:,t-1], self.B[:, self.data[t]-1])
            self.coefs[t] = 1 / self.forwards[:,t].sum()
            self.forwards[:,t] = self.coefs[t] * self.forwards[:,t]
            
    def backward_pass(self): 
        
        """ Implements backward algorithm to compute backward probabilities 
        (note: uses 'coefs' computed in forward step to normalize probabilities to prevent underflow) """
        
        # compute last time step 
        self.backwards[:, -1] = 1
        self.backwards[:, -1] = self.coefs[-1] * self.backwards[:, -1]

        # iterate backward through rest of time steps 
        for t in reversed(range(self.T-1)):
            self.backwards[:,t] = np.multiply(self.A @ self.backwards[:,t+1], self.B[:, self.data[t+1]-1])
            self.backwards[:,t] = self.coefs[t] * self.backwards[:,t]
            
    def e_step(self, pi=None):
        
        """ Expectation step of the Baum-Welch EM algorithm. Uses forward and backward probabilities to compute: 
            - gammas: (num_hidden*T) array representing probabilities of being in a hidden state j at time t
            - ixs: (num_hidden*num_hidden*T-1) array representing probabilities of transitioning 
            from hidden state i at time t to hidden state j at time t+1 """
        
        self.forward_pass(pi)
        self.backward_pass()
        
        # compute ixs 
        for t in range(self.T-1):
            for i in range(self.num_hidden):
                for j in range(self.num_hidden): 
                    self.ixs[i,j,t] = self.forwards[i,t] * self.A[i,j] * \
                        self.B[j, self.data[t+1]-1] * self.backwards[j,t+1]
        
        # compute gammas 
        for t in range(self.T):
            self.gammas[:,t] = np.multiply(self.forwards[:,t], self.backwards[:,t])
    
    def m_step(self): 

        """ Maximization step of the Baum-Welch EM algorithm. Uses gammas and ixs to update 
        transition probabilities A and emission probabilities B. """

        # re-estimate A 
        for i in range(self.num_hidden):
            for j in range(self.num_hidden):
                self.A[i,j] = self.ixs[i,j,:].sum() / self.ixs[i,:,:].sum()        
                
        # re-estimate B 
        for j in range(self.num_hidden):
            if self.freeze_B[j]: # if one die's proba is fixed don't update it 
                pass 
            else:
                for k in range(self.num_obs):
                    obs_mask = np.array([datum == k+1 for datum in self.data])
                    self.B[j,k] = (self.gammas[j,:] * obs_mask).sum() / self.gammas[j,:].sum()
                
    def train(self, num_iter, initial_pi):
        
        """ Runs Baum-Welch algo for num_iter times to update model parameters, store parameters at each 
        time step to output for analysis """
        
        # initialize dictionaries to hold params 
        transition_probs, emission_probs = {}, {} 
        
        # run EM for num_iter iterations 
        for m in range(1, num_iter+1): 
            self.e_step(initial_pi) # fix initial_pi in this model because we know casino started w/ fair die
            self.m_step() 
            transition_probs[m] = self.A.copy() 
            emission_probs[m] = self.B.copy() 
            
        return transition_probs, emission_probs 

In [49]:
# load data 
with open("sequence.txt", "r") as f:
    data = eval(f.read())

In [45]:
# two dice
A = np.array([[.5, .5], 
              [.5, .5]])
B = np.array([[1./6, 1./6, 1./6, 1./6, 1./6, 1./6], 
              [1./6, 1./6, 1./6, 1./6, 1./6, 1./6]])
initial_pi = np.array([1., 0.])
freeze_B = [True, False]

model_2d = ModelHMM(A, B, data, freeze_B)
transition_probs_2d, emission_probs_2d = model_2d.train(num_iter=200, initial_pi=initial_pi)
print(model_2d.A)
print(model_2d.B)

[[6.03627642e-04 9.99396372e-01]
 [1.17489317e-03 9.98825107e-01]]
[[0.16666667 0.16666667 0.16666667 0.16666667 0.16666667 0.16666667]
 [0.26375721 0.14044373 0.13728886 0.13826394 0.13742637 0.18281989]]


In [48]:
# three dice - don't do this (need to break symmetry)
A = np.array([[.34, .33, .33], 
              [.33, .34, .33],
              [.33, .33, .34]])
B = np.array([[1./6, 1./6, 1./6, 1./6, 1./6, 1./6], 
              [2./7, 1./7, 1./7, 1./7, 1./7, 1./7],
              [1./7, 1./7, 1./7, 1./7, 1./7, 2./7]])
initial_pi = np.array([1., 0., 0.])
freeze_B = [True, False, False]

model_3d = ModelHMM(A, B, data, freeze_B)
transition_probs_3_dice, emission_probs_3d = model_3d.train(num_iter=200, initial_pi=initial_pi)
print(model_3d.A)
print(model_3d.B)

[[1.11682682e-03 1.13400838e-01 8.85482335e-01]
 [6.43397373e-04 6.86016827e-01 3.13339775e-01]
 [4.61215627e-03 2.25477124e-01 7.69910720e-01]]
[[0.16666667 0.16666667 0.16666667 0.16666667 0.16666667 0.16666667]
 [0.11264013 0.17308977 0.16759647 0.16054701 0.14047959 0.24564703]
 [0.32096227 0.12788196 0.12720243 0.13446415 0.15020126 0.13928793]]


In [47]:
# three dice - don't do this (need to break symmetry)
A = np.array([[.34, .33, .33], 
              [.33, .34, .33],
              [.33, .33, .34]])
B = np.array([[1./6, 1./6, 1./6, 1./6, 1./6, 1./6], 
              [1./6, 1./6, 1./6, 1./6, 1./6, 1./6],
              [1./6, 1./6, 1./6, 1./6, 1./6, 1./6]])
initial_pi = np.array([1., 0., 0.])
freeze_B = [True, False, False]

model_3d = ModelHMM(A, B, data, freeze_B)
transition_probs_3_dice, emission_probs_3d = model_3d.train(num_iter=200, initial_pi=initial_pi)
print(model_3d.A)
print(model_3d.B)

[[8.52477813e-05 4.99957376e-01 4.99957376e-01]
 [5.93080158e-04 5.07161721e-01 4.92245199e-01]
 [5.93080158e-04 4.92245199e-01 5.07161721e-01]]
[[0.16666667 0.16666667 0.16666667 0.16666667 0.16666667 0.16666667]
 [0.26445784 0.14119105 0.13793066 0.13915996 0.13820468 0.17905582]
 [0.26445784 0.14119105 0.13793066 0.13915996 0.13820468 0.17905582]]


In [50]:
# pd.Series([(k,v[0,1]) for (k,v) in transition_probs_2d.items()])

In [52]:
import random
fake_data1 = np.random.choice([1,1,1,1,1,2,3,4,5,6], size=2500)
fake_data2 = np.random.choice([1,2,3,4,5,6], size=2500)
fake_data = np.concatenate([fake_data1, fake_data2])
random.shuffle(fake_data)