# 1.导入相关依赖和配置

In [1]:
# 代码块1：导入依赖和配置
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import sqlite3
from sqlite3 import Error
import logging
import os
import json
import warnings
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Any, Optional
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

# 设置中文字体和样式
plt.rcParams['font.sans-serif'] = ['DejaVu Sans', 'Arial', 'Helvetica']
plt.rcParams['axes.unicode_minus'] = False
sns.set_style("whitegrid")
warnings.filterwarnings('ignore')

# 配置日志系统
def setup_logging():
    """设置日志系统"""
    logger = logging.getLogger('ETL_Pipeline')
    logger.setLevel(logging.INFO)
    
    # 创建文件处理器
    file_handler = logging.FileHandler('etl_pipeline.log')
    file_handler.setLevel(logging.INFO)
    
    # 创建控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    
    # 创建格式化器
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    
    # 添加处理器
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

# 初始化日志
logger = setup_logging()

print("✅ 依赖库导入完成，日志系统已初始化")
print(f"当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

✅ 依赖库导入完成，日志系统已初始化
当前时间: 2025-12-02 16:19:42


# 2.ETL管道架构设计

In [6]:
class EcommerceETLPipeline:
    """
    电商数据ETL管道架构类
    包含完整的数据提取、转换、加载流程
    """
    
    def __init__(self, source_dir: str = '.', db_path: str = 'ecommerce_clean.db'):
        """
        初始化ETL管道
        
        Args:
            source_dir: 源数据目录
            db_path: SQLite数据库路径
        """
        self.source_dir = source_dir
        self.db_path = db_path
        self.connection = None
        self.execution_history = []
        self.data_quality_metrics = {}
        self.pipeline_config = {
            'batch_size': 10000,
            'enable_validation': True,
            'enable_backup': True,
            'cleanup_threshold': 0.95
        }
        
        logger.info(f"ETL管道初始化 - 源目录: {source_dir}, 数据库: {db_path}")
    
    def run_full_pipeline(self, generate_reports: bool = True) -> Dict:
        """
        运行完整的ETL管道
        
        Returns:
            Dict: 包含处理结果的字典
        """
        logger.info("="*80)
        logger.info(" [START] 开始运行完整ETL管道")
        logger.info("="*80)
        
        pipeline_start_time = datetime.now()
        execution_id = f"ETL_{pipeline_start_time.strftime('%Y%m%d_%H%M%S')}"
        
        results = {
            'execution_id': execution_id,
            'start_time': pipeline_start_time.isoformat(),
            'status': 'running',
            'stages': {},
            'errors': []
        }
        
        try:
            # 阶段1: 数据提取
            logger.info("[阶段1] 数据提取")
            stage1_start = datetime.now()
            raw_data = self.extract_data()
            stage1_end = datetime.now()
            results['stages']['extraction'] = {
                'status': 'completed',
                'start_time': stage1_start.isoformat(),
                'end_time': stage1_end.isoformat(),
                'duration': (stage1_end - stage1_start).total_seconds(),
                'tables_extracted': len(raw_data)
            }
            
            # 阶段2: 数据清洗和转换
            logger.info("[阶段2] 数据清洗和转换")
            stage2_start = datetime.now()
            cleaned_data = self.transform_data(raw_data)
            stage2_end = datetime.now()
            results['stages']['transformation'] = {
                'status': 'completed',
                'start_time': stage2_start.isoformat(),
                'end_time': stage2_end.isoformat(),
                'duration': (stage2_end - stage2_start).total_seconds(),
                'tables_cleaned': len(cleaned_data)
            }
            
            # 阶段3: 数据加载
            logger.info("[阶段3] 数据加载")
            stage3_start = datetime.now()
            self.load_data(cleaned_data)
            stage3_end = datetime.now()
            results['stages']['loading'] = {
                'status': 'completed',
                'start_time': stage3_start.isoformat(),
                'end_time': stage3_end.isoformat(),
                'duration': (stage3_end - stage3_start).total_seconds()
            }
            
            # 阶段4: 数据质量验证
            logger.info("[阶段4] 数据质量验证")
            stage4_start = datetime.now()
            quality_report = self.validate_data_quality()
            stage4_end = datetime.now()
            results['stages']['validation'] = {
                'status': 'completed',
                'start_time': stage4_start.isoformat(),
                'end_time': stage4_end.isoformat(),
                'duration': (stage4_end - stage4_start).total_seconds(),
                'quality_score': quality_report.get('overall_score', 0)
            }
            
            # 阶段5: 数据仓库视图创建
            logger.info("[阶段5] 数据仓库视图创建")
            stage5_start = datetime.now()
            self.create_data_warehouse_views()
            stage5_end = datetime.now()
            results['stages']['warehouse'] = {
                'status': 'completed',
                'start_time': stage5_start.isoformat(),
                'end_time': stage5_end.isoformat(),
                'duration': (stage5_end - stage5_start).total_seconds()
            }
            
            # 更新结果
            pipeline_end_time = datetime.now()
            results.update({
                'status': 'success',
                'end_time': pipeline_end_time.isoformat(),
                'total_duration': (pipeline_end_time - pipeline_start_time).total_seconds(),
                'tables_processed': len(cleaned_data),
                'quality_report': quality_report
            })
            
            logger.info(f"[SUCCESS] ETL管道运行成功! 总耗时: {results['total_duration']:.2f}秒")
            
            # 生成报告
            if generate_reports:
                self.generate_execution_report(results)
                self.generate_dashboard(results, quality_report)
            
            # 保存执行历史
            self.execution_history.append({
                'execution_id': execution_id,
                'timestamp': pipeline_start_time,
                'duration': results['total_duration'],
                'status': 'success',
                'quality_score': quality_report.get('overall_score', 0)
            })
            
        except Exception as e:
            logger.error(f"[ERROR] ETL管道运行失败: {str(e)}")
            pipeline_end_time = datetime.now()
            results.update({
                'status': 'failed',
                'end_time': pipeline_end_time.isoformat(),
                'total_duration': (pipeline_end_time - pipeline_start_time).total_seconds(),
                'error': str(e)
            })
            results['errors'].append(str(e))
            
            # 保存失败记录
            self.execution_history.append({
                'execution_id': execution_id,
                'timestamp': pipeline_start_time,
                'duration': results['total_duration'],
                'status': 'failed',
                'error': str(e)
            })
        
        return results
    
    def extract_data(self) -> Dict[str, pd.DataFrame]:
        """
        从CSV文件提取数据
        
        Returns:
            Dict[str, pd.DataFrame]: 原始数据字典
        """
        logger.info("[INFO] 开始数据提取...")
        
        data_files = {
            'customers': 'customers.csv',
            'products': 'products.csv', 
            'time_dim': 'time_dim.csv',
            'regions': 'regions.csv',
            'orders': 'orders.csv',
            'behavior_logs': 'behavior_logs.csv'
        }
        
        raw_data = {}
        extraction_stats = {}
        
        for table_name, file_name in data_files.items():
            file_path = os.path.join(self.source_dir, file_name)
            
            try:
                if os.path.exists(file_path):
                    # 读取CSV文件
                    df = pd.read_csv(file_path)
                    
                    # 记录提取统计
                    extraction_stats[table_name] = {
                        'rows': len(df),
                        'columns': len(df.columns),
                        'missing_values': df.isnull().sum().sum(),
                        'duplicates': df.duplicated().sum(),
                        'file_size_mb': os.path.getsize(file_path) / (1024 * 1024)
                    }
                    
                    raw_data[table_name] = df
                    logger.info(f"   [OK] 已提取: {file_name} ({len(df):,} 行)")
                else:
                    logger.warning(f"   [WARN] 文件不存在: {file_path}")
                    raw_data[table_name] = pd.DataFrame()
                    
            except Exception as e:
                logger.error(f"   [ERROR] 提取失败 {file_name}: {str(e)}")
                raw_data[table_name] = pd.DataFrame()
        
        # 记录提取统计
        self.data_quality_metrics['extraction'] = extraction_stats
        
        logger.info(f"[INFO] 数据提取完成，共加载 {len([d for d in raw_data.values() if not d.empty])} 个表")
        return raw_data
    
    def transform_data(self, raw_data: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
        """
        数据清洗和转换
        
        Args:
            raw_data: 原始数据字典
            
        Returns:
            Dict[str, pd.DataFrame]: 清洗后的数据字典
        """
        logger.info("[INFO] 开始数据清洗和转换...")
        
        cleaned_data = {}
        transformation_stats = {}
        
        # 创建数据清洗工厂
        cleaner_factory = {
            'customers': self._clean_customers,
            'products': self._clean_products,
            'orders': self._clean_orders,
            'time_dim': self._clean_time_dim,
            'behavior_logs': self._clean_behavior_logs,
            'regions': self._clean_general
        }
        
        for table_name, df in raw_data.items():
            if df.empty:
                logger.warning(f"[WARN] 跳过空表: {table_name}")
                continue
            
            logger.info(f"[INFO] 清洗表: {table_name}")
            
            try:
                # 获取清洗函数
                cleaner_func = cleaner_factory.get(table_name, self._clean_general)
                
                # 记录清洗前的统计
                before_stats = {
                    'rows': len(df),
                    'columns': len(df.columns),
                    'missing_values': df.isnull().sum().sum(),
                    'duplicates': df.duplicated().sum(),
                    'memory_mb': df.memory_usage(deep=True).sum() / (1024 * 1024)
                }
                
                # 执行清洗
                cleaned_df = cleaner_func(df, table_name)
                
                # 记录清洗后的统计
                after_stats = {
                    'rows': len(cleaned_df),
                    'columns': len(cleaned_df.columns),
                    'missing_values': cleaned_df.isnull().sum().sum(),
                    'duplicates': cleaned_df.duplicated().sum(),
                    'memory_mb': cleaned_df.memory_usage(deep=True).sum() / (1024 * 1024)
                }
                
                # 计算改进指标
                improvement = {
                    'rows_removed': before_stats['rows'] - after_stats['rows'],
                    'rows_removed_pct': ((before_stats['rows'] - after_stats['rows']) / before_stats['rows'] * 100) if before_stats['rows'] > 0 else 0,
                    'missing_reduced': before_stats['missing_values'] - after_stats['missing_values'],
                    'missing_reduced_pct': ((before_stats['missing_values'] - after_stats['missing_values']) / before_stats['missing_values'] * 100) if before_stats['missing_values'] > 0 else 0,
                    'duplicates_removed': before_stats['duplicates'] - after_stats['duplicates'],
                    'memory_reduced_mb': before_stats['memory_mb'] - after_stats['memory_mb']
                }
                
                transformation_stats[table_name] = {
                    'before': before_stats,
                    'after': after_stats,
                    'improvement': improvement
                }
                
                cleaned_data[table_name] = cleaned_df
                logger.info(f"   [OK] 清洗完成: {before_stats['rows']:,} → {after_stats['rows']:,} 行 (移除: {improvement['rows_removed_pct']:.1f}%)")
                
            except Exception as e:
                logger.error(f"   [ERROR] 清洗失败 {table_name}: {str(e)}")
                # 如果清洗失败，保留原始数据
                cleaned_data[table_name] = df
        
        # 记录转换统计
        self.data_quality_metrics['transformation'] = transformation_stats
        
        logger.info(f"[INFO] 数据清洗完成，共处理 {len(cleaned_data)} 个表")
        return cleaned_data
    
    def _clean_general(self, df: pd.DataFrame, table_name: str) -> pd.DataFrame:
        """
        通用清洗函数
        """
        df_clean = df.copy()
        
        # 1. 删除完全重复的行
        df_clean = df_clean.drop_duplicates()
        
        # 2. 处理ID列
        id_columns = ['customer_id', 'order_id', 'product_id', 'log_id', 'region_id']
        for col in id_columns:
            if col in df_clean.columns:
                try:
                    df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce').astype('Int64')
                except:
                    pass
        
        # 3. 标准化列名
        df_clean.columns = [col.strip().lower().replace(' ', '_') for col in df_clean.columns]
        
        return df_clean
    
    def _clean_customers(self, df: pd.DataFrame, table_name: str) -> pd.DataFrame:
        """清洗客户数据"""
        df_clean = df.copy()
        
        # 标准化列名
        df_clean.columns = [col.strip().lower().replace(' ', '_') for col in df_clean.columns]
        
        # 删除完全重复的行
        df_clean = df_clean.drop_duplicates()
        
        # 处理客户ID
        if 'customer_id' in df_clean.columns:
            df_clean['customer_id'] = pd.to_numeric(df_clean['customer_id'], errors='coerce').astype('Int64')
            df_clean = df_clean.dropna(subset=['customer_id'])
        
        # 清洗姓名
        if 'customer_name' in df_clean.columns:
            df_clean['customer_name'] = df_clean['customer_name'].astype(str).str.strip().str.title()
            missing_names = df_clean['customer_name'].isna() | (df_clean['customer_name'] == '')
            df_clean.loc[missing_names, 'customer_name'] = 'Unknown Customer'
        
        # 标准化性别
        if 'gender' in df_clean.columns:
            gender_mapping = {
                'male': 'Male', 'M': 'Male', '男': 'Male',
                'female': 'Female', 'F': 'Female', '女': 'Female',
                '未知': 'Unknown', '': 'Unknown'
            }
            df_clean['gender'] = df_clean['gender'].astype(str).str.strip().str.lower()
            df_clean['gender'] = df_clean['gender'].map(gender_mapping).fillna('Unknown')
        
        # 清洗出生年份
        if 'birth_year' in df_clean.columns:
            df_clean['birth_year'] = pd.to_numeric(df_clean['birth_year'], errors='coerce')
            current_year = datetime.now().year
            # 删除不合理的出生年份
            mask = (df_clean['birth_year'] >= 1900) & (df_clean['birth_year'] <= current_year - 10)
            df_clean = df_clean[mask | df_clean['birth_year'].isna()]
            # 计算年龄
            df_clean['age'] = current_year - df_clean['birth_year']
        
        # 清洗邮箱
        if 'email' in df_clean.columns:
            df_clean['email'] = df_clean['email'].astype(str).str.strip().str.lower()
            email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            valid_emails = df_clean['email'].str.contains(email_pattern, na=False)
            df_clean.loc[~valid_emails, 'email'] = None
        
        # 标准化国家
        if 'country' in df_clean.columns:
            country_mapping = {
                'US': 'USA', 'United States': 'USA', '美国': 'USA',
                'CA': 'Canada', 'CANADA': 'Canada',
                'GB': 'UK', 'United Kingdom': 'UK',
                'AU': 'Australia'
            }
            df_clean['country'] = df_clean['country'].astype(str).str.strip().str.upper()
            df_clean['country'] = df_clean['country'].map(country_mapping).fillna('Other')
        
        # 标准化忠诚度等级
        if 'loyalty_tier' in df_clean.columns:
            loyalty_mapping = {
                'bronze': 'Bronze', 'silver': 'Silver', 'gold': 'Gold', 
                'platinum': 'Platinum', 'vip': 'Platinum'
            }
            df_clean['loyalty_tier'] = df_clean['loyalty_tier'].astype(str).str.strip().str.lower()
            df_clean['loyalty_tier'] = df_clean['loyalty_tier'].map(loyalty_mapping).fillna('Bronze')
        
        # 处理注册日期
        if 'registration_date' in df_clean.columns:
            df_clean['registration_date'] = pd.to_datetime(df_clean['registration_date'], errors='coerce')
            # 删除异常日期
            df_clean = df_clean[df_clean['registration_date'].notna()]
        
        # 处理数字列中的异常值
        numeric_cols = ['total_orders', 'total_spent']
        for col in numeric_cols:
            if col in df_clean.columns:
                df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')
                # 将负数转为正数
                df_clean[col] = df_clean[col].abs()
        
        return df_clean
    
    def _clean_products(self, df: pd.DataFrame, table_name: str) -> pd.DataFrame:
        """清洗产品数据"""
        df_clean = df.copy()
        
        # 标准化列名
        df_clean.columns = [col.strip().lower().replace(' ', '_') for col in df_clean.columns]
        
        # 删除完全重复的行
        df_clean = df_clean.drop_duplicates()
        
        # 处理产品ID
        if 'product_id' in df_clean.columns:
            df_clean['product_id'] = pd.to_numeric(df_clean['product_id'], errors='coerce').astype('Int64')
            df_clean = df_clean.dropna(subset=['product_id'])
        
        # 清洗产品名称
        if 'product_name' in df_clean.columns:
            df_clean['product_name'] = df_clean['product_name'].astype(str).str.strip().str.title()
            df_clean['product_name'] = df_clean['product_name'].fillna('Unknown Product')
        
        # 标准化产品类别
        if 'category' in df_clean.columns:
            category_mapping = {
                'electronics': 'Electronics', 'clothing': 'Clothing', 
                'home': 'Home', 'books': 'Books', 'sports': 'Sports'
            }
            df_clean['category'] = df_clean['category'].astype(str).str.strip().str.lower()
            df_clean['category'] = df_clean['category'].map(category_mapping).fillna('Other')
        
        # 清洗价格
        if 'price' in df_clean.columns:
            df_clean['price'] = pd.to_numeric(df_clean['price'], errors='coerce')
            # 处理异常价格
            min_price, max_price = 0.01, 10000
            df_clean.loc[df_clean['price'] < min_price, 'price'] = min_price
            df_clean.loc[df_clean['price'] > max_price, 'price'] = max_price
            # 填充缺失的价格
            if df_clean['price'].isna().any():
                avg_price = df_clean['price'].mean()
                if pd.isna(avg_price):
                    avg_price = 50
                df_clean['price'] = df_clean['price'].fillna(avg_price)
        
        # 清洗库存
        if 'stock_quantity' in df_clean.columns:
            df_clean['stock_quantity'] = pd.to_numeric(df_clean['stock_quantity'], errors='coerce')
            df_clean.loc[df_clean['stock_quantity'] < 0, 'stock_quantity'] = 0
            df_clean['stock_quantity'] = df_clean['stock_quantity'].fillna(0)
        
        return df_clean
    
    def _clean_orders(self, df: pd.DataFrame, table_name: str) -> pd.DataFrame:
        """清洗订单数据"""
        df_clean = df.copy()
        
        # 标准化列名
        df_clean.columns = [col.strip().lower().replace(' ', '_') for col in df_clean.columns]
        
        # 删除完全重复的行
        df_clean = df_clean.drop_duplicates()
        
        # 处理订单ID
        if 'order_id' in df_clean.columns:
            df_clean['order_id'] = pd.to_numeric(df_clean['order_id'], errors='coerce').astype('Int64')
            df_clean = df_clean.dropna(subset=['order_id'])
        
        # 处理客户ID和产品ID
        id_cols = ['customer_id', 'product_id', 'region_id']
        for col in id_cols:
            if col in df_clean.columns:
                df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce').astype('Int64')
        
        # 清洗订单日期
        if 'order_date' in df_clean.columns:
            df_clean['order_date'] = pd.to_datetime(df_clean['order_date'], errors='coerce')
            min_date, max_date = pd.Timestamp('2023-01-01'), pd.Timestamp('2024-12-31')
            df_clean = df_clean[
                (df_clean['order_date'] >= min_date) & 
                (df_clean['order_date'] <= max_date)
            ]
        
        # 清洗数量
        if 'quantity' in df_clean.columns:
            df_clean['quantity'] = pd.to_numeric(df_clean['quantity'], errors='coerce')
            df_clean.loc[df_clean['quantity'] <= 0, 'quantity'] = 1
            df_clean.loc[df_clean['quantity'] > 100, 'quantity'] = 100
            df_clean['quantity'] = df_clean['quantity'].fillna(1)
        
        # 清洗单价
        if 'unit_price' in df_clean.columns:
            df_clean['unit_price'] = pd.to_numeric(df_clean['unit_price'], errors='coerce')
            df_clean.loc[df_clean['unit_price'] < 0, 'unit_price'] = 0
            df_clean.loc[df_clean['unit_price'] > 10000, 'unit_price'] = 10000
            if df_clean['unit_price'].isna().any():
                avg_price = df_clean['unit_price'].mean()
                if pd.isna(avg_price):
                    avg_price = 50
                df_clean['unit_price'] = df_clean['unit_price'].fillna(avg_price)
        
        # 清洗金额
        if 'amount' in df_clean.columns:
            df_clean['amount'] = df_clean['amount'].astype(str).str.replace('$', '', regex=False)
            df_clean['amount'] = pd.to_numeric(df_clean['amount'], errors='coerce')
            # 修正金额计算错误
            calculated_amount = df_clean['unit_price'] * df_clean['quantity']
            amount_diff = abs(df_clean['amount'] - calculated_amount)
            mask_large_diff = amount_diff > (df_clean['amount'] * 0.01)
            df_clean.loc[mask_large_diff, 'amount'] = calculated_amount[mask_large_diff]
            df_clean.loc[df_clean['amount'] < 0, 'amount'] = df_clean['amount'].abs()
        
        # 标准化支付方式
        if 'payment_method' in df_clean.columns:
            payment_mapping = {
                'credit card': 'Credit Card', 'cc': 'Credit Card',
                'paypal': 'PayPal', 'apple pay': 'Apple Pay',
                'google pay': 'Google Pay', '现金': 'Cash'
            }
            df_clean['payment_method'] = df_clean['payment_method'].astype(str).str.strip().str.lower()
            df_clean['payment_method'] = df_clean['payment_method'].map(payment_mapping).fillna('Other')
        
        # 标准化订单状态
        if 'order_status' in df_clean.columns:
            status_mapping = {
                'completed': 'Completed', 'shipped': 'Shipped', 
                'processing': 'Processing', 'cancelled': 'Cancelled',
                '待处理': 'Processing', '已取消': 'Cancelled'
            }
            df_clean['order_status'] = df_clean['order_status'].astype(str).str.strip().str.lower()
            df_clean['order_status'] = df_clean['order_status'].map(status_mapping).fillna('Processing')
        
        return df_clean
    
    def _clean_time_dim(self, df: pd.DataFrame, table_name: str) -> pd.DataFrame:
        """清洗时间维度数据"""
        df_clean = df.copy()
        
        # 标准化列名
        df_clean.columns = [col.strip().lower().replace(' ', '_') for col in df_clean.columns]
        
        # 删除完全重复的行
        df_clean = df_clean.drop_duplicates()
        
        # 确保日期唯一性
        if 'date' in df_clean.columns:
            df_clean = df_clean.drop_duplicates(subset=['date'])
            df_clean['date'] = pd.to_datetime(df_clean['date'], errors='coerce')
        
        return df_clean
    
    def _clean_behavior_logs(self, df: pd.DataFrame, table_name: str) -> pd.DataFrame:
        """清洗用户行为日志数据"""
        df_clean = df.copy()
        
        # 标准化列名
        df_clean.columns = [col.strip().lower().replace(' ', '_') for col in df_clean.columns]
        
        # 删除完全重复的行
        df_clean = df_clean.drop_duplicates()
        
        # 处理日志ID
        if 'log_id' in df_clean.columns:
            df_clean['log_id'] = pd.to_numeric(df_clean['log_id'], errors='coerce').astype('Int64')
            df_clean = df_clean.dropna(subset=['log_id'])
        
        # 处理客户ID和产品ID
        id_cols = ['customer_id', 'product_id']
        for col in id_cols:
            if col in df_clean.columns:
                df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce').astype('Int64')
        
        # 标准化行为类型
        if 'behavior_type' in df_clean.columns:
            behavior_mapping = {
                'view': 'view', 'click': 'click', 
                'add_to_cart': 'add_to_cart', 'purchase': 'purchase',
                'wishlist': 'wishlist', '浏览': 'view', 'VIEW': 'view',
                'CLICK': 'click', '': 'view'
            }
            df_clean['behavior_type'] = df_clean['behavior_type'].astype(str).str.strip().str.lower()
            df_clean['behavior_type'] = df_clean['behavior_type'].map(behavior_mapping).fillna('view')
        
        return df_clean
    
    def connect_to_db(self):
        """连接到SQLite数据库"""
        try:
            self.connection = sqlite3.connect(self.db_path)
            logger.info(f"[INFO] 成功连接到数据库: {self.db_path}")
        except Error as e:
            logger.error(f"[ERROR] 数据库连接失败: {e}")
            raise
    
    def create_tables(self):
        """创建数据库表"""
        if not self.connection:
            self.connect_to_db()
        
        cursor = self.connection.cursor()
        
        # 客户表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS customers (
            customer_id INTEGER PRIMARY KEY,
            customer_name TEXT,
            gender TEXT,
            birth_year INTEGER,
            registration_date TEXT,
            email TEXT,
            phone TEXT,
            country TEXT,
            city TEXT,
            zip_code TEXT,
            registration_channel TEXT,
            loyalty_tier TEXT,
            preferred_category TEXT,
            avg_order_value_segment TEXT,
            last_login_date TEXT,
            total_orders INTEGER,
            total_spent REAL,
            age INTEGER
        )
        ''')
        
        # 产品表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS products (
            product_id INTEGER PRIMARY KEY,
            product_name TEXT,
            category TEXT,
            subcategory TEXT,
            brand TEXT,
            price REAL,
            cost_price REAL,
            stock_quantity INTEGER,
            supplier TEXT,
            rating REAL,
            review_count INTEGER,
            created_date TEXT,
            is_active BOOLEAN
        )
        ''')
        
        # 时间维度表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS time_dim (
            date TEXT PRIMARY KEY,
            day INTEGER,
            month INTEGER,
            month_name TEXT,
            quarter INTEGER,
            year INTEGER,
            day_of_week INTEGER,
            day_name TEXT,
            is_weekend BOOLEAN,
            is_holiday BOOLEAN
        )
        ''')
        
        # 地区表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS regions (
            region_id INTEGER PRIMARY KEY,
            region_name TEXT,
            region_manager TEXT
        )
        ''')
        
        # 订单表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS orders (
            order_id INTEGER PRIMARY KEY,
            customer_id INTEGER,
            product_id INTEGER,
            order_date TEXT,
            quantity INTEGER,
            unit_price REAL,
            amount REAL,
            region_id INTEGER,
            payment_method TEXT,
            shipping_method TEXT,
            order_status TEXT,
            browsing_duration_seconds INTEGER,
            click_count INTEGER,
            add_to_cart_count INTEGER,
            wishlist_added BOOLEAN,
            discount_applied REAL,
            customer_rating INTEGER,
            return_requested BOOLEAN,
            FOREIGN KEY (customer_id) REFERENCES customers (customer_id),
            FOREIGN KEY (product_id) REFERENCES products (product_id),
            FOREIGN KEY (region_id) REFERENCES regions (region_id)
        )
        ''')
        
        # 行为日志表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS behavior_logs (
            log_id INTEGER PRIMARY KEY,
            customer_id INTEGER,
            product_id INTEGER,
            behavior_type TEXT,
            timestamp TEXT,
            session_id TEXT,
            device_type TEXT,
            browser TEXT,
            FOREIGN KEY (customer_id) REFERENCES customers (customer_id),
            FOREIGN KEY (product_id) REFERENCES products (product_id)
        )
        ''')
        
        self.connection.commit()
        logger.info("[INFO] 数据库表创建完成")
    
    def load_data(self, cleaned_data: Dict[str, pd.DataFrame]):
        """加载清洗后的数据到数据库"""
        logger.info("[INFO] 开始加载数据到数据库...")
        
        if not self.connection:
            self.connect_to_db()
        
        # 创建表
        self.create_tables()
        
        # 按依赖顺序加载数据
        load_order = ['customers', 'products', 'time_dim', 'regions', 'orders', 'behavior_logs']
        
        # 创建cleaned_data目录
        os.makedirs('cleaned_data', exist_ok=True)
        
        load_stats = {}
        
        for table_name in load_order:
            if table_name in cleaned_data and not cleaned_data[table_name].empty:
                df = cleaned_data[table_name]
                
                try:
                    # 保存到CSV（备份）
                    csv_path = f'cleaned_data/cleaned_{table_name}.csv'
                    df.to_csv(csv_path, index=False)
                    
                    # 插入到数据库
                    df.to_sql(table_name, self.connection, if_exists='replace', index=False)
                    
                    # 记录加载统计
                    load_stats[table_name] = {
                        'rows_loaded': len(df),
                        'csv_path': csv_path,
                        'csv_size_mb': os.path.getsize(csv_path) / (1024 * 1024)
                    }
                    
                    logger.info(f"   [OK] 已加载: {table_name} ({len(df):,} 行)")
                    
                except Exception as e:
                    logger.error(f"   [ERROR] 加载失败 {table_name}: {str(e)}")
                    load_stats[table_name] = {
                        'rows_loaded': 0,
                        'error': str(e)
                    }
        
        # 记录加载统计
        self.data_quality_metrics['loading'] = load_stats
        
        logger.info("[INFO] 数据加载完成")
    
    def validate_data_quality(self) -> Dict:
        """验证数据质量"""
        logger.info("[INFO] 开始数据质量验证...")
        
        if not self.connection:
            self.connect_to_db()
        
        quality_report = {
            'timestamp': datetime.now().isoformat(),
            'tables': {},
            'overall_score': 0,
            'issues_found': 0,
            'detailed_issues': []
        }
        
        tables = ['customers', 'products', 'orders', 'behavior_logs', 'time_dim', 'regions']
        
        for table_name in tables:
            try:
                df = pd.read_sql_query(f"SELECT * FROM {table_name}", self.connection)
                table_report = self._validate_table_quality(df, table_name)
                quality_report['tables'][table_name] = table_report
                quality_report['issues_found'] += table_report['issues_count']
                
                # 记录详细问题
                for issue in table_report['issues']:
                    quality_report['detailed_issues'].append({
                        'table': table_name,
                        'type': issue['type'],
                        'description': issue['description']
                    })
                    
            except Exception as e:
                logger.warning(f"[WARN] 无法验证表 {table_name}: {str(e)}")
                quality_report['detailed_issues'].append({
                    'table': table_name,
                    'type': 'validation_error',
                    'description': f"验证失败: {str(e)}"
                })
        
        # 计算总体质量评分
        if quality_report['tables']:
            total_score = sum([r['quality_score'] for r in quality_report['tables'].values()])
            quality_report['overall_score'] = total_score / len(quality_report['tables'])
        
        logger.info(f"[INFO] 质量验证完成 - 总体评分: {quality_report['overall_score']:.1f}/100")
        
        return quality_report
    
    def _validate_table_quality(self, df: pd.DataFrame, table_name: str) -> Dict:
        """验证单个表的数据质量"""
        report = {
            'table_name': table_name,
            'row_count': len(df),
            'column_count': len(df.columns),
            'issues': [],
            'issues_count': 0,
            'quality_score': 100
        }
        
        # 检查缺失值
        missing_counts = df.isnull().sum()
        missing_cols = missing_counts[missing_counts > 0]
        
        if len(missing_cols) > 0:
            missing_issue = {
                'type': 'missing_values',
                'description': f"{len(missing_cols)} 列有缺失值",
                'details': missing_cols.to_dict()
            }
            report['issues'].append(missing_issue)
            report['issues_count'] += 1
            report['quality_score'] -= 10
        
        # 检查重复行
        duplicate_rows = df.duplicated().sum()
        if duplicate_rows > 0:
            duplicate_issue = {
                'type': 'duplicate_rows',
                'description': f"{duplicate_rows} 个重复行",
                'details': {'duplicate_count': duplicate_rows}
            }
            report['issues'].append(duplicate_issue)
            report['issues_count'] += 1
            report['quality_score'] -= 10
        
        # 检查数据类型
        type_issues = []
        for col in df.columns:
            if df[col].dtype == 'object':
                try:
                    pd.to_numeric(df[col], errors='raise')
                except:
                    pass
                else:
                    type_issues.append(col)
        
        if type_issues:
            type_issue = {
                'type': 'data_type_inconsistency',
                'description': f"{len(type_issues)} 列可能有数据类型问题",
                'details': {'columns': type_issues}
            }
            report['issues'].append(type_issue)
            report['issues_count'] += 1
            report['quality_score'] -= 5
        
        # 检查订单表的异常值
        if table_name == 'orders':
            if 'amount' in df.columns:
                negative_amount = (df['amount'] < 0).sum()
                if negative_amount > 0:
                    amount_issue = {
                        'type': 'negative_amount',
                        'description': f"{negative_amount} 个订单金额为负数",
                        'details': {'negative_count': negative_amount}
                    }
                    report['issues'].append(amount_issue)
                    report['issues_count'] += 1
                    report['quality_score'] -= 15
        
        # 确保质量分数不低于0
        report['quality_score'] = max(0, report['quality_score'])
        
        return report
    
    def create_data_warehouse_views(self):
        """创建数据仓库视图"""
        try:
            if not self.connection:
                self.connect_to_db()
            
            cursor = self.connection.cursor()
            
            # 客户分析视图
            cursor.execute('''
            CREATE VIEW IF NOT EXISTS vw_customer_analysis AS
            SELECT 
                c.customer_id,
                c.customer_name,
                c.country,
                c.city,
                c.loyalty_tier,
                c.age,
                COUNT(o.order_id) as total_orders,
                SUM(o.amount) as total_spent,
                AVG(o.amount) as avg_order_value,
                MAX(o.order_date) as last_order_date
            FROM customers c
            LEFT JOIN orders o ON c.customer_id = o.customer_id
            GROUP BY c.customer_id, c.customer_name, c.country, c.city, c.loyalty_tier, c.age
            ''')
            
            # 产品销售分析视图
            cursor.execute('''
            CREATE VIEW IF NOT EXISTS vw_product_sales AS
            SELECT 
                p.product_id,
                p.product_name,
                p.category,
                p.subcategory,
                p.brand,
                p.price,
                COUNT(o.order_id) as units_sold,
                SUM(o.amount) as revenue,
                SUM(o.quantity) as total_quantity,
                AVG(o.customer_rating) as avg_rating
            FROM products p
            LEFT JOIN orders o ON p.product_id = o.product_id
            GROUP BY p.product_id, p.product_name, p.category, p.subcategory, p.brand, p.price
            ''')
            
            # 月度销售视图
            cursor.execute('''
            CREATE VIEW IF NOT EXISTS vw_monthly_sales AS
            SELECT 
                strftime('%Y-%m', o.order_date) as month,
                COUNT(o.order_id) as total_orders,
                SUM(o.amount) as total_revenue,
                AVG(o.amount) as avg_order_value,
                COUNT(DISTINCT o.customer_id) as unique_customers
            FROM orders o
            GROUP BY strftime('%Y-%m', o.order_date)
            ORDER BY month
            ''')
            
            # 用户行为分析视图
            cursor.execute('''
            CREATE VIEW IF NOT EXISTS vw_user_behavior AS
            SELECT 
                bl.customer_id,
                bl.behavior_type,
                COUNT(bl.log_id) as behavior_count,
                COUNT(DISTINCT bl.product_id) as unique_products,
                COUNT(DISTINCT DATE(bl.timestamp)) as active_days
            FROM behavior_logs bl
            GROUP BY bl.customer_id, bl.behavior_type
            ''')
            
            self.connection.commit()
            logger.info("[INFO] 数据仓库视图创建完成")
            
        except Exception as e:
            logger.error(f"[ERROR] 创建数据仓库视图失败: {str(e)}")
    
    def generate_execution_report(self, results: Dict):
        """生成执行报告"""
        try:
            report_data = {
                'execution_summary': {
                    'execution_id': results['execution_id'],
                    'start_time': results['start_time'],
                    'end_time': results['end_time'],
                    'total_duration': results['total_duration'],
                    'status': results['status'],
                    'tables_processed': results.get('tables_processed', 0)
                },
                'stage_details': results.get('stages', {}),
                'quality_metrics': self.data_quality_metrics,
                'quality_report': results.get('quality_report', {}),
                'generated_at': datetime.now().isoformat()
            }
            
            # 保存JSON报告
            with open('etl_execution_report.json', 'w', encoding='utf-8') as f:
                json.dump(report_data, f, indent=2, ensure_ascii=False, default=str)
            
            # 生成文本报告
            self._generate_text_report(report_data)
            
            logger.info(f"[INFO] 执行报告已生成: etl_execution_report.json")
            
        except Exception as e:
            logger.error(f"[ERROR] 生成执行报告失败: {str(e)}")
    
    def _generate_text_report(self, report_data: Dict):
        """生成文本格式报告"""
        report_lines = []
        report_lines.append("="*80)
        report_lines.append(" ETL管道执行报告")
        report_lines.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report_lines.append("="*80)
        
        # 执行摘要
        summary = report_data['execution_summary']
        report_lines.append(f"\n [SUMMARY] 执行摘要")
        report_lines.append(f"-"*40)
        report_lines.append(f"执行ID: {summary['execution_id']}")
        report_lines.append(f"状态: {summary['status']}")
        report_lines.append(f"开始时间: {summary['start_time']}")
        report_lines.append(f"结束时间: {summary['end_time']}")
        report_lines.append(f"总耗时: {summary['total_duration']:.2f} 秒")
        report_lines.append(f"处理表数: {summary['tables_processed']}")
        
        # 阶段详情
        stages = report_data['stage_details']
        report_lines.append(f"\n [STAGES] 阶段详情")
        report_lines.append(f"-"*40)
        
        for stage_name, stage_info in stages.items():
            duration = stage_info.get('duration', 0)
            status = stage_info.get('status', 'unknown')
            report_lines.append(f"{stage_name:15s}: {status:10s} | 耗时: {duration:.2f}秒")
        
        # 质量报告
        quality = report_data.get('quality_report', {})
        if quality:
            report_lines.append(f"\n [QUALITY] 数据质量报告")
            report_lines.append(f"-"*40)
            report_lines.append(f"总体质量评分: {quality.get('overall_score', 0):.1f}/100")
            report_lines.append(f"发现问题总数: {quality.get('issues_found', 0)}")
            
            # 各表质量
            tables = quality.get('tables', {})
            if tables:
                report_lines.append(f"\n 各表质量:")
                for table_name, table_info in tables.items():
                    score = table_info.get('quality_score', 0)
                    issues = table_info.get('issues_count', 0)
                    report_lines.append(f"  {table_name:15s}: {score:5.1f}/100 | 问题数: {issues}")
        
        # 保存文本报告
        with open('etl_report.txt', 'w', encoding='utf-8') as f:
            f.write('\n'.join(report_lines))
        
        # 打印报告
        print('\n'.join(report_lines))
    
    def generate_dashboard(self, results: Dict, quality_report: Dict):
        """生成可视化仪表板"""
        try:
            # 创建仪表板
            fig = make_subplots(
                rows=2, cols=3,
                subplot_titles=('ETL阶段耗时', '数据质量评分', '表级质量分布',
                              '执行历史趋势', '问题类型分布', '数据量变化'),
                specs=[[{'type': 'bar'}, {'type': 'gauge'}, {'type': 'bar'}],
                       [{'type': 'line'}, {'type': 'pie'}, {'type': 'bar'}]]
            )
            
            # 1. ETL阶段耗时柱状图
            stages = results.get('stages', {})
            stage_names = list(stages.keys())
            stage_durations = [s.get('duration', 0) for s in stages.values()]
            
            fig.add_trace(
                go.Bar(x=stage_names, y=stage_durations, name='阶段耗时', marker_color='skyblue'),
                row=1, col=1
            )
            
            # 2. 数据质量评分仪表盘
            quality_score = quality_report.get('overall_score', 0)
            
            fig.add_trace(
                go.Indicator(
                    mode="gauge+number",
                    value=quality_score,
                    title={'text': "数据质量评分"},
                    domain={'row': 0, 'column': 1},
                    gauge={
                        'axis': {'range': [0, 100]},
                        'bar': {'color': "darkblue"},
                        'steps': [
                            {'range': [0, 60], 'color': "red"},
                            {'range': [60, 80], 'color': "yellow"},
                            {'range': [80, 100], 'color': "green"}
                        ]
                    }
                ),
                row=1, col=2
            )
            
            # 3. 表级质量分布
            tables = quality_report.get('tables', {})
            table_names = list(tables.keys())
            table_scores = [t.get('quality_score', 0) for t in tables.values()]
            
            fig.add_trace(
                go.Bar(x=table_names, y=table_scores, name='表质量', marker_color='lightgreen'),
                row=1, col=3
            )
            
            # 4. 执行历史趋势（模拟数据）
            if self.execution_history:
                history_dates = [h['timestamp'] for h in self.execution_history[-5:]]
                history_scores = [h.get('quality_score', 0) for h in self.execution_history[-5:]]
                
                fig.add_trace(
                    go.Scatter(x=history_dates, y=history_scores, mode='lines+markers', name='质量趋势'),
                    row=2, col=1
                )
            
            # 5. 问题类型分布
            issues = quality_report.get('detailed_issues', [])
            if issues:
                issue_types = [i['type'] for i in issues]
                issue_counts = pd.Series(issue_types).value_counts()
                
                fig.add_trace(
                    go.Pie(labels=issue_counts.index.tolist(), values=issue_counts.values.tolist(), name='问题分布'),
                    row=2, col=2
                )
            
            # 6. 数据量变化
            if 'transformation' in self.data_quality_metrics:
                transform_stats = self.data_quality_metrics['transformation']
                table_names = list(transform_stats.keys())
                before_rows = [s['before']['rows'] for s in transform_stats.values()]
                after_rows = [s['after']['rows'] for s in transform_stats.values()]
                
                fig.add_trace(
                    go.Bar(x=table_names, y=before_rows, name='清洗前', marker_color='red'),
                    row=2, col=3
                )
                
                fig.add_trace(
                    go.Bar(x=table_names, y=after_rows, name='清洗后', marker_color='green'),
                    row=2, col=3
                )
            
            # 更新布局
            fig.update_layout(
                height=800,
                showlegend=True,
                title_text="ETL管道监控仪表板",
                title_font_size=20
            )
            
            # 保存为HTML
            fig.write_html("etl_dashboard.html")
            
            # 在Jupyter中显示
            fig.show()
            
            logger.info(f"[INFO] 仪表板已生成: etl_dashboard.html")
            
        except Exception as e:
            logger.error(f"[ERROR] 生成仪表板失败: {str(e)}")
    
    def close_connection(self):
        """关闭数据库连接"""
        if self.connection:
            self.connection.close()
            logger.info("[INFO] 数据库连接已关闭")

