In [1]:
import h5py
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import optuna
from time import sleep
import itertools
import numpy as np
from sklearn import metrics
from sklearn.metrics import roc_auc_score, precision_score, recall_score
from torch.utils.data import ConcatDataset, DataLoader
from torch.utils.data import Dataset 

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import random
def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)

In [3]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader

# 读取训练集和测试集 CSV 文件
train_csv_file_path = '/root/ACE/pca_train.csv'  # 替换为实际的训练集 CSV 文件路径
test_csv_file_path = '/root/ACE/pca_test.csv'    # 替换为实际的测试集 CSV 文件路径

train_data = pd.read_csv(train_csv_file_path)
test_data = pd.read_csv(test_csv_file_path)

# 假设第一列是标签，后面的列都是特征
train_labels = train_data.iloc[:, 0]  # 训练集标签
train_embeddings = train_data.iloc[:, 1:]  # 训练集特征

test_labels = test_data.iloc[:, 0]  # 测试集标签
test_embeddings = test_data.iloc[:, 1:]  # 测试集特征

# 自定义 Dataset 类
class CustomDataset(Dataset):
    def __init__(self, embeddings, labels):
        self.embeddings = embeddings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        embedding = torch.tensor(self.embeddings.iloc[idx].values, dtype=torch.float32)  # 转换为 torch 张量
        label = torch.tensor(self.labels.iloc[idx], dtype=torch.long)
        return embedding, label

# 创建训练和测试数据集
train_dataset = CustomDataset(train_embeddings, train_labels)
test_dataset = CustomDataset(test_embeddings, test_labels)

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)


In [4]:
import warnings

# 忽略所有警告
warnings.filterwarnings("ignore")

In [5]:
import torch.nn as nn
import torch
import h5py

class Attention(nn.Module):
    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        self.linear = nn.Linear(hidden_size, hidden_size)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, lstm_out):
        out = self.linear(lstm_out)
        score = torch.bmm(out, out.transpose(1, 2))
        attn = self.softmax(score)
        context = torch.bmm(attn, lstm_out)
        return context
    
    
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, drop):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = nn.Dropout(drop)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.attention = Attention(input_size)
        self.batch_norm = nn.BatchNorm1d(hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.dropout = nn.Dropout(drop)
        self.relu = nn.ReLU(True)
        # 批归一化层
        self.batch_norm1 = nn.BatchNorm1d(512)
        self.batch_norm2 = nn.BatchNorm1d(32)

        # 全连接层
        self.fc2 = nn.Linear(128, 512)
        self.fc3 = nn.Linear(512, 32)
        self.fc = nn.Linear(32, 2)
    def forward(self, x):
        out = self.attention(x)
        out, _ = self.lstm(out)
        out = out.permute(0, 2, 1)
        out = self.batch_norm(out)
        out = out.permute(0, 2, 1)
        # 全连接层操作
        out = self.fc2(out)
        # 批归一化层
        out = out.permute(0, 2, 1)  # 调整维度以适应 BatchNorm1d 的输入
        out = self.batch_norm1(out)
        out = self.relu(out)
        out = out.permute(0, 2, 1)  # 恢复原始的维度顺序

        out = self.dropout(out)
        out = self.fc3(out)
        out = out.permute(0, 2, 1)  # 调整维度以适应 BatchNorm1d 的输入
        out = self.batch_norm2(out)
        out = self.relu(out)
        out = out.permute(0, 2, 1)  # 恢复原始的维度顺序

        out = self.dropout(out)
        out = self.fc(out[:, -1, :])
        return out

# class CNNModel(nn.Module):
#     def __init__(self, input_size, hidden_size, num_layers, num_classes, drop):
#         super(CNNModel, self).__init__()
#         self.relu = nn.ReLU(True)
#         self.dropout = nn.Dropout(drop)
        
#         # Convolutional layers
#         self.conv1 = nn.Conv1d(input_size, 512, kernel_size=3, padding=1)
#         self.batch_norm1 = nn.BatchNorm1d(512)
        
#         self.conv2 = nn.Conv1d(512, 256, kernel_size=3, padding=1)
#         self.batch_norm2 = nn.BatchNorm1d(256)
        
#         self.conv3 = nn.Conv1d(256, 128, kernel_size=3, padding=1)
#         self.batch_norm3 = nn.BatchNorm1d(128)
        
#         self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
        
#         # Attention layer (ensure Attention class is defined)
#         self.attention = Attention(128)
        
#         # Fully connected layers
#         self.fc1 = nn.Linear(128, 256)
#         self.batch_norm_fc1 = nn.BatchNorm1d(256)
        
#         self.fc2 = nn.Linear(256, 32)
#         self.batch_norm_fc2 = nn.BatchNorm1d(32)
        
#         self.fc_final = nn.Linear(32, num_classes)
        
#     def forward(self, x):
#         # x shape: [batch_size, seq_length, input_size]
#         out = x.permute(0, 2, 1)  # [batch_size, input_size, seq_length]
        
#         # First convolutional layer
#         out = self.conv1(out)
#         out = self.batch_norm1(out)
#         out = self.relu(out)
        
#         # Second convolutional layer
#         out = self.conv2(out)
#         out = self.batch_norm2(out)
#         out = self.relu(out)
        
#         # Third convolutional layer
#         out = self.conv3(out)
#         out = self.batch_norm3(out)
#         out = self.relu(out)
#         out = self.maxpool(out)
        
#         out = out.permute(0, 2, 1)  # [batch_size, seq_length, features]
        
#         # Attention layer
#         out = self.attention(out)
        
#         # Global average pooling
#         out = out.mean(dim=1)  # [batch_size, features]
        
#         # Fully connected layers
#         out = self.fc1(out)
#         out = self.batch_norm_fc1(out)
#         out = self.relu(out)
#         out = self.dropout(out)
        
#         out = self.fc2(out)
#         out = self.batch_norm_fc2(out)
#         out = self.relu(out)
#         out = self.dropout(out)
        
#         out = self.fc_final(out)
#         return out
class CNNModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, drop):
        super(CNNModel, self).__init__()
        self.relu = nn.ReLU(True)
        self.dropout = nn.Dropout(drop)
        
        # Convolutional layers
        self.conv1 = nn.Conv1d(input_size, 128, kernel_size=3, padding=1)
        self.batch_norm1 = nn.BatchNorm1d(128)
        
        self.conv2 = nn.Conv1d(128, 64, kernel_size=3, padding=1)
        self.batch_norm2 = nn.BatchNorm1d(64)
        
        self.conv3 = nn.Conv1d(64, 32, kernel_size=3, padding=1)
        self.batch_norm3 = nn.BatchNorm1d(32)
        
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
        
        # Attention layer (ensure Attention class is defined)
        self.attention = Attention(32)
        
        # Fully connected layers
        self.fc1 = nn.Linear(32, 64)
        self.batch_norm_fc1 = nn.BatchNorm1d(64)
        
        self.fc2 = nn.Linear(64, 16)
        self.batch_norm_fc2 = nn.BatchNorm1d(16)
        
        self.fc_final = nn.Linear(16, num_classes)
        
    def forward(self, x):
        # x shape: [batch_size, seq_length, input_size]
        out = x.permute(0, 2, 1)  # [batch_size, input_size, seq_length]
        
        # First convolutional layer
        out = self.conv1(out)
        out = self.batch_norm1(out)
        out = self.relu(out)
        
        # Second convolutional layer
        out = self.conv2(out)
        out = self.batch_norm2(out)
        out = self.relu(out)
        
        # Third convolutional layer
        out = self.conv3(out)
        out = self.batch_norm3(out)
        out = self.relu(out)
        out = self.maxpool(out)
        
        out = out.permute(0, 2, 1)  # [batch_size, seq_length, features]
        
        # Attention layer
        out = self.attention(out)
        
        # Global average pooling
        out = out.mean(dim=1)  # [batch_size, features]
        
        # Fully connected layers
        out = self.fc1(out)
        out = self.batch_norm_fc1(out)
        out = self.relu(out)
        out = self.dropout(out)
        
        out = self.fc2(out)
        out = self.batch_norm_fc2(out)
        out = self.relu(out)
        out = self.dropout(out)
        
        out = self.fc_final(out)
        return out
