In [5]:
import math  # Just ignore this :-)
import numpy as np

# ML - Week 10 - Practical Exercises

In the exercise below, you will see an example of how a HMM can be represented, and you will implement and experiment with the computation of the joint probability and various decodings as explained in the lectures in week 8.

# 1 - Representing a HMM

We can represent a HMM as a triple consisting of three matrices: a $K \times 1$ matrix with the initial state probabilities, a $K \times K$ matrix with the transition probabilities and a $K \times |\Sigma|$ matrix with the emission probabilities. In Python we can write the matrices like this:

In [6]:
init_probs_7_state = [0.00, 0.00, 0.00, 1.00, 0.00, 0.00, 0.00]

trans_probs_7_state = [
    [0.00, 0.00, 0.90, 0.10, 0.00, 0.00, 0.00],
    [1.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00],
    [0.00, 1.00, 0.00, 0.00, 0.00, 0.00, 0.00],
    [0.00, 0.00, 0.05, 0.90, 0.05, 0.00, 0.00],
    [0.00, 0.00, 0.00, 0.00, 0.00, 1.00, 0.00],
    [0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 1.00],
    [0.00, 0.00, 0.00, 0.10, 0.90, 0.00, 0.00],
]

emission_probs_7_state = [
    #   A     C     G     T
    [0.30, 0.25, 0.25, 0.20],
    [0.20, 0.35, 0.15, 0.30],
    [0.40, 0.15, 0.20, 0.25],
    [0.25, 0.25, 0.25, 0.25],
    [0.20, 0.40, 0.30, 0.10],
    [0.30, 0.20, 0.30, 0.20],
    [0.15, 0.30, 0.20, 0.35],
]

How do we use these matrices? Remember that we are given some sequence of observations, e.g. like this:

In [7]:
obs_example = 'GTTTCCCAGTGTATATCGAGGGATACTACGTGCATAGTAACATCGGCCAA'

To make a lookup in our three matrices we have to translate each symbol in the string to an index.

In [8]:
def translate_observations_to_indices(obs):
    mapping = {'a': 0, 'c': 1, 'g': 2, 't': 3}
    return [mapping[symbol.lower()] for symbol in obs]

Let's try to translate the example above using this function:

In [9]:
obs_example_trans = translate_observations_to_indices(obs_example)

In [10]:
print(obs_example_trans)

[2, 3, 3, 3, 1, 1, 1, 0, 2, 3, 2, 3, 0, 3, 0, 3, 1, 2, 0, 2, 2, 2, 0, 3, 0, 1, 3, 0, 1, 2, 3, 2, 1, 0, 3, 0, 2, 3, 0, 0, 1, 0, 3, 1, 2, 2, 1, 1, 0, 0]


Use the function below to translate the indices back to observations:

In [11]:
def translate_indices_to_observations(indices):
    mapping = ['a', 'c', 'g', 't']
    return ''.join(mapping[idx] for idx in indices)

In [12]:
translate_indices_to_observations(translate_observations_to_indices(obs_example))

'gtttcccagtgtatatcgagggatactacgtgcatagtaacatcggccaa'

Now each symbol has been transformed (predictably) into a number which makes it much easier to make lookups in our matrices. We'll do the same thing for a list of states (a path):

In [13]:
def translate_path_to_indices(path):
    return list(map(lambda x: int(x), path))

def translate_indices_to_path(indices):
    return ''.join([str(i) for i in indices])

Given a path through a HMM, we can now translate it to a list of indices:

In [14]:
path_example = '33333333333321021021021021021021021021021021021021'

print(translate_path_to_indices(path_example))

[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2, 1, 0, 2, 1, 0, 2, 1, 0, 2, 1, 0, 2, 1, 0, 2, 1, 0, 2, 1, 0, 2, 1, 0, 2, 1, 0, 2, 1, 0, 2, 1, 0, 2, 1, 0, 2, 1]


Finally, we can collect the three matrices in a class to make it easier to work with our HMM.

In [19]:
class hmm:
    def __init__(self, init_probs, trans_probs, emission_probs):
        self.init_probs = np.array(init_probs)
        self.trans_probs = np.atleast_2d(trans_probs)
        self.emission_probs = np.atleast_2d(emission_probs)

