<pre>
We will be build a observation scorer to measure the probsbility of a sequence of observation
We will then build a predictor for optimul path (sequence of states) for a given observation to happen
We will then cross check the predictions against hmmlearn standard library.
</pre>

In [1]:
import numpy as np
from hmmlearn import hmm
import operator

In [2]:
np.random.seed(777)

In [3]:
states = ["Rainy", "Sunny"]
n_states = len(states)

observations = ["walk", "shop", "clean"]
n_observations = len(observations)

start_probability = np.array([0.2, 0.8]) # started observation on a sunny day

transition_probability = np.array([
  [0.7, 0.3],
  [0.4, 0.6]
])

emission_probability = np.array([
  [0.1, 0.4, 0.5],
  [0.6, 0.3, 0.1]
])

In [4]:
# predict a sequence of hidden states based on visible states
# Bob says: walk, clean, shop, shop, clean, walk
bob_says = np.array([[0, 2, 1, 1, 2, 0]]).T

### Custom implementation of HMM

In [5]:
def get_obs_prob_for_hidden_state(observation, hidden_state):
    # observation cound be 0, 1, 2
    # hidden_state could be 0, 1
    return emission_probability[hidden_state][observation]

In [6]:
def get_hidden_state_prob_given_prev_state(hidden_state, prev_hidden_state=None):
    if prev_hidden_state is None:
        return start_probability[hidden_state]
    return transition_probability[prev_hidden_state][hidden_state]

In [7]:
def get_hidden_state_prob(hidden_state, observation_step):
    if observation_step == 0:
        return start_probability[hidden_state]
    prob_score = 0
    for state_id in range(n_states):
        prob_score += get_hidden_state_prob_given_prev_state(hidden_state, state_id) * \
                          get_hidden_state_prob(state_id, observation_step - 1)
    return prob_score

In [8]:
def get_observation_prob(observation, observation_step):
    prob_score = 0
    for state_id in range(n_states):
        prob_score += get_obs_prob_for_hidden_state(observation, state_id) *\
                          get_hidden_state_prob(state_id, observation_step)
    return prob_score

In [9]:
def get_hidden_state_prob_by_observation(observation, state_id, observation_step):
    obs_prob_for_state = get_obs_prob_for_hidden_state(observation, state_id)
    state_prob = get_hidden_state_prob(state_id, observation_step)
    obs_prob = get_observation_prob(observation, observation_step)
    return obs_prob_for_state * state_prob/ obs_prob

In [10]:
get_hidden_state_prob_by_observation(0, 0, 2)

0.1625377643504532

### Get observation score

In [11]:
def get_path_score_by_states(observed_values, debug=False):
    state_probs = [0 for s_id, state in enumerate(states)]
    if len(observed_values) == 1:
        prev_scores = [1]
    else:
        prev_scores = get_path_score_by_states(observed_values[:-1], debug)
    curr_observation = observed_values[-1]
    for s_id_prev, prev_score in enumerate(prev_scores):
        score = 0
        if len(observed_values) == 1:
            s_id_prev = None 
        for s_id, state in enumerate(states):
            state_prob = get_hidden_state_prob_given_prev_state(s_id, s_id_prev)
            obs_prob = get_obs_prob_for_hidden_state(curr_observation, s_id)
            if debug: print('s_id_prev, s_id, prev_score, state_prob, obs_prob', s_id_prev, s_id, prev_score, state_prob, obs_prob)
            score = state_prob * obs_prob * prev_score
            state_probs[s_id] += score
    if debug: print('state_probs', observed_values, ':', state_probs)
    return state_probs

In [12]:
def get_path_score(observed_values, debug=False):
    return np.log(sum(get_path_score_by_states(observed_values, debug)))

In [13]:
# just observation scores
for v in range(len(bob_says.T[0])):
    print(v, bob_says.T[0][:v+1], get_path_score(bob_says.T[0][:v+1]))

