# GNNs

In [None]:
import os
import sys
import datetime
import logging
import numpy as np
import pandas as pd
import torch
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.manifold import TSNE
from sklearn.metrics import confusion_matrix, classification_report
from imblearn.over_sampling import SMOTE
from torch_geometric.data import Data

# 添加源代码路径
sys.path.append("src")

# 导入自定义模块
import gnns.data
import gnns.classifier
import gnns.metrics
from gnns.featurer import DataProcessor

# 数据路径设置
file_path = "data"  # dataset_path

In [None]:
import sys
sys.path.append("src")
import os
import argparse
import pandas as pd
import torch
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.manifold import TSNE
from sklearn.metrics import confusion_matrix, roc_curve, auc
from sklearn.preprocessing import StandardScaler
# 导入模块中的函数及类
from GNN.sample import (
    GeneGAT,
    create_full_connected_edge_index,
    create_correlation_edge_index,
    create_knn_edge_index,
    train_graph_classifier,
    extract_pooling_features,
    predict_with_loader,
    save_predictions
)
import gnns.data
from torch_geometric.data import Data, DataLoader

# Use a CJK-capable font so the Chinese axis labels and titles render.
plt.rcParams['font.sans-serif'] = ['SimHei']
# Keep the minus sign rendering correctly alongside the font override above.
plt.rcParams['axes.unicode_minus'] = False

# Ask for synchronous CUDA kernel launches (makes device-side errors surface at
# the offending call). The variable is exported *before* the first CUDA query
# below so that it is already set when the CUDA runtime starts up.
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

# Select the compute device: GPU when one is available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 加载和预处理数据
def prepare_data():
    """Load the dataset, split it, standardize it and return per-split tensors.

    The data is read through ``gnns.data.load_data`` and split through
    ``gnns.data.get_split_data``. Features are standardized with a
    ``StandardScaler`` that is fitted on the training rows only and then
    applied to every row, so validation/test statistics never influence the
    scaling (no leakage from the held-out splits).

    Returns:
        tuple: ``(X_train, y_train, X_val, y_val, X_test, y_test, sample_ids,
        test_mask)`` where the ``X_*`` tensors are float, the ``y_*`` tensors
        are long, ``sample_ids`` is whatever ``load_data`` returned and
        ``test_mask`` is the test mask converted to a tensor.
    """
    X, y, _, sample_ids = gnns.data.load_data('data', method='lxs', read=True, use_deg=True)
    X, y, train_mask, val_mask, test_mask = gnns.data.get_split_data(X, y, oversample=False)

    # NOTE(review): the masks are assumed to be NumPy boolean masks (or index
    # arrays) over the rows of X - confirm against gnns.data.get_split_data.
    X = np.asarray(X)
    scaler = StandardScaler()
    scaler.fit(X[train_mask])      # statistics come from the training rows only
    X = scaler.transform(X)        # the same transform is applied to all rows

    X_tensor = torch.from_numpy(X).float()
    y_tensor = torch.from_numpy(y).long()
    train_mask = torch.from_numpy(train_mask)
    val_mask = torch.from_numpy(val_mask)
    test_mask = torch.from_numpy(test_mask)

    X_train = X_tensor[train_mask]
    y_train = y_tensor[train_mask]
    X_val = X_tensor[val_mask]
    y_val = y_tensor[val_mask]
    X_test = X_tensor[test_mask]
    y_test = y_tensor[test_mask]

    print("数据集划分：")
    print(f"  训练样本数: {len(X_train)}")
    print(f"  验证样本数: {len(X_val)}")
    print(f"  测试样本数: {len(X_test)}")

    return X_train, y_train, X_val, y_val, X_test, y_test, sample_ids, test_mask

# 创建图结构
def create_graph(X_train, method='knn'):
    """Build the edge index shared by every sample graph.

    Args:
        X_train: 2-D training tensor; its second dimension is used as the
            number of graph nodes (features).
        method: ``'full_connected'`` or ``'correlation'``; any other value
            selects the k-NN builder.

    Returns:
        The edge index on ``device``, with every edge that references a node
        index >= the feature count removed.
    """
    print(f"\n使用图构建方法: {method}")
    feature_count = X_train.shape[1]

    if method == 'full_connected':
        edges = create_full_connected_edge_index(feature_count)
    elif method == 'correlation':
        edges = create_correlation_edge_index(X_train.cpu().numpy(), threshold=0.6)
    else:
        edges = create_knn_edge_index(X_train.cpu().numpy(), k=15, metric='correlation')
    edges = edges.to(device)

    print(f"创建的图结构: 边数={edges.shape[1]}")

    # Nothing to validate on an empty graph.
    if edges.numel() == 0:
        return edges

    largest = edges.max().item()
    # Every endpoint already refers to an existing node.
    if largest < feature_count:
        return edges

    print(f"警告: 特征数量 ({feature_count}) <= 边索引中的最大值 ({largest}). 调整索引...")
    source_ok = edges[0] < feature_count
    target_ok = edges[1] < feature_count
    edges = edges[:, source_ok & target_ok]
    print(f"调整后的图结构: 边数={edges.shape[1]}")
    return edges

