In [233]:
import pytest
import numpy as np
from src.models.hmm import HiddenMarkovModel
from src.models.decoders import ViterbiAlgorithm

In [5]:
observation_states = ['on-time','late']
hidden_states = ['no-traffic','traffic']

In [7]:
use_case_one_data = np.load('./data/UserCase-One.npz')

In [11]:
hmm = HiddenMarkovModel(observation_states,
                                         hidden_states,
                      use_case_one_data['prior_probabilities'], # prior probabilities of hidden states in the order specified in the hidden_states list
                      use_case_one_data['transition_probabilities'], # transition_probabilities[:,hidden_states[i]]
                      use_case_one_data['emission_probabilities']) # emission_probabilities[hidden_states[i],:][:,observation_states[j]]

In [195]:
hmm.transition_probabilities

array([[0.8, 0.2],
       [0.4, 0.6]])

In [49]:
hmm.observation_states

['on-time', 'late']

In [194]:
hmm.emission_probabilities

array([[0.8, 0.2],
       [0.4, 0.6]])

In [310]:
hidden_states = ['A','B']
observation_states = ['H','L']
prior_probability = np.array([0.9, 0.1])
transition_probabilities = np.array([[0.1,0.9],[0.2,0.8]])
emission_probabilities = np.array([[0.1,0.9],[0.2,0.8]])
hmm = HiddenMarkovModel(observation_states,
                                         hidden_states, prior_probability, transition_probabilities, emission_probabilities)

viterbi = ViterbiAlgorithm(hmm)
best = viterbi.best_hidden_state_sequence(['H','H','H','H'])
np.alltrue(best==np.array(['A', 'B', 'B', 'B']))

True

In [145]:
delta

array([0.57142857, 0.42857143])

In [140]:
path

array([[1., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.]])

In [64]:
use_case_one_data['observation_states']

array(['on-time', 'on-time', 'late', 'late', 'late', 'on-time'],
      dtype='<U7')

In [155]:
hmm.emission_probabilities[:,state_0]

array([0.8, 0.4])

In [157]:
hmm.emission_probabilities

array([[0.8, 0.2],
       [0.4, 0.6]])

In [154]:
hmm.prior_probabilities

array([0.67, 0.33])

In [159]:
max_prob

array([0.31661238, 0.47491857])

In [158]:
len(use_case_one_data['observation_states'])

6

In [227]:
path = np.zeros((len(use_case_one_data['observation_states']), 
                         len(hmm.hidden_states)))
state_0 = hmm.observation_states_dict[use_case_one_data['observation_states'][0]]
delta = np.multiply(hmm.prior_probabilities, hmm.emission_probabilities[:,state_0])
#print(delta)
delta = delta / np.sum(delta)
path[0,:] = [hidden_state_index for hidden_state_index in range(len(hmm.hidden_states))]
#path[0,np.argmax(delta)] = 1

best_path = np.zeros((len(use_case_one_data['observation_states']), 
                         len(hmm.hidden_states)))


for trellis_node in range(1, len(use_case_one_data['observation_states'])):
    product_of_delta_and_transition_emission =  np.multiply(delta, hmm.transition_probabilities.transpose())
    #print(product_of_delta_and_transition_emission.transpose())
    max_pointer = np.argmax(product_of_delta_and_transition_emission.transpose(), axis = 0)
    max_prob = np.max(product_of_delta_and_transition_emission.transpose(), axis = 0)
    
    state = hmm.observation_states_dict[use_case_one_data['observation_states'][trellis_node]]
    path[trellis_node,:] = max_pointer
    delta = np.multiply(max_prob,hmm.emission_probabilities[:,state])
    delta = delta / np.sum(delta)
    
    high_prob = np.argmax(delta)
    
    for hidden_state in range(len(hmm.hidden_states)):
        if hidden_state == max_pointer[np.argmax(delta)]:
            best_path[trellis_node-1, hidden_state] = 1
        else:
            best_path[trellis_node-1, hidden_state] = 0
        
    path = best_path.copy()


In [231]:
for hidden_state in range(len(hmm.hidden_states)):
    if hidden_state == np.argmax(delta):
        best_path[-1, hidden_state] = 1
    else:
        best_path[-1, hidden_state] = 0


decode_hidden = []
for node in range(0,len(use_case_one_data['observation_states'])):
    idx = np.argmax(best_path[node,:])
    decode_hidden.append(hmm.hidden_states_dict[idx])
best_hidden_state_path = np.array(decode_hidden)


In [232]:
best_hidden_state_path

array(['no-traffic', 'no-traffic', 'traffic', 'traffic', 'traffic',
       'no-traffic'], dtype='<U10')

