In [293]:
import copy
import random
import math
# Initialize all the matrices required for the hidden markov models (for testing only)
N = 2
M = 3
T = 4
pi = [0.60, 0.40]
O = [0, 1, 0, 2]
A = [[0.70, 0.30], [0.40, 0.60]]
B = [[0.10, 0.40, 0.50], [0.70, 0.20, 0.10]]

In [294]:
# The Forward algorithm. Alpha-pass
# pi represents probability of initial states
# A is the state transition matrix
# B is the observation probability matrix
# N is the number of distinct states of the markov process
# T is the length of the observation sequence
# M is the number of ovservation symbols
# O is the observation sequence
def alpha_pass(pi, A, B, N, M, T, O):
    # The alpha pass calculates the probability P(O_0, O_1, O_2 ... O_t, X_T = q_i | model)

    # initialize the zeroth alpha-pass
    alpha = []
    scaled_constants = []
    const = 0
    for states in range(0, N):
        alpha.append([pi[states] * B[states][O[0]]]);
        const += alpha[states][0] # for normalization of alpha constants
    
    scaled_constants.append(1 / const)
    
    for states in range(0, N): # Nomalizing
        alpha[states][0] *= scaled_constants[0]
    
    # starting the DP algorithm here
    for time in range(1, T):

        const = 0

        # for each state
        for states in range(0, N):
            # calculate transition probabilities from each state in time-1 and add it
            prob = 0
            for prev_states in range(0, N):
                prob += (alpha[prev_states][time - 1] * A[prev_states][states]);
            
            # multiply it with probabilities to get the observation symbols
            alpha[states].append(prob * B[states][O[time]])
            const += alpha[states][time]
        
        # Normalizing
        for states in range(0, N):
            alpha[states][time] /= const
        scaled_constants.append(1 / const)

    return alpha, scaled_constants

In [295]:
alpha, scaled_constants = alpha_pass(pi, A, B, N, M, T, O)
print(alpha)

[[0.17647058823529413, 0.6234817813765182, 0.16880093131548313, 0.8039793968596827], [0.8235294117647058, 0.3765182186234818, 0.8311990686845169, 0.19602060314031738]]


In [296]:
# The backward algorithm. Beta-pass
# pi represents probability of initial states
# A is the state transition matrix
# B is the observation probability matrix
# N is the number of distinct states of the markov process
# T is the length of the observation sequence
# M is the number of ovservation symbols
# O is the observation sequence
def beta_pass(pi, A, B, N, M, T, O, scaled_constants):
    # initialize the beta-pass
    beta = [[scaled_constants[T - 1]] * T for _ in range(0, N)]
    
    # The algorithm starts here
    for time in range(T - 2, -1, -1):
        
        for states in range(0, N):
            # find the sum of transition probabilities multiplied with the observational probability and the beta-pass of next states
            beta[states][time] = 0
            for next_states in range(0, N):
                beta[states][time] += (A[states][next_states] * B[next_states][O[time + 1]] * beta[next_states][time + 1])
            beta[states][time] *= scaled_constants[time]
    
    return beta

In [297]:
beta = beta_pass(pi, A, B, N, M, T, O, scaled_constants)
print(beta)

[[3.1361634958876796, 2.866993436902883, 3.8988119963446044, 3.568164825122539], [2.899393536595498, 4.392290437816732, 2.667608208025256, 3.568164825122539]]


In [298]:
# the procedure to find the digammas and gammas
def digammas(pi, A, B, N, M, T, O, alpha, beta):

    digamma = [[[1] * T for _ in range(0, N)] for _ in range(0, N)]
    gamma = [[1] * T for _ in range(0, N)]

    # P(O | model)
    P_O = 0
    for i in range(0, N):
        P_O += alpha[i][T - 1]
    
    for time in range(0, T - 1):
        for stat1 in range(0, N):
            gamma[stat1][time] = 0
            for stat2 in range(0, N):
                digamma[stat1][stat2][time] = (alpha[stat1][time] * A[stat1][stat2] * B[stat2][O[time + 1]] * beta[stat2][time + 1]) / P_O;
                gamma[stat1][time] += digamma[stat1][stat2][time]
    
    # Special case for y(T - 1)
    for stat in range(0, N):
        gamma[stat][T - 1] = alpha[stat][T - 1]
    
    return digamma, gamma