class DualModel(nn.Module):
    def __init__(self, input_size, hidden_size_cnn, hidden_size_lstm, num_layers_cnn, num_layers_lstm, num_classes, drop_cnn, drop_lstm):
        super(DualModel, self).__init__()
        self.cnn = CNNModel(input_size, hidden_size_cnn, num_layers_cnn, num_classes, drop_cnn)
        self.lstm = LSTMModel(input_size, hidden_size_lstm, num_layers_lstm, num_classes, drop_lstm)
        self.weight = nn.Parameter(torch.tensor(0.7))

    def forward(self, x):
        out_cnn = self.cnn(x)
        #out_lstm = self.lstm(x)
        #out = self.weight * out_cnn + (1 - self.weight) * out_lstm
        return out_cnn

In [6]:
# 定义多层感知机（MLP）模型
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)  # Dropout for regularization

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [7]:
import numpy as np
import joblib
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, matthews_corrcoef, roc_auc_score, balanced_accuracy_score, recall_score
import optuna

# 加载保存的模型
svm_model = joblib.load('/root/ACE/model/best_svm_model.pkl')
rl_model = joblib.load('/root/ACE/model/best_logistic_regression_model.pkl')

# 读取训练集和测试集 CSV 文件
train_csv_file_path = '/root/ACE/pca_train.csv'  # 替换为实际的训练集 CSV 文件路径
test_csv_file_path = '/root/ACE/pca_test.csv'    # 替换为实际的测试集 CSV 文件路径

train_data = pd.read_csv(train_csv_file_path)
test_data = pd.read_csv(test_csv_file_path)

# 假设第一列是标签，后面的列都是特征
train_labels = train_data.iloc[:, 0]  # 训练集标签
train_embeddings = train_data.iloc[:, 1:]  # 训练集特征

test_labels = test_data.iloc[:, 0]  # 测试集标签
test_embeddings = test_data.iloc[:, 1:]  # 测试集特征

# 自定义 Dataset 类
class CustomDataset(Dataset):
    def __init__(self, embeddings, labels):
        self.embeddings = embeddings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        embedding = torch.tensor(self.embeddings.iloc[idx].values, dtype=torch.float32)  # 转换为 torch 张量
        label = torch.tensor(self.labels.iloc[idx], dtype=torch.long)
        return embedding, label

# 创建训练和测试数据集
train_dataset = CustomDataset(train_embeddings, train_labels)
test_dataset = CustomDataset(test_embeddings, test_labels)

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)

# 加载 CNN 模型
model = torch.load('/root/ACE/model/best_model_trial_cnn.pth')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
model.eval()

# 使用 CNN 模型对测试集进行预测
all_preds = []
all_labels = []
with torch.no_grad():
    for data, labels in test_loader:
        data = data.to(device).unsqueeze(1)
        labels = labels.to(device)

        outputs = model(data)
        probs = torch.softmax(outputs, dim=1)
        preds = torch.argmax(outputs, dim=1)

        all_preds.append(probs[:, 1].cpu().numpy())
        all_labels.append(labels.cpu().numpy())

all_preds = np.concatenate(all_preds)
all_labels = np.concatenate(all_labels)
binary_preds = (all_preds > 0.5).astype(int)

# 计算评估指标
conf_matrix = confusion_matrix(all_labels, binary_preds)
tn, fp, fn, tp = conf_matrix.ravel()

auc = roc_auc_score(all_labels, all_preds)
balanced_acc = balanced_accuracy_score(all_labels, binary_preds)
sensitivity = recall_score(all_labels, binary_preds)
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
mcc = matthews_corrcoef(all_labels, binary_preds)

# 打印评估指标
print(f'Balanced Accuracy (BAcc): {balanced_acc:.4f}, Sensitivity (Sn): {sensitivity:.4f}, '
      f'Specificity (Sp): {specificity:.4f}, MCC: {mcc:.4f}, AUROC: {auc:.4f}')

# 使用每个基模型对训练集和测试集进行预测
cnn_train_pred = model(torch.tensor(train_embeddings.values, dtype=torch.float32).to(device).unsqueeze(1)).detach().cpu().numpy()[:, 1]  # 使用 CNN 模型的预测结果
svm_train_pred = svm_model.predict(train_embeddings)
rl_train_pred = rl_model.predict(train_embeddings)

cnn_test_pred = model(torch.tensor(test_embeddings.values, dtype=torch.float32).to(device).unsqueeze(1)).detach().cpu().numpy()[:, 1]  # 使用 CNN 模型的预测结果
svm_test_pred = svm_model.predict(test_embeddings)
rl_test_pred = rl_model.predict(test_embeddings)

# 确保所有预测结果具有相同长度
cnn_train_pred = cnn_train_pred[:len(train_labels)]
svm_train_pred = svm_train_pred[:len(train_labels)]
rl_train_pred = rl_train_pred[:len(train_labels)]

cnn_test_pred = cnn_test_pred[:len(test_labels)]
svm_test_pred = svm_test_pred[:len(test_labels)]
rl_test_pred = rl_test_pred[:len(test_labels)]

# 将每个模型的预测结果作为新的特征
stacked_train_features = np.vstack((cnn_train_pred, svm_train_pred, rl_train_pred)).T
stacked_test_features = np.vstack((cnn_test_pred, svm_test_pred, rl_test_pred)).T

# 使用逻辑回归进行堆叠并进行调参

