In [1]:
import numpy as np
from src.models.hmm import HiddenMarkovModel
#from src.models.decoders import ViterbiAlgorithm

In [27]:
import numpy as np
class HiddenMarkovModel:
    """_summary_
    """

    def __init__(self, observation_states: np.ndarray, hidden_states: np.ndarray, prior_probabilities: np.ndarray, transition_probabilities: np.ndarray, emission_probabilities: np.ndarray):
        """_summary_

        Args:
            observation_states (np.ndarray): _description_
            hidden_states (np.ndarray): _description_
            prior_probabilities (np.ndarray): _description_
            transition_probabilities (np.ndarray): _description_
            emission_probabilities (np.ndarray): _description_
        """             
        self.observation_states = observation_states
        self.observation_states_dict = {observation_state: observation_state_index \
                                  for observation_state_index, observation_state in enumerate(list(self.observation_states))}

        self.hidden_states = hidden_states
        self.hidden_states_dict = {hidden_state_index: hidden_state \
                                   for hidden_state_index, hidden_state in enumerate(list(self.hidden_states))}
        

        self.prior_probabilities = prior_probabilities
        self.transition_probabilities = transition_probabilities
        self.emission_probabilities = emission_probabilities

In [247]:
import copy
import numpy as np
class ViterbiAlgorithm:
    """_summary_
    """    

    def __init__(self, hmm_object):
        """_summary_

        Args:
            hmm_object (_type_): _description_
        """              
        self.hmm_object = hmm_object

    def best_hidden_state_sequence(self, decode_observation_states: np.ndarray) -> np.ndarray:
        """_summary_

        Args:
            decode_observation_states (np.ndarray): _description_

        Returns:
            np.ndarray: _description_
        """        
        
        # Initialize path (i.e., np.arrays) to store the hidden sequence states returning the maximum probability
        path = np.zeros((len(decode_observation_states),
                         len(self.hmm_object.hidden_states)))
        path[0,:] = [hidden_state_index for hidden_state_index in range(len(self.hmm_object.hidden_states))]

        best_path = np.zeros(len(decode_observation_states))
        
        # Compute initial delta:
        # 1. Calculate the product of the prior and emission probabilities. This will be used to decode the first observation state.
        delta = np.multiply(self.hmm_object.prior_probabilities, np.transpose(self.hmm_object.emission_probabilities[:, self.hmm_object.observation_states_dict.get(decode_observation_states[0])]))
        # 2. Scale: normalize probability of transitioning to next observed state given transition/emission probs of hidden state
        # delta values sum to 1
        delta = delta / np.sum(delta)
        best_path[0] = path[0][np.argmax(delta)]
        # For each observation state to decode, select the hidden state sequence with the highest probability (i.e., Viterbi trellis)
        # trellis is way of walking down the path of observed hidden states, the path that we take
        for trellis_node in range(1, len(decode_observation_states)):
            # product of delta and transition
            # then product between that and emission probability of the observation in the nth state of trellis path
            product_of_delta_and_transition_emission = np.multiply(delta, self.hmm_object.transition_probabilities.transpose())
            # emission: prob that hidden state is emitting the observed state. rows are hidden states and columns are obs
            product_of_delta_and_transition_emission = np.multiply(product_of_delta_and_transition_emission, self.hmm_object.emission_probabilities[:, self.hmm_object.observation_states_dict.get(decode_observation_states[trellis_node])])
            # maximize product_of_delta_and_transition_emission
            # max for each COLUMN (what each hidden state/obs state corresponds to), then transpose so 
            max_prob_hidden_state = product_of_delta_and_transition_emission.max(axis=1).transpose()
            # obs state are rows now, hidden states are columns
            # scale probabilities
            scaled_max_probs = max_prob_hidden_state / np.sum(max_prob_hidden_state)
            # track indices of observed states (to decode hidden states) based on the max value we chose (rows are observed states in product_of_delta_and_transition_emission)
            obs_state_index = product_of_delta_and_transition_emission.argmax(axis=1)
            # add the observed indices to the path variable to keep track
            path[trellis_node] = obs_state_index
            # using our delta, get the next observed state based on the max probability
            best_path[trellis_node - 1] = path[trellis_node - 1][np.argmax(scaled_max_probs)]
            # recalculate delta
            delta = np.multiply(self.hmm_object.prior_probabilities, np.transpose(self.hmm_object.emission_probabilities[:, self.hmm_object.observation_states_dict.get(decode_observation_states[trellis_node])]))
            
        best_hidden_state_path = np.array([self.hmm_object.hidden_states[np.int32(index)] for index in best_path])
        return best_hidden_state_path

