In [22]:
# OS Libaries
import numpy as np

In [23]:
# Hyperparameters
state_idx = {'A':0, 'B':1, 'C':2}
states = 4
iterations = 50

In [24]:
# Generate data
X = 'AABBBACABBBACAAAAAAAAABBBACAAAAABACAAAAAABBBBACAAAAAAAAAAAABACABACAABBACAAABBBBACAAABACAAAABACAABACAAABBACAAAABBBBACABBACAAAAAABACABACAAABACAABBBACAAAABACABBACA'

In [25]:
class HMM:
    """
    
    alpha (Joint probability): represents  the joint probability of a partial observation sequence  up to time t and the value of z for a given state
    --> The forward variable
    --> alpha_t(i) = P(O_1 0_2 ... 0_T, s_t=i | theta) 
    
    beta  (conditional prob) : the probability of all future data from time t+1 until to T given the value of the state_t = i
    --> The backward variable
    --> B_t(i) = P(O_t+1 O_t+2 ... O_T | s_t=i, theta)
    
    gamma (conditional prob) : probability of being in state j at time t, given the oberservation sequence and the model
    --> y_t(i) = P(s_t=i | x_1:T, theta)
    
    """
    def __init__(self, num_latent_states, observations):
        self.num_latent_states      = num_latent_states # N Variable
        self.T                      = len(observations)
        self.num_observation_states = len(set(observations))
        
        self.transition_prob      = np.zeros([self.num_latent_states, self.num_latent_states]) # An N X N matrix
        self.emission_prob        = np.zeros([self.num_latent_states, self.num_observation_states])
        self.initial_distribution = np.zeros(self.num_latent_states)
        
        self.alpha = np.zeros([self.T, self.num_latent_states]) # alpha_t(i) = P(x_1:t, s_t=i)
        self.beta  = np.zeros([self.T, self.num_latent_states]) # beta_t(i) = P(x_t+1:T | s_t=i)
        self.gamma = np.zeros([self.T, self.num_latent_states]) # gamma_t(i) = P(s_t=i | x_1:T)
        self.xi    = np.zeros([self.T, self.num_latent_states, self.num_latent_states]) # xi_t(i,j) = P(s_t=i, s_t+1=j | x_1:T)
    
    # Initialise model before training it    
    def random_init(self, seed=False):
        if seed:
            np.random.seed(seed)
        
        # initialise transition prob - Creates a unique transition value per cell Mij
        for i in range(self.num_latent_states):
            self.transition_prob[i,:] = np.random.uniform(low=0, high=1, size=self.num_latent_states)
            self.transition_prob[i,:] /= np.sum(self.transition_prob[i,:]) # Ensure the row integrates to 1
        
        # initialise emission_prob - Given state, prob of observation?
        for i in range(self.num_latent_states):
            self.emission_prob[i,:] = np.random.uniform(low=0, high=1, size=self.num_observation_states)
            self.emission_prob[i,:] /= np.sum(self.emission_prob[i,:])

        # Randomly init the latent states - Equivilent to setting ℼ
        self.initial_distribution = np.random.uniform(low=0, high=1, size=self.num_latent_states)
        self.initial_distribution /= np.sum(self.initial_distribution)
    
    def Baum_Welch_E_step(self):
        # computes alpha, beta, gamma, xi
        # forward-backward algorithm
        
        # forward pass
        
        # For the first observation, initialise alpha for each state
        for i in range(self.num_latent_states):
            # alplha_1(state_i) = ℼ_i * O(i) - Latent * Oberservation given state
            self.alpha[0, i] = self.initial_distribution[i] * self.emission_prob[i, state_idx[X[0]]]
        
        # Induction
        for t in range(1, self.T): # For every observation
            for i in range(self.num_latent_states): # For every state
                self.alpha[t, i] = np.sum([self.alpha[t-1, j] * self.transition_prob[j,i] for j in range(self.num_latent_states)]) * self.emission_prob[i, state_idx[X[t]]]
        
        # backward pass
        self.beta[self.T-1, :] = np.ones((1,self.num_latent_states))
        for t in range(self.T-2, -1, -1):
            for i in range(self.num_latent_states):
                self.beta[t, i] = np.sum([self.emission_prob[j, state_idx[X[t+1]]] * self.transition_prob[i, j] * self.beta[t+1, j]for j in range(self.num_latent_states)])
            
        # marginal
        for t in range(self.T):
            for j in range(self.num_latent_states):
                self.gamma[t,j] = self.alpha[t,j] * self.beta[t,j] / np.sum([self.alpha[t,k]*self.beta[t,k] for k in range(self.num_latent_states)])
        
        # xi
        for t in range(self.T-1):
            for i in range(self.num_latent_states):
                for j in range(self.num_latent_states):
                    self.xi[t,i,j] = self.alpha[t,i] * self.transition_prob[i,j] * self.emission_prob[j, state_idx[X[t+1]]]*self.beta[t+1, j]
                    self.xi[t,i,j] /= np.sum([np.sum([self.alpha[t,i] * self.transition_prob[i,j] *self.emission_prob[j, state_idx[X[t+1]]]*self.beta[t+1,j] for j in range(self.num_latent_states)]) for i in range(self.num_latent_states)])
                
    def Baum_Welch_M_step(self):
        # computes pi, transition prob, emission prob
        indicator = lambda x, y: 1 if x == y else 0
        for i in range(self.num_latent_states):
            self.initial_distribution[i] = self.gamma[0,i]
            for j in range(self.num_latent_states):
                self.transition_prob[i,j] = np.sum([self.xi[t, i, j] for t in range(self.T)])/np.sum([self.gamma[t,i] for t in range(self.T)])
            for k in range(self.num_observation_states):
                self.emission_prob[i,k] = np.sum([self.gamma[t,i] * indicator(k, state_idx[X[t]]) for t in range(self.T)]) / np.sum([self.gamma[t,i] for t in range(self.T)])
        
    def show_params(self):
        print(f'    Initial distribution: \n    {self.initial_distribution}')
        print(f'    Transition probabilities: \n    {self.transition_prob}')
        print(f'    Emission probabilities: \n    {self.emission_prob}')
        
    def train(self, num_iter):
        print('Initial parameters:')
        self.show_params()
        for i in range(num_iter):
            self.Baum_Welch_E_step()
            self.Baum_Welch_M_step()
            if (i+1) % 10 == 0:
                print('\n\n')
                print(f'#### Iteration {i+1} complete ####')
                self.show_params()

In [26]:
model = HMM(num_latent_states=states, observations=X)
model.random_init()
print(model.transition_prob)
# model.train(num_iter=iterations)

[[0.15000985 0.06213498 0.26743688 0.52041829]
 [0.3633888  0.06438312 0.27046854 0.30175954]
 [0.3367076  0.32793008 0.09102021 0.24434211]
 [0.40459438 0.32392498 0.10839675 0.16308389]]