0 [0] -0.6931471805599453
1 [0 2] -2.021927635479229
2 [0 2 1] -3.034348369525084
3 [0 2 1 1] -4.053450634289115
4 [0 2 1 1 2] -5.129269787139621
5 [0 2 1 1 2 0] -6.447971343364465


In [14]:
get_path_score(bob_says.T[0])

-6.447971343364465

### Get most probable sequence of state to generate a given observation

In [15]:
import itertools

In [16]:
# observed_values = bob_says.T[0]

In [17]:
def get_all_possible_path_score_by_states(observed_values, debug=False):
    path_state_probs = {}
    if len(observed_values) == 1:
        prev_path_state_probs = {'': 0}
    else:
        prev_path_state_probs = get_all_possible_path_score_by_states(observed_values[:-1], debug)
    curr_observation = observed_values[-1]
    for prev_path, prev_score in prev_path_state_probs.items():
        score = 0
        if len(prev_path) == 0:
            s_id_prev = None 
        else:
            s_id_prev = int(prev_path[-1])
        for s_id, state in enumerate(states):
            new_path = prev_path + str(s_id)
            state_prob = get_hidden_state_prob_given_prev_state(s_id, s_id_prev)
            obs_prob = get_obs_prob_for_hidden_state(curr_observation, s_id)
            if debug: print('s_id_prev, s_id, prev_score, state_prob, obs_prob', s_id_prev, s_id, prev_score, state_prob, obs_prob)
            score = np.log(state_prob) + np.log(obs_prob) + prev_score
#             if debug: print('score', score, np.log(score))
#             new_score = np.log(score)
            path_state_probs[new_path] = score
    if debug: print('path_state_probs', path_state_probs)
    return path_state_probs

In [18]:
def get_best_path(observed_values, debug=False):
    res = get_all_possible_path_score_by_states(observed_values, debug)
    prob_states, prob_score = sorted(res.items(), key=lambda val: val[1], reverse=True)[0]
    return prob_states, prob_score

In [19]:
prob_states, prob_score = get_best_path(bob_says.T[0])

In [20]:
prob_states, prob_score

('100001', -7.653958991730681)

In [21]:
print("Bob says:", ", ".join(list(map(lambda x: observations[x], bob_says.T[0]))))
print("Alice hears:", ", ".join(list(map(lambda x: states[int(x)], list(prob_states)))))

Bob says: walk, clean, shop, shop, clean, walk
Alice hears: Sunny, Rainy, Rainy, Rainy, Rainy, Sunny


### The Viterbi Algorithm

In [22]:
from collections import defaultdict

In [23]:
def get_optimul_viterbi_path(observed_values, debug=False):
    state_path_prob = defaultdict(list)
    for s_id, state in enumerate(states):
        obs_prob = get_obs_prob_for_hidden_state(observed_values[0], s_id)
        state_path_prob[0].append({
            's_id': s_id,
            'score': np.log(get_hidden_state_prob_given_prev_state(s_id, None)) + np.log(obs_prob),
            'path': str(s_id)
        })
    for idx, obs_val in enumerate(observed_values[1:]):
        for s_id, state in enumerate(states):
            new_state_id = None
            new_state_score = None
            path_so_far = None
            obs_prob = get_obs_prob_for_hidden_state(obs_val, s_id)
            
            for s_id_prev, _ in enumerate(states):
                prev_score = state_path_prob[idx][s_id_prev]['score']
                state_prob = get_hidden_state_prob_given_prev_state(s_id, s_id_prev)
                score = np.log(state_prob) + np.log(obs_prob) + prev_score
                if debug: print('s_id_prev, s_id, prev_score, state_prob, obs_prob, score', s_id_prev, s_id, prev_score, state_prob, obs_prob, score)
                if new_state_id is None or score > new_state_score:
                    new_state_id = s_id
                    new_state_score = score
                    path_so_far = state_path_prob[idx][s_id_prev]['path']
            state_path_prob[idx+1].append({
                's_id': new_state_id,
                'score': new_state_score,
                'path': path_so_far + str(new_state_id)
            })
            if debug: print('idx, s_id, score, path', idx, new_state_id, new_state_score, path_so_far + str(new_state_id))
        if debug: print("--")
    return state_path_prob

