In [4]:
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple, Dict, Any, Union
import numpy as np
import pandas as pd
from rdkit import Chem
from rdkit.Chem import Descriptors, AllChem
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, KFold
from sklearn.multioutput import MultiOutputClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import hamming_loss, roc_auc_score, f1_score, classification_report
from scipy import sparse
from pydantic import BaseModel, Field, validator
import warnings
warnings.filterwarnings('ignore')

# ==================== Pydantic Models ====================

class EvaluationMetrics(BaseModel):
    """评估指标数据模型"""
    hamming_loss: float = Field(..., description="Hamming损失")
    macro_auc: float = Field(..., description="Macro AUC")
    macro_f1: float = Field(..., description="Macro F1分数")
    per_class_auc: Dict[str, float] = Field(..., description="每个类别的AUC")
    per_class_f1: Dict[str, float] = Field(..., description="每个类别的F1分数")
    classification_report: Dict[str, Any] = Field(..., description="详细分类报告")

class CrossValidationResults(BaseModel):
    """交叉验证结果数据模型"""
    fold_metrics: List[EvaluationMetrics] = Field(..., description="每折的评估指标")
    mean_metrics: EvaluationMetrics = Field(..., description="平均评估指标")
    std_metrics: EvaluationMetrics = Field(..., description="评估指标标准差")

# ==================== Metrics Calculator ====================

