# 环状RNA组织表达预测 - 模型训练与比较

本notebook用于训练和比较多种机器学习模型，包括以下步骤：
1. 数据加载
2. 模型定义和配置（评估函数）
3. 基线模型训练
4. 高级模型训练（随机森林、Extra Trees、XGBoost、LightGBM、CatBoost）
5. 模型性能比较
6. 交叉验证评估
7. 超参数优化
8. 模型集成
9. 保存模型和结果
10. 训练总结

In [None]:
# 导入必要的库
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import pickle
import time
from datetime import datetime

# 机器学习库
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier, VotingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier

# 梯度提升库
import xgboost as xgb
import lightgbm as lgb
try:
    import catboost as cb
    CATBOOST_AVAILABLE = True
except ImportError:
    print("CatBoost not available, will skip CatBoost models")
    CATBOOST_AVAILABLE = False

# 模型评估
from sklearn.model_selection import cross_val_score, StratifiedKFold, GridSearchCV, RandomizedSearchCV
from sklearn.metrics import classification_report, confusion_matrix, f1_score, accuracy_score
from sklearn.metrics import precision_recall_fscore_support

# Display options: show every DataFrame column instead of truncating.
pd.set_option('display.max_columns', None)
# NOTE(review): this silences ALL warnings for the whole session, including
# convergence and deprecation warnings raised by the libraries imported above.
warnings.filterwarnings('ignore')

# Random seed shared by every estimator and CV splitter in this notebook.
RANDOM_STATE = 42
# Also seed NumPy's global RNG for code that does not take random_state.
np.random.seed(RANDOM_STATE)

# Plot styling: matplotlib style sheet first, then the seaborn colour palette.
plt.style.use('seaborn-v0_8')
sns.set_palette('husl')

## 1. 数据加载

In [None]:
# Load the preprocessed dataset bundle (presumably written by the
# preprocessing notebook -- verify against that step).
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only load artifacts that this project produced itself.
with open('processed_data.pkl', 'rb') as f:
    processed_data = pickle.load(f)

# Unpack the bundle into module-level names used by the cells below.
X_train = processed_data['X_train']
X_test = processed_data['X_test']
y_train = processed_data['y_train']
X_train_split = processed_data['X_train_split']
X_val_split = processed_data['X_val_split']
y_train_split = processed_data['y_train_split']
y_val_split = processed_data['y_val_split']
feature_names = processed_data['feature_names']
feature_importance_df = processed_data['feature_importance']
test_ids = processed_data['test_ids']

print(f"数据加载完成:")
print(f"训练集: {X_train.shape}")
print(f"测试集: {X_test.shape}")
print(f"训练分割: {X_train_split.shape}")
print(f"验证分割: {X_val_split.shape}")
print(f"特征数量: {len(feature_names)}")
print(f"类别数量: {len(np.unique(y_train))}")

# Class distribution of the full training labels, ordered by label value.
print(f"\n类别分布:")
class_counts = pd.Series(y_train).value_counts().sort_index()
# Iterate (label, count) pairs so the printed class id is the actual label.
# Enumerating positions would mislabel rows whenever the labels are not
# exactly 0..k-1 (e.g. a class is absent or labels are not integer-encoded).
for label, count in class_counts.items():
    print(f"  类别 {label}: {count} 样本 ({count/len(y_train)*100:.1f}%)")

# Load the fitted preprocessors (same pickle trust caveat as above).
with open('preprocessors.pkl', 'rb') as f:
    preprocessors = pickle.load(f)

# Encoder for the target column; its classes_ attribute is printed below.
target_encoder = preprocessors['encoders']['target']
print(f"\n目标类别: {target_encoder.classes_}")

## 2. 模型定义和配置

