In [1]:
from __future__ import print_function
import json
import numpy as np
import sys

def forward(pi, A, B, O):
    """
    Forward algorithm

    Inputs:
    - pi: A numpy array of initial probabilities. pi[i] = P(Z_1 = s_i)
    - A: A numpy array of transition probabilities. A[i, j] = P(Z_t = s_j|Z_t-1 = s_i)
    - B: A numpy array of observation probabilities. B[i, k] = P(X_t = o_k| Z_t = s_i)
    - O: A list of observation sequence (in terms of index, not the actual symbol)

    Returns:
    - alpha: A numpy array alpha[j, t-1] = P(Z_t = s_j, X_{1:t}=x_{1:t})
    """
    S = len(pi)
    N = len(O)
    alpha = np.zeros([S, N])
    ###################################################
    # Q3.1 Edit here
    ###################################################
    # alpha[j, t-1] = b_s,x_t
    
    # initialize
    #for j in range(S):
    #    alpha[j, 0] = pi[j] * B[j, O[0]]
    alpha[:, 0] = pi * B[:, O[0]]
    
    for t in range(N-1):
        for j in range(S):
            #alpha[j, t+1] = B[j, O[t+1]] * np.sum(np.multiply(A[:, j], alpha[j, t]))
            alpha[j, t+1] = B[j, O[t+1]] * np.sum(A[:, j] * alpha[:, t])

    return alpha


def backward(pi, A, B, O):
    """
    Backward algorithm

    Inputs:
    - pi: A numpy array of initial probabilities. pi[i] = P(Z_1 = s_i)
    - A: A numpy array of transition probabilities. A[i, j] = P(Z_t = s_j|Z_t-1 = s_i)
    - B: A numpy array of observation probabilities. B[i, k] = P(X_t = o_k| Z_t = s_i)
    - O: A list of observation sequence (in terms of index, not the actual symbol)

    Returns:
    - beta: A numpy array beta[j, t-1] = P(X_{t+1:N}=x_{t+1:N} | Z_t = s_j)
    """
    S = len(pi)
    N = len(O)
    beta = np.zeros([S, N])
    ###################################################
    # Q3.1 Edit here
    ###################################################

    # initialize
    beta[:, N-1] = np.ones(S)

    for t in range(N-1)[::-1]:
        print(t)
        for j in range(S):
            #beta[j, t] = np.sum(A[:, j] * B[:, O[t+1]] * beta[:, t+1])
            beta[j, t] = np.sum(A[j, :] * B[:, O[t+1]] * beta[:, t+1])
            
    return beta

def seqprob_forward(alpha):
    """
    Total probability of observing the whole sequence using the forward messages

    Inputs:
    - alpha: A numpy array alpha[j, t-1] = P(Z_t = s_j, X_{1:t}=x_{1:t})

    Returns:
    - prob: A float number of P(X_{1:N}=O)
    """
    prob = 0
    ###################################################
    # Q3.2 Edit here
    ###################################################
    
    return prob


def seqprob_backward(beta, pi, B, O):
    """
    Total probability of observing the whole sequence using the backward messages

    Inputs:
    - beta: A numpy array beta: A numpy array beta[j, t-1] = P(X_{t+1:N}=x_{t+1:N} | Z_t = s_j)
    - pi: A numpy array of initial probabilities. pi[i] = P(Z_1 = s_i)
    - B: A numpy array of observation probabilities. B[i, k] = P(X_t = o_k| Z_t = s_i)
    - O: A list of observation sequence
            (in terms of the observation index, not the actual symbol)

    Returns:
    - prob: A float number of P(X_{1:N}=O)
    """
    prob = 0
    ###################################################
    # Q3.2 Edit here
    ###################################################
    
    return prob

def viterbi(pi, A, B, O):
    """
    Viterbi algorithm

    Inputs:
    - pi: A numpy array of initial probabilities. pi[i] = P(Z_1 = s_i)
    - A: A numpy array of transition probabilities. A[i, j] = P(Z_t = s_j|Z_t-1 = s_i)
    - B: A numpy array of observation probabilities. B[i, k] = P(X_t = o_k| Z_t = s_i)
    - O: A list of observation sequence (in terms of index, not the actual symbol)

    Returns:
    - path: A list of the most likely hidden state path (in terms of the state index)
    """
    path = []
    ###################################################
    # Q3.3 Edit here
    ###################################################
    
    return path