In [299]:
digamma, gamma = digammas(pi, A, B, N, M, T, O, alpha, beta)
print(digamma, "\n\n")
print(gamma)

[[[0.14166320511755423, 0.1701586774113151, 0.21080834094874137, 1], [0.046506604635706585, 0.34927307468638374, 0.01806928636703498, 1]], [[0.3777685469801446, 0.058718949904461255, 0.5931710559109413, 1], [0.4340616432665947, 0.42184929799784, 0.1779513167732824, 1]]] 


[[0.1881698097532608, 0.5194317520976989, 0.22887762731577635, 0.8039793968596827], [0.8118301902467393, 0.48056824790230124, 0.7711223726842238, 0.19602060314031738]]


In [300]:
# Estimating the model from here on
def estimate(N, M, T, O):
    # initialize the initial probability here
    pi = []
    for i in range(0, N):
        num = 0
        num = random.gauss(1 / N, 0.1)
        pi.append(num)
    
    su = sum(pi)
    
    for i in range(0, N):
        pi[i] /= su
    
    # initialize the state transition matrix
    A = []
    for i in range(0, N):
        A.append([])
        for j in range(0, N):
            A[i].append(random.gauss(1 / N, 0.1))
        su = sum(A[i])
        for j in range(0, N):
            A[i][j] /= su
    
    # initialize the symbol emission probability matrix
    B = []
    for i in range(0, N):
        B.append([])
        for j in range(0, M):
            B[i].append(random.gauss(1 / M, 0.1))
        su = sum(B[i])
        for j in range(0, M):
            B[i][j] /= su
    print(pi)
    print(A)
    print(B, end="\n\n")

    # max_iter is a explicit condition to stop the recursion.
    max_iter = 10
    for it in range(0, max_iter):
        alpha, scaled_constants = alpha_pass(pi, A, B, N, M, T, O)
        beta = beta_pass(pi, A, B, N, M, T, O, scaled_constants)
        digamma, gamma = digammas(pi, A, B, N, M, T, O, alpha, beta)

        # Re-estimate the initial probability matrix
        for stat in range(0, N):
            pi[stat] = gamma[stat][0]
        
        # Re-estimate the state transition matrix
        for stat in range(0, N):
            denom = 0
            for time in range(0, T - 1):
                denom += gamma[stat][time]
            for anot in range(0, N):
                numer = 0
                for time in range(0, T - 1):
                    numer += digamma[stat][anot][time]
                A[stat][anot] = numer / denom
        
        # Re-estimate the symbol emission probability matrix
        for stat in range(0, N):
            denom = 0
            for time in range(0, T):
                denom += gamma[stat][time]
            for anot in range(0, M):
                numer = 0
                for time in range(0, T):
                    if (O[time] == anot):
                        numer += gamma[stat][time]
                B[stat][anot] = numer / denom
    
    # return the model
    return pi, A, B

In [301]:
pi, A, B = estimate(N, M, T, O)
print(pi)
print(A)
print(B)

[0.5618661121147067, 0.4381338878852932]
[[0.483938892258943, 0.5160611077410571], [0.4743153424324541, 0.525684657567546]]
[[0.1886234793134056, 0.27501880743116797, 0.5363577132554265], [0.24096138229898703, 0.2613636068770131, 0.49767501082399984]]

[0.32395233566118065, 0.6760476643388196]
[[0.516889199165247, 0.483110800834753], [0.540269275644103, 0.459730724355897]]
[[0.4052651645098788, 0.29586151779013575, 0.2988733176999855], [0.5868708580796177, 0.20794546554442278, 0.20518367637595955]]