In [None]:
# Evaluation helper
def evaluate_model(model, X_train, y_train, X_val, y_val, model_name):
    """Fit ``model`` and score it on the training and validation data.

    Args:
        model: Estimator exposing ``fit(X, y)`` and ``predict(X)``. It is
            fitted in place.
        X_train, y_train: Data the model is fitted on and then re-scored on.
        X_val, y_val: Held-out data used for the ``val_*`` metrics.
        model_name: Label stored in the result under ``'model_name'``.

    Returns:
        dict with the keys ``model_name``, ``model`` (the fitted estimator),
        ``{train,val}_accuracy``, ``{train,val}_f1_{macro,micro,weighted}``,
        ``training_time`` (seconds spent inside ``fit`` only) and
        ``y_pred_val`` (predictions for ``X_val``).
    """
    start_time = time.time()
    model.fit(X_train, y_train)
    # Stop the clock right after fitting: prediction and metric computation
    # are not training, and for lazy learners such as k-NN they dominate the
    # runtime, which would distort the training-time comparison.
    training_time = time.time() - start_time

    y_pred_train = model.predict(X_train)
    y_pred_val = model.predict(X_val)

    results = {
        'model_name': model_name,
        'model': model,
    }

    # Same metric set for both splits, stored under '<split>_<metric>' keys.
    scored_splits = (
        ('train', y_train, y_pred_train),
        ('val', y_val, y_pred_val),
    )
    for split, y_true, y_pred in scored_splits:
        results[f'{split}_accuracy'] = accuracy_score(y_true, y_pred)
        for average in ('macro', 'micro', 'weighted'):
            results[f'{split}_f1_{average}'] = f1_score(
                y_true, y_pred, average=average
            )

    results['training_time'] = training_time
    results['y_pred_val'] = y_pred_val

    return results

# Cross-validation helper
def cross_validate_model(model, X, y, cv=5, scoring='f1_macro'):
    """Score ``model`` with shuffled, stratified k-fold cross-validation.

    Args:
        model: Estimator passed to ``cross_val_score``.
        X, y: Features and labels to split into folds.
        cv: Number of stratified folds.
        scoring: Scorer name forwarded to ``cross_val_score``.

    Returns:
        The per-fold scores returned by ``cross_val_score``.
    """
    # Fixed seed so that every model is evaluated on the same fold layout.
    folds = StratifiedKFold(
        n_splits=cv,
        shuffle=True,
        random_state=RANDOM_STATE,
    )
    return cross_val_score(model, X, y, scoring=scoring, cv=folds, n_jobs=-1)


print("模型评估函数定义完成")

## 3. 基线模型训练

In [None]:
# Baseline estimators keyed by display name; dict order is the training order.
baseline_models = {
    'Logistic Regression': LogisticRegression(max_iter=1000, random_state=RANDOM_STATE),
    'Decision Tree': DecisionTreeClassifier(random_state=RANDOM_STATE),
    'K-Nearest Neighbors': KNeighborsClassifier(n_neighbors=5),
    'Naive Bayes': GaussianNB(),
    # probability=True so the SVM exposes predict_proba (needed for soft voting).
    'SVM (RBF)': SVC(probability=True, random_state=RANDOM_STATE),
}

# One result dict per successfully trained baseline model.
baseline_results = []

print("训练基线模型...")
print("=" * 60)

for label, estimator in baseline_models.items():
    print(f"训练 {label}...")

    try:
        result = evaluate_model(
            estimator,
            X_train_split, y_train_split,
            X_val_split, y_val_split,
            label,
        )
    except Exception as e:
        # Best effort: report the failure and carry on with the next model.
        print(f"  错误: {str(e)}")
        print()
    else:
        baseline_results.append(result)

        print(f"  验证集准确率: {result['val_accuracy']:.4f}")
        print(f"  验证集Macro-F1: {result['val_f1_macro']:.4f}")
        print(f"  训练时间: {result['training_time']:.2f}秒")
        print()

print("基线模型训练完成")

## 4. 高级模型训练

In [None]:
# Tree ensembles and gradient-boosting models keyed by display name.
# The four always-available models share the same size and seed.
_shared_kwargs = {'n_estimators': 100, 'random_state': RANDOM_STATE}

advanced_models = {
    'Random Forest': RandomForestClassifier(n_jobs=-1, **_shared_kwargs),
    'Extra Trees': ExtraTreesClassifier(n_jobs=-1, **_shared_kwargs),
    'XGBoost': xgb.XGBClassifier(
        eval_metric='mlogloss',
        verbosity=0,
        **_shared_kwargs,
    ),
    'LightGBM': lgb.LGBMClassifier(verbosity=-1, **_shared_kwargs),
}

# CatBoost is optional; it is added only when its import succeeded.
if CATBOOST_AVAILABLE:
    advanced_models['CatBoost'] = cb.CatBoostClassifier(
        iterations=100,
        random_state=RANDOM_STATE,
        verbose=False,
    )