In [101]:
probs = max_prob*hmm.emission_probabilities[:,state]

In [207]:
delta

array([0.57142857, 0.42857143])

In [176]:
use_case_one_data['hidden_states']

array(['no-traffic', 'no-traffic', 'traffic', 'traffic', 'traffic',
       'on-time'], dtype='<U10')

In [127]:
use_case_one_data['observation_states']

array(['on-time', 'on-time', 'late', 'late', 'late', 'on-time'],
      dtype='<U7')

0

'on-time'

{'on-time': 0, 'late': 1}

0
1


In [65]:
path[0,:]

array([0., 0.])

In [260]:
prog_cm_data = np.load('./data/ProjectDeliverable-ProgenitorCMs.npz')
observation_states = ['regulatory', 'regulatory-potential']
hidden_states = ['encode-atac', 'atac']

In [261]:
prog_cm_hmm_object = HiddenMarkovModel(observation_states,
                                           hidden_states,
                                           prog_cm_data['prior_probabilities'], #  prior probabilities of hidden states in the order specified in the hidden_states list
                                           prog_cm_data['transition_probabilities'], # transition_probabilities[:,hidden_states[i]]
                                           prog_cm_data['emission_probabilities'])

In [243]:
prog_cm_viterbi_instance = ViterbiAlgorithm(prog_cm_hmm_object)

In [244]:
evaluate_viterbi_decoder_using_observation_states_of_prog_cm = prog_cm_viterbi_instance.best_hidden_state_sequence(prog_cm_data['observation_states'])

In [245]:
prog_cm_hmm_object.hidden_states_dict

{0: 'encode-atac', 1: 'atac'}

In [246]:
evaluate_viterbi_decoder_using_observation_states_of_prog_cm

array(['encode-atac', 'atac', 'encode-atac', 'encode-atac', 'atac',
       'encode-atac', 'atac', 'atac', 'atac', 'encode-atac'], dtype='<U11')

In [250]:
prog_cm_data['hidden_states']

array(['atac', 'encode-atac', 'encode-atac', 'atac', 'atac',
       'encode-atac', 'encode-atac', 'encode-atac', 'encode-atac',
       'encode-atac'], dtype='<U11')

In [251]:
prog_cm_hmm_object.prior_probabilities

array([0.5, 0.5])

In [252]:
prog_cm_hmm_object.transition_probabilities

array([[0.4, 0.6],
       [0.5, 0.5]])

In [253]:
prog_cm_hmm_object.emission_probabilities

array([[0.6, 0.4],
       [0.8, 0.2]])

In [297]:
prog_cm_hmm_object.observation_states_dict

{'regulatory': 0, 'regulatory-potential': 1}

In [255]:
    prim_cm_data = np.load('./data/ProjectDeliverable-PrimitiveCMs.npz')

    # Instantiate submodule class models.ViterbiAlgorithm with the progenitor cardiomyocyte's HMM
    prim_cm_viterbi_instance = ViterbiAlgorithm(prog_cm_hmm_object)

    # Decode the hidden states of the primitive cardiomyocyte's regulatory observation states
    decoded_hidden_states_for_observed_states_of_prim_cm = prim_cm_viterbi_instance.best_hidden_state_sequence(prim_cm_data['observation_states'])
    

In [256]:
np.sum(prim_cm_data['hidden_states'] == decoded_hidden_states_for_observed_states_of_prim_cm)/len(prim_cm_data['observation_states'])

0.4

In [257]:
prim_cm_data['hidden_states']

array(['atac', 'encode-atac', 'encode-atac', 'atac', 'atac',
       'encode-atac', 'encode-atac', 'encode-atac', 'encode-atac',
       'encode-atac'], dtype='<U11')

In [269]:
decoded_hidden_states_for_observed_states_of_prim_cm

array(['encode-atac', 'atac', 'encode-atac', 'encode-atac', 'atac',
       'encode-atac', 'atac', 'atac', 'atac', 'encode-atac'], dtype='<U11')

In [259]:
proj_cm_d['hidden_states']

NameError: name 'proj_cm_data' is not defined

In [267]:
prog_cm_data['hidden_states']

array(['atac', 'encode-atac', 'encode-atac', 'atac', 'atac',
       'encode-atac', 'encode-atac', 'encode-atac', 'encode-atac',
       'encode-atac'], dtype='<U11')

In [268]:
prim_cm_data['hidden_states']

array(['atac', 'encode-atac', 'encode-atac', 'atac', 'atac',
       'encode-atac', 'encode-atac', 'encode-atac', 'encode-atac',
       'encode-atac'], dtype='<U11')

