In [1]:
"""
Olist 电商数据分析 - 数据清洗
====================================
此脚本用于清洗数据，处理缺失值、异常值等
"""

import pandas as pd
import numpy as np
from pathlib import Path

# 设置路径
BASE_DIR = Path("..") 
DATA_DIR = BASE_DIR / "Brazilian E-Commerce Public Dataset by Olist"
OUTPUT_DIR = BASE_DIR / "outputs" / "cleaned_data"

# 创建输出目录
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

def load_data():
    print("加载原始数据...")

    data_files = {
        'customers': 'olist_customers_dataset.csv',
        'orders': 'olist_orders_dataset.csv',
        'order_items': 'olist_order_items_dataset.csv',
        'products': 'olist_products_dataset.csv',
        'order_payments': 'olist_order_payments_dataset.csv',
        'order_reviews': 'olist_order_reviews_dataset.csv',
        'sellers': 'olist_sellers_dataset.csv',
        'geolocation': 'olist_geolocation_dataset.csv',
        'category_translation': 'product_category_name_translation.csv'
    }

    datasets = {}
    for name, file in data_files.items():
        path = DATA_DIR / file
        datasets[name] = pd.read_csv(path, low_memory=False)
        print(f"✓ {name}: {datasets[name].shape}")

    return datasets


def clean_customers(df):
    """清洗客户数据"""
    print("\n清洗客户数据...")
    df_clean = df.copy()
    
    # 移除重复的customer_id
    df_clean = df_clean.drop_duplicates(subset=['customer_id'], keep='first')
    
    # 处理城市和州名（转换为小写并去除空格）
    df_clean['customer_city'] = df_clean['customer_city'].astype(str).str.lower().str.strip()
    df_clean['customer_state'] = df_clean['customer_state'].astype(str).str.upper().str.strip()
    
    print(f"  原始记录数: {len(df)}")
    print(f"  清洗后记录数: {len(df_clean)}")
    print(f"  缺失值: {df_clean.isnull().sum().sum()}")
    
    return df_clean

def clean_orders(df):
    """清洗订单数据"""
    print("\n清洗订单数据...")
    df_clean = df.copy()
    
    # 转换时间戳列为datetime类型
    date_columns = [
        'order_purchase_timestamp',
        'order_approved_at',
        'order_delivered_carrier_date',
        'order_delivered_customer_date',
        'order_estimated_delivery_date'
    ]
    
    for col in date_columns:
        if col in df_clean.columns:
            df_clean[col] = pd.to_datetime(df_clean[col], errors='coerce')
    
    # 移除异常的订单（购买时间晚于交付时间）
    mask = df_clean['order_delivered_customer_date'] >= df_clean['order_purchase_timestamp']
    df_clean = df_clean[mask | df_clean['order_delivered_customer_date'].isna()]
    
    # 计算配送时间（天数）
    df_clean['delivery_days'] = (
        df_clean['order_delivered_customer_date'] - 
        df_clean['order_purchase_timestamp']
    ).dt.days
    
    # 计算是否按时交付
    df_clean['on_time_delivery'] = (
        df_clean['order_delivered_customer_date'] <= 
        df_clean['order_estimated_delivery_date']
    )
    
    print(f"  原始记录数: {len(df)}")
    print(f"  清洗后记录数: {len(df_clean)}")
    print(f"  已交付订单: {df_clean[df_clean['order_status'] == 'delivered'].shape[0]}")
    
    return df_clean

