In [None]:
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader

import torch.optim as optim
import pandas as pd
import numpy as np
import matplotlib
import torch
from torch import nn
from torch.nn import functional as F
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, r2_score
from sklearn.utils import shuffle
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score, roc_curve
import glob

import os
from sklearn.metrics import confusion_matrix
from tqdm import tqdm

In [None]:
# Directory holding the train/valid/test CSV splits that the next cell reads.
# NOTE(review): absolute, machine-specific path — consider making it configurable.
DATAPATH = '/home/jupyter/ADAPT_PCR_share/safe/dataset'
# IPython shell escape: list the directory as a quick sanity check.
!ls $DATAPATH

In [None]:
# Read the three pre-split CSVs; the first two columns form the row MultiIndex.
train_df, val_df, test_df = (
    pd.read_csv(f'{DATAPATH}/0716_dataset_{split}.csv', index_col=[0, 1])
    for split in ('train', 'valid', 'test')
)
for _split_df in (train_df, val_df, test_df):
    print(_split_df.shape)

# Lift every pandas display limit (rows, columns, line width, cell width)
# so wide frames and long sequence strings are shown in full.
for _option in ('display.max_rows', 'display.max_columns',
                'display.width', 'display.max_colwidth'):
    pd.set_option(_option, None)

train_df.head(1)

In [None]:
# Keep a handle on the column index and print it to see which fields the CSV provides.
column_names = train_df.columns
print(column_names)

In [None]:
# Number of distinct prod_Tm values in the training split.
print(len(train_df['prod_Tm'].unique()))

# Handcrafted Features

In [None]:
# Hand-crafted per-pair descriptors used as tabular inputs for the classical models.
feature_cols = ["f_length", "f_indel", "f_mm", "r_length", "r_indel", "r_mm",
                "f_Tm", "r_Tm", "prod_length", "prod_Tm"]

inputs = train_df[feature_cols]
# Fit the scaler on the training split only and reuse its statistics for the
# validation and test splits, so held-out data never influences the scaling.
scaler = StandardScaler().fit(inputs)
X_train_scaled, X_val_scaled, X_test_scaled = (
    scaler.transform(frame)
    for frame in (inputs, val_df[feature_cols], test_df[feature_cols])
)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.svm import LinearSVC
from sklearn.neighbors import KNeighborsClassifier
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import roc_auc_score, make_scorer
from sklearn.pipeline import Pipeline

# results_df = pd.DataFrame()

cutoff = 0 # score threshold used to split pairs into active (1) and inactive (0)
results_df = pd.DataFrame()
# Label is 1 when score is strictly greater than the cutoff, otherwise 0.
y_train = np.where(train_df['score'] > cutoff, 1, 0)
y_test = np.where(test_df['score'] > cutoff, 1, 0)
y_val = np.where(val_df['score'] > cutoff, 1, 0)


def get_performance(results_df, pipeline, grid, name):
    """Tune `pipeline` by 5-fold grid search on the training set and score it on the test set.

    Reads the notebook-level arrays `X_train_scaled`, `y_train`, `X_test_scaled`,
    `y_test` and the scalar `cutoff`.

    Parameters
    ----------
    results_df : unused; kept so existing calls keep working.
    pipeline : sklearn Pipeline (or estimator) to tune.
    grid : dict mapping parameter names to candidate values (the search space).
    name : label stored in the 'Model' column and used as the ROC-curve label.

    Returns
    -------
    pandas.DataFrame with a single row of test-set metrics for the best
    configuration. Also draws the test ROC curve on the current matplotlib axes.
    """
    # The 'roc_auc' scorer ranks by predict_proba / decision_function output.
    # Wrapping roc_auc_score in make_scorer would score hard 0/1 predictions
    # instead, which collapses the ROC curve to a single operating point.
    search = GridSearchCV(pipeline, grid, cv=5, scoring='roc_auc')
    search.fit(X_train_scaled, y_train)

    print(f"Best AUROC: {search.best_score_:.4f}")
    # refit=True (the default) means best_estimator_ is already trained on the
    # full training set, so no second fit is needed.
    clf = search.best_estimator_

    y_pred = clf.predict(X_test_scaled)
    # Use class-1 probabilities when the model provides them; margin-only
    # models (e.g. LinearSVC) fall back to the signed decision value.
    if hasattr(clf, "predict_proba"):
        y_score = clf.predict_proba(X_test_scaled)[:, 1]
    else:
        y_score = clf.decision_function(X_test_scaled)

    roc = roc_auc_score(y_test, y_score)
    fpr, tpr, _ = roc_curve(y_test, y_score)
    plt.plot(fpr, tpr, label=name)

    return pd.DataFrame([{
        'Cutoff': cutoff,
        'Model': name,
        'Regularization': search.best_params_,
        'Accuracy': accuracy_score(y_test, y_pred),
        'Precision': precision_score(y_test, y_pred, average='weighted'),
        'Recall': recall_score(y_test, y_pred, average='weighted'),
        'F1 Score': f1_score(y_test, y_pred, average='weighted'),
        'ROC AUC': roc
    }])

# Candidate models with their hyper-parameter grids, evaluated in listed order.
model_specs = [
    # l2
    ("L2",
     Pipeline([('logreg', LogisticRegression(max_iter=1000, penalty='l2', solver='liblinear'))]),
     {'logreg__C': np.logspace(-5, 5, num=10)}),
    # l1
    ("L1",
     Pipeline([('logreg', LogisticRegression(max_iter=1000, penalty='l1', solver='liblinear'))]),
     {'logreg__C': np.logspace(-5, 5, num=10)}),
    # l1 + l2
    ("Elastic Net",
     Pipeline([('logreg', LogisticRegression(max_iter=1000, penalty='elasticnet', solver='saga'))]),
     {'logreg__C': np.logspace(-5, 5, num=10),
      'logreg__l1_ratio': np.linspace(0, 1, num=5)}),
    # random forest
    ("RF",
     Pipeline([('RF', RandomForestClassifier())]),
     {'RF__n_estimators': np.logspace(1, 3, num=5, dtype=int)}),
    # GB
    ("GB",
     Pipeline([('GB', GradientBoostingClassifier())]),
     {'GB__n_estimators': np.logspace(1, 3, num=5, dtype=int),
      'GB__learning_rate': np.logspace(-4, 1, num=5)}),
    # mlp -- (50,) and (100,) are one-hidden-layer tuples; a bare (50) is just the int 50
    ("MLP",
     Pipeline([('MLP', MLPClassifier(max_iter=1000))]),
     {'MLP__hidden_layer_sizes': [(50,), (100,), (50, 100)],
      'MLP__alpha': np.logspace(-5, 0, num=10)}),
    # svm
    ("SVM",
     Pipeline([('SVM', LinearSVC())]),
     {'SVM__C': np.logspace(-3, 3, num=10)}),
    # knn -- n_neighbors must be a positive integer; np.linspace(0, 20, num=5)
    # produced floats starting at 0, which KNeighborsClassifier rejects.
    ("KNN",
     Pipeline([('KNN', KNeighborsClassifier())]),
     {'KNN__n_neighbors': [1, 5, 10, 15, 20],
      'KNN__weights': ['uniform', 'distance']}),
]