In [296]:
use_case_one_data = prog_cm_data
hmm = prog_cm_hmm_object


path = np.zeros((len(use_case_one_data['observation_states']), 
                         len(hmm.hidden_states)))
state_0 = hmm.observation_states_dict[use_case_one_data['observation_states'][0]]
delta = np.multiply(hmm.prior_probabilities, hmm.emission_probabilities[:,state_0])
#print(delta)
delta = delta / np.sum(delta)
path[0,:] = [hidden_state_index for hidden_state_index in range(len(hmm.hidden_states))]
#path[0,np.argmax(delta)] = 1

best_path = np.zeros((len(use_case_one_data['observation_states']), 
                         len(hmm.hidden_states)))


for trellis_node in range(1, len(use_case_one_data['observation_states'])):
    product_of_delta_and_transition_emission =  np.multiply(delta, hmm.transition_probabilities.transpose())
    print(product_of_delta_and_transition_emission.transpose())
    max_pointer = np.argmax(product_of_delta_and_transition_emission.transpose(), axis = 0)
    max_prob = np.max(product_of_delta_and_transition_emission.transpose(), axis = 0)
    #print(max_prob)
    
    state = hmm.observation_states_dict[use_case_one_data['observation_states'][trellis_node]]
    path[trellis_node,:] = max_pointer
    delta = np.multiply(max_prob,hmm.emission_probabilities[:,state])
    delta = delta / np.sum(delta)
    #print(delta)
    
    high_prob = np.argmax(delta)
    
    for hidden_state in range(len(hmm.hidden_states)):
        if hidden_state == max_pointer[np.argmax(delta)]:
            best_path[trellis_node-1, hidden_state] = 1
        else:
            best_path[trellis_node-1, hidden_state] = 0
        
    # path = best_path.copy()

[[0.17142857 0.25714286]
 [0.28571429 0.28571429]]
[[0.26666667 0.4       ]
 [0.16666667 0.16666667]]
[[0.22857143 0.34285714]
 [0.21428571 0.21428571]]
[[0.22857143 0.34285714]
 [0.21428571 0.21428571]]
[[0.13333333 0.2       ]
 [0.33333333 0.33333333]]
[[0.26666667 0.4       ]
 [0.16666667 0.16666667]]
[[0.13333333 0.2       ]
 [0.33333333 0.33333333]]
[[0.17142857 0.25714286]
 [0.28571429 0.28571429]]
[[0.17142857 0.25714286]
 [0.28571429 0.28571429]]


In [294]:
path

array([[0., 1.],
       [1., 1.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [1., 1.],
       [0., 0.],
       [1., 1.],
       [1., 1.],
       [1., 1.]])

In [286]:
delta

array([0.66666667, 0.33333333])

In [276]:
hmm.emission_probabilities

array([[0.6, 0.4],
       [0.8, 0.2]])

In [280]:
hmm.hidden_states_dict

{0: 'encode-atac', 1: 'atac'}

In [None]:
encode,atac,encode,encode,encode,atac,encode,atac,atac,encode

In [289]:
prog_cm_data['hidden_states']

array(['atac', 'encode-atac', 'encode-atac', 'atac', 'atac',
       'encode-atac', 'encode-atac', 'encode-atac', 'encode-atac',
       'encode-atac'], dtype='<U11')

In [320]:
observation_states = ['committed','ambivalent'] # A graduate student's dedication to their rotation lab
    
    # index annotation hidden_states=[i,j]
hidden_states = ['R01','R21'] # The NIH funding source of the graduate student's rotation project 

    # PONDERING QUESTION: How would a user define/compute their own HMM instantiation inputs to decode the hidden states for their use case observations?
use_case_one_data = np.load('./data/UserCase-Lecture.npz')

    # Instantiate submodule class models.HiddenMarkovModel with
    # observation and hidden states and prior, transition, and emission probabilities.
use_case_one_hmm = HiddenMarkovModel(observation_states,
                                         hidden_states,
                      use_case_one_data['prior_probabilities'], # prior probabilities of hidden states in the order specified in the hidden_states list
                      use_case_one_data['transition_probabilities'], # transition_probabilities[:,hidden_states[i]]
                      use_case_one_data['emission_probabilities']) # emission_probabilities[hidden_states[i],:][:,observation_states[j]]
    
    # Instantiate submodule class models.ViterbiAlgorithm using the use case one HMM 
use_case_one_viterbi = ViterbiAlgorithm(use_case_one_hmm)

In [336]:
len(use_case_one_hmm.observation_states)

2

In [337]:
use_case_one_viterbi.hmm_object == use_case_one_hmm

True