def clean_order_items(df):
    """清洗订单商品数据"""
    print("\n清洗订单商品数据...")
    df_clean = df.copy()
    
    # 移除价格和运费为负数的记录
    df_clean = df_clean[df_clean['price'] >= 0]
    df_clean = df_clean[df_clean['freight_value'] >= 0]
    
    # 移除异常高的价格（假设超过10000为异常）
    df_clean = df_clean[df_clean['price'] <= 10000]
    
    # 计算总价值
    df_clean['total_value'] = df_clean['price'] + df_clean['freight_value']
    
    # 转换时间戳
    if 'shipping_limit_date' in df_clean.columns:
        df_clean['shipping_limit_date'] = pd.to_datetime(
            df_clean['shipping_limit_date'], errors='coerce'
        )
    
    print(f"  原始记录数: {len(df)}")
    print(f"  清洗后记录数: {len(df_clean)}")
    print(f"  平均商品价格: {df_clean['price'].mean():.2f}")
    print(f"  平均运费: {df_clean['freight_value'].mean():.2f}")
    
    return df_clean

def clean_products(df, translation_df):
    """清洗产品数据"""
    print("\n清洗产品数据...")
    df_clean = df.copy()
    
    # 合并类别翻译
    if translation_df is not None:
        df_clean = df_clean.merge(
            translation_df,
            on='product_category_name',
            how='left'
        )
    
    # 处理数值列的缺失值
    numeric_columns = [
        'product_weight_g', 'product_length_cm', 
        'product_height_cm', 'product_width_cm'
    ]
    
    for col in numeric_columns:
        if col in df_clean.columns:
            # 用中位数填充缺失值
            median_value = df_clean[col].median()
            df_clean[col].fillna(median_value)
    
    # 处理分类列的缺失值
    if 'product_category_name' in df_clean.columns:
        df_clean['product_category_name'].fillna('unknown')
    if 'product_category_name_english' in df_clean.columns:
        df_clean['product_category_name_english'].fillna('unknown')
    
    # 计算产品体积（cm³）
    df_clean['product_volume_cm3'] = (
        df_clean['product_length_cm'] * 
        df_clean['product_height_cm'] * 
        df_clean['product_width_cm']
    )
    
    print(f"  原始记录数: {len(df)}")
    print(f"  清洗后记录数: {len(df_clean)}")
    print(f"  产品类别数: {df_clean['product_category_name'].nunique()}")
    
    return df_clean

def clean_order_payments(df):
    """清洗支付数据"""
    print("\n清洗支付数据...")
    df_clean = df.copy()
    
    # 移除支付金额为负数的记录
    df_clean = df_clean[df_clean['payment_value'] >= 0]
    
    # 移除异常高的支付金额
    df_clean = df_clean[df_clean['payment_value'] <= 20000]
    
    print(f"  原始记录数: {len(df)}")
    print(f"  清洗后记录数: {len(df_clean)}")
    print(f"  支付方式: {df_clean['payment_type'].unique()}")
    print(f"  总支付金额: {df_clean['payment_value'].sum():.2f}")
    
    return df_clean

def clean_order_reviews(df):
    """清洗评价数据"""
    print("\n清洗评价数据...")
    df_clean = df.copy()
    
    # 转换时间戳
    date_columns = ['review_creation_date', 'review_answer_timestamp']
    for col in date_columns:
        if col in df_clean.columns:
            df_clean[col] = pd.to_datetime(df_clean[col], errors='coerce')
    
    # 处理评分缺失值（用中位数填充）
    if 'review_score' in df_clean.columns:
        df_clean['review_score'].fillna(
            df_clean['review_score'].median(), 
        )
    
    # 创建评分分类
    df_clean['review_category'] = pd.cut(
        df_clean['review_score'],
        bins=[0, 2, 3, 4, 5],
        labels=['Very Bad', 'Bad', 'Good', 'Very Good'],
        include_lowest=True
    )
    
    print(f"  原始记录数: {len(df)}")
    print(f"  清洗后记录数: {len(df_clean)}")
    print(f"  平均评分: {df_clean['review_score'].mean():.2f}")
    
    return df_clean

