<a href="https://colab.research.google.com/github/shanguanma/Aligners/blob/master/HMM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Implementation of HMM algorithms

# Choosing the appropriate data structures

You can choose any data structure to represent your HMM as long as your implementation works. The important thing to bear in mind is that your nodel needs to contain:

- N states
  Represented by characters, strings or numbers. It can include the first and the last state explicity.
- A set of transition probabilities$\{{a_{aj}}\}$

  ${a_{kj}} = P(\pi(i)= j |\pi(i-1) =k) $

  The probability of being in the state j at step i given that at step i-1 we were in the state k.
- A set of emission probabilities$\{{e_k(c)}\}$
  $e_k(c) = P(s_i = c | \pi(i) = k)$

  The probability of observing the symbol c at the i-th position sequence given that at the i-th step we are in the state k.

One possible representation is described below.


# States

States are represented as a list of characters.


In [0]:
states = ['b','y','n','e']

# Transtions

Transitions are represented as a dictionary where keys are tuples and values are transition probabilities. 

The first element of tuple is the state from which we transition and the second element is the state we transition to.

In [0]:
transitions = {('b','y') : 0.2,
               ('b','n') : 0.8,
               ('y','y') : 0.7,
               ('y','n') : 0.2,
               ('y','e') : 0.1,
               ('n','n') : 0.8,
               ('n','y') : 0.1,
               ('n','e') : 0.1}
               

# Emission

Emissions are represented as a dictionary where keys are states and values are dictionaries. The internal dictionaries contain emitted symbols as keys and emission probabilities as values.

In [0]:
emissions = {'y' : {'A':0.1, 'C':0.4, 'G':0.4, 'T':0.1},
            'n': {'A': 0.25, 'C': 0.25,'G':0.25,'T':0.25}}

# Sequence

The sequence is simply a string.

In [0]:
sequence = 'ATGCG'

# Initializing matrices

We can write a simple utility function to initialize matrices that will be useful afterwards.It takes the desired number of rows, the desired of columns and the value to fill the matrix . If the third parameter is not provided, the matrix is filled with zeros by default.

In [0]:
def initialize_matrix(dim1, dim2, value=0):
    F = []
    for i in range(0, dim1):
        F.append([])
        for j in range(0,dim2):
            F[i].append(value)
    return F
            

# Visualizing matrices

For visualization purposed only we implement some printing functions. They are not important for the sake of the solution and the details of the implementation can be ignored.

In [0]:
def print_matrix(matrix, axis1,axis2):
    w = '{:<10}'
    print(w.format('') + w.format('<sos>') + ''.join([w.format(char) for char in axis2]) + w.format('<eos>'))
    for i, row in enumerate(matrix):
        print(w.format(axis1[i]) + ''.join(['{:<10.2e}'.format(item) for item in row]))

def print_matrix_p(matrix, axis1, axis2):
    w = '{:<10}'
    print(w.format('') + w.format('<sos>') + ''.join([w.format(char) for char in axis2])+w.format('eos'))
    for i, row in enumerate(matrix):
        print(w.format(axis1[i]) + ''.join(['{:<10s}'.format(item) for item in row]))


# Forward algorithm

We start by initializing matrix F that will contain as many rows as there are states and as many columns as there are symbols in the sequence plus two additional columns (the first(e.g: it's same as <sos>) and the last(e.g: it's same as <eos>)). The probability of starting from the begin state is 1 so we set the first elements of the matrix to 1.

In [39]:
F = initialize_matrix(len(states), len(sequence)+2)
F[0][0] = 1
print_matrix(F,states, sequence)

          <sos>     A         T         G         C         G         <eos>     
b         1.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
y         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
n         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
e         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  


Next, we calculate the values for the first symbol. For each state, it is the probability of transitioning from the begin state to the current state times the probability of emitting the first symbol of the sequence in the current state.