def objective(trial):
    C = trial.suggest_loguniform('C', 1e-3, 1e2)
    solver = trial.suggest_categorical('solver', ['lbfgs', 'liblinear'])
    max_iter = trial.suggest_int('max_iter', 100, 1000)
    
    stacking_model = LogisticRegression(C=C, solver=solver, max_iter=max_iter)
    stacking_model.fit(stacked_train_features, train_labels)
    stacked_binary_preds = stacking_model.predict(stacked_test_features)
    conf_matrix = confusion_matrix(test_labels, stacked_binary_preds)
    tn, fp, fn, tp = conf_matrix.ravel()
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
    return specificity

study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50)

best_params = study.best_params
stacking_model = LogisticRegression(**best_params)
stacking_model.fit(stacked_train_features, train_labels)

# 对测试集进行预测
stacked_pred = stacking_model.predict_proba(stacked_test_features)[:, 1]

# 将预测结果转换为二分类（例如，0.5 为阈值）
stacked_binary_preds = (stacked_pred > 0.5).astype(int)

# 计算评估指标
conf_matrix = confusion_matrix(test_labels, stacked_binary_preds)
tn, fp, fn, tp = conf_matrix.ravel()

auc = roc_auc_score(test_labels, stacked_pred)
balanced_acc = balanced_accuracy_score(test_labels, stacked_binary_preds)
sensitivity = recall_score(test_labels, stacked_binary_preds)
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
mcc = matthews_corrcoef(test_labels, stacked_binary_preds)

# 打印评估指标
print(f'Balanced Accuracy (BAcc): {balanced_acc:.4f}, Sensitivity (Sn): {sensitivity:.4f}, '
      f'Specificity (Sp): {specificity:.4f}, MCC: {mcc:.4f}, AUROC: {auc:.4f}')

[I 2024-10-11 20:12:15,239] A new study created in memory with name: no-name-b89351e8-eb2f-4942-9194-f4579ec8e13b
[I 2024-10-11 20:12:15,246] Trial 0 finished with value: 0.9113924050632911 and parameters: {'C': 0.06711512209020946, 'solver': 'liblinear', 'max_iter': 951}. Best is trial 0 with value: 0.9113924050632911.
[I 2024-10-11 20:12:15,251] Trial 1 finished with value: 0.9113924050632911 and parameters: {'C': 0.05929412122396233, 'solver': 'liblinear', 'max_iter': 825}. Best is trial 0 with value: 0.9113924050632911.
[I 2024-10-11 20:12:15,258] Trial 2 finished with value: 0.9113924050632911 and parameters: {'C': 9.495743123274119, 'solver': 'lbfgs', 'max_iter': 925}. Best is trial 0 with value: 0.9113924050632911.
[I 2024-10-11 20:12:15,266] Trial 3 finished with value: 0.9113924050632911 and parameters: {'C': 13.621477763911821, 'solver': 'lbfgs', 'max_iter': 752}. Best is trial 0 with value: 0.9113924050632911.
[I 2024-10-11 20:12:15,276] Trial 4 finished with value: 0.911392

Balanced Accuracy (BAcc): 0.9020, Sensitivity (Sn): 0.8800, Specificity (Sp): 0.9241, MCC: 0.7908, AUROC: 0.9434


[I 2024-10-11 20:12:15,342] Trial 10 finished with value: 0.9240506329113924 and parameters: {'C': 0.001072670638035624, 'solver': 'liblinear', 'max_iter': 100}. Best is trial 10 with value: 0.9240506329113924.
[I 2024-10-11 20:12:15,358] Trial 11 finished with value: 0.9240506329113924 and parameters: {'C': 0.0013584047732548591, 'solver': 'liblinear', 'max_iter': 147}. Best is trial 10 with value: 0.9240506329113924.
[I 2024-10-11 20:12:15,373] Trial 12 finished with value: 0.9240506329113924 and parameters: {'C': 0.001111774726122675, 'solver': 'liblinear', 'max_iter': 107}. Best is trial 10 with value: 0.9240506329113924.
[I 2024-10-11 20:12:15,391] Trial 13 finished with value: 0.9240506329113924 and parameters: {'C': 0.0010367939980779034, 'solver': 'liblinear', 'max_iter': 131}. Best is trial 10 with value: 0.9240506329113924.
[I 2024-10-11 20:12:15,411] Trial 14 finished with value: 0.9240506329113924 and parameters: {'C': 0.006248629757705615, 'solver': 'liblinear', 'max_iter'

Balanced Accuracy (BAcc): 0.9060, Sensitivity (Sn): 0.8880, Specificity (Sp): 0.9241, MCC: 0.7998, AUROC: 0.9477


In [8]:
import numpy as np
import joblib
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import VotingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, matthews_corrcoef, roc_auc_score, balanced_accuracy_score, recall_score
import optuna

# 加载保存的模型
svm_model = joblib.load('/root/ACE/model/best_svm_model.pkl')
rl_model = joblib.load('/root/ACE/model/best_logistic_regression_model.pkl')

# 读取训练集和测试集 CSV 文件
train_csv_file_path = '/root/ACE/pca_train.csv'  # 替换为实际的训练集 CSV 文件路径
test_csv_file_path = '/root/ACE/pca_test.csv'    # 替换为实际的测试集 CSV 文件路径

train_data = pd.read_csv(train_csv_file_path)
test_data = pd.read_csv(test_csv_file_path)

# 假设第一列是标签，后面的列都是特征
train_labels = train_data.iloc[:, 0]  # 训练集标签
train_embeddings = train_data.iloc[:, 1:]  # 训练集特征

test_labels = test_data.iloc[:, 0]  # 测试集标签
test_embeddings = test_data.iloc[:, 1:]  # 测试集特征

# 自定义 Dataset 类
class CustomDataset(Dataset):
    def __init__(self, embeddings, labels):
        self.embeddings = embeddings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        embedding = torch.tensor(self.embeddings.iloc[idx].values, dtype=torch.float32)  # 转换为 torch 张量
        label = torch.tensor(self.labels.iloc[idx], dtype=torch.long)
        return embedding, label

# 创建训练和测试数据集
train_dataset = CustomDataset(train_embeddings, train_labels)
test_dataset = CustomDataset(test_embeddings, test_labels)

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)

# 加载 CNN 模型
model = torch.load('/root/ACE/model/best_model_trial_cnn.pth')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
model.eval()

# 使用 CNN 模型对测试集进行预测
all_preds = []
all_labels = []
with torch.no_grad():
    for data, labels in test_loader:
        data = data.to(device).unsqueeze(1)
        labels = labels.to(device)

        outputs = model(data)
        probs = torch.softmax(outputs, dim=1)
        preds = torch.argmax(outputs, dim=1)

        all_preds.append(probs[:, 1].cpu().numpy())
        all_labels.append(labels.cpu().numpy())

