In [None]:
# Import necessary libraries
import pickle
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error
from scipy.stats import norm
from hmmlearn.hmm import GaussianHMM
from Bio import pairwise2
from Bio.pairwise2 import format_alignment
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score



#### Load training data

In [2]:
# Load k-mer baseline table
kmer_baseline_table_path = './kmer_baseline_table.tsv'
kmer_baseline_table = pd.read_csv(kmer_baseline_table_path, sep='\t')

# Load training data
train_data_path = './train_data.pkl'
with open(train_data_path, 'rb') as file:
    train_data = pickle.load(file)


In [3]:
import os
os.environ['LOKY_MAX_CPU_COUNT'] = '4'  

In [4]:
###################### I USED ONLY 50 training iterations
# Use GaussianHMM to solve problem 2

# Extract training data components
sequence = train_data['sequence']
kmers = train_data['kmers']
baseline_levels = train_data['baseline_levels']
simulations = train_data['simulations']

# Prepare data for GaussianHMM
observed_signals = np.concatenate([np.array(simulation['raw_signal']) for simulation in simulations])
n_states = len(kmers)


# Train GaussianHMM
hmm_model = GaussianHMM(n_components=n_states, covariance_type="diag", n_iter=50)
hmm_model.fit(observed_signals.reshape(-1, 1))

# Extract inferred parameters
learned_means = hmm_model.means_.flatten()
learned_stds = np.sqrt(hmm_model.covars_.flatten())
learned_trans_probs = hmm_model.transmat_

# Define true parameters
true_trans_probs = np.zeros((len(kmers), len(kmers)))
for i in range(len(kmers)):
    if i > 0:
        true_trans_probs[i, i - 1] = 0.5
    if i < len(kmers) - 1:
        true_trans_probs[i, i + 1] = 0.5
true_trans_probs = true_trans_probs / true_trans_probs.sum(axis=1, keepdims=True)

# True emission parameters
true_means = baseline_levels
true_stds = kmer_baseline_table['level_stdv'].values[:len(learned_stds)]

# Calculate differences (MSE)
trans_probs_diff_hmm = mean_squared_error(true_trans_probs.flatten(), learned_trans_probs.flatten())
means_diff_hmm = mean_squared_error(true_means, learned_means)
stds_diff_hmm = mean_squared_error(true_stds, learned_stds)

# Print differences
print(f"Difference in transition probabilities (MSE): {trans_probs_diff_hmm}")
print(f"Difference in emission means (MSE): {means_diff_hmm}")
print(f"Difference in emission stds (MSE): {stds_diff_hmm}")


Difference in transition probabilities (MSE): 0.07182149451784077
Difference in emission means (MSE): 671.3837928406379
Difference in emission stds (MSE): 6.570236829180282


Problem 2: Predict the DNA sequence corresponding to the test data test_data_raw_signals.npy, which contains a list of 100 raw signals corresponding to the same DNA sequence

In [5]:
test_data_path = 'test_data_raw_signals.pkl'
with open(test_data_path, 'rb') as file:
    test_data_raw_signals = pickle.load(file)

# I inspect the structure of the test data
type(test_data_raw_signals), len(test_data_raw_signals), test_data_raw_signals[0][:5]  # Check type, length, and first few values of the first signal


(list,
 100,
 array([112.5486547 , 113.32496114, 103.91446781, 112.99350533,
        113.15702902]))

In [6]:

# Initialize lists to store predicted sequences
predicted_sequences = []

# Iterate over each raw signal in the test data
for idx, raw_signal in enumerate(test_data_raw_signals, start=1):
    # Ensure raw signals are in 2D shape (num_samples, 1)
    raw_signal_array = np.array(raw_signal).reshape(-1, 1)
    
    # Predict the most likely states using the trained HMM (Viterbi algorithm)
    predicted_states = hmm_model.predict(raw_signal_array)
    
    # Convert state indices to k-mers
    predicted_kmers = [kmers[state] for state in predicted_states]
    
    # Reconstruct the DNA sequence from predicted k-mers
    if not predicted_kmers:
        predicted_sequence = ""
    else:
        predicted_sequence = predicted_kmers[0]  # Start with the first k-mer
        for kmer in predicted_kmers[1:]:
            predicted_sequence += kmer[-1]  # Append only the last base of each subsequent k-mer
    
    # Append the reconstructed sequence to the list
    predicted_sequences.append(predicted_sequence)

    # Print progress
    print(f"Predicted Sequence {idx}: {predicted_sequence}")

