In [None]:
import pandas as pd
import numpy as np
from itertools import combinations
from tqdm import tqdm
import math
import joblib  
from sklearn.model_selection import cross_val_score, StratifiedKFold, RandomizedSearchCV
from sklearn.metrics import roc_curve, roc_auc_score, make_scorer
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier, StackingClassifier, VotingClassifier
from sklearn.neural_network import MLPClassifier
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.base import BaseEstimator, ClassifierMixin
from scipy.stats import loguniform, uniform
from minepy import MINE  
from sklearn.ensemble import GradientBoostingClassifier
from skrebate import ReliefF 
import matplotlib.pyplot as plt
import matplotlib as mpl

In [None]:
tqdm.pandas()
mpl.rcParams['pdf.fonttype'] = 42 
plt.rcParams['font.family'] = 'Arial'
plt.rcParams['font.size'] = 12
plt.rcParams['text.color'] = 'black'
torch.manual_seed(42)

# Gene-pair modeling

### 1. Gene-pair feature extraction

#### 1.1 Data import and matrix extraction

In [None]:
data = pd.read_csv("./data/01_train_df.csv", index_col=0)

pos_set = data.filter(like='_PTB_')
neg_set = data.filter(like='_nonPTB_')

#### 1.2 Sample size calculation and gene-pair matrix initialization

In [None]:
pos_nums = pos_set.shape[1]
neg_nums = neg_set.shape[1]

row_names = pos_set.index.tolist()
combinations_list = list(combinations(row_names, 2))
Gene_pairs = pd.DataFrame(combinations_list, columns=['Small_gene', 'Big_gene'])

#### 1.3 Compute GeneA < GeneB ratios for positive and negative groups

In [None]:
def neg_calculate_counts(df):
    diff = df - neg_set
    neg_value = diff < 0
    counts = neg_value.sum(axis=1)
    
    return counts
    
def pos_calculate_counts(df):
    diff = df - pos_set
    neg_value = diff < 0
    counts = neg_value.sum(axis=1)
    
    return counts

In [None]:
counts = neg_set.progress_apply(neg_calculate_counts, axis=1)
rows, cols = np.triu_indices(counts.shape[0], k=1)
neg_counts = counts.values[rows, cols]
neg_ratios = neg_counts / neg_nums
Gene_pairs = Gene_pairs.assign(Neg_ratio=neg_ratios)

In [None]:
counts = pos_set.progress_apply(pos_calculate_counts, axis=1)
rows, cols = np.triu_indices(counts.shape[0], k=1)
pos_counts = counts.values[rows, cols]
pos_ratios = pos_counts / pos_nums
Gene_pairs = Gene_pairs.assign(Pos_ratio=pos_ratios)

#### 1.4 Calculate differences, standardize gene order by swapping negative pairs, sort by absolute difference

In [None]:
Gene_pairs['diff_ratio'] = Gene_pairs['Neg_ratio'] - Gene_pairs['Pos_ratio']

neg_rows = Gene_pairs['diff_ratio'] < 0
Gene_pairs.loc[neg_rows, ['Small_gene', 'Big_gene']] = Gene_pairs.loc[neg_rows, ['Big_gene', 'Small_gene']].values

Gene_pairs['diff_ratio'] = Gene_pairs['diff_ratio'].abs()
Gene_pairs_sorted = Gene_pairs.sort_values(by='diff_ratio', ascending=False)
Gene_pairs_sorted.drop(columns=['Neg_ratio', 'Pos_ratio'], inplace=True)

#### 1.5 Threshold-based gene-pair selection

In [None]:
Gene_pairs = Gene_pairs_sorted[Gene_pairs_sorted['diff_ratio'] > 0.65]
Gene_pairs.to_csv("./result/01_gene_pairs.csv", index=False)

### 2. Model selection

#### 2.1 Feature encoding