all_preds = np.concatenate(all_preds)
all_labels = np.concatenate(all_labels)
binary_preds = (all_preds > 0.5).astype(int)

# 计算评估指标
conf_matrix = confusion_matrix(all_labels, binary_preds)
tn, fp, fn, tp = conf_matrix.ravel()

auc = roc_auc_score(all_labels, all_preds)
balanced_acc = balanced_accuracy_score(all_labels, binary_preds)
sensitivity = recall_score(all_labels, binary_preds)
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
mcc = matthews_corrcoef(all_labels, binary_preds)

# 打印评估指标
print(f'Balanced Accuracy (BAcc): {balanced_acc:.4f}, Sensitivity (Sn): {sensitivity:.4f}, '
      f'Specificity (Sp): {specificity:.4f}, MCC: {mcc:.4f}, AUROC: {auc:.4f}')

# 使用每个基模型对训练集和测试集进行预测
cnn_train_pred = model(torch.tensor(train_embeddings.values, dtype=torch.float32).to(device).unsqueeze(1)).detach().cpu().numpy()[:, 1]  # 使用 CNN 模型的预测结果
svm_train_pred = svm_model.predict(train_embeddings)
rl_train_pred = rl_model.predict(train_embeddings)

cnn_test_pred = model(torch.tensor(test_embeddings.values, dtype=torch.float32).to(device).unsqueeze(1)).detach().cpu().numpy()[:, 1]  # 使用 CNN 模型的预测结果
svm_test_pred = svm_model.predict(test_embeddings)
rl_test_pred = rl_model.predict(test_embeddings)

# 确保所有预测结果具有相同长度
cnn_train_pred = cnn_train_pred[:len(train_labels)]
svm_train_pred = svm_train_pred[:len(train_labels)]
rl_train_pred = rl_train_pred[:len(train_labels)]

cnn_test_pred = cnn_test_pred[:len(test_labels)]
svm_test_pred = svm_test_pred[:len(test_labels)]
rl_test_pred = rl_test_pred[:len(test_labels)]

# 创建 VotingClassifier 模型
voting_model = VotingClassifier(estimators=[
    ('svm', svm_model),
    ('rl', rl_model),
    ('cnn', LogisticRegression().fit(stacked_train_features, train_labels))  # 使用 CNN 特征训练逻辑回归
], voting='soft')

# 训练 VotingClassifier
voting_model.fit(stacked_train_features, train_labels)

# 对测试集进行预测
stacked_pred = voting_model.predict_proba(stacked_test_features)[:, 1]

# 将预测结果转换为二分类（例如，0.5 为阈值）
stacked_binary_preds = (stacked_pred > 0.5).astype(int)

# 计算评估指标
conf_matrix = confusion_matrix(test_labels, stacked_binary_preds)
tn, fp, fn, tp = conf_matrix.ravel()

auc = roc_auc_score(test_labels, stacked_pred)
balanced_acc = balanced_accuracy_score(test_labels, stacked_binary_preds)
sensitivity = recall_score(test_labels, stacked_binary_preds)
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
mcc = matthews_corrcoef(test_labels, stacked_binary_preds)

# 打印评估指标
print(f'Balanced Accuracy (BAcc): {balanced_acc:.4f}, Sensitivity (Sn): {sensitivity:.4f}, '
      f'Specificity (Sp): {specificity:.4f}, MCC: {mcc:.4f}, AUROC: {auc:.4f}')

Balanced Accuracy (BAcc): 0.9020, Sensitivity (Sn): 0.8800, Specificity (Sp): 0.9241, MCC: 0.7908, AUROC: 0.9434
Balanced Accuracy (BAcc): 0.9174, Sensitivity (Sn): 0.9360, Specificity (Sp): 0.8987, MCC: 0.8347, AUROC: 0.9477


In [9]:
import numpy as np
import joblib
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import VotingClassifier, StackingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, matthews_corrcoef, roc_auc_score, balanced_accuracy_score, recall_score
import optuna

# 加载保存的模型
svm_model = joblib.load('/root/ACE/model/best_svm_model.pkl')
rl_model = joblib.load('/root/ACE/model/best_logistic_regression_model.pkl')

# 读取训练集和测试集 CSV 文件
train_csv_file_path = '/root/ACE/pca_train.csv'  # 替换为实际的训练集 CSV 文件路径
test_csv_file_path = '/root/ACE/pca_test.csv'    # 替换为实际的测试集 CSV 文件路径

train_data = pd.read_csv(train_csv_file_path)
test_data = pd.read_csv(test_csv_file_path)

# 假设第一列是标签，后面的列都是特征
train_labels = train_data.iloc[:, 0]  # 训练集标签
train_embeddings = train_data.iloc[:, 1:]  # 训练集特征

test_labels = test_data.iloc[:, 0]  # 测试集标签
test_embeddings = test_data.iloc[:, 1:]  # 测试集特征

# 自定义 Dataset 类
class CustomDataset(Dataset):
    def __init__(self, embeddings, labels):
        self.embeddings = embeddings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        embedding = torch.tensor(self.embeddings.iloc[idx].values, dtype=torch.float32)  # 转换为 torch 张量
        label = torch.tensor(self.labels.iloc[idx], dtype=torch.long)
        return embedding, label

# 创建训练和测试数据集
train_dataset = CustomDataset(train_embeddings, train_labels)
test_dataset = CustomDataset(test_embeddings, test_labels)

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)

# 加载 CNN 模型
model = torch.load('/root/ACE/model/best_model_trial_cnn.pth')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
model.eval()

# 使用 CNN 模型对测试集进行预测
all_preds = []
all_labels = []
with torch.no_grad():
    for data, labels in test_loader:
        data = data.to(device).unsqueeze(1)
        labels = labels.to(device)

        outputs = model(data)
        probs = torch.softmax(outputs, dim=1)
        preds = torch.argmax(outputs, dim=1)

        all_preds.append(probs[:, 1].cpu().numpy())
        all_labels.append(labels.cpu().numpy())

all_preds = np.concatenate(all_preds)
all_labels = np.concatenate(all_labels)
binary_preds = (all_preds > 0.5).astype(int)

# 计算评估指标
conf_matrix = confusion_matrix(all_labels, binary_preds)
tn, fp, fn, tp = conf_matrix.ravel()

auc = roc_auc_score(all_labels, all_preds)
balanced_acc = balanced_accuracy_score(all_labels, binary_preds)
sensitivity = recall_score(all_labels, binary_preds)
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
mcc = matthews_corrcoef(all_labels, binary_preds)

# 打印评估指标
print(f'Balanced Accuracy (BAcc): {balanced_acc:.4f}, Sensitivity (Sn): {sensitivity:.4f}, '
      f'Specificity (Sp): {specificity:.4f}, MCC: {mcc:.4f}, AUROC: {auc:.4f}')