In [248]:

# index annotation observation_states=[i,j]    
observation_states = ['on-time','late'] 

# index annotation hidden_states=[i,j]
hidden_states = ['no-traffic','traffic']

# PONDERING QUESTION: How would a user define/compute their own HMM instantiation inputs to decode the hidden states for their use case observations?
use_case_one_data = np.load('./data/UserCase-One.npz')

# Instantiate submodule class models.HiddenMarkovModel with
# observation and hidden states and prior, transition, and emission probabilities.
use_case_one_hmm = HiddenMarkovModel(observation_states,
                                     hidden_states,
                  use_case_one_data['prior_probabilities'], # prior probabilities of hidden states in the order specified in the hidden_states list
                  use_case_one_data['transition_probabilities'], # transition_probabilities[:,hidden_states[i]]
                  use_case_one_data['emission_probabilities']) # emission_probabilities[hidden_states[i],:][:,observation_states[j]]

# Instantiate submodule class models.ViterbiAlgorithm using the use case one HMM 
use_case_one_viterbi = ViterbiAlgorithm(use_case_one_hmm)

use_case_one_viterbi.best_hidden_state_sequence(use_case_one_data['observation_states'])


[0.51353293 0.12838323]
[0 0]
[0.08576 0.04752]
[0 1]
[0.04752 0.07128]
[1 1]
[0.04752 0.07128]
[1 1]
[0.08576 0.04752]
[0 1]
[0. 0. 1. 1. 1. 0.]
['no-traffic' 'no-traffic' 'traffic' 'traffic' 'traffic' 'no-traffic']


In [2]:
np.savez('data/UserCase-One.npz',
         prior_probabilities=np.array([0.67, 0.33]),
         transition_probabilities=np.array([[0.8, 0.2], 
                                         [0.4, 0.6]]) ,
         emission_probabilities=np.array([[0.8, 0.2],
                                       [0.4, 0.6]]),
         observation_states=np.array(['on-time', 'on-time', 'late', 'late', 'late', 'on-time']),
         hidden_states=np.array(['no-traffic','no-traffic', 'traffic', 'traffic', 'traffic', 'no-traffic']))

In [2]:
np.savez('data/PersonalCase.npz',
         prior_probabilities=np.array([0.3, 0.7]),
         transition_probabilities=np.array([[0.83, 0.17], 
                                         [0.22, 0.88]]) ,
         emission_probabilities=np.array([[0.42, 0.38],
                                       [0.4, 0.6]]),
         observation_states=np.array(['rain', 'rain', 'sunny', 'rain', 'sunny', 'sunny']),
         hidden_states=np.array(['high-humidity','high-humidity', 'low-humidity', 'high-humidity', 'low-humidity', 'high-humidity']))

In [3]:
np.savez('data/PersonalCase2.npz',
         prior_probabilities=np.array([0.27, 0.33, 0.40]),
         transition_probabilities=np.array([[0.7, 0.2, 0.1], 
                                         [0.2, 0.6, 0.2],
                                           [0.3, 0.1, 0.6]]) ,
         emission_probabilities=np.array([[0.7, 0.2, 0.1], 
                                         [0.2, 0.6, 0.2],
                                           [0.3, 0.1, 0.6]]),
         # educational status are the observations
         observation_states=np.array(['lower', 'lower', 'middle', 'upper', 'upper', 'upper', 'middle', 'lower', 'lower']),
         hidden_states=np.array(['high-ace','high-ace', 'some-ace', 'low-ace', 'low-ace', 'low-ace', 'some-ace', 'high-ace', 'high-ace']))