Predicted Sequence 1: GTAAGCGACACACACACACGGGGGGGGGGGCGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCACACACACACACACGGGGGGGGGGCCACACACACACACACACGGGGGGGGGGGGCGCACACACACACACACACACGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCATATACACACACATATAGGGGACACACACACACACACACACACATAACACACACACACACACACAGTTTTTTTGCACACACCACACACACAGGGGG
Predicted Sequence 2: GTAAGACACACGGGGGGACACACACACAGGGGGGGGGGGACACAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCATATATATATATATATACACACACAGGGCACACACACACACACACACGCACACACACACCACACACACACACACATATATAG
Predicted Sequence 3: GTAAGCGGGGACAGGGGGGGGGGGGGCAGGCAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCATATAACACACACGGGCACACACACACACACACACATATATATAACGGGGGGGGGG
Predicted Sequence 4: GTAAGCACACACGGGGGGGGGGCGGGGGGGGGGGGGGGGGCACACACACACACACACACACACACACGGGACGAGGGGGGGGGGGGGCACAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCAAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCATACACACACAGGGGCACATATAAGTTACACACACACACA
Predicted Sequence 5: GTAAGCACACACACACACACACGGGGGGGGGGGGG

Problem 3 . Forward backward algorithm from scratch

Try to implement the Forward-Backward algorithm to infer the most likely path of the states from scratch. Do you get the same result as sklearn? (I first implemented the algorithm and now I use hmmlearn)

4) Try to implement the Forward-Backward algorithm to infer the most likely path of the states from scratch. Do you get the same result as sklearn?

In [7]:


# Define the custom NanoporeHMM class
class NanoporeHMM:
    def __init__(self, learned_trans_probs, learned_means, learned_stds):
        """
        Initialize the HMM model using learned parameters.
        """
        self.trans_probs = learned_trans_probs
        self.means = learned_means
        self.stds = learned_stds
        self.num_states = len(learned_means)

    def emission_probability(self, observation, state):
        """
        Compute the emission probability for a given observation and state.
        """
        mean = self.means[state]
        std = self.stds[state]
        return norm.pdf(observation, loc=mean, scale=std)

    def forward_algorithm(self, observations):
        """
        Forward algorithm to compute the likelihood of the observation sequence.
        """
        num_obs = len(observations)
        forward_probs = np.zeros((num_obs, self.num_states))

        # Initialize base case (t=0)
        forward_probs[0] = [self.emission_probability(observations[0], state) / self.num_states 
                            for state in range(self.num_states)]

        # Recursion step
        for t in range(1, num_obs):
            for state in range(self.num_states):
                forward_probs[t, state] = self.emission_probability(observations[t], state) * \
                                         np.sum(forward_probs[t - 1] * self.trans_probs[:, state])

        total_likelihood = np.sum(forward_probs[-1])
        return forward_probs, total_likelihood

    def predict_sequence(self, observations):
        """
        Use Viterbi algorithm to find the most likely state sequence.
        """
        num_obs = len(observations)
        viterbi_probs = np.zeros((num_obs, self.num_states))
        backpointers = np.zeros((num_obs, self.num_states), dtype=int)

        # Initialize base case
        viterbi_probs[0] = [self.emission_probability(observations[0], state) / self.num_states 
                            for state in range(self.num_states)]

        # Recursion step
        for t in range(1, num_obs):
            for state in range(self.num_states):
                max_prob, max_state = max(
                    (viterbi_probs[t - 1, prev_state] * self.trans_probs[prev_state, state], prev_state)
                    for prev_state in range(self.num_states)
                )
                viterbi_probs[t, state] = max_prob * self.emission_probability(observations[t], state)
                backpointers[t, state] = max_state

        # Backtrace
        most_likely_sequence = np.zeros(num_obs, dtype=int)
        most_likely_sequence[-1] = np.argmax(viterbi_probs[-1])
        for t in range(num_obs - 2, -1, -1):
            most_likely_sequence[t] = backpointers[t + 1, most_likely_sequence[t + 1]]

        return most_likely_sequence