# `pipeline` and `param_grid` remain module-level names, as they were when each
# model had its own block, so any code reading them as globals keeps working.
for name, pipeline, param_grid in model_specs:
    # Concatenate after every model so finished results survive a later failure.
    results_df = pd.concat([results_df, get_performance(results_df, pipeline, param_grid, name)])
display(results_df)


In [None]:
# Re-render the classical-model results table without re-running the sweep.
display(results_df)

# Sequence-based classification

In [None]:
class PCRDataset(Dataset):
    """Tensor-backed dataset that pairs each row of ``X`` with the matching entry of ``y``.

    Both inputs are converted through ``np.array`` into ``torch.Tensor`` objects
    at construction time.

    NOTE(review): a later cell in this notebook defines another class with the
    same name; once that cell runs, it replaces this definition.
    """

    def __init__(self, X, y):
        features = np.array(X)
        targets = np.array(y)
        self.X = torch.Tensor(features)
        self.y = torch.Tensor(targets)
        # Cache the sample count so __len__ is a plain attribute read.
        self.len = len(self.X)

    def __len__(self):
        """Number of samples."""
        return self.len

    def __getitem__(self, index):
        """Return the ``(features, target)`` pair at ``index``."""
        sample = self.X[index]
        target = self.y[index]
        return sample, target

In [None]:
# Longest encoded primer string (forward or reverse) in the training split.
# NOTE(review): one_hot_encode below defaults to length=28 and does not read
# MAXPLEN — confirm the printed value does not exceed 28.
MAXPLEN = max(train_df['f_penc'].apply(len).max(), train_df['r_penc'].apply(len).max())
print(MAXPLEN)

def one_hot_encode(seq, length=28):
    """One-hot encode a nucleotide string into a ``(len, 5)`` integer array.

    Channels are ordered A, T, C, G, gap ('-'); 'N' maps to the all-zero row.
    Characters are matched case-insensitively. The string is padded on the
    RIGHT with 'N' up to ``length`` (e.g. ``('ATCG', 6)`` -> ``'ATCGNN'``);
    strings longer than ``length`` are left untruncated. Any other character
    raises ``KeyError``.
    """
    channel_order = 'ATCG-'
    n_channels = len(channel_order)
    lookup = {
        symbol: [int(col == pos) for col in range(n_channels)]
        for pos, symbol in enumerate(channel_order)
    }
    lookup['N'] = [0] * n_channels

    padded = seq.ljust(length, 'N')
    rows = []
    for symbol in padded:
        rows.append(lookup[symbol.upper()])
    return np.array(rows)

def one_hot_encode_full_gap(df_seqs, maxl=1421):
    """Encode every row as aligned full-length target and primer tracks.

    For each row (indexed by a 2-level key) two strings are built:

    * a primer track: 'N' everywhere except the two binding sites, which carry
      the row's ``f_penc`` / ``r_penc`` strings;
    * a target track: ``target_seq`` with the two binding sites replaced by
      ``f_tenc`` / ``r_tenc``.

    Both are passed to ``one_hot_encode`` with length ``maxl`` and then joined
    along the channel axis with the target channels first. The resulting shape
    is printed and the array is returned as a float32 tensor.
    """
    target_rows, primer_rows = [], []
    for (_target_name, _primer_name), row in df_seqs.iterrows():
        fseq, rseq, tseq = row['f_seq'], row['r_seq'], row['target_seq']
        fst, rst = row['f_start'], row['r_start']
        f_end = fst + len(fseq)
        r_end = rst + len(rseq)

        primer_track = ''.join([
            'N' * fst,
            row['f_penc'],
            'N' * (rst - f_end),
            row['r_penc'],
            'N' * (len(tseq) - r_end),
        ])
        target_track = ''.join([
            tseq[:fst],
            row['f_tenc'],
            tseq[f_end:rst],
            row['r_tenc'],
            tseq[r_end:],
        ])

        primer_rows.append(one_hot_encode(primer_track, maxl))
        target_rows.append(one_hot_encode(target_track, maxl))

    final_encoded = np.concatenate((np.array(target_rows), np.array(primer_rows)), axis=2)
    print(final_encoded.shape)
    return torch.tensor(final_encoded, dtype=torch.float32)

def one_hot_encode_pbs_gap(df_seqs):
    """Encode only the primer-binding-site strings of every row.

    Each of ``f_penc``, ``f_tenc``, ``r_penc`` and ``r_tenc`` is one-hot encoded
    with ``one_hot_encode``'s default length. Forward and reverse encodings are
    stacked along the position axis, then target and primer stacks are joined
    along the channel axis with the target channels first. The resulting shape
    is printed and the array is returned as a float32 tensor.
    """
    site_columns = ('f_penc', 'f_tenc', 'r_penc', 'r_tenc')
    primer_rows, target_rows = [], []
    for (_target_name, _primer_name), row in df_seqs.iterrows():
        fenc, ftenc, renc, rtenc = (one_hot_encode(row[col]) for col in site_columns)
        primer_rows.append(np.concatenate((fenc, renc), axis=0))
        target_rows.append(np.concatenate((ftenc, rtenc), axis=0))

    final_encoded = np.concatenate((np.array(target_rows), np.array(primer_rows)), axis=2)
    print(final_encoded.shape)
    return torch.tensor(final_encoded, dtype=torch.float32)

# Thin Dataset wrapper so encoded inputs and targets can be fed to DataLoader / Subset.
class PCRDataset(Dataset):
    """Index-aligned pairing of encoded inputs with their targets.

    The two collections are stored as given (no copy or conversion); item ``i``
    is ``(encoded_input[i], ct_values[i])``.
    """

    def __init__(self, encoded_input, ct_values):
        """
        encoded_input: indexable collection of encoded samples; elsewhere in this
            notebook it is the float tensor returned by the one-hot encoders.
        ct_values: indexable collection of per-sample targets; elsewhere in this
            notebook it is a tensor of 0/1 labels derived from the score column.
        """
        self.encoded_input = encoded_input
        self.ct_values = ct_values

    def __getitem__(self, idx):
        sample = self.encoded_input[idx]
        target = self.ct_values[idx]
        return sample, target

    def __len__(self):
        return len(self.encoded_input)