In [None]:
def feature_coding(df, feature_df, label, output_path):
    encoding_data_index = feature_df.apply(lambda x: x.iloc[0] + "|" + x.iloc[1], axis=1)
    encoding_data = pd.DataFrame(index=df.columns, columns=encoding_data_index)

    for gene_name in encoding_data_index:
        gene_name1, gene_name2 = gene_name.split("|")
        row1_data = df.loc[gene_name1]
        row2_data = df.loc[gene_name2]
        comparison_result = (row1_data < row2_data).astype(int) - (row1_data > row2_data).astype(int)
        encoding_data[gene_name] = comparison_result
    
    encoding_data['Label'] = np.where(encoding_data.index.str.contains(label), 1, 0)

    encoding_data.to_csv(output_path)
    return encoding_data

In [None]:
df = pd.read_csv("./data/01_train_df.csv", index_col=0)
feature_df = pd.read_csv("./result/01_gene_pairs.csv")
label = '_PTB_'
output_path = "./result/02_feature_coding_data.csv"
feature_coding(df, feature_df, label, output_path)

#### 2.2 Model training and performance evaluation

In [None]:
def model_selection(data, length, n_splits, selected_models, selected_base_models, output_path):    
    length = math.ceil(length / 2)
    
    X = data.iloc[:, :-1].values
    y = data['Label'].values
    
    models = {
        "KNN": KNeighborsClassifier(),
        "Naive Bayes": GaussianNB(),
        "LR": LogisticRegression(random_state=42),
        "SVM": SVC(probability=True, random_state=42),
        "XGBoost": XGBClassifier(use_label_encoder=False, random_state=42),
        "RF": RandomForestClassifier(random_state=42),
        "MLP": MLPClassifier(random_state=42),
        "CNN": PyTorchClassifier(input_dim=1, length=length, epochs=10, batch_size=16),
    }
    
    if selected_base_models is None:
        selected_base_models = list(models.keys())  

    base_estimators = [(name, models[name]) for name in selected_base_models if name in models]
    models["Stacking"] = StackingClassifier(estimators=base_estimators, final_estimator=LogisticRegression(random_state=42))
    models["Soft Voting"] = VotingClassifier(estimators=base_estimators, voting='soft')
    
    cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    scoring = make_scorer(roc_auc_score, needs_proba=True)
    
    auc_scores = []
    if selected_models is None:
        for model_name, model in models.items():
            try:
                auc = cross_val_score(model, X, y, cv=cv, scoring=scoring).mean()
                auc_scores.append({"Model": model_name, "AUC": auc})
            except Exception as e:
                print(f"Error training model {model_name}: {e}")
        auc_scores = pd.DataFrame(auc_scores)
        auc_scores = auc_scores.sort_values(by="AUC", ascending=True)

        auc_scores.to_csv(output_path, index=False)
        return auc_scores

    if isinstance(selected_models, str):
        selected_models = [selected_models]
    
    selected_results = []
    for model_name in selected_models:
        if model_name in models:
            model = models[model_name]
            try:
                auc = cross_val_score(model, X, y, cv=cv, scoring=scoring).mean()
                selected_results.append({"Model": model_name, "AUC": auc})
            except Exception as e:
                selected_results.append({"Model": model_name, "Error": str(e)})
        else:
            selected_results.append({"Model": model_name, "Error": "Model not found"})
    selected_results = pd.DataFrame(selected_results)
    selected_results = selected_results.sort_values(by="AUC", ascending=True)
    
    selected_results.to_csv(output_path, index=False)
    return selected_results

class CNNModel(nn.Module):
    def __init__(self, input_dim, length):
        super(CNNModel, self).__init__()

        self.conv1 = nn.Sequential(         
            nn.Conv1d(                        
                in_channels = input_dim,
                out_channels= input_dim * 32, 
                kernel_size = 3,
                stride = 1,                   
                padding = 1
            ),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size = 2, ceil_mode=True) 
        )
        
        self.out = nn.Linear(input_dim*32*length, 1)     

        self.sigmoid = nn.Sigmoid()
        
    def forward(self, x):
        x = self.conv1(x)         
        x = x.view(x.size(0), -1)  
        x = self.out(x)           
        x = self.sigmoid(x)
        
        return x

class PyTorchClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, input_dim, length, epochs, batch_size):
        self.input_dim = input_dim
        self.length = length
        self.epochs = epochs
        self.batch_size = batch_size
        self.model = CNNModel(input_dim, length)
        self.optimizer = optim.Adam(self.model.parameters())
        self.criterion = nn.BCELoss()

    def fit(self, X, y):
        X_tensor = torch.tensor(X, dtype=torch.float32).unsqueeze(1)
        y_tensor = torch.tensor(y, dtype=torch.float32).view(-1, 1)
        dataset = TensorDataset(X_tensor, y_tensor)
        loader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True)

        for epoch in range(self.epochs):
            self.model.train()
            for data, targets in loader:
                self.optimizer.zero_grad()
                outputs = self.model(data)
                loss = self.criterion(outputs, targets)
                loss.backward()
                self.optimizer.step()

        self.classes_ = torch.unique(torch.tensor(y)).numpy()

        return self

    def predict_proba(self, X):
        self.model.eval()
        X_tensor = torch.tensor(X, dtype=torch.float32).unsqueeze(1)
        with torch.no_grad():
            outputs = self.model(X_tensor)
        return torch.cat((1 - outputs, outputs), dim=1).numpy()

In [None]:
data = pd.read_csv("./result/02_feature_coding_data.csv", index_col=0)
length = 7
n_splits = 5
selected_models = ['KNN', 'Naive Bayes', 'LR', 'SVM', 'XGBoost', 'RF', 'Soft Voting', 'Stacking', 'MLP', 'CNN']
output_path = './result/03_model_select.csv'
selected_base_models = ['Naive Bayes', 'LR', 'SVM', 'XGBoost', 'RF', 'KNN']
model_selection(data, length, n_splits, selected_models, selected_base_models, output_path)

### 3. Feature selection

#### 3.1 Feature importance ranking

In [None]:
def feature_importance_rank(data, methods, output_path):
    # MIC
    def MIC_score(X, y):
        mine = MINE(alpha=0.6, c=15)
        mine.compute_score(X, y)
        return mine.mic()
    
    def MIC_feature_importance_rank(df):
        feature_list = []
        MIC_list = []
        selected_columns = df.columns[:-1]
        y = df.iloc[:, -1]
        
        for column_name in selected_columns:
            MIC = MIC_score(df[column_name], y)
            feature_list.append(column_name)
            MIC_list.append(MIC)
            
        MIC_order = pd.DataFrame({'MIC':feature_list, 'MIC_score':MIC_list})
        MIC_order = MIC_order.sort_values(by="MIC_score", ascending=False).reset_index(drop=True)
        MIC_order = MIC_order.iloc[:, :1]
        
        return MIC_order
    
    # GBDT
    def GBDT_feature_importance_rank(df):
        X = df.iloc[:, :-1]
        y = df.iloc[:, -1]
        GBDT = GradientBoostingClassifier(random_state=42)
        GBDT.fit(X, y)
        GBDT_importances = GBDT.feature_importances_
        GBDT_order = pd.DataFrame({'GBDT': X.columns, 'Importance': GBDT_importances})
        GBDT_order = GBDT_order.sort_values(by='Importance', ascending=False).reset_index(drop=True)
        GBDT_order = GBDT_order.iloc[:, :1]
        
        return GBDT_order

    # Relief
    def Relief_feature_importance_rank(df):
        X = df.iloc[:, :-1].values
        y = df.iloc[:, -1].values
        Relief = ReliefF(n_neighbors=10, n_features_to_select=X.shape[1])
        Relief.fit(X, y)
        Relief_importances = Relief.feature_importances_
        feature_names = df.columns[:-1]
        Relief_order = pd.DataFrame({'Relief':feature_names, 'Importance': Relief_importances})
        Relief_order = Relief_order.sort_values(by='Importance', ascending=False).reset_index(drop=True)
        Relief_order = Relief_order.iloc[:, :1]

        return Relief_order

    result_df = pd.DataFrame()

    if 'MIC' in methods:
        MIC_order = MIC_feature_importance_rank(data)
        result_df = pd.concat([result_df, MIC_order], axis=1)

    if 'GBDT' in methods:
        GBDT_order = GBDT_feature_importance_rank(data)
        result_df = pd.concat([result_df, GBDT_order], axis=1)

    if 'Relief' in methods:
        Relief_order = Relief_feature_importance_rank(data)
        result_df = pd.concat([result_df, Relief_order], axis=1)

    if output_path:
        result_df.to_csv(output_path, index=False)

    return result_df