# One result dict per successfully trained advanced model.
advanced_results = []

print("训练高级模型...")
print("=" * 60)

for label, estimator in advanced_models.items():
    print(f"训练 {label}...")

    try:
        result = evaluate_model(
            estimator,
            X_train_split, y_train_split,
            X_val_split, y_val_split,
            label,
        )
    except Exception as e:
        # Best effort: report the failure and carry on with the next model.
        print(f"  错误: {str(e)}")
        print()
    else:
        advanced_results.append(result)

        print(f"  验证集准确率: {result['val_accuracy']:.4f}")
        print(f"  验证集Macro-F1: {result['val_f1_macro']:.4f}")
        print(f"  训练时间: {result['training_time']:.2f}秒")
        print()

print("高级模型训练完成")

## 5. 模型性能比较

In [None]:
# Merge the results of both training cells.
all_results = baseline_results + advanced_results

# The training loops catch and print per-model exceptions, so it is possible
# to arrive here with no trained model at all. Fail with a clear message
# instead of the opaque KeyError that sorting an empty, column-less DataFrame
# would raise.
if not all_results:
    raise RuntimeError("没有任何模型训练成功，无法进行模型性能比较")

# One row per model. 'Overfitting' is the train-minus-validation Macro-F1 gap.
results_df = pd.DataFrame([
    {
        'Model': result['model_name'],
        'Train_Accuracy': result['train_accuracy'],
        'Val_Accuracy': result['val_accuracy'],
        'Train_F1_Macro': result['train_f1_macro'],
        'Val_F1_Macro': result['val_f1_macro'],
        'Train_F1_Weighted': result['train_f1_weighted'],
        'Val_F1_Weighted': result['val_f1_weighted'],
        'Training_Time': result['training_time'],
        'Overfitting': result['train_f1_macro'] - result['val_f1_macro']
    }
    for result in all_results
])

# Best validation Macro-F1 first.
results_df = results_df.sort_values('Val_F1_Macro', ascending=False)

print("模型性能比较 (按验证集Macro-F1排序):")
print("=" * 100)
# `display` is the rich-output helper injected by IPython/Jupyter.
display(results_df.round(4))

# The top row after sorting is the best model; fetch its full result dict
# (which also holds the fitted estimator) by name.
best_model_name = results_df.iloc[0]['Model']
best_result = next(r for r in all_results if r['model_name'] == best_model_name)

print(f"\n最佳模型: {best_model_name}")
print(f"验证集Macro-F1: {best_result['val_f1_macro']:.4f}")
print(f"验证集准确率: {best_result['val_accuracy']:.4f}")

In [None]:
# Visual comparison of all trained models on a 2x2 grid of bar charts.
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
x_pos = np.arange(len(results_df))


def _train_vs_val_bars(ax, train_col, val_col):
    """Draw side-by-side train/validation bars for two results_df columns."""
    ax.bar(x_pos - 0.2, results_df[train_col], 0.4, label='Train', alpha=0.8)
    ax.bar(x_pos + 0.2, results_df[val_col], 0.4, label='Validation', alpha=0.8)


def _finish_axis(ax, ylabel, title, with_legend=True):
    """Apply the labels, model tick names, legend and grid shared by all panels."""
    ax.set_xlabel('Models')
    ax.set_ylabel(ylabel)
    ax.set_title(title)
    ax.set_xticks(x_pos)
    ax.set_xticklabels(results_df['Model'], rotation=45, ha='right')
    if with_legend:
        ax.legend()
    ax.grid(True, alpha=0.3)


# 1. Macro-F1, train vs. validation
ax1 = axes[0, 0]
_train_vs_val_bars(ax1, 'Train_F1_Macro', 'Val_F1_Macro')
_finish_axis(ax1, 'Macro-F1 Score', 'Macro-F1 Score Comparison')

# 2. Accuracy, train vs. validation
ax2 = axes[0, 1]
_train_vs_val_bars(ax2, 'Train_Accuracy', 'Val_Accuracy')
_finish_axis(ax2, 'Accuracy', 'Accuracy Comparison')

# 3. Training time (single series, so no legend)
ax3 = axes[1, 0]
ax3.bar(x_pos, results_df['Training_Time'], alpha=0.8)
_finish_axis(ax3, 'Training Time (seconds)', 'Training Time Comparison', with_legend=False)

