In [1]:
import numpy as np
import matplotlib.pyplot as plt     
import scipy.special as special    
import pandas as pd

%matplotlib inline

In [2]:
# The HMM (given)
def make_hmm():
     p = 1/200   # Mean seg length of 100. 

     t = np.array([[ 0.0, 1/2,   1/2 ],     # state 0 = start/end
                   [ p, 1-2*p,     p ],     # state 1 = AluminumJesus
                   [ p,     p, 1-2*p ]] )   # state 2 = T4

     e = np.array([[   1/4,   1/4,   1/4,   1/4 ],    # unused; start/end state doesn't emit
                   [ 0.166, 0.334, 0.334, 0.166 ],    # AluminumJesus residue composition
                   [ 0.323, 0.177, 0.177, 0.323 ]] )  # T4 residue composition
     return (t,e)

In [3]:
# create our transition and emission matrices
t, e = make_hmm()

# Question 1: Implement the (four) standard HMM algorithms.

Viterbi: find an optimal state path $\pi$ (and its log probability log $P(x, \pi∣\theta)$), given the HMM parameters $\theta$ and a sequence x. This function takes in a transmission and emission matrix, as well as a DNA sequence (one read).

In [4]:
# create a dictionary for each nucleotide for ease of reference
base_map = {'A': 0, 'C': 1, 'G': 2, 'T': 3}

reversed_base_map = {0: 'A', 1: 'C', 2: 'G', 3: 'T'}

In [5]:
def viterbi(t, e, sequence):
    L = len(sequence)   
    
    # number of states (excluding start/end)
    M = t.shape[0]-1
    
    # initialize V: an L x 2 matrix filled with negative infinity in order to store all of the calculated probabilities
    V = np.full((M, L), -np.inf)    
    
    backtrack = np.zeros((M, L), dtype=int)  # Backpointer matrix
    
    # use the initial starting probabilities (from t) in order to initialize log-probabilities
    for j in range(M):
        V[j, 0] = np.log(t[0, j+1]) + np.log(e[j+1, base_map[sequence[0]]])
    
    # Recursion
    # iterate through each position in a read/sequence
    for i in range(1, L):  
        base = sequence[i]
        # iterate through every state
        for k in range(M):  
            # calculate probabilities for each transition and record the maximum in matrix V
            max_prob, max_state = max(
                (V[j, i-1] + np.log(t[j+1, k+1]) + np.log(e[k+1, base_map[base]]), 
                 j)
                for j in range(M)
            )
            V[k, i] = max_prob
            backtrack[k, i] = max_state

    # termination step: compute final log-prob and determine last state for optimal path
    last_probs = [(V[k, L-1] + np.log(t[k+1, 0]), k) for k in range(M)]
    log_prob, last_state = max(last_probs)

    # traceback to find the optimal path
    paths = []
    path = [last_state + 1]
    for i in range(L - 1, 0, -1):
        last_state = backtrack[last_state, i]
        path.insert(0, last_state + 1)  

    # return the optimal path and its log probability
    return ''.join(map(str, path)), log_prob  

Forward: calculate the Forward dynamic programming matrix and the total log probability log $ P(x∣\theta)$, given the HMM parameters $\theta$ and a sequence $x$, and the total log probability log $P(x∣\theta)$.

In [6]:
def forward(t, e, sequence):
    L = len(sequence)  
    M = t.shape[0]  

    # initialization of forward dynamic programming matrix, initialized to -inf
    F = np.full((M, L + 1), -np.inf)  
    F[0, 0] = 0 

    # using the starting probabilities, initialize the first column across all states
    for k in range(1, M):
        F[k, 1] = np.log(t[0, k]) + np.log(e[k, base_map[sequence[0]]])

    # iterate over observed symbols, from 2 to L
    for i in range(2, L + 1):  
        # grab the previous base: str, from the sequence as we iterate
        base = sequence[i-1]
        
        # iterate through every state, m (excluding start/end)
        for k in range(1, M):  
            # compute logsumexp over previous states j
            log_probs = [F[j, i-1] + np.log(t[j, k]) + np.log(e[k, base_map[base]]) for j in range(1, M)]
            F[k, i] = special.logsumexp(log_probs)

    # termination step:
    # calculate the total log probability by summing over transitions to the end state
    log_prob_x = special.logsumexp([F[k, L] + np.log(t[k, 0]) for k in range(1, M)])

    return F, log_prob_x

Backward: calculate the Backward dynamic programming matrix and the total log probability log $ P(x∣\theta)$, given the HMM parameters $\theta$ and a sequence $x$, and the total log probability log $P(x∣\theta)$.

