In [164]:
import numpy as np
from tqdm import tqdm

In [170]:
def forward(A, B, pi, sequence, eval=False):
    T = len(sequence)
    N = A.shape[0]
    alpha = np.zeros((N, T))
    alpha[:, 0] = pi * B[:, sequence[0]]
    constant = np.ones(T)
    if not eval:
        constant[0] = 1/np.sum(alpha[:, 0])
        alpha[:, 0] *= constant[0]
    for i in range(1, T):
        alpha[:, i] = (alpha[:, i-1] @ A) * B[:, sequence[i]]
        if not eval:
            constant[i] = 1/np.sum(alpha[:, i])
            alpha[:, i] *= constant[i]
    return alpha, constant

def backward(A, B, sequence, constant, eval=False):
    T = len(sequence)
    N = A.shape[0]
    beta = np.zeros((N, T))
    beta[:, -1] = 1
    if not eval:
        beta[:, -1] *= constant[-1]
    for i in range(T-1)[::-1]:
        beta[:, i] = A @ (B[:, sequence[i+1]] * beta[:, i+1])
        if not eval:
            beta[:, i] *= constant[i]
    return beta

In [171]:
def init_hmm_params(num_states, num_observations):
    
    # Init transition matrix with regards to the given map
    A = np.zeros((num_states, num_states))
    A[0, [0, 1, 5]] = 1
    A[1, [1, 0, 2]] = 1
    A[2, [2, 1, 3]] = 1
    A[3, [3, 2, 4, 8]] = 1
    A[4, [4, 3, 9]] = 1
    A[5, [5, 0, 6, 10]] = 1
    A[6, [6, 5, 7, 11, 12]] = 1
    A[7, [7, 6, 8, 12]] = 1
    A[8, [8, 3, 7, 9, 12, 13]] = 1
    A[9, [9, 4, 8, 14]] = 1
    A[10, [10, 5, 15]] = 1
    A[11, [11, 6, 12, 16]] = 1
    A[12, [12, 6, 7, 8, 11, 13, 16, 17, 18]] = 1
    A[13, [13, 8, 12, 14, 18]] = 1
    A[14, [14, 9, 13, 19]] = 1
    A[15, [15, 10, 20]] = 1
    A[16, [16, 11, 12, 17, 21]] = 1
    A[17, [17, 12, 16, 18, 22]] = 1
    A[18, [18, 12, 13, 17, 19, 23]] = 1
    A[19, [19, 14, 18, 24]] = 1
    A[20, [20, 15, 21]] = 1
    A[21, [21, 16, 20, 22]] = 1
    A[22, [22, 17, 21, 23]] = 1
    A[23, [23, 18, 22, 24]] = 1
    A[24, [24, 19, 23]] = 1
    assert (A - A.T).sum() == 0
    A = (A.T / A.sum(axis=1)).T
    
    # Init emission probabilities
    B = np.random.rand(num_states, num_observations)
    B = (B.T / B.sum(axis=1)).T

    # Init initial probabilities
    pi = np.random.rand(num_states)
    pi /= pi.sum()

    return A, B, pi

In [172]:
def baum_welch(sequences, iters=100):

    A, B, pi = init_hmm_params(25, 11)
    
    N = A.shape[0]
    O = B.shape[1]
    T = len(sequences[0])

    for i in range(iters):

        A_num = np.zeros((N, N))           # (N, N)
        A_denom = np.zeros((N, 1))         # (N, 1)
        B_num = np.zeros((N, O))           # (N, O)
        B_denom = np.zeros((N, 1))         # (N, 1)
        pi_num = np.zeros(N)          

        for seq_num in tqdm(range(sequences.shape[0]), bar_format='{l_bar}{bar:100}{r_bar}{bar:-10b}'):
            
            sequence = sequences[seq_num]
        
            alpha, constant = forward(A, B, pi, sequence)         # alpha: (N, T)
            beta = backward(A, B, sequence, constant)             # beta: (N, T)

            # xi: (N, N, T-1)
            # compute xi vectorized 
            alpha_t = alpha[:, :-1]
            beta_t = beta[:, 1:]
            B_t = B[:, sequence[1:]]
            numerator = (A[:, :, None] * alpha_t[:, None, :]) * (beta_t * B_t)
            denom = numerator.sum(axis=(0, 1))
            xi = numerator / (denom + 1e-7)

            gamma_temp = alpha * beta                                # gamma_temp: (N, T)
            gamma = gamma_temp / (gamma_temp.sum(axis=0) + 1e-7)     # gamma: (N, T)

            A_num += xi.sum(axis = 2)                                      # A_num: (N, N)
            A_denom += gamma[:, :-1].sum(axis=1).reshape(-1, 1)            # A_denom: (N, 1)

            for o in range(O):
                B_num[:, o] += np.sum(gamma[:, sequence == o], axis=1)
            B_denom += np.sum(gamma, axis=1).reshape(-1, 1)

            pi_num += gamma[:, 0]

        A = A_num / (A_denom + 1e-7)
        B = B_num / (B_denom + 1e-7)
        pi = pi_num / sequences.shape[1]

    return A, B, pi


# **Quesion 1: Learn HMM Parameters**

In [173]:
train_data = np.load('train_data.npy')

A, B, pi = baum_welch(train_data, iters=2)
np.save('transition_matrix.npy', A)
np.save('emission_matrix.npy', B)
np.save('initial_dist.npy', pi)


100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 500/500 [00:08<00:00, 56.97it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 500/500 [00:07<00:00, 65.09it/s]


# **Quesion 2: Compute sequence probabilities**

In [187]:
def forward_backward(sequence, A, B, pi):
    T = len(sequence)
    idx = np.random.randint(1, T)
    alpha, c = forward(A, B, pi, sequence, eval=True)
    beta = backward(A, B, sequence, c, eval=True)
    prob = alpha[:, idx] @ beta[:, idx]
    return prob

test_data = np.load('test_data.npy')
test_probs = np.zeros(test_data.shape[0])
for i, observation in enumerate(test_data):
    prob = forward_backward(observation, A, B, pi)
    test_probs[i] = prob
np.save('test_probs.npy', test_probs)