# Collect the matrices in a class.
hmm_7_state = hmm(init_probs_7_state, trans_probs_7_state, emission_probs_7_state)

# We can now reach the different matrices by their names. E.g.:
hmm_7_state.trans_probs

array([[0.  , 0.  , 0.9 , 0.1 , 0.  , 0.  , 0.  ],
       [1.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 1.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.  , 0.05, 0.9 , 0.05, 0.  , 0.  ],
       [0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  ],
       [0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  ],
       [0.  , 0.  , 0.  , 0.1 , 0.9 , 0.  , 0.  ]])

For testing, here's another model (which we will refer to as the 3-state model).

In [159]:
init_probs_3_state = [0.10, 0.80, 0.10]

trans_probs_3_state = [
    [0.90, 0.10, 0.00],
    [0.05, 0.99, 0.05],
    [0.00, 0.10, 0.90],
]

emission_probs_3_state = [
    #   A     C     G     T
    [0.40, 0.15, 0.20, 0.25],
    [0.25, 0.25, 0.25, 0.25],
    [0.20, 0.40, 0.30, 0.10],
]

hmm_3_state = hmm(init_probs_3_state, trans_probs_3_state, emission_probs_3_state)

# 2 - Validating a HMM (and handling floats)

Before using the model we'll write a function to validate that the model is valid. That is, the matrices should have the right dimensions and the following things should be true:

1. The initial probabilities must sum to 1.
2. Each row in the matrix of transition probabilities must sum to 1.
3. Each row in the matrix of emission probabilities must sum to 1.
4. All numbers should be between 0 and 1, inclusive.

Write a function `validate_hmm` that given a models returns True if the model is valid, and False otherwise:

In [160]:
import numpy as np

def validate_hmm(model):
    assert ((0 <= model.init_probs) & (model.init_probs <= 1)).all(), \
        "init_probs does not contain valid probabilities"
    assert ((0 <= model.trans_probs) & (model.trans_probs <= 1)).all(), \
        "trans_probs does not contain valid probabilities"
    assert ((0 <= model.emission_probs) & (model.emission_probs <= 1)).all(), \
        "emission_probs does not contain valid probabilities"
    
    K = model.trans_probs.shape[0]
    assert np.allclose(model.init_probs.sum(), [1.]), \
        "init_probs did not sum to one"
    assert np.allclose(model.trans_probs.sum(axis=1), np.ones(K)), \
        "init_probs did not sum to one"
    assert np.allclose(model.emission_probs.sum(axis=1), np.ones(K)), \
        "init_probs did not sum to one"
    print("Model valid!")

We can now use this function to check whether the example model is a valid model.

In [161]:
validate_hmm(hmm_7_state)

Model valid!


You might run into problems related to summing floating point numbers because summing floating point numbers does not (always) give the expected result as illustrated by the following examples. How do you suggest to deal with this?

In [162]:
0.15 + 0.30 + 0.20 + 0.35

0.9999999999999999

The order of the terms matter.

In [163]:
0.20 + 0.35 + 0.15 + 0.30

1.0

Because it changes the prefix sums

In [164]:
0.15 + 0.30

0.44999999999999996

In [165]:
0.20 + 0.35 + 0.15

0.7000000000000001

In [166]:
0.15 + 0.30

0.44999999999999996

On should never compare floating point numbers. They represent only an 'approximation'. Read more about the 'problems' in 'What Every Computer Scientist Should Know About Floating-Point Arithmetic' at:

http://docs.oracle.com/cd/E19957-01/806-3568/ncg_goldberg.html

# 3 - Computing the Joint Probability

Recall that the joint probability $p({\bf X}, {\bf Z}) = p({\bf x}_1, \ldots, {\bf x}_N, {\bf z}_1, \ldots, {\bf z}_N)$ of a hidden Markov model (HMM) can be compute as

$$
p({\bf x}_1, \ldots, {\bf x}_N, {\bf z}_1, \ldots, {\bf z}_N) = p({\bf z}_1) 
\left[ \prod_{n=2}^N p({\bf z}_n \mid {\bf z}_{n-1}) \right]
\prod_{n=1}^N p({\bf x}_n \mid {\bf z}_n)
$$

## Implementing without log-transformation