In [None]:
### Change between these for either full-sequence or core sequence testing
# Exactly one encoding option must be active. With every option commented out,
# `train_inps` / `train_and_val_inps` are undefined and this cell (and every
# later cell) fails on a fresh kernel.

# Option A -- NOTE(review): one_hot_encode_sequences is not defined in the
# visible part of this notebook; define it before enabling these lines.
# train_inps = one_hot_encode_sequences(train_df)
# val_inps = one_hot_encode_sequences(val_df)
# test_inps = one_hot_encode_sequences(test_df)

# Option B -- full-length aligned target/primer tracks.
train_inps = one_hot_encode_full_gap(train_df)
val_inps = one_hot_encode_full_gap(val_df)
test_inps = one_hot_encode_full_gap(test_df)
# useful for cross-validation
train_and_val_inps = torch.cat((train_inps, val_inps))

cutoff=0

# Binary labels: 1 when score is strictly greater than the cutoff, otherwise 0.
train_labels = torch.tensor(np.where(train_df['score'] > cutoff, 1, 0))
val_labels = torch.tensor(np.where(val_df['score'] > cutoff, 1, 0))
test_labels = torch.tensor(np.where(test_df['score'] > cutoff, 1, 0))
train_val_labels = torch.cat((train_labels, val_labels))


# Create dataset objects
train_dataset = PCRDataset(train_inps, train_labels)
val_dataset = PCRDataset(val_inps, val_labels)
test_dataset = PCRDataset(test_inps, test_labels)
train_val_dataset = PCRDataset(train_and_val_inps, train_val_labels)


# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True) # can play around
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
# NOTE: this rebinds `train_val_dataset` from the Dataset to a DataLoader; the
# sweep cells rely on that and reach the underlying Dataset via `.dataset`.
train_val_dataset = DataLoader(train_val_dataset, batch_size=64, shuffle=False) # dont want it interfering with cross-val

# Model Definitions

In [None]:
import numpy as np
import pandas as pd
import os
import math
import matplotlib.pyplot as plt
import seaborn as sns

from einops import rearrange, repeat
from scipy.stats import spearmanr
from sklearn.metrics import r2_score
from sklearn.preprocessing import StandardScaler
from joblib import dump, load
from tqdm.auto import tqdm
from Bio import Seq

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import TensorDataset, Subset
from sklearn.model_selection import KFold
# Seed torch's global RNG so weight initialisation and shuffling are repeatable.
torch.manual_seed(42)
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else "cpu")
# Bare expression: echoes the selected device as the cell output.
device

In [None]:
# modified output layer, loss to be used for classification

class PGC(nn.Module):
    def __init__(self,d_model,expansion_factor = 1.0,dropout = 0.0):
        super().__init__()
        self.d_model = d_model
        self.expansion_factor = expansion_factor
        self.dropout = dropout
        self.conv = nn.Conv1d(int(d_model * expansion_factor), int(d_model * expansion_factor),
                              kernel_size=3, padding=1, groups=int(d_model * expansion_factor))
        self.in_proj = nn.Linear(d_model, int(d_model * expansion_factor * 2))
        self.out_norm = nn.RMSNorm(int(d_model), eps=1e-8)
        self.in_norm = nn.RMSNorm(int(d_model * expansion_factor * 2), eps=1e-8)
        self.out_proj = nn.Linear(int(d_model * expansion_factor), d_model)
        self.dropout = nn.Dropout(dropout)
    def forward(self, u):
        xv = self.in_norm(self.in_proj(u))
        x,v = xv.chunk(2,dim=-1)
        x_conv = self.conv(x.transpose(-1,-2)).transpose(-1,-2)
        gate =  v * x_conv
        x = self.out_norm(self.out_proj(gate))
        return x
    
class DropoutNd(nn.Module):
    """Dropout whose mask can be shared ("tied") across all trailing axes."""

    def __init__(self, p: float = 0.5, tie=True, transposed=True):
        """
        p: drop probability, must lie in [0, 1)
        tie: tie dropout mask across sequence lengths (Dropout1d/2d/3d)
        transposed: True when the input is (batch, dim, lengths...); False when
            the feature axis is last
        """
        super().__init__()
        if p < 0 or p >= 1:
            raise ValueError("dropout probability has to be in [0, 1), " "but got {}".format(p))
        self.p = p
        self.tie = tie
        self.transposed = transposed
        # Kept as an attribute; forward() samples with torch.rand instead.
        self.binomial = torch.distributions.binomial.Binomial(probs=1-self.p)

    def forward(self, X):
        """X: (batch, dim, lengths...)."""
        # Identity outside training.
        if not self.training:
            return X

        if not self.transposed:
            X = rearrange(X, 'b ... d -> b d ...')

        if self.tie:
            # One keep/drop decision per (batch, dim), broadcast over the rest.
            mask_shape = X.shape[:2] + (1,)*(X.ndim-2)
        else:
            mask_shape = X.shape

        # Sampled directly on X's device.
        keep = torch.rand(*mask_shape, device=X.device) < 1.-self.p
        # Rescale survivors so the expected activation is unchanged.
        X = X * keep * (1.0/(1-self.p))

        if not self.transposed:
            X = rearrange(X, 'b d ... -> b ... d')
        return X

