In [1]:
import numpy as np

# y observations, A transition matrix, B emission matrix, pi initial prob.

In [3]:
def forward(Y, A, B, pi):
    T = len(Y)
    N = len(pi)
    alpha = np.zeros((T, N))

    # Initialization
    alpha[0, :] = pi * B[:, Y[0]]

    # Recursion
    for t in range(1, T):
        for j in range(N):
            alpha[t, j] = np.sum(alpha[t - 1, :] * A[:, j]) * B[j, Y[t]]

    return alpha

In [4]:
def backward(Y, A, B):
    T = len(Y)
    N = A.shape[0]
    beta = np.zeros((T, N))

    # Initialization
    beta[T - 1, :] = 1

    # Recursion
    for t in range(T - 2, -1, -1):
        for i in range(N):
            beta[t, i] = np.sum(A[i, :] * B[:, Y[t + 1]] * beta[t + 1, :])

    return beta

In [5]:
def baum_welch(Y, A, B, pi, max_iter=100, tol=1e-4):
    N = A.shape[0]
    M = B.shape[1]  
    T = len(Y)     

    # First iteration
    alpha = forward(Y, A, B, pi)
    beta = backward(Y, A, B)

    # Compute gamma and xi after the first iteration
    gamma = np.zeros((T, N))
    xi = np.zeros((T - 1, N, N))

    for t in range(T):
        gamma[t, :] = (alpha[t, :] * beta[t, :]) / np.sum(alpha[t, :] * beta[t, :])

    for t in range(T - 1):
        denom = np.sum(alpha[t, :, None] * A * B[:, Y[t + 1]] * beta[t + 1, :])
        for i in range(N):
            xi[t, i, :] = (alpha[t, i] * A[i, :] * B[:, Y[t + 1]] * beta[t + 1, :]) / denom

    # Update parameters after the first iteration
    pi = gamma[0, :]

    for i in range(N):
        for j in range(N):
            A[i, j] = np.sum(xi[:, i, j]) / np.sum(gamma[:-1, i])

    for j in range(N):
        for k in range(M):
            B[j, k] = np.sum(gamma[Y == k, j]) / np.sum(gamma[:, j])

# Print results after first iteration
    print("First Iteration:")
    print("\nTrained Transition Probabilities (Rainy/Sunny):")
    for i, row in enumerate(A):
        print(f"From {states[i]}: {dict(zip(states, row))}")

    print("\nTrained Emission Probabilities (Rainy/Sunny):")
    for i, row in enumerate(B):
        print(f"From {states[i]}: {dict(zip(observations, row))}")

    print("\nTrained Initial Probabilities:")
    for i, p in enumerate(pi):
        print(f"{states[i]}: {p}")

# Store current A and B as old to check for convergence in subsequent iterations
    A_old, B_old = A.copy(), B.copy()

# Now continue with the rest of the iterations until max_iter or tolerance
    for iteration in range(1, max_iter):
        alpha = forward(Y, A, B, pi)
        beta = backward(Y, A, B)

        # Compute gamma and xi
        gamma = np.zeros((T, N))
        xi = np.zeros((T - 1, N, N))

        for t in range(T):
            gamma[t, :] = (alpha[t, :] * beta[t, :]) / np.sum(alpha[t, :] * beta[t, :])

        for t in range(T - 1):
            denom = np.sum(alpha[t, :, None] * A * B[:, Y[t + 1]] * beta[t + 1, :])
            for i in range(N):
                xi[t, i, :] = (alpha[t, i] * A[i, :] * B[:, Y[t + 1]] * beta[t + 1, :]) / denom

        # Update parameters
        pi = gamma[0, :]

        for i in range(N):
            for j in range(N):
                A[i, j] = np.sum(xi[:, i, j]) / np.sum(gamma[:-1, i])

        for j in range(N):
            for k in range(M):
                B[j, k] = np.sum(gamma[Y == k, j]) / np.sum(gamma[:, j])

        # Check for convergence based on tolerance
        if np.abs(A - A_old).max() < tol and np.abs(B - B_old).max() < tol:
            break

        A_old, B_old = A.copy(), B.copy()

    return A, B, pi

In [6]:
# Parameters
states = ["Rainy", "Sunny"]
observations = ["Walk", "Shop", "Clean"]
state_to_idx = {s: i for i, s in enumerate(states)}
obs_to_idx = {o: i for i, o in enumerate(observations)}

pi = np.array([0.6, 0.4])  # Initial probabilities
A = np.array([[0.7, 0.3], [0.4, 0.6]])  # Transition probabilities
B = np.array([[0.1, 0.4, 0.5], [0.6, 0.3, 0.1]])  # Emission probabilities
Y = np.array([obs_to_idx[o] for o in ["Walk", "Walk", "Clean"]])  # Observation sequence

# Train the HMM with max iterations set to 200
A_trained, B_trained, pi_trained = baum_welch(Y, A, B, pi, max_iter=200)

First Iteration:

Trained Transition Probabilities (Rainy/Sunny):
From Rainy: {'Rainy': 0.666044776119403, 'Sunny': 0.333955223880597}
From Sunny: {'Rainy': 0.4493227463801962, 'Sunny': 0.5506772536198038}

Trained Emission Probabilities (Rainy/Sunny):
From Rainy: {'Walk': 0.28445073412347427, 'Shop': 0.0, 'Clean': 0.7155492658765258}
From Sunny: {'Walk': 0.8916189484643415, 'Shop': 0.0, 'Clean': 0.1083810515356585}

Trained Initial Probabilities:
Rainy: 0.14431773495871017
Sunny: 0.8556822650412899


In [7]:
# Display human-readable results
print("\nTrained Transition Probabilities (Rainy/Sunny):")
for i, row in enumerate(A_trained):
    print(f"From {states[i]}: {dict(zip(states, row))}")

print("\nTrained Emission Probabilities (Rainy/Sunny):")
for i, row in enumerate(B_trained):
    print(f"From {states[i]}: {dict(zip(observations, row))}")

print("\nTrained Initial Probabilities:")
for i, p in enumerate(pi_trained):
    print(f"{states[i]}: {p}")


Trained Transition Probabilities (Rainy/Sunny):
From Rainy: {'Rainy': 1.0, 'Sunny': 7.408542529169662e-87}
From Sunny: {'Rainy': 0.6665149574904744, 'Sunny': 0.33348504250952565}

Trained Emission Probabilities (Rainy/Sunny):
From Rainy: {'Walk': 0.3331815551250396, 'Shop': 0.0, 'Clean': 0.6668184448749603}
From Sunny: {'Walk': 0.9999999999655821, 'Shop': 0.0, 'Clean': 3.441793992237612e-11}

Trained Initial Probabilities:
Rainy: 6.573408389534184e-13
Sunny: 0.9999999999993426
