In [6]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score
from sklearn.feature_selection import SelectKBest, mutual_info_classif, f_classif, chi2
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC

def preprocess_data(csv_file, target_column):
    """Load a CSV file and split it into features and target.

    Missing values in numeric feature columns are replaced with that
    column's mean. Non-numeric feature columns are returned unchanged.

    Args:
        csv_file: Path or buffer accepted by ``pandas.read_csv``.
        target_column: Name of the column holding the target variable.

    Returns:
        A tuple ``(X, y)``: ``X`` is the DataFrame of all columns except
        ``target_column``; ``y`` is the ``target_column`` Series.

    Raises:
        KeyError: If ``target_column`` is not a column of the CSV.
    """
    # Read the CSV file.
    df = pd.read_csv(csv_file)

    # Separate the features from the target variable.
    X = df.drop(columns=[target_column])
    y = df[target_column]

    # Fill missing numeric values with the column mean. numeric_only=True is
    # required: without it, a single non-numeric feature column makes
    # DataFrame.mean() raise TypeError on pandas >= 2.0.
    X = X.fillna(X.mean(numeric_only=True))

    return X, y

def _score_func_for_method(method):
    """Return the scikit-learn scoring function registered under ``method``.

    Raises:
        ValueError: If ``method`` is not 'mutual_info', 'f_classif' or 'chi2'.
    """
    if method == 'mutual_info':
        return mutual_info_classif
    if method == 'f_classif':
        return f_classif
    if method == 'chi2':
        return chi2
    raise ValueError("Unknown feature selection method: {!r}".format(method))


def select_best_features(X, y, methods=('mutual_info', 'f_classif', 'chi2'), k_range=range(1, 11)):
    """Search over feature-selection methods and feature counts.

    For every ``(method, k)`` pair the ``k`` best features are selected with
    ``SelectKBest``, 80% of the rows are kept as a training portion, and three
    classifiers (random forest, gradient boosting, SVC) are scored on that
    portion with 5-fold cross-validation. The pair whose best classifier
    achieves the highest mean score wins.

    Args:
        X: Feature matrix passed to ``SelectKBest.fit_transform``.
        y: Target values.
        methods: Iterable of method names; each must be 'mutual_info',
            'f_classif' or 'chi2'.
        k_range: Iterable of feature counts to try.

    Returns:
        A tuple ``(best_method, best_features)`` where ``best_features`` is
        the transformed feature matrix of the winning pair. Both are ``None``
        when ``methods`` or ``k_range`` is empty or no candidate scored
        above 0.

    Raises:
        ValueError: If ``methods`` contains an unknown method name.
    """
    best_method = None
    best_features = None  # initialised so the return never hits an unbound name
    best_score = 0

    for method in methods:
        # Resolve once per method; an unknown name fails loudly here instead
        # of silently reusing the selector left over from the previous method.
        score_func = _score_func_for_method(method)

        for k in k_range:
            selector = SelectKBest(score_func=score_func, k=k)
            # NOTE(review): the selector is fitted on all rows before the
            # split below, so the cross-validation scores have seen the
            # held-out rows during feature selection.
            X_new = selector.fit_transform(X, y)

            # Keep the same 80% training portion as before; the held-out 20%
            # is not used by this search.
            X_train, _, y_train, _ = train_test_split(X_new, y, test_size=0.2, random_state=42)

            # Try several different classifiers.
            classifiers = {
                "Random Forest": RandomForestClassifier(),
                "Gradient Boosting": GradientBoostingClassifier(),
                "SVC": SVC()
            }

            best_classifier_score = 0
            for classifier in classifiers.values():
                mean_score = np.mean(cross_val_score(classifier, X_train, y_train, cv=5))
                if mean_score > best_classifier_score:
                    best_classifier_score = mean_score

            if best_classifier_score > best_score:
                best_score = best_classifier_score
                best_method = method
                best_features = X_new

    return best_method, best_features

# Example usage: load the dataset, run the search, and report the winner.
target_column = 'phenotype'
path = r'C:\Users\10634\Desktop\CMV_figure2a_learn_pos_neg_cutoff3.csv'
X, y = preprocess_data(csv_file=path, target_column=target_column)
best_method, best_features = select_best_features(X=X, y=y)
# The printed labels read "best feature selection method" and "best feature set".
print("最佳特征选择方法:", best_method)
print("最佳特征集合:", best_features)

最佳特征选择方法: f_classif
最佳特征集合: [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [1. 0. 0. ... 0. 1. 0.]
 [0. 0. 0. ... 0. 0. 1.]
 [0. 1. 1. ... 0. 0. 1.]]


In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.feature_selection import SelectKBest, mutual_info_classif, f_classif, chi2
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
import multiprocessing

def preprocess_data(csv_file, target_column):
    """Read a CSV file and return its features and target separately.

    NaN entries in numeric feature columns are filled with the mean of the
    respective column; columns that are not numeric are passed through as-is.

    Args:
        csv_file: Path or buffer accepted by ``pandas.read_csv``.
        target_column: Name of the target column to split off.

    Returns:
        ``(X, y)`` where ``X`` is a DataFrame without ``target_column`` and
        ``y`` is the ``target_column`` Series.

    Raises:
        KeyError: If the CSV has no column named ``target_column``.
    """
    # Read the CSV file.
    df = pd.read_csv(csv_file)

    # Split off the target variable from the features.
    y = df[target_column]
    X = df.drop(columns=[target_column])

    # Restrict the mean to numeric columns: on pandas >= 2.0 a plain
    # DataFrame.mean() raises TypeError as soon as one column holds strings.
    column_means = X.mean(numeric_only=True)
    X = X.fillna(column_means)

    return X, y

