In [1]:
import numpy as np
import pandas as pd

obs_map = {'Cold':0, 'Hot':1}
obs = np.array([1,1,0,1,0,0,1,0,1,1,0,0,0,1])

inv_obs_map = dict((v,k) for k, v in obs_map.items())
obs_seq = [inv_obs_map[v] for v in list(obs)]

print("Simulated Observations:\n",pd.DataFrame(np.column_stack([obs, obs_seq]),columns=['Obs_code', 'Obs_seq']) )

pi = [0.6,0.4] # initial probabilities vector
states = ['Cold', 'Hot']
hidden_states = ['Snow', 'Rain', 'Sunshine']
pi = [0, 0.2, 0.8]
state_space = pd.Series(pi, index=hidden_states, name='states')
a_df = pd.DataFrame(columns=hidden_states, index=hidden_states)
a_df.loc[hidden_states[0]] = [0.3, 0.3, 0.4]
a_df.loc[hidden_states[1]] = [0.1, 0.45, 0.45]
a_df.loc[hidden_states[2]] = [0.2, 0.3, 0.5]
print("\n HMM matrix:\n", a_df)
a = a_df.values

observable_states = states
b_df = pd.DataFrame(columns=observable_states, index=hidden_states)
b_df.loc[hidden_states[0]] = [1,0]
b_df.loc[hidden_states[1]] = [0.8,0.2]
b_df.loc[hidden_states[2]] = [0.3,0.7]
print("\n Observable layer  matrix:\n",b_df)
b = b_df.values

Simulated Observations:
    Obs_code Obs_seq
0         1     Hot
1         1     Hot
2         0    Cold
3         1     Hot
4         0    Cold
5         0    Cold
6         1     Hot
7         0    Cold
8         1     Hot
9         1     Hot
10        0    Cold
11        0    Cold
12        0    Cold
13        1     Hot

 HMM matrix:
          Snow  Rain Sunshine
Snow      0.3   0.3      0.4
Rain      0.1  0.45     0.45
Sunshine  0.2   0.3      0.5

 Observable layer  matrix:
          Cold  Hot
Snow        1    0
Rain      0.8  0.2
Sunshine  0.3  0.7


In [29]:
path, delta, phi = viterbi(pi, a, b, obs)
state_map = {0:'Snow', 1:'Rain', 2:'Sunshine'}
state_path = [state_map[v] for v in path]
pd.DataFrame().assign(Observation=obs_seq).assign(Best_Path=state_path)


Start Walk Forward

delta[:,t] = [0.   0.04 0.56]
s=0 and t=1: phi[0, 1] = 2.0
s=1 and t=1: phi[1, 1] = 2.0
s=2 and t=1: phi[2, 1] = 2.0
delta[:,t] = [0.     0.0336 0.196 ]
s=0 and t=2: phi[0, 2] = 2.0
s=1 and t=2: phi[1, 2] = 2.0
s=2 and t=2: phi[2, 2] = 2.0
delta[:,t] = [0.0392  0.04704 0.0294 ]
s=0 and t=3: phi[0, 3] = 0.0
s=1 and t=3: phi[1, 3] = 1.0
s=2 and t=3: phi[2, 3] = 1.0
delta[:,t] = [0.        0.0042336 0.0148176]
s=0 and t=4: phi[0, 4] = 2.0
s=1 and t=4: phi[1, 4] = 2.0
s=2 and t=4: phi[2, 4] = 2.0
delta[:,t] = [0.00296352 0.00355622 0.00222264]
s=0 and t=5: phi[0, 5] = 0.0
s=1 and t=5: phi[1, 5] = 1.0
s=2 and t=5: phi[2, 5] = 1.0
delta[:,t] = [0.00088906 0.00128024 0.00048009]
s=0 and t=6: phi[0, 6] = 0.0
s=1 and t=6: phi[1, 6] = 1.0
s=2 and t=6: phi[2, 6] = 1.0
delta[:,t] = [0.         0.00011522 0.00040328]
s=0 and t=7: phi[0, 7] = 2.0
s=1 and t=7: phi[1, 7] = 2.0
s=2 and t=7: phi[2, 7] = 2.0
delta[:,t] = [8.06551603e-05 9.67861924e-05 6.04913702e-05]
s=0 and t=8: phi

Unnamed: 0,Observation,Best_Path
0,Hot,Sunshine
1,Hot,Sunshine
2,Cold,Rain
3,Hot,Sunshine
4,Cold,Rain
5,Cold,Rain
6,Hot,Sunshine
7,Cold,Rain
8,Hot,Sunshine
9,Hot,Sunshine


In [28]:
def viterbi(pi, a, b, obs):
    
    nStates = np.shape(b)[0]
    T = np.shape(obs)[0]
    
    # init blank path
    path = path = np.zeros(T,dtype=int)
    # delta --> highest probability of any path that reaches state i
    delta = np.zeros((nStates, T))
    # phi --> argmax by time step for each state
    phi = np.zeros((nStates, T))
    
    # init delta and phi 
    delta[:, 0] = pi * b[:, obs[0]]
    phi[:, 0] = 0
    print('\nStart Walk Forward\n')    
    # the forward algorithm extension
    for t in range(1, T):
        print( 'delta[:,t] = {delta}'.format(delta=delta[:, t-1]))
        for s in range(nStates):
            delta[s, t] = np.max(delta[:, t-1] * a[:, s]) * b[s, obs[t]] 
            phi[s, t] = np.argmax(delta[:, t-1] * a[:, s])
            print('s={s} and t={t}: phi[{s}, {t}] = {phi}'.format(s=s, t=t, phi=phi[s, t]))
    
    # find optimal path
    print('-'*50)
    print('Start Backtrace\n')
    path[T-1] = np.argmax(delta[:, T-1])
    for t in range(T-2, -1, -1):
        path[t] = phi[path[t+1], [t+1]]
        print('path[{}] = {}'.format(t, path[t]))
        
    return path, delta, phi