class S4DKernel(nn.Module):
    """Generate convolution kernel from diagonal SSM parameters.

    Holds, per feature channel (H = d_model), N//2 complex state coefficients:
    a log step size ``log_dt``, a complex ``C`` stored as a real view, and the
    real (in log form) and imaginary parts of the diagonal ``A``.
    """

    def __init__(self, d_model, N=64, dt_min=0.001, dt_max=0.1, lr=None):
        """
        d_model: number of independent channels H
        N: state size; N // 2 complex coefficients are kept per channel
        dt_min, dt_max: range from which the step size is drawn log-uniformly
        lr: forwarded to register() for log_dt, log_A_real and A_imag
        """
        super().__init__()
        # Generate dt
        H = d_model
        # Uniform in log-space between log(dt_min) and log(dt_max).
        log_dt = torch.rand(H) * (
            math.log(dt_max) - math.log(dt_min)
        ) + math.log(dt_min)

        # Complex C is stored through view_as_real so it is a real-valued Parameter.
        C = torch.randn(H, N // 2, dtype=torch.cfloat)
        self.C = nn.Parameter(torch.view_as_real(C))
        self.register("log_dt", log_dt, lr)

        # Real part initialised so that exp(log_A_real) == 0.5; imaginary part is pi * n.
        log_A_real = torch.log(0.5 * torch.ones(H, N//2))
        A_imag = math.pi * repeat(torch.arange(N//2), 'n -> h n', h=H)
        self.register("log_A_real", log_A_real, lr)
        self.register("A_imag", A_imag, lr)

    def forward(self, L):
        """
        L: kernel length to materialise
        returns: real tensor of shape (H, L)
        """

        # Materialize parameters
        dt = torch.exp(self.log_dt) # (H)
        C = torch.view_as_complex(self.C) # (H N)
        # Negated exp keeps the real part of A strictly negative.
        A = -torch.exp(self.log_A_real) + 1j * self.A_imag # (H N)

        # Vandermonde multiplication
        dtA = A * dt.unsqueeze(-1)  # (H N)
        K = dtA.unsqueeze(-1) * torch.arange(L, device=A.device) # (H N L)
        C = C * (torch.exp(dtA)-1.) / A
        # Sum over the state axis; the factor 2 and .real yield a real kernel
        # from the N//2 stored complex coefficients.
        K = 2 * torch.einsum('hn, hnl -> hl', C, torch.exp(K)).real

        return K

    def register(self, name, tensor, lr=None):
        """Register a tensor with a configurable learning rate and 0 weight decay

        lr == 0.0 registers a (non-trainable) buffer. Otherwise the tensor becomes
        a Parameter tagged with an ``_optim`` dict holding weight_decay=0.0 and,
        when lr is given, that lr. The tag is only metadata on the Parameter.
        NOTE(review): the training loops in this notebook build AdamW from
        model.parameters() and do not appear to read ``_optim``.
        """

        if lr == 0.0:
            self.register_buffer(name, tensor)
        else:
            self.register_parameter(name, nn.Parameter(tensor))

            optim = {"weight_decay": 0.0}
            if lr is not None: optim["lr"] = lr
            setattr(getattr(self, name), "_optim", optim)


class S4D(nn.Module):
    """Diagonal state-space layer: long FFT convolution, skip term, GELU, gated 1x1 mix.

    Parameters
    ----------
    d_model : number of feature channels H.
    d_state : state size handed to ``S4DKernel`` as ``N``.
    dropout : drop probability; 0 disables dropout entirely.
    transposed : True when inputs are (B, H, L); False when they are (B, L, H).
    **kernel_args : forwarded to ``S4DKernel``.
    """

    def __init__(self, d_model, d_state=64, dropout=0.0, transposed=True, **kernel_args):
        super().__init__()

        self.h = d_model
        self.n = d_state
        self.d_output = self.h
        self.transposed = transposed

        # Per-channel weight of the direct input-to-output (skip) path.
        self.D = nn.Parameter(torch.randn(self.h))
        # Produces the (H, L) convolution kernel on demand.
        self.kernel = S4DKernel(self.h, N=self.n, **kernel_args)
        self.activation = nn.GELU()

        if dropout > 0.0:
            self.dropout = DropoutNd(dropout)
        else:
            self.dropout = nn.Identity()

        # Position-wise channel mixing: a 1x1 conv doubles the channels and the
        # GLU halves them again by gating.
        self.output_linear = nn.Sequential(
            nn.Conv1d(self.h, 2*self.h, kernel_size=1),
            nn.GLU(dim=-2),
        )

    def forward(self, u, **kwargs): # extra keyword arguments are accepted and ignored
        """ Input and output shape (B, H, L) """
        if not self.transposed:
            u = u.transpose(-1, -2)
        seq_len = u.size(-1)
        fft_len = 2 * seq_len  # zero-pad to 2L so the FFT product is a linear convolution

        ssm_kernel = self.kernel(L=seq_len)                 # (H L)
        kernel_f = torch.fft.rfft(ssm_kernel, n=fft_len)
        input_f = torch.fft.rfft(u, n=fft_len)
        y = torch.fft.irfft(input_f*kernel_f, n=fft_len)[..., :seq_len]  # (B H L)

        # D term of the state-space equation: a learned skip connection.
        y = y + u * self.D.unsqueeze(-1)

        y = self.output_linear(self.dropout(self.activation(y)))
        if not self.transposed:
            y = y.transpose(-1, -2)
        return y
    
class Janus(nn.Module):
    """Sequence classifier: linear encoder, two PGC blocks, residual S4D, mean-pool, sigmoid head.

    Parameters
    ----------
    d_input : channels per sequence position in the input.
    d_output : number of sigmoid outputs.
    d_model : internal feature width.
    d_state : state size of the S4D layer.
    dropout : probability used by the dropout layers.
    transposed : layout flag forwarded to ``S4D``; the default False means
        inputs are (batch, length, channels).
    **kernel_args : forwarded to ``S4D``.
    """

    def __init__(self, d_input, d_output, d_model, d_state=64, dropout=0.2, transposed=False, **kernel_args):
        super().__init__()
        self.encoder = nn.Linear(d_input, d_model)
        self.pgc1 = PGC(d_model, expansion_factor=0.25, dropout=dropout)
        self.pgc2 = PGC(d_model, expansion_factor=2, dropout=dropout)
        self.s4d = S4D(d_model, d_state=d_state, dropout=dropout,
                       transposed=transposed, **kernel_args)
        self.norm = nn.RMSNorm(d_model)
        self.decoder = nn.Linear(d_model, d_output)
        self.output = nn.Sigmoid()
        self.dropout = nn.Dropout(dropout)

    def forward(self, u):
        """Map (batch, length, d_input) to sigmoid outputs of shape (batch, d_output)."""
        hidden = self.pgc2(self.pgc1(self.encoder(u)))
        # Pre-norm residual: S4D sees the normalised features, the skip path does not.
        ssm_out = self.s4d(self.norm(hidden))
        hidden = self.dropout(ssm_out) + hidden
        # Average over the length axis to get one vector per sequence.
        pooled = hidden.mean(dim=1)
        return self.output(self.decoder(pooled))

In [None]:
from sklearn.metrics import confusion_matrix


def train_loop(model, lr):
    """Train `model` for 50 epochs and plot validation TPR/FPR per epoch.

    Uses the notebook-level `train_loader`, `val_loader` and `device`. The
    model is expected to emit one probability per sample (shape (batch, 1) or
    (batch,)); predictions are thresholded at 0.5.

    Parameters
    ----------
    model : torch.nn.Module, already on `device`; trained in place.
    lr : learning rate for AdamW (weight decay fixed at 0.01).

    Returns
    -------
    None. Prints per-epoch validation statistics and shows a TPR/FPR plot.
    """
    num_epochs = 50  # Adjust as needed
    criterion = nn.BCELoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01) #lr = 0.001

    tprs, fprs = [], []

    for epoch in range(num_epochs):
        model.train()
        for inputs, labels in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs} - Training'):
            inputs, labels = inputs.to(device).float(), labels.to(device).float()
            # reshape(-1) instead of squeeze(): a batch holding a single sample
            # stays 1-D, so its shape still matches `labels`.
            probs = model(inputs).reshape(-1)
            loss = criterion(probs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Evaluate on the validation set
        model.eval()
        cross_loss = 0.0
        cross_true, cross_pred = [], []
        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc=f'Epoch {epoch+1}/{num_epochs} - Cross-validation'):
                inputs, labels = inputs.to(device).float(), labels.to(device).float()
                probs = model(inputs).reshape(-1)
                cross_loss += criterion(probs, labels).item()
                cross_true.append(labels.cpu().numpy())
                cross_pred.append((probs >= 0.5).int().cpu().numpy())

        all_true = np.concatenate(cross_true).astype(int)
        all_pred = np.concatenate(cross_pred)

        # labels=[0, 1] forces a 2x2 matrix [[TN, FP], [FN, TP]] even when only
        # one class occurs, so the four-way unpacking cannot fail.
        tn, fp, fn, tp = confusion_matrix(all_true, all_pred, labels=[0, 1]).ravel()

        tpr = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        fpr = fp / (fp + tn) if (fp + tn) > 0 else 0.0

        print(f"\nEpoch {epoch+1} Cross-validation Statistics:")
        print(f"Cross-validation Loss: {cross_loss/len(val_loader):.4f}")
        print(f"TPR (Recall): {tpr:.4f}")
        print(f"FPR: {fpr:.4f}")

        tprs.append(tpr)
        fprs.append(fpr)

    plt.title(f"lr = {lr}")
    plt.plot(list(range(num_epochs)), tprs)
    plt.plot(list(range(num_epochs)), fprs)
    plt.legend(["TPR", "FPR"])
    plt.show()
    
    
    

In [None]:

def train_kfold(model_class,
                dataset,
                inps, labels,
                device,
                lr       = 1e-3,
                model_args=None,
                k_folds  = 5,
                num_epochs = 50,
                batch_size = 64):
    """Train a fresh model per fold and return the weights with the best validation TPR.

    Parameters
    ----------
    model_class : callable building the model; invoked as ``model_class(*model_args)``.
    dataset : indexable Dataset yielding ``(inputs, label)`` pairs.
    inps, labels : passed to ``KFold.split`` to generate the fold indices.
    device : torch device used for training and evaluation.
    lr : AdamW learning rate (weight decay fixed at 0.01).
    model_args : positional arguments for ``model_class``; None means no arguments.
    k_folds : number of folds (shuffled, random_state=42).
    num_epochs : epochs per fold.
    batch_size : DataLoader batch size.

    Returns
    -------
    dict : a detached CPU copy of the ``state_dict`` taken at the (fold, epoch)
    with the highest validation TPR seen across all folds.

    Notes
    -----
    The model must emit one probability per sample; predictions are
    thresholded at 0.5. Selection looks at TPR only, which a model predicting
    "positive" for everything maximises — NOTE(review): consider a balanced
    criterion.
    """
    model_args = () if model_args is None else model_args
    kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)

    fold_tpr, fold_fpr = [], []
    best_model_state, best_tpr = None, -np.inf

    for fold, (train_idx, val_idx) in enumerate(kfold.split(X=inps, y=labels)):
        print(f"\n=== Fold {fold+1}/{k_folds} ===")

        train_loader = DataLoader(Subset(dataset, train_idx),
                                  batch_size=batch_size, shuffle=True)
        val_loader   = DataLoader(Subset(dataset, val_idx),
                                  batch_size=batch_size, shuffle=False)

        # Fresh model & optimiser
        model = model_class(*model_args).to(device)
        optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)
        criterion = nn.BCELoss()

        # Per-epoch metric history for this fold
        tpr_hist, fpr_hist = [], []

        for epoch in range(num_epochs):
            model.train()
            for inputs, batch_labels in tqdm(train_loader,
                                             desc=f"Fold {fold+1} | Epoch {epoch+1}/{num_epochs} - train",
                                             leave=False):
                inputs       = inputs.to(device).float()
                batch_labels = batch_labels.to(device).float()
                # reshape(-1) instead of squeeze(): a batch with one sample
                # stays 1-D and still matches the label shape.
                probs = model(inputs).reshape(-1)
                loss  = criterion(probs, batch_labels)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            model.eval()
            cross_true, cross_pred = [], []
            with torch.no_grad():
                for inputs, batch_labels in tqdm(val_loader,
                                                 desc=f"Fold {fold+1} | Epoch {epoch+1}/{num_epochs} - val",
                                                 leave=False):
                    inputs = inputs.to(device).float()
                    probs  = model(inputs).reshape(-1)

                    cross_true.append(batch_labels.numpy().ravel())
                    cross_pred.append((probs >= 0.5).int().cpu().numpy())

            y_true = np.concatenate(cross_true).astype(int)
            y_pred = np.concatenate(cross_pred)
            # labels=[0, 1] guarantees a 2x2 matrix even for a single-class fold.
            tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
            tpr = tp / (tp + fn) if (tp + fn) else 0.0
            fpr = fp / (fp + tn) if (fp + tn) else 0.0
            tpr_hist.append(tpr)
            fpr_hist.append(fpr)

            print(f"  Epoch {epoch+1}: TPR={tpr:.4f}, FPR={fpr:.4f}")

            # Keep best model across ALL folds. state_dict() returns references
            # to the live tensors, so they must be copied; otherwise later
            # optimiser steps silently overwrite the "best" snapshot.
            if tpr > best_tpr:
                best_tpr = tpr
                best_model_state = {key: value.detach().cpu().clone()
                                    for key, value in model.state_dict().items()}

        fold_tpr.append(tpr_hist[-1])
        fold_fpr.append(fpr_hist[-1])

        plt.figure(figsize=(5,3))
        plt.plot(range(1, num_epochs+1), tpr_hist, label="TPR")
        plt.plot(range(1, num_epochs+1), fpr_hist, label="FPR")
        plt.title(f"Fold {fold+1} | lr={lr}")
        plt.xlabel("Epoch")
        plt.legend(); plt.tight_layout(); plt.show()

    print("Cross-validation summary")
    print(f"Mean  TPR: {np.mean(fold_tpr):.4f}  ± {np.std(fold_tpr):.4f}")
    print(f"Mean  FPR: {np.mean(fold_fpr):.4f}  ± {np.std(fold_fpr):.4f}")

    return best_model_state

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score


def evaluate_auroc(model, data_loader, device, results_df=None, model_name="Model"):
    """Score `model` on `data_loader` and append one metrics row to `results_df`.

    The model must emit one probability per sample. Accuracy, precision,
    recall and F1 use predictions thresholded at 0.5; AUROC uses the raw
    probabilities and is NaN when `roc_auc_score` raises ValueError (for
    example when only one class is present).

    Parameters
    ----------
    model : torch.nn.Module, already on `device`.
    data_loader : iterable of ``(inputs, labels)`` batches.
    device : torch device the batches are moved to.
    results_df : DataFrame to extend, or None to start a new one.
    model_name : value stored in the 'Model' column.

    Returns
    -------
    (all_labels, all_preds, results_df) : list of true labels, list of
    predicted probabilities, and the extended DataFrame.
    """
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in data_loader:
            # Cast to float like the training loops do.
            inputs = inputs.to(device).float()

            outputs = model(inputs)

            # reshape(-1) instead of squeeze(): a batch with a single sample
            # stays a 1-D array, so extend() can always iterate over it.
            preds = outputs.reshape(-1).cpu().numpy() # probability values for each in [0,1]

            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy().reshape(-1))

    output = (np.array(all_preds) >= 0.5).astype(int) # thresholding to convert to 0s and 1s
    # Compute metrics
    accuracy = accuracy_score(all_labels, output)
    precision = precision_score(all_labels, output, zero_division=0)
    recall = recall_score(all_labels, output, zero_division=0)
    f1 = f1_score(all_labels, output, zero_division=0)
    try:
        auroc = roc_auc_score(all_labels, all_preds) # the only metric computed from raw probabilities
    except ValueError:
        auroc = float('nan')  # in case only one class in y_true

    # Prepare results
    result_row = pd.DataFrame([{
        'Model': model_name,
        'Accuracy': accuracy,
        'Precision': precision,
        'Recall': recall,
        'F1': f1,
        'AUROC': auroc
    }])

    # Append to existing DataFrame
    if results_df is None:
        results_df = result_row
    else:
        results_df = pd.concat([results_df, result_row], ignore_index=True)

    return all_labels, all_preds, results_df