# 创建数据加载器
def create_data_objects(X, y, edge_index, batch_size=256):
    """Wrap every sample as a graph ``Data`` object and return a ``DataLoader``.

    Each sample row ``X[i]`` is transposed into a column, so the graph has one
    node per feature and a single value per node (assumes ``X`` is 2-D). All
    samples share the same ``edge_index``.

    Args:
        X: feature tensor, one row per sample.
        y: label tensor aligned with ``X``.
        edge_index: edge index shared by all graphs.
        batch_size: number of graphs per batch.

    Returns:
        A non-shuffling ``DataLoader`` over the created graphs. Samples whose
        node count does not exceed the largest edge endpoint are skipped with
        a warning.
    """
    # The largest referenced node index is identical for every sample, so read
    # it once instead of forcing one .max().item() round-trip per sample.
    max_node_index = edge_index.max().item() if edge_index.numel() > 0 else None

    data_list = []
    for i in range(len(X)):
        x = X[i:i+1].T.float().to(device)
        if max_node_index is not None and x.size(0) <= max_node_index:
            print(f"警告: 样本 {i} 的节点数不足, 跳过.")
            continue
        data_list.append(Data(x=x, edge_index=edge_index, y=y[i:i+1].to(device)))

    loader = DataLoader(data_list, batch_size=batch_size, shuffle=False, num_workers=0)
    return loader