# 4. Overfitting gap: red when the gap exceeds 0.05, green otherwise
ax4 = axes[1, 1]
colors = []
for gap in results_df['Overfitting']:
    colors.append('red' if gap > 0.05 else 'green')
ax4.bar(x_pos, results_df['Overfitting'], alpha=0.8, color=colors)
# The threshold line is drawn before the legend so that it gets an entry.
ax4.axhline(y=0.05, color='red', linestyle='--', alpha=0.7, label='Overfitting Threshold')
_finish_axis(ax4, 'Overfitting (Train F1 - Val F1)', 'Overfitting Analysis')

plt.tight_layout()
plt.show()

## 6. 交叉验证评估

In [None]:
# Cross-validate the three models with the best validation Macro-F1.
top_models = results_df['Model'].head(3).tolist()

print("对Top 3模型进行5折交叉验证...")
print("=" * 60)

# model name -> {'scores': per-fold scores, 'mean': ..., 'std': ...}
cv_results = {}

for model_name in top_models:
    # First result dict carrying this name; it holds the estimator to score.
    model_result = next(r for r in all_results if r['model_name'] == model_name)
    model = model_result['model']

    print(f"交叉验证 {model_name}...")

    # 5-fold CV on the full training set.
    cv_scores = cross_validate_model(model, X_train, y_train, cv=5, scoring='f1_macro')

    cv_results[model_name] = dict(
        scores=cv_scores,
        mean=cv_scores.mean(),
        std=cv_scores.std(),
    )

    print(f"  CV Macro-F1: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
    print(f"  各折分数: {[f'{score:.4f}' for score in cv_scores]}")
    print()

# Bar chart of the mean CV score with one standard deviation as error bar.
plt.figure(figsize=(10, 6))

model_names = list(cv_results)
means = []
stds = []
for cv_name in model_names:
    means.append(cv_results[cv_name]['mean'])
    stds.append(cv_results[cv_name]['std'])

x_pos = np.arange(len(model_names))
plt.bar(x_pos, means, yerr=stds, capsize=5, alpha=0.8)
plt.xlabel('Models')
plt.ylabel('Cross-Validation Macro-F1 Score')
plt.title('5-Fold Cross-Validation Results')
plt.xticks(x_pos, model_names)
plt.grid(True, alpha=0.3)

# Write each mean just above the top of its error bar.
for bar_index in range(len(means)):
    label_height = means[bar_index] + stds[bar_index] + 0.005
    plt.text(bar_index, label_height, f'{means[bar_index]:.4f}', ha='center', va='bottom')

plt.tight_layout()
plt.show()

# The model with the highest mean CV score goes on to hyper-parameter tuning.
best_cv_model = max(cv_results, key=lambda candidate: cv_results[candidate]['mean'])
print(f"\n交叉验证最佳模型: {best_cv_model}")
print(f"CV Macro-F1: {cv_results[best_cv_model]['mean']:.4f} (+/- {cv_results[best_cv_model]['std'] * 2:.4f})")

## 7. 超参数优化

In [None]:
# Hyper-parameter optimisation of the best cross-validated model.

# Search spaces keyed by model display name.
param_grids = {
    'Random Forest': {
        'n_estimators': [100, 200, 300],
        'max_depth': [10, 20, None],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4],
        'max_features': ['sqrt', 'log2']
    },
    'XGBoost': {
        'n_estimators': [100, 200, 300],
        'max_depth': [3, 6, 9],
        'learning_rate': [0.01, 0.1, 0.2],
        'subsample': [0.8, 0.9, 1.0],
        'colsample_bytree': [0.8, 0.9, 1.0]
    },
    'LightGBM': {
        'n_estimators': [100, 200, 300],
        'max_depth': [3, 6, 9],
        'learning_rate': [0.01, 0.1, 0.2],
        'num_leaves': [31, 50, 100],
        'subsample': [0.8, 0.9, 1.0]
    }
}

