# In [None]:
import pandas as pd
import numpy as np

# Sample data (in real use, load it from a database or a file), e.g.
#   df = pd.read_csv("customer_info.csv")
# or
#   df = pd.read_sql("SELECT * FROM customer_info", connection)

# Mock customer records, one list per column.
data = dict(
    customer_name=['张三', '', '李四', None, '未知', '王五'],
    id_card=['11010119900101001X', '', 'N/A', None, 'UNKNOWN', '11010119850101002X'],
    phone=['13800138000', '000-000-0000', '-', None, '13900139000', ''],
    birth_date=['1990-01-01', '1900-01-01', '1985-06-15', '1970-01-01', None, '2000-03-20'],
    address=['北京市朝阳区', '暂无', '', 'N/A', '上海市浦东新区', 'Unknown'],
    status=['ACTIVE', 'ACTIVE', 'INACTIVE', 'ACTIVE', 'ACTIVE', 'ACTIVE'],
)

# Only active customers are analysed; the copy detaches the result from the
# unfiltered frame.
df = pd.DataFrame(data).loc[lambda frame: frame['status'].eq('ACTIVE')].copy()

# ======================================================================
# Configuration: to be agreed between the business and the data teams
# ======================================================================

# 1. Critical fields and their weights (the weights should sum to 1).
critical_fields = dict(
    customer_name=0.2,
    id_card=0.3,
    phone=0.25,
    birth_date=0.15,
    address=0.1,
)

# 2. Rules for recognising missing values (extensible).
missing_indicators = dict(
    # Generic textual placeholders.
    text=['null', 'n/a', 'na', 'unknown', '未知', '暂无', '-', ''],
    # Obviously fake phone numbers.
    phone=['000-000-0000', '123-456-7890'],
    # Common default dates.
    date=['1900-01-01', '1970-01-01'],
)

# 3. Field groups for the joint-missing check (core business field groups).
joint_missing_groups = {
    '核心身份信息': ['customer_name', 'id_card', 'phone'],
}

# =============================
# 核心函数：字段完整性检查
# =============================

def is_missing(series: pd.Series, field_name: str, indicators=None) -> pd.Series:
    """
    Flag which values of a column count as "missing" under the business rules.

    A value is missing when it is NaN/None, or when it matches a configured
    placeholder:

    * text-like columns (object dtype or a pandas string dtype): blank
      strings, the literal strings "nan"/"none", every entry of
      ``indicators['text']`` and the field-specific entries, all compared
      after stripping whitespace and lower-casing;
    * datetime columns: dates whose ``YYYY-MM-DD`` form equals a
      field-specific entry;
    * any other dtype: values equal to a field-specific entry.

    Field-specific entries are read from ``indicators[field_name]`` and, for
    the ``birth_date`` field, also from ``indicators['date']``.

    Args:
        series: Column to inspect.
        field_name: Column name, used to select the field-specific rules.
        indicators: Optional rule mapping laid out like the module-level
            ``missing_indicators``; that global is used when this is None.

    Returns:
        Boolean Series aligned with ``series``; True marks a missing value.
    """
    rules = missing_indicators if indicators is None else indicators

    # Rules that apply to this particular field on top of the generic ones.
    rule_keys = [field_name]
    if field_name == 'birth_date':
        rule_keys.append('date')
    field_values = [value for key in rule_keys for value in rules.get(key, [])]

    missing = series.isna()
    dtype = series.dtype

    if pd.api.types.is_datetime64_any_dtype(dtype):
        # Sentinel dates are configured as strings, so compare on the ISO day.
        if field_values:
            iso_days = series.dt.strftime('%Y-%m-%d')
            sentinels = [str(value).strip() for value in field_values]
            missing = missing | iso_days.isin(sentinels)
        return missing

    if pd.api.types.is_object_dtype(dtype) or pd.api.types.is_string_dtype(dtype):
        # Checking only for dtype == 'object' would skip pandas string dtypes
        # and silently ignore every textual placeholder in such columns.
        normalized = series.astype(str).str.strip().str.lower()

        # 'nan' / 'none' are what NaN / None turn into once cast to str.
        placeholders = {'', 'nan', 'none'}
        placeholders.update(str(value).strip().lower() for value in rules.get('text', []))
        placeholders.update(str(value).strip().lower() for value in field_values)

        return missing | normalized.isin(list(placeholders))

    # Numeric and other dtypes: NaN plus exact matches of field-specific values.
    if field_values:
        missing = missing | series.isin(field_values)
    return missing

