In [1]:
import numpy as np

In [2]:
class HMM:
    def __init__(
            self,
            states,
            observations,
            initial_probability,
            transition_probability,
            emission_probability
    ):
        self.state = states
        self.obs = observations
        self.initial_prob = initial_probability
        self.trans_prob = transition_probability
        self.emit_prob = emission_probability
        self.m = len(self.state)
        self.T = len(self.obs)

    def forward_algorithm(self):
        alpha = np.zeros((self.m, self.T))

        # Initialize base case (t == 0)
        alpha[:, 0] = self.initial_prob * self.emit_prob[:, self.obs[0]]

        # Recursion step
        for t in range(1, self.T):
            alpha[:, t] = (alpha[:, t-1] @ self.trans_prob) * self.emit_prob[:, self.obs[t]]

        return alpha
    
    def backward_algorithm(self):
        beta = np.zeros((self.m, self.T))

        # Initialize base case (t == T-1)
        beta[:, self.T-1] = 1

        # Recursion step
        for t in range(self.T-2, -1, -1):
            beta[:, t] = self.trans_prob @ (self.emit_prob[:, self.obs[t + 1]] * beta[:, t + 1])

        return beta
    
    def baum_welch_algorithm(self, max_iter=100, dev=1e-6):

        for iteration in range(max_iter):

            # E-step: Calculate forward and backward probabilities
            alpha = self.forward_algorithm()
            beta = self.backward_algorithm()

            # Calculate xi (pairwise probabilities)
            denom = (alpha[:, :-1] * (self.trans_prob @ (self.emit_prob[:, self.obs[1:]] * beta[:, 1:]))).sum(axis=0)
            neom = (
                alpha[:, :-1].T[:, :, None] * 
                self.trans_prob * 
                (self.emit_prob[:, self.obs[1:]] * beta[:, 1:]).T[None, :, :]
            )
            xi = neom / denom 
                
            # Calculate gamma (marginal probabilities)
            gamma = alpha * beta
            gamma /= gamma.sum(axis=0)

            # M-step: Update parameters
            # Update start probabilities
            self.initial_prob = gamma[:, 0]

            # Update transition probabilities
            self.trans_prob = xi.sum(axis=0) / gamma[:, :-1].sum(axis=1, keepdims=True)

            # Convergence check (log-likelihood)
            for k in range(self.emit_prob.shape[1]):
                self.emit_prob[:, k] = gamma[:, self.obs == k].sum(axis=1) / gamma.sum(axis=1)

            # Convergence check (log-likelihood)
            new_log_likelihood = np.log(alpha[:, -1].sum())
            if iteration > 0 and abs(new_log_likelihood - old_log_likelihood) < dev:
                break

            old_log_likelihood = new_log_likelihood

        return self.initial_prob, self.trans_prob, self.emit_prob

In [3]:
# Example observation sequence (coded as integers)
obs = np.array([0, 1, 0])

# Hidden states
state = [0, 1]  # e.g., Sunny, Rainy

# Initial probabilities
initial_prob = np.array([0.6, 0.4])

# Transition probabilities
trans_prob = np.array([
    [0.7, 0.3],
    [0.4, 0.6]
])

# Emission probabilities
emit_prob = np.array([
    [0.5, 0.5],  # State 0 emits 0 and 1 with equal probability
    [0.1, 0.9]   # State 1 emits 0 with low probability and 1 with high probability
])

hmm = HMM(state, obs, initial_prob, trans_prob, emit_prob)
updated_start_prob, updated_trans_prob, updated_emit_prob = hmm.baum_welch_algorithm(max_iter=100, dev=1e-6)

print(f'Updated Start Probabilities: {updated_start_prob}')
print(f'Updated Transition Probabilities:\n {updated_trans_prob}')
print(f'Updated Emission Probabilities:\n {updated_emit_prob}')

Updated Start Probabilities: [1.00000000e+00 6.77015529e-72]
Updated Transition Probabilities:
 [[1.54679987e-131 1.00000000e+000]
 [1.00000000e+000 1.39883670e-036]]
Updated Emission Probabilities:
 [[1.00000000e+000 8.67024863e-129]
 [1.39883670e-036 1.00000000e+000]]
