In [1]:
import numpy as np

In [2]:
def generate_HMM_params(num_hidden_state, num_obs):
    """Draw random HMM parameters and the matching stationary distribution.

    Every row of both matrices is sampled from a flat Dirichlet (all
    concentration parameters equal to one), so each row is a probability
    vector.

    Parameters
    ----------
    num_hidden_state : int
        Number of hidden states.
    num_obs : int
        Number of distinct observation symbols.

    Returns
    -------
    trans_mat : ndarray, shape (num_hidden_state, num_hidden_state)
        ``trans_mat[k, j]`` is the probability of moving from state k to j.
    obs_mat : ndarray, shape (num_hidden_state, num_obs)
        ``obs_mat[k, x]`` is the probability of emitting symbol x in state k.
    stat_dist : ndarray, shape (num_hidden_state,)
        Least-squares solution of ``(I - trans_mat.T) pi = 0`` together with
        ``sum(pi) = 1``.
    """
    trans_mat = np.random.dirichlet(np.ones(num_hidden_state), num_hidden_state)
    obs_mat = np.random.dirichlet(np.ones(num_obs), num_hidden_state)

    # Stack the balance equations (I - P^T) pi = 0 on top of the single
    # normalisation row 1^T pi = 1 and solve the over-determined system.
    balance_rows = np.identity(num_hidden_state) - trans_mat.T
    normalisation_row = np.ones((1, num_hidden_state))
    lhs = np.vstack([balance_rows, normalisation_row])
    rhs = np.append(np.zeros(num_hidden_state), 1)
    stat_dist, *_ = np.linalg.lstsq(lhs, rhs, rcond=None)
    return trans_mat, obs_mat, stat_dist

In [3]:
def generate_HMM_sequences(trans_mat, obs_mat, init_dist, length, num_samples = 1):
    """Sample hidden-state and observation sequences from an HMM.

    Parameters
    ----------
    trans_mat : ndarray
        Row-stochastic transition matrix; row k is the distribution of the
        next hidden state given current state k.
    obs_mat : ndarray
        Row-stochastic emission matrix; row k is the distribution of the
        observation emitted in hidden state k.
    init_dist : ndarray
        Distribution of the first hidden state.
    length : int
        Number of time steps per sequence.
    num_samples : int, optional
        Number of independent sequences to draw (default 1).

    Returns
    -------
    states, obs : ndarray, each of shape (num_samples, length)
        Hidden-state and observation indices, stored as floats because the
        arrays are allocated with ``np.zeros``.
    """
    states = np.zeros((num_samples, length))
    obs = np.zeros((num_samples, length))

    # One one-hot draw per sample; argmax turns it into a state index.
    first_draws = np.random.multinomial(1, init_dist, num_samples)
    current = np.argmax(first_draws, axis = 1)

    for step in range(length):
        states[:, step] = current
        for sample in range(num_samples):
            hidden = current[sample]
            # Emit from the current state, then advance the chain.  The
            # transition is also drawn after the final step (its result is
            # simply never recorded).
            obs[sample, step] = np.argmax(np.random.multinomial(1, obs_mat[hidden]))
            current[sample] = np.argmax(np.random.multinomial(1, trans_mat[hidden]))
    return states, obs

In [4]:
def forward_compute(trans_mat, obs_mat, init_dist, obs_to_pos):
    """Forward message for the hidden state that follows a prefix of observations.

    With ``obs_to_pos = (x_1, ..., x_{pos-1})`` this computes

        sum_{h_1,...,h_{pos-1}} P(h_1, ..., h_pos, x_1, ..., x_{pos-1})

    i.e. the joint probability of the prefix and of each value of the hidden
    state at the next time step, taking ``init_dist`` as the distribution of
    ``h_1``.

    Parameters
    ----------
    trans_mat : ndarray, shape (K, K)
        ``trans_mat[k, j]`` is the probability of moving from state k to j.
    obs_mat : ndarray, shape (K, M)
        ``obs_mat[k, x]`` is the probability of emitting symbol x in state k.
    init_dist : array_like, shape (K,)
        Distribution of the first hidden state.
    obs_to_pos : array_like, shape (pos - 1,)
        Observed symbol indices preceding the target position; values are
        truncated to integers before being used as column indices.

    Returns
    -------
    ndarray, shape (K,)
        Entry j is ``P(h_pos = j, x_1, ..., x_{pos-1})``.  For an empty
        prefix this is a copy of ``init_dist``.
    """
    # Copy so the caller's init_dist is never aliased by the return value.
    alpha = np.array(init_dist, dtype=float)
    for symbol in np.asarray(obs_to_pos).astype(int):
        # Weight each state by its emission probability for this symbol,
        # then push the mass one step through the transition matrix:
        # alpha_new[j] = sum_k alpha[k] * obs_mat[k, symbol] * trans_mat[k, j]
        alpha = (alpha * obs_mat[:, symbol]) @ trans_mat
    return alpha