In [16]:
use_case_one_hmm.prior_probabilities

array([0.67, 0.33])

In [4]:
np.savez('data/UserCase-Lecture.npz',
         prior_probabilities=np.array([0.67, 0.33]),
         transition_probabilities=np.array([[0.8, 0.2], 
                                         [0.4, 0.6]]) ,
         emission_probabilities=np.array([[0.8, 0.2],
                                       [0.4, 0.6]]),
         observation_states=np.array(['committed', 'committed', 'ambivalent', 'ambivalent', 'ambivalent', 'committed']),
         hidden_states=np.array(['R01','R01', 'R21', 'R21', 'R21', 'R01']))

In [5]:
observation_states = ['committed','ambivalent'] # A graduate student's dedication to their rotation lab
    
    # index annotation hidden_states=[i,j]
hidden_states = ['R01','R21'] # The NIH funding source of the graduate student's rotation project 

    # PONDERING QUESTION: How would a user define/compute their own HMM instantiation inputs to decode the hidden states for their use case observations?
use_case_one_data = np.load('data/UserCase-Lecture.npz')

    # Instantiate submodule class models.HiddenMarkovModel with
    # observation and hidden states and prior, transition, and emission probabilities.
use_case_one_hmm = HiddenMarkovModel(observation_states,
                                         hidden_states,
    use_case_one_data['prior_probabilities'], # prior probabilities of hidden states in the order specified in the hidden_states list
    use_case_one_data['transition_probabilities'], # transition_probabilities[:,hidden_states[i]]
    use_case_one_data['emission_probabilities'])

In [6]:
self = use_case_one_hmm
self.hmm_object = use_case_one_hmm

path = np.zeros((len(use_case_one_data['observation_states']), 
                         len(self.hmm_object.hidden_states)))
path[0,:] = [hidden_state_index for hidden_state_index in range(len(self.hmm_object.hidden_states))]

best_path = np.zeros((len(use_case_one_data['observation_states']), 
                         len(self.hmm_object.hidden_states)))   

In [15]:
path

array([[0., 1.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.]])

In [16]:
path[0,:]

array([0., 1.])

In [17]:
best_path

array([[0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.]])

In [18]:
observation_states

['committed', 'ambivalent']

In [24]:
hidden_states

['R01', 'R21']

In [25]:
self.prior_probabilities # correspond to the observation states

array([0.67, 0.33])

In [26]:
self.emission_probabilities
# use_case_one_data['emission_probabilities']) # emission_probabilities[hidden_states[i],:][:,observation_states[j]]
# what is this :(
# hidden state row and observation is column??

array([[0.8, 0.2],
       [0.4, 0.6]])

In [33]:
self.observation_states_dict

{'committed': 0, 'ambivalent': 1}

In [9]:
decode_observation_states = use_case_one_data['observation_states']
# Compute initial delta:
        # 1. Calculate the product of the prior and emission probabilities. This will be used to decode the first observation state.
        # 2. Scale 
delta = [np.multiply(self.prior_probabilities[state], self.emission_probabilities[state][self.observation_states_dict.get(decode_observation_states[0])]) for state in range(len(self.hidden_states))]
delta

[0.536, 0.132]

In [17]:
delta = np.multiply(self.prior_probabilities, self.emission_probabilities[self.observation_states_dict.get(decode_observation_states[0])])
delta

array([0.536, 0.066])

In [21]:
self.transition_probabilities # transition_probabilities[:,hidden_states[i]]

array([[0.8, 0.2],
       [0.4, 0.6]])

In [66]:
range(len(self.observation_states))

range(0, 2)

In [22]:
self.emission_probabilities

array([[0.8, 0.2],
       [0.4, 0.6]])

In [349]:
path = self.hidden_states
path

'R01'

In [367]:
path = np.zeros((len(decode_observation_states),
                         len(self.hmm_object.hidden_states)))
path


array([[0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.]])

In [387]:
best_path= np.zeros((len(decode_observation_states), 
                         len(self.hmm_object.hidden_states)))
best_path

array([[0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.]])

In [365]:
path = self.hidden_states