# Entry point: prepare data, build the graph, load or train GeneGAT, evaluate
# on the test split, save the plots and write the predictions to disk.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='GeneGAT 训练与推理脚本')
    # By default a previously saved checkpoint is loaded and training is skipped
    # (see the branch on args.load_model below).
    parser.add_argument('--load_model', type=str, default='models/k_fold/model_fold_0.pt', help='指定已保存模型的路径, 若提供则直接加载并跳过训练')
    # Alternative default (empty path => always train):
    # parser.add_argument('--load_model', type=str, default='', help='指定已保存模型的路径, 若提供则直接加载并跳过训练')
    parser.add_argument('--image_dir', type=str, default='images/single_run', help='保存图片的目录')
    # parse_known_args ignores unrecognised argv entries instead of exiting
    # (presumably so the cell also runs inside a notebook kernel).
    args, unknown = parser.parse_known_args()

    os.makedirs(args.image_dir, exist_ok=True)

    # Data preparation
    X_train, y_train, X_val, y_val, X_test, y_test, sample_ids, test_mask = prepare_data()
    # Graph construction (one edge index shared by all samples)
    edge_index = create_graph(X_train, method='full_connected')
    # Loader creation
    train_loader = create_data_objects(X_train, y_train, edge_index, batch_size=32)
    val_loader   = create_data_objects(X_val,   y_val,   edge_index, batch_size=32)
    test_loader  = create_data_objects(X_test,  y_test,  edge_index, batch_size=32)

    # Model instantiation. input_dim=1 matches the single value per node
    # produced by create_data_objects.
    model = GeneGAT(
        input_dim=1, output_dim=2, hidden_dim=128, pooling_dim=8,
        num_heads=8, gat_layers=2, dropout=0.5,
        activation='swish', cluster_num=8, 
    ).to(device)

    # Load a checkpoint when the path exists, otherwise train from scratch.
    if args.load_model and os.path.exists(args.load_model):
        # NOTE(review): torch.load unpickles the file - only load checkpoints
        # from trusted sources.
        checkpoint = torch.load(args.load_model, map_location=device)
        model.load_state_dict(checkpoint['model_state_dict'])
        # Fall back to 0.5 when the checkpoint stores no decision threshold.
        best_threshold = checkpoint.get('threshold', 0.5)
        print(f"已加载模型: {args.load_model}, 阈值: {best_threshold}")
    else:
        print("\n开始训练模型...")
        model, best_threshold = train_graph_classifier(
            model=model, class_weights=[1,3],
            train_data=train_loader, val_data=val_loader,
            epochs=300, lr=1e-4, weight_decay=1e-4, patience=100
        )

    # Inference and evaluation on the test loader
    y_pred, y_probs, features, metrics = predict_with_loader(model, test_loader, device, threshold=best_threshold)
    print(f"评估结果 -> Accuracy: {metrics['accuracy']:.4f}, F1: {metrics['f1_score']:.4f}, AUC: {metrics['auc']:.4f}")

    # Confusion matrix and ROC curve
    # Plot the confusion matrix and save it under image_dir.
    cm = confusion_matrix(y_test.cpu().numpy(), y_pred.cpu().numpy())
    plt.figure(figsize=(7, 5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                annot_kws={'size': 12})
    plt.xlabel('预测标签')
    plt.ylabel('真实标签')
    plt.title('混淆矩阵')
    plt.savefig(os.path.join(args.image_dir, 'confusion_matrix.png'), 
                dpi=300, bbox_inches='tight')
    plt.close()

    # Plot the ROC curve from column 1 of y_probs (used as the positive-class
    # score) and save it under image_dir.
    fpr, tpr, _ = roc_curve(y_test.cpu().numpy(), y_probs[:,1].cpu().numpy())
    roc_auc = auc(fpr, tpr)

    plt.figure(figsize=(7, 5))
    plt.plot(fpr, tpr, color='#1e88e5', lw=2, 
             label=f'ROC曲线 (AUC = {roc_auc:.2f})')
    # Diagonal reference line (chance level).
    plt.plot([0,1], [0,1], '--', color='#757575', lw=1.5)

    plt.xlabel('假阳性率')
    plt.ylabel('真阳性率')
    plt.title('ROC曲线')

    plt.legend(loc='lower right')

    plt.grid(True, linestyle='--', alpha=0.2)
    plt.savefig(os.path.join(args.image_dir, 'roc_curve.png'), 
                dpi=300, bbox_inches='tight')
    plt.close()

    # Build the list of test-set sample IDs. The IDs are only used when the
    # mask length equals the number of sample IDs; otherwise None is passed on.
    # NOTE(review): the length is not compared with the number of predictions.
    test_sample_ids = None
    if sample_ids is not None:
        sample_ids_array = np.array(sample_ids)
        test_mask_np = test_mask.numpy() if isinstance(test_mask, torch.Tensor) else np.array(test_mask)
        if len(test_mask_np) == len(sample_ids_array):
            test_sample_ids = sample_ids_array[test_mask_np].tolist()
    # Make sure the results directory exists before writing the CSV.
    os.makedirs('results', exist_ok=True)
    pred_df = save_predictions(
        y_true=y_test.cpu(),
        y_pred=y_pred.cpu(),
        y_probs=y_probs.cpu(),
        sample_ids=test_sample_ids,
        features=features.cpu(),
        save_path=os.path.join('results', 'predictions.csv')
    )
    print("推理结果保存在 results/predictions.csv")



In [None]:
# Genes as nodes: extract the pooled features (simple version)

print("\n提取池化后降维后的特征...")
all_features = {}
# Splits are processed in the order test -> train -> val; pool_labels ends up
# holding the labels returned for the last split processed.
for _split_name, _split_loader in (
    ('test', test_loader),
    ('train', train_loader),
    ('val', val_loader),
):
    all_features[_split_name], pool_labels = extract_pooling_features(
        model=model,
        data_loader=_split_loader,
        device=device,
    )

# Labels come from the split tensors produced during data preparation.
all_labels = {'test': y_test, 'train': y_train, 'val': y_val}

print("训练集特征形状:", all_features['train'].shape)
print("训练集标签形状:", all_labels['train'].shape)
print("验证集特征形状:", all_features['val'].shape)
print("验证集标签形状:", all_labels['val'].shape)
print("测试集特征形状:", all_features['test'].shape)
print("测试集标签形状:", all_labels['test'].shape)



# 分类


In [None]:
# xgboost
import numpy as np
import xgboost as xgb
from sklearn.model_selection import ParameterSampler
from sklearn.metrics import accuracy_score

def xgboost_train_or_search(
    X_train, y_train, X_val, y_val, X_test, y_test,
    user_params=None, param_dist=None,
    n_iter=20, random_state=42
):
    """Train one XGBoost classifier, or pick the best of several candidates.

    - Only ``user_params`` given: train that single configuration and report
      its test accuracy.
    - Otherwise: evaluate ``user_params`` (if any) plus ``n_iter`` random
      samples from ``param_dist`` and keep the configuration with the highest
      validation accuracy.

    The validation split is passed to ``fit`` as ``eval_set``. No
    ``early_stopping_rounds`` is set here, so early stopping only takes place
    if the caller supplies it through the parameters.

    Args:
        X_train, y_train: training features / labels (arrays or torch tensors).
        X_val, y_val: validation features / labels.
        X_test, y_test: test features / labels.
        user_params: optional dict of XGBClassifier parameters.
        param_dist: optional search space for ``ParameterSampler``.
        n_iter: number of random samples drawn from ``param_dist``.
        random_state: seed for the sampler and the classifier.

    Returns:
        tuple: ``(model, params)`` - the fitted classifier and the full
        parameter dict it was built with.

    Raises:
        ValueError: if neither ``user_params`` nor ``param_dist`` provides a
            configuration to train.
    """
    # Tensor -> NumPy, mirroring the random-forest and SVM helpers.
    def to_np(x):
        return x.detach().cpu().numpy() if isinstance(x, torch.Tensor) else x

    X_train, y_train = to_np(X_train), to_np(y_train)
    X_val, y_val = to_np(X_val), to_np(y_val)
    X_test, y_test = to_np(X_test), to_np(y_test)

    # Base configuration; candidate parameters override these keys.
    base = {
        'objective': 'binary:logistic' if len(np.unique(y_train)) == 2 else 'multi:softprob',
        'use_label_encoder': False,
        'eval_metric': 'logloss',
        'verbosity': 0,
        'random_state': random_state
    }

    def fit_one(params):
        """Fit a classifier for ``params`` layered on top of ``base``."""
        cfg = {**base, **params}
        model = xgb.XGBClassifier(**cfg)
        model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            verbose=False
        )
        return model, cfg

    # Single user-supplied configuration: train and evaluate directly.
    if user_params and not param_dist:
        model, cfg = fit_one(user_params)
        acc = accuracy_score(y_test, model.predict(X_test))
        print("使用自定义参数：", cfg)
        print("测试集准确率：", acc)
        return model, cfg

    # Build the candidate list.
    candidates = []
    if user_params:
        candidates.append(user_params)
    if param_dist:
        candidates += list(ParameterSampler(param_dist, n_iter=n_iter, random_state=random_state))
    if not candidates:
        raise ValueError(
            "xgboost_train_or_search needs a non-empty user_params and/or param_dist"
        )

    # Start below any reachable accuracy so the first candidate is always kept,
    # even when it scores 0 on the validation split.
    best_score, best_cfg, best_model = float('-inf'), None, None
    for params in candidates:
        model, cfg = fit_one(params)
        score = accuracy_score(y_val, model.predict(X_val))
        if score > best_score:
            best_score, best_cfg, best_model = score, cfg, model

    # Final evaluation of the selected model on the test split.
    test_acc = accuracy_score(y_test, best_model.predict(X_test))
    print("最优参数：", best_cfg)
    print("测试集准确率：", test_acc)
    return best_model, best_cfg