In [40]:
for i in range(1, len(states) -1):
    F[i][1] = transitions[(states[0],states[i])] * emissions[states[i]][sequence[0]]

print_matrix(F, states, sequence)
# F[1][1] = transitions[states[0],states[1]] * emissions[states[1]][sequence[0]]
# F[1][1] = transitions[('b','y')] * emissions['y']['A']
# F[1][1] = 0.2 * 0.1 
# F[1][1] = 0.02
# F[1][1] = 2.00e-02
# F[2][1] = transitions[states[0],states[2]] * emissions[states[2]][sequence[0]]
# F[2][1] = transitions[('b','n')] * emissioms['n']['A']
# F[2][1] = 0.8 * 0.25
# F[2][1] = 0.2
# F[2][1] = 2.00e-01
# F[3][1] = transitions[states[0],states[3]] * emissions[states[3]][sequence[0]]
# F[3][1] = transitions[('b','e')] * emissions['e']['A']
# F[3][1] = 0 * 0
# F[3][1] = 0
# F[3][1] = 0.00e+00

          <sos>     A         T         G         C         G         <eos>     
b         1.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
y         0.00e+00  2.00e-02  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
n         0.00e+00  2.00e-01  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
e         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  


For all of the other symbols, from the second to the last, we calculate the values as the sum of probabilities. For each state, it is the sum of probabilities of transitioning form any previous state to the current states times the probability of emitting the corresponding symbol of the sequence in the current state times the probability of the previous state.

In [41]:
# loops on the symbols ,from second symbol to last specify symbol (e.g: 0)
for j in range(2, len(sequence) + 1):
    # loops on the states, from second state to last state
    for i in range(1, len(states) - 1 ):
        p_sum = 0
        # loops on all of the possible previous states 
        for k in range(1, len(states) - 1):
            p_sum += F[k][j-1] * transitions[(states[k],states[i])] * emissions[states[i]][sequence[j-1]]
        F[i][j] = p_sum

print_matrix(F, states, sequence)




          <sos>     A         T         G         C         G         <eos>     
b         1.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
y         0.00e+00  2.00e-02  3.40e-03  2.59e-03  1.06e-03  3.69e-04  0.00e+00  
n         0.00e+00  2.00e-01  4.10e-02  8.37e-03  1.80e-03  4.14e-04  0.00e+00  
e         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  


Now, we need to calculate the final value. It is the sum of probabilities of transitioning from any previous state to the end state times the probability of the previous state.

In [42]:
# The final value  is equivalent to the end marker <eos>
# of the sentence in end-to-end speech recognition.
p_sum = 0
for k in range(1, len(states) - 1 ):
    p_sum += F[k][len(sequence)] * transitions[(states[k], states[-1])]

F[-1][-1] = p_sum
print_matrix(F, states, sequence)


          <sos>     A         T         G         C         G         <eos>     
b         1.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
y         0.00e+00  2.00e-02  3.40e-03  2.59e-03  1.06e-03  3.69e-04  0.00e+00  
n         0.00e+00  2.00e-01  4.10e-02  8.37e-03  1.80e-03  4.14e-04  0.00e+00  
e         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  7.83e-05  


We can now put everything into a function that takes states, transitions, emissions and a sequence and returns a matrix F.

In [0]:
def forward(states, transitions, emissions, sequence):
    # initialize matrix, set <sos> to 1.
    F = initialize_matrix(len(states), len(sequence)+2)
    F[0][0] = 1
    # first symbol(e.g: 'A') of sequence , initialize hmm state
    # Question:Why len(states) -1, not len(staes)
    # Reply: Because last state has no emit probabilty.
    #        It only serves as a link.
    for i in range(1, len(states) - 1):
        F[i][1] = transitions[(states[0],states[i])] * emissions[states[i]][sequence[0]]
    
    # Question:Why len(squence) + 1?
    # Reply: Because if you want to compute last symbol(e.g.: 'G').
    #        range function is [start,stop).
    for j in range(2, len(sequence) + 1 ):
        for i in range(1, len(states) - 1):
            p_sum = 0
            for k in range(1, len(states) - 1 ):
                # probabilty of previous symbol  * 
                # probablity from previous state to current state * 
                # observing symbol j-1 given current state i 
                p_sum += F[K][j-1] * transitions[(states[k],states[i])] * emissions[states[i]][sequence[j-1]]
            F[i][j] = p_sum
    p_sum = 0
    for k in range(1, len(states) -1 ):
        p_sum += F[k][len(sequence)] * transitions[(states[k], states[-1])]
    F[-1][-1] = p_sum
    return F