##### DO NOT MODIFY ANYTHING BELOW THIS ###################
def main():
    #model_file = sys.argv[1]
    #Osymbols = sys.argv[2]
    model_file = 'hmm_model.json'
    Osymbols = 'AGCGTA'
    
    #### load data ####
    with open(model_file, 'r') as f:
        data = json.load(f)
    A = np.array(data['A'])
    B = np.array(data['B'])
    pi = np.array(data['pi'])
    #### observation symbols #####
    obs_symbols = data['observations']
    #### state symbols #####
    states_symbols = data['states']

    N = len(Osymbols)
    O = [obs_symbols[j] for j in Osymbols]

    alpha = forward(pi, A, B, O)
    beta = backward(pi, A, B, O)

    prob1 = seqprob_forward(alpha)
    prob2 = seqprob_backward(beta, pi, B, O)
    print('Total log probability of observing the sequence %s is %g, %g.' % (Osymbols, np.log(prob1), np.log(prob2)))

    viterbi_path = viterbi(pi, A, B, O)

    print('Viterbi best path is ')
    for j in viterbi_path:
        print(states_symbols[j], end=' ')

#if __name__ == "__main__":
#    main()

In [2]:
    model_file = 'hmm_model.json'
    Osymbols = 'AGCGTA'
    
    #### load data ####
    with open(model_file, 'r') as f:
        data = json.load(f)
    A = np.array(data['A'])
    B = np.array(data['B'])
    pi = np.array(data['pi'])
    #### observation symbols #####
    obs_symbols = data['observations']
    #### state symbols #####
    states_symbols = data['states']

    N = len(Osymbols)
    O = [obs_symbols[j] for j in Osymbols]

## 1.1

In [3]:
    #def forward(pi, A, B, O):
    """
    Forward algorithm

    Inputs:
    - pi: A numpy array of initial probabilities. pi[i] = P(Z_1 = s_i)
    - A: A numpy array of transition probabilities. A[i, j] = P(Z_t = s_j|Z_t-1 = s_i)
    - B: A numpy array of observation probabilities. B[i, k] = P(X_t = o_k| Z_t = s_i)
    - O: A list of observation sequence (in terms of index, not the actual symbol)

    Returns:
    - alpha: A numpy array alpha[j, t-1] = P(Z_t = s_j, X_{1:t}=x_{1:t})
    """    
    S = len(pi)
    N = len(O)
    alpha = np.zeros([S, N])
    ###################################################
    # Q3.1 Edit here
    ###################################################
    # alpha[j, t-1] = b_s,x_t
    
    # initialize
    alpha[:, 0] = pi * B[:, O[0]]
        
    for t in range(N-1):
        for j in range(S):
            #alpha[j, t+1] = B[j, O[t+1]] * np.sum(A[:, j] * alpha[j, t])
            alpha[j, t+1] = B[j, O[t+1]] * np.sum(A[:, j] * alpha[:, t])
        #alpha[:, t+1] = B[:, O[t+1]] * np.sum(A[:, j] * alpha[:, t])
            
    #def backward(pi, A, B, O):
    """
    Backward algorithm

    Inputs:
    - pi: A numpy array of initial probabilities. pi[i] = P(Z_1 = s_i)
    - A: A numpy array of transition probabilities. A[i, j] = P(Z_t = s_j|Z_t-1 = s_i)
    - B: A numpy array of observation probabilities. B[i, k] = P(X_t = o_k| Z_t = s_i)
    - O: A list of observation sequence (in terms of index, not the actual symbol)

    Returns:
    - beta: A numpy array beta[j, t-1] = P(X_{t+1:N}=x_{t+1:N} | Z_t = s_j)
    """
    #S = len(pi)
    #N = len(O)
    beta = np.zeros([S, N])
    ###################################################
    # Q3.1 Edit here
    ###################################################
    # initialize
    beta[:, -1] = np.ones(S)

    for t in range(N-1)[::-1]:
        print('t is', t,'X[t+1] is ', O[t+1])
        for j in range(S):
            beta[j, t] = np.sum(A[j, :] * B[:, O[t+1]] * beta[:, t+1])

t is 4 X[t+1] is  0
t is 3 X[t+1] is  3
t is 2 X[t+1] is  2
t is 1 X[t+1] is  1
t is 0 X[t+1] is  2


In [4]:
B[0, O[5]]

0.4

In [5]:
alpha

array([[2.80000000e-01, 9.92000000e-02, 8.67200000e-03, 4.25728000e-03,
        3.98924800e-04, 2.10532352e-04],
       [6.00000000e-02, 1.84000000e-02, 9.26400000e-03, 1.45856000e-03,
        5.17977600e-04, 7.81143040e-05]])

In [6]:
A[:, j]

array([0.2, 0.6])