Write a function `joint_prob` given a model (e.g. in the representation above) and sequence of observables, ${\bf X}$, and a sequence of hidden states, ${\bf Z}$, computes the joint probability cf. the above formula.

In [167]:
def joint_prob(model, x, z):
    p = model.init_probs[z[0]] * model.emission_probs[z[0], x[0]]
    for i in range(1, len(x)):
        p *= model.trans_probs[z[i-1], z[i]] * model.emission_probs[z[i], x[i]] 
    return p

Now compute the joint probability of the ${\bf X}$ (`x_short`) and ${\bf Z}$ (`z_short`) below using the 7-state (`hmm_7_state`) model introduced above. (*Remember to translate them first using the appropriate functions introduces above!*)

In [168]:
x_short = 'GTTTCCCAGTGTATATCGAGGGATACTACGTGCATAGTAACATCGGCCAA'
z_short = '33333333333321021021021021021021021021021021021021'

# Your code here ...
x = translate_observations_to_indices(x_short)
z = translate_path_to_indices(z_short)
output_joint_prob = joint_prob(hmm_7_state, x, z)
print(output_joint_prob)


1.9114255184318882e-31


## Implementing with log-transformation (i.e. in "log-space")

Now implement the joint probability function in log space as explained in the lecture. We've given you a log-function that handles $\log(0)$.

Answer: The formula is the following:
$$
p({\bf x}_1, \ldots, {\bf x}_N, {\bf z}_1, \ldots, {\bf z}_N) = \log p({\bf z}_1) +
\sum_{n=2}^N \log p({\bf z}_n \mid {\bf z}_{n-1}) +
\sum_{n=1}^N p({\bf x}_n \mid {\bf z}_n)
$$

In [169]:
def log(x):
    if x == 0:
        return float('-inf')
    return math.log(x)

def joint_prob_log(model, x, z):
    p = log(model.init_probs[z[0]]) + log(model.emission_probs[z[0], x[0]])
    for i in range(1, len(x)):
        p += log(model.trans_probs[z[i-1], z[i]]) + log(model.emission_probs[z[i], x[i]])
    return p

Confirm that the log-space function is correct by comparing the output to the output of `joint_prob`.

In [170]:
# Your code here ...
output_joint_prob_log = joint_prob_log(hmm_7_state, x, z)
print(output_joint_prob_log)
assert log(output_joint_prob) == output_joint_prob_log

-70.73228857440486


## Comparison of Implementations

Now that you have two ways to compute the joint probability given a model, a sequence of observations, and a sequence of hidden states, try to make an experiment to figure out how long a sequence can be before it becomes essential to use the log-transformed version. For this experiment we'll use two longer sequences.

In [171]:
x_long = 'TGAGTATCACTTAGGTCTATGTCTAGTCGTCTTTCGTAATGTTTGGTCTTGTCACCAGTTATCCTATGGCGCTCCGAGTCTGGTTCTCGAAATAAGCATCCCCGCCCAAGTCATGCACCCGTTTGTGTTCTTCGCCGACTTGAGCGACTTAATGAGGATGCCACTCGTCACCATCTTGAACATGCCACCAACGAGGTTGCCGCCGTCCATTATAACTACAACCTAGACAATTTTCGCTTTAGGTCCATTCACTAGGCCGAAATCCGCTGGAGTAAGCACAAAGCTCGTATAGGCAAAACCGACTCCATGAGTCTGCCTCCCGACCATTCCCATCAAAATACGCTATCAATACTAAAAAAATGACGGTTCAGCCTCACCCGGATGCTCGAGACAGCACACGGACATGATAGCGAACGTGACCAGTGTAGTGGCCCAGGGGAACCGCCGCGCCATTTTGTTCATGGCCCCGCTGCCGAATATTTCGATCCCAGCTAGAGTAATGACCTGTAGCTTAAACCCACTTTTGGCCCAAACTAGAGCAACAATCGGAATGGCTGAAGTGAATGCCGGCATGCCCTCAGCTCTAAGCGCCTCGATCGCAGTAATGACCGTCTTAACATTAGCTCTCAACGCTATGCAGTGGCTTTGGTGTCGCTTACTACCAGTTCCGAACGTCTCGGGGGTCTTGATGCAGCGCACCACGATGCCAAGCCACGCTGAATCGGGCAGCCAGCAGGATCGTTACAGTCGAGCCCACGGCAATGCGAGCCGTCACGTTGCCGAATATGCACTGCGGGACTACGGACGCAGGGCCGCCAACCATCTGGTTGACGATAGCCAAACACGGTCCAGAGGTGCCCCATCTCGGTTATTTGGATCGTAATTTTTGTGAAGAACACTGCAAACGCAAGTGGCTTTCCAGACTTTACGACTATGTGCCATCATTTAAGGCTACGACCCGGCTTTTAAGACCCCCACCACTAAATAGAGGTACATCTGA'
z_long = '3333321021021021021021021021021021021021021021021021021021021021021021033333333334564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564563210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210321021021021021021021021033334564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564563333333456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456332102102102102102102102102102102102102102102102102102102102102102102102102102102102102102102102103210210210210210210210210210210210210210210210210210210210210210'