# Beckward algorithm

We start by initializing matrix F that will contain as many rows as there are states and as many columns as there are symbols in the sequence plus two additional columns(the first(e.g.: \<sos>) and the last(e.g: \<eos>)). The probability of starting from the end states is 1 so we set the last element of the matrix to 1. 


In [44]:
F = initialize_matrix(len(states), len(sequence) + 2)
F[-1][-1] = 1
print_matrix(F, states, sequence)


          <sos>     A         T         G         C         G         <eos>     
b         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
y         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
n         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
e         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  1.00e+00  


Next, we calulate the values for the last symbol. For each state, it is the probability of transitioning from the current state to the end state.


In [45]:
for i in range(1, len(states) -1):
    F[i][-2] = transitions[(states[i], states[-1])]
print_matrix(F, states, sequence)

          <sos>     A         T         G         C         G         <eos>     
b         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
y         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  1.00e-01  0.00e+00  
n         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  1.00e-01  0.00e+00  
e         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  1.00e+00  


For all of the other symbols, from the second to last to the first, we calculate the values as the sum of probabilities. For each state, it is the sum if probabilities of transitioning from the current state to any succesive state times the probability of emitting the corresponding symbol of the sequence in the successive state times the probability of the successive state.

In [48]:
# loops on the symbol
for j in range(len(sequence) -1, 0, -1):
    # loops on the states
    for i in range(1, len(states) -1 ):
        p_sum = 0
        # loops on all of the possible succesive states
        for k in range(1, len(states) -1 ):
            p_sum += F[k][j+1] * transitions[(states[i],states[k])] * \
                     emissions[states[k]][sequence[j]]                                                                
        F[i][j] = p_sum
print_matrix(F, states,sequence)

          <sos>     A         T         G         C         G         <eos>     
b         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
y         0.00e+00  3.08e-04  3.23e-03  1.04e-02  3.30e-02  1.00e-01  0.00e+00  
n         0.00e+00  3.61e-04  1.64e-03  6.12e-03  2.40e-02  1.00e-01  0.00e+00  
e         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  1.00e+00  


Now, we need to calculate the final value. It is the sum of probabilities of transitioning from the begin state to any successive state times the probability of emitting the first symbol of the sequence in the successive state times the probability of the successive state.

In [49]:
p_sum = 0
for k in range(1, len(states) -1):
    p_sum += F[k][1] * transitions[(states[0],states[k])] * emissions[states[k]][sequence[0]]
F[0][0] = p_sum
print_matrix(F, states, sequence)

          <sos>     A         T         G         C         G         <eos>     
b         7.83e-05  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
y         0.00e+00  3.08e-04  3.23e-03  1.04e-02  3.30e-02  1.00e-01  0.00e+00  
n         0.00e+00  3.61e-04  1.64e-03  6.12e-03  2.40e-02  1.00e-01  0.00e+00  
e         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  1.00e+00  


Now we can put everything into a function that takes states,transitions, emissions and a sequence and returns a matrix F.

