# Test Set Run

This notebook loads the pre-trained TF binding prediction model, runs inference on the held-out test sequence dataset (such as `chr22_sequences.txt.gz`), loads the corresponding ground truth scores (such as `chr22_scores.txt.gz`), and calculates the Pearson's R correlation between predictions and ground truth.

## 1. Library Set-Up

In [None]:
import torch
import torch.nn as nn
import pandas as pd
import os
import numpy as np
from scipy.stats import pearsonr

## 2. Configuration

In [None]:
DATA_DIR = 'data' # Replace here for the held-out test data directory
SEQ_FILE = os.path.join(DATA_DIR, 'chr22_sequences.txt.gz') # Replace here for the held-out test data name
SCORE_FILE = os.path.join(DATA_DIR, 'chr22_scores.txt.gz') # Ground truth scores file
MODEL_WEIGHTS_FILE = 'lee-inhyeok-model.pth'

# Model Hyperparameters
NUM_CHANNELS = 32
NUM_RES_BLOCKS = 3
DROPOUT_RATE = 0.2
KERNEL_SIZE = 3

## 3. Helper Functions

In [None]:
def one_hot_encode(sequence):
    """Converts a DNA sequence string to a one-hot encoded tensor."""
    mapping = {'A': 0, 'C': 1, 'G': 2, 'T': 3}
    seq_len = len(sequence)
    encoded = torch.zeros((4, seq_len), dtype=torch.float32)
    for i, base in enumerate(sequence.upper()):
        idx = mapping.get(base, -1)
        if idx != -1:
            encoded[idx, i] = 1.0
    return encoded

def calculate_pearsonr(preds, targets):
    """Calculates Pearson correlation between prediction and target tensors."""
    # Ensure inputs are tensors on CPU and flattened
    preds_flat = preds.detach().cpu().numpy().flatten()
    targets_flat = targets.detach().cpu().numpy().flatten()

    # Remove NaNs
    valid_indices = np.isfinite(preds_flat) & np.isfinite(targets_flat)
    preds_flat = preds_flat[valid_indices]
    targets_flat = targets_flat[valid_indices]

    if len(preds_flat) < 2:
        print("Warning: Not enough valid data points to calculate Pearson R.")
        return 0.0

    try:
        r, p_value = pearsonr(preds_flat, targets_flat)
        print(f"Pearson R p-value: {p_value}")
        return r if np.isfinite(r) else 0.0
    except ValueError as e:
        print(f"Error calculating Pearson R: {e}")
        return 0.0

## 4. Model Architecture

