In [23]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import r2_score, roc_auc_score, average_precision_score
from sklearn.model_selection import train_test_split
from scipy.stats import pearsonr, spearmanr

# Load dataset
df = pd.read_csv("final_activity_dataset.csv")
df = df.dropna()

print(df.columns)
# Should be: ['TargetGene', 'ID', 'GuideSeq', 'TargetSeqContext', 'log2FC']


Index(['GeneName', 'GuideID', 'GuideSeq', 'TargetSeqContext', 'log2FC'], dtype='object')


In [24]:
# Set random seed for reproducibility
np.random.seed(12)

# Get unique genes
genes = df['GeneName'].unique()

# Select holdout genes
n_holdout = 300 # choose how many genes to hold out
holdout_genes = np.random.choice(genes, n_holdout, replace=False)
print("Holdout Genes:", holdout_genes)

# Create splits
df_test = df[df['GeneName'].isin(holdout_genes)].reset_index(drop=True)
df_train = df[~df['GeneName'].isin(holdout_genes)].reset_index(drop=True)

print("Train size:", df_train.shape)
print("Test size:", df_test.shape)



Holdout Genes: ['PLP2' 'ZNF501' 'TIAL1' 'SRPR' 'RBM33' 'ACO2' 'SUPT4H1' 'MLX' 'ZNF699'
 'SEZ6L2' 'TDGF1' 'PC' 'TAF5L' 'ZFP30' 'PCID2' 'IKZF4' 'ZNF132' 'PHF21A'
 'SMU1' 'DIS3L' 'RPL10' 'CPSF7' 'ASNS' 'SP6' 'SCAF4' 'TDRD15' 'C4orf3'
 'AEN' 'SETX' 'NKX2-5' 'SDC1' 'ZIC4' 'ZNF530' 'ZNF391' 'YEATS4' 'PTMA'
 'RRN3' 'CTPS2' 'POLA1' 'KLF14' 'SETD2' 'PAN3' 'ZNF639' 'FOXA2' 'ZBTB45'
 'CLK3' 'DCLRE1B' 'TDRD9' 'OAS1' 'PPIL2' 'EXOSC2' 'ATL2' 'ING3' 'INSM1'
 'CMTR2' 'ZNF562' 'FAM98A' 'CNOT11' 'NGDN' 'TRIM37' 'BLOC1S6' 'RRP8'
 'NUDC' 'ZNF845' 'DARS' 'CPSF3' 'TNNT2' 'ZNF414' 'SSSCA1' 'SNRNP200'
 'ZCCHC3' 'PRDM15' 'TCF24' 'ID2' 'HOXC11' 'MRPL22' 'RBBP8' 'FAM210A'
 'MYF5' 'SRCAP' 'ZNF780B' 'ZNF696' 'PGM3' 'MPHOSPH8' 'TGFBRAP1' 'ZNF676'
 'GALP' 'ZNF442' 'PGS1' 'PSME2' 'SART3' 'EIF3A' 'RPP40' 'EPOR' 'SMYD2'
 'KLF13' 'GBX2' 'UBE2I' 'SCAF1' 'FAM171B' 'TRMT11' 'MXRA7' 'CREB3L4' 'CRX'
 'NKRF' 'LSM5' 'SLC51B' 'SNRPD1' 'USF2' 'BOLL' 'C9orf114' 'ETFA' 'C1D'
 'CDC37' 'PDCD11' 'NAA15' 'TEP1' 'PAK1IP1' 'POLR2I' 'ZBT

In [25]:
import torch
from torch.utils.data import Dataset
import numpy as np

import torch
from torch.utils.data import Dataset
import numpy as np

# 1) your one-hot encoder
def one_hot_encode(seq, maxlen=30):
    mapping = {'A':[1,0,0,0], 'C':[0,1,0,0], 'G':[0,0,1,0], 'T':[0,0,0,1]}
    seq = seq.upper()[:maxlen].ljust(maxlen, 'N')
    return np.array([mapping.get(b, [0,0,0,0]) for b in seq], dtype=np.float32)

# 2) reverse complement helper
_comp = {'A':'T','T':'A','C':'G','G':'C','N':'N'}
def rev_comp(s):
    return ''.join(_comp.get(b, 'N') for b in reversed(s.upper()))

# 3) faster align: first window with ≥75% matches to rc-guide
def align_center(s1, s2, match_len, maxlen):
    rc = rev_comp(s2)
    thr = int(np.ceil(match_len * 0.75))
    for i in range(len(s1) - match_len + 1):
        if sum(a==b for a,b in zip(s1[i:i+match_len], rc)) >= thr:
            center = i + match_len//2
            break
    else:
        center = len(s1)//2
    # crop around center
    start = max(0, center - maxlen//2)
    end = start + maxlen
    window = s1[start:end].ljust(maxlen, 'N')
    # guide window: place guide at same center within maxlen
    guide_win = ['N'] * maxlen
    offset = center - maxlen//2
    for j, ch in enumerate(s2):
        pos = j + (match_len//2) + offset - (match_len//2)
        if 0 <= pos < maxlen:
            guide_win[pos] = ch
    return window, ''.join(guide_win)

# 4) modified Dataset
class GuideRNADataset(Dataset):
    def __init__(self, df, match_len=5, maxlen=30):
        self.match_len = match_len
        self.maxlen    = maxlen
        self.contexts  = df['TargetSeqContext'].astype(str).tolist()
        self.guides    = df['GuideSeq'].astype(str).tolist()
        self.labels    = df['log2FC'].astype(np.float32).values
        # precompute aligned one-hots
        self.X = []
        for s1, s2 in zip(self.contexts, self.guides):
            a1, a2 = align_center(s1, s2, match_len, maxlen)
            x1 = one_hot_encode(a1, maxlen)
            x2 = one_hot_encode(a2, maxlen)
            self.X.append(np.concatenate([x1, x2], axis=1))  # (maxlen,8)
        self.X = np.stack(self.X)  # (N, maxlen,8)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return torch.from_numpy(self.X[idx]), torch.tensor(self.labels[idx])




In [26]:
# Flattened input for RF
X_train_rf = np.stack(df_train["TargetSeqContext"].apply(one_hot_encode)).reshape(len(df_train), -1)
X_test_rf = np.stack(df_test["TargetSeqContext"].apply(one_hot_encode)).reshape(len(df_test), -1)

y_train = df_train["log2FC"].values
y_test = df_test["log2FC"].values


In [27]:
#rf = RandomForestRegressor(n_estimators=100, random_state=42)
#rf.fit(X_train_rf, y_train)

#y_pred_rf = rf.predict(X_test_rf)

#print("Random Forest R2 Score:", r2_score(y_test, y_pred_rf))



In [28]:
class SmallCNN(nn.Module):
    def __init__(self):
        super(SmallCNN, self).__init__()
        self.conv1 = nn.Conv1d(8, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(128 * 30, 256)
        self.fc2 = nn.Linear(256, 1)

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x.squeeze(1)


class LargeCNN(nn.Module):
    def __init__(self):
        super(LargeCNN, self).__init__()
        self.conv1 = nn.Conv1d(8, 128, kernel_size=5, padding=2)
        self.conv2 = nn.Conv1d(128, 256, kernel_size=5, padding=2)
        self.conv3 = nn.Conv1d(256, 512, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(512 * 30, 512)
        self.fc2 = nn.Linear(512, 128)
        self.fc3 = nn.Linear(128, 1)

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x.squeeze(1)




In [29]:
train_dataset = GuideRNADataset(df_train)
test_dataset = GuideRNADataset(df_test)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)


In [30]:
def train_model(model, train_loader, test_loader, num_epochs=50, patience=5):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)
    loss_fn = nn.MSELoss()

    best_val_loss = float('inf')
    patience_counter = 0
    best_model_state = None

    for epoch in range(num_epochs):
        model.train()
        train_losses = []
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()
            train_losses.append(loss.item())

        # Validation
        model.eval()
        val_losses = []
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = loss_fn(outputs, labels)
                val_losses.append(loss.item())

        avg_val_loss = np.mean(val_losses)
        print(f"Epoch {epoch+1}: Train Loss={np.mean(train_losses):.4f}, Val Loss={avg_val_loss:.4f}")

        # Early stopping check
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            best_model_state = model.state_dict()
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping triggered at epoch {epoch+1}")
                break

    # Load best model state
    if best_model_state:
        model.load_state_dict(best_model_state)

    return model



In [31]:
small_cnn = SmallCNN()
trained_small_cnn = train_model(small_cnn, train_loader, test_loader, num_epochs=20)

large_cnn = LargeCNN()
trained_large_cnn = train_model(large_cnn, train_loader, test_loader, num_epochs=20)


Epoch 1: Train Loss=0.9020, Val Loss=0.7182
Epoch 2: Train Loss=0.7598, Val Loss=0.7199
Epoch 3: Train Loss=0.6358, Val Loss=0.7221
Epoch 4: Train Loss=0.5765, Val Loss=0.7306
Epoch 5: Train Loss=0.5391, Val Loss=0.7280
Epoch 6: Train Loss=0.5137, Val Loss=0.7523
Early stopping triggered at epoch 6
Epoch 1: Train Loss=0.9024, Val Loss=0.7081
Epoch 2: Train Loss=0.7583, Val Loss=0.7086
Epoch 3: Train Loss=0.6471, Val Loss=0.7149
Epoch 4: Train Loss=0.5864, Val Loss=0.7246
Epoch 5: Train Loss=0.5465, Val Loss=0.7231
Epoch 6: Train Loss=0.5186, Val Loss=0.7441
Early stopping triggered at epoch 6


In [33]:
def predict_model(model, data_loader):
    model.eval()
    preds = []
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    with torch.no_grad():
        for inputs, _ in data_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            preds.extend(outputs.cpu().numpy())
    return np.array(preds)

def evaluate_predictions(y_true, y_pred):
    pearson_corr, _ = pearsonr(y_true, y_pred)
    spearman_corr, _ = spearmanr(y_true, y_pred)

    threshold = np.median(y_true)
    y_true_binary = (y_true > threshold).astype(int)

    auroc = roc_auc_score(y_true_binary, y_pred)
    auprc = average_precision_score(y_true_binary, y_pred)

    print(f"Pearson: {pearson_corr:.4f}")
    print(f"Spearman: {spearman_corr:.4f}")
    print(f"AUROC: {auroc:.4f}")
    print(f"AUPRC: {auprc:.4f}")

# Random Forest
#evaluate_predictions(y_test, y_pred_rf)

# Small CNN
y_pred_small = predict_model(trained_small_cnn, test_loader)
evaluate_predictions(y_test, y_pred_small)

# Large CNN
y_pred_large = predict_model(trained_large_cnn, test_loader)
evaluate_predictions(y_test, y_pred_large)


Pearson: 0.3073
Spearman: 0.3346
AUROC: 0.6574
AUPRC: 0.6201
Pearson: 0.3244
Spearman: 0.3648
AUROC: 0.6740
AUPRC: 0.6371


In [57]:
import torch
import numpy as np
from captum.attr import IntegratedGradients
from sklearn.calibration import calibration_curve
from sklearn.metrics import roc_auc_score, average_precision_score
from scipy.stats import pearsonr, spearmanr
import matplotlib.pyplot as plt
import seaborn as sns

# Assuming X_test_tensor exists, create it from test_dataset if needed
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Create a batch of test data
X_test_tensor = torch.stack([x for x, y in test_dataset])
y_test_tensor = torch.tensor([y for x, y in test_dataset])
X_test_tensor = X_test_tensor.to(device)

def run_integrated_gradients(model, inputs, n_samples=100):
    model.eval()
    model.to(device)
    ig = IntegratedGradients(model)
    baseline = torch.zeros_like(inputs[0]).unsqueeze(0).to(device)
    attributions = []

    for i in range(min(n_samples, len(inputs))):
        attr, _ = ig.attribute(inputs[i].unsqueeze(0), baselines=baseline, return_convergence_delta=True)
        attributions.append(attr.squeeze(0).detach().cpu().numpy())
    return np.array(attributions)

def plot_calibration(y_true, y_pred):
    y_true_bin = (y_true > np.median(y_true)).astype(int)
    y_pred_norm = (y_pred - np.min(y_pred)) / (np.max(y_pred) - np.min(y_pred))
    prob_true, prob_pred = calibration_curve(y_true_bin, y_pred_norm, n_bins=10)

    plt.figure()
    plt.plot(prob_pred, prob_true, marker='o', label='Model')
    plt.plot([0, 1], [0, 1], linestyle='--', color='gray', label='Perfect Calibration')
    plt.xlabel("Mean Predicted Probability")
    plt.ylabel("Fraction of Positives")
    plt.title("Calibration Curve")
    plt.legend()
    plt.grid(True)
    plt.show()

def evaluate_predictions(y_true, y_pred):
    pearson, _ = pearsonr(y_true, y_pred)
    spearman, _ = spearmanr(y_true, y_pred)
    y_bin = (y_true > np.median(y_true)).astype(int)
    auroc = roc_auc_score(y_bin, y_pred)
    auprc = average_precision_score(y_bin, y_pred)
    print(f"Pearson: {pearson:.4f}, Spearman: {spearman:.4f}, AUROC: {auroc:.4f}, AUPRC: {auprc:.4f}")
    return pearson, spearman, auroc, auprc

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

def plot_positive_contributions(attributions):
    """
    attributions: np.ndarray of shape (n_samples, seq_len, 8)
    Produces a barplot of positive contributions by (position, base, region).
    """
    ch_labels = ["T:A","T:C","T:G","T:T","G:A","G:C","G:G","G:T"]
    mean_attr = attributions.mean(axis=0)   # (seq_len, 8)
    seq_len, n_ch = mean_attr.shape

    records = []
    for pos in range(seq_len):
        for ch in range(n_ch):
            val = mean_attr[pos, ch]
            if val > 0:
                region, base = ch_labels[ch].split(":")
                records.append({
                    "Position": pos + 1,
                    "Region": "Target" if region == "T" else "gRNA",
                    "Base": base,
                    "Mean Attribution": float(val)
                })

    df_plot = pd.DataFrame.from_records(records)

    plt.figure(figsize=(14, 5))
    sns.heatmap(
        data=df_plot,
        x="Position", y="Mean Attribution",
        hue="Base", palette="deep",
        dodge=True
    )
    plt.axvline(x=28.5, color="black", linestyle="--", label="Target/gRNA boundary")
    plt.title("Positive Integrated Gradients Contributions by Position and Base")
    plt.xlabel("Position in Input Window")
    plt.ylabel("Mean Attribution (Positive Only)")
    plt.legend(title="Base")
    plt.tight_layout()
    plt.show()

    return df_plot.sort_values("Mean Attribution", ascending=False)


In [58]:
attributions = run_integrated_gradients(small_cnn, X_test_tensor)
plot_positive_contributions(attributions)
plot_calibration(y_test, y_pred_small)
evaluate_predictions(y_test, y_pred_small)

attributions = run_integrated_gradients(large_cnn, X_test_tensor)
plot_positive_contributions(attributions)
plot_calibration(y_test, y_pred_large)
evaluate_predictions(y_test, y_pred_large)

ValueError: could not convert string to float: 'Target'

<Figure size 1400x500 with 0 Axes>