In [5]:
def backward_compute(trans_mat, obs_mat, obs_from_pos):
    """Backward message: likelihood of the trailing observations given a state.

    With ``obs_from_pos = (x_{pos+1}, ..., x_T)`` this computes, for every
    value j of the hidden state at position ``pos``,

        P(x_{pos+1}, ..., x_T | h_pos = j)

    Parameters
    ----------
    trans_mat : ndarray, shape (K, K)
        ``trans_mat[j, k]`` is the probability of moving from state j to k.
    obs_mat : ndarray, shape (K, M)
        ``obs_mat[k, x]`` is the probability of emitting symbol x in state k.
    obs_from_pos : array_like, shape (T - pos,)
        Observed symbol indices following the target position; values are
        truncated to integers before being used as column indices.

    Returns
    -------
    ndarray, shape (K,)
        The backward message.  When there are no trailing observations it
        is a vector of ones.
    """
    num_hidden_state = trans_mat.shape[0]
    beta = np.ones(num_hidden_state)
    # Walk the suffix from its last symbol back towards the target position:
    # beta_new[j] = sum_k trans_mat[j, k] * obs_mat[k, symbol] * beta[k]
    for symbol in np.asarray(obs_from_pos).astype(int)[::-1]:
        beta = trans_mat @ (obs_mat[:, symbol] * beta)
    return beta

In [6]:
def x_i_conditional_prob(trans_mat, obs_mat, init_dist, known_X, pos):
    """Conditional distribution of one observation given all the others.

    For every row of ``known_X`` the forward message over the columns before
    ``pos`` and the backward message over the columns after ``pos`` are
    multiplied and normalised to give the posterior of the hidden state at
    ``pos``; that posterior is then mapped through ``obs_mat``.

    Parameters
    ----------
    trans_mat : ndarray, shape (K, K)
        Hidden-state transition matrix.
    obs_mat : ndarray, shape (K, M)
        Emission matrix.
    init_dist : ndarray, shape (K,)
        Distribution of the first hidden state.
    known_X : ndarray, shape (num_samples, length)
        Observation sequences, one per row.  The value stored in column
        ``pos`` is never read.
    pos : int
        0-based column index of the observation to predict.

    Returns
    -------
    ndarray, shape (num_samples, M)
        Row i is the distribution of the observation at ``pos`` for sample i
        given that sample's remaining observations.  If the remaining
        observations have zero probability under the model the
        normalisation divides by zero and the row is NaN.
    """
    num_obs = obs_mat.shape[1]
    num_samples = known_X.shape[0]
    x_pos_conditional_prob = np.zeros((num_samples, num_obs))
    for i in range(num_samples):
        forward_vec = forward_compute(trans_mat, obs_mat, init_dist, known_X[i, :pos])
        backward_vec = backward_compute(trans_mat, obs_mat, known_X[i, pos + 1:])
        # Unnormalised posterior over the hidden state at `pos`.
        h_posterior = forward_vec * backward_vec
        h_posterior /= h_posterior.sum()
        x_pos_conditional_prob[i] = h_posterior @ obs_mat
    return x_pos_conditional_prob

In [23]:
# Demo: draw a random 2-state / 3-symbol HMM, sample one short sequence that
# starts from the stationary distribution, and ask for the conditional
# distribution of the observation at column `pos` given the other columns.
# NOTE(review): `init_dist` is not read in this cell (the stationary
# distribution is passed as the initial distribution instead); it is kept only
# in case other cells rely on it -- confirm and drop if unused.
init_dist = np.array([0.5, 0.5])
num_of_samples = 1
length = 3
trans_mat, obs_mat, stat_dist = generate_HMM_params(2, 3)
print("trans_mat: ", trans_mat)
print("obs_mat: ", obs_mat)
print("init_dist: ", stat_dist)
states, obs = generate_HMM_sequences(trans_mat, obs_mat, stat_dist, length, num_of_samples)
print("states: ", states)
print("obs: ", obs)
pos = 1
x_i_conditional_prob(trans_mat, obs_mat, stat_dist, obs, pos)

trans_mat:  [[0.18889228 0.81110772]
 [0.46537437 0.53462563]]
obs_mat:  [[0.4576714  0.17743767 0.36489093]
 [0.02192317 0.89344686 0.08462997]]
init_dist:  [0.36457572 0.63542428]
states:  [[1. 1. 0.]]
obs:  [[2. 1. 2.]]


array([[0.10477604, 0.75730536, 0.13791859]])

In [45]:
# Conditional distribution of the observation in column 2 given columns 0 and 1;
# the stationary distribution serves as the initial-state distribution.
x_i_conditional_prob(trans_mat, obs_mat, init_dist=stat_dist, known_X=obs, pos=2)

array([[0.21652426, 0.57368385, 0.20979189]])

In [39]:
# Large Monte Carlo sample from the same model, used to check the exact
# conditional probabilities empirically.
test_states, test_obs = generate_HMM_sequences(
    trans_mat, obs_mat, init_dist=stat_dist, length=length, num_samples=100000
)

In [46]:
# Empirical check: among the simulated sequences whose first two observations
# match the reference sequence `obs`, count how often each symbol appears in
# column 2.  The conditioning values and the number of symbols are read from
# `obs` / `obs_mat` rather than hard-coded, so the check stays valid when the
# random parameters and the reference sequence are regenerated.
lst = [0] * obs_mat.shape[1]
for i in range(test_obs.shape[0]):
    if (test_obs[i, 1] == obs[0, 1]) and (test_obs[i, 0] == obs[0, 0]):
        lst[int(test_obs[i, 2])] += 1
print(lst)

[2827, 7603, 2685]


In [47]:
# Convert the raw counts into relative frequencies so they can be compared
# with the exact conditional distribution computed above.
lst = np.array(lst, dtype='float64')
lst = lst / lst.sum()
print(lst)

[0.21555471 0.57971788 0.20472741]