In [7]:
def backward(t, e, sequence):
    L = len(sequence)  
    M = t.shape[0]  

    # initialization of backward dynamic programming matrix, initialized to -inf
    B = np.full((M, L+1), -np.inf)  

    # using the starting probabilities, initialize the last column across all states
    for k in range(1, M):  
        # transition from state m to the end state (state 0)
        B[k, L] = np.log(t[k, 0])  

    # Recursion step:
    # iterate backwards through the sequence to position 1
    for i in range(L-1, 0, -1): 
        base = sequence[i]
        for k in range(1, M):  # For each main state m
            # Compute logsumexp over next states m_next
            log_probs = [B[j, i + 1] + np.log(t[k, j]) + np.log(e[j, base_map[base]]) for j in range(1, M)]
            B[k, i] = special.logsumexp(log_probs)

    # Termination step:
    # calculate the total log probability by summing over transitions from the start state (state 0) to each state j at the first position
    log_prob_x = special.logsumexp([B[k, 1] + np.log(t[0, k]) + np.log(e[k, base_map[sequence[0]]]) for k in range(1, M)])

    return B, log_prob_x

Decoding: given the Forward and Backward matrices, do posterior decoding to calculate $P(\pi_i = k| x, \theta)$, the probability that residue $i$ was generated by state $k$.

In [8]:
def decode(F, f_prob_matrix, B, b_prob_matrix, threshold = 0.9):
    """
    Parameters:
    - F: forward matrix
    - f_prob: forward total log probability
    - B: backward matrix 
    - b_prob: backward total log probability
    """

    M, L = F.shape  # M is the number of states, L+1 includes the start state at position 0

    # initialize the decoding matrix D to match the dimensions of the backward and forward algorithm total probability matrices
    D = np.zeros((M, L))  
    D[0, 0] = 1

    # start in position 1 (or the second position) until the length of the sequence
    for i in range(1, L):  
        for k in range(1, M):  #For each main state m
            D[k, i] = np.exp(F[k, i] + B[k, i] - f_prob_matrix)

    # initialize decoded path
    path = []

    for i in range(1, L): 
        # find the state with the highest posterior probability
        max_prob, max_state = max((D[1, i], 1), (D[2, i], 2))
        
        #check if the highest probability exceeds the threshold
        if max_prob >= threshold:
            path.append(str(max_state))
        else:
            # if the log probability is below the threshold, then we don't assume a state
            path.append("U")  
    decoded_paths = "".join(str(state) for state in path)

    # return a list of the decoded path as well as the posterior decoding matrix 
    return decoded_paths, D

The accuracy of an inferred path is $\frac{T}{T+F}$. Below, the <code>accuracy</code> calculates the accuracy of each algorithm for each of the 20 reads. It does this by taking in two arguments: a list of optimal paths from an HMM algorithm, and a list of reads that are the "truth" in order to check for accuracy.

In [9]:
def accuracy(groundtruth_path, inferred_path):
    T = 0 # true counts
    F = 0 # false counts
    for i in range(len(groundtruth_path)):
        # disregard positions that are unknown ('U'), in the case we are comparing ground truths to paths inferred from 
        # the decoding algorithm
        if inferred_path[i] == 'U':
            continue
        elif groundtruth_path[i] == inferred_path[i]:
            T += 1
        else:
            F += 1

    accuracy = T / (T+F)
    return accuracy

# Question 2: Do synthetic positive controls. Use the HMM to generate some synthetic positive control sequences.

Below, my function <code>pos_controls</code> generates 'n' synthetic positive control sequences of length L using the transmission matrix t and emission matrix e. It also returns necessary information about these positive control sequences, such as the position where the sequence switches to another state as well as which state it was originally in. This will be useful for reconstructing the path later on, which we will need to test the accuracy of my Viterbi and decode algorithm. This is stored and outputted in a dataframe titled <code>sequence_info</code>.