def check_field_completeness(df: pd.DataFrame, critical_fields: dict, joint_groups: dict) -> pd.DataFrame:
    """
    Build a field-completeness report for ``df``.

    The report contains, in this order:

    1. one row per critical field present in ``df`` with its missing count
       and missing rate;
    2. one row with the weighted missing rate, i.e. the sum of
       ``weight * missing rate`` over the critical fields present in ``df``
       (weights of absent fields are not redistributed);
    3. two rows per joint group: records where at least one field of the
       group is missing, and records where every field of the group is
       missing.

    Critical fields that are not columns of ``df`` are reported on stdout and
    skipped. A joint group is skipped entirely (with a message per absent
    field) when any of its fields is not a column of ``df``, because the
    joint figures would otherwise describe a different field set than the one
    configured. All rates are 0.0 when ``df`` has no rows.

    Args:
        df: Records to analyse.
        critical_fields: Mapping of column name to weight.
        joint_groups: Mapping of group name to the list of column names that
            are checked together.

    Returns:
        DataFrame with the columns 字段名称, 缺失数量, 总记录数, 缺失率(%), 类型.
    """
    total_records = len(df)
    mask_cache = {}

    def missing_mask(field):
        # Classify each column once and reuse the mask for every metric.
        if field not in mask_cache:
            mask_cache[field] = is_missing(df[field], field)
        return mask_cache[field]

    def percent(count):
        # An empty frame has nothing missing; also avoids dividing by zero.
        if total_records == 0:
            return 0.0
        return round(100 * count / total_records, 2)

    results = []

    # Steps 1 and 2: per-field statistics and the weighted missing rate.
    weighted_missing = 0.0
    for field, weight in critical_fields.items():
        if field not in df.columns:
            print(f"⚠️ 字段 {field} 不存在于数据中，跳过")
            continue

        missing_count = int(missing_mask(field).sum())
        results.append({
            '字段名称': field,
            '缺失数量': missing_count,
            '总记录数': total_records,
            '缺失率(%)': percent(missing_count),
            '类型': '单字段缺失'
        })
        if total_records:
            weighted_missing += weight * (missing_count / total_records)

    results.append({
        '字段名称': '【加权缺失率】',
        '缺失数量': None,
        '总记录数': total_records,
        '缺失率(%)': round(100 * weighted_missing, 2),
        '类型': '综合指标'
    })

    # Step 3: joint missing rates per field group.
    for group_name, fields in joint_groups.items():
        if not fields:
            print(f"⚠️ 联合检查组 {group_name} 未配置字段，跳过")
            continue

        absent = [f for f in fields if f not in df.columns]
        if absent:
            for f in absent:
                print(f"⚠️ 联合检查中字段 {f} 不存在，跳过组: {group_name}")
            continue

        any_missing = pd.Series(False, index=df.index, dtype=bool)
        all_missing = pd.Series(True, index=df.index, dtype=bool)
        for f in fields:
            field_mask = missing_mask(f)
            any_missing = any_missing | field_mask
            all_missing = all_missing & field_mask

        joint_missing_count = int(any_missing.sum())        # at least one field missing
        full_joint_missing_count = int(all_missing.sum())   # every field missing

        results.append({
            '字段名称': f'【联合缺失率】{group_name}（任一缺失）',
            '缺失数量': joint_missing_count,
            '总记录数': total_records,
            '缺失率(%)': percent(joint_missing_count),
            '类型': '联合缺失'
        })

        results.append({
            '字段名称': f'【联合完全缺失】{group_name}（全部为空）',
            '缺失数量': full_joint_missing_count,
            '总记录数': total_records,
            '缺失率(%)': percent(full_joint_missing_count),
            '类型': '联合缺失'
        })

    return pd.DataFrame(results)

# ======================================================================
# Run the check
# ======================================================================

report_df = check_field_completeness(df, critical_fields, joint_missing_groups)

# ======================================================================
# Print the result
# ======================================================================

print("📊 字段完整性检查报告\n")
report_text = report_df.to_string(index=False)
print(report_text)

# Optional: export the report to Excel
# report_df.to_excel("field_completeness_report.xlsx", index=False)