def _resolve_score_func(method):
    """Map a method name to its scikit-learn scoring function.

    Raises:
        ValueError: If ``method`` is not 'mutual_info', 'f_classif' or 'chi2'.
    """
    if method == 'mutual_info':
        return mutual_info_classif
    if method == 'f_classif':
        return f_classif
    if method == 'chi2':
        return chi2
    raise ValueError("Unknown feature selection method: {!r}".format(method))


def _evaluate_method(X, y, method, k_range):
    """Run the search for one method and also report the winning score.

    Returns:
        ``(best_score, method, best_features, best_classifier_name,
        best_feature_names, best_num_features)``.
    """
    score_func = _resolve_score_func(method)

    best_score = 0
    best_features = None
    best_classifier_name = None
    best_feature_names = None
    best_num_features = 0

    for k in k_range:
        selector = SelectKBest(score_func=score_func, k=k)
        # NOTE(review): the selector is fitted on all rows before the split
        # below, so the cross-validation scores have seen the held-out rows
        # during feature selection.
        X_new = selector.fit_transform(X, y)

        # Names of the selected features, taken from the boolean support mask.
        feature_names = X.columns[selector.get_support()]

        # Keep the same 80% training portion as before; the held-out 20% is
        # not used by this search.
        X_train, _, y_train, _ = train_test_split(X_new, y, test_size=0.2, random_state=42)

        # Try several different classifiers.
        classifiers = {
            "Random Forest": RandomForestClassifier(),
            "Gradient Boosting": GradientBoostingClassifier(),
            "SVC": SVC()
        }

        for classifier_name, classifier in classifiers.items():
            scores = cross_val_score(classifier, X_train, y_train, cv=5, scoring='accuracy')
            mean_accuracy = np.mean(scores)

            if mean_accuracy > best_score:
                best_score = mean_accuracy
                best_features = X_new
                best_classifier_name = classifier_name
                best_feature_names = feature_names
                best_num_features = k

    return (best_score, method, best_features, best_classifier_name,
            best_feature_names, best_num_features)


def select_best_features(X, y, method, k_range):
    """Find the best feature count and classifier for one selection method.

    For each ``k`` in ``k_range`` the ``k`` best features are selected with
    ``SelectKBest``, 80% of the rows are kept as a training portion, and three
    classifiers (random forest, gradient boosting, SVC) are scored on it with
    5-fold cross-validated accuracy.

    Args:
        X: Feature DataFrame (its ``columns`` are used to name the features).
        y: Target values.
        method: 'mutual_info', 'f_classif' or 'chi2'.
        k_range: Iterable of feature counts to try.

    Returns:
        ``(method, best_features, best_classifier_name, best_feature_names,
        best_num_features)``. The last four are ``None, None, None, 0`` when
        ``k_range`` is empty or no candidate scored above 0.

    Raises:
        ValueError: If ``method`` is not a known method name.
    """
    # Drop the leading score so the public 5-tuple stays unchanged.
    return _evaluate_method(X, y, method, k_range)[1:]


def parallel_feature_selection(csv_file, target_column, methods=('mutual_info', 'f_classif', 'chi2'), k_range=range(1, 11)):
    """Evaluate several feature-selection methods in parallel and keep the best.

    Each method is searched in its own worker process; the result with the
    highest cross-validated accuracy is returned (the first one wins ties).

    Args:
        csv_file: CSV path handed to ``preprocess_data``.
        target_column: Name of the target column.
        methods: Iterable of method names ('mutual_info', 'f_classif', 'chi2').
        k_range: Iterable of feature counts to try per method.

    Returns:
        ``(method, features, classifier_name, feature_names, num_features)``
        of the best-scoring method, or ``None`` when ``methods`` is empty.

    Raises:
        ValueError: If ``methods`` contains an unknown method name.
    """
    methods = list(methods)
    # Validate in the parent so a typo fails before any worker is started.
    for method in methods:
        _resolve_score_func(method)

    X, y = preprocess_data(csv_file, target_column)

    if not methods:
        return None

    # NOTE(review): with the "spawn" start method (the default on Windows) the
    # worker processes must be able to import these functions; functions
    # defined only inside an interactive session may not be picklable.
    worker_count = min(len(methods), multiprocessing.cpu_count())
    # The context manager terminates the pool on exit, so every result is
    # collected inside the ``with`` block.
    with multiprocessing.Pool(processes=worker_count) as pool:
        pending = [
            pool.apply_async(_evaluate_method, args=(X, y, method, k_range))
            for method in methods
        ]
        outcomes = [task.get() for task in pending]

    best_score = None
    best_result = None
    for score, *result in outcomes:
        # Rank by accuracy; ranking by the number of selected features would
        # simply favour whichever method happened to use the largest k.
        if best_result is None or score > best_score:
            best_score = score
            best_result = tuple(result)

    return best_result

# Example usage.
# The guard is required because parallel_feature_selection starts a
# multiprocessing.Pool: with the "spawn" start method each worker re-imports
# the main module, and unguarded top-level code would run again in every worker.
if __name__ == "__main__":
    csv_file = r'C:\Users\10634\Desktop\CMV_figure2a_learn_pos_neg_cutoff3.csv'
    target_column = 'phenotype'
    best_method, best_features, best_classifier_name, best_feature_names, best_num_features = parallel_feature_selection(csv_file, target_column)
    # The printed labels read: best feature selection method, best feature
    # set, best classifier, best feature names, number of selected features.
    print("最佳特征选择方法:", best_method)
    print("最佳特征集合:", best_features)
    print("最佳分类器:", best_classifier_name)
    print("最佳特征名称:", best_feature_names)
    print("选择的特征数量:", best_num_features)