class MetricsCalculator:
    """指标计算器"""
    
    @staticmethod
    def calculate_macro_auc(y_true: np.ndarray, y_pred_proba: List[np.ndarray], 
                           label_cols: List[str]) -> Tuple[float, Dict[str, float]]:
        """
        计算macro AUC和每个类别的AUC
        """
        n_classes = y_true.shape[1]
        per_class_auc = {}
        auc_scores = []
        
        for i in range(n_classes):
            try:
                y_true_i = y_true[:, i]
                
                # 处理多输出分类器的概率结构
                if isinstance(y_pred_proba, list) and len(y_pred_proba) == n_classes:
                    y_pred_proba_i = y_pred_proba[i][:, 1] if y_pred_proba[i].shape[1] > 1 else y_pred_proba[i][:, 0]
                else:
                    y_pred_proba_i = y_pred_proba[:, i]
                
                # 检查是否有足够的正负样本
                if len(np.unique(y_true_i)) < 2:
                    print(f"警告: 类别 {label_cols[i]} 只有一个类别，跳过AUC计算")
                    auc_score = 0.5
                else:
                    auc_score = roc_auc_score(y_true_i, y_pred_proba_i)
                
                per_class_auc[label_cols[i]] = auc_score
                auc_scores.append(auc_score)
                
            except Exception as e:
                print(f"计算类别 {label_cols[i]} 的AUC时出错: {str(e)}")
                per_class_auc[label_cols[i]] = 0.5
                auc_scores.append(0.5)
        
        macro_auc = np.mean(auc_scores) if auc_scores else 0.5
        return macro_auc, per_class_auc
    
    @staticmethod
    def calculate_macro_f1(y_true: np.ndarray, y_pred: np.ndarray, 
                          label_cols: List[str]) -> Tuple[float, Dict[str, float]]:
        """
        计算macro F1和每个类别的F1
        """
        n_classes = y_true.shape[1]
        per_class_f1 = {}
        f1_scores = []
        
        for i in range(n_classes):
            try:
                f1 = f1_score(y_true[:, i], y_pred[:, i], zero_division=0)
                per_class_f1[label_cols[i]] = f1
                f1_scores.append(f1)
            except Exception as e:
                print(f"计算类别 {label_cols[i]} 的F1时出错: {str(e)}")
                per_class_f1[label_cols[i]] = 0.0
                f1_scores.append(0.0)
        
        macro_f1 = np.mean(f1_scores) if f1_scores else 0.0
        return macro_f1, per_class_f1
    
    @staticmethod
    def calculate_all_metrics(y_true: np.ndarray, y_pred: np.ndarray, 
                             y_pred_proba: List[np.ndarray], label_cols: List[str]) -> EvaluationMetrics:
        """计算所有评估指标"""
        # Hamming损失
        h_loss = hamming_loss(y_true, y_pred)
        
        # AUC指标
        macro_auc, per_class_auc = MetricsCalculator.calculate_macro_auc(y_true, y_pred_proba, label_cols)
        
        # F1指标
        macro_f1, per_class_f1 = MetricsCalculator.calculate_macro_f1(y_true, y_pred, label_cols)
        
        # 分类报告
        try:
            cls_report = classification_report(
                y_true, y_pred, 
                target_names=label_cols, 
                output_dict=True,
                zero_division=0
            )
        except Exception as e:
            print(f"生成分类报告时出错: {str(e)}")
            cls_report = {}
        
        return EvaluationMetrics(
            hamming_loss=h_loss,
            macro_auc=macro_auc,
            macro_f1=macro_f1,
            per_class_auc=per_class_auc,
            per_class_f1=per_class_f1,
            classification_report=cls_report
        )
    
    @staticmethod
    def aggregate_cv_metrics(cv_metrics: List[EvaluationMetrics]) -> CrossValidationResults:
        """聚合交叉验证结果"""
        # 计算平均值
        mean_metrics = EvaluationMetrics(
            hamming_loss=np.mean([m.hamming_loss for m in cv_metrics]),
            macro_auc=np.mean([m.macro_auc for m in cv_metrics]),
            macro_f1=np.mean([m.macro_f1 for m in cv_metrics]),
            per_class_auc={},
            per_class_f1={},
            classification_report={}
        )
        
        # 计算标准差
        std_metrics = EvaluationMetrics(
            hamming_loss=np.std([m.hamming_loss for m in cv_metrics]),
            macro_auc=np.std([m.macro_auc for m in cv_metrics]),
            macro_f1=np.std([m.macro_f1 for m in cv_metrics]),
            per_class_auc={},
            per_class_f1={},
            classification_report={}
        )
        
        # 聚合每个类别的指标
        if cv_metrics and cv_metrics[0].per_class_auc:
            for class_name in cv_metrics[0].per_class_auc.keys():
                auc_values = [m.per_class_auc.get(class_name, 0.5) for m in cv_metrics]
                f1_values = [m.per_class_f1.get(class_name, 0.0) for m in cv_metrics]
                
                mean_metrics.per_class_auc[class_name] = np.mean(auc_values)
                mean_metrics.per_class_f1[class_name] = np.mean(f1_values)
                std_metrics.per_class_auc[class_name] = np.std(auc_values)
                std_metrics.per_class_f1[class_name] = np.std(f1_values)
        
        return CrossValidationResults(
            fold_metrics=cv_metrics,
            mean_metrics=mean_metrics,
            std_metrics=std_metrics
        )

# ==================== Odor Classifier (完整版本) ====================