In [8]:
#Use NanoporeHMM instead of hmmlearn
# Initialize with learned parameters
nanopore_hmm = NanoporeHMM(learned_trans_probs, learned_means, learned_stds)

# Predict DNA sequences for test signals
my_predicted_sequences = []
for raw_signal in test_data_raw_signals:  # Iterate over test signal sequences
    # Ensure raw signals are in the correct shape (1D array)
    raw_signal_array = np.array(raw_signal)

    # Predict the most likely states using your custom Viterbi algorithm
    predicted_states = nanopore_hmm.predict_sequence(raw_signal_array)
    
    # Convert state indices to k-mers
    predicted_kmers = [kmers[state] for state in predicted_states]
    
    # Reconstruct the DNA sequence from predicted k-mers
    predicted_sequence = predicted_kmers[0]  # Start with the first k-mer
    for kmer in predicted_kmers[1:]:
        predicted_sequence += kmer[-1]  # Append only the last base of each k-mer

    my_predicted_sequences.append(predicted_sequence)

# Output the reconstructed DNA sequences
for i, seq in enumerate(my_predicted_sequences):
    print(f"Predicted Sequence {i+1}: {seq}")

Predicted Sequence 1: GCCGACGACACACACACACGGGGGGGGGGGCGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCACACACACACACACGGGGGGGGGGCCACACACACACACACACGGGGGGGGGACAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGG
Predicted Sequence 2: CGTGCACACACGGGGGGACACACACACAGGGGGGGGGGGACACAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCATATATATATATATATACACACACAGTTGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGG
Predicted Sequence 3: GCCGACGGGGACAGGGGGGGGGGGGGCAGGCAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCATATAACACACACGGTGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGG
Predicted Sequence 4: GCCGACACACACGGGGGGGGGGCGGGGGGGGGGGGGGGGGCACACACACACACACACACACACACACGGGACGAGGGGGGGGGGGGGCACAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGG
Predicted Sequence 5: GCCGACACACACACACACACACGGGGGGGGGGGGG

In [9]:
print(predicted_sequences[:5])
print(my_predicted_sequences[:5])

['GTAAGCGACACACACACACGGGGGGGGGGGCGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCACACACACACACACGGGGGGGGGGCCACACACACACACACACGGGGGGGGGGGGCGCACACACACACACACACACGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCATATACACACACATATAGGGGACACACACACACACACACACACATAACACACACACACACACACAGTTTTTTTGCACACACCACACACACAGGGGG', 'GTAAGACACACGGGGGGACACACACACAGGGGGGGGGGGACACAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCATATATATATATATATACACACACAGGGCACACACACACACACACACGCACACACACACCACACACACACACACATATATAG', 'GTAAGCGGGGACAGGGGGGGGGGGGGCAGGCAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCATATAACACACACGGGCACACACACACACACACACATATATATAACGGGGGGGGGG', 'GTAAGCACACACGGGGGGGGGGCGGGGGGGGGGGGGGGGGCACACACACACACACACACACACACACGGGACGAGGGGGGGGGGGGGCACAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCAAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCATACACACACAGGGGCACATATAAGTTACACACACACACA', 'GTAAGCACACACACACACACACGGGGGGGGGGGGGGGGGGGGGGGGAAACACACACGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCACACACACACACGGGGGGCACACATATATAT

In [10]:


# Ensure both lists have the same length
assert len(predicted_sequences) == len(my_predicted_sequences), "Lists must have the same number of sequences!"

# Initialize a list to store alignment results
alignment_results = []

# Align and analyze each pair of sequences
for i, (seq1, seq2) in enumerate(zip(predicted_sequences, my_predicted_sequences), start=1):
    alignments = pairwise2.align.globalxx(seq1, seq2)
    best_alignment = alignments[0]
    
    # Extract alignment details
    aligned_seq1, aligned_seq2, score, start, end = best_alignment
    matches = sum(a == b for a, b in zip(aligned_seq1, aligned_seq2) if a != '-' and b != '-')
    total_length = max(len(seq1), len(seq2))
    percent_identity = (matches / total_length) * 100

    # Store details in a dictionary
    alignment_results.append({
        "Pair": i,
        "Score": score,
        "Matches": matches,
        "Length (Seq1)": len(seq1),
        "Length (Seq2)": len(seq2),
        "Percent Identity (%)": percent_identity,
        "Aligned Seq1": aligned_seq1,
        "Aligned Seq2": aligned_seq2
    })

# Convert results to a DataFrame for reporting
df_results = pd.DataFrame(alignment_results)

# Calculate average values
average_score = df_results["Score"].mean()
average_percent_identity = df_results["Percent Identity (%)"].mean()
average_matches = df_results["Matches"].mean()

# Print the average values
print("Average Values:")
print(f"Average Score: {average_score:.2f}")
print(f"Average Percent Identity: {average_percent_identity:.2f}%")
print(f"Average Matches: {average_matches:.2f}")

# Display the summary
print("Alignment Summary:")
print(df_results[["Pair", "Score", "Percent Identity (%)"]])


Average Values:
Average Score: 132.49
Average Percent Identity: 75.13%
Average Matches: 132.49
Alignment Summary:
    Pair  Score  Percent Identity (%)
0      1  185.0             62.925170
1      2  119.0             68.786127
2      3  113.0             77.397260
3      4  196.0             82.352941
4      5  135.0             59.210526
..   ...    ...                   ...
95    96  100.0             52.083333
96    97  116.0             82.857143
97    98  103.0             80.468750
98    99  129.0             90.209790
99   100  134.0             73.224044

[100 rows x 3 columns]


5) Could you design a RNN or Transformer model for this task?