Now compute the joint probability with `joint_prob` using both the 3-state (hmm_3_state) and the 7-state (hmm_7_state) model introduced above, and see when it breaks (i.e. when it wrongfully becomes 0). Does this make sense? No as 3-state only have 3 possible states for z, whereas here in this example it has 7. 

Here's some code to get you started.

In [172]:
b3 = True
b7 = True
for i in range(100, len(x_long), 1):
    x = x_long[:i]
    z = z_long[:i]

    x_trans = translate_observations_to_indices(x)
    z_trans = translate_path_to_indices(z)
    
    # Make your experiment here...
    # Ignore 3 state, as z_long have 7 states.
    z_trans_3_state = list(map(lambda x: x % 3, z_trans))
    if b3 & (joint_prob(hmm_3_state, x_trans, z_trans_3_state) == 0):
        print("hmm_3_state broke for sequence length", i)
        b3 = False
    if b7 & (joint_prob(hmm_7_state, x_trans, z_trans) == 0):
        print("hmm_7_state broke for sequence length", i)
        b7 = False


hmm_3_state broke for sequence length 100
hmm_7_state broke for sequence length 530


In the cell below you should state for which $i$ computing the joint probability (for the two models considered) using `joint_prob` wrongfully becomes 0.

**Your answer here:**

For the 3-state model, `joint_prob` becomes 0 for **i = 100 **.

For the 7-state model, `joint_prob` becomes 0 for **i = 530 **.

# 4 - Viterbi Decoding

Below you will implement and experiment with the Viterbi algorithm. The implementation has been split into three parts:

1. Fill out the $\omega$ table using the recursion presented at the lecture.
2. Find the state with the highest probability after observing the entire sequence of observations.
3. Backtrack from the state found in the previous step to obtain the optimal path.

We'll be working with the 7-state model (`hmm_7_state`) and the helper function for translating between observations, hidden states, and indicies, as introduced above.

Additionally, you're given the function below that constructs a table of a specific size filled with zeros.

In [173]:
def make_table(m, n):
    """Make a table with `m` rows and `n` columns filled with zeros."""
    return [[0] * n for _ in range(m)]

You'll be testing your code with the same two sequences as above, i.e:

In [174]:
x_short = 'GTTTCCCAGTGTATATCGAGGGATACTACGTGCATAGTAACATCGGCCAA'
z_short = '33333333333321021021021021021021021021021021021021'