# Factories for the untuned base estimators, keyed like param_grids.
base_model_factories = {
    'Random Forest': lambda: RandomForestClassifier(
        random_state=RANDOM_STATE, n_jobs=-1
    ),
    'XGBoost': lambda: xgb.XGBClassifier(
        random_state=RANDOM_STATE, eval_metric='mlogloss', verbosity=0
    ),
    # subsample_freq=1 switches bagging on. With the default of 0 LightGBM
    # ignores `subsample`, so tuning it would silently have no effect.
    'LightGBM': lambda: lgb.LGBMClassifier(
        random_state=RANDOM_STATE, verbosity=-1, subsample_freq=1
    ),
}

if CATBOOST_AVAILABLE:
    param_grids['CatBoost'] = {
        'iterations': [100, 200, 300],
        'depth': [4, 6, 8],
        'learning_rate': [0.01, 0.1, 0.2],
        'l2_leaf_reg': [1, 3, 5]
    }
    base_model_factories['CatBoost'] = lambda: cb.CatBoostClassifier(
        random_state=RANDOM_STATE, verbose=False
    )

# Models without a search space fall back to Random Forest. Resolve this
# BEFORE printing the banner so the log names the model that is really tuned.
if best_cv_model not in base_model_factories:
    print(f"{best_cv_model} 不在超参数优化列表中，改为优化 Random Forest")
    best_cv_model = 'Random Forest'
base_model = base_model_factories[best_cv_model]()

print(f"对 {best_cv_model} 进行超参数优化...")
print("=" * 60)

param_grid = param_grids[best_cv_model]

# Never ask for more candidates than the grid contains.
search_space_size = int(np.prod([len(v) for v in param_grid.values()]))
n_iter = min(50, search_space_size)

print(f"搜索空间大小: {search_space_size} 种组合")
print(f"使用随机搜索，尝试 {n_iter} 种组合...\n")

# 3-fold stratified CV inside the search keeps the runtime manageable.
cv_strategy = StratifiedKFold(n_splits=3, shuffle=True, random_state=RANDOM_STATE)

random_search = RandomizedSearchCV(
    estimator=base_model,
    param_distributions=param_grid,
    n_iter=n_iter,
    cv=cv_strategy,
    scoring='f1_macro',
    n_jobs=-1,
    random_state=RANDOM_STATE,
    verbose=1
)

# Search on the training split only. The tuned model and the ensemble are
# compared on X_val_split further down, so those rows must not take part in
# choosing the hyper-parameters.
# NOTE(review): assumes X_val_split was carved out of X_train -- confirm
# against the preprocessing step.
start_time = time.time()
random_search.fit(X_train_split, y_train_split)
search_time = time.time() - start_time

print(f"\n超参数搜索完成，耗时: {search_time:.2f}秒")
print(f"最佳CV分数: {random_search.best_score_:.4f}")
print(f"最佳参数: {random_search.best_params_}")

# Estimator refitted by the search with the best parameters.
best_model = random_search.best_estimator_

# Score the tuned model on the held-out validation split. evaluate_model
# refits it on the training split, which is the data the search used.
optimized_result = evaluate_model(best_model, X_train_split, y_train_split,
                                  X_val_split, y_val_split, f'{best_cv_model} (Optimized)')

print(f"\n优化后模型性能:")
print(f"验证集Macro-F1: {optimized_result['val_f1_macro']:.4f}")
print(f"验证集准确率: {optimized_result['val_accuracy']:.4f}")

## 8. 模型集成

In [None]:
# Build a soft-voting ensemble and choose the final model.
print("创建集成模型...")
print("=" * 60)

# The tuned model always takes part. Any other trained model joins only when
# its validation Macro-F1 clears the (adjustable) 0.7 threshold.
ensemble_models = [(f'{best_cv_model}_optimized', best_model)]
ensemble_models.extend(
    (candidate['model_name'], candidate['model'])
    for candidate in all_results
    if candidate['model_name'] != best_cv_model and candidate['val_f1_macro'] > 0.7
)

print(f"集成模型包含 {len(ensemble_models)} 个基模型:")
for member_name, _ in ensemble_models:
    print(f"  - {member_name}")

# Default to the tuned single model; it is replaced below only when an
# ensemble is built and scores strictly higher on the validation split.
final_model = best_model
final_model_name = f'{best_cv_model} (Optimized)'
final_score = optimized_result['val_f1_macro']

if len(ensemble_models) < 2:
    print("\n只有一个高性能模型，跳过集成")