In [10]:
def pos_controls(n, t, e):
    """
    Parameters:
    - n: number of sequences to generate
    - t: transition probability matrix
    - e: emission probability matrix

    """
    M = t.shape[0]
    sequences_info = []

    for i in range(n):
        sequence = []
        switch_positions = []
        current_state = 0
        start_state = None
        end_state = None
        position = 0

        # allow the sequence to continue to generate until we return to the end state (state 0)
        while True:
            # transition to a new state based on the transition probabilities from current_state
            next_state = np.random.choice(M, p=t[current_state])
            
            # if we've returned to state 0, we terminate the sequence
            if next_state == 0:
                # we record the end_state, or the second state of the sequence 
                end_state = current_state
                break
            
            # record the first state after the start state as `start_state`
            if start_state is None:
                start_state = next_state
            
            # record the position where we transition to a new state
            if current_state != next_state:
                switch_positions.append(position)
            
            # produce a symbol based on the emission probabilities of the new state
            symbol_index = np.random.choice(4, p=e[next_state])
            symbol = reversed_base_map[symbol_index]
            sequence.append(symbol)
            
            # Move to the next state
            current_state = next_state
            position += 1
        
        # convert the sequence list to a string
        full_sequence = "".join(sequence)
        
        sequence_info = {
            'sequence': full_sequence,
            'switch_positions': switch_positions,
            'start_state': start_state,
            'end_state': end_state
        }
        # append the sequence information as a dictionary to the list 'sequences_info'
        sequences_info.append(sequence_info)
    
    # convert sequences_info to a DataFrame
    sequences_info_df = pd.DataFrame(sequences_info)

    # return the dataframe to be used in calculating accuracies
    return sequences_info_df

Below, I generate 10 synthetic sequences and display the positions at which they switched, as well as their start and end states, in a dataframe.

In [11]:
# generate 10 synthetic sequences
n = 10

syn_sequences_df = pos_controls(n, t, e)
syn_sequences_df

Unnamed: 0,sequence,switch_positions,start_state,end_state
0,TTACTTTACCTATTTAGATAAATAAACAAATTGAACGGCCCCGACG...,"[0, 34]",2,1
1,AGCCGATCGTTGTACCGCGCAAACGACGCCCAGGGGCTTGGGGGCG...,"[0, 115, 156, 400]",1,2
2,CTTGCCGGTGCACGGGGCACAGCTCTATAGAGGCATATTACATTTT...,"[0, 13, 389, 555, 686, 755, 775, 1522, 1615]",1,1
3,AAGGGGTCCGCGGTACGCGCCGAGTGGACCCATCAGTTGGCGCCCC...,"[0, 98, 234]",1,1
4,TGACAACATATGCACTACTCTAAGTAGAATGTTTACTTAGTTATAC...,[0],2,2
5,CCCGAGACCTCCGAGCGC,[0],1,1
6,CTGCGGGAGGAGCTCAGTGGCGCCCGGACCCATCGCGGCCGAGTGG...,[0],1,1
7,GAGTGGTTGATGCCATAATGAGAGTGTGCGCGACAATTGCGCCAGG...,[0],1,1
8,TCGTTCTCATGAGTATATTTAGCATTATAATTCTATATTCGGGAAA...,[0],2,2
9,CTTACGAGGCTATCCGCC,[0],1,1


Analyze the generated sequences (<code>syn_sequences</code>) with your HMM algorithms, obtaining a Viterbi state path and a decoded state path. Evaluate the accuracy of each path, for each generated sequence.

In [12]:
# run the viterbi and decoding algorithms on my synthetic sequences

# set syn_sequences to the column of reads from syn_sequences_df
syn_sequences= list(syn_sequences_df['sequence'])

p_control_viterbi = []
for sequence in syn_sequences:
    path = viterbi(t, e, sequence)
    p_control_viterbi.append(path[0])

p_control_decoded = []
for sequence in syn_sequences:
    F, f_prob = forward(t, e, sequence)
    B, b_prob = backward(t, e, sequence)
    path = decode(F, f_prob, B, b_prob, threshold=0.9)[0]
    p_control_decoded.append(path)

In [13]:
# convert the syn_sequences_df dataFrame into an array for ease of calculation 
truth_generated = np.array(syn_sequences_df)

# parse the true paths (the true state switches) 
p_true_paths = []
for i in range(truth_generated.shape[0]):
    length=len(truth_generated[i,0])
    if len(truth_generated[i, 1]) == 1:
        path = str(truth_generated[i, 2]) * length
    else:
        path = str(truth_generated[i, 2]) * truth_generated[i, 1][1] + str(truth_generated[i, 3]) * (length - truth_generated[i, 1][1])
    p_true_paths.append(path)

In [14]:
# calculate the accuracies of the viterbi and decode algorithms
p_viterbi_accuracies=[]

for i in range(10):
    p_viterbi_accuracies.append(accuracy(p_true_paths[i], p_control_viterbi[i]))

p_decoded_accuracies=[]

for i in range(10):
    p_decoded_accuracies.append(accuracy(p_true_paths[i], p_control_decoded[i]))