In [23]:
# Clean version

# Load training data

train_data_path = './train_data.pkl'
with open(train_data_path, 'rb') as file:
    train_data = pickle.load(file)

# Extract components
sequence = train_data['sequence']        # DNA sequence
kmers = train_data['kmers']              # List of all k-mers
simulations = train_data['simulations']  # Each simulation is a dict with 'raw_signal'


# Prepare dataset for RNN

def sequence_to_indices(sequence):
    """Convert DNA sequence to numerical indices."""
    nuc_to_idx = {'A': 0, 'T': 1, 'C': 2, 'G': 3}
    return [nuc_to_idx[nuc] for nuc in sequence]

class SignalDataset(Dataset):
    def __init__(self, simulations, sequence):
        self.data = []
        # Convert sequence to indices once
        target_indices = sequence_to_indices(sequence)
        
        for sim in simulations:
            signal = np.array(sim['raw_signal'], dtype=np.float32)
            # Ensure target sequence length matches signal length
            target = self.align_target_to_signal(target_indices, len(signal))
            self.data.append((signal, target))
    
    def align_target_to_signal(self, target_indices, signal_length):
        """Align target sequence to match signal length."""
        # Calculate how many signal points per nucleotide on average
        points_per_nuc = signal_length / len(target_indices)
        
        # Create aligned target sequence
        aligned_target = np.zeros(signal_length, dtype=np.int64)
        for i in range(signal_length):
            # Map each signal point to corresponding nucleotide
            nuc_idx = min(int(i / points_per_nuc), len(target_indices) - 1)
            aligned_target[i] = target_indices[nuc_idx]
            
        return aligned_target
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx]

def collate_fn(batch):
    # Find max length
    lengths = [len(signal) for signal, _ in batch]
    max_len = max(lengths)
    
    # Pad signals and targets
    signals_padded = []
    targets_padded = []
    for signal, target in batch:
        pad_len = max_len - len(signal)
        signals_padded.append(np.pad(signal, (0, pad_len), 'constant', constant_values=0.0))
        targets_padded.append(np.pad(target, (0, pad_len), 'constant', constant_values=-1))
    
    signals_padded = torch.tensor(signals_padded, dtype=torch.float32)
    targets_padded = torch.tensor(targets_padded, dtype=torch.int64)
    lengths = torch.tensor(lengths, dtype=torch.int64)
    
    return signals_padded, targets_padded, lengths

train_dataset = SignalDataset(simulations, sequence)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)


# Define RNN model

