# config配置管理

In [None]:

class Config:
    """Container for every tunable parameter and filesystem path of the pipeline.

    Each setting is stored as a plain instance attribute, so callers read
    them as ``config.data_path``, ``config.cluster_n`` and so on.
    """

    def __init__(self):
        # Built inside __init__ so that every instance gets its own fresh
        # mutable values (e.g. the ``colors`` list is never shared).
        settings = {
            # Data locations
            "data_path": "your_data_file.csv",
            "output_dir": "output/",
            # Preprocessing thresholds
            "missing_threshold": 0.5,
            "correlation_threshold": 0.8,
            # Analysis parameters
            "cluster_n": 3,
            "test_size": 0.2,
            # Visualisation parameters
            "figsize": (12, 8),
            "colors": ['#3498db', '#e74c3c', '#2ecc71'],
        }
        for attribute, value in settings.items():
            setattr(self, attribute, value)


# Module-level instance imported by the other parts of the pipeline.
config = Config()

# dataloader数据加载

In [None]:
import pandas as pd
from pathlib import Path

class DataLoader:
    """Stateless helpers for reading a tabular file and summarising it."""

    @staticmethod
    def load_data(file_path):
        """Read a CSV or Excel file into a DataFrame.

        The reader is chosen from the file extension. The extension is
        compared case-insensitively, so ``DATA.CSV`` or ``report.XLSX`` are
        accepted just like their lower-case forms.

        Args:
            file_path: Path (``str`` or ``Path``) of the file to read.

        Returns:
            The ``pandas.DataFrame`` produced by ``pd.read_csv`` or
            ``pd.read_excel``.

        Raises:
            ValueError: If the extension is not ``.csv``, ``.xlsx`` or ``.xls``.
        """
        path = Path(file_path)
        # Normalise once so the comparison does not depend on letter case.
        suffix = path.suffix.lower()

        if suffix == '.csv':
            return pd.read_csv(file_path)
        if suffix in ('.xlsx', '.xls'):
            return pd.read_excel(file_path)
        # Report the extension exactly as the caller wrote it.
        raise ValueError(f"不支持的文件格式: {path.suffix}")

    @staticmethod
    def get_data_info(df):
        """Collect basic facts about a DataFrame.

        Args:
            df: The DataFrame to describe.

        Returns:
            A dict with the keys ``'shape'`` (``df.shape``), ``'columns'``
            (list of column labels), ``'dtypes'`` (column label -> dtype
            object) and ``'memory_usage'`` (sum of
            ``df.memory_usage(deep=True)``).
        """
        info = {
            'shape': df.shape,
            'columns': list(df.columns),
            'dtypes': df.dtypes.to_dict(),
            'memory_usage': df.memory_usage(deep=True).sum()
        }
        return info

# preprocess数据预处理

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler

class DataPreprocessor:
    """Cleaning steps: missing values, redundant features and scaling.

    Thresholds are read from the ``config`` object given at construction
    (``missing_threshold`` and ``correlation_threshold``).
    """

    def __init__(self, config):
        """Store the configuration and create the scaler used by ``scale_features``."""
        self.config = config
        self.scaler = StandardScaler()

    def handle_missing_values(self, df):
        """Drop sparse columns and fill the remaining gaps.

        Columns whose fraction of missing values exceeds
        ``config.missing_threshold`` are removed. In what is left, numeric
        columns are filled with their median and ``object`` columns with the
        string ``'Unknown'``. The input frame is not modified.

        Args:
            df: The DataFrame to clean.

        Returns:
            A new, cleaned DataFrame.
        """
        # Per-column fraction of missing cells (mean of the boolean mask).
        missing_ratio = df.isnull().mean()
        cols_to_drop = missing_ratio[missing_ratio > self.config.missing_threshold].index
        df_clean = df.drop(columns=cols_to_drop)

        numeric_cols = df_clean.select_dtypes(include=[np.number]).columns
        categorical_cols = df_clean.select_dtypes(include=['object']).columns

        # Skip the assignment entirely when a group is empty; there is
        # nothing to fill and an empty column selection is best avoided.
        if len(numeric_cols) > 0:
            medians = df_clean[numeric_cols].median()
            df_clean[numeric_cols] = df_clean[numeric_cols].fillna(medians)
        if len(categorical_cols) > 0:
            df_clean[categorical_cols] = df_clean[categorical_cols].fillna('Unknown')

        return df_clean

    def remove_highly_correlated_features(self, df):
        """Drop numeric columns that are strongly correlated with an earlier one.

        Only numeric columns take part in the correlation. Non-numeric
        columns (for example text columns filled with ``'Unknown'``) are
        kept untouched; passing them to ``DataFrame.corr`` would raise on
        pandas versions where ``numeric_only`` defaults to ``False``.

        Args:
            df: The DataFrame to reduce.

        Returns:
            A tuple ``(reduced_df, dropped)`` where ``dropped`` is the list
            of removed column labels.
        """
        corr_matrix = df.select_dtypes(include=[np.number]).corr().abs()
        # Keep only the strict upper triangle so every pair is inspected
        # once and a column is never compared with itself.
        upper_triangle = corr_matrix.where(
            np.triu(np.ones_like(corr_matrix, dtype=bool), k=1)
        )

        threshold = self.config.correlation_threshold
        cols_to_drop = [
            column for column in upper_triangle.columns
            if (upper_triangle[column] > threshold).any()
        ]

        return df.drop(columns=cols_to_drop), cols_to_drop

    def scale_features(self, df):
        """Standardise the numeric columns with the instance's scaler.

        The scaler is re-fitted on every call. Non-numeric columns are
        copied through unchanged, and the input frame is not modified.

        Args:
            df: The DataFrame whose numeric columns should be scaled.

        Returns:
            A new DataFrame with scaled numeric columns. When ``df`` has no
            numeric columns an unchanged copy is returned.
        """
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        df_scaled = df.copy()
        if len(numeric_cols) == 0:
            # Nothing to scale; fitting on a zero-column matrix would fail.
            return df_scaled
        df_scaled[numeric_cols] = self.scaler.fit_transform(df[numeric_cols])
        return df_scaled