In [0]:
def backward(states, transitions, emissions,sequence):
    # initialize matrix, set last symbol (e.g: <eos>) to 1.
    F = initialize_matrix(len(states), len(sequence) + 2)
    F[-1][-1] = 1
    # at the second to last symbol and  last state to initialize matrix
    for i in range(1, len(states) -1):
        F[i][-2] = transitions[(states[i], states[-1])]
    
    for j in range(len(sequence) -1, 0 , -1):
        for i in range(1, len(states) -1):
            p_sum = 0
            for k in range(1, len(states) -1):
                p_sum += F[k][j+1] * \
                         transitions[(states[i], states[k])] * \
                         emissions[states[k]][sequence[j]]
            F[i][j] = p_sum


    p_sum = 0
    for k in range(1, len(states) -1):
        p_sum += F[k][1] * \
                 transitions[(states[0],states[k])] * \
                 emissions[states[k]][sequence[0]]
    F[0][0] = p_sum
    return F

# Viterbi algorithm

We start by initializing matrice F and FP that will contain as many rows as there are states and as many columns as there are symbols in the sequence plus two additional columns(the first(e.g.: \<sos>) and the last(e.g.: \<eos>)). Matrix FP is filled with 'b' symbols that indicate the begin state. The probability of starting from the begin state is 1 so we set the first element of the matrix F to 1.

In [55]:
F = initialize_matrix(len(states), len(sequence) + 2)
FP = initialize_matrix(len(states), len(sequence) + 2, states[0])
F[0][0] = 1
print_matrix(F, states, sequence)
print("\n\n")
print_matrix_p(FP, states, sequence)

          <sos>     A         T         G         C         G         <eos>     
b         1.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
y         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
n         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
e         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  



          <sos>     A         T         G         C         G         eos       
b         b         b         b         b         b         b         b         
y         b         b         b         b         b         b         b         
n         b         b         b         b         b         b         b         
e         b         b         b         b         b         b         b         


Next, we calculate the values for the first symbol. For each state, it is the probability of transitioning from the begin state to the current state times the probability of emitting the first symbol of the sequence in the current state.

In [56]:
for i in range(1, len(states) -1):
    F[i][1] = transitions[(states[0], states[i])] * \
              emissions[states[i]][sequence[0]]
print_matrix(F,states, sequence)


          <sos>     A         T         G         C         G         <eos>     
b         1.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
y         0.00e+00  2.00e-02  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
n         0.00e+00  2.00e-01  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
e         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  


In order to choose the maximum probability at each of the following steps we will need to find the maximum in a list of values. It will also be useful to know the index of the state that has this maximum probability so that we can set the corresponding pointer. Therefore, we implement a simple function that, given a list of values, returns the maximum value and the index of the maximum value.


In [0]:
def get_max_val_index(values):
    max_val = values[0]
    max_index = 0 
    for index, val in enumerate(values):
        if val > max_val:
            max_val = val
            max_index = index
    return max_val, max_index

For all of the other symbols, from the second to the last, we calculate the values as the maximum of probabilities. **For each states,it is the maximum of probabilities of transitioning from any previous state to the current state times the probability of the provious state.** We also set the corresponding pointers accordingly.


In [62]:
# loops on the symbol, from the second to the last, it excluldes <eos>.
for j in range(2, len(sequence) + 1):
    # loops on the states, from second state(e.g: 'y')
    # to third state(e.g.: 'n'), 
    # it excludes the last state(e.g.: 'e').
    for i in range(1, len(states) -1):
        values = []
        # loops on all of the possible previous states
        for k in range(1, len(states) -1):
            # appends the value to the list(e.g.: values)
            values.append(F[k][j-1] * 
                          transitions[(states[k],states[i])] * 
                          emissions[states[i]][sequence[j-1]])
        # finds the maximum and the index of the maximum in the list
        max_val, max_index = get_max_val_index(values)
        # sets the probability to the maximum probability
        F[i][j] = max_val
        # because now it starts from second state, 
        # so it should be puls one. 
        FP[i][j] = states[max_index + 1]
    
print_matrix(F, states, sequence)
print("\n\n")
print_matrix_p(FP, states, sequence)



          <sos>     A         T         G         C         G         <eos>     