logger.info("[OK] ETL管道架构类定义完成")

2025-12-02 17:39:50,725 - ETL_Pipeline - INFO - [OK] ETL管道架构类定义完成


# 3.数据质量监控模块

In [7]:
class DataQualityMonitor:
    """
    数据质量监控模块
    提供实时的数据质量检查和告警功能
    """
    
    def __init__(self, db_path: str = 'ecommerce_clean.db'):
        """
        初始化数据质量监控器
        
        Args:
            db_path: 数据库路径
        """
        self.db_path = db_path
        self.connection = None
        self.monitoring_rules = self._load_monitoring_rules()
        self.alert_history = []
        self.quality_metrics_history = []
        
        logger.info(f"[INFO] 数据质量监控器初始化 - 数据库: {db_path}")
    
    def _load_monitoring_rules(self) -> Dict:
        """加载监控规则"""
        return {
            'missing_values': {
                'threshold': 0.05,  # 5%缺失值阈值
                'severity': 'warning',
                'message': '缺失值超过阈值'
            },
            'duplicate_rows': {
                'threshold': 0.01,  # 1%重复行阈值
                'severity': 'warning',
                'message': '重复行超过阈值'
            },
            'negative_values': {
                'threshold': 0,
                'severity': 'error',
                'message': '发现负值'
            },
            'outliers': {
                'threshold': 3,  # 3倍标准差
                'severity': 'info',
                'message': '发现异常值'
            },
            'referential_integrity': {
                'threshold': 0,
                'severity': 'error',
                'message': '引用完整性错误'
            }
        }
    
    def connect_to_db(self):
        """连接到数据库"""
        try:
            self.connection = sqlite3.connect(self.db_path)
            logger.info(f"[INFO] 监控器成功连接到数据库: {self.db_path}")
        except Error as e:
            logger.error(f"[ERROR] 数据库连接失败: {e}")
            raise
    
    def run_comprehensive_monitoring(self) -> Dict:
        """
        运行全面的数据质量监控
        
        Returns:
            Dict: 监控结果
        """
        logger.info("="*80)
        logger.info(" [MONITOR] 开始全面数据质量监控")
        logger.info("="*80)
        
        monitoring_start = datetime.now()
        results = {
            'timestamp': monitoring_start.isoformat(),
            'tables_monitored': [],
            'alerts': [],
            'metrics': {},
            'summary': {
                'total_alerts': 0,
                'critical_alerts': 0,
                'warning_alerts': 0,
                'info_alerts': 0
            }
        }
        
        try:
            if not self.connection:
                self.connect_to_db()
            
            # 获取所有表
            cursor = self.connection.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in cursor.fetchall()]
            
            # 监控每个表
            for table_name in tables:
                logger.info(f"[INFO] 监控表: {table_name}")
                table_results = self._monitor_table(table_name)
                results['tables_monitored'].append(table_name)
                results['metrics'][table_name] = table_results['metrics']
                results['alerts'].extend(table_results['alerts'])
            
            # 运行跨表监控
            cross_table_results = self._monitor_cross_table_integrity()
            results['alerts'].extend(cross_table_results['alerts'])
            
            # 运行业务规则监控
            business_rule_results = self._monitor_business_rules()
            results['alerts'].extend(business_rule_results['alerts'])
            
            # 更新摘要统计
            for alert in results['alerts']:
                results['summary']['total_alerts'] += 1
                if alert['severity'] == 'error':
                    results['summary']['critical_alerts'] += 1
                elif alert['severity'] == 'warning':
                    results['summary']['warning_alerts'] += 1
                elif alert['severity'] == 'info':
                    results['summary']['info_alerts'] += 1
            
            # 记录监控历史
            monitoring_end = datetime.now()
            monitoring_record = {
                'timestamp': monitoring_start,
                'duration': (monitoring_end - monitoring_start).total_seconds(),
                'tables_monitored': len(tables),
                'total_alerts': results['summary']['total_alerts'],
                'critical_alerts': results['summary']['critical_alerts']
            }
            self.quality_metrics_history.append(monitoring_record)
            
            # 生成监控报告
            self._generate_monitoring_report(results)
            
            logger.info(f"[INFO] 监控完成 - 发现告警: {results['summary']['total_alerts']} 个")
            
        except Exception as e:
            logger.error(f"[ERROR] 监控运行失败: {str(e)}")
            results['error'] = str(e)
        
        return results
    
    def _monitor_table(self, table_name: str) -> Dict:
        """监控单个表"""
        results = {
            'table_name': table_name,
            'metrics': {},
            'alerts': []
        }
        
        try:
            # 读取表数据
            df = pd.read_sql_query(f"SELECT * FROM {table_name}", self.connection)
            
            # 计算基本指标
            metrics = {
                'row_count': len(df),
                'column_count': len(df.columns),
                'missing_values': df.isnull().sum().sum(),
                'missing_percentage': (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100,
                'duplicate_rows': df.duplicated().sum(),
                'duplicate_percentage': (df.duplicated().sum() / len(df)) * 100 if len(df) > 0 else 0
            }
            
            results['metrics'] = metrics
            
            # 检查缺失值
            missing_threshold = self.monitoring_rules['missing_values']['threshold']
            if metrics['missing_percentage'] > missing_threshold * 100:
                alert = {
                    'table': table_name,
                    'type': 'missing_values',
                    'severity': self.monitoring_rules['missing_values']['severity'],
                    'message': f"{table_name}: 缺失值比例 {metrics['missing_percentage']:.2f}% 超过阈值 {missing_threshold*100}%",
                    'value': metrics['missing_percentage'],
                    'threshold': missing_threshold * 100
                }
                results['alerts'].append(alert)
                self._record_alert(alert)
            
            # 检查重复行
            duplicate_threshold = self.monitoring_rules['duplicate_rows']['threshold']
            if metrics['duplicate_percentage'] > duplicate_threshold * 100:
                alert = {
                    'table': table_name,
                    'type': 'duplicate_rows',
                    'severity': self.monitoring_rules['duplicate_rows']['severity'],
                    'message': f"{table_name}: 重复行比例 {metrics['duplicate_percentage']:.2f}% 超过阈值 {duplicate_threshold*100}%",
                    'value': metrics['duplicate_percentage'],
                    'threshold': duplicate_threshold * 100
                }
                results['alerts'].append(alert)
                self._record_alert(alert)
            
            # 检查数值列的负值
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            for col in numeric_cols:
                if (df[col] < 0).any():
                    negative_count = (df[col] < 0).sum()
                    alert = {
                        'table': table_name,
                        'type': 'negative_values',
                        'severity': self.monitoring_rules['negative_values']['severity'],
                        'message': f"{table_name}.{col}: 发现 {negative_count} 个负值",
                        'value': negative_count,
                        'threshold': 0
                    }
                    results['alerts'].append(alert)
                    self._record_alert(alert)
            
            # 检查异常值（针对数值列）
            for col in numeric_cols:
                if df[col].dtype in [np.float64, np.int64]:
                    mean = df[col].mean()
                    std = df[col].std()
                    if std > 0:
                        outliers = df[abs(df[col] - mean) > self.monitoring_rules['outliers']['threshold'] * std]
                        if len(outliers) > 0:
                            outlier_percentage = (len(outliers) / len(df)) * 100
                            if outlier_percentage > 1:  # 超过1%的异常值才告警
                                alert = {
                                    'table': table_name,
                                    'type': 'outliers',
                                    'severity': self.monitoring_rules['outliers']['severity'],
                                    'message': f"{table_name}.{col}: 发现 {len(outliers)} 个异常值 ({outlier_percentage:.2f}%)",
                                    'value': outlier_percentage,
                                    'threshold': 1
                                }
                                results['alerts'].append(alert)
                                self._record_alert(alert)
            
        except Exception as e:
            error_alert = {
                'table': table_name,
                'type': 'monitoring_error',
                'severity': 'error',
                'message': f"监控表 {table_name} 时出错: {str(e)}"
            }
            results['alerts'].append(error_alert)
            self._record_alert(error_alert)
        
        return results
    
    def _monitor_cross_table_integrity(self) -> Dict:
        """监控跨表引用完整性"""
        results = {
            'type': 'cross_table_integrity',
            'alerts': []
        }
        
        try:
            # 检查订单-客户引用完整性
            cursor = self.connection.cursor()
            
            cursor.execute('''
            SELECT COUNT(*) FROM orders o 
            LEFT JOIN customers c ON o.customer_id = c.customer_id 
            WHERE c.customer_id IS NULL
            ''')
            missing_customers = cursor.fetchone()[0]
            
            if missing_customers > 0:
                alert = {
                    'type': 'referential_integrity',
                    'severity': self.monitoring_rules['referential_integrity']['severity'],
                    'message': f"订单表引用了 {missing_customers} 个不存在的客户ID",
                    'value': missing_customers,
                    'threshold': 0
                }
                results['alerts'].append(alert)
                self._record_alert(alert)
            
            # 检查订单-产品引用完整性
            cursor.execute('''
            SELECT COUNT(*) FROM orders o 
            LEFT JOIN products p ON o.product_id = p.product_id 
            WHERE p.product_id IS NULL
            ''')
            missing_products = cursor.fetchone()[0]
            
            if missing_products > 0:
                alert = {
                    'type': 'referential_integrity',
                    'severity': self.monitoring_rules['referential_integrity']['severity'],
                    'message': f"订单表引用了 {missing_products} 个不存在的产品ID",
                    'value': missing_products,
                    'threshold': 0
                }
                results['alerts'].append(alert)
                self._record_alert(alert)
            
            # 检查行为日志引用完整性
            cursor.execute('''
            SELECT COUNT(*) FROM behavior_logs bl 
            LEFT JOIN customers c ON bl.customer_id = c.customer_id 
            WHERE c.customer_id IS NULL
            ''')
            missing_behavior_customers = cursor.fetchone()[0]
            
            if missing_behavior_customers > 0:
                alert = {
                    'type': 'referential_integrity',
                    'severity': self.monitoring_rules['referential_integrity']['severity'],
                    'message': f"行为日志表引用了 {missing_behavior_customers} 个不存在的客户ID",
                    'value': missing_behavior_customers,
                    'threshold': 0
                }
                results['alerts'].append(alert)
                self._record_alert(alert)
            
        except Exception as e:
            error_alert = {
                'type': 'integrity_check_error',
                'severity': 'error',
                'message': f"检查引用完整性时出错: {str(e)}"
            }
            results['alerts'].append(error_alert)
            self._record_alert(error_alert)
        
        return results
    
    def _monitor_business_rules(self) -> Dict:
        """监控业务规则"""
        results = {
            'type': 'business_rules',
            'alerts': []
        }
        
        try:
            # 检查订单金额计算正确性
            cursor = self.connection.cursor()
            
            cursor.execute('''
            SELECT COUNT(*) FROM orders 
            WHERE ABS(amount - (unit_price * quantity)) > 0.01
            ''')
            amount_errors = cursor.fetchone()[0]
            
            if amount_errors > 0:
                alert = {
                    'type': 'business_rule',
                    'severity': 'error',
                    'message': f"发现 {amount_errors} 个订单金额计算错误",
                    'value': amount_errors,
                    'threshold': 0
                }
                results['alerts'].append(alert)
                self._record_alert(alert)
            
            # 检查产品价格合理性
            cursor.execute('''
            SELECT COUNT(*) FROM products WHERE price <= 0
            ''')
            invalid_prices = cursor.fetchone()[0]
            
            if invalid_prices > 0:
                alert = {
                    'type': 'business_rule',
                    'severity': 'error',
                    'message': f"发现 {invalid_prices} 个无效的产品价格",
                    'value': invalid_prices,
                    'threshold': 0
                }
                results['alerts'].append(alert)
                self._record_alert(alert)
            
            # 检查库存合理性
            cursor.execute('''
            SELECT COUNT(*) FROM products WHERE stock_quantity < 0
            ''')
            negative_stock = cursor.fetchone()[0]
            
            if negative_stock > 0:
                alert = {
                    'type': 'business_rule',
                    'severity': 'error',
                    'message': f"发现 {negative_stock} 个负库存产品",
                    'value': negative_stock,
                    'threshold': 0
                }
                results['alerts'].append(alert)
                self._record_alert(alert)
            
            # 检查订单日期合理性
            cursor.execute('''
            SELECT COUNT(*) FROM orders WHERE order_date > date('now')
            ''')
            future_orders = cursor.fetchone()[0]
            
            if future_orders > 0:
                alert = {
                    'type': 'business_rule',
                    'severity': 'warning',
                    'message': f"发现 {future_orders} 个未来日期的订单",
                    'value': future_orders,
                    'threshold': 0
                }
                results['alerts'].append(alert)
                self._record_alert(alert)
            
        except Exception as e:
            error_alert = {
                'type': 'business_rule_error',
                'severity': 'error',
                'message': f"检查业务规则时出错: {str(e)}"
            }
            results['alerts'].append(error_alert)
            self._record_alert(error_alert)
        
        return results
    
    def _record_alert(self, alert: Dict):
        """记录告警"""
        alert['timestamp'] = datetime.now().isoformat()
        self.alert_history.append(alert)
        
        # 根据严重程度打印不同颜色的日志
        severity = alert.get('severity', 'info')
        message = alert.get('message', '')
        
        if severity == 'error':
            logger.error(f"[ERROR] {message}")
        elif severity == 'warning':
            logger.warning(f"[WARNING] {message}")
        else:
            logger.info(f"[INFO] {message}")
    
    def _generate_monitoring_report(self, results: Dict):
        """生成监控报告"""
        try:
            report_data = {
                'monitoring_summary': results['summary'],
                'alerts': results['alerts'],
                'metrics': results['metrics'],
                'timestamp': results['timestamp'],
                'tables_monitored': results['tables_monitored']
            }
            
            # 保存JSON报告
            with open('data_quality_monitoring_report.json', 'w', encoding='utf-8') as f:
                json.dump(report_data, f, indent=2, ensure_ascii=False, default=str)
            
            # 生成文本报告
            self._generate_monitoring_text_report(report_data)
            
            # 生成可视化报告
            self._generate_monitoring_visualization(report_data)
            
            logger.info(f"[INFO] 监控报告已生成: data_quality_monitoring_report.json")
            
        except Exception as e:
            logger.error(f"[ERROR] 生成监控报告失败: {str(e)}")
    
    def _generate_monitoring_text_report(self, report_data: Dict):
        """生成文本格式监控报告"""
        report_lines = []
        report_lines.append("="*80)
        report_lines.append(" [REPORT] 数据质量监控报告")
        report_lines.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report_lines.append("="*80)
        
        # 摘要
        summary = report_data['monitoring_summary']
        report_lines.append(f"\n [SUMMARY] 监控摘要")
        report_lines.append(f"-"*40)
        report_lines.append(f"监控表数: {len(report_data['tables_monitored'])}")
        report_lines.append(f"总告警数: {summary['total_alerts']}")
        report_lines.append(f"严重告警: {summary['critical_alerts']}")
        report_lines.append(f"警告告警: {summary['warning_alerts']}")
        report_lines.append(f"信息告警: {summary['info_alerts']}")
        
        # 告警详情
        alerts = report_data['alerts']
        if alerts:
            report_lines.append(f"\n [ALERTS] 告警详情")
            report_lines.append(f"-"*40)
            
            # 按严重程度分组
            error_alerts = [a for a in alerts if a['severity'] == 'error']
            warning_alerts = [a for a in alerts if a['severity'] == 'warning']
            info_alerts = [a for a in alerts if a['severity'] == 'info']
            
            if error_alerts:
                report_lines.append(f"\n[严重告警] ({len(error_alerts)} 个):")
                for alert in error_alerts[:5]:  # 只显示前5个
                    report_lines.append(f"  • {alert['message']}")
                if len(error_alerts) > 5:
                    report_lines.append(f"  ... 还有 {len(error_alerts) - 5} 个严重告警")
            
            if warning_alerts:
                report_lines.append(f"\n[警告告警] ({len(warning_alerts)} 个):")
                for alert in warning_alerts[:5]:
                    report_lines.append(f"  • {alert['message']}")
                if len(warning_alerts) > 5:
                    report_lines.append(f"  ... 还有 {len(warning_alerts) - 5} 个警告告警")
            
            if info_alerts:
                report_lines.append(f"\n[信息告警] ({len(info_alerts)} 个):")
                for alert in info_alerts[:3]:
                    report_lines.append(f"  • {alert['message']}")
                if len(info_alerts) > 3:
                    report_lines.append(f"  ... 还有 {len(info_alerts) - 3} 个信息告警")
        
        else:
            report_lines.append(f"\n [OK] 未发现任何告警，数据质量良好!")
        
        # 保存文本报告
        with open('data_quality_report.txt', 'w', encoding='utf-8') as f:
            f.write('\n'.join(report_lines))
        
        # 打印报告
        print('\n'.join(report_lines))
    
    def _generate_monitoring_visualization(self, report_data: Dict):
        """生成监控可视化图表"""
        try:
            fig = make_subplots(
                rows=2, cols=2,
                subplot_titles=('告警严重程度分布', '告警类型分布', 
                              '监控历史趋势', '表级问题数量'),
                specs=[[{'type': 'pie'}, {'type': 'bar'}],
                       [{'type': 'line'}, {'type': 'bar'}]]
            )
            
            # 1. 告警严重程度分布饼图
            summary = report_data['monitoring_summary']
            severity_counts = [
                summary['critical_alerts'],
                summary['warning_alerts'],
                summary['info_alerts']
            ]
            severity_labels = ['严重', '警告', '信息']
            severity_colors = ['red', 'yellow', 'blue']
            
            fig.add_trace(
                go.Pie(labels=severity_labels, values=severity_counts, 
                      marker_colors=severity_colors, name='告警严重程度'),
                row=1, col=1
            )
            
            # 2. 告警类型分布柱状图
            alerts = report_data['alerts']
            if alerts:
                alert_types = [a['type'] for a in alerts]
                type_counts = pd.Series(alert_types).value_counts()
                
                fig.add_trace(
                    go.Bar(x=type_counts.index.tolist(), y=type_counts.values.tolist(),
                          name='告警类型', marker_color='lightcoral'),
                    row=1, col=2
                )
            
            # 3. 监控历史趋势线图
            if self.quality_metrics_history:
                history_dates = [h['timestamp'] for h in self.quality_metrics_history]
                history_alerts = [h['total_alerts'] for h in self.quality_metrics_history]
                history_critical = [h['critical_alerts'] for h in self.quality_metrics_history]
                
                fig.add_trace(
                    go.Scatter(x=history_dates, y=history_alerts, mode='lines+markers',
                              name='总告警数', line=dict(color='blue')),
                    row=2, col=1
                )
                
                fig.add_trace(
                    go.Scatter(x=history_dates, y=history_critical, mode='lines+markers',
                              name='严重告警数', line=dict(color='red')),
                    row=2, col=1
                )
            
            # 4. 表级问题数量
            metrics = report_data.get('metrics', {})
            if metrics:
                table_names = list(metrics.keys())
                table_problems = []
                
                for table in table_names:
                    table_metrics = metrics[table]
                    # 计算问题分数
                    problem_score = (table_metrics.get('missing_percentage', 0) / 5 + 
                                   table_metrics.get('duplicate_percentage', 0))
                    table_problems.append(problem_score)
                
                fig.add_trace(
                    go.Bar(x=table_names, y=table_problems, name='表问题分数',
                          marker_color='lightgreen'),
                    row=2, col=2
                )
            
            # 更新布局
            fig.update_layout(
                height=700,
                showlegend=True,
                title_text="数据质量监控仪表板",
                title_font_size=20
            )
            
            # 保存为HTML
            fig.write_html("data_quality_dashboard.html")
            
            # 在Jupyter中显示
            fig.show()
            
            logger.info(f"[INFO] 监控仪表板已生成: data_quality_dashboard.html")
            
        except Exception as e:
            logger.error(f"[ERROR] 生成监控可视化失败: {str(e)}")
    
    def setup_real_time_monitoring(self, interval_minutes: int = 5):
        """设置实时监控（模拟）"""
        logger.info(f"[INFO] 设置实时监控，每 {interval_minutes} 分钟运行一次")
        
        def monitoring_job():
            logger.info("[TIME] 执行定时监控任务...")
            results = self.run_comprehensive_monitoring()
            
            # 如果有严重告警，发送通知
            if results['summary']['critical_alerts'] > 0:
                self._send_alert_notification(results)
        
        # 在实际应用中，这里会使用定时任务调度器
        # 这里只是模拟
        logger.info("[INFO] 实时监控已设置（模拟模式）")
        logger.info(f"[INFO] 下次监控将在 {interval_minutes} 分钟后运行")
    
    def _send_alert_notification(self, results: Dict):
        """发送告警通知（模拟）"""
        critical_alerts = results['summary']['critical_alerts']
        
        notification = f"""
        [ALERT] 数据质量告警通知
        ====================
        时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        发现 {critical_alerts} 个严重告警！
        
        请立即检查数据质量监控报告：
        - data_quality_monitoring_report.json
        - data_quality_report.txt
        - data_quality_dashboard.html
        
        需要立即处理的问题：
        """
        
        # 获取前3个严重告警
        critical_alerts_list = [a for a in results['alerts'] if a['severity'] == 'error'][:3]
        for alert in critical_alerts_list:
            notification += f"\n• {alert['message']}"
        
        # 在实际应用中，这里会发送邮件、Slack消息等
        logger.warning("[WARNING] 发送告警通知（模拟）:")
        print(notification)
    
    def get_quality_score(self) -> float:
        """计算数据质量综合评分"""
        try:
            if not self.connection:
                self.connect_to_db()
            
            # 运行监控获取最新数据
            results = self.run_comprehensive_monitoring()
            
            # 计算质量评分
            total_alerts = results['summary']['total_alerts']
            critical_alerts = results['summary']['critical_alerts']
            
            # 基础分100分
            score = 100
            
            # 扣分规则
            score -= critical_alerts * 10  # 每个严重告警扣10分
            score -= (total_alerts - critical_alerts) * 2  # 每个非严重告警扣2分
            
            # 确保分数在0-100之间
            score = max(0, min(100, score))
            
            return score
            
        except Exception as e:
            logger.error(f"[ERROR] 计算质量评分失败: {str(e)}")
            return 0
    
    def close_connection(self):
        """关闭数据库连接"""
        if self.connection:
            self.connection.close()
            logger.info("[INFO] 监控器数据库连接已关闭")

logger.info("[OK] 数据质量监控模块定义完成")

2025-12-02 17:39:57,626 - ETL_Pipeline - INFO - [OK] 数据质量监控模块定义完成


# 4.ETL管道自动化运行代码

In [8]:
class ETLOrchestrator:
    """
    ETL管道自动化编排器
    管理ETL管道的调度、执行和监控
    """
    
    def __init__(self):
        """初始化编排器"""
        self.etl_pipeline = None
        self.quality_monitor = None
        self.schedule_config = {
            'etl_schedule': 'daily',  # daily, weekly, manual
            'monitoring_interval': 60,  # 监控间隔（分钟）
            'retry_count': 3,
            'notify_on_failure': True
        }
        self.execution_tracker = []
        
        logger.info("[INFO] ETL编排器初始化完成")
    
    def initialize_components(self, source_dir: str = '.', db_path: str = 'ecommerce_clean.db'):
        """初始化组件"""
        logger.info("[INFO] 初始化ETL组件...")
        
        # 初始化ETL管道
        self.etl_pipeline = EcommerceETLPipeline(source_dir=source_dir, db_path=db_path)
        logger.info("[OK] ETL管道初始化完成")
        
        # 初始化数据质量监控器
        self.quality_monitor = DataQualityMonitor(db_path=db_path)
        logger.info("[OK] 数据质量监控器初始化完成")
        
        # 检查数据文件
        self._check_data_files(source_dir)
        
        logger.info("[INFO] 所有组件初始化完成")
    
    def _check_data_files(self, source_dir: str):
        """检查数据文件"""
        logger.info("[INFO] 检查数据文件...")
        
        required_files = [
            'customers.csv',
            'products.csv',
            'orders.csv',
            'behavior_logs.csv',
            'time_dim.csv',
            'regions.csv'
        ]
        
        missing_files = []
        
        for file_name in required_files:
            file_path = os.path.join(source_dir, file_name)
            if os.path.exists(file_path):
                file_size = os.path.getsize(file_path) / (1024 * 1024)  # MB
                logger.info(f"  [OK] {file_name}: {file_size:.2f} MB")
            else:
                missing_files.append(file_name)
                logger.warning(f"  [WARNING] {file_name}: 文件不存在")
        
        if missing_files:
            logger.error(f"[ERROR] 缺失文件: {', '.join(missing_files)}")
            logger.error("[ERROR] 请确保所有CSV文件在当前目录中")
        else:
            logger.info("[INFO] 所有数据文件检查通过")
    
    def run_full_workflow(self):
        """运行完整的工作流"""
        logger.info("="*80)
        logger.info(" [WORKFLOW] 开始运行完整ETL工作流")
        logger.info("="*80)
        
        workflow_start = datetime.now()
        workflow_id = f"WORKFLOW_{workflow_start.strftime('%Y%m%d_%H%M%S')}"
        
        print(f"\n [PLAN] 工作流执行计划")
        print("-"*40)
        print("1.  数据提取")
        print("2.  数据清洗和转换")
        print("3.  数据加载到数据库")
        print("4.  数据质量验证")
        print("5.   创建数据仓库视图")
        print("6.  数据质量监控")
        print("7.  生成报告和仪表板")
        print("-"*40)
        
        try:
            # 步骤1: 运行ETL管道
            logger.info("\n[STEP1] 运行ETL管道")
            etl_results = self.etl_pipeline.run_full_pipeline(generate_reports=True)
            
            if etl_results['status'] != 'success':
                logger.error("[ERROR] ETL管道运行失败，停止工作流")
                return self._handle_workflow_failure(workflow_id, workflow_start, "ETL管道失败")
            
            # 步骤2: 运行数据质量监控
            logger.info("\n[STEP2] 运行数据质量监控")
            monitoring_results = self.quality_monitor.run_comprehensive_monitoring()
            
            # 步骤3: 计算总体质量评分
            logger.info("\n[STEP3] 计算总体质量评分")
            quality_score = self.quality_monitor.get_quality_score()
            
            # 步骤4: 设置实时监控
            logger.info("\n[STEP4] 设置实时监控")
            self.quality_monitor.setup_real_time_monitoring(
                interval_minutes=self.schedule_config['monitoring_interval']
            )
            
            # 记录执行历史
            workflow_end = datetime.now()
            execution_record = {
                'workflow_id': workflow_id,
                'start_time': workflow_start,
                'end_time': workflow_end,
                'duration': (workflow_end - workflow_start).total_seconds(),
                'etl_status': etl_results['status'],
                'quality_score': quality_score,
                'monitoring_alerts': monitoring_results['summary']['total_alerts']
            }
            self.execution_tracker.append(execution_record)
            
            # 生成工作流报告
            self._generate_workflow_report(execution_record, etl_results, monitoring_results)
            
            # 显示结果
            self._display_workflow_summary(execution_record, etl_results, monitoring_results)
            
            logger.info(f"[SUCCESS] 完整ETL工作流运行成功!")
            
            return {
                'workflow_id': workflow_id,
                'status': 'success',
                'etl_results': etl_results,
                'monitoring_results': monitoring_results,
                'quality_score': quality_score
            }
            
        except Exception as e:
            logger.error(f"[ERROR] 工作流运行失败: {str(e)}")
            return self._handle_workflow_failure(workflow_id, workflow_start, str(e))
    
    def _handle_workflow_failure(self, workflow_id: str, start_time: datetime, error: str):
        """处理工作流失败"""
        end_time = datetime.now()
        
        execution_record = {
            'workflow_id': workflow_id,
            'start_time': start_time,
            'end_time': end_time,
            'duration': (end_time - start_time).total_seconds(),
            'status': 'failed',
            'error': error
        }
        self.execution_tracker.append(execution_record)
        
        # 发送失败通知
        if self.schedule_config['notify_on_failure']:
            self._send_failure_notification(execution_record)
        
        logger.error(f"[ERROR] 工作流执行失败: {error}")
        
        return {
            'workflow_id': workflow_id,
            'status': 'failed',
            'error': error
        }
    
    def _generate_workflow_report(self, execution_record: Dict, etl_results: Dict, monitoring_results: Dict):
        """生成工作流报告"""
        try:
            report_data = {
                'workflow_summary': execution_record,
                'etl_execution': etl_results,
                'quality_monitoring': monitoring_results,
                'generated_files': self._list_generated_files(),
                'recommendations': self._generate_recommendations(monitoring_results),
                'generated_at': datetime.now().isoformat()
            }
            
            # 保存JSON报告
            with open('etl_workflow_report.json', 'w', encoding='utf-8') as f:
                json.dump(report_data, f, indent=2, ensure_ascii=False, default=str)
            
            # 生成文本报告
            self._generate_workflow_text_report(report_data)
            
            logger.info(f"[INFO] 工作流报告已生成: etl_workflow_report.json")
            
        except Exception as e:
            logger.error(f"[ERROR] 生成工作流报告失败: {str(e)}")
    
    def _list_generated_files(self) -> List[str]:
        """列出生成的文件"""
        generated_files = []
        
        # 检查ETL生成的文件
        etl_files = ['etl_execution_report.json', 'etl_report.txt', 'etl_dashboard.html']
        for file in etl_files:
            if os.path.exists(file):
                generated_files.append(file)
        
        # 检查监控生成的文件
        monitor_files = ['data_quality_monitoring_report.json', 'data_quality_report.txt', 
                        'data_quality_dashboard.html']
        for file in monitor_files:
            if os.path.exists(file):
                generated_files.append(file)
        
        # 检查清洗后的数据文件
        if os.path.exists('cleaned_data'):
            for file in os.listdir('cleaned_data'):
                if file.endswith('.csv'):
                    generated_files.append(f'cleaned_data/{file}')
        
        return generated_files
    
    def _generate_recommendations(self, monitoring_results: Dict) -> List[str]:
        """生成改进建议"""
        recommendations = []
        alerts = monitoring_results.get('alerts', [])
        
        # 分析告警生成建议
        error_alerts = [a for a in alerts if a['severity'] == 'error']
        warning_alerts = [a for a in alerts if a['severity'] == 'warning']
        
        if error_alerts:
            recommendations.append("立即处理严重告警，确保数据准确性")
        
        if warning_alerts:
            recommendations.append("定期检查警告告警，预防数据质量问题")
        
        # 检查缺失值
        missing_alerts = [a for a in alerts if a['type'] == 'missing_values']
        if missing_alerts:
            recommendations.append("考虑实施缺失值填充策略")
        
        # 检查重复数据
        duplicate_alerts = [a for a in alerts if a['type'] == 'duplicate_rows']
        if duplicate_alerts:
            recommendations.append("实施数据去重流程，定期清理重复数据")
        
        # 检查引用完整性
        integrity_alerts = [a for a in alerts if a['type'] == 'referential_integrity']
        if integrity_alerts:
            recommendations.append("修复引用完整性错误，确保数据一致性")
        
        # 通用建议
        recommendations.append("定期备份清洗后的数据")
        recommendations.append("建立数据质量监控计划")
        recommendations.append("培训团队数据质量标准和最佳实践")
        
        return recommendations
    
    def _generate_workflow_text_report(self, report_data: Dict):
        """生成工作流文本报告"""
        report_lines = []
        report_lines.append("="*80)
        report_lines.append(" [REPORT] ETL工作流执行报告")
        report_lines.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report_lines.append("="*80)
        
        # 工作流摘要
        summary = report_data['workflow_summary']
        report_lines.append(f"\n [SUMMARY] 工作流摘要")
        report_lines.append(f"-"*40)
        report_lines.append(f"工作流ID: {summary['workflow_id']}")
        report_lines.append(f"状态: {summary.get('status', 'completed')}")
        report_lines.append(f"开始时间: {summary['start_time']}")
        report_lines.append(f"结束时间: {summary['end_time']}")
        report_lines.append(f"总耗时: {summary['duration']:.2f} 秒")
        
        if 'quality_score' in summary:
            report_lines.append(f"数据质量评分: {summary['quality_score']:.1f}/100")
        
        if 'monitoring_alerts' in summary:
            report_lines.append(f"监控告警数: {summary['monitoring_alerts']}")
        
        # 生成的文件
        generated_files = report_data.get('generated_files', [])
        report_lines.append(f"\n [FILES] 生成的文件")
        report_lines.append(f"-"*40)
        
        if generated_files:
            for file in generated_files:
                report_lines.append(f"  [OK] {file}")
        else:
            report_lines.append("  未生成文件")
        
        # 改进建议
        recommendations = report_data.get('recommendations', [])
        if recommendations:
            report_lines.append(f"\n [RECOMMEND] 改进建议")
            report_lines.append(f"-"*40)
            for i, rec in enumerate(recommendations, 1):
                report_lines.append(f"{i}. {rec}")
        
        # 保存文本报告
        with open('etl_workflow_report.txt', 'w', encoding='utf-8') as f:
            f.write('\n'.join(report_lines))
    
    def _display_workflow_summary(self, execution_record: Dict, etl_results: Dict, monitoring_results: Dict):
        """显示工作流摘要"""
        print("\n" + "="*80)
        print(" [COMPLETED] ETL工作流执行完成!")
        print("="*80)
        
        print(f"\n [SUMMARY] 执行摘要")
        print("-"*40)
        print(f"工作流ID: {execution_record['workflow_id']}")
        print(f"执行状态: 成功")
        print(f"总耗时: {execution_record['duration']:.2f} 秒")
        
        if 'quality_score' in execution_record:
            score = execution_record['quality_score']
            if score >= 80:
                rating = "[PASS]"
            elif score >= 60:
                rating = "[WARN]"
            else:
                rating = "[FAIL]"
            print(f"数据质量评分: {rating} {score:.1f}/100")
        
        # ETL结果
        etl_stages = etl_results.get('stages', {})
        print(f"\n [ETL] ETL管道结果")
        print("-"*40)
        
        for stage_name, stage_info in etl_stages.items():
            duration = stage_info.get('duration', 0)
            status = stage_info.get('status', 'unknown')
            if status == 'completed':
                status_icon = '[OK]'
            elif status == 'partial':
                status_icon = '[PARTIAL]'
            else:
                status_icon = '[FAIL]'
            print(f"{status_icon} {stage_name:15s}: {duration:6.2f} 秒")
        
        # 监控结果
        monitoring_summary = monitoring_results.get('summary', {})
        print(f"\n [MONITOR] 数据质量监控结果")
        print("-"*40)
        print(f"总告警数: {monitoring_summary.get('total_alerts', 0)}")
        print(f"严重告警:  {monitoring_summary.get('critical_alerts', 0)}")
        print(f"警告告警:  {monitoring_summary.get('warning_alerts', 0)}")
        print(f"信息告警:  {monitoring_summary.get('info_alerts', 0)}")
        
        # 生成的文件
        print(f"\n [FILES] 生成的文件和报告")
        print("-"*40)
        
        generated_files = self._list_generated_files()
        if generated_files:
            categories = {}
            for file in generated_files:
                if 'etl' in file:
                    categories.setdefault('ETL报告', []).append(file)
                elif 'quality' in file:
                    categories.setdefault('质量报告', []).append(file)
                elif 'dashboard' in file:
                    categories.setdefault('仪表板', []).append(file)
                elif 'cleaned_data' in file:
                    categories.setdefault('清洗数据', []).append(file)
                else:
                    categories.setdefault('其他', []).append(file)
            
            for category, files in categories.items():
                print(f"\n{category}:")
                for file in files:
                    print(f"  [OK] {file}")
        else:
            print("未生成文件")
        
        print("\n" + "="*80)
        print(" [SUCCESS] 所有任务完成! 建议查看生成的报告和仪表板")
        print("="*80)
    
    def _send_failure_notification(self, execution_record: Dict):
        """发送失败通知（模拟）"""
        notification = f"""
        [ALERT] ETL工作流失败通知
        ====================
        工作流ID: {execution_record['workflow_id']}
        失败时间: {execution_record['end_time']}
        错误信息: {execution_record['error']}
        持续时间: {execution_record['duration']:.2f} 秒
        
        请立即检查:
        1. 数据源文件是否存在
        2. 数据库连接是否正常
        3. 查看 etl_pipeline.log 获取详细错误信息
        """
        
        logger.error("[ERROR] 发送失败通知（模拟）:")
        print(notification)
    
    def schedule_daily_etl(self, hour: int = 2, minute: int = 0):
        """安排每日ETL任务（模拟）"""
        logger.info(f"[INFO] 安排每日ETL任务，时间: {hour:02d}:{minute:02d}")
        
        # 在实际应用中，这里会使用调度器如APScheduler或cron
        # 这里只是模拟
        logger.info("[INFO] 每日调度已设置（模拟模式）")
        logger.info(f"[INFO] ETL任务将每天 {hour:02d}:{minute:02d} 自动运行")
    
    def get_execution_history(self) -> pd.DataFrame:
        """获取执行历史"""
        if self.execution_tracker:
            return pd.DataFrame(self.execution_tracker)
        else:
            return pd.DataFrame()

logger.info("[OK] ETL编排器类定义完成")

2025-12-02 17:41:12,125 - ETL_Pipeline - INFO - [OK] ETL编排器类定义完成


# 5.运行ETL工作流

In [9]:
# 代码块5：运行完整的ETL工作流
print("[START] 开始运行完整的ETL工作流")
print("="*80)

# 1. 初始化编排器
print("1. [STEP1] 初始化ETL编排器...")
orchestrator = ETLOrchestrator()

# 2. 初始化组件
print("2. [STEP2] 初始化ETL组件...")
orchestrator.initialize_components(source_dir='.', db_path='ecommerce_clean.db')

# 3. 运行完整工作流
print("3. [STEP3] 运行完整ETL工作流...")
print("-"*80)

workflow_results = orchestrator.run_full_workflow()

# 4. 显示最终结果
print("\n" + "="*80)
print("[COMPLETED] ETL工作流执行完成")
print("="*80)

if workflow_results.get('status') == 'success':
    print("\n[SUCCESS] 工作流执行成功!")
    
    # 显示关键指标
    quality_score = workflow_results.get('quality_score', 0)
    if quality_score >= 90:
        rating = "优秀 [TOP]"
    elif quality_score >= 75:
        rating = "良好 [GOOD]"
    elif quality_score >= 60:
        rating = "一般 [WARN]"
    else:
        rating = "需要改进 [FIX]"
    
    print(f"\n[METRICS] 关键指标:")
    print(f"  数据质量评分: {quality_score:.1f}/100 ({rating})")
    
    # 检查生成的文件
    print(f"\n[FILES] 生成的文件:")
    if os.path.exists('cleaned_data'):
        csv_files = [f for f in os.listdir('cleaned_data') if f.endswith('.csv')]
        for file in csv_files:
            file_path = os.path.join('cleaned_data', file)
            size_mb = os.path.getsize(file_path) / (1024 * 1024)
            print(f"  [OK] {file}: {size_mb:.2f} MB")
    
    print(f"\n[REPORTS] 生成的报告:")
    report_files = ['etl_report.txt', 'data_quality_report.txt', 
                   'etl_workflow_report.txt']
    for file in report_files:
        if os.path.exists(file):
            print(f"  [OK] {file}")
    
    print(f"\n[DASHBOARDS] 生成的仪表板:")
    dashboard_files = ['etl_dashboard.html', 'data_quality_dashboard.html']
    for file in dashboard_files:
        if os.path.exists(file):
            print(f"  [OK] {file}")
    
    print(f"\n[DATABASE] 生成的数据库: ecommerce_clean.db")
    
    # 下一步建议
    print(f"\n[TIPS] 下一步建议:")
    print("  1. 查看 etl_dashboard.html 了解ETL执行详情")
    print("  2. 查看 data_quality_dashboard.html 监控数据质量")
    print("  3. 使用清洗后的数据进行业务分析")
    print("  4. 定期运行数据质量监控")
    
else:
    print(f"\n[FAILURE] 工作流执行失败!")
    print(f"   错误: {workflow_results.get('error', '未知错误')}")
    print(f"\n[TROUBLESHOOTING] 故障排除:")
    print("  1. 检查所有CSV文件是否在当前目录")
    print("  2. 查看 etl_pipeline.log 获取详细错误信息")
    print("  3. 确保有足够的磁盘空间")
    print("  4. 检查数据库文件权限")

print("\n" + "="*80)
print("[COMPLETED] ETL管道架构和数据质量监控系统已部署完成!")
print("="*80)

2025-12-02 17:41:26,974 - ETL_Pipeline - INFO - [INFO] ETL编排器初始化完成
2025-12-02 17:41:26,976 - ETL_Pipeline - INFO - [INFO] 初始化ETL组件...
2025-12-02 17:41:26,977 - ETL_Pipeline - INFO - ETL管道初始化 - 源目录: ., 数据库: ecommerce_clean.db
2025-12-02 17:41:26,977 - ETL_Pipeline - INFO - [OK] ETL管道初始化完成
2025-12-02 17:41:26,978 - ETL_Pipeline - INFO - [INFO] 数据质量监控器初始化 - 数据库: ecommerce_clean.db
2025-12-02 17:41:26,979 - ETL_Pipeline - INFO - [OK] 数据质量监控器初始化完成
2025-12-02 17:41:26,980 - ETL_Pipeline - INFO - [INFO] 检查数据文件...
2025-12-02 17:41:26,981 - ETL_Pipeline - INFO -   [OK] customers.csv: 0.69 MB
2025-12-02 17:41:26,982 - ETL_Pipeline - INFO -   [OK] products.csv: 0.02 MB
2025-12-02 17:41:26,984 - ETL_Pipeline - INFO -   [OK] orders.csv: 4.30 MB
2025-12-02 17:41:26,985 - ETL_Pipeline - INFO -   [OK] behavior_logs.csv: 6.71 MB
2025-12-02 17:41:26,986 - ETL_Pipeline - INFO -   [OK] time_dim.csv: 0.05 MB
2025-12-02 17:41:26,987 - ETL_Pipeline - INFO -   [OK] regions.csv: 0.00 MB
2025-12-02 17:41:26,988

[START] 开始运行完整的ETL工作流
1. [STEP1] 初始化ETL编排器...
2. [STEP2] 初始化ETL组件...
3. [STEP3] 运行完整ETL工作流...
--------------------------------------------------------------------------------

 [PLAN] 工作流执行计划
----------------------------------------
1.  数据提取
2.  数据清洗和转换
3.  数据加载到数据库
4.  数据质量验证
5.   创建数据仓库视图
6.  数据质量监控
7.  生成报告和仪表板
----------------------------------------


2025-12-02 17:41:27,231 - ETL_Pipeline - INFO -    [OK] 已提取: orders.csv (50,150 行)
2025-12-02 17:41:27,492 - ETL_Pipeline - INFO -    [OK] 已提取: behavior_logs.csv (100,500 行)
2025-12-02 17:41:27,494 - ETL_Pipeline - INFO - [INFO] 数据提取完成，共加载 6 个表
2025-12-02 17:41:27,495 - ETL_Pipeline - INFO - [阶段2] 数据清洗和转换
2025-12-02 17:41:27,496 - ETL_Pipeline - INFO - [INFO] 开始数据清洗和转换...
2025-12-02 17:41:27,497 - ETL_Pipeline - INFO - [INFO] 清洗表: customers
2025-12-02 17:41:27,591 - ETL_Pipeline - INFO -    [OK] 清洗完成: 5,000 → 4,911 行 (移除: 1.8%)
2025-12-02 17:41:27,593 - ETL_Pipeline - INFO - [INFO] 清洗表: products
2025-12-02 17:41:27,607 - ETL_Pipeline - INFO -    [OK] 清洗完成: 200 → 200 行 (移除: 0.0%)
2025-12-02 17:41:27,608 - ETL_Pipeline - INFO - [INFO] 清洗表: time_dim
2025-12-02 17:41:27,619 - ETL_Pipeline - INFO -    [OK] 清洗完成: 1,096 → 1,096 行 (移除: 0.0%)
2025-12-02 17:41:27,620 - ETL_Pipeline - INFO - [INFO] 清洗表: regions
2025-12-02 17:41:27,624 - ETL_Pipeline - INFO -    [OK] 清洗完成: 6 → 6 行 (移除: 0.0%)
2025-

 ETL管道执行报告
生成时间: 2025-12-02 17:41:31

 [SUMMARY] 执行摘要
----------------------------------------
执行ID: ETL_20251202_174126
状态: success
开始时间: 2025-12-02T17:41:26.996870
结束时间: 2025-12-02T17:41:31.333903
总耗时: 4.34 秒
处理表数: 6

 [STAGES] 阶段详情
----------------------------------------
extraction     : completed  | 耗时: 0.50秒
transformation : completed  | 耗时: 1.04秒
loading        : completed  | 耗时: 1.77秒
validation     : completed  | 耗时: 1.02秒
warehouse      : completed  | 耗时: 0.00秒

 [QUALITY] 数据质量报告
----------------------------------------
总体质量评分: 91.7/100
发现问题总数: 5

 各表质量:
  customers      :  90.0/100 | 问题数: 1
  products       :  90.0/100 | 问题数: 1
  orders         :  90.0/100 | 问题数: 1
  behavior_logs  :  90.0/100 | 问题数: 1
  time_dim       :  90.0/100 | 问题数: 1
  regions        : 100.0/100 | 问题数: 0


2025-12-02 17:41:32,037 - ETL_Pipeline - ERROR - [ERROR] orders.browsing_duration_seconds: 发现 245 个负值
2025-12-02 17:41:32,039 - ETL_Pipeline - ERROR - [ERROR] orders.click_count: 发现 198 个负值
2025-12-02 17:41:32,063 - ETL_Pipeline - INFO - [INFO] 监控表: behavior_logs
2025-12-02 17:41:32,752 - ETL_Pipeline - ERROR - [ERROR] 订单表引用了 1457 个不存在的客户ID
2025-12-02 17:41:32,782 - ETL_Pipeline - ERROR - [ERROR] 订单表引用了 141 个不存在的产品ID
2025-12-02 17:41:32,856 - ETL_Pipeline - ERROR - [ERROR] 行为日志表引用了 3490 个不存在的客户ID
2025-12-02 17:41:32,899 - ETL_Pipeline - ERROR - [ERROR] 生成监控可视化失败: Unsupported subplot type: 'line'
2025-12-02 17:41:32,900 - ETL_Pipeline - INFO - [INFO] 监控报告已生成: data_quality_monitoring_report.json
2025-12-02 17:41:32,901 - ETL_Pipeline - INFO - [INFO] 监控完成 - 发现告警: 8 个
2025-12-02 17:41:32,903 - ETL_Pipeline - INFO - 
[STEP3] 计算总体质量评分
2025-12-02 17:41:32,906 - ETL_Pipeline - INFO -  [MONITOR] 开始全面数据质量监控
2025-12-02 17:41:32,908 - ETL_Pipeline - INFO - [INFO] 监控表: customers
2025-12-02 17:41:33

 [REPORT] 数据质量监控报告
生成时间: 2025-12-02 17:41:32

 [SUMMARY] 监控摘要
----------------------------------------
监控表数: 6
总告警数: 8
严重告警: 5
警告告警: 2
信息告警: 1

 [ALERTS] 告警详情
----------------------------------------

[严重告警] (5 个):
  • orders.browsing_duration_seconds: 发现 245 个负值
  • orders.click_count: 发现 198 个负值
  • 订单表引用了 1457 个不存在的客户ID
  • 订单表引用了 141 个不存在的产品ID
  • 行为日志表引用了 3490 个不存在的客户ID

[警告告警] (2 个):
  • customers: 缺失值比例 6.02% 超过阈值 5.0%
  • behavior_logs: 缺失值比例 6.99% 超过阈值 5.0%

[信息告警] (1 个):
  • customers.total_spent: 发现 134 个异常值 (2.73%)


2025-12-02 17:41:33,707 - ETL_Pipeline - ERROR - [ERROR] orders.browsing_duration_seconds: 发现 245 个负值
2025-12-02 17:41:33,709 - ETL_Pipeline - ERROR - [ERROR] orders.click_count: 发现 198 个负值
2025-12-02 17:41:33,731 - ETL_Pipeline - INFO - [INFO] 监控表: behavior_logs
2025-12-02 17:41:34,490 - ETL_Pipeline - ERROR - [ERROR] 订单表引用了 1457 个不存在的客户ID
2025-12-02 17:41:34,526 - ETL_Pipeline - ERROR - [ERROR] 订单表引用了 141 个不存在的产品ID
2025-12-02 17:41:34,606 - ETL_Pipeline - ERROR - [ERROR] 行为日志表引用了 3490 个不存在的客户ID
2025-12-02 17:41:34,647 - ETL_Pipeline - ERROR - [ERROR] 生成监控可视化失败: Unsupported subplot type: 'line'
2025-12-02 17:41:34,648 - ETL_Pipeline - INFO - [INFO] 监控报告已生成: data_quality_monitoring_report.json
2025-12-02 17:41:34,649 - ETL_Pipeline - INFO - [INFO] 监控完成 - 发现告警: 8 个
2025-12-02 17:41:34,650 - ETL_Pipeline - INFO - 
[STEP4] 设置实时监控
2025-12-02 17:41:34,650 - ETL_Pipeline - INFO - [INFO] 设置实时监控，每 60 分钟运行一次
2025-12-02 17:41:34,651 - ETL_Pipeline - INFO - [INFO] 实时监控已设置（模拟模式）
2025-12-02 17:41:3

 [REPORT] 数据质量监控报告
生成时间: 2025-12-02 17:41:34

 [SUMMARY] 监控摘要
----------------------------------------
监控表数: 6
总告警数: 8
严重告警: 5
警告告警: 2
信息告警: 1

 [ALERTS] 告警详情
----------------------------------------

[严重告警] (5 个):
  • orders.browsing_duration_seconds: 发现 245 个负值
  • orders.click_count: 发现 198 个负值
  • 订单表引用了 1457 个不存在的客户ID
  • 订单表引用了 141 个不存在的产品ID
  • 行为日志表引用了 3490 个不存在的客户ID

[警告告警] (2 个):
  • customers: 缺失值比例 6.02% 超过阈值 5.0%
  • behavior_logs: 缺失值比例 6.99% 超过阈值 5.0%

[信息告警] (1 个):
  • customers.total_spent: 发现 134 个异常值 (2.73%)

 [COMPLETED] ETL工作流执行完成!

 [SUMMARY] 执行摘要
----------------------------------------
工作流ID: WORKFLOW_20251202_174126
执行状态: 成功
总耗时: 7.66 秒
数据质量评分: [FAIL] 44.0/100

 [ETL] ETL管道结果
----------------------------------------
[OK] extraction     :   0.50 秒
[OK] transformation :   1.04 秒
[OK] loading        :   1.77 秒
[OK] validation     :   1.02 秒
[OK] warehouse      :   0.00 秒

 [MONITOR] 数据质量监控结果
----------------------------------------
总告警数: 8
严重告警:  5
警告告警:  2
信息告警: 