In [None]:
# XGboost
import numpy as np

# Hyper-parameter search space (left empty: the fixed user_params are used)
param_dist = {
}

user_params =   {'objective': 'binary:logistic', 'use_label_encoder': False, 'eval_metric': 'logloss', 'verbosity': 0, 'random_state': 42, 'subsample': 0.8, 'n_estimators': 200, 'max_depth': 5, 'learning_rate': 0.01, 'gamma': 0.1, 'colsample_bytree': 0.8}
# Print the search space so it can be checked at a glance.
print("超参数列表 (param_dist):")
for k, v in param_dist.items():
    print(f"  {k}: {v}")

# XGBoost on raw / graph / hybrid features. The validation split is used for
# validation and the test split only for the final score, matching the RF and
# SVM cells (previously the test split was passed in as the validation split).
for feature_type in ['raw', 'graph', 'hybrid']:
    if feature_type == 'raw':
        X_tr, y_tr = X_train, y_train
        X_va, y_va = X_val,   y_val
        X_te, y_te = X_test,  y_test
    elif feature_type == 'graph':
        X_tr, y_tr = all_features['train'], all_labels['train']
        X_va, y_va = all_features['val'],   all_labels['val']
        X_te, y_te = all_features['test'],  all_labels['test']
    else:  # hybrid: raw features concatenated with the graph features
        X_tr = np.hstack([X_train, all_features['train']])
        y_tr = y_train
        X_va = np.hstack([X_val,   all_features['val']])
        y_va = y_val
        X_te = np.hstack([X_test,  all_features['test']])
        y_te = y_test

    print(f"Using {feature_type} features...")
    best_model, best_params = xgboost_train_or_search(
        X_tr, y_tr,
        X_va, y_va,
        X_te, y_te,
        user_params=user_params,
        param_dist=None,
        n_iter=30,
        random_state=42
    )