In [7]:
alpha

array([[2.80000000e-01, 9.92000000e-02, 8.67200000e-03, 4.25728000e-03,
        3.98924800e-04, 2.10532352e-04],
       [6.00000000e-02, 1.84000000e-02, 9.26400000e-03, 1.45856000e-03,
        5.17977600e-04, 7.81143040e-05]])

In [8]:
beta

array([[8.653056e-04, 2.279040e-03, 1.718400e-02, 4.560000e-02,
        3.600000e-01, 1.000000e+00],
       [7.726848e-04, 3.400320e-03, 1.507200e-02, 6.480000e-02,
        2.800000e-01, 1.000000e+00]])

In [9]:
print(A)
print(type(A))

[[0.8 0.2]
 [0.4 0.6]]
<class 'numpy.ndarray'>


In [10]:
print(B)
print(type(B))

[[0.4 0.1 0.4 0.1]
 [0.2 0.3 0.2 0.3]]
<class 'numpy.ndarray'>


In [11]:
print(pi)
print(type(pi))

[0.7 0.3]
<class 'numpy.ndarray'>


In [12]:
print(obs_symbols)
print(type(obs_symbols))

{'A': 0, 'C': 1, 'T': 3, 'G': 2}
<class 'dict'>


In [13]:
print(states_symbols)
print(type(states_symbols))

['1', '2']
<class 'list'>


In [14]:
print(N)
print(type(N))

6
<class 'int'>


In [15]:
print(O)
print(type(O))

[0, 2, 1, 2, 3, 0]
<class 'list'>


In [16]:
B[0,:]

array([0.4, 0.1, 0.4, 0.1])

In [17]:
S

2

In [18]:
alpha.shape

(2, 6)

In [19]:
O

[0, 2, 1, 2, 3, 0]

In [20]:
print(O[0])
print(O[-1])

0
0


In [21]:
print(B[0,O[0]])
print(B[:,O[0]])

0.4
[0.4 0.2]


In [22]:
np.multiply(A[:, 0], alpha[:, 0])

array([0.224, 0.024])

In [23]:
A[:, 0]

array([0.8, 0.4])

In [24]:
alpha[:, 0]

array([0.28, 0.06])

In [25]:
for n in range(N-1)[::-1]:
    print(n)

4
3
2
1
0


In [26]:
XA = np.array([1, -1, 1])
XB = np.array([2, 3, 4])
XC = np.array([4, 3, 2])


In [27]:
np.multiply(XA, XB)


array([ 2, -3,  4])

In [28]:
np.multiply(XB, XC)

array([8, 9, 8])

In [29]:
XB

array([2, 3, 4])

In [30]:
XC

array([4, 3, 2])

In [31]:
XA * XB * XC

array([ 8, -9,  8])

## 1.2

In [32]:
    #def seqprob_forward(alpha):
    """
    Total probability of observing the whole sequence using the forward messages

    Inputs:
    - alpha: A numpy array alpha[j, t-1] = P(Z_t = s_j, X_{1:t}=x_{1:t})

    Returns:
    - prob: A float number of P(X_{1:N}=O)
    """
    prob = 0
    ###################################################
    # Q3.2 Edit here
    ###################################################
    prob = np.sum(alpha[:, -1])
    
    print(prob)

0.00028864665600000013


In [33]:
    #def seqprob_backward(beta, pi, B, O):
    """
    Total probability of observing the whole sequence using the backward messages

    Inputs:
    - beta: A numpy array beta: A numpy array beta[j, t-1] = P(X_{t+1:N}=x_{t+1:N} | Z_t = s_j)
    - pi: A numpy array of initial probabilities. pi[i] = P(Z_1 = s_i)
    - B: A numpy array of observation probabilities. B[i, k] = P(X_t = o_k| Z_t = s_i)
    - O: A list of observation sequence
            (in terms of the observation index, not the actual symbol)

    Returns:
    - prob: A float number of P(X_{1:N}=O)
    """
    prob = 0
    ###################################################
    # Q3.2 Edit here
    ###################################################
    # B[j, O[0]] * pi(j) * beta[j, 0]
    prob = np.sum(B[:, O[0]] * pi * beta[:, 0])
    #    return prob
    print(prob)

0.0002886466560000002


### confirm

$$\gamma = \alpha_s(t)\beta_s(t) $$
$$P(X_{1:T}=x_{1:T}) =\sum_{s}\alpha_s(t)\beta_s(t)$$  for all t

In [34]:
for n in range(N):
    print(np.sum(alpha[:,n] * beta[:,n]))

