In [None]:
import numpy as np

def forward(A, B, T, pi, observations):
    alpha = np.empty((T, 2))
    alpha[0, 0] = pi[0] * B[0, observations[0]]
    alpha[0, 1] = pi[1] * B[1, observations[0]]
    for t in range(1, T):
        for j in range(2):
            alpha_t_j = 0
            for i in range(2):
                alpha_t_j += A[i, j] * alpha[t - 1, i]
            alpha_t_j *= B[j, observations[t]]
            alpha[t, j] = alpha_t_j
        
    return alpha

def backward(A, B, T, observations):
    beta = np.empty((T, 2))
    beta[T - 1, :] = 1
    for t in range(T - 2, -1, -1):
        for i in range(2):
            beta_t_i = 0
            for j in range(2):
                beta_t_i += A[i, j] * B[j, observations[t + 1]] * beta[t + 1, j]
            beta[t, i] = beta_t_i
            
    return beta

def compute_gamma(Alpha, Beta, T):
    gamma = np.empty((T, 2))
    for t in range(T):
        num = 0
        for j in range(2):        
            num += Alpha[t, j] * Beta[t, j]
        for i in range(2):
            gamma[t, i] = (Alpha[t, i] * Beta[t, i]) / num

    return gamma

def compute_xi(Alpha, Beta, A, B, observations, T):
    xi = np.empty((T, 2, 2))
    for t in range(T - 1):
        for i in range(2):
            for j in range(2):
                den = 0
                for k in range(2):
                    for l in range(2):
                        den += Alpha[t, k] * A[k, l] * B[l, observations[t + 1]] * Beta[t + 1, l]
                xi[t, i, j] = Alpha[t, i] * A[i, j] * B[j, observations[t + 1]] * Beta[t + 1, j] / den
    return xi

def compute_a(gamma, xi, T):
    A = np.empty((2, 2))
    for i in range(2):
        for j in range(2):
            num, den = 0, 0
            for t in range(T - 1):
                num += xi[t, i, j]
                den += gamma[t, i]
            A[i, j] = num / den
    return A

def compute_b(gamma, observations, T):
    B = np.empty((2, 2))
    for i in range(2):
        for k in range(2):
            num, den = 0, 0
            for t in range(T):
                if observations[t] == k:
                    num += gamma[t, i]
                den += gamma[t, i]
            B[i, k] = num / den
    return B

In [117]:
import numpy as np

T = 500

# hidden data (that we try to estimate)
A = np.array([[0.5, 0.5], [0.8, 0.2]]) # columns and rows indexed by (rain, no rain)
B = np.array([[0.8, 0.2], [0.15, 0.85]]) # rows indexed by (rain, no rain), columns by (umbrella, no umbrella)
pi = np.array([0.5, 0.5]) # (prob for rain, prob for no rain)

# we use A, B and pi to construct a HMM environment with two states and two observable values
observations = np.empty(T)
states = np.empty(T)

states[0] = int(np.random.uniform() > pi[0]) # 0 = rain
observations[0] = int(np.random.uniform() > B[int(states[0]), 0]) # 0 = umbrella

for t in range(1, T):
    prob_rain = A[int(states[t - 1]), 0]
    states[t] = int(np.random.uniform() > prob_rain)
    prob_umb = B[int(states[t]), 0]
    observations[t] = int(np.random.uniform() > prob_umb)

observations = [int(t) for t in observations]
states = [int(t) for t in states]

A_real = A.copy()
B_real = B.copy()
pi_real = pi.copy()

ll_tol = 0.2
n_attempts = 10

ll_list = []
A_list = []
B_list = []

for _ in range(n_attempts):

    prev_ll = 0
    ll = 1.0

    # initial guesses
    A = np.empty((2, 2)) # columns and rows indexed by (rain, no rain)
    A[:, 0] = np.random.uniform(size=2)
    A[:, 1] = 1 - A[:, 0]
    B = np.empty((2, 2)) # rows indexed by (rain, no rain), columns by (umbrella, no umbrella)
    B[:, 0] = np.random.uniform(size=2)
    B[:, 1] = 1 - B[:, 0]
    prob_start_rain = np.random.uniform()
    pi = np.array([prob_start_rain, 1 - prob_start_rain]) # (prob for rain, prob for no rain)

    while abs(prev_ll - ll) > ll_tol:
        prev_ll = ll

        # E step
        alpha = forward(A, B, T, pi, observations)
        beta = backward(A, B, T, observations)
        gamma = compute_gamma(alpha, beta, T)
        xi = compute_xi(alpha, beta, A, B, observations, T)

        # M step
        A = compute_a(gamma, xi, T)
        B = compute_b(gamma, observations, T)
        pi = gamma[0, :]

        ll = np.log(np.sum(alpha[-1, :]))
        print('log_likelyhood:', ll)

        # print('this iteration:')
        # print('alpha:'); print(alpha)
        # print('beta:'); print(beta)
        # print('gamma:'); print(gamma)
        # print('xi:'); print(xi)
        # print('a:'); print(A)
        # print('b:'); print(B)
        # print('pi:', pi)
        # print()

    print('final estimated values:')
    print('A:'); print(A)
    print('B:'); print(B)
    print('pi:'); print(pi)

    print('true values:')
    print('A:'); print(A_real)
    print('B:'); print(B_real)
    print('pi:'); print(pi_real)

    A_list.append(A)
    B_list.append(B)
    ll_list.append(ll)

idx = np.argmax(ll_list)
print('highest_ll:', ll_list[idx])
print('corresponding A:', A_list[idx])
print('corresponding B:', B_list[idx])

print(ll_list)
print(A_list)
print(B_list)



log_likelyhood: -1.1102230246251565e-16
log_likelyhood: 0.0
final estimated values:
A:
[[0.50178492 0.49821508]
 [0.53262584 0.46737416]]
B:
[[0.14831345 0.85168655]
 [0.94878474 0.05121526]]
pi:
[0.00601151 0.99398849]
true values:
A:
[[0.5 0.5]
 [0.8 0.2]]
B:
[[0.8  0.2 ]
 [0.15 0.85]]
pi:
[0.5 0.5]
log_likelyhood: 0.0
log_likelyhood: -1.1102230246251565e-16
final estimated values:
A:
[[0.01463571 0.98536429]
 [0.17628895 0.82371105]]
B:
[[0.69445839 0.30554161]
 [0.50753287 0.49246713]]
pi:
[0.45683902 0.54316098]
true values:
A:
[[0.5 0.5]
 [0.8 0.2]]
B:
[[0.8  0.2 ]
 [0.15 0.85]]
pi:
[0.5 0.5]
log_likelyhood: 0.0
log_likelyhood: 0.0
final estimated values:
A:
[[0.12295809 0.87704191]
 [0.87186664 0.12813336]]
B:
[[0.67863345 0.32136655]
 [0.39395605 0.60604395]]
pi:
[0.98651489 0.01348511]
true values:
A:
[[0.5 0.5]
 [0.8 0.2]]
B:
[[0.8  0.2 ]
 [0.15 0.85]]
pi:
[0.5 0.5]
log_likelyhood: -1.1102230246251565e-16
log_likelyhood: -1.1102230246251565e-16
final estimated values:
A:
[[0.