def plot_auroc(labels, probs, threshold=0.5):
    """Draw the ROC curve for `probs` against `labels` and mark one operating point.

    The red marker sits at the ROC point whose decision threshold is closest
    to `threshold`; the legend reports the area under the curve.
    """
    auc = roc_auc_score(labels, probs)
    fpr, tpr, cutoffs = roc_curve(labels, probs)

    # Index of the ROC point whose threshold is nearest to the requested one.
    nearest = np.argmin(np.abs(cutoffs - threshold))

    plt.figure()
    plt.plot(fpr, tpr, label=f'AUROC = {auc:.4f}')
    plt.plot([0, 1], [0, 1], 'k--')  # chance diagonal
    plt.scatter(fpr[nearest], tpr[nearest], color='red', label=f'Threshold = {threshold}')
    plt.title('AUROC Curve')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.grid(True)
    plt.legend(loc='lower right')
    plt.show()


In [None]:
# Channels per sequence position, read from the encoded data so the model's
# input layer always matches whichever encoding is active (a hard-coded 8
# does not fit the 5 + 5 channels produced by the one-hot encoders here).
janus_inp_dim = train_and_val_inps.shape[-1]
janus_out_dim = 1
janus_mod_dims = [8,16,32,64,128]
janus_lr = [10**(-4),10**(-3)]
# NOTE(review): not used below — train_kfold fixes weight_decay at 0.01.
janus_weight_decay = [0, 0.01]