0.0002886466560000002
0.00028864665600000013
0.00028864665600000013
0.00028864665600000013
0.00028864665600000013
0.00028864665600000013


In [35]:
j = 0
beta[j,0]

0.0008653056000000007

In [36]:
pi[j]

0.7

In [37]:
np.sum(alpha[:,0] * beta[:,0])

0.0002886466560000002

In [38]:
B[O[0]]

array([0.4, 0.1, 0.4, 0.1])

## 1.3

In [39]:
    #def viterbi(pi, A, B, O):
    """
    Viterbi algorithm

    Inputs:
    - pi: A numpy array of initial probabilities. pi[i] = P(Z_1 = s_i)
    - A: A numpy array of transition probabilities. A[i, j] = P(Z_t = s_j|Z_t-1 = s_i)
    - B: A numpy array of observation probabilities. B[i, k] = P(X_t = o_k| Z_t = s_i)
    - O: A list of observation sequence (in terms of index, not the actual symbol)

    Returns:
    - path: A list of the most likely hidden state path (in terms of the state index)
    """
    path = []
    ###################################################
    # Q3.3 Edit here
    ###################################################
    S = len(pi)
    N = len(O)
    
    """
    - delta: A numpy array of the most likely path for time 1 : t ending at state s
        delta[s, t] = max P(Z_t = s, Z_1:t-1 = z_1:t-1, X_1:t = x_1:t)
    - Delta: A numpy array of argmax of delta Delta = argmax(delta[s, t])
    """
    delta = np.zeros([S, N])
    Delta = -np.ones([S, N]).astype(int)
    
    delta[:, 0] = pi * B[:, O[0]]
    for t in range(N-1):
        for s in range(S):
            delta[s, t+1] = B[s, O[t+1]] * np.max(A[:, s] * delta[:, t])
            Delta[s, t+1] = np.argmax(A[:, s] * delta[:, t])
            #delta[s, t+1] = B[s, O[t+1]] * np.max(A[s, :] * delta[:, t])
            #Delta[s, t+1] = np.argmax(A[s, :] * delta[:, t])
            print(s, A[:, s] * delta[:, t])
    
    z = -np.ones(N).astype(int)
    z[-1] = np.argmax(delta[:, -1]).astype(int)
    print('first z is ', z)
    for t in range(N-1)[::-1]:
        print('t is ', t)
        print('z*t is', z[t+1])
        
        z[t] = Delta[z[t+1], t+1]
        print('next z is ', z)
    path = z.tolist()
    # return path

0 [0.224 0.024]
1 [0.056 0.036]
0 [0.07168 0.00448]
1 [0.01792 0.00672]
0 [0.0057344 0.0021504]
1 [0.0014336 0.0032256]
0 [0.00183501 0.00025805]
1 [0.00045875 0.00038707]
0 [1.4680064e-04 5.5050240e-05]
1 [3.670016e-05 8.257536e-05]
first z is  [-1 -1 -1 -1 -1  0]
t is  4
z*t is 0
next z is  [-1 -1 -1 -1  0  0]
t is  3
z*t is 0
next z is  [-1 -1 -1  0  0  0]
t is  2
z*t is 0
next z is  [-1 -1  0  0  0  0]
t is  1
z*t is 0
next z is  [-1  0  0  0  0  0]
t is  0
z*t is 0
next z is  [0 0 0 0 0 0]


In [40]:
pi

array([0.7, 0.3])

In [41]:
A

array([[0.8, 0.2],
       [0.4, 0.6]])

In [42]:
s = 0
print(A[:, s] * delta[:, 0])
print(np.max(A[:, s] * delta[:, 0]))

[0.224 0.024]
0.22399999999999998


In [43]:
print(np.argmax(delta[:, -1]))
print(delta[0,-1])
print(delta[1,-1])

0
5.8720256000000014e-05
1.6515072e-05


In [44]:
Delta

array([[-1,  0,  0,  0,  0,  0],
       [-1,  0,  0,  1,  0,  1]])

In [45]:
delta

array([[2.8000000e-01, 8.9600000e-02, 7.1680000e-03, 2.2937600e-03,
        1.8350080e-04, 5.8720256e-05],
       [6.0000000e-02, 1.1200000e-02, 5.3760000e-03, 6.4512000e-04,
        1.3762560e-04, 1.6515072e-05]])

In [46]:
z

array([0, 0, 0, 0, 0, 0])

In [47]:
type(z[5])

numpy.int64

In [48]:
path

[0, 0, 0, 0, 0, 0]

In [49]:
pi * B[:, O[0]]

array([0.28, 0.06])