In [175]:
x_long = 'TGAGTATCACTTAGGTCTATGTCTAGTCGTCTTTCGTAATGTTTGGTCTTGTCACCAGTTATCCTATGGCGCTCCGAGTCTGGTTCTCGAAATAAGCATCCCCGCCCAAGTCATGCACCCGTTTGTGTTCTTCGCCGACTTGAGCGACTTAATGAGGATGCCACTCGTCACCATCTTGAACATGCCACCAACGAGGTTGCCGCCGTCCATTATAACTACAACCTAGACAATTTTCGCTTTAGGTCCATTCACTAGGCCGAAATCCGCTGGAGTAAGCACAAAGCTCGTATAGGCAAAACCGACTCCATGAGTCTGCCTCCCGACCATTCCCATCAAAATACGCTATCAATACTAAAAAAATGACGGTTCAGCCTCACCCGGATGCTCGAGACAGCACACGGACATGATAGCGAACGTGACCAGTGTAGTGGCCCAGGGGAACCGCCGCGCCATTTTGTTCATGGCCCCGCTGCCGAATATTTCGATCCCAGCTAGAGTAATGACCTGTAGCTTAAACCCACTTTTGGCCCAAACTAGAGCAACAATCGGAATGGCTGAAGTGAATGCCGGCATGCCCTCAGCTCTAAGCGCCTCGATCGCAGTAATGACCGTCTTAACATTAGCTCTCAACGCTATGCAGTGGCTTTGGTGTCGCTTACTACCAGTTCCGAACGTCTCGGGGGTCTTGATGCAGCGCACCACGATGCCAAGCCACGCTGAATCGGGCAGCCAGCAGGATCGTTACAGTCGAGCCCACGGCAATGCGAGCCGTCACGTTGCCGAATATGCACTGCGGGACTACGGACGCAGGGCCGCCAACCATCTGGTTGACGATAGCCAAACACGGTCCAGAGGTGCCCCATCTCGGTTATTTGGATCGTAATTTTTGTGAAGAACACTGCAAACGCAAGTGGCTTTCCAGACTTTACGACTATGTGCCATCATTTAAGGCTACGACCCGGCTTTTAAGACCCCCACCACTAAATAGAGGTACATCTGA'
z_long = '3333321021021021021021021021021021021021021021021021021021021021021021033333333334564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564563210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210210321021021021021021021021033334564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564564563333333456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456456332102102102102102102102102102102102102102102102102102102102102102102102102102102102102102102102103210210210210210210210210210210210210210210210210210210210210210'

Remember to translate these sequences to indices before using them with your algorithms.

## Implementing without log-transformation

First, we will implement the algorithm without log-transformation. This will cause issues with numerical stability (like above when computing the joint probability), so we will use the log-transformation trick to fix this in the next section.

### Computation of the $\omega$ table

In [176]:
def compute_w(model, x):
    K = len(model.init_probs)
    N = len(x)
    
    # w[k][n] = w(zn), k x n
    # w = make_table(K, N)
    w = np.zeros((K, N))
    
    # Base case: fill out w[i][0] for i = 0..k-1
    # w(z1) = p(z1)p(x1|z1)
    for i in range(K):
        w[i, 0] = model.init_probs[i]*model.emission_probs[i, x[0]]
    
    
    # Inductive case: fill out w[i][j] for i = 0..k, j = 1..n
    # w(zn) = p(xn|zn)*max_(zn-1) w(zn-1)p(zn|zn-1)
    for n in range(1, N):
        for k in range(K):
            for j in range(K):
                w[k, n] = max(w[k, n], model.emission_probs[k, x[n]] * w[j, n-1] * model.trans_probs[j, k])

    return w

### Finding the joint probability of an optimal path

Now, write a function that given the $\omega$-table, returns the probability of an optimal path through the HMM. As explained in the lecture, this corresponds to finding the highest probability in the last column of the table.

In [177]:
def opt_path_prob_log(w):
    return np.max(w[:, -1])

Now test your implementation in the box below:

In [178]:
w = compute_w(hmm_7_state, translate_observations_to_indices(x_short))
print(opt_path_prob(w))


1.911425518431887e-31


Now do the same for `x_long`. What happens?

In [179]:
# Your code here ...
w = compute_w(hmm_7_state, translate_observations_to_indices(x_long))
print(opt_path_prob(w))

0.0


### Obtaining an optimal path through backtracking

Implement backtracking to find a most probable path of hidden states given the $\omega$-table.

In [180]:
def backtrack(w, model, x):
    N = w.shape[1]
    # z[1..N] = undef
    z = np.empty((N), dtype=int)
    
    # z[N] = arg max_k w[k][N]
    z[N-1] = np.argmax(w[:, N-1])

    # z[n] = arg max_k (p(x[n+1]|z[n+1]) * w[k][n] * p(z[n+1]|k))
    for n in range(N-2, -1, -1):
        emission_prob = model.emission_probs[z[n+1], x[n+1]]
        trans_probs = [row[z[n+1]] for row in model.trans_probs]
        z[n] = np.argmax(emission_prob * w[:, n] * trans_probs)

    
    return z