for trellis_node in range(1, len(decode_observation_states)):
    product_of_delta_and_transition_emission = [(delta * self.transition_probabilities[:, state]) for state in range(len(self.hidden_states))]
    delta = [np.max(product_of_delta_and_transition_emission[state]) * self.emission_probabilities[state][self.observation_states_dict.get(decode_observation_states[trellis_node])] for state in range(len(self.hidden_states))]
    print(product_of_delta_and_transition_emission)
    path = [path[np.argmax(product_of_delta_and_transition_emission[state])] + ", " + self.hidden_states[state] for state in range(len(self.hidden_states))]
    print(path)

final_path = path[np.argmax(delta)]
print(np.array(final_path))

[array([2.39261330e-20, 2.20959623e-21]), array([5.98153324e-21, 3.31439435e-21])]
['R01, R01', 'R01, R21']
[array([8.20762065e-21, 1.26329982e-22]), array([2.05190516e-21, 1.89494973e-22])]
['R01, R01, R01', 'R01, R01, R21']
[array([8.66724741e-23, 9.75065333e-23]), array([2.16681185e-23, 1.46259800e-22])]
['R01, R01, R21, R01', 'R01, R01, R21, R21']
[array([1.02966899e-24, 6.95026570e-24]), array([2.57417248e-25, 1.04253985e-23])]
['R01, R01, R21, R21, R01', 'R01, R01, R21, R21, R21']
[array([7.33948058e-26, 4.95414939e-25]), array([1.83487014e-26, 7.43122408e-25])]
['R01, R01, R21, R21, R21, R01', 'R01, R01, R21, R21, R21, R21']
R01, R01, R21, R21, R21, R01


In [361]:
path[np.argmax(product_of_delta_and_transition_emission[1])]

'R01, R01, R21, R21, R21, R21'

In [448]:
path = np.zeros((len(decode_observation_states),
                         len(self.hmm_object.hidden_states)))
path[0,:] = [hidden_state_index for hidden_state_index in range(len(self.hmm_object.hidden_states))]

best_path = np.zeros((len(decode_observation_states), 
                         len(self.hmm_object.hidden_states)))      

In [237]:
np.transpose(self.hmm_object.emission_probabilities[:, 0])
# need to transpose

array([0.8, 0.4])

In [240]:
delta = np.multiply(self.prior_probabilities, np.transpose(self.hmm_object.emission_probabilities[:, self.hmm_object.observation_states_dict.get(decode_observation_states[0])]))
delta

array([0.536, 0.132])

In [238]:
delta = np.multiply(self.prior_probabilities, np.transpose(self.hmm_object.emission_probabilities[:, self.observation_states_dict.get(decode_observation_states[0])]))
print(delta)
path = np.zeros((len(decode_observation_states),
                         len(self.hmm_object.hidden_states)))
path[0,:] = [hidden_state_index for hidden_state_index in range(len(self.hmm_object.hidden_states))]
best_path = np.zeros((len(decode_observation_states), 
                         len(self.hmm_object.hidden_states)))  

for trellis_node in range(1, len(decode_observation_states)):
    transition_probabilities = delta * self.hmm_object.transition_probabilities[self.observation_states_dict.get(decode_observation_states[trellis_node])] # transition_probabilities[:,hidden_states[i]]
    delta=np.max(transition_probabilities) * self.hmm_object.emission_probabilities[self.observation_states_dict.get(decode_observation_states[trellis_node])]
    for hidden_state in range(len(self.hmm_object.hidden_states)):
        #print(delta)
        path[trellis_node][hidden_state] = (delta[hidden_state])
    #print(path)
    #print(np.argmax(path[trellis_node]))
best_path

[0.536 0.132]


array([[0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.]])

In [192]:
##WORKING

delta = np.multiply(self.prior_probabilities, self.emission_probabilities)