# 使用每个基模型对训练集和测试集进行预测
cnn_train_pred = model(torch.tensor(train_embeddings.values, dtype=torch.float32).to(device).unsqueeze(1)).detach().cpu().numpy()[:, 1]  # 使用 CNN 模型的预测结果
svm_train_pred = svm_model.predict(train_embeddings)
rl_train_pred = rl_model.predict(train_embeddings)

cnn_test_pred = model(torch.tensor(test_embeddings.values, dtype=torch.float32).to(device).unsqueeze(1)).detach().cpu().numpy()[:, 1]  # 使用 CNN 模型的预测结果
svm_test_pred = svm_model.predict(test_embeddings)
rl_test_pred = rl_model.predict(test_embeddings)

# 确保所有预测结果具有相同长度
cnn_train_pred = cnn_train_pred[:len(train_labels)]
svm_train_pred = svm_train_pred[:len(train_labels)]
rl_train_pred = rl_train_pred[:len(train_labels)]

cnn_test_pred = cnn_test_pred[:len(test_labels)]
svm_test_pred = svm_test_pred[:len(test_labels)]
rl_test_pred = rl_test_pred[:len(test_labels)]

# 将每个模型的预测结果作为新的特征
stacked_train_features = np.vstack((cnn_train_pred, svm_train_pred, rl_train_pred)).T
stacked_test_features = np.vstack((cnn_test_pred, svm_test_pred, rl_test_pred)).T

# 创建 VotingClassifier 模型
voting_model = VotingClassifier(estimators=[
    ('svm', svm_model),
    ('rl', rl_model)
], voting='soft')

# 使用 VotingClassifier 和 CNN 的结果进行堆叠
stacking_estimators = [
    ('voting', voting_model),
    ('cnn', LogisticRegression().fit(stacked_train_features, train_labels))  # 使用 CNN 特征训练逻辑回归
]

# 创建 StackingClassifier 模型
stacking_model = StackingClassifier(estimators=stacking_estimators, final_estimator=LogisticRegression())

# 训练 StackingClassifier
stacking_model.fit(stacked_train_features, train_labels)

# 对测试集进行预测
stacked_pred = stacking_model.predict_proba(stacked_test_features)[:, 1]

# 将预测结果转换为二分类（例如，0.5 为阈值）
stacked_binary_preds = (stacked_pred > 0.5).astype(int)

# 计算评估指标
conf_matrix = confusion_matrix(test_labels, stacked_binary_preds)
tn, fp, fn, tp = conf_matrix.ravel()

auc = roc_auc_score(test_labels, stacked_pred)
balanced_acc = balanced_accuracy_score(test_labels, stacked_binary_preds)
sensitivity = recall_score(test_labels, stacked_binary_preds)
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
mcc = matthews_corrcoef(test_labels, stacked_binary_preds)

# 打印评估指标
print(f'Balanced Accuracy (BAcc): {balanced_acc:.4f}, Sensitivity (Sn): {sensitivity:.4f}, '
      f'Specificity (Sp): {specificity:.4f}, MCC: {mcc:.4f}, AUROC: {auc:.4f}')

Balanced Accuracy (BAcc): 0.9020, Sensitivity (Sn): 0.8800, Specificity (Sp): 0.9241, MCC: 0.7908, AUROC: 0.9434
Balanced Accuracy (BAcc): 0.9174, Sensitivity (Sn): 0.9360, Specificity (Sp): 0.8987, MCC: 0.8347, AUROC: 0.9476


In [10]:
import numpy as np
import joblib
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import VotingClassifier, StackingClassifier
from sklearn.metrics import confusion_matrix, matthews_corrcoef, roc_auc_score, balanced_accuracy_score, recall_score

# 加载保存的模型
svm_model = joblib.load('/root/ACE/model/best_svm_model.pkl')
rl_model = joblib.load('/root/ACE/model/best_logistic_regression_model.pkl')
mlp_model = torch.load('/root/ACE/model/best_mlp_model_entire.pth')

# 读取训练集和测试集 CSV 文件
train_csv_file_path = '/root/ACE/pca_train.csv'
test_csv_file_path = '/root/ACE/pca_test.csv'

train_data = pd.read_csv(train_csv_file_path)
test_data = pd.read_csv(test_csv_file_path)

# 假设第一列是标签，后面的列都是特征
train_labels = train_data.iloc[:, 0]
train_embeddings = train_data.iloc[:, 1:]

test_labels = test_data.iloc[:, 0]
test_embeddings = test_data.iloc[:, 1:]

# 创建数据加载器
train_loader = DataLoader(CustomDataset(train_embeddings, train_labels), batch_size=256, shuffle=True)
test_loader = DataLoader(CustomDataset(test_embeddings, test_labels), batch_size=256, shuffle=False)

# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 加载 CNN 模型
cnn_model = torch.load('/root/ACE/model/best_model_trial_cnn.pth').to(device)
cnn_model.eval()

# Define the prediction function
def predict_with_torch_model(model, data, device, unsqueeze=False):
    model = model.to(device)
    model.eval()
    with torch.no_grad():
        data_tensor = torch.tensor(data.values, dtype=torch.float32).to(device)
        if unsqueeze:
            data_tensor = data_tensor.unsqueeze(1)  # Add an extra dimension for seq_length
        outputs = model(data_tensor)
        if outputs.size(1) == 1:
            # For models with single output (e.g., MLP)
            probs = torch.sigmoid(outputs).cpu().numpy().flatten()
        else:
            # For models with multi-class outputs (e.g., CNN)
            probs = torch.softmax(outputs, dim=1)[:, 1].cpu().numpy()
    return probs

# Use the updated function
cnn_train_pred = predict_with_torch_model(cnn_model, train_embeddings, device, unsqueeze=True)
cnn_test_pred = predict_with_torch_model(cnn_model, test_embeddings, device, unsqueeze=True)

mlp_train_pred = predict_with_torch_model(mlp_model, train_embeddings, device, unsqueeze=False)
mlp_test_pred = predict_with_torch_model(mlp_model, test_embeddings, device, unsqueeze=False)


# 计算 SVM 和逻辑回归的预测概率
svm_train_pred = svm_model.predict_proba(train_embeddings)[:, 1]
svm_test_pred = svm_model.predict_proba(test_embeddings)[:, 1]

rl_train_pred = rl_model.predict_proba(train_embeddings)[:, 1]
rl_test_pred = rl_model.predict_proba(test_embeddings)[:, 1]

# 将每个模型的预测结果作为新的特征
stacked_train_features = np.vstack((cnn_train_pred, svm_train_pred, rl_train_pred, mlp_train_pred)).T
stacked_test_features = np.vstack((cnn_test_pred, svm_test_pred, rl_test_pred, mlp_test_pred)).T

# 创建 VotingClassifier 模型
voting_model = VotingClassifier(estimators=[
    ('svm', svm_model),
    ('rl', rl_model),
    ('mlp', LogisticRegression().fit(stacked_train_features[:, 3].reshape(-1, 1), train_labels))
], voting='soft')