In [15]:
# make a dataframe of accuracies
p_accuracy_df = pd.DataFrame(columns=['read_#', 'p_controls_viterbi', 'p_controls_decoded'])
p_accuracy_df['read_#'] = np.linspace(0,9, 10)
p_accuracy_df['p_controls_viterbi'] = p_viterbi_accuracies
p_accuracy_df['p_controls_decoded'] = p_decoded_accuracies
p_accuracy_df

Unnamed: 0,read_#,p_controls_viterbi,p_controls_decoded
0,0.0,0.980392,1.0
1,1.0,0.611301,0.58046
2,2.0,0.679675,0.672727
3,3.0,0.458955,0.533632
4,4.0,1.0,1.0
5,5.0,1.0,1.0
6,6.0,1.0,1.0
7,7.0,1.0,1.0
8,8.0,1.0,1.0
9,9.0,1.0,1.0


What trend(s) do you observe in these accuracies, with respect to features of the generated sequences?

**Answer:** Most noticeably, the viterbi and decoded algorithms perfectly traceback the path for the generated sequences that had no state switches at all. The viterbi algorithm performed close to 100% or read 0, but performed extremely poorly for reads 1-3. This may be because read 0 only has one switch, while reads 1-3 include multiple switches. 

The decoded algorithm, on the other hand, performs well on ALL reads except for reads 1-3. Once again, perhaps this is due to the multiple state switches occuring in reads 1-3. The decoded algorithm performing better on read 0 also indicates that it perhaps performs better than the viterbi algorithm when there is a state switch.

What do you observe about these generated sequences, compared to the actual features of Moriarty's reads? What are a couple of discrepancies where the HMM model is imperfectly describing what the actual data look like?

**Answer:** These generated sequences are of varying lengths and can also undergo either no switches between states (genomes); the generated sequences can undergo multiple switches within one sequence, as shown with multiple values in the column 'switch_positions.' On the other hand, all of Moriarty's reads are of length 200 and undergo one state switch ONLY. They  start out in either AluminumJesus or T4, and switch to the other state once. 

The reason that our generated sequences fail to capture this is because of our HMM model's t matrix, which includes the probability that there is a state switch (regardless of whether or not one already occurred), which is 0.005. This indicates that a state switch can occur in any position. The probability of the sequence ending is also 0.005 at any position.

**Question 3:** Analyze Moriarty's read sequences.

Use the HMM to analyze the 20 sequences in the file of Moriarty's reads. Here you'll need at least two additional functions: a function to parse the read sequences in the moriarty-reads.fa FASTA file, and a function to parse the ground truth table in 'moriarty-reads.truth' and convert those data for each read sequence into state paths.

In [16]:
# function to convert FASTA file to a list of sequences
def converter(file_name):

    # read and open the file
    with open(file_name, 'r') as file:
        sequences = []
        sequence = ''  

        for line in file:
            # if the line starts with '>', it's a new sequence header
            if line.startswith('>'):
                # if there's already a sequence, append it to sequences list
                if sequence:
                    sequences.append(sequence)
                    sequence = ''  # reset for the next sequence

            # strip whitespace/linebreaks and add the line to the current sequence
            else:
                sequence += line.strip()

        # append the last sequence after the loop ends
        if sequence:
            sequences.append(sequence)
    
    # return the list of sequences
    return sequences

In [17]:
# convert the reads in 'moriarty-reads.fa'
reads = converter('moriarty-reads.fa')

In [18]:
# run the viterbi algorithm on Moriarty's reads and store the optimal paths in viterbi_paths
viterbi_paths = []
for sequence in reads:
    path = viterbi(t, e, sequence)
    viterbi_paths.append(path[0])

In [19]:
# run the decode algorithm on Moriarty's reads and store the optimal paths in decoded_paths
decoded_paths = []
for sequence in reads:
    F, f_prob = forward(t, e, sequence)
    B, b_prob = backward(t, e, sequence)
    path = decode(F, f_prob, B, b_prob, threshold=0.9)[0]
    decoded_paths.append(path)

In [20]:
# function to parse the ground truth table so that we can compare the true paths with our inferred paths
def read_truthfile(file_name):

    with open(file_name, "r") as file:
        lines = file.readlines()
        # initialize a matrix to store our switch position, start state, and end state 
        truth_matrix = np.zeros((len(lines) - 1, 3), dtype=int)
        
        # skip the header line, so start at line 1
        for i, line in enumerate(lines[1:]):  
            if line.startswith('read'):  
                # split by '\t', or tab
                cols = line.strip().split('\t')  
                # Extract k, g0, g1 columns
                k, g0, g1 = int(cols[2]), int(cols[3]), int(cols[4])
                # append a new row for a new read
                truth_matrix[i, :] = [k, g0, g1]  
    
    paths = []
    for i in range(truth_matrix.shape[0]):
        # create the paths based on g0, g1, and k
        path = str(truth_matrix[i, 1]) * truth_matrix[i, 0] + str(truth_matrix[i, 2]) * (200 - truth_matrix[i, 0])
        paths.append(path)
    return truth_matrix, paths