# analysis数据分析

In [None]:

import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report

class DataAnalyzer:
    """Clustering, PCA and classification helpers driven by a config object."""

    def __init__(self, config):
        """Keep a reference to the configuration read by the analysis methods."""
        self.config = config

    def perform_clustering(self, df, n_clusters=None):
        """Run KMeans on the numeric columns of ``df``.

        Args:
            df: DataFrame holding the samples.
            n_clusters: Number of clusters; ``config.cluster_n`` is used
                when this is ``None``.

        Returns:
            A tuple ``(labels, fitted_kmeans)``.
        """
        k = self.config.cluster_n if n_clusters is None else n_clusters

        numeric_names = df.select_dtypes(include=[np.number]).columns
        features = df.loc[:, numeric_names]

        # Fixed seed keeps the cluster assignment reproducible between runs.
        estimator = KMeans(n_clusters=k, random_state=42)
        labels = estimator.fit_predict(features)
        return labels, estimator

    def perform_pca(self, df, n_components=2):
        """Project the numeric columns of ``df`` with PCA.

        Args:
            df: DataFrame holding the samples.
            n_components: Number of components passed to ``PCA``.

        Returns:
            A tuple ``(transformed_array, fitted_pca)``.
        """
        numeric_names = df.select_dtypes(include=[np.number]).columns
        features = df.loc[:, numeric_names]

        reducer = PCA(n_components=n_components)
        projected = reducer.fit_transform(features)
        return projected, reducer

    def train_classification_model(self, df, target_column):
        """Fit a random forest on the numeric features and evaluate it.

        The data is split with ``config.test_size`` as the hold-out
        fraction; the report is computed on that hold-out part.

        Args:
            df: DataFrame containing the features and the target.
            target_column: Label of the column to predict.

        Returns:
            A tuple ``(fitted_model, report)`` where ``report`` is the dict
            form of ``classification_report``.
        """
        predictors = df.drop(columns=[target_column])
        target = df[target_column]

        # Non-numeric predictors are discarded rather than encoded.
        predictors = predictors.select_dtypes(include=[np.number])

        split = train_test_split(
            predictors, target, test_size=self.config.test_size, random_state=42
        )
        train_x, test_x, train_y, test_y = split

        forest = RandomForestClassifier(n_estimators=100, random_state=42)
        forest.fit(train_x, train_y)

        predictions = forest.predict(test_x)
        summary = classification_report(test_y, predictions, output_dict=True)
        return forest, summary

# visual可视化

In [None]:

from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

