In [1]:
import numpy as np
from numpy import log2
import pandas as pd
from hmmlearn.hmm import MultinomialHMM

#### Setting up an example

Inspired by [this lecture](https://www.csb.pitt.edu/ComputationalGenomics/Lectures/Lec5.pdf)
from the U of Pittsburg (starting at slide 11).

Consider two hidden states ("H" for high GC and "L" for low GC) which can each "emit" the nucleotides A, C, G and T.

Using ["MultinomialHMM"](https://hmmlearn.readthedocs.io/en/latest/api.html#hmmlearn.hmm.MultinomialHMM)
because the emissions are discrete states.

In "hmmlearn" speak, **states** are **components** and emmitted **symbols** are **features**.

In [2]:
states = ['H', 'L']
nucleotides = ['A', 'C', 'G', 'T']
emission_p = np.array([[0.2, 0.3, 0.3, 0.2], [0.3, 0.2, 0.2, 0.3]]) # shape = len(states), len(symbols)
transition_p = np.array([[0.5, 0.5], [0.4, 0.6]])
initial_p = np.array([0.5, 0.5])

In [3]:
assert transition_p[0, 0] == 0.5 # prob(H|H) = 0.5
assert transition_p[1, 0] == 0.4 # prob(H|L) = 0.4
assert emission_p[0, 0] == 0.2   # prob(A|H) = 0.2
assert emission_p[1, 2] == 0.2   # prob(G|L) = 0.2

Build model

In [4]:
hmm = MultinomialHMM(n_components=len(states), 
                     startprob_prior=initial_p, 
                     transmat_prior=transition_p, 
                     verbose=False,  
                     init_params='')
hmm.transmat_ = transition_p
hmm.emissionprob_ = emission_p
hmm.startprob_ = initial_p
hmm.n_features = len(nucleotides)

In [5]:
sample_dna, sample_hid = hmm.sample(n_samples=10, random_state=21)

In [6]:
sample_dna.ravel(), sample_hid.ravel()

(array([1, 0, 0, 2, 2, 3, 0, 3, 3, 1]), array([0, 1, 0, 0, 0, 0, 0, 0, 1, 1]))

#### Some helper functions

In [13]:
def encode_seq(symbols, seqtype='dna'):
    encdr = nucleotides
    if seqtype != 'dna':
        encdr = states
    outseq = np.array([encdr.index(s) for s in symbols])
    return outseq

test_hl = 'HHHLLL'
test_nuc = 'GGGAAA'
assert encode_seq(test_hl, seqtype='states')[0] == states.index(test_aa[0]) and \
       encode_seq(test_hl, seqtype='states')[-1] == states.index(test_aa[-1])
assert encode_seq(test_nuc, seqtype='dna')[0] == nucleotides.index(test_nuc[0]) and \
       encode_seq(test_nuc, seqtype='dna')[-1] == nucleotides.index(test_nuc[-1])

def decode_seq(num_array, seqtype='dna'):
    encdr = nucleotides
    if seqtype != 'dna':
        encdr = states
    outseq = [encdr[s] for s in num_array]
    return ''.join(outseq)

assert decode_seq(encode_seq(test_nuc)) == test_nuc
assert decode_seq(encode_seq(test_aa, seqtype='prot'), seqtype='prot') == test_hl

In [15]:
decode_seq(sample_dna.reshape(-1), seqtype='dna'), decode_seq(sample_hid.reshape(-1), seqtype='hid')

('CAAGGTATTC', 'HLHHHHHHLL')

Viterbi algorithm for most probable sequence of hidden states to explain observations

In [16]:
observed_dna = 'GGCACTGAA'
obs_dna_e = encode_seq(observed_dna, seqtype='dna')
obs_dna_e.ravel()

array([2, 2, 1, 0, 1, 3, 2, 0, 0])

In [17]:
mle_hid_indices = hmm.predict(obs_dna_e.reshape(-1, 1))
mle_hid = decode_seq(mle_hid_indices, seqtype='hid')
mle_hid

'HHHLLLLLL'

#### How Viterbi works

Assume that the probability at position *pos* only depends on the emitted symbol and the hidden state at position *pos-1*.

Start by calculating the probability of each hidden state given the first symbol "G" at position 0 and the probability of starting in each state.
Determine the most likely hidden state at position 0 by comparing the two probabilities.

Next, determine the probability of each hidden state at position 1, given the probability of the maximum of each hidden state previously calculated, and the second symbol "G".
Determine the most likely hidden state at position 1 by comparing the two probabilities.


In [18]:
cum_prob = np.zeros((len(states), len(observed_dna)))
gi = nucleotides.index('G')
cum_prob[0, 0] = emission_p[0, gi] * initial_p[0]
cum_prob[1, 0] = emission_p[1, gi] * initial_p[1]
cum_prob

array([[0.15, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.1 , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ]])

In [19]:
pos = 1
pi = nucleotides.index(observed_dna[pos])
cum_prob[0, pos] = emission_p[0, pi] * max(cum_prob[0, pos-1]*transition_p[0, 0], cum_prob[1, pos-1]*transition_p[1, 0])
cum_prob[1, pos] = emission_p[1, pi] * max(cum_prob[0, pos-1]*transition_p[0, 1], cum_prob[1, pos-1]*transition_p[1, 1])
cum_prob

array([[0.15  , 0.0225, 0.    , 0.    , 0.    , 0.    , 0.    , 0.    ,
        0.    ],
       [0.1   , 0.015 , 0.    , 0.    , 0.    , 0.    , 0.    , 0.    ,
        0.    ]])

`max(cum_prob[0, pos-1]*transition_p[0, 0], cum_prob[1, pos-1]*transition_p[1, 0])`

^ This is where things can start to seem confusing, but they are intuitive if you take a moment to understand it

We are selecting the most likely of two possibilities (e.g. the max() operation).
The two possibilities are an H at the previous step transitioning to an H at this step OR an
L at the previous step transitioning to an H at this step.

We've already calculated the probability of an H or an L at the previous step, so we combine that with the transition probability.

In [20]:
pos = 2
pi = nucleotides.index(observed_dna[pos])
cum_prob[0, pos] = emission_p[0, pi] * max(cum_prob[0, pos-1]*transition_p[0, 0], cum_prob[1, pos-1]*transition_p[1, 0])
cum_prob[1, pos] = emission_p[1, pi] * max(cum_prob[0, pos-1]*transition_p[0, 1], cum_prob[1, pos-1]*transition_p[1, 1])
cum_prob

array([[0.15    , 0.0225  , 0.003375, 0.      , 0.      , 0.      ,
        0.      , 0.      , 0.      ],
       [0.1     , 0.015   , 0.00225 , 0.      , 0.      , 0.      ,
        0.      , 0.      , 0.      ]])

And so on ...

The only additional detail is that we generally operate using logarithms instead of the raw probabilities to avoid underflow. Recall that using logs, multiplications become summations.

The whole thing generalizes to:

In [21]:
pos = 0
pi = nucleotides.index(observed_dna[pos])
cum_prob[0, pos] = log2(emission_p[0, pi]) + log2(initial_p[0])
cum_prob[1, pos] = log2(emission_p[1, pi]) + log2(initial_p[1])
most_likely_hidden_index = np.argmax([cum_prob[0, pos], cum_prob[1, pos]])
most_likely_hidden = states[most_likely_hidden_index]
print(most_likely_hidden)

for pos in range(1, len(observed_dna)):
    pi = nucleotides.index(observed_dna[pos])
    prob_H = max(cum_prob[0, pos-1] + log2(transition_p[0, 0]), cum_prob[1, pos-1] + log2(transition_p[1, 0]))
    prob_L = max(cum_prob[0, pos-1] + log2(transition_p[0, 1]), cum_prob[1, pos-1] + log2(transition_p[1, 1]))
    cum_prob[0, pos] = log2(emission_p[0, pi]) + prob_H
    cum_prob[1, pos] = log2(emission_p[1, pi]) + prob_L
    
    most_likely_hidden_index = np.argmax([cum_prob[0, pos], cum_prob[1, pos]])
    most_likely_hidden = states[most_likely_hidden_index]
    if cum_prob[0, pos] == cum_prob[1, pos]:
        # use hidden state estimate as tie breaker
        most_likely_hidden_index = np.argmax([prob_H, prob_L])
        most_likely_hidden = states[most_likely_hidden_index]
    print(most_likely_hidden)
cum_prob

H
H
H
L
L
L
L
L
L


array([[ -2.73696559,  -5.47393119,  -8.21089678, -11.53282488,
        -14.00675607, -17.32868416, -19.53958094, -22.86150904,
        -25.65736832],
       [ -3.32192809,  -6.05889369,  -8.79585928, -10.94786238,
        -14.00675607, -16.48068725, -19.53958094, -22.01351213,
        -24.48744332]])

In [None]:
state_probabilities = hmm.predict_proba(prot_e.reshape(-1, 1))
state_probabilities[4,]