class RNNPredictor(nn.Module):
    def __init__(self, input_size=1, hidden_size=128, num_layers=2):
        super(RNNPredictor, self).__init__()
        self.lstm = nn.LSTM(input_size=input_size, 
                           hidden_size=hidden_size,
                           num_layers=num_layers,
                           batch_first=True,
                           bidirectional=True)
        # Bidirectional LSTM so multiply hidden_size by 2
        self.fc = nn.Linear(hidden_size * 2, 4)  # 4 for ATCG
        
    def forward(self, x, lengths):
        # x: (batch_size, seq_len)
        x = x.unsqueeze(-1)  # Add feature dimension: (batch_size, seq_len, 1)
        
        # Pack sequences
        packed = nn.utils.rnn.pack_padded_sequence(
            x, lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        
        # Process with LSTM
        packed_output, _ = self.lstm(packed)
        output, _ = nn.utils.rnn.pad_packed_sequence(packed_output, batch_first=True)
        
        # Project to nucleotide probabilities
        logits = self.fc(output)  # (batch_size, seq_len, 4)
        return logits


# Training Loop

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = RNNPredictor().to(device)
criterion = nn.CrossEntropyLoss(ignore_index=-1)  # ignore padding
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

print(f"Training on device: {device}")

num_epochs = 10  # adjust as needed
for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    
    for signals_padded, targets_padded, lengths in train_loader:
        signals_padded = signals_padded.to(device)
        targets_padded = targets_padded.to(device)
        
        optimizer.zero_grad()
        logits = model(signals_padded, lengths)
        
        # Calculate loss
        loss = criterion(logits.view(-1, 4), targets_padded.view(-1))
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")


# Inference

def predict_sequence(model, raw_signal, device):
    """Predict DNA sequence from raw signal."""
    model.eval()
    with torch.no_grad():
        signal = torch.tensor(raw_signal, dtype=torch.float32).unsqueeze(0).to(device)
        lengths = torch.tensor([len(raw_signal)], dtype=torch.int64)
        
        logits = model(signal, lengths)
        pred_indices = torch.argmax(logits, dim=-1)
        
        # Convert indices to nucleotides
        nucleotides = ['A', 'T', 'C', 'G']
        predicted_sequence = ''.join([nucleotides[idx] for idx in pred_indices.squeeze().cpu().numpy()])
        
        return predicted_sequence

# Test prediction
test_data_path = 'test_data_raw_signals.pkl'
with open(test_data_path, 'rb') as file:
    test_data_raw_signals = pickle.load(file)

predicted_sequences = []
for idx, raw_signal in enumerate(test_data_raw_signals, start=1):
    pred_sequence = predict_sequence(model, raw_signal, device)
    predicted_sequences.append(pred_sequence)
    print(f"Predicted Sequence {idx}: {pred_sequence}")

# Save predictions
with open('predicted_sequences.pkl', 'wb') as file:
    pickle.dump(predicted_sequences, file)

Training on device: cuda
Epoch 1/10, Loss: 1.2701
Epoch 2/10, Loss: 1.0821
Epoch 3/10, Loss: 0.9446
Epoch 4/10, Loss: 0.8349
Epoch 5/10, Loss: 0.7265
Epoch 6/10, Loss: 0.6846
Epoch 7/10, Loss: 0.5992
Epoch 8/10, Loss: 0.5364
Epoch 9/10, Loss: 0.6611
Epoch 10/10, Loss: 0.4793
Predicted Sequence 1: TTTTTTTTTTTTTTCCCCCCCCCCCCCAAAAAAAAAAAACCCCCCCCCCCGGGGGGGGGGTTTTTTTTTTTTTTTGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGCCCCCCCCCCCCCCCCCCCCGGGGGGGGAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAATTTTTTTTTTTTTTTTAAAAAAAAAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGTTTTTTTTTTTTTAAAAAAAAAAAAAAAAAAAAAAAAAGGGGGGGGGGGGG
Predicted Sequence 2: TTTTTTTTTCCCCCCCCAAAAAAAACCCCCCGGGGGGGGGTTTTTTTTTTTTGGGGGGGGGGGGGCCCCCCCCCCCCCGGGGGGGGAAAAAAAAAAAAAAAAAAAATTTTTTTTTTTAAAAAAAGGGGGGGGGGGGGGGGTTTTTTTTAAAAAAAAAAAAAAGGGGGGG
Predicted Sequence 3: TTTTTTTTTCCCCCCAAAAAAAACCCCCGGGGGGGGTTTTTTTTTTTGGGGGGCCCCCCCCCCCCCGGGGGGAAAAAAAAAAAAAAATTTTTTTTTAAAAAAGGGGGGGGGGGGGGGTTTTTTTAAAAAAAAAAAAGGGGGG
Predicted Sequence 4: TTTTTTTTTTTCCCCCCCCAAAAAAAAAAACC