In [None]:
#%% 公共模块封装 cell
import joblib
import numpy as np
import pandas as pd
from imblearn.over_sampling import SMOTE
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.decomposition import PCA
from sklearn.feature_selection import VarianceThreshold, SelectKBest, f_classif
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, LabelEncoder

class DataLoader:
    """Unified loader for the metadata table and the expression table."""

    @staticmethod
    def load_meta(meta_path):
        """Read the tab-separated metadata file with every column kept as str."""
        meta_table = pd.read_csv(meta_path, sep='\t', dtype=str)
        return meta_table

    @staticmethod
    def load_expr(expr_path):
        """Read the tab-separated expression file and transpose it.

        Text from a '!' character to the end of a line is treated as a comment
        and the first column becomes the index. After transposing, the labels
        that used to be column headers are exposed as a regular column named
        'sample_title' (presumably one row per sample -- confirm against the
        input file layout).
        """
        raw_table = pd.read_csv(
            expr_path,
            sep='\t',
            comment='!',
            index_col=0,
            encoding='utf-8',
        )
        transposed = raw_table.T
        with_label_column = transposed.reset_index()
        return with_label_column.rename(columns={'index': 'sample_title'})

    @classmethod
    def merge_data(cls, meta_path, expr_path):
        """Attach each sample's group label to its expression row.

        Only rows whose 'sample_title' matches a 'geo_accession' in the
        metadata are kept (inner join). The metadata column
        'samplegroup:ch1' is returned under the name 'group'.
        """
        meta = cls.load_meta(meta_path)
        expr = cls.load_expr(expr_path)
        label_columns = meta[['geo_accession', 'samplegroup:ch1']]
        groups = label_columns.rename(
            columns={'geo_accession': 'sample_title', 'samplegroup:ch1': 'group'}
        )
        return expr.merge(groups, on='sample_title', how='inner')

class Preprocessor(TransformerMixin):
    """Configurable preprocessing pipeline.

    Wraps a scikit-learn ``Pipeline`` made of, in order: median imputation,
    a variance filter, univariate feature selection scored with
    ``f_classif``, standardisation and, optionally, PCA. The wrapped pipeline
    is exposed as the ``pipeline`` attribute.

    The separate ``fit`` / ``transform`` methods exist so the preprocessor can
    be fitted on training data only and then applied to held-out data;
    ``fit_transform`` alone would force callers to fit on everything at once.

    Parameters
    ----------
    variance_thresh : float, default 0.1 * (1 - 0.1)
        Threshold passed to ``VarianceThreshold``.
    k_features : int, default 500
        ``k`` passed to ``SelectKBest``.
    use_pca : bool, default False
        When true, a ``PCA`` step named 'pca' is added after scaling.
    pca_variance : float, default 0.95
        ``n_components`` passed to ``PCA``; only used when ``use_pca`` is true.
    """
    def __init__(self,
                 variance_thresh=0.1*(1-0.1),
                 k_features=500,
                 use_pca=False,
                 pca_variance=0.95):
        # Keep the configuration inspectable on the instance.
        self.variance_thresh = variance_thresh
        self.k_features = k_features
        self.use_pca = use_pca
        self.pca_variance = pca_variance

        # Assemble the complete step list before creating the Pipeline rather
        # than mutating `pipeline.steps` after construction.
        steps = [
            ('imputer', SimpleImputer(strategy='median')),
            ('variance_filter', VarianceThreshold(threshold=variance_thresh)),
            ('selector', SelectKBest(f_classif, k=k_features)),
            ('scaler', StandardScaler()),
        ]
        if use_pca:
            steps.append(('pca', PCA(n_components=pca_variance)))
        self.pipeline = Pipeline(steps)

    def fit(self, X, y):
        """Fit every step on ``X`` / ``y`` and return ``self``.

        ``y`` is required because the 'selector' step is supervised.
        """
        self.pipeline.fit(X, y)
        return self

    def transform(self, X):
        """Apply the already-fitted steps to ``X`` without refitting."""
        return self.pipeline.transform(X)

    def fit_transform(self, X, y):
        """Fit every step on ``X`` / ``y`` and return the transformed ``X``."""
        return self.pipeline.fit_transform(X, y)
    