class OdorClassifier:
    """气味分类器"""
    
    def __init__(self, n_estimators: int = 100, random_state: int = 42, 
                 test_size: float = 0.2, n_jobs: int = -1):
        self.config = {
            'n_estimators': n_estimators,
            'random_state': random_state,
            'test_size': test_size,
            'n_jobs': n_jobs
        }
        self.scaler = StandardScaler()
        self.model = MultiOutputClassifier(
            RandomForestClassifier(
                n_estimators=n_estimators,
                random_state=random_state,
                n_jobs=n_jobs
            )
        )
        self.label_cols: Optional[List[str]] = None
        self.metrics_calculator = MetricsCalculator()
    
    def extract_features(self, smiles_list: List[str]) -> Tuple[np.ndarray, List[int], List[str]]:
        """提取分子特征"""
        all_features = []
        valid_indices = []
        valid_smiles = []
        
        for i, smiles in enumerate(smiles_list):
            try:
                mol = Chem.MolFromSmiles(smiles)
                if mol is None:
                    continue
                    
                # Morgan指纹
                fp = AllChem.GetMorganFingerprintAsBitVect(mol, radius=2, nBits=2048)
                fp_array = np.array(fp)
                
                # 分子描述符
                descriptors = {
                    'MolWt': Descriptors.MolWt(mol),
                    'MolLogP': Descriptors.MolLogP(mol),
                    'NumHDonors': Descriptors.NumHDonors(mol),
                    'NumHAcceptors': Descriptors.NumHAcceptors(mol),
                    'TPSA': Descriptors.TPSA(mol),
                    'NumRotatableBonds': Descriptors.NumRotatableBonds(mol),
                }
                
                desc_values = np.array(list(descriptors.values()))
                combined_features = np.concatenate([fp_array, desc_values])
                
                all_features.append(combined_features)
                valid_indices.append(i)
                valid_smiles.append(smiles)
                
            except Exception as e:
                print(f"处理SMILES {smiles} 时出错: {str(e)}")
                continue
        
        return np.array(all_features), valid_indices, valid_smiles
    
    def prepare_data(self, df: pd.DataFrame, smiles_col: str = 'smiles') -> Tuple[np.ndarray, np.ndarray, List[str]]:
        """准备训练数据"""
        print("准备数据...")
        smiles_list = df[smiles_col].tolist()
        
        # 提取特征
        features, indices, valid_smiles = self.extract_features(smiles_list)
        
        # 获取标签
        valid_df = df.iloc[indices].copy()
        self.label_cols = [col for col in df.columns 
                          if col != smiles_col and pd.api.types.is_numeric_dtype(df[col])]
        
        labels = valid_df[self.label_cols].values
        
        # 处理NaN值
        mask = ~np.isnan(features).any(axis=1)
        features = features[mask]
        labels = labels[mask]
        
        print(f"最终数据集: {features.shape[0]} 个样本, {features.shape[1]} 个特征, {labels.shape[1]} 个标签")
        return features, labels, self.label_cols
    
    def train(self, features: np.ndarray, labels: np.ndarray) -> Dict[str, Any]:
        """训练模型"""
        print("训练模型...")
        
        # 标准化
        scaled_features = self.scaler.fit_transform(features)
        
        # 划分数据集
        X_train, X_test, y_train, y_test = train_test_split(
            scaled_features, labels, 
            test_size=self.config['test_size'], 
            random_state=self.config['random_state']
        )
        
        # 训练
        self.model.fit(X_train, y_train)
        
        # 预测
        y_pred = self.model.predict(X_test)
        y_pred_proba = self.model.predict_proba(X_test)
        
        # 计算指标
        metrics = self.metrics_calculator.calculate_all_metrics(y_test, y_pred, y_pred_proba, self.label_cols)
        
        print("模型训练完成！")
        print(f"Hamming Loss: {metrics.hamming_loss:.4f}")
        print(f"Macro AUC: {metrics.macro_auc:.4f}")
        print(f"Macro F1: {metrics.macro_f1:.4f}")
        
        return {
            'X_test': X_test,
            'y_test': y_test,
            'y_pred': y_pred,
            'y_pred_proba': y_pred_proba,
            'metrics': metrics
        }
    
    def cross_validate(self, features: np.ndarray, labels: np.ndarray, 
                      n_splits: int = 5) -> CrossValidationResults:
        """交叉验证"""
        print(f"开始 {n_splits} 折交叉验证...")
        
        kf = KFold(n_splits=n_splits, shuffle=True, random_state=self.config['random_state'])
        cv_metrics = []
        
        for fold, (train_idx, test_idx) in enumerate(kf.split(features)):
            print(f"第 {fold + 1}/{n_splits} 折...")
            
            X_train, X_test = features[train_idx], features[test_idx]
            y_train, y_test = labels[train_idx], labels[test_idx]
            
            # 标准化
            X_train_scaled = self.scaler.fit_transform(X_train)
            X_test_scaled = self.scaler.transform(X_test)
            
            # 训练和预测
            model = MultiOutputClassifier(
                RandomForestClassifier(
                    n_estimators=self.config['n_estimators'],
                    random_state=self.config['random_state'],
                    n_jobs=self.config['n_jobs']
                )
            )
            model.fit(X_train_scaled, y_train)
            
            y_pred = model.predict(X_test_scaled)
            y_pred_proba = model.predict_proba(X_test_scaled)
            
            # 计算指标
            metrics = self.metrics_calculator.calculate_all_metrics(y_test, y_pred, y_pred_proba, self.label_cols)
            cv_metrics.append(metrics)
        
        # 聚合结果
        cv_results = self.metrics_calculator.aggregate_cv_metrics(cv_metrics)
        
        print("交叉验证完成！")
        print(f"平均 Macro AUC: {cv_results.mean_metrics.macro_auc:.4f} ± {cv_results.std_metrics.macro_auc:.4f}")
        print(f"平均 Macro F1: {cv_results.mean_metrics.macro_f1:.4f} ± {cv_results.std_metrics.macro_f1:.4f}")
        
        return cv_results