b         1.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
y         0.00e+00  2.00e-02  2.00e-03  1.60e-03  4.48e-04  1.25e-04  0.00e+00  
n         0.00e+00  2.00e-01  4.00e-02  8.00e-03  1.60e-03  3.20e-04  0.00e+00  
e         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  



          <sos>     A         T         G         C         G         eos       
b         b         b         b         b         b         b         b         
y         b         b         n         n         y         y         b         
n         b         b         n         n         n         n         b         
e         b         b         b         b         b         b         b         


Now, we need to calculate the final value, It is the maximum of probabilities of transitioning from any previous state to the end state times the probability of the previous state.

In [64]:
values = []
for k in range(1, len(states) -1 ):
    values.append(F[k][len(sequence)] * 
                  transitions[(states[k], states[-1])])
max_val, max_index = get_max_val_index(values)
F[-1][-1] = max_val
FP[-1][-1] = states[max_index + 1]

print_matrix(F, states, sequence)
print("\n\n")
print_matrix_p(FP, states, sequence)

          <sos>     A         T         G         C         G         <eos>     
b         1.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  
y         0.00e+00  2.00e-02  2.00e-03  1.60e-03  4.48e-04  1.25e-04  0.00e+00  
n         0.00e+00  2.00e-01  4.00e-02  8.00e-03  1.60e-03  3.20e-04  0.00e+00  
e         0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  0.00e+00  3.20e-05  



          <sos>     A         T         G         C         G         eos       
b         b         b         b         b         b         b         b         
y         b         b         n         n         y         y         b         
n         b         b         n         n         n         n         b         
e         b         b         b         b         b         b         n         


we can now put everything into a function that takes states, transtions, emissions and a sequence and returns a matrix F and a matrix FP of pinters.

In [0]:
def viterbi(states, transitions, emissions, sequence):
    # initialize two matrix,
    F = initialize_matrix(len(states), len(sequence) + 2)
    FP = initialize_matrix(len(states), len(sequence) + 2, states[0])
    F[0][0] = 1 

    for i in range(1, len(states) -1 ):
        F[i][1] = transitions[(states[0], states[i])] * \
                  emissions[states[i]][sequence[0]]
    
    for j in range(2, len(sequence) + 1 ):
        for i in range(1, len(states) -1 ):
            values = []
            for k in range(1, len(states) -1 ):
                values.append(F[k][j-1] * 
                              transitions[(states[k], states[i])] *
                              emissions[states[i]][sequence[j-1]])
            max_val, max_index = get_max_val_index(values)
            F[i][j] = max_val
            FP[i][j] = states[max_index + 1]
    values = []
    for k in range(1, len(states) -1 ):
        values.append(F[k][len(sequence)] * 
                      transitions[(states[k], states[-1])])
    max_val, max_index = get_max_val_index(values)
    F[-1][-1] = max_val
    FP[-1][-1] = states[max_index + 1]
    return F, FP

In order to traceback the path we need to start from the end state and move through the FP matrix, starts from the last element of the matrix and  moves through the matrix appending, at each step, the current state to the path variable.

In [0]:
def traceback(states, FP):
    # the last element of the path is the end state
    path = ['e']
    # the current state is the one written in the last cell 
    # of the matrix 
    current = FP[-1][-1]
    # Loops on the symbols 
    for i in range(len(FP[0]) - 2, 0, -1):
        # appends the current state to the path
        path = [current] + path
        # finds the index of the current state in the list
        # of states and moves to the corresponding row of FP
        current = FP[states.index(current)][i]
        # the first element of the path is the begin state
    path = ['b'] + path
    # coverts the  list to a string where elements 
    # are separated by space
    return ' '.join(path)

Now, we can visualize the most likely path we obtained.

In [69]:
path = traceback(states, FP)
print('<sos>'+ ' '.join(sequence) + '<eos>')
print(path)

<sos>A T G C G<eos>
b n n n n n e