# 使用 VotingClassifier 和 CNN、MLP 的结果进行堆叠
stacking_estimators = [
    ('voting', voting_model),
    ('cnn', LogisticRegression().fit(stacked_train_features[:, 0].reshape(-1, 1), train_labels))
]

# 创建 StackingClassifier 模型
stacking_model = StackingClassifier(estimators=stacking_estimators, final_estimator=LogisticRegression())

# 训练 StackingClassifier
stacking_model.fit(stacked_train_features, train_labels)

# 对测试集进行预测
stacked_pred = stacking_model.predict_proba(stacked_test_features)[:, 1]
stacked_binary_preds = (stacked_pred > 0.5).astype(int)

# 计算评估指标
auc = roc_auc_score(test_labels, stacked_pred)
balanced_acc = balanced_accuracy_score(test_labels, stacked_binary_preds)
sensitivity = recall_score(test_labels, stacked_binary_preds)
specificity = recall_score(test_labels, stacked_binary_preds, pos_label=0)
mcc = matthews_corrcoef(test_labels, stacked_binary_preds)

# 打印评估指标
print(f'Balanced Accuracy (BAcc): {balanced_acc:.4f}, Sensitivity (Sn): {sensitivity:.4f}, '
      f'Specificity (Sp): {specificity:.4f}, MCC: {mcc:.4f}, AUROC: {auc:.4f}')


Balanced Accuracy (BAcc): 0.9007, Sensitivity (Sn): 0.9280, Specificity (Sp): 0.8734, MCC: 0.8033, AUROC: 0.9608


In [11]:
# Load data
train_csv_file_path = '/root/ACE/pca_train.csv'
test_csv_file_path = '/root/ACE/pca_test.csv'

train_data = pd.read_csv(train_csv_file_path)
test_data = pd.read_csv(test_csv_file_path)

# Assume the first column is labels, the rest are features
train_labels = train_data.iloc[:, 0]
train_embeddings = train_data.iloc[:, 1:]

test_labels = test_data.iloc[:, 0]
test_embeddings = test_data.iloc[:, 1:]

# Create datasets and dataloaders (not strictly necessary here but included for completeness)
train_dataset = CustomDataset(train_embeddings, train_labels)
test_dataset = CustomDataset(test_embeddings, test_labels)

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load saved models
svm_model = joblib.load('/root/ACE/model/best_svm_model.pkl')
rl_model = joblib.load('/root/ACE/model/best_logistic_regression_model.pkl')
mlp_model = torch.load('/root/ACE/model/best_mlp_model_entire.pth', map_location=device)
cnn_model = torch.load('/root/ACE/model/best_model_trial_cnn.pth', map_location=device)

# Define a wrapper class to integrate PyTorch models with scikit-learn
from sklearn.base import BaseEstimator, ClassifierMixin

class PyTorchClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, model, device, unsqueeze=False):
        self.model = model.to(device)
        self.device = device
        self.unsqueeze = unsqueeze

    def fit(self, X, y):
        # Assuming the model is already trained
        return self

    def predict(self, X):
        probs = self.predict_proba(X)
        return (probs[:, 1] > 0.5).astype(int)

    def predict_proba(self, X):
        self.model.eval()
        with torch.no_grad():
            # Convert X to tensor if it's not already
            if isinstance(X, pd.DataFrame) or isinstance(X, pd.Series):
                X = X.values
            X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
            if self.unsqueeze:
                X_tensor = X_tensor.unsqueeze(1)
            outputs = self.model(X_tensor)
            if outputs.size(1) == 1:
                probs_pos = torch.sigmoid(outputs).cpu().numpy().flatten()
                probs_neg = 1 - probs_pos
                probs = np.vstack((probs_neg, probs_pos)).T
            else:
                probs = torch.softmax(outputs, dim=1).cpu().numpy()
        return probs

# Create instances of the wrapped PyTorch models
cnn_wrapper = PyTorchClassifier(cnn_model, device, unsqueeze=True)
mlp_wrapper = PyTorchClassifier(mlp_model, device, unsqueeze=False)

# Create VotingClassifier with the four models
voting_model = VotingClassifier(estimators=[
    ('svm', svm_model),
    ('rl', rl_model),
    ('cnn', cnn_wrapper),
    ('mlp', mlp_wrapper)
], voting='soft')

# Fit the voting model on the training data
voting_model.fit(train_embeddings, train_labels)

# Predict on the test data
stacked_pred = voting_model.predict_proba(test_embeddings)[:, 1]
stacked_binary_preds = (stacked_pred > 0.5).astype(int)

# Calculate evaluation metrics
auc = roc_auc_score(test_labels, stacked_pred)
balanced_acc = balanced_accuracy_score(test_labels, stacked_binary_preds)
sensitivity = recall_score(test_labels, stacked_binary_preds)
specificity = recall_score(test_labels, stacked_binary_preds, pos_label=0)
mcc = matthews_corrcoef(test_labels, stacked_binary_preds)

# Print evaluation metrics
print(f'Balanced Accuracy (BAcc): {balanced_acc:.4f}, Sensitivity (Sn): {sensitivity:.4f}, '
      f'Specificity (Sp): {specificity:.4f}, MCC: {mcc:.4f}, AUROC: {auc:.4f}')

Balanced Accuracy (BAcc): 0.8927, Sensitivity (Sn): 0.9120, Specificity (Sp): 0.8734, MCC: 0.7836, AUROC: 0.9597


In [12]:
import numpy as np
import pandas as pd
import joblib
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    confusion_matrix, matthews_corrcoef, roc_auc_score,
    balanced_accuracy_score, recall_score
)
from sklearn.base import BaseEstimator, ClassifierMixin

# Set the random seed for reproducibility
def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    import random
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)

# Load saved models
svm_model = joblib.load('/root/ACE/model/best_svm_model.pkl')
rl_model = joblib.load('/root/ACE/model/best_logistic_regression_model.pkl')
mlp_model = torch.load('/root/ACE/model/best_mlp_model_entire.pth', map_location='cpu')
cnn_model = torch.load('/root/ACE/model/best_model_trial_cnn.pth', map_location='cpu')

# Read CSV files
train_csv_file_path = '/root/ACE/pca_train.csv'
test_csv_file_path = '/root/ACE/pca_test.csv'

train_data = pd.read_csv(train_csv_file_path)
test_data = pd.read_csv(test_csv_file_path)

# Assume first column is labels, rest are features
train_labels = train_data.iloc[:, 0]
train_embeddings = train_data.iloc[:, 1:]