class DataVisualizer:
    """Matplotlib/seaborn plots for the analysis pipeline.

    Figure size and colour palette are read from the ``config`` object
    given at construction (``figsize`` and ``colors``).
    """

    def __init__(self, config):
        """Store the configuration and apply the global plot style."""
        self.config = config
        self.setup_plot_style()

    def setup_plot_style(self):
        """Reset matplotlib to its default style and set the seaborn palette."""
        plt.style.use('default')
        sns.set_palette(self.config.colors)

    def _save_and_show(self, fig, save_path):
        """Optionally save ``fig``, display it, then release it.

        Closing the figure after ``plt.show()`` keeps figures from piling
        up in memory when the plots are produced in a non-interactive run.
        """
        if save_path:
            fig.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()
        plt.close(fig)

    def plot_distributions(self, df, save_path=None):
        """Draw one histogram per numeric column on a three-column grid.

        Args:
            df: DataFrame whose numeric columns are plotted.
            save_path: Optional file path; when given the figure is saved
                there before being shown.

        Returns:
            None. Nothing is drawn when ``df`` has no numeric columns.
        """
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) == 0:
            # A grid with zero rows cannot be created; there is nothing to plot.
            return

        n_cols = 3
        # Ceiling division: enough rows to hold every column.
        n_rows = (len(numeric_cols) + n_cols - 1) // n_cols

        fig, axes = plt.subplots(n_rows, n_cols, figsize=self.config.figsize)
        axes = axes.flatten()

        for ax, col in zip(axes, numeric_cols):
            df[col].hist(bins=30, ax=ax)
            ax.set_title(f'Distribution of {col}')
            ax.set_xlabel(col)
            ax.set_ylabel('Frequency')

        # Hide the unused cells at the end of the grid.
        for ax in axes[len(numeric_cols):]:
            ax.set_visible(False)

        plt.tight_layout()
        self._save_and_show(fig, save_path)

    def plot_correlation_heatmap(self, df, save_path=None):
        """Draw an annotated heatmap of the numeric columns' correlations.

        The upper triangle including the diagonal is masked, so each pair
        appears once.

        Args:
            df: DataFrame whose numeric columns are correlated.
            save_path: Optional file path; when given the figure is saved
                there before being shown.

        Returns:
            None. Nothing is drawn when ``df`` has no numeric columns.
        """
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) == 0:
            # An empty correlation matrix cannot be rendered as a heatmap.
            return

        corr_matrix = df[numeric_cols].corr()

        fig = plt.figure(figsize=self.config.figsize)
        mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
        sns.heatmap(corr_matrix, mask=mask, annot=True, cmap='coolwarm',
                    center=0, square=True, fmt=".2f")
        plt.title('Feature Correlation Heatmap')

        self._save_and_show(fig, save_path)

    def plot_clustering_results(self, df, clusters, pca_components, save_path=None):
        """Scatter the first two PCA components coloured by cluster label.

        Args:
            df: Not used by this method; kept so existing callers keep working.
            clusters: Cluster label per sample, used as the point colour.
            pca_components: 2-D array indexed as ``[:, 0]`` and ``[:, 1]``
                for the x and y coordinates.
            save_path: Optional file path; when given the figure is saved
                there before being shown.
        """
        fig = plt.figure(figsize=self.config.figsize)

        scatter = plt.scatter(pca_components[:, 0], pca_components[:, 1],
                              c=clusters, cmap='viridis', alpha=0.7)
        plt.colorbar(scatter)
        plt.xlabel('Principal Component 1')
        plt.ylabel('Principal Component 2')
        plt.title('Clustering Results (PCA Visualization)')

        self._save_and_show(fig, save_path)

# utils工具函数

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import json
import datetime

class Logger:
    """Minimal logger that echoes to stdout and appends to a text file."""

    def __init__(self, log_file="analysis_log.txt"):
        """Remember the path of the file that ``log`` appends to."""
        self.log_file = log_file

    def log(self, message):
        """Print ``message`` with a timestamp prefix and append it to the log file.

        The line has the form ``[YYYY-MM-DD HH:MM:SS] message``; the file is
        opened in append mode with UTF-8 encoding on every call.
        """
        now = datetime.datetime.now()
        entry = f"[{now:%Y-%m-%d %H:%M:%S}] {message}"

        print(entry)

        with open(self.log_file, mode='a', encoding='utf-8') as handle:
            handle.write(f"{entry}\n")