In [None]:
data = pd.read_csv("./result/02_feature_coding_data.csv", index_col=0)
methods = ['MIC', 'GBDT', 'Relief']
output_path = False
result = feature_importance_rank(data, methods, output_path)

In [None]:
# TSP score
feature_df = pd.read_csv("./result/01_gene_pairs.csv")
result['TSP'] = feature_df.apply(lambda x: x.iloc[0] + "|" + x.iloc[1], axis=1)
result.to_csv('./result/04_feature_importance_rank.csv', index=False)

#### 3.2 Incremental feature selection

In [None]:
def incremental_feature_selection(data, feature_df, feature_num, model, n_splits, line_colors, result_path, plot_path):
    X = data.iloc[:, :-1]
    y = data.iloc[:, -1]

    result_df = pd.DataFrame(columns=["Num_Features"] + feature_df.columns.tolist())

    cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    auc_scorer = make_scorer(roc_auc_score, needs_proba=True)
    
    feature_range = range(1, feature_num + 1)

    if line_colors is None:
        line_colors = plt.cm.tab10(np.linspace(0, 1, len(feature_df.columns)))
        
    for num_features in tqdm(feature_range, desc="Incremental feature selection progress"):
        row = {"Num_Features": num_features}
        
        for method in feature_df.columns:
            selected_features = feature_df[method].head(num_features)
            X_selected = X[selected_features]
            aucs = cross_val_score(model, X_selected, y, cv=cv, scoring=auc_scorer)
            row[method] = np.mean(aucs)

        result_df = pd.concat([result_df, pd.DataFrame([row])], ignore_index=True)
    
    result_df.to_csv(result_path, index=False)
    
    plt.figure(figsize=(5, 3))
    for i, method in enumerate(feature_df.columns): 
        plt.plot(result_df["Num_Features"], result_df[method], label=method, marker='o', color=line_colors[i])

    plt.grid(True, linestyle='--', alpha=0.5)
    plt.legend(loc="lower right", fontsize=10) 
    plt.xlabel("Number of Features", fontsize=10)
    plt.ylabel("AUC", fontsize=10)
    plt.xticks(np.arange(1, feature_num+1), np.arange(1, feature_num+1), fontsize=10)
    plt.yticks(fontsize=10)

    plt.savefig(plot_path, format='pdf', dpi=300, bbox_inches='tight')  
    plt.show()
    
    return result_df

In [None]:
data = pd.read_csv("./result/02_feature_coding_data.csv", index_col=0)
feature_df = pd.read_csv("./result/04_feature_importance_rank.csv")
feature_num = 7
model = GaussianNB()
n_splits = 5
line_colors = ['#576493', '#af454a', '#72a551', '#e69829']
result_path = "./result/05_incremental_feature_selection.csv"
plot_path = "./result/06_incremental_feature_selection.pdf"
result = incremental_feature_selection(data, feature_df, feature_num, model, n_splits, line_colors, result_path, plot_path)

### 4. Hyperparameter optimization and model construction

In [None]:
def train_optimized_model(data, feature_df, method, num_features, model, params, n_splits, n_iter, save_path):
    selected_features = feature_df[method].head(num_features).tolist()
    X = data[selected_features]
    y = data.iloc[:, -1] 
    
    scorer = make_scorer(roc_auc_score, needs_proba=True)
    
    cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

    random_search = RandomizedSearchCV(
        estimator=model,
        param_distributions=params,
        scoring=scorer,
        n_iter=n_iter,
        cv=cv,
        verbose=1,
        random_state=42,
        n_jobs=-1
    )
    
    print("Starting Randomized Search...")
    random_search.fit(X, y)
    
    best_params = random_search.best_params_
    best_score = random_search.best_score_
    print(f"Best parameters found: {best_params}")
    print(f"Best AUC score: {best_score}")
    
    best_model = random_search.best_estimator_
    best_model.fit(X, y)
    
    joblib.dump(best_model, save_path)
    print(f"Model saved to {save_path}")
    
    return best_model, best_params, best_score