In [181]:
x_indices = translate_observations_to_indices(x_short)
w = compute_w(hmm_7_state, x_indices)
z_viterbi = backtrack(w, hmm_7_state, x_indices)
print(z_viterbi)

[3 3 3 3 3 3 3 3 3 3 3 3 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2
 1 0 2 1 0 2 1 0 2 1 0 2 1]


Now do the same for `x_long`. What happens?

In [182]:
# Your code here ...
x_indices = translate_observations_to_indices(x_long)
w = compute_w(hmm_7_state, x_indices)
z_viterbi = backtrack(w, hmm_7_state, x_indices)
print(z_viterbi)

[3 3 3 3 3 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1
 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 3 3 3
 3 3 3 3 3 3 3 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6
 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4
 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5
 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 3 2 1 0 2 1 0 2 1 0 2 1
 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0
 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2
 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1
 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0
 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2
 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1
 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0
 2 1 0 2 1 0 2 1 0 2 1 0 

## Implementing with log-transformation

Now implement the Viterbi algorithm with log transformation. The steps are the same as above.

### Computation of the $\omega$ table

In [183]:
def compute_w_log(model, x):
    K = len(model.init_probs)
    N = len(x)
    
    # w[k][n] = w(zn), k x n
    # w = make_table(K, N)
    w = np.zeros((K, N))
    
    # Base case: fill out w[i][0] for i = 0..k-1
    for i in range(K):
        w[i, 0] = log(model.init_probs[i]) + log(model.emission_probs[i, x[0]])
    
    # Inductive case: fill out w[i][j] for i = 0..k, j = 1..n
    for n in range(1, N):
        for k in range(K):
            w[k, n] = -np.inf  # log probs go negative
            for j in range(K):         
                w[k, n] = max(w[k, n], log(model.emission_probs[k, x[n]]) + w[j, n-1] + log(model.trans_probs[j, k]))

    return w

### Finding the (log transformed) joint probability of an optimal path

In [184]:
def opt_path_prob_log(w):
    return max([row[-1] for row in w])

In [185]:
w = compute_w_log(hmm_7_state, translate_observations_to_indices(x_short))

opt_path_prob_log(w)

-70.73228857440486

Now do the same for `x_long`. What happens?

In [186]:
# Your code here ...
w = compute_w_log(hmm_7_state, translate_observations_to_indices(x_long))

opt_path_prob_log(w)

-1406.7209253880144

### Obtaining an optimal path through backtracking

In [192]:
def backtrack_log(w, model, x):
    N = w.shape[1]
    # z[1..N] = undef
    z = np.empty((N), dtype=int)
    
    # z[N] = arg max_k w[k][N]
    z[N-1] = np.argmax(w[:, N-1])

    # z[n] = arg max_k (p(x[n+1]|z[n+1]) * w[k][n] * p(z[n+1]|k))
    for n in range(N-2, -1, -1):
        emission_prob = log(model.emission_probs[z[n+1], x[n+1]])
        trans_probs = [log(row[z[n+1]]) for row in model.trans_probs]
        z[n] = np.argmax(emission_prob + w[:, n] + trans_probs)

    
    return z

In [193]:
w = compute_w_log(hmm_7_state, translate_observations_to_indices(x_short))
z_viterbi_log = backtrack_log(w, hmm_7_state, translate_observations_to_indices(x_short))
print(z_viterbi_log)

[3 3 3 3 3 3 3 3 3 3 3 3 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2
 1 0 2 1 0 2 1 0 2 1 0 2 1]


Now do the same for `x_long`. What happens?

In [194]:
# Your code here ...
w = compute_w_log(hmm_7_state, translate_observations_to_indices(x_long))
z_viterbi_log = backtrack_log(w, hmm_7_state, translate_observations_to_indices(x_long))
print(z_viterbi_log)

[3 3 3 3 3 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1
 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 3 3 3
 3 3 3 3 3 3 3 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6
 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4
 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5
 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 3 2 1 0 2 1 0 2 1 0 2 1
 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0
 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2
 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1
 0 2 1 0 2 1 0 3 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 3 3 3 3 4
 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5
 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6
 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4 5 6 4
 5 6 4 5 6 4 5 6 4 5 6 4 

### Does it work?

Think about how to verify that your implementations of Viterbi (i.e. `compute_w`, `opt_path_prob`, `backtrack`, and there log-transformed variants `compute_w_log`, `opt_path_prob_log`, `backtrack_log`) are correct.

One thing that should hold is that the probability of a most likely path as computed by `opt_path_prob` (or `opt_path_prob_long`) for a given sequence of observables (e.g. `x_short` or `x_long`) should be equal to the joint probability of a corersponding most probable path as found by `backtrack` (or `backtrack_log`) and the given sequence of observables. Why?

Answer: In `opt_path_prob` finds the probability of the most likely path. `backtrack` finds the most likely path. So the joint probability of this path should equal the probability of the most likely path.

opt_path_prob is the max in the last column in omega table:
$$
\max_{z_N} \omega(z_N) = \max_{z_N}\max_{z_1, \dots, z_{N-1}} p(x_1, \dots, x_N, z_1, \dots, z_N) = \max_{Z} P(X, Z) = P(X, Z^\star)
$$

Backtracking:
$$
Z^\star = \arg \max_{Z} p(X, Z)
$$

So we see that if we compute the join probability of $P(X, Z^\star)$, this exactly equals the result of opt_path_prob.





Make an experiment that validates that this is the case for your implementations of Viterbi and `x_short` and `x_long`.

In [195]:
# Your code here ...

def check_opt_path_prob_equals_joint_backtrack(model, x):
    x = translate_observations_to_indices(x)
    w = compute_w(model, x)
    most_likely_path_prob = opt_path_prob(w)
    backtrack_path = backtrack(w, model, x)
    backtrack_joint_prob = joint_prob(model, x, backtrack_path)
    
    print('opt_path_prob:', most_likely_path_prob)
    print('backtrack joint prob:', backtrack_joint_prob)
    assert math.isclose(most_likely_path_prob, backtrack_joint_prob)
    
def check_opt_path_prob_equals_joint_backtrack_log(model, x):
    x = translate_observations_to_indices(x)
    w = compute_w_log(model, x)
    most_likely_path_prob = opt_path_prob_log(w)
    backtrack_path = backtrack_log(w, model, x)
    backtrack_joint_prob = joint_prob_log(model, x, backtrack_path)

    print('opt_path_prob_log:', most_likely_path_prob)
    print('backtrack joint prob log:', backtrack_joint_prob)
    assert math.isclose(most_likely_path_prob, backtrack_joint_prob), "Joint backtrack did not equal opt path prob"

check_opt_path_prob_equals_joint_backtrack(hmm_7_state, x_short)
check_opt_path_prob_equals_joint_backtrack(hmm_7_state, x_long)

check_opt_path_prob_equals_joint_backtrack_log(hmm_7_state, x_short)
check_opt_path_prob_equals_joint_backtrack_log(hmm_7_state, x_long)


opt_path_prob: 1.911425518431887e-31
backtrack joint prob: 1.9114255184318882e-31
opt_path_prob: 0.0
backtrack joint prob: 0.0
opt_path_prob_log: -70.73228857440486
backtrack joint prob log: -70.73228857440486
opt_path_prob_log: -1406.7209253880144
backtrack joint prob log: -1406.7209253880178


### Does log transformation matter?

Make an experiment that investigates how long the input string can be before `backtrack` and `backtrack_log` start to disagree on a most likely path and its probability.

In [200]:
# Your code here ...
model = hmm_7_state
b3 = True
b7 = True
for i in range(10, len(x_long), 10):
    x = x_long[:i]
    z = z_long[:i]

    x_trans = translate_observations_to_indices(x)
    
    # Make your experiment here...
    w = compute_w(model, x_trans)
    b = backtrack(w, model, x_trans)
    
    w_log = compute_w_log(model, x_trans)
    b_log = backtrack_log(w_log, model, x_trans)
    
    if (b != b_log).any():
        print("backtrack disagreeed with backtrack_log for sequence length ", i)
        break

backtrack disagreeed with backtrack_log for sequence length  530


# 5 - Posterior Decoding

If you have time, try to implement posterior decoding (with scaling, if possible) as explained in the lecture

In [284]:
def posterior_decoding(model, x):
    N, K = len(x), model.init_probs.shape[0]

    alpha_hat = np.empty((K, N))
    c = np.empty((N,))
    beta_hat = np.empty((K, N))
    
    # do forward pass and calculate c to avoid numerical issues (normalize by c)
    
    # Basis:
    # alpha(z1) = p(z1)p(x1|z1)
    alpha_basis = model.init_probs * model.emission_probs[:, x[0]]
    # c1 = sum_z1 p(z1)p(x1|z1)
    c[0] = (alpha_basis).sum()
    alpha_hat[:, 0] = alpha_basis / c[0]
    
    for n in range(1, N):
        # delta(z_n) = p(xn|zn) sum_{zn-1} alpha_hat(z_n-1)p(zn|zn-1)
        delta = model.emission_probs[:, x[n]] * (model.trans_probs.T * alpha_hat[:, n-1]).sum(axis=1)
        # cn = sum_K delta(z_n)
        c[n] = delta.sum()
        # alpha_hat(z_n) = delta(z_n)/cn
        alpha_hat[:, n] = delta / c[n]
        
    # do backwards pass
    # Basis: beta_hat(zN) = 1
    beta_hat[:, -1] = 1 
    for n in range(N-2, -1, -1):
        # epsilon(zn) = sum_{zn+1} beta_hat(zn+1)p(xn+1|zn+1)p(zn+1|zn)
        eps = (beta_hat[:, n+1] * model.emission_probs[:, x[n+1]] * model.trans_probs).sum(axis=1)
        beta_hat[:, n] = eps / c[n+1]
    
    z = (alpha_hat * beta_hat).argmax(axis=0)
    
    return z

forward(hmm_7_state, translate_observations_to_indices(x_short))
print(posterior_decoding(hmm_7_state, translate_observations_to_indices(x_short)))

[3 3 3 3 3 3 3 3 3 3 3 3 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2
 1 0 2 1 0 2 1 0 2 1 0 2 1]


Compare the Viterbi and posterior decodings for the `x_short` and `x_long` using the 7-state model (`hmm_7_state`).

In [265]:
# Your code here ...
model = hmm_7_state

print("x_short")
x_trans = translate_observations_to_indices(x_short)
z_trans = translate_path_to_indices(z_short)

posterior_path = posterior_decoding(model, x_trans)
w = compute_w_log(model, x_trans)
viterbi_path = backtrack_log(w, model, x_trans)
print(posterior_path)
print(viterbi_path)

for idx, (i, j) in enumerate(zip(posterior_path, viterbi_path)):
    if i != j:
        print("Viterbi and Posterior disagreed at index", idx)


print("x_long")
x_trans = translate_observations_to_indices(x_long)
z_trans = translate_path_to_indices(z_long)

posterior_path = posterior_decoding(model, x_trans)
w = compute_w_log(model, x_trans)
viterbi_path = backtrack_log(w, model, x_trans)

for idx, (i, j) in enumerate(zip(posterior_path, viterbi_path)):
    if i != j:
        print("Viterbi and Posterior disagreed at index", idx)


x_short
[3 3 3 3 3 3 3 3 3 3 3 3 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2
 1 0 2 1 0 2 1 0 2 1 0 2 1]
[3 3 3 3 3 3 3 3 3 3 3 3 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2 1 0 2
 1 0 2 1 0 2 1 0 2 1 0 2 1]
x_long
Viterbi and Posterior disagreed at index 5
Viterbi and Posterior disagreed at index 6
Viterbi and Posterior disagreed at index 7
Viterbi and Posterior disagreed at index 8
Viterbi and Posterior disagreed at index 9
Viterbi and Posterior disagreed at index 10
Viterbi and Posterior disagreed at index 11
Viterbi and Posterior disagreed at index 12
Viterbi and Posterior disagreed at index 13
Viterbi and Posterior disagreed at index 14
Viterbi and Posterior disagreed at index 15
Viterbi and Posterior disagreed at index 16
Viterbi and Posterior disagreed at index 17
Viterbi and Posterior disagreed at index 18
Viterbi and Posterior disagreed at index 19
Viterbi and Posterior disagreed at index 20
Viterbi and Posterior disagreed at index 21
Viterbi and Posterior disagreed at i