<a href="https://colab.research.google.com/github/newmantic/HMM/blob/main/HMM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import numpy as np

class HiddenMarkovModel:
    def __init__(self, A, B, pi):
        self.A = np.array(A)  # Transition probability matrix
        self.B = np.array(B)  # Emission probability matrix
        self.pi = np.array(pi)  # Initial state distribution
        self.N = self.A.shape[0]  # Number of states
        self.M = self.B.shape[1]  # Number of observation symbols

    def forward(self, observations):
        T = len(observations)
        alpha = np.zeros((T, self.N))

        # Initialize base cases (t == 0)
        alpha[0, :] = self.pi * self.B[:, observations[0]]

        # Recursive case
        for t in range(1, T):
            for j in range(self.N):
                alpha[t, j] = np.sum(alpha[t-1, :] * self.A[:, j]) * self.B[j, observations[t]]

        return alpha

    def backward(self, observations):
        T = len(observations)
        beta = np.zeros((T, self.N))

        # Initialize base cases (t == T-1)
        beta[T-1, :] = 1

        # Recursive case
        for t in range(T-2, -1, -1):
            for i in range(self.N):
                beta[t, i] = np.sum(self.A[i, :] * self.B[:, observations[t+1]] * beta[t+1, :])

        return beta

    def baum_welch(self, observations, max_iters=100):
        T = len(observations)

        for n in range(max_iters):
            # Expectation step
            alpha = self.forward(observations)
            beta = self.backward(observations)

            xi = np.zeros((self.N, self.N, T-1))
            for t in range(T-1):
                denominator = np.sum(alpha[t, :] * np.sum(self.A * self.B[:, observations[t+1]] * beta[t+1, :], axis=1))
                for i in range(self.N):
                    numerator = alpha[t, i] * self.A[i, :] * self.B[:, observations[t+1]] * beta[t+1, :]
                    xi[i, :, t] = numerator / denominator

            gamma = np.sum(xi, axis=1)

            # Maximization step
            self.pi = gamma[:, 0]
            self.A = np.sum(xi, axis=2) / np.sum(gamma, axis=1, keepdims=True)

            gamma = np.hstack((gamma, np.sum(xi[:, :, T-2], axis=0, keepdims=True).T))

            for k in range(self.M):
                mask = (observations == k)
                self.B[:, k] = np.sum(gamma[:, mask], axis=1) / np.sum(gamma, axis=1)

    def viterbi(self, observations):
        T = len(observations)
        delta = np.zeros((T, self.N))
        psi = np.zeros((T, self.N), dtype=int)
        states = np.zeros(T, dtype=int)

        # Initialize base cases (t == 0)
        delta[0, :] = self.pi * self.B[:, observations[0]]

        # Recursive case
        for t in range(1, T):
            for j in range(self.N):
                delta[t, j] = np.max(delta[t-1, :] * self.A[:, j]) * self.B[j, observations[t]]
                psi[t, j] = np.argmax(delta[t-1, :] * self.A[:, j])

        # Backtracking
        states[T-1] = np.argmax(delta[T-1, :])
        for t in range(T-2, -1, -1):
            states[t] = psi[t+1, states[t+1]]

        return states, np.max(delta[T-1, :])

    def generate_sequence(self, length):
        states = np.zeros(length, dtype=int)
        observations = np.zeros(length, dtype=int)

        # Initialize first state
        states[0] = np.random.choice(self.N, p=self.pi)
        observations[0] = np.random.choice(self.M, p=self.B[states[0], :])

        for t in range(1, length):
            states[t] = np.random.choice(self.N, p=self.A[states[t-1], :])
            observations[t] = np.random.choice(self.M, p=self.B[states[t], :])

        return states, observations

In [4]:
# Define the HMM parameters
A = [[0.7, 0.3],
     [0.4, 0.6]]

B = [[0.1, 0.4, 0.5],
     [0.7, 0.2, 0.1]]

pi = [0.6, 0.4]

# Define observation sequence (e.g., encoded as 0, 1, 2 corresponding to the emissions)
observations = [0, 1, 2]

# Initialize HMM
hmm = HiddenMarkovModel(A, B, pi)

# Calculate the probability of the observation sequence using the forward algorithm
prob_forward = hmm.forward(observations)
print(f"Probability of the observation sequence (Forward): {prob_forward}")

# Calculate the probability of the observation sequence using the backward algorithm
prob_backward = hmm.backward(observations)
print(f"Probability of the observation sequence (Backward): {prob_backward}")

# Find the most likely state sequence using the Viterbi algorithm
state_sequence, viterbi_prob = hmm.viterbi(observations)
print(f"Most likely state sequence: {state_sequence}")
print(f"Probability of the most likely state sequence (Viterbi): {viterbi_prob}")

# Generate a random sequence of states and observations
generated_states, generated_observations = hmm.generate_sequence(5)
print(f"Generated states: {generated_states}")
print(f"Generated observations: {generated_observations}")

Probability of the observation sequence (Forward): [[0.06    0.28   ]
 [0.0616  0.0372 ]
 [0.029   0.00408]]
Probability of the observation sequence (Backward): [[0.122 0.092]
 [0.38  0.26 ]
 [1.    1.   ]]
Most likely state sequence: [1 0 0]
Probability of the most likely state sequence (Viterbi): 0.01568
Generated states: [0 0 1 0 0]
Generated observations: [2 2 1 2 2]