for trellis_node in range(1, len(decode_observation_states)):
    transition_probabilities = delta * self.hmm_object.transition_probabilities[self.observation_states_dict.get(decode_observation_states[trellis_node])] # transition_probabilities[:,hidden_states[i]]
    for hidden_state in range(len(self.hmm_object.hidden_states)):
        delta = [np.max(transition_probabilities[hidden_state]) * self.hmm_object.emission_probabilities[self.observation_states_dict.get(decode_observation_states[trellis_node])]] # get a 1 by 2 matrix with prob of the observed state
        path[trellis_node][hidden_state] = np.argmax(delta)
        print(path)
    best_path[trellis_node] = np.argmax(path[trellis_node])
    print(best_path)
    #print(self.hidden_states[best_path])

IndexError: list index out of range

In [25]:
delta[:, 0]

TypeError: list indices must be integers or slices, not tuple

In [109]:
delta = np.multiply(self.prior_probabilities, self.emission_probabilities)
path = np.zeros((len(decode_observation_states),
                         len(self.hmm_object.hidden_states)))
path[0,:] = [hidden_state_index for hidden_state_index in range(len(self.hmm_object.hidden_states))]
path[1]

array([0., 0.])

In [124]:
best_path = np.zeros((len(decode_observation_states), 
                         len(self.hmm_object.hidden_states)))  
best_path

array([[0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.]])

In [191]:
delta = np.multiply(self.prior_probabilities, self.emission_probabilities)
path = np.zeros((len(decode_observation_states),
                         len(self.hmm_object.hidden_states)))
path[0,:] = [hidden_state_index for hidden_state_index in range(len(self.hmm_object.hidden_states))]

best_path = np.zeros((len(decode_observation_states), 
                         len(self.hmm_object.hidden_states)))      

        # For each observation state to decode, select the hidden state sequence with the highest probability (i.e., Viterbi trellis)
for trellis_node in range(1, len(decode_observation_states)):
    transition_probabilities = (delta * self.hmm_object.transition_probabilities) # get a 4 by 4 matrix, columns are the different hidden state probabilities
    #print(transition_probabilities)
    # Update delta and scale
    delta = np.max(transition_probabilities) * self.hmm_object.emission_probabilities[:, self.hmm_object.observation_states_dict.get(decode_observation_states[trellis_node])] # get a 1 by 2 matrix with prob of the observed state
    # Select the hidden state sequence with the maximum probability
    print(delta)
    path = [np.argmax(delta)] # index of higher prob, corresponding to hidden state
    print(path)
    # Update best path
    best_path[trellis_node, path] = 1
    #print(best_path)
    # Set best hidden state sequence in the best_path np.ndarray THEN copy the best_path to path

        # Select the last hidden state, given the best path (i.e., maximum probability)
best_hidden_state_path = np.argmax(best_path, axis=1)
best_hidden_state_path

[0.34304 0.17152]
[0]
[0.0548864 0.1646592]
[1]
[0.0197591  0.05927731]
[1]
[0.00711328 0.02133983]
[1]
[0.01024312 0.00512156]
[0]


array([0, 0, 1, 1, 1, 0])

In [443]:
#delta = np.multiply(self.prior_probabilities, self.emission_probabilities[self.observation_states_dict.get(decode_observation_states[0])])
delta = np.multiply(self.prior_probabilities, self.emission_probabilities)
delta

array([[0.287296, 0.004356],
       [0.035376, 0.039204]])

In [433]:
best_path[1]

array([0., 0.])

In [420]:
np.max(transition_probabilities)

0.22983680000000006

In [394]:
print([self.hidden_states[state] for state in range(len(self.hidden_states))])

['R01', 'R21']


In [296]:
np.alltrue(final_path == use_case_one_data['hidden_states'])

False

In [302]:
final_path

'R01, R01, R21, R21, R21, R01'

In [303]:
use_case_one_data['hidden_states']

array(['R01', 'R01', 'R21', 'R21', 'R21', 'R01'], dtype='<U3')

In [233]:
self.observation_states

['committed', 'ambivalent']

In [106]:
max([(product_of_delta_and_transition_emission[state]) * self.emission_probabilities[state]for state in range(len(self.observation_states) - 1)])

array([[9.85540198e-02, 2.29996800e-05],
       [1.21353830e-02, 2.06997120e-04]])

In [52]:
len(use_case_one_data['observation_states'])

6