else:
    # Soft voting combines the members' predicted class probabilities.
    voting_classifier = VotingClassifier(
        estimators=ensemble_models,
        voting='soft',
    )

    ensemble_result = evaluate_model(
        voting_classifier,
        X_train_split, y_train_split,
        X_val_split, y_val_split,
        'Ensemble (Voting)',
    )

    print(f"\n集成模型性能:")
    print(f"验证集Macro-F1: {ensemble_result['val_f1_macro']:.4f}")
    print(f"验证集准确率: {ensemble_result['val_accuracy']:.4f}")

    # Absolute and relative Macro-F1 change versus the tuned single model.
    print(f"\n性能提升:")
    improvement = ensemble_result['val_f1_macro'] - optimized_result['val_f1_macro']
    print(f"Macro-F1提升: {improvement:.4f} ({improvement/optimized_result['val_f1_macro']*100:.2f}%)")

    if ensemble_result['val_f1_macro'] > optimized_result['val_f1_macro']:
        final_model = voting_classifier
        final_model_name = 'Ensemble (Voting)'
        final_score = ensemble_result['val_f1_macro']
        print(f"\n选择集成模型作为最终模型")
    else:
        print(f"\n选择优化后的单模型作为最终模型")

print(f"\n最终模型: {final_model_name}")
print(f"最终验证集Macro-F1: {final_score:.4f}")

## 9. 保存模型和结果

In [None]:
def _save_pickle(obj, path):
    """Serialise ``obj`` to ``path`` with pickle, overwriting any existing file."""
    with open(path, 'wb') as f:
        pickle.dump(obj, f)


# Refit the selected model on the full training set before persisting it.
print("在完整训练集上重新训练最终模型...")
final_model.fit(X_train, y_train)

# Everything needed to reuse the model: the estimator, its validation score,
# the feature names and the target encoder, plus a timestamp.
model_data = dict(
    model=final_model,
    model_name=final_model_name,
    validation_score=final_score,
    feature_names=feature_names,
    target_encoder=target_encoder,
    training_timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
)
_save_pickle(model_data, 'final_model.pkl')

# The tuned parameters exist only if the hyper-parameter search cell was run.
best_params = None
if 'random_search' in locals():
    best_params = random_search.best_params_

training_results = dict(
    all_results=results_df,
    cv_results=cv_results,
    best_params=best_params,
    final_model_name=final_model_name,
    final_score=final_score,
)
_save_pickle(training_results, 'training_results.pkl')

# Plain-text copy of the comparison table.
results_df.to_csv('model_comparison.csv', index=False)

print("模型和结果已保存到:")
print("- final_model.pkl")
print("- training_results.pkl")
print("- model_comparison.csv")

## 10. 训练总结

In [None]:
# Final training summary report.
banner = "=" * 80

print(banner)
print("                    模型训练总结报告")
print(banner)

# 1. Dataset overview
n_samples, n_features = X_train.shape[0], X_train.shape[1]
n_classes = len(np.unique(y_train))
print(f"1. 数据概况:")
print(f"   - 训练样本数: {n_samples}")
print(f"   - 特征数量: {n_features}")
print(f"   - 类别数量: {n_classes}")
print(f"   - 目标类别: {target_encoder.classes_}")

# 2. How many models were trained
print(f"2. 模型训练:")
print(f"   - 训练模型数量: {len(all_results)}")
print(f"   - 基线模型: {len(baseline_results)} 个")
print(f"   - 高级模型: {len(advanced_results)} 个")

# 3. Leaderboard: results_df is already sorted by validation Macro-F1.
print(f"3. 性能排行 (Top 5):")
top_five = results_df.head(5)
rank = 0
for model_label, macro_f1 in zip(top_five['Model'], top_five['Val_F1_Macro']):
    rank += 1
    print(f"   {rank}. {model_label:20s} - Macro-F1: {macro_f1:.4f}")

# 4. Selected model; tuned parameters are shown only if the search was run.
print(f"4. 最终模型:")
print(f"   - 模型名称: {final_model_name}")
print(f"   - 验证集Macro-F1: {final_score:.4f}")
if 'random_search' in locals():
    print(f"   - 最佳参数: {random_search.best_params_}")

# 5. Follow-up work
print(f"5. 下一步:")
print(f"   - 进行详细的模型评估和分析")
print(f"   - 生成测试集预测结果")
print(f"   - 创建提交文件")

print(banner)