def save_results(results, file_path):
    """Write ``results`` to ``file_path`` as indented UTF-8 JSON.

    Missing parent directories are created. Before serialising, values that
    the ``json`` module cannot handle are converted recursively:

    * numpy scalars (including ``np.bool_``) become native Python values;
    * numpy arrays become nested lists;
    * numpy and pandas dtype objects become their string name;
    * tuples are walked like lists, so numpy values nested in them are
      converted too;
    * numpy-scalar dictionary keys become native Python keys.

    Args:
        results: The (possibly nested) structure to save.
        file_path: Destination path (``str`` or ``Path``).

    Raises:
        TypeError: Propagated from ``json.dump`` if a value of some other
            unsupported type remains after conversion.
    """
    path = Path(file_path)
    path.parent.mkdir(parents=True, exist_ok=True)

    dtype_types = (np.dtype, pd.api.extensions.ExtensionDtype)

    def convert_key(key):
        # json only accepts str/int/float/bool/None keys.
        return key.item() if isinstance(key, np.generic) else key

    def convert_types(obj):
        if isinstance(obj, np.generic):
            return obj.item()
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, dtype_types):
            return str(obj)
        if isinstance(obj, dict):
            return {convert_key(k): convert_types(v) for k, v in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [convert_types(item) for item in obj]
        return obj

    results = convert_types(results)

    with open(path, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

def create_output_directory(config):
    """Make sure ``config.output_dir`` exists and return it as a ``Path``.

    Missing parent directories are created as well; an already existing
    directory is left as it is.
    """
    target = Path(config.output_dir)
    target.mkdir(exist_ok=True, parents=True)
    return target

# main主程序

In [None]:
from config import config
from data_loader import DataLoader
from preprocessor import DataPreprocessor
from analyzer import DataAnalyzer
from visualizer import DataVisualizer
from utils import Logger, save_results, create_output_directory

def main():
    """Run the whole pipeline: load, preprocess, analyse, plot and save.

    Progress is reported through ``Logger``. Plots, the JSON summary and
    the processed CSV are written into the directory returned by
    ``create_output_directory(config)``.

    Raises:
        Exception: Any error raised by a step is logged and then re-raised.
    """
    # Imported locally so this entry point only relies on names that its
    # own import section provides.
    from collections import Counter

    # Set up the shared components.
    logger = Logger()
    output_dir = create_output_directory(config)

    logger.log("开始数据分析流程")

    try:
        # 1. Load the data.
        logger.log("步骤1: 加载数据")
        df = DataLoader.load_data(config.data_path)
        data_info = DataLoader.get_data_info(df)
        logger.log(f"数据加载成功: {data_info['shape']}")

        # 2. Preprocess.
        logger.log("步骤2: 数据预处理")
        preprocessor = DataPreprocessor(config)

        # Missing values.
        df_clean = preprocessor.handle_missing_values(df)
        logger.log(f"缺失值处理完成，原始形状: {df.shape}, 清理后: {df_clean.shape}")

        # Highly correlated features.
        df_final, dropped_cols = preprocessor.remove_highly_correlated_features(df_clean)
        logger.log(f"移除高度相关特征: {dropped_cols}")

        # Standardisation.
        df_scaled = preprocessor.scale_features(df_final)
        logger.log("特征标准化完成")

        # 3. Analyse.
        logger.log("步骤3: 数据分析")
        analyzer = DataAnalyzer(config)

        # Clustering.
        clusters, kmeans_model = analyzer.perform_clustering(df_scaled)
        logger.log(f"聚类分析完成，找到 {len(set(clusters))} 个簇")

        # PCA projection.
        pca_components, pca_model = analyzer.perform_pca(df_scaled)
        logger.log("PCA分析完成")

        # 4. Visualise.
        logger.log("步骤4: 生成可视化")
        visualizer = DataVisualizer(config)

        # Histograms of the unscaled features.
        visualizer.plot_distributions(
            df_final,
            save_path=output_dir / "distributions.png"
        )

        # Correlation heatmap.
        visualizer.plot_correlation_heatmap(
            df_final,
            save_path=output_dir / "correlation_heatmap.png"
        )

        # Clusters in PCA space.
        visualizer.plot_clustering_results(
            df_scaled, clusters, pca_components,
            save_path=output_dir / "clustering_results.png"
        )

        # 5. Save the results.
        logger.log("步骤5: 保存结果")

        # dtype objects cannot be written as JSON; store their names instead.
        serializable_info = dict(data_info)
        serializable_info['dtypes'] = {
            column: str(dtype) for column, dtype in data_info['dtypes'].items()
        }

        # Cluster label -> member count, largest cluster first.
        cluster_sizes = dict(Counter(int(label) for label in clusters).most_common())

        results = {
            'data_info': serializable_info,
            'preprocessing': {
                'original_shape': df.shape,
                'cleaned_shape': df_clean.shape,
                'final_shape': df_final.shape,
                'dropped_columns': dropped_cols
            },
            'clustering': {
                'n_clusters': len(cluster_sizes),
                'cluster_sizes': cluster_sizes
            },
            'pca': {
                'explained_variance_ratio': pca_model.explained_variance_ratio_.tolist()
            }
        }

        save_results(results, output_dir / "analysis_results.json")
        df_final.to_csv(output_dir / "processed_data.csv", index=False)

        logger.log("分析流程完成！")
        logger.log(f"结果保存在: {output_dir}")

    except Exception as e:
        # Top-level boundary: record the failure, then let it propagate.
        logger.log(f"错误发生: {str(e)}")
        raise


if __name__ == "__main__":
    main()