In [None]:
# RF
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import ParameterSampler
from sklearn.metrics import accuracy_score
import torch

def random_forest_train_or_search(
    X_train, y_train, X_val, y_val, X_test, y_test,
    user_params=None, param_dist=None,
    n_iter=20, random_state=42
):
    """Train a random forest, or pick the best of several candidates.

    Candidates are ``user_params`` (if given) plus ``n_iter`` random samples
    from ``param_dist`` (if given). Each candidate is fitted on the training
    split and scored by validation accuracy; the best one is then evaluated on
    the test split. Only the selected parameters and the test accuracy are
    printed.

    Args:
        X_train, y_train: training features / labels (arrays or torch tensors).
        X_val, y_val: validation features / labels.
        X_test, y_test: test features / labels.
        user_params: optional dict of RandomForestClassifier parameters.
        param_dist: optional search space for ``ParameterSampler``.
        n_iter: number of random samples drawn from ``param_dist``.
        random_state: seed for the sampler and the classifier.

    Returns:
        tuple: ``(best_model, best_params)``.

    Raises:
        ValueError: if neither ``user_params`` nor ``param_dist`` provides a
            configuration to train.
    """
    # Tensor -> NumPy
    def to_np(x):
        return x.detach().cpu().numpy() if isinstance(x, torch.Tensor) else x

    X_train, y_train = to_np(X_train), to_np(y_train)
    X_val,   y_val   = to_np(X_val),   to_np(y_val)
    X_test,  y_test  = to_np(X_test),  to_np(y_test)

    # Build the candidate list.
    candidates = []
    if user_params:
        candidates.append(user_params)
    if param_dist:
        candidates += list(ParameterSampler(param_dist, n_iter=n_iter, random_state=random_state))
    if not candidates:
        raise ValueError(
            "random_forest_train_or_search needs a non-empty user_params and/or param_dist"
        )

    # Start below any reachable accuracy so the first candidate is always kept,
    # even when it scores 0 on the validation split.
    best_score = float('-inf')
    best_cfg = None
    best_model = None

    for cfg in candidates:
        model = RandomForestClassifier(random_state=random_state, n_jobs=-1, **cfg)
        model.fit(X_train, y_train)
        score = accuracy_score(y_val, model.predict(X_val))
        if score > best_score:
            best_score, best_cfg, best_model = score, cfg, model

    # Final evaluation on the test split.
    acc = accuracy_score(y_test, best_model.predict(X_test))

    print("最优随机森林参数：", best_cfg)
    # Report the test accuracy (the validation score was printed here before).
    print(f"测试集准确率: {acc:.4f}")

    return best_model, best_cfg


In [None]:
# RF
import numpy as np
# 1. Random-forest search space (left empty: the fixed user_params are used)
param_dist = {
}
user_params = {'n_estimators': 25, 'min_samples_split': 5, 'min_samples_leaf': 4, 'max_features': None, 'max_depth': 20}
print("随机森林参数调优范围：")
for _param_name, _param_values in param_dist.items():
    print(f"  {_param_name}: {_param_values}")


def _rf_feature_split(kind):
    """Return (X_tr, y_tr, X_va, y_va, X_te, y_te) for one feature kind.

    'raw' uses the input features, 'graph' the extracted graph features, and
    any other value the horizontal concatenation of both.
    """
    if kind == 'raw':
        return X_train, y_train, X_val, y_val, X_test, y_test
    if kind == 'graph':
        return (
            all_features['train'], all_labels['train'],
            all_features['val'], all_labels['val'],
            all_features['test'], all_labels['test'],
        )
    return (
        np.hstack([X_train, all_features['train']]), y_train,
        np.hstack([X_val, all_features['val']]), y_val,
        np.hstack([X_test, all_features['test']]), y_test,
    )