def clean_sellers(df):
    """清洗卖家数据"""
    print("\n清洗卖家数据...")
    df_clean = df.copy()
    
    # 处理城市和州名
    df_clean['seller_city'] = df_clean['seller_city'].astype(str).str.lower().str.strip()
    df_clean['seller_state'] = df_clean['seller_state'].astype(str).str.upper().str.strip()
    
    print(f"  原始记录数: {len(df)}")
    print(f"  清洗后记录数: {len(df_clean)}")
    print(f"  卖家所在州数: {df_clean['seller_state'].nunique()}")
    
    return df_clean

def clean_all_datasets(datasets):
    """清洗所有数据集"""
    print("=" * 60)
    print("开始数据清洗...")
    print("=" * 60)
    
    cleaned_datasets = {}
    
    # 清洗各个数据集
    if 'customers' in datasets:
        cleaned_datasets['customers'] = clean_customers(datasets['customers'])
    
    if 'orders' in datasets:
        cleaned_datasets['orders'] = clean_orders(datasets['orders'])
    
    if 'order_items' in datasets:
        cleaned_datasets['order_items'] = clean_order_items(datasets['order_items'])
    
    if 'products' in datasets:
        translation = datasets.get('category_translation')
        cleaned_datasets['products'] = clean_products(datasets['products'], translation)
    
    if 'order_payments' in datasets:
        cleaned_datasets['order_payments'] = clean_order_payments(datasets['order_payments'])
    
    if 'order_reviews' in datasets:
        cleaned_datasets['order_reviews'] = clean_order_reviews(datasets['order_reviews'])
    
    if 'sellers' in datasets:
        cleaned_datasets['sellers'] = clean_sellers(datasets['sellers'])
    
    # 保存清洗后的数据
    print("\n" + "=" * 60)
    print("保存清洗后的数据...")
    print("=" * 60)
    
    for name, df in cleaned_datasets.items():
        # 定义清洗后数据存放位置
        output_file = OUTPUT_DIR / f"{name}_cleaned.csv"
        # 写入清洗后的数据
        df.to_csv(output_file, index=False)
        print(f"✓ 已保存: {output_file}")
    
    print("\n✓ 数据清洗完成！")
    return cleaned_datasets

def main():
    """主函数"""
    # 加载数据
    datasets = load_data()
    
    # 清洗数据
    cleaned_datasets = clean_all_datasets(datasets)
    
    return cleaned_datasets

cleaned_datasets = main()


加载原始数据...
✓ customers: (99441, 5)
✓ orders: (99441, 8)
✓ order_items: (112650, 7)
✓ products: (32951, 9)
✓ order_payments: (103886, 5)
✓ order_reviews: (99224, 7)
✓ sellers: (3095, 4)
✓ geolocation: (1000163, 5)
✓ category_translation: (71, 2)
开始数据清洗...

清洗客户数据...
  原始记录数: 99441
  清洗后记录数: 99441
  缺失值: 0

清洗订单数据...
  原始记录数: 99441
  清洗后记录数: 99441
  已交付订单: 96478

清洗订单商品数据...
  原始记录数: 112650
  清洗后记录数: 112650
  平均商品价格: 120.65
  平均运费: 19.99

清洗产品数据...
  原始记录数: 32951
  清洗后记录数: 32951
  产品类别数: 73

清洗支付数据...
  原始记录数: 103886
  清洗后记录数: 103886
  支付方式: ['credit_card' 'boleto' 'voucher' 'debit_card' 'not_defined']
  总支付金额: 16008872.12

清洗评价数据...
  原始记录数: 99224
  清洗后记录数: 99224
  平均评分: 4.09

清洗卖家数据...
  原始记录数: 3095
  清洗后记录数: 3095
  卖家所在州数: 23

保存清洗后的数据...
✓ 已保存: ../outputs/cleaned_data/customers_cleaned.csv
✓ 已保存: ../outputs/cleaned_data/orders_cleaned.csv
✓ 已保存: ../outputs/cleaned_data/order_items_cleaned.csv
✓ 已保存: ../outputs/cleaned_data/products_cleaned.csv
✓ 已保存: ../outputs/cleaned_data/order_paym