class ModelEvaluator:
    """Shared helpers for scoring a fitted model and saving the results."""

    @staticmethod
    def evaluate(model, X_train, y_train, X_test, y_test):
        """Score ``model`` on the train and test splits.

        Returns a DataFrame with columns 'Train' and 'Test' and rows
        'Accuracy', 'Precision', 'Recall' and 'F1'. Precision, recall and F1
        use weighted averaging.
        """
        # Both predictions are made up front, train first, before any scoring.
        train_predictions = model.predict(X_train)
        test_predictions = model.predict(X_test)

        def score_split(truth, predicted):
            # One column of the report: the four metrics for a single split.
            return {
                'Accuracy': accuracy_score(truth, predicted),
                'Precision': precision_score(truth, predicted, average='weighted'),
                'Recall': recall_score(truth, predicted, average='weighted'),
                'F1': f1_score(truth, predicted, average='weighted'),
            }

        splits = (
            ('Train', y_train, train_predictions),
            ('Test', y_test, test_predictions),
        )
        report = {
            split_name: score_split(truth, predicted)
            for split_name, truth, predicted in splits
        }
        return pd.DataFrame(report)

    @staticmethod
    def save_report(model, metrics_df, importance_df, model_name):
        """Write the metrics CSV, the importance CSV and the pickled model.

        All three files are written relative to the current working directory
        and carry ``model_name`` in their file names. The metrics CSV keeps
        its index; the importance CSV does not.
        """
        metrics_file = f'model_metrics_{model_name}.csv'
        metrics_df.to_csv(metrics_file, index=True)

        importance_file = f'feature_importance_{model_name}.csv'
        importance_df.to_csv(importance_file, index=False)

        model_file = f'{model_name}_model.pkl'
        joblib.dump(model, model_file)

In [None]:
#%% Algorithm-specific cell example: random forest
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

# One seed shared by every stochastic step (split, SMOTE, forest) so that
# re-running the cell reproduces the same numbers.
RANDOM_STATE = 42

# === Initialisation ===
merged = DataLoader.merge_data(
    meta_path="./Data/GSE235508.meta.txt",
    expr_path="./Data/GSE235508_mRNA_counts.txt"
)

X = merged.drop(['sample_title', 'group'], axis=1).astype(float)
# Keep the fitted encoder so integer predictions can be mapped back to the
# original group names via `label_encoder.inverse_transform`.
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(merged['group'])

# === Split first, then preprocess ===
# The preprocessor contains a supervised selector and a scaler. Fitting it on
# all samples before splitting would let test-set labels and statistics leak
# into the features, so it is fitted on the training rows only and merely
# applied to the test rows.
X_train_raw, X_test_raw, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=RANDOM_STATE
)
preprocessor = Preprocessor(k_features=500, use_pca=True)
X_train = preprocessor.fit_transform(X_train_raw, y_train)
X_test = preprocessor.pipeline.transform(X_test_raw)

# === Class balancing (training data only) ===
X_train_res, y_train_res = SMOTE(
    k_neighbors=5, random_state=RANDOM_STATE
).fit_resample(X_train, y_train)
# NOTE(review): oversampling happens before cross-validation, so synthetic
# samples derived from a validation-fold sample can end up in the training
# folds and inflate the CV score. Consider moving SMOTE into an
# `imblearn.pipeline.Pipeline` together with the classifier.

# === Model construction and hyper-parameter search ===
rf_pipeline = Pipeline([
    ('rf', RandomForestClassifier(
        max_depth=10, min_samples_leaf=5, random_state=RANDOM_STATE
    ))
])

param_grid = {
    'rf__n_estimators': [50, 100, 200],
    'rf__max_depth': [None, 10, 20],
    'rf__min_samples_split': [2, 5]
}

grid_search = GridSearchCV(rf_pipeline, param_grid, scoring='f1_weighted', cv=5, n_jobs=-1)
grid_search.fit(X_train_res, y_train_res)

# === Evaluation and persistence ===
best_model = grid_search.best_estimator_
metrics_df = ModelEvaluator.evaluate(best_model, X_train_res, y_train_res, X_test, y_test)

# Name the features the forest actually saw. Each filtering step's mask is
# sized to the output of the step before it, so the masks are applied in
# sequence rather than indexing the original columns with the last mask alone.
fitted_steps = preprocessor.pipeline.named_steps
if 'pca' in fitted_steps:
    # After PCA the forest's inputs are principal components, not genes, so
    # the importances are labelled per component.
    feature_names = [f'PC{i + 1}' for i in range(fitted_steps['pca'].n_components_)]
else:
    feature_names = X.columns
    # The imputer discards features whose fitted statistic is NaN.
    feature_names = feature_names[~np.isnan(fitted_steps['imputer'].statistics_)]
    feature_names = feature_names[fitted_steps['variance_filter'].get_support()]
    feature_names = feature_names[fitted_steps['selector'].get_support()]

importance_df = pd.DataFrame({
    'gene': feature_names,
    'importance': best_model.named_steps['rf'].feature_importances_
}).sort_values('importance', ascending=False)

ModelEvaluator.save_report(best_model, metrics_df, importance_df, 'random_forest')