truth_file = "moriarty-reads.truth"
truth_matrix, truth_paths = read_truthfile(truth_file)

In [21]:
# calculate the accuracies for the paths produced using the viterbi and decode algorithm, respectively
viterbi_accuracies=[]
for i in range(20):
    viterbi_accuracies.append(accuracy(truth_paths[i], viterbi_paths[i]))

decoded_accuracies=[]
for i in range(20):
    decoded_accuracies.append(accuracy(truth_paths[i], decoded_paths[i]))

In [22]:
# make a dataframe of accuracies
accuracy_df = pd.DataFrame(columns=['read_#', 'viterbi', 'decoded'])
accuracy_df['read_#'] = np.linspace(0,19, 20)
accuracy_df['viterbi'] = viterbi_accuracies
accuracy_df['decoded'] = decoded_accuracies
accuracy_df

Unnamed: 0,read_#,viterbi,decoded
0,0.0,0.995,1.0
1,1.0,0.97,1.0
2,2.0,0.98,1.0
3,3.0,1.0,1.0
4,4.0,0.995,1.0
5,5.0,0.995,1.0
6,6.0,0.99,1.0
7,7.0,0.97,1.0
8,8.0,0.9,0.917647
9,9.0,0.995,1.0


Does the HMM accurately infer the structure of each chimeric read - which residues came from which source genome? How does posterior decoding compare to Viterbi for doing this inference?

**Answer:** The HMM more accurately infers the structure of each chimeric read when using the posterior decoding algorithm. The viterbi is slow to recognize quick switches. Thus, the accuracy dips when the switch point happens either very late or very early in the sequence. In the viterbi algorithm, the penalty of switching is higher than the gain you wuld get by switching. Therefore, the  forward/backward is better for cases in which there are quick switches.

However, for some positions, we input a "U" in the inferred path of a sequence, as the decoding algorithm does not categorize  the state at that position (we don't always get an "answer" to compare to to the true paths). Therefore, we have less values to compare when using the decoding algorithm. The decoding algorithm thus performs worse in shorter sequences. 

Viterbi always gives us an "answer" in the sense that there is an inferred state at every position. Unlike the decoding algorithm, it does not check you how certain you are about a state, or it does not check for a most probable state assignment above a set threshold.

# Question 4: Suggest a better HMM, and a better approach altogether

Two of the main issues identified with our HMM are that we do not produce sequences of a fixed length and there is also no limit to the number of state switches possible. This was evident in our synthetically generated positive controls, which were all of varying length and ranged in number of state switches, from no state switches to more than just one. This does not accurately capture the problem that is occuring in Moriarty's reads. Ideally, a reducible markov chain, which includes statess you cannot "go out" of, would be ideal; however, we still want to have the ability to start in either state. 

The first change would be to ensure that we run through 200 possible state changes, or include 200 positions in our path. This can be done by removing the probability that you could move from state 1 or 2 to state 0.

In order to limit the number of state switches to 1, we could alter our model to include "irreversible" arrows. We would start in state 0 and have the option to move to state 1 or state 2. However, instead of connecting these two states with a transition probability of 0.005, we draw arrows from state 1 and state 2 to two newly labeled states, states 3 and 4, respectively. Although these states are labeled differently, only the labels are different -- state 3 will be the same in principle as state 2 and state 4 will be the same as state 1. In other words, states 1 and 4 could represent AluminumJesus and states 2 and 3 could represent T4. The transition and emission probabilities would be the same except the model can only move in two paths:
1. from state 0 (start) -> state 1 (AlumnimumJesus) -> state 3 (T4)
2. from state 0 (start) -> state 2 (T4) -> state 4 (AlumnimumJesus)

The transition probability from one state to another will still be 0.005, but this model limits the number of state switches (after leaving state 0) to one switch.

In [23]:
%load_ext watermark
%watermark -v -m -p jupyter,numpy,matplotlib

Python implementation: CPython
Python version       : 3.12.5
IPython version      : 8.27.0

jupyter   : 1.1.1
numpy     : 2.1.1
matplotlib: 3.9.2

Compiler    : Clang 16.0.6 
OS          : Darwin
Release     : 21.6.0
Machine     : x86_64
Processor   : i386
CPU cores   : 4
Architecture: 64bit