In [None]:
class ResidualBlock(nn.Module):
    """Residual Block with LayerNorm."""
    def __init__(self, num_channels, kernel_size, dropout_rate):
        super().__init__()
        norm_shape = [num_channels, 300]
        self.conv1 = nn.Conv1d(num_channels, num_channels, kernel_size, stride=1, padding=kernel_size // 2, bias=False)
        self.norm1 = nn.LayerNorm(norm_shape)
        self.relu1 = nn.ReLU()
        self.dropout = nn.Dropout(dropout_rate)
        self.conv2 = nn.Conv1d(num_channels, num_channels, kernel_size, stride=1, padding=kernel_size // 2, bias=False)
        self.norm2 = nn.LayerNorm(norm_shape)
        self.relu2 = nn.ReLU()

    def forward(self, x):
        identity = x
        out = self.conv1(x)
        out = self.norm1(out)
        out = self.relu1(out)
        out = self.dropout(out)
        out = self.conv2(out)
        out = self.norm2(out)
        out += identity
        out = self.relu2(out)
        return out

class BindingPredictorCNN(nn.Module):
    def __init__(self, num_channels, num_blocks, kernel_size, dropout_rate):
        super().__init__()
        initial_norm_shape = [num_channels, 300]
        self.initial_conv = nn.Conv1d(4, num_channels, kernel_size, stride=1, padding=kernel_size // 2, bias=False)
        self.initial_norm = nn.LayerNorm(initial_norm_shape)
        self.initial_relu = nn.ReLU()
        self.res_blocks = nn.Sequential(*[ResidualBlock(num_channels, kernel_size, dropout_rate) for _ in range(num_blocks)])
        self.final_conv = nn.Conv1d(num_channels, 1, kernel_size=1, stride=1, padding=0, bias=True)

    def forward(self, x):
        x = self.initial_conv(x)
        x = self.initial_norm(x)
        x = self.initial_relu(x)
        x = self.res_blocks(x)
        x = self.final_conv(x)
        x = x.squeeze(1)
        return x

## 5. Load Model and Weights

In [None]:
print("Loading model and weights...")
if torch.backends.mps.is_available():
    device = torch.device("mps")
    print("Using MPS device (Apple Silicon GPU).")
elif torch.cuda.is_available():
    device = torch.device("cuda")
    print("Using CUDA device.")
else:
    device = torch.device("cpu")
    print("Using CPU device.")

model = BindingPredictorCNN(
    num_channels=NUM_CHANNELS,
    num_blocks=NUM_RES_BLOCKS,
    kernel_size=KERNEL_SIZE,
    dropout_rate=DROPOUT_RATE
)

try:
    model.load_state_dict(torch.load(MODEL_WEIGHTS_FILE, map_location=device))
    model.to(device)
    model.eval() # Set model to evaluation mode
    print(f"Successfully loaded model weights from {MODEL_WEIGHTS_FILE}")
except FileNotFoundError:
    print(f"Error: Model weights file not found at {MODEL_WEIGHTS_FILE}. Cannot proceed.")
    # Optionally raise the error or exit
    # raise

## 6. Load Sequence and Score Data

In [None]:
print("Loading sequence and score data...")
sequences_df = None
scores_df = None
try:
    sequences_df = pd.read_csv(SEQ_FILE, sep="\t", compression='gzip')
    scores_df = pd.read_csv(SCORE_FILE, sep="\t", compression='gzip')
    print(f"Loaded {len(sequences_df)} sequences and {len(scores_df.columns)} score vectors.")
    
    # Basic check for consistency
    if len(sequences_df) != len(scores_df.columns):
        print(f"Warning: Number of sequences ({len(sequences_df)}) does not match number of score columns ({len(scores_df.columns)}). Ensure they correspond correctly.")
        min_len = min(len(sequences_df), len(scores_df.columns))
        sequences_df = sequences_df.iloc[:min_len]
        scores_df = scores_df.iloc[:, :min_len]
        print(f"Proceeding with {min_len} sequence/score pairs.")
        
except FileNotFoundError as e:
    print(f"Error: Data file not found. Ensure '{e.filename}' exists in '{DATA_DIR}'.")
    # Optionally raise the error or exit
    # raise

## 7. Prepare Input and Ground Truth Data

In [None]:
print("Preparing input and ground truth data...")
inputs_tensor = None
ground_truth_tensor = None

if sequences_df is not None and scores_df is not None:
    # One-hot encode all sequences from the file
    all_sequences = sequences_df['sequence'].tolist()
    all_inputs_list = [one_hot_encode(seq) for seq in all_sequences]

    # Prepare ground truth score tensors
    all_score_tensors = [torch.tensor(scores_df[col].values, dtype=torch.float32) for col in scores_df.columns]

    # Stack inputs and ground truth into single tensors
    inputs_tensor = torch.stack(all_inputs_list).to(device)
    ground_truth_tensor = torch.stack(all_score_tensors).to(device)

    print(f"Prepared {len(all_inputs_list)} inputs and corresponding ground truth scores.")
    print(f"Input tensor shape: {inputs_tensor.shape}")
    print(f"Ground truth tensor shape: {ground_truth_tensor.shape}")
else:
    print("Skipping data preparation due to previous loading errors.")

## 8. Predict on the Test Dataset

In [None]:
print("Performing inference on the test dataset...")
predictions = None
if inputs_tensor is not None:
    with torch.no_grad(): # Ensure gradients are not calculated
        predictions = model(inputs_tensor)
    print("Inference complete.")
else:
    print("Skipping inference due to missing input data.")

## 9. Calculate Pearson's R Correlation

In [None]:
print("Calculating Pearson's R correlation...")
if predictions is not None and ground_truth_tensor is not None:
    # Ensure predictions and ground truth have the same shape for comparison
    if predictions.shape == ground_truth_tensor.shape:
        overall_pearson_r = calculate_pearsonr(predictions, ground_truth_tensor)
        print(f"\nPearson's R score: {overall_pearson_r:.4f}")
    else:
        print(f"Error: Shape mismatch between predictions ({predictions.shape}) and ground truth ({ground_truth_tensor.shape}). Cannot calculate correlation.")
else:
    print("Skipping correlation calculation due to missing predictions or ground truth data.")