In [None]:
data = pd.read_csv("./result/02_feature_coding_data.csv", index_col=0)
feature_df = pd.read_csv('./result/04_feature_importance_rank.csv')
method = "GBDT"
num_features = 5
model = GaussianNB()
params = {
    'var_smoothing': loguniform(1e-9, 1e-2)
}
n_splits = 5
n_iter = 10000
save_path = "./result/07_best_model.joblib"
result = train_optimized_model(data, feature_df, method, num_features, model, params, n_splits, n_iter, save_path)

### 5. Model validation

In [None]:
test_df = pd.read_csv("./data/02_test_df.csv", index_col=0)
E_valid_df = pd.read_csv("./data/03_E_valid_df.csv", index_col=0)
feature_df = pd.read_csv("./result/01_gene_pairs.csv")
label = '_PTB_'
test_df_output_path = "./result/08_test_feature_coding_data.csv"
E_valid_df_output_path = "./result/09_val_feature_coding_data.csv"
feature_coding(test_df, feature_df, label, test_df_output_path)
feature_coding(E_valid_df, feature_df, label, E_valid_df_output_path)

In [None]:
def plot_roc_curve(train_data, test_data, val_data, feature_df, method, num_features, model_path, colors, save_path):
    selected_features = feature_df[method].head(num_features).tolist()
    X_train = train_data[selected_features]
    y_train = train_data.iloc[:, -1] 
    X_test = test_data[selected_features]
    y_test = test_data.iloc[:, -1] 
    X_val = val_data[selected_features]
    y_val = val_data.iloc[:, -1] 

    model = joblib.load(model_path)
    
    y_train_pred = model.predict_proba(X_train)[:, 1]  
    y_test_pred = model.predict_proba(X_test)[:, 1]    
    y_val_pred = model.predict_proba(X_val)[:, 1] 
    
    plt.figure(figsize=(5, 5))
    for y_true, y_pred, color, label in zip(
        [y_train, y_test, y_val],
        [y_train_pred, y_test_pred, y_val_pred],
        colors,
        ['Train', 'Test', 'Val']
    ):
        fpr, tpr, _ = roc_curve(y_true, y_pred)
        auc_score = roc_auc_score(y_true, y_pred)
        plt.plot(fpr, tpr, color=color, label=f'{label} AUC = {auc_score:.3f}', linewidth=2)

    plt.plot([0, 1], [0, 1], 'k--', label="Random", linewidth=2)
    plt.grid(True, linestyle='--', alpha=0.5)
    plt.legend(loc='lower right', fontsize=11)
    plt.title("ROC Curve", fontsize=16)
    plt.xlabel("False Positive Rate", fontsize=14)
    plt.ylabel("True Positive Rate", fontsize=14)
    plt.xticks(fontsize=12)
    plt.yticks(fontsize=12)

    if save_path:
        plt.savefig(save_path, format='pdf', dpi=300, bbox_inches='tight')   
    plt.show()

In [None]:
train_data = pd.read_csv('./result/02_feature_coding_data.csv', index_col=0) 
test_data = pd.read_csv('./result/08_test_feature_coding_data.csv', index_col=0) 
val_data = pd.read_csv('./result/09_val_feature_coding_data.csv', index_col=0) 
feature_df = pd.read_csv('./result/04_feature_importance_rank.csv')
method = 'GBDT'
num_features = 5
model_path = './result/07_best_model.joblib'
colors = ['#226c87', '#f8d672', '#e48e11']
save_path = './result/10_gene_pair_model_AUC.pdf'
plot_roc_curve(train_data, test_data, val_data, feature_df, method, num_features, model_path, colors, plot_ci, n_bootstrap, save_path)