test_labels = test_data.iloc[:, 0]
test_embeddings = test_data.iloc[:, 1:]

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define PyTorchClassifier class
class PyTorchClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, model, device, unsqueeze=False):
        self.model = model.to(device)
        self.device = device
        self.unsqueeze = unsqueeze

    def fit(self, X, y):
        self.classes_ = np.unique(y)
        # Since the model is already trained, we don't need to do anything else
        return self

    def predict_proba(self, X):
        self.model.eval()
        with torch.no_grad():
            if isinstance(X, pd.DataFrame):
                X = X.values
            data_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
            if self.unsqueeze:
                data_tensor = data_tensor.unsqueeze(1)
            outputs = self.model(data_tensor)
            if outputs.size(1) == 1:
                probs = torch.sigmoid(outputs).cpu().numpy()
                # Ensure columns correspond to self.classes_
                probs = np.hstack([1 - probs, probs])
            else:
                probs = torch.softmax(outputs, dim=1).cpu().numpy()
        return probs

    def predict(self, X):
        probs = self.predict_proba(X)
        class_index = np.argmax(probs, axis=1)
        return self.classes_[class_index]

# Wrap the PyTorch models
mlp_estimator = PyTorchClassifier(mlp_model, device=device, unsqueeze=False)
cnn_estimator = PyTorchClassifier(cnn_model, device=device, unsqueeze=True)

# Fit the estimators (to set classes_)
mlp_estimator.fit(train_embeddings, train_labels)
cnn_estimator.fit(train_embeddings, train_labels)

# Collect predictions from base estimators
svm_train_pred = svm_model.predict_proba(train_embeddings)[:, 1]
svm_test_pred = svm_model.predict_proba(test_embeddings)[:, 1]

rl_train_pred = rl_model.predict_proba(train_embeddings)[:, 1]
rl_test_pred = rl_model.predict_proba(test_embeddings)[:, 1]

mlp_train_pred = mlp_estimator.predict_proba(train_embeddings)[:, 1]
mlp_test_pred = mlp_estimator.predict_proba(test_embeddings)[:, 1]

cnn_train_pred = cnn_estimator.predict_proba(train_embeddings)[:, 1]
cnn_test_pred = cnn_estimator.predict_proba(test_embeddings)[:, 1]

# Stack predictions as new features
stacked_train_features = np.vstack((
    svm_train_pred,
    rl_train_pred,
    mlp_train_pred,
    cnn_train_pred
)).T

stacked_test_features = np.vstack((
    svm_test_pred,
    rl_test_pred,
    mlp_test_pred,
    cnn_test_pred
)).T


C = 1e3
solver = 'saga'
penalty = 'l1'
tol = 1e-4
max_iter = 3000
    

# Train final estimator
final_estimator = LogisticRegression(C=C, solver=solver, penalty=penalty, tol=tol, max_iter=max_iter)
final_estimator.fit(stacked_train_features, train_labels)

# Evaluate on test set
stacked_pred = final_estimator.predict_proba(stacked_test_features)[:, 1]
stacked_binary_preds = (stacked_pred > 0.5).astype(int)

# Compute evaluation metrics
auc = roc_auc_score(test_labels, stacked_pred)
balanced_acc = balanced_accuracy_score(test_labels, stacked_binary_preds)
sensitivity = recall_score(test_labels, stacked_binary_preds)
specificity = recall_score(test_labels, stacked_binary_preds, pos_label=0)
mcc = matthews_corrcoef(test_labels, stacked_binary_preds)

# Print evaluation metrics
print(f'Balanced Accuracy (BAcc): {balanced_acc:.4f}, Sensitivity (Sn): {sensitivity:.4f}, '
      f'Specificity (Sp): {specificity:.4f}, MCC: {mcc:.4f}, AUROC: {auc:.4f}')


Balanced Accuracy (BAcc): 0.9070, Sensitivity (Sn): 0.9280, Specificity (Sp): 0.8861, MCC: 0.8141, AUROC: 0.9595


In [13]:
import numpy as np
import pandas as pd
import joblib
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import VotingClassifier
from sklearn.metrics import (
    confusion_matrix, matthews_corrcoef, roc_auc_score,
    balanced_accuracy_score, recall_score
)
from sklearn.base import BaseEstimator, ClassifierMixin
from itertools import combinations

# Set the random seed for reproducibility
def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    import random
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)

# Load saved models
svm_model = joblib.load('/root/ACE/model/best_svm_model.pkl')
rl_model = joblib.load('/root/ACE/model/best_logistic_regression_model.pkl')
mlp_model = torch.load('/root/ACE/model/best_mlp_model_entire.pth', map_location='cpu')
cnn_model = torch.load('/root/ACE/model/best_model_trial_cnn.pth', map_location='cpu')

# Read CSV files
train_csv_file_path = '/root/ACE/pca_train.csv'
test_csv_file_path = '/root/ACE/pca_test.csv'

train_data = pd.read_csv(train_csv_file_path)
test_data = pd.read_csv(test_csv_file_path)

# Assume first column is labels, rest are features
train_labels = train_data.iloc[:, 0]
train_embeddings = train_data.iloc[:, 1:]

test_labels = test_data.iloc[:, 0]
test_embeddings = test_data.iloc[:, 1:]

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define PyTorchClassifier class
class PyTorchClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, model, device, unsqueeze=False):
        self.model = model.to(device)
        self.device = device
        self.unsqueeze = unsqueeze

    def fit(self, X, y):
        self.classes_ = np.unique(y)
        # Since the model is already trained, we don't need to do anything else
        return self

    def predict_proba(self, X):
        self.model.eval()
        with torch.no_grad():
            if isinstance(X, pd.DataFrame):
                X = X.values
            data_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
            if self.unsqueeze:
                data_tensor = data_tensor.unsqueeze(1)
            outputs = self.model(data_tensor)
            if outputs.size(1) == 1:
                probs = torch.sigmoid(outputs).cpu().numpy()
                probs = np.hstack([1 - probs, probs])
            else:
                probs = torch.softmax(outputs, dim=1).cpu().numpy()
        return probs

    def predict(self, X):
        probs = self.predict_proba(X)
        class_index = np.argmax(probs, axis=1)
        return self.classes_[class_index]

# Wrap the PyTorch models
mlp_estimator = PyTorchClassifier(mlp_model, device=device, unsqueeze=False)
cnn_estimator = PyTorchClassifier(cnn_model, device=device, unsqueeze=True)

# Fit the estimators (to set classes_)
mlp_estimator.fit(train_embeddings, train_labels)
cnn_estimator.fit(train_embeddings, train_labels)

# Define all estimators
all_estimators = [('svm', svm_model), ('rl', rl_model), ('mlp', mlp_estimator), ('cnn', cnn_estimator)]

# Create combinations of three estimators for voting
voting_models = []
for combo in combinations(all_estimators, 3):
    voting_clf = VotingClassifier(estimators=list(combo), voting='soft')
    voting_clf.fit(train_embeddings, train_labels)
    voting_models.append(voting_clf)

# Collect predictions from each voting model for stacking
train_preds = []
test_preds = []

for voting_clf in voting_models:
    train_pred = voting_clf.predict_proba(train_embeddings)[:, 1]
    test_pred = voting_clf.predict_proba(test_embeddings)[:, 1]
    train_preds.append(train_pred)
    test_preds.append(test_pred)

# Stack the voting model predictions as new features
stacked_train_features = np.vstack(train_preds).T
stacked_test_features = np.vstack(test_preds).T

# Train final estimator using the stacked features
C = 1e3
solver = 'saga'
penalty = 'l1'
tol = 1e-4
max_iter = 3000

final_estimator = LogisticRegression(max_iter=max_iter)
final_estimator.fit(stacked_train_features, train_labels)

# Evaluate on test set
stacked_pred = final_estimator.predict_proba(stacked_test_features)[:, 1]
stacked_binary_preds = (stacked_pred > 0.5).astype(int)

# Compute evaluation metrics
auc = roc_auc_score(test_labels, stacked_pred)
balanced_acc = balanced_accuracy_score(test_labels, stacked_binary_preds)
sensitivity = recall_score(test_labels, stacked_binary_preds)
specificity = recall_score(test_labels, stacked_binary_preds, pos_label=0)
mcc = matthews_corrcoef(test_labels, stacked_binary_preds)

# Print evaluation metrics
print(f'Balanced Accuracy (BAcc): {balanced_acc:.4f}, Sensitivity (Sn): {sensitivity:.4f}, '
      f'Specificity (Sp): {specificity:.4f}, MCC: {mcc:.4f}, AUROC: {auc:.4f}')


Balanced Accuracy (BAcc): 0.9007, Sensitivity (Sn): 0.9280, Specificity (Sp): 0.8734, MCC: 0.8033, AUROC: 0.9600


In [14]:
import numpy as np
import pandas as pd
import joblib
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import VotingClassifier
from sklearn.metrics import (
    confusion_matrix, matthews_corrcoef, roc_auc_score,
    balanced_accuracy_score, recall_score
)
from sklearn.base import BaseEstimator, ClassifierMixin

# Set the random seed for reproducibility
def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    import random
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)

# Load saved models
svm_model = joblib.load('/root/ACE/model/best_svm_model.pkl')
rl_model = joblib.load('/root/ACE/model/best_logistic_regression_model.pkl')
mlp_model = torch.load('/root/ACE/model/best_mlp_model_entire.pth', map_location='cpu')
cnn_model = torch.load('/root/ACE/model/best_model_trial_cnn.pth', map_location='cpu')

# Read CSV files
train_csv_file_path = '/root/ACE/pca_train.csv'
test_csv_file_path = '/root/ACE/pca_test.csv'

train_data = pd.read_csv(train_csv_file_path)
test_data = pd.read_csv(test_csv_file_path)

# Assume first column is labels, rest are features
train_labels = train_data.iloc[:, 0]
train_embeddings = train_data.iloc[:, 1:]

test_labels = test_data.iloc[:, 0]
test_embeddings = test_data.iloc[:, 1:]

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define PyTorchClassifier class
class PyTorchClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, model, device, unsqueeze=False):
        self.model = model.to(device)
        self.device = device
        self.unsqueeze = unsqueeze

    def fit(self, X, y):
        self.classes_ = np.unique(y)
        return self

    def predict_proba(self, X):
        self.model.eval()
        with torch.no_grad():
            if isinstance(X, pd.DataFrame):
                X = X.values
            data_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
            if self.unsqueeze:
                data_tensor = data_tensor.unsqueeze(1)
            outputs = self.model(data_tensor)
            if outputs.size(1) == 1:
                probs = torch.sigmoid(outputs).cpu().numpy()
                probs = np.hstack([1 - probs, probs])
            else:
                probs = torch.softmax(outputs, dim=1).cpu().numpy()
        return probs

    def predict(self, X):
        probs = self.predict_proba(X)
        class_index = np.argmax(probs, axis=1)
        return self.classes_[class_index]

# Wrap the PyTorch models
mlp_estimator = PyTorchClassifier(mlp_model, device=device, unsqueeze=False)
cnn_estimator = PyTorchClassifier(cnn_model, device=device, unsqueeze=True)

# Fit the estimators to set classes_
mlp_estimator.fit(train_embeddings, train_labels)
cnn_estimator.fit(train_embeddings, train_labels)

# Define Voting for svm+cnn
voting1 = VotingClassifier(estimators=[
    ('svm', svm_model),
    ('cnn', cnn_estimator)
], voting='soft')

# Define Voting for mlp+rl
voting2 = VotingClassifier(estimators=[
    ('mlp', mlp_estimator),
    ('rl', rl_model)
], voting='soft')

# Fit Voting models
voting1.fit(train_embeddings, train_labels)
voting2.fit(train_embeddings, train_labels)

# Get predictions from Voting classifiers
voting1_train_pred = voting1.predict_proba(train_embeddings)[:, 1]
voting1_test_pred = voting1.predict_proba(test_embeddings)[:, 1]

voting2_train_pred = voting2.predict_proba(train_embeddings)[:, 1]
voting2_test_pred = voting2.predict_proba(test_embeddings)[:, 1]

# Stack predictions as new features
stacked_train_features = np.vstack((voting1_train_pred, voting2_train_pred)).T
stacked_test_features = np.vstack((voting1_test_pred, voting2_test_pred)).T

# Define hyperparameters for final estimator
C = 1e3
solver = 'saga'
penalty = 'l1'
tol = 1e-4
max_iter = 3000

# Train final estimator (Logistic Regression)
final_estimator = LogisticRegression(C=C, solver=solver, penalty=penalty, tol=tol, max_iter=max_iter)
final_estimator.fit(stacked_train_features, train_labels)

# Evaluate on test set
stacked_pred = final_estimator.predict_proba(stacked_test_features)[:, 1]
stacked_binary_preds = (stacked_pred > 0.5).astype(int)

# Compute evaluation metrics
auc = roc_auc_score(test_labels, stacked_pred)
balanced_acc = balanced_accuracy_score(test_labels, stacked_binary_preds)
sensitivity = recall_score(test_labels, stacked_binary_preds)
specificity = recall_score(test_labels, stacked_binary_preds, pos_label=0)
mcc = matthews_corrcoef(test_labels, stacked_binary_preds)

# Print evaluation metrics
print(f'Balanced Accuracy (BAcc): {balanced_acc:.4f}, Sensitivity (Sn): {sensitivity:.4f}, '
      f'Specificity (Sp): {specificity:.4f}, MCC: {mcc:.4f}, AUROC: {auc:.4f}')


Balanced Accuracy (BAcc): 0.9110, Sensitivity (Sn): 0.9360, Specificity (Sp): 0.8861, MCC: 0.8240, AUROC: 0.9615