# ==================== 使用示例 ====================

def main():
    """使用示例"""
    # 创建分类器
    classifier = OdorClassifier(n_estimators=100, random_state=42)
    
    # 示例数据（替换为您的实际数据）
#     example_data = {
#         'smiles': ['CCO', 'CC(=O)O', 'C1=CC=CC=C1', 'CCCC(=O)O'] * 10,
#         'fruity': [1, 0, 0, 0] * 10,
#         'floral': [0, 0, 1, 0] * 10,
#         'musky': [0, 0, 0, 1] * 10,
#     }
#     df = pd.DataFrame(example_data)
    df = pd.read_csv('curated_GS_LF_merged_4983.csv')
    
    try:
        # 准备数据
        features, labels, label_cols = classifier.prepare_data(df, 'nonStereoSMILES')
        
        # 训练模型
        results = classifier.train(features, labels)
        
        # 交叉验证
        cv_results = classifier.cross_validate(features, labels, n_splits=3)
        
        # 打印详细结果
        print("\n=== 详细评估结果 ===")
        print(f"Macro AUC: {results['metrics'].macro_auc:.4f}")
        print(f"Macro F1: {results['metrics'].macro_f1:.4f}")
        print(f"Hamming Loss: {results['metrics'].hamming_loss:.4f}")
        
        print("\n=== 交叉验证结果 ===")
        print(f"平均 Macro AUC: {cv_results.mean_metrics.macro_auc:.4f} ± {cv_results.std_metrics.macro_auc:.4f}")
        print(f"平均 Macro F1: {cv_results.mean_metrics.macro_f1:.4f} ± {cv_results.std_metrics.macro_f1:.4f}")
        
    except Exception as e:
        print(f"运行出错: {str(e)}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    main()

准备数据...






















































最终数据集: 4983 个样本, 2054 个特征, 138 个标签
训练模型...
模型训练完成！
Hamming Loss: 0.0320
Macro AUC: 0.8136
Macro F1: 0.1654
开始 3 折交叉验证...
第 1/3 折...
第 2/3 折...
第 3/3 折...
交叉验证完成！
平均 Macro AUC: 0.8027 ± 0.0043
平均 Macro F1: 0.1605 ± 0.0016

=== 详细评估结果 ===
Macro AUC: 0.8136
Macro F1: 0.1654
Hamming Loss: 0.0320

=== 交叉验证结果 ===
平均 Macro AUC: 0.8027 ± 0.0043
平均 Macro F1: 0.1605 ± 0.0016