# 2. Run the random forest once per feature type: raw / graph / hybrid
for feature_type in ('raw', 'graph', 'hybrid'):
    X_tr, y_tr, X_va, y_va, X_te, y_te = _rf_feature_split(feature_type)

    print(f"\nUsing {feature_type} features for RF...")
    best_model, best_params = random_forest_train_or_search(
        X_tr, y_tr,
        X_va, y_va,
        X_te, y_te,
        user_params=user_params,
        param_dist=None,
        n_iter=30,
        random_state=42,
    )
    print("-" * 50)


In [None]:
# SVM
from sklearn.svm import SVC
from sklearn.model_selection import ParameterSampler
from sklearn.metrics import accuracy_score
import torch
import numpy as np

def svm_train_or_search(
    X_train, y_train,
    X_val, y_val,
    X_test, y_test,
    user_params=None, param_dist=None,
    n_iter=20, random_state=42
):
    """Train an SVM, or pick the best of several candidates.

    Candidates are ``user_params`` (if given) plus ``n_iter`` random samples
    from ``param_dist`` (if given). Each candidate is fitted on the training
    split and scored by validation accuracy; the best one is then evaluated on
    the test split. Only the selected parameters and the test accuracy are
    printed.

    Args:
        X_train, y_train: training features / labels (arrays or torch tensors).
        X_val, y_val: validation features / labels.
        X_test, y_test: test features / labels.
        user_params: optional dict of SVC parameters.
        param_dist: optional search space for ``ParameterSampler``.
        n_iter: number of random samples drawn from ``param_dist``.
        random_state: seed for the sampler and the classifier.

    Returns:
        tuple: ``(best_model, best_params)``.

    Raises:
        ValueError: if neither ``user_params`` nor ``param_dist`` provides a
            configuration to train.
    """
    # Tensor -> NumPy
    def to_np(x):
        return x.detach().cpu().numpy() if isinstance(x, torch.Tensor) else x

    X_train, y_train = to_np(X_train), to_np(y_train)
    X_val,   y_val   = to_np(X_val),   to_np(y_val)
    X_test,  y_test  = to_np(X_test),  to_np(y_test)

    # Build the candidate list.
    candidates = []
    if user_params:
        candidates.append(user_params)
    if param_dist:
        candidates += list(ParameterSampler(param_dist, n_iter=n_iter, random_state=random_state))
    if not candidates:
        raise ValueError(
            "svm_train_or_search needs a non-empty user_params and/or param_dist"
        )

    # Start below any reachable accuracy so the first candidate is always kept,
    # even when it scores 0 on the validation split.
    best_score = float('-inf')
    best_cfg = None
    best_model = None

    for cfg in candidates:
        model = SVC(random_state=random_state, **cfg)
        model.fit(X_train, y_train)
        score = accuracy_score(y_val, model.predict(X_val))
        if score > best_score:
            best_score, best_cfg, best_model = score, cfg, model

    # Final evaluation on the test split.
    test_acc = accuracy_score(y_test, best_model.predict(X_test))

    print("最优 SVM 参数：", best_cfg)
    print(f"测试集准确率: {test_acc:.4f}")

    return best_model, best_cfg



In [None]:
# 1. SVM search space (left empty: the fixed user_params are used)
param_dist = {
}
user_params =  {'tol': 0.01, 'kernel': 'poly', 'gamma': 'scale', 'degree': 3, 'C': 100}


# 2. Run the SVM once per feature type: raw / graph / hybrid
for feature_type in ('raw', 'graph', 'hybrid'):
    if feature_type == 'graph':
        # Graph features only.
        X_tr, X_va, X_te = all_features['train'], all_features['val'], all_features['test']
        y_tr, y_va, y_te = all_labels['train'], all_labels['val'], all_labels['test']
    elif feature_type == 'raw':
        # Input features only.
        X_tr, X_va, X_te = X_train, X_val, X_test
        y_tr, y_va, y_te = y_train, y_val, y_test
    else:
        # Hybrid: input features concatenated with the graph features.
        X_tr = np.hstack([X_train, all_features['train']])
        X_va = np.hstack([X_val, all_features['val']])
        X_te = np.hstack([X_test, all_features['test']])
        y_tr, y_va, y_te = y_train, y_val, y_test

    print(f"\nUsing {feature_type} features for SVM...")
    best_model, best_params = svm_train_or_search(
        X_tr, y_tr,
        X_va, y_va,
        X_te, y_te,
        user_params=user_params,
        param_dist=None,
        n_iter=30,
        random_state=42,
    )
    print("-" * 50)