In [24]:
def get_viterbi_path(observed_values, debug=False):
    res = get_optimul_viterbi_path(observed_values, debug)
    final_res = sorted(res[max(res)], key=lambda val: val['score'], reverse=True)[0]
    return final_res['path'], final_res['score']

In [25]:
path, score = get_viterbi_path(bob_says.T[0])

In [26]:
path, score

('100001', -7.653958991730681)

In [27]:
# Most probable state transition for the given observations
res = get_optimul_viterbi_path(bob_says.T[0])
final_res = sorted(res[max(res)], key=lambda val: val['score'], reverse=True)[0]

In [28]:
for idx, state in enumerate(final_res['path']):
    for path_vals in res[idx]:
        if list(path_vals['path'])[-1] == state:
            print(idx, bob_says.T[0][0:idx], path_vals['path'], path_vals['score'])
            break

0 [] 1 -0.7339691750802004
1 [0] 10 -2.3434070875143007
2 [0 2] 100 -3.6163727633271883
3 [0 2 1] 1000 -4.889338439140076
4 [0 2 1 1] 10000 -5.939160563638754
5 [0 2 1 1 2] 100001 -7.653958991730681


In [29]:
print("Bob says:", ", ".join(list(map(lambda x: observations[x], bob_says.T[0]))))
print("Alice hears:", ", ".join(list(map(lambda x: states[int(x)], list(path)))))

Bob says: walk, clean, shop, shop, clean, walk
Alice hears: Sunny, Rainy, Rainy, Rainy, Rainy, Sunny


### Using hmmlearn MultinomialHMM

In [30]:
model = hmm.MultinomialHMM(n_components=n_states, random_state=777)

In [31]:
model.startprob_ = start_probability
model.transmat_ = transition_probability
model.emissionprob_ = emission_probability

In [32]:
model._check()

In [33]:
logprob, alice_hears = model.decode(bob_says, algorithm="viterbi")

In [34]:
logprob, alice_hears

(-7.65395899173068, array([1, 0, 0, 0, 0, 1]))

In [35]:
print("Bob says:", ", ".join(list(map(lambda x: observations[x], bob_says.T[0]))))
print("Alice hears:", ", ".join(map(lambda x: states[x], alice_hears)))

Bob says: walk, clean, shop, shop, clean, walk
Alice hears: Sunny, Rainy, Rainy, Rainy, Rainy, Sunny


In [36]:
# Most probable state transition for the given observations
for v in range(len(bob_says.T[0])):
    logprob, alice_hears = model.decode(bob_says[:v+1], algorithm="viterbi")
    print(v, bob_says.T[0][:v+1], logprob, alice_hears)

0 [0] -0.7339691750802004 [1]
1 [0 2] -2.3434070875143007 [1 0]
2 [0 2 1] -3.6163727633271883 [1 0 0]
3 [0 2 1 1] -4.889338439140076 [1 0 0 0]
4 [0 2 1 1 2] -5.939160563638754 [1 0 0 0 0]
5 [0 2 1 1 2 0] -7.65395899173068 [1 0 0 0 0 1]


In [37]:
# just observation scores
for v in range(len(bob_says.T[0])):
    print(v, bob_says.T[0][:v+1], model.score([bob_says.T[0][:v+1]]))

0 [0] -0.6931471805599452
1 [0 2] -2.021927635479229
2 [0 2 1] -3.034348369525084
3 [0 2 1 1] -4.053450634289115
4 [0 2 1 1 2] -5.12926978713962
5 [0 2 1 1 2 0] -6.447971343364464


In [38]:
model.score([bob_says.T[0]])

-6.447971343364464