torch_results_df = pd.DataFrame()

# Grid over model width and learning rate: k-fold train, then score the
# returned weights on the held-out test loader.
for janus_mod_dim in janus_mod_dims:
    for lr in janus_lr:
        # `train_val_dataset` is a DataLoader at this point; `.dataset` is the
        # underlying Dataset that train_kfold slices per fold.
        model_state = train_kfold(Janus, train_val_dataset.dataset, train_and_val_inps, train_val_labels, device, lr, model_args=(janus_inp_dim, janus_out_dim, janus_mod_dim))
        model = Janus(janus_inp_dim, janus_out_dim, janus_mod_dim)
        model.load_state_dict(model_state)
        model.to(device)
        labels, preds, torch_results_df = evaluate_auroc(model, test_loader, device, torch_results_df, model_name=f"Janus Dim:{janus_mod_dim}, LR:{lr}")
        plot_auroc(labels, preds)

display(torch_results_df)

In [None]:
# Persist the sweep results; the relative path resolves against the kernel's working directory.
torch_results_df.to_csv('0820_fullseq_lyra_classifications.csv')

In [None]:
class LSTMModel(nn.Module):
    """(Bi)LSTM classifier: last-time-step features -> linear layer -> sigmoid.

    Parameters
    ----------
    input_size : features per time step.
    hidden_size : LSTM hidden width per direction.
    num_layers : number of stacked LSTM layers.
    output_size : number of sigmoid outputs.
    bidirectional : run the LSTM in both directions when True.

    NOTE(review): the head reads only the final time step. For a bidirectional
    LSTM the backward direction has processed just one element at that
    position — confirm this is intended.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, bidirectional=False):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.num_directions = 2 if bidirectional else 1

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
        )
        # Both directions are concatenated on the feature axis.
        self.fc = nn.Linear(hidden_size * self.num_directions, output_size)
        self.sigout = nn.Sigmoid()

    def forward(self, x):
        """Map (batch, seq_len, input_size) to probabilities of shape (batch, output_size)."""
        # Explicit zero initial hidden and cell states, created on x's device.
        state_shape = (self.num_layers * self.num_directions, x.size(0), self.hidden_size)
        h0 = torch.zeros(*state_shape, device=x.device)
        c0 = torch.zeros(*state_shape, device=x.device)

        sequence_out, _ = self.lstm(x, (h0, c0))  # (batch, seq_len, hidden_size * num_directions)
        last_step = sequence_out[:, -1, :]
        return self.sigout(self.fc(last_step))

In [None]:
# Hyperparameters
# Hyperparameters
# Features per time step, read from the encoded data so it matches the active encoding.
input_size = train_and_val_inps.shape[-1]
# NOTE(review): the original assignment was truncated (`hidden_sizes = np.`) and
# did not parse; these values are placeholders — confirm the intended search space.
hidden_sizes = [16, 32, 64]
num_layers_search = [1,5,10]       # Number of LSTM layers
bidirectional = [False, True]
output_size = 1      # Output size
# Explicit learning rate instead of the `lr` loop variable leaked from the
# Janus sweep (whose last value was 1e-3).
lstm_lr = 1e-3

lstm_results_df = pd.DataFrame()

for hidden_size in hidden_sizes:
    for num_layers in num_layers_search:
        for direction in bidirectional:
            model_state = train_kfold(LSTMModel, train_val_dataset.dataset, train_and_val_inps, train_val_labels, device, lstm_lr, model_args=(input_size, hidden_size, num_layers, output_size, direction))
            model = LSTMModel(input_size, hidden_size, num_layers, output_size, direction)
            model.load_state_dict(model_state)
            model.to(device)
            # Accumulate into lstm_results_df; passing torch_results_df here
            # discarded every LSTM row except the last.
            labels, preds, lstm_results_df = evaluate_auroc(model, test_loader, device, lstm_results_df, model_name=f"LSTM Hidden Units:{hidden_size}, Num Layers:{num_layers}, Bidirectional:{direction}")
            plot_auroc(labels, preds)


display(lstm_results_df)

In [None]:
class SimpleTransformer(nn.Module):
    """Transformer-encoder classifier: linear embedding, sinusoidal positions, mean-pool, linear head.

    Parameters
    ----------
    input_dim : features per sequence position.
    model_dim : encoder width (must be divisible by `num_heads`).
    num_heads : attention heads per encoder layer.
    output_dim : number of output logits (no softmax or sigmoid is applied).
    num_layers : number of encoder layers.
    dropout : dropout probability inside the encoder layers.
    max_len : longest sequence the positional table supports.
    """

    def __init__(self, input_dim, model_dim, num_heads, output_dim=2, num_layers=1, dropout=0.1, max_len=1500):
        super(SimpleTransformer, self).__init__()
        self.max_len = max_len
        self.embedding = nn.Linear(input_dim, model_dim)
        # Non-persistent buffer: it follows the module across .to(device) calls
        # but stays out of state_dict, so saved checkpoints keep the same keys.
        self.register_buffer(
            'positional_encoding',
            self._generate_positional_encoding(model_dim, max_len),
            persistent=False,
        )

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=model_dim, nhead=num_heads,
            dropout=dropout, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.output_layer = nn.Linear(model_dim, output_dim)

    def _generate_positional_encoding(self, d_model, max_len):
        """Return a (1, max_len, d_model) table of sinusoidal position encodings."""
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer cosine column than sine columns,
        # so trim the frequencies to fit instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        return pe.unsqueeze(0)

    def forward(self, x):
        """Map (batch, seq_len, input_dim) to logits of shape (batch, output_dim).

        Raises ValueError when seq_len exceeds `max_len`.
        """
        seq_len = x.size(1)
        if seq_len > self.max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len={self.max_len} "
                "of the positional encoding"
            )
        x = self.embedding(x)
        x = x + self.positional_encoding[:, :seq_len]
        x = self.transformer(x)
        x = x.mean(dim=1)
        return self.output_layer(x)


In [None]:
from sklearn.metrics import confusion_matrix

def train_loop_tf(model, lr):
    """Train ``model`` for 50 epochs and plot per-epoch validation TPR/FPR.

    Optimises cross-entropy loss with AdamW (weight decay 0.01). After each
    epoch the model is evaluated on ``val_loader``; the mean validation loss,
    true-positive rate and false-positive rate are printed, and both rates are
    plotted against the epoch index once training ends.

    Reads the notebook-level globals ``train_loader``, ``val_loader`` and
    ``device``.

    Args:
        model: Module whose forward pass returns per-class logits; predictions
            are taken as ``argmax`` over dim 1.
        lr: Learning rate passed to AdamW (also used as the plot title).

    Returns:
        None. The model is updated in place.
    """
    num_epochs = 50
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)

    tprs, fprs = [], []

    for epoch in range(num_epochs):
        model.train()
        for inputs, labels in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs} - Training'):
            inputs = inputs.to(device).float()
            labels = labels.to(device).long()  # class indices: 0 or 1

            outputs = model(inputs)  # [batch, 2]
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        model.eval()
        cross_loss = 0
        cross_true = []
        cross_pred = []

        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc=f'Epoch {epoch+1}/{num_epochs} - Cross-validation'):
                inputs = inputs.to(device).float()
                labels = labels.to(device).long()

                outputs = model(inputs)
                cross_loss += criterion(outputs, labels).item()

                cross_true.append(labels.detach().cpu().numpy())
                cross_pred.append(torch.argmax(outputs, dim=1).detach().cpu().numpy())

        cross_true = np.concatenate(cross_true)
        cross_pred = np.concatenate(cross_pred)

        # labels=[0, 1] forces a 2x2 matrix even when an epoch's truths and
        # predictions contain a single class; without it ravel() yields one
        # value and the four-way unpacking raises.
        tn, fp, fn, tp = confusion_matrix(cross_true, cross_pred, labels=[0, 1]).ravel()

        tpr = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        fpr = fp / (fp + tn) if (fp + tn) > 0 else 0.0

        print(f"\nEpoch {epoch+1} Cross-validation Statistics:")
        print(f"Cross-validation Loss: {cross_loss/len(val_loader):.4f}")
        print(f"TPR (Recall): {tpr:.4f}")
        print(f"FPR: {fpr:.4f}")

        tprs.append(tpr)
        fprs.append(fpr)

    plt.title(f"lr = {lr}")
    plt.plot(list(range(num_epochs)), tprs)
    plt.plot(list(range(num_epochs)), fprs)
    plt.legend(["TPR", "FPR"])
    plt.show()


In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, roc_curve
import pandas as pd

def evaluate_auroc_tf(model, data_loader, device, results_df=None, model_name="Model"):
    """Score a two-logit classifier on ``data_loader`` and append a metrics row.

    The probability of class index 1 is taken from a softmax over dim 1 and
    thresholded at 0.5 for accuracy / precision / recall / F1; AUROC is
    computed from the unthresholded probabilities.

    Args:
        model: Module returning per-class logits.
        data_loader: Iterable yielding ``(inputs, labels)`` batches.
        device: Device the inputs are moved to before the forward pass.
        results_df: Existing results table to append to, or ``None`` to start
            a new one.
        model_name: Value stored in the ``Model`` column of the new row.

    Returns:
        Tuple ``(all_labels, all_probs, results_df)``: the collected labels,
        the class-1 probabilities, and the results table including the new row.
    """
    model.eval()
    all_probs = []   # soft predictions
    all_preds = []   # hard predictions
    all_labels = []

    with torch.no_grad():
        for inputs, labels in data_loader:
            # Same float32 cast as train_loop_tf, so batches that train
            # successfully do not hit a dtype mismatch at evaluation time.
            inputs = inputs.to(device).float()

            outputs = model(inputs)
            probs = torch.softmax(outputs, dim=1)[:, 1].cpu().numpy()
            preds = (probs >= 0.5).astype(int)

            all_probs.extend(probs)
            all_preds.extend(preds)
            # Labels are only consumed on the host, so no device round-trip.
            all_labels.extend(labels.cpu().numpy())

    # Compute metrics using thresholded predictions
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, zero_division=0)
    recall = recall_score(all_labels, all_preds, zero_division=0)
    f1 = f1_score(all_labels, all_preds, zero_division=0)

    try:
        auroc = roc_auc_score(all_labels, all_probs)
    except ValueError:
        auroc = float('nan')  # if only one class present

    result_row = pd.DataFrame([{
        'Model': model_name,
        'Accuracy': accuracy,
        'Precision': precision,
        'Recall': recall,
        'F1': f1,
        'AUROC': auroc
    }])

    if results_df is None:
        results_df = result_row
    else:
        results_df = pd.concat([results_df, result_row], ignore_index=True)

    return all_labels, all_probs, results_df


In [None]:
# Hyper-parameter sweep for SimpleTransformer
vocab_size = 10  # passed to the model as input_dim
model_dims = [64]
num_heads_list = [2, 4, 8, 16, 32]
num_layers_list = [2, 4, 8, 16]
num_classes = 2  # e.g., binary classification
max_seq_len = 1421
# max_seq_len = 56 # change between either full or core-seq representations

tf_df = pd.DataFrame()

# NOTE(review): `lr` is not assigned in this cell; it must already exist in
# the kernel namespace from an earlier cell — confirm which value is intended.
# One flat loop over every (model_dim, num_heads, num_layers) combination, in
# the same order as three nested loops would visit them.
for model_dim, num_heads, num_layers in (
    (dim, heads, depth)
    for dim in model_dims
    for heads in num_heads_list
    for depth in num_layers_list
):
    model = SimpleTransformer(
        input_dim=vocab_size,
        model_dim=model_dim,
        num_heads=num_heads,
        output_dim=num_classes,
        num_layers=num_layers,
        max_len=max_seq_len,
    )
    model.to(device)
    train_loop_tf(model, lr=lr)
    # Each configuration adds one row to tf_df.
    labels, preds, tf_df = evaluate_auroc_tf(
        model,
        test_loader,
        device,
        tf_df,
        model_name=f"Transformer, model dim:{model_dim}, num_heads:{num_heads}, num_layers:{num_layers}",
    )
    plot_auroc(labels, preds)

display(tf_df)

In [None]:
# Save the transformer results table to a CSV file; the path is relative, so
# it is written to the kernel's current working directory.
tf_df.to_csv("0820_fullseq_transformer_classifications.csv")

In [None]:
# adapted from Green et al. 2022

class CNN(nn.Module):
    """1-D convolutional classifier that outputs one sigmoid probability.

    Two convolution blocks (conv-conv-maxpool) are followed by two 256-unit
    dense layers and a single sigmoid output unit. ``forward`` takes input of
    shape ``(batch_size, seq_len, in_channels)`` and permutes it to the
    channels-first layout that ``nn.Conv1d`` expects.

    Args:
        in_channels: Number of features per sequence position.
        seq_len: Sequence length the model will be fed; it fixes the input
            size of the first dense layer, so inputs of a different length
            will not fit.
    """

    def __init__(self, in_channels=10, seq_len=1421):
        super(CNN, self).__init__()

        # conv input after the permute in forward(): (batch_size, in_channels, seq_len)

        self.conv1 = nn.Conv1d(in_channels=in_channels, out_channels=64, kernel_size=4)
        self.conv2 = nn.Conv1d(64, 64, kernel_size=12)
        self.pool1 = nn.MaxPool1d(kernel_size=3)

        self.conv3 = nn.Conv1d(64, 32, kernel_size=3)
        self.conv4 = nn.Conv1d(32, 32, kernel_size=3)
        self.pool2 = nn.MaxPool1d(kernel_size=3)

        self.flatten = nn.Flatten()

        # Run one all-zero sample through the conv stack to measure how many
        # features reach the first dense layer for this seq_len.
        dummy = torch.zeros(1, in_channels, seq_len)
        with torch.no_grad():
            flat_size = self._forward_conv(dummy).numel()

        self.dense1 = nn.Linear(flat_size, 256)
        self.dense2 = nn.Linear(256, 256)
        self.output = nn.Linear(256, 1)
        self.sigmoid = nn.Sigmoid()

    def _forward_conv(self, x):
        """Apply the convolution/pooling stack to a channels-first tensor."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.pool1(x)
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = self.pool2(x)
        return x

    def forward(self, x):
        """Return sigmoid probabilities of shape ``(batch_size, 1)``."""
        # x: (batch_size, seq_len, in_channels) -> (batch_size, in_channels, seq_len)
        x = x.permute(0, 2, 1)
        x = self._forward_conv(x)
        x = self.flatten(x)
        x = F.relu(self.dense1(x))
        x = F.relu(self.dense2(x))
        return self.sigmoid(self.output(x))


In [None]:
# Train the CNN and evaluate it on the test loader.
model = CNN()
model.to(device)
train_loop(model, lr=0.00005)
# The results table gets a real name: binding it to `_` stops IPython from
# updating its last-output variable and reads as a discarded value.
labels, preds, cnn_results_df = evaluate_auroc(model, test_loader, device, pd.DataFrame(), model_name="CNN")
plot_auroc(labels, preds)
display(cnn_results_df)