In [10]:
import numpy as np
import pandas as pd

### 金融特征衍生模块

In [None]:
def generate_pre_dws_group_cte():
    """Build the ``base_tag_layer`` CTE that pre-aggregates ``customer_dwb_sms``.

    The inner query tags each row with ``dt_tag``, a 15-day bucket index
    computed from ``DATEDIFF(back_date, event_time)``: 1 for (0, 15],
    2 for (15, 30], ..., 23 for (330, 345], and 24 for everything else.
    The outer query groups by (mobile_md5, back_date, sign, event_type,
    ind_tag, dt_tag) and keeps the row count, the concatenated distinct
    signs and the earliest/latest ``event_time`` of each group.

    :return: SQL fragment beginning with ", base_tag_layer AS (" so that it
        can be appended to an already opened WITH clause.
    """
    day_diff = "DATEDIFF(back_date, event_time)"
    branch_indent = " " * 16

    # Buckets 1..23: each covers the half-open range (15*(k-1), 15*k] days.
    branches = [
        f"WHEN {day_diff} > {15 * (tag - 1)} AND {day_diff} <= {15 * tag} THEN {tag}"
        for tag in range(1, 24)
    ]
    case_lines = ["            CASE " + branches[0]]
    case_lines.extend(branch_indent + branch for branch in branches[1:])
    # Everything that did not match an explicit bucket lands in bucket 24.
    case_lines.append(branch_indent + "ELSE 24 END AS dt_tag")

    sql_lines = [
        ", base_tag_layer AS (",
        "    SELECT ",
        "        mobile_md5,",
        "        back_date,",
        "        sign,",
        "        event_type,",
        "        ind_tag,",
        "        dt_tag,",
        "        count(mobile_md5) as times,",
        "        concat_ws(',',collect_set(sign)) as sign_concat,",
        "        min(event_time) as min_event_time,",
        "        max(event_time) as max_event_time",
        "    FROM(",
        "        SELECT",
        "            mobile_md5,",
        "            back_date,",
        "            sign,",
        "            event_type,",
        "            ind_tag,",
        "            event_time,",
        *case_lines,
        "        FROM customer_dwb_sms",
        "        WHERE sign IS NOT NULL",
        "    )customer_dwb_sms ",
        "    GROUP BY mobile_md5, back_date, sign, event_type, ind_tag, dt_tag",
        "    )",
    ]
    return "\n".join(sql_lines)

def generate_plat_cnt_fields(win_days, ind_arr, event_arr, field_meta):
    """Build the ``plat_cnt_stats`` CTE that counts distinct platforms (signs).

    The CTE reads ``base_tag_layer``, explodes ``sign_concat`` into
    ``sign_element`` and, per (mobile_md5, back_date), emits
    ``size(collect_set(...))`` columns for:
      1. all events per window;
      2. each event type per window;
      3. each industry x event type pair per window (industry "indD" is
         skipped).
    Every generated column is registered in ``field_meta`` with type INT.

    :param win_days: look-back windows in days; each must be one of
        15, 30, 90, 180, 360 (a KeyError is raised otherwise).
    :param ind_arr: industry tags compared against ``ind_tag``.
    :param event_arr: event codes compared against ``event_type``.
    :param field_meta: dict mutated in place; column name -> metadata dict.
    :return: tuple (CTE SQL string, CTE name, list of generated column names).
    """
    # Window length in days -> highest dt_tag bucket that falls inside it.
    bucket_by_window = {15: 1, 30: 2, 90: 6, 180: 12, 360: 24}
    # Labels used only for the metadata descriptions.
    industry_labels = {
        "indA": "A类小贷",
        "indB": "B类小贷",
        "indC": "C类小贷",
        "indD": "三方催收",
        "indE": "消费金融",
        "indF": "银行",
        "indG": "其他机构",
    }
    event_labels = {
        "B01": "严重逾期",
        "B02": "三方催收",
        "B03": "中度逾期",
        "B04": "还款失败",
        "B05": "申请失败",
        "B06": "还款成功",
        "B07": "放款成功",
        "B08": "还款提醒",
        "B09": "贷款营销",
        "B10": "金融其他",
    }

    select_exprs = []
    column_names = []

    def _add_column(predicate, column, cn_desc, biz_tag):
        # One distinct-platform counter: rows failing the predicate yield
        # NULL, which collect_set ignores.
        column_names.append(column)
        select_exprs.append(
            f"size(collect_set(CASE WHEN {predicate} THEN sign_element ELSE NULL END)) AS {column}"
        )
        field_meta[column] = {"type": "INT", "cn_desc": cn_desc, "biz_tag": biz_tag}

    # ---- 1. platforms over all events ----
    for win in win_days:
        _add_column(
            f"dt_tag <= {bucket_by_window[win]}",
            f"sp_fin_loan_plat_cnt_{win}d",
            f"近{win}天内金融事件平台数",
            "平台数",
        )

    # ---- 2. platforms per event type ----
    for win in win_days:
        upper_tag = bucket_by_window[win]
        for event in event_arr:
            event_key = event.lower()
            event_label = event_labels.get(event, event)
            _add_column(
                f"dt_tag <= {upper_tag} AND event_type='{event}'",
                f"sp_fin_loan_event_{event_key}_plat_cnt_{win}d",
                f"近{win}天内{event_label}事件平台数",
                f"事件平台数|{event_label}",
            )

    # ---- 3. platforms per industry x event type ----
    for win in win_days:
        upper_tag = bucket_by_window[win]
        for ind in ind_arr:
            ind_key = ind.lower()
            if ind_key == "indd":
                continue
            ind_label = industry_labels.get(ind, ind)
            for event in event_arr:
                event_key = event.lower()
                event_label = event_labels.get(event, event)
                _add_column(
                    f"dt_tag <= {upper_tag} AND ind_tag='{ind}' AND event_type='{event}'",
                    f"sp_fin_loan_{ind_key}_event_{event_key}_plat_cnt_{win}d",
                    f"近{win}天内{ind_label}行业{event_label}事件平台数",
                    f"行业+事件平台数|{ind_label}|{event_label}",
                )

    # Assemble the CTE; sign_concat is exploded before the distinct count.
    joined_exprs = ",\n".join(expr.strip() for expr in select_exprs)
    cte_sql = (
        ",plat_cnt_stats AS (\n"
        "        SELECT\n"
        "            mobile_md5,\n"
        "            back_date,\n"
        f"            {joined_exprs}\n"
        "        FROM base_tag_layer\n"
        "        LATERAL VIEW explode(split(sign_concat, ',')) t AS sign_element\n"
        "        GROUP BY mobile_md5, back_date\n"
        "    )"
    )
    return cte_sql, "plat_cnt_stats", column_names

def generate_event_stats_fields(win_days, ind_arr, event_arr, field_meta):
    """Build the aggregate SELECT expressions evaluated over ``base_tag_layer``.

    The expressions are emitted in this order:
      1. total event count per window;
      2. event count per industry and window;
      3. event count per event type and window;
      4. event count per industry x event type and window (industry "indD"
         is skipped);
      5. days between ``back_date`` and the most recent / earliest event,
         per event type and per industry x event type.  These are generated
         for ONE window only: the last entry of ``win_days``.

    Section 5 previously relied on the loop variable ``win`` leaking out of
    the preceding ``for`` loops; the window is now picked explicitly, and an
    empty ``win_days`` yields no fields instead of raising ``NameError``.

    Every generated column is also registered in ``field_meta``.

    :param win_days: look-back windows in days; each must be one of
        15, 30, 90, 180, 360.
    :param ind_arr: industry tags compared against ``ind_tag``.
    :param event_arr: event codes compared against ``event_type``.
    :param field_meta: dict mutated in place; column name ->
        {"type", "cn_desc", "biz_tag"}.
    :return: expressions joined with a comma and a newline; an empty string
        when nothing is generated.
    :raises KeyError: if a window has no dt_tag mapping.
    """
    windows = list(win_days)
    fields = []
    # Window length in days -> highest dt_tag bucket that falls inside it.
    win2dt_tag = {
        15: 1,
        30: 2,
        90: 6,
        180: 12,
        360: 24
    }
    # Labels used only for the metadata descriptions.
    ind_cn_mapping = {
        "indA": "A类小贷",
        "indB": "B类小贷",
        "indC": "C类小贷",
        "indD": "三方催收",
        "indE": "消费金融",
        "indF": "银行",
        "indG": "其他机构"
    }
    event_cn_mapping = {
        "B01": "严重逾期",
        "B02": "三方催收",
        "B03": "中度逾期",
        "B04": "还款失败",
        "B05": "申请失败",
        "B06": "还款成功",
        "B07": "放款成功",
        "B08": "还款提醒",
        "B09": "贷款营销",
        "B10": "金融其他"
    }

    def _add_times(predicate, column, cn_desc, biz_tag):
        # Sum the pre-aggregated row counts of the groups matching predicate.
        fields.append(f"SUM(CASE WHEN {predicate} THEN times ELSE 0 END) AS {column}")
        field_meta[column] = {"type": "BIGINT", "cn_desc": cn_desc, "biz_tag": biz_tag}

    def _add_days(agg, predicate, time_col, column, cn_desc, biz_tag):
        # Day distance from back_date to the MAX/MIN matching event time;
        # the CASE has no ELSE, so non-matching rows contribute NULL.
        fields.append(
            f"datediff(back_date, {agg}(CASE WHEN {predicate} THEN {time_col} END)) AS {column}"
        )
        field_meta[column] = {"type": "INT", "cn_desc": cn_desc, "biz_tag": biz_tag}

    # ========== 1. total number of financial events ==========
    for win in windows:
        dt_threshold = win2dt_tag[win]
        _add_times(
            f"dt_tag <= {dt_threshold}",
            f"sp_fin_loan_total_times_{win}d",
            f"近{win}天内金融事件总次数",
            "总次数",
        )

    # ========== 2. event count per industry ==========
    for win in windows:
        dt_threshold = win2dt_tag[win]
        for ind in ind_arr:
            ind_lower = ind.lower()
            ind_cn = ind_cn_mapping.get(ind, ind)
            _add_times(
                f"dt_tag <= {dt_threshold} AND ind_tag='{ind}'",
                f"sp_fin_loan_{ind_lower}_times_{win}d",
                f"近{win}天内{ind_cn}行业金融事件次数",
                f"行业次数|{ind_cn}",
            )

    # ========== 3. event count per event type ==========
    for win in windows:
        dt_threshold = win2dt_tag[win]
        for event in event_arr:
            event_lower = event.lower()
            event_cn = event_cn_mapping.get(event, event)
            _add_times(
                f"dt_tag <= {dt_threshold} AND event_type='{event}'",
                f"sp_fin_loan_event_{event_lower}_times_{win}d",
                f"近{win}天内{event_cn}事件次数",
                f"事件次数|{event_cn}",
            )

    # ========== 4. event count per industry x event type ==========
    for win in windows:
        dt_threshold = win2dt_tag[win]
        for ind in ind_arr:
            ind_lower = ind.lower()
            if ind_lower == "indd":
                continue
            ind_cn = ind_cn_mapping.get(ind, ind)
            for event in event_arr:
                event_lower = event.lower()
                event_cn = event_cn_mapping.get(event, event)
                _add_times(
                    f"dt_tag <= {dt_threshold} AND ind_tag='{ind}' AND event_type='{event}'",
                    f"sp_fin_loan_{ind_lower}_event_{event_lower}_times_{win}d",
                    f"近{win}天内{ind_cn}行业{event_cn}事件次数",
                    f"行业+事件次数|{ind_cn}|{event_cn}",
                )

    # ========== 5. days since the most recent / earliest event ==========
    # Only the last configured window gets these columns (same output as
    # before for non-empty input); with no windows there is nothing to add.
    if windows:
        win = windows[-1]
        dt_threshold = win2dt_tag[win]
        # (aggregate, source column, name infix, Chinese label)
        extremes = (
            ("MAX", "max_event_time", "recent", "最近"),
            ("MIN", "min_event_time", "remote", "最远"),
        )

        # 5.1 per event type
        for event in event_arr:
            event_lower = event.lower()
            event_cn = event_cn_mapping.get(event, event)
            for agg, time_col, infix, label in extremes:
                _add_days(
                    agg,
                    f"dt_tag <= {dt_threshold} AND event_type='{event}'",
                    time_col,
                    f"sp_fin_loan_event_{event_lower}_{infix}_dura_d_beforep_{win}d",
                    f"近{win}天内{event_cn}事件{label}一次距今天数",
                    f"事件天数|{event_cn}|{label}",
                )

        # 5.2 per industry x event type
        for ind in ind_arr:
            ind_lower = ind.lower()
            if ind_lower == "indd":
                continue
            ind_cn = ind_cn_mapping.get(ind, ind)
            for event in event_arr:
                event_lower = event.lower()
                event_cn = event_cn_mapping.get(event, event)
                for agg, time_col, infix, label in extremes:
                    _add_days(
                        agg,
                        f"dt_tag <= {dt_threshold} AND ind_tag='{ind}' AND event_type='{event}'",
                        time_col,
                        f"sp_fin_loan_{ind_lower}_event_{event_lower}_{infix}_dura_d_beforep_{win}d",
                        f"近{win}天内{ind_cn}行业{event_cn}事件{label}一次距今天数",
                        f"行业+事件天数|{ind_cn}|{event_cn}|{label}",
                    )

    return ",\n".join([f.strip() for f in fields])

def generate_fin_loan_stat_fields_levelv2(win_days, event_arr, ind_arr, field_meta):
    """Build the ratio expressions derived from the aggregated count columns.

    Two families of columns are produced, each wrapped in
    ``ROUND(CASE WHEN <denominator> = 0 THEN 0 ELSE ... END, 6)``:
      6. share of each event type / each industry in the total event count,
         for every window in ``win_days``;
      7. small-window / large-window ratios (15d/30d and 30d/90d) per event
         type and per industry x event type (industry "indD" is skipped).

    A cross-window ratio is only emitted when BOTH of its windows are in
    ``win_days``; otherwise the expression would reference count columns
    that were never generated.

    Every generated column is also registered in ``field_meta`` with type
    DOUBLE.

    :param win_days: look-back windows in days for which count columns exist.
    :param event_arr: event codes.
    :param ind_arr: industry tags.
    :param field_meta: dict mutated in place; column name ->
        {"type", "cn_desc", "biz_tag"}.
    :return: expressions joined with a comma and a newline.
    """
    windows = list(win_days)
    fields = []
    # Labels used only for the metadata descriptions.
    event_cn_mapping = {
        "B01": "严重逾期",
        "B02": "三方催收",
        "B03": "中度逾期",
        "B04": "还款失败",
        "B05": "申请失败",
        "B06": "还款成功",
        "B07": "放款成功",
        "B08": "还款提醒",
        "B09": "贷款营销",
        "B10": "金融其他"
    }
    ind_cn_mapping = {
        "indA": "A类小贷",
        "indB": "B类小贷",
        "indC": "C类小贷",
        "indD": "三方催收",
        "indE": "消费金融",
        "indF": "银行",
        "indG": "其他机构"
    }

    def _add_ratio(numerator, denominator, column, cn_desc, biz_tag):
        # Guard against division by zero, then round to 6 decimals.
        fields.append(
            f"ROUND(CASE WHEN {denominator} = 0 THEN 0 ELSE {numerator} / {denominator} END, 6) AS {column}"
        )
        field_meta[column] = {"type": "DOUBLE", "cn_desc": cn_desc, "biz_tag": biz_tag}

    # ========== 6. share of each event type / industry in the total ==========
    for win in windows:
        total_col = f"sp_fin_loan_total_times_{win}d"

        # 6.1 single event type vs. all events
        for event in event_arr:
            event_lower = event.lower()
            event_cn = event_cn_mapping.get(event, event)
            _add_ratio(
                f"sp_fin_loan_event_{event_lower}_times_{win}d",
                total_col,
                f"sp_fin_loan_event_{event_lower}_ratio_total_{win}d",
                f"近{win}天内{event_cn}事件次数占总事件次数的比率（保留6位小数）",
                f"事件占比|{event_cn}",
            )

        # 6.2 single industry vs. all events
        for ind in ind_arr:
            ind_lower = ind.lower()
            ind_cn = ind_cn_mapping.get(ind, ind)
            _add_ratio(
                f"sp_fin_loan_{ind_lower}_times_{win}d",
                total_col,
                f"sp_fin_loan_{ind_lower}_ratio_total_{win}d",
                f"近{win}天内{ind_cn}行业事件次数占总事件次数的比率（保留6位小数）",
                f"行业占比|{ind_cn}",
            )

    # ========== 7. cross-window ratios (small window / large window) ==========
    win_mapping = {
        15: 30,   # 15d / 30d
        30: 90,   # 30d / 90d
    }
    for small_win, big_win in win_mapping.items():
        # Both count columns must exist, i.e. both windows must be configured.
        if small_win not in windows or big_win not in windows:
            continue

        # 7.1 per event type
        for event in event_arr:
            event_lower = event.lower()
            event_cn = event_cn_mapping.get(event, event)
            base = f"sp_fin_loan_event_{event_lower}"
            _add_ratio(
                f"{base}_times_{small_win}d",
                f"{base}_times_{big_win}d",
                f"{base}_ratio_{small_win}to{big_win}d",
                f"近{small_win}天内{event_cn}事件次数占近{big_win}天次数的比率（保留6位小数）",
                f"事件跨窗口占比|{event_cn}|{small_win}d/{big_win}d",
            )

        # 7.2 per industry x event type
        for ind in ind_arr:
            ind_lower = ind.lower()
            if ind_lower == "indd":
                continue
            ind_cn = ind_cn_mapping.get(ind, ind)
            for event in event_arr:
                event_lower = event.lower()
                event_cn = event_cn_mapping.get(event, event)
                base = f"sp_fin_loan_{ind_lower}_event_{event_lower}"
                _add_ratio(
                    f"{base}_times_{small_win}d",
                    f"{base}_times_{big_win}d",
                    f"{base}_ratio_{small_win}to{big_win}d",
                    f"近{small_win}天内{ind_cn}行业{event_cn}事件次数占近{big_win}天次数的比率（保留6位小数）",
                    f"行业+事件跨窗口占比|{ind_cn}|{event_cn}|{small_win}d/{big_win}d",
                )

    return ",\n".join([f.strip() for f in fields])

# ===================== Main flow (adapted to the new intermediate layer) =====================
# Field metadata registry, pre-seeded with the two key columns; every
# generator call below receives this dict and adds its own columns to it.
field_mapping_dict = {
    "mobile_md5": {"type": "STRING", "cn_desc": "手机号MD5加密值", "biz_tag": "基础标识"},
    "back_date": {"type": "DATE", "cn_desc": "回溯日期", "biz_tag": "基础标识"}
}

# Configuration: look-back windows (days), industry tags and event codes
WIN_DAYS = [15, 30, 90, 180, 360]
IND_ARR = ["indA", "indB", "indC", "indD", "indE", "indF", "indG"]
EVENT_ARR = ["B01", "B02", "B03", "B04", "B05", "B06", "B07", "B08", "B09", "B10"]

# Base SQL (kept unchanged): opens the WITH clause, de-duplicates the sample
# ids and joins them to the loan events of the 360 days before back_date.
base_sql = """
-- CREATE TABLE sms_bd_data.customer_test_offline_fin_loan_feature_2026011201 AS
WITH cus_sample AS (
    SELECT 
        phone as mobile_md5,
        the_date as back_date
    FROM sms_bd_data.customer_test_sample_id
    group by phone, the_date
),
customer_dwb_sms AS (
    select
        mobile_md5, 
        back_date, 
        sign, 
        event_time,
        event_type, 
        ind_tag
    from(
        select
            cus_smp.mobile_md5,
            cus_smp.back_date,
            dwb_loan.sign,
            dwb_loan.event_type,
            dwb_loan.event_time,
            ind_tag
        from cus_sample cus_smp
        inner join(
            select
                phone,
                sign,
                event_type,
                ind_tag,
                from_unixtime(unix_timestamp(the_date, 'yyyyMMdd'), 'yyyy-MM-dd') as event_time
            from sms_bd_data.sms_dwb_fin_loan_ind_event_fdt
        )dwb_loan
        on cus_smp.mobile_md5=dwb_loan.phone 
        where cus_smp.back_date > dwb_loan.event_time and dwb_loan.event_time >= date_sub(cus_smp.back_date, 360)
    )cus_dwb
    group by mobile_md5, back_date, sign, event_type, event_time, ind_tag
)
"""

# Build the pre-aggregation CTE (original function, unchanged)
pre_dws_group_sql = generate_pre_dws_group_cte()

# Build the platform-count CTE and the list of its columns prefixed with the
# "pcs" alias used in the final join
plat_cnt_sql, plat_cnt_cte_name, plat_field_names = generate_plat_cnt_fields(WIN_DAYS, IND_ARR, EVENT_ARR, field_mapping_dict)
pcs_fields_str = ",\n    ".join([f"pcs.{field}" for field in plat_field_names])

# Build the lightweight aggregate expressions (counts and day distances)
event_stats_fields = generate_event_stats_fields(WIN_DAYS, IND_ARR, EVENT_ARR, field_mapping_dict)

# Wrap the aggregate expressions in the event_stats CTE (reads base_tag_layer)
event_stats_sql = f""",event_stats AS (
    SELECT
        mobile_md5,
        back_date,
        {event_stats_fields}
    FROM base_tag_layer
    GROUP BY mobile_md5, back_date
)
"""

# Build the ratio expressions; they reference the count columns by name
ratio_fields = generate_fin_loan_stat_fields_levelv2(WIN_DAYS, EVENT_ARR, IND_ARR, field_mapping_dict)

# Assemble the final statement. Every generated CTE fragment starts with a
# comma, so the fragments chain onto the WITH clause opened in base_sql.
final_sql = f"""{base_sql}
{pre_dws_group_sql}
{plat_cnt_sql}
{event_stats_sql},aggregated_data AS (
    SELECT
        es.*,
        {pcs_fields_str}
    FROM event_stats es
    LEFT JOIN {plat_cnt_cte_name} pcs
    ON es.mobile_md5 = pcs.mobile_md5 AND es.back_date = pcs.back_date
)
insert overwrite table sms_bd_data.customer_test_fin_loan_feature_2026011302
SELECT
   *,
    {ratio_fields}
FROM aggregated_data
"""

# Print the generated SQL
print(final_sql)


-- CREATE TABLE sms_bd_data.customer_test_offline_fin_loan_feature_2026011201 AS
WITH cus_sample AS (
    SELECT 
        phone as mobile_md5,
        the_date as back_date
    FROM sms_bd_data.customer_test_sample_id
    group by phone, the_date
),
customer_dwb_sms AS (
    select
        mobile_md5, 
        back_date, 
        sign, 
        event_time,
        event_type, 
        ind_tag
    from(
        select
            cus_smp.mobile_md5,
            cus_smp.back_date,
            dwb_loan.sign,
            dwb_loan.event_type,
            dwb_loan.event_time,
            ind_tag
        from cus_sample cus_smp
        inner join(
            select
                phone,
                sign,
                event_type,
                from_unixtime(unix_timestamp(the_date, 'yyyyMMdd'), 'yyyy-MM-dd') as event_time
            from sms_bd_data.sms_dwb_fin_loan_ind_event_fdt
        )dwb_loan
        on cus_smp.mobile_md5=dwb_loan.phone 
        where cus_smp.back_date > dwb_

In [20]:
# 3. Print how many registered fields there are per SQL type.
# The banner used to say "Express", but at this point field_mapping_dict
# holds the fin-loan module's metadata, so the label now names that module.
print("\n===== 金融模块字段类型统计 =====")
type_count = {}
for field, meta in field_mapping_dict.items():
    field_type = meta["type"]
    # First occurrence of a type starts at 0 before being incremented.
    type_count[field_type] = type_count.get(field_type, 0) + 1
for t, cnt in type_count.items():
    print(f"{t}类型字段数：{cnt}")


===== Express模块字段类型统计 =====
STRING类型字段数：1
DATE类型字段数：1
INT类型字段数：495
BIGINT类型字段数：390
DOUBLE类型字段数：225


In [21]:
# Turn the metadata registry into a table with one row per field name and
# one column per metadata key, then save it as an Excel sheet.
field_mapping_dataframe = pd.DataFrame(field_mapping_dict).transpose()
field_mapping_dataframe.to_excel(
    "fin_loan_field_mapping_output_v3.xlsx",
    index_label="field_name",
)

### 

### 快递特征衍生模块

In [34]:
def generate_pre_dws_express_cte():
    """Build the ``base_express_layer`` CTE that pre-aggregates ``customer_dwb_express``.

    The inner query tags each row with ``dt_tag``, a 15-day bucket index
    computed from ``DATEDIFF(back_date, event_time)``: 1 for (0, 15],
    2 for (15, 30], ..., 23 for (330, 345], and 24 for everything else.
    The outer query groups by (mobile_md5, back_date, sign, event_type,
    dt_tag) and keeps the row count, the concatenated distinct signs and
    the earliest/latest ``event_time`` of each group.

    :return: SQL fragment beginning with ", base_express_layer AS (" so that
        it can be appended to an already opened WITH clause.
    """
    day_diff = "DATEDIFF(back_date, event_time)"
    branch_indent = " " * 16

    # Buckets 1..23: each covers the half-open range (15*(k-1), 15*k] days.
    branches = [
        f"WHEN {day_diff} > {15 * (tag - 1)} AND {day_diff} <= {15 * tag} THEN {tag}"
        for tag in range(1, 24)
    ]
    case_lines = ["            CASE " + branches[0]]
    case_lines.extend(branch_indent + branch for branch in branches[1:])
    # Everything that did not match an explicit bucket lands in bucket 24.
    case_lines.append(branch_indent + "ELSE 24 END AS dt_tag")

    sql_lines = [
        ", base_express_layer AS (",
        "    SELECT ",
        "        mobile_md5,",
        "        back_date,",
        "        sign,",
        "        event_type,",
        "        dt_tag,",
        "        count(mobile_md5) as times,",
        "        concat_ws(',',collect_set(sign)) as sign_concat,",
        "        min(event_time) as min_event_time,",
        "        max(event_time) as max_event_time",
        "    FROM(",
        "        SELECT",
        "            mobile_md5,",
        "            back_date,",
        "            sign,",
        "            event_type,",
        "            event_time,",
        "            -- 严格保留15天一组的时间分桶逻辑，不做任何改动",
        *case_lines,
        "        FROM customer_dwb_express",
        "        WHERE sign IS NOT NULL",
        "    )customer_dwb_express ",
        "    GROUP BY mobile_md5, back_date, sign, event_type, dt_tag",
        "    )",
    ]
    return "\n".join(sql_lines)

# ===================== 拆分1：仅处理次数相关字段 =====================
def generate_express_times_fields(win_days, event_arr, field_meta):
    """Build the count / day-distance expressions of the express module.

    The expressions are emitted in this order:
      1. total event count per window;
      2. event count per event type and window;
      3. days between ``back_date`` and the most recent / earliest event per
         event type.  These are generated for ONE window only: the last
         entry of ``win_days``.

    Section 3 previously relied on the loop variable ``win`` leaking out of
    the preceding ``for`` loops; the window is now picked explicitly, and an
    empty ``win_days`` yields no fields instead of raising ``NameError``.

    Platform-count columns are not produced here.  Every generated column is
    also registered in ``field_meta``.

    :param win_days: look-back windows in days; each must be one of
        15, 30, 90, 180, 360.
    :param event_arr: event codes compared against ``event_type``.
    :param field_meta: dict mutated in place; column name ->
        {"type", "cn_desc", "biz_tag"}.
    :return: expressions joined with a comma and a newline; an empty string
        when nothing is generated.
    :raises KeyError: if a window has no dt_tag mapping.
    """
    windows = list(win_days)
    fields = []
    # Window length in days -> highest dt_tag bucket that falls inside it
    # (buckets are 15 days wide).
    win2dt_tag = {
        15: 1,
        30: 2,
        90: 6,
        180: 12,
        360: 24
    }
    # Labels used only for the metadata descriptions.
    event_cn_mapping = {
        "A01": "快递签收",
        "A02": "快递投诉",
        "A03": "快递寄件",
        "A04": "其他快递"
    }

    def _add_times(predicate, column, cn_desc, biz_tag):
        # Sum the pre-aggregated row counts of the groups matching predicate.
        fields.append(f"SUM(CASE WHEN {predicate} THEN times ELSE 0 END) AS {column}")
        field_meta[column] = {"type": "BIGINT", "cn_desc": cn_desc, "biz_tag": biz_tag}

    def _add_days(agg, predicate, time_col, column, cn_desc, biz_tag):
        # Day distance from back_date to the MAX/MIN matching event time;
        # non-matching rows contribute NULL.
        fields.append(
            f"datediff(back_date, {agg}(CASE WHEN {predicate} THEN {time_col} ELSE NULL END)) AS {column}"
        )
        field_meta[column] = {"type": "INT", "cn_desc": cn_desc, "biz_tag": biz_tag}

    # ========== 1. total number of express events ==========
    for win in windows:
        dt_threshold = win2dt_tag[win]
        _add_times(
            f"dt_tag <= {dt_threshold}",
            f"sp_express_total_times_{win}d",
            f"近{win}天内快递事件总次数",
            "总次数",
        )

    # ========== 2. event count per event type ==========
    for win in windows:
        dt_threshold = win2dt_tag[win]
        for event in event_arr:
            event_lower = event.lower()
            event_cn = event_cn_mapping.get(event, event)
            _add_times(
                f"dt_tag <= {dt_threshold} AND event_type='{event}'",
                f"sp_express_event_{event_lower}_times_{win}d",
                f"近{win}天内{event_cn}事件次数",
                f"事件次数|{event_cn}",
            )

    # ========== 3. days since the most recent / earliest event ==========
    # Only the last configured window gets these columns (same output as
    # before for non-empty input); with no windows there is nothing to add.
    if windows:
        win = windows[-1]
        dt_threshold = win2dt_tag[win]
        # (aggregate, source column, name infix, Chinese label)
        extremes = (
            ("MAX", "max_event_time", "recent", "最近"),
            ("MIN", "min_event_time", "remote", "最远"),
        )
        for event in event_arr:
            event_lower = event.lower()
            event_cn = event_cn_mapping.get(event, event)
            for agg, time_col, infix, label in extremes:
                _add_days(
                    agg,
                    f"dt_tag <= {dt_threshold} AND event_type='{event}'",
                    time_col,
                    f"sp_express_event_{event_lower}_{infix}_dura_d_beforep_{win}d",
                    f"近{win}天内{event_cn}事件{label}一次距回溯日期的天数",
                    f"事件天数|{event_cn}|{label}",
                )

    return ",\n".join([f.strip() for f in fields])

# ===================== 拆分2：仅处理平台数相关字段（独立CTE） =====================
def generate_express_plat_cnt_fields(win_days, event_arr, field_meta):
    """Build the ``express_plat_cnt_stats`` CTE that counts distinct platforms.

    The CTE reads ``base_express_layer``, explodes ``sign_concat`` into
    ``sign_element`` and, per (mobile_md5, back_date), emits
    ``size(collect_set(...))`` columns for all events per window and for
    each event type per window.  Every generated column is registered in
    ``field_meta`` with type INT.

    :param win_days: look-back windows in days; each must be one of
        15, 30, 90, 180, 360 (a KeyError is raised otherwise).
    :param event_arr: event codes compared against ``event_type``.
    :param field_meta: dict mutated in place; column name -> metadata dict.
    :return: tuple (CTE SQL string, CTE name, list of generated column
        names) so the caller can select the columns explicitly.
    """
    # Window length in days -> highest dt_tag bucket that falls inside it
    # (buckets are 15 days wide).
    bucket_by_window = {15: 1, 30: 2, 90: 6, 180: 12, 360: 24}
    # Labels used only for the metadata descriptions.
    event_labels = {
        "A01": "快递签收",
        "A02": "快递投诉",
        "A03": "快递寄件",
        "A04": "其他快递",
    }

    select_exprs = []
    column_names = []

    def _add_column(predicate, column, cn_desc, biz_tag):
        # One distinct-platform counter: rows failing the predicate yield
        # NULL, which collect_set ignores.
        column_names.append(column)
        select_exprs.append(
            f"size(collect_set(CASE WHEN {predicate} THEN sign_element ELSE NULL END)) AS {column}"
        )
        field_meta[column] = {"type": "INT", "cn_desc": cn_desc, "biz_tag": biz_tag}

    # ---- 1. platforms over all events ----
    for win in win_days:
        _add_column(
            f"dt_tag <= {bucket_by_window[win]}",
            f"sp_express_plat_cnt_{win}d",
            f"近{win}天内快递事件平台数",
            "平台数",
        )

    # ---- 2. platforms per event type ----
    for win in win_days:
        upper_tag = bucket_by_window[win]
        for event in event_arr:
            event_key = event.lower()
            event_label = event_labels.get(event, event)
            _add_column(
                f"dt_tag <= {upper_tag} AND event_type='{event}'",
                f"sp_express_event_{event_key}_plat_cnt_{win}d",
                f"近{win}天内{event_label}事件涉及的平台数",
                f"事件平台数|{event_label}",
            )

    # Assemble the dedicated CTE; sign_concat is exploded before counting.
    joined_exprs = ",\n".join(expr.strip() for expr in select_exprs)
    cte_sql = (
        ",express_plat_cnt_stats AS (\n"
        "        SELECT\n"
        "            mobile_md5,\n"
        "            back_date,\n"
        f"            {joined_exprs}\n"
        "        FROM base_express_layer\n"
        "        -- 严格按要求展开sign_concat后统计平台数\n"
        "        LATERAL VIEW explode(split(sign_concat, ',')) t AS sign_element\n"
        "        GROUP BY mobile_md5, back_date\n"
        "    )"
    )
    return cte_sql, "express_plat_cnt_stats", column_names

# ===================== 比率字段（简化分母判断，仅保留=0条件） =====================
def generate_express_stat_fields_levelv2(win_days, event_arr, field_meta):
    """
    Build the ratio-type SELECT expressions of the express module.

    Two families of fields are generated, in this order:

    1. Per-window share of one event type among all events:
       ``sp_express_event_<event>_times_<win>d / sp_express_total_times_<win>d``
       for every window in ``win_days`` and every event in ``event_arr``.
    2. Cross-window share of one event type (small window / big window) for
       the fixed pairs 15d/30d and 30d/90d.

    Each expression yields 0 when its denominator equals 0 and is wrapped in
    ``ROUND(..., 6)``. By design only the ``= 0`` case is guarded; no
    ``IS NULL`` check is emitted for the denominator.

    Args:
        win_days: Window lengths in days. The generated SQL references the
            ``*_times_<win>d`` columns, which are expected to exist for
            exactly these windows.
        event_arr: Event type codes (e.g. ``"A01"``). The lower-cased code is
            embedded in the field names.
        field_meta: Dict updated in place; one entry per generated field
            with the keys ``type``, ``cn_desc`` and ``biz_tag``.

    Returns:
        str: All generated expressions joined with ``",\\n"``.
    """
    # Chinese display names of the express event codes (used in the metadata)
    event_cn_mapping = {
        "A01": "快递签收",
        "A02": "快递投诉",
        "A03": "快递寄件",
        "A04": "其他快递"
    }

    # Snapshot the inputs so that one-shot iterables can be walked repeatedly.
    windows = list(win_days)
    events = [
        (event.lower(), event_cn_mapping.get(event, event))
        for event in event_arr
    ]
    fields = []

    # ========== 4. Share of each event type among all events ==========
    for win in windows:
        total_col = f"sp_express_total_times_{win}d"
        for event_lower, event_cn in events:
            event_ratio_field = f"sp_express_event_{event_lower}_ratio_total_{win}d"
            event_col = f"sp_express_event_{event_lower}_times_{win}d"
            # Only the denominator = 0 case is guarded (no IS NULL check).
            fields.append(
                f"ROUND(CASE WHEN {total_col} = 0 THEN 0 "
                f"ELSE {event_col} / {total_col} END, 6) AS {event_ratio_field}"
            )
            field_meta[event_ratio_field] = {
                "type": "DOUBLE",
                "cn_desc": f"近{win}天内{event_cn}事件次数占总事件次数的比率（保留6位小数）",
                "biz_tag": f"事件占比|{event_cn}"
            }

    # ========== 5. Cross-window share per event (small / big window) ==========
    for small_win, big_win in ((15, 30), (30, 90)):
        # Both windows must be configured: the expression references the
        # *_times_ columns of the small AND the big window, and a column for
        # an unconfigured window would not exist in the upstream CTE.
        if small_win not in windows or big_win not in windows:
            continue

        for event_lower, event_cn in events:
            event_win_ratio_field = f"sp_express_event_{event_lower}_ratio_{small_win}to{big_win}d"
            small_col = f"sp_express_event_{event_lower}_times_{small_win}d"
            big_col = f"sp_express_event_{event_lower}_times_{big_win}d"
            # Only the denominator = 0 case is guarded (no IS NULL check).
            fields.append(
                f"ROUND(CASE WHEN {big_col} = 0 THEN 0 "
                f"ELSE {small_col} / {big_col} END, 6) AS {event_win_ratio_field}"
            )
            field_meta[event_win_ratio_field] = {
                "type": "DOUBLE",
                "cn_desc": f"近{small_win}天内{event_cn}事件次数占近{big_win}天次数的比率（保留6位小数）",
                "biz_tag": f"事件跨窗口占比|{event_cn}|{small_win}d/{big_win}d"
            }

    return ",\n".join(fields)

# ===================== Main flow (combines the split count + platform-count logic) =====================
# Initialize the field metadata dictionary. It is passed as `field_meta` to the
# generator functions below, which add one entry per generated field, so the
# order of the calls determines the order of the entries.
field_mapping_dict = {
    "mobile_md5": {"type": "STRING", "cn_desc": "手机号MD5加密值", "biz_tag": "基础标识"},
    "back_date": {"type": "DATE", "cn_desc": "回溯日期", "biz_tag": "基础标识"}
}

# Configuration: look-back windows (in days) and express event type codes
WIN_DAYS = [15, 30, 90, 180, 360]
EVENT_ARR = ["A01", "A02", "A03", "A04"]

# Base SQL (keeps the original business logic): deduplicated sample keys joined
# to express events that fall inside the 360 days before back_date.
# NOTE(review): the WHERE clause filters on dwb_express.event_time, so samples
# without any matching event are dropped even though the join is a LEFT JOIN —
# confirm this is intended.
base_sql = """
-- CREATE TABLE sms_bd_data.customer_test_offline_express_feature_2026010901 AS
WITH cus_sample AS (
    SELECT 
        phone as mobile_md5,
        the_date as back_date
    FROM sms_bd_data.customer_test_sample_id
    group by phone, the_date
),customer_dwb_express AS (
    select
        mobile_md5, 
        back_date, 
        sign, 
        event_type,
        event_time
    from(
        select
            cus_smp.mobile_md5,
            cus_smp.back_date,
            dwb_express.sign,
            dwb_express.event_type,
            dwb_express.event_time
        from cus_sample cus_smp
        left join(
            select
                phone,
                sign,
                event_type,
                from_unixtime(unix_timestamp(the_date, 'yyyyMMdd'), 'yyyy-MM-dd') as event_time
            from sms_bd_data.sms_dwb_express_event_fdt
        )dwb_express
        on cus_smp.mobile_md5=dwb_express.phone 
        where cus_smp.back_date > dwb_express.event_time and dwb_express.event_time >= date_sub(cus_smp.back_date, 360)
    )cus_dwb
    group by mobile_md5, back_date, sign, event_type, event_time
)
"""

# Generate the preprocessing CTE
# (presumably defines base_express_layer, which the SQL below reads — verify
# against generate_pre_dws_express_cte)
pre_dws_express_sql = generate_pre_dws_express_cte()

# Generate the count fields (standalone)
exp_times_fields = generate_express_times_fields(WIN_DAYS, EVENT_ARR, field_mapping_dict)

# Generate the platform-count fields (separate CTE that includes the explode step)
exp_plat_cnt_sql, plat_cte_name, plat_field_names = generate_express_plat_cnt_fields(WIN_DAYS, EVENT_ARR, field_mapping_dict)
# Qualify every platform-count column with the `pcs` alias used in the join below
pcs_fields_str = ",\n    ".join([f"pcs.{field}" for field in plat_field_names])

# Generate the ratio fields (simplified denominator check)
exp_ratio_fields = generate_express_stat_fields_levelv2(WIN_DAYS, EVENT_ARR, field_mapping_dict)

# Assemble the final SQL (aggregate counts first, then join the platform counts,
# finally compute the ratios on top of the joined result)
final_sql = f"""{base_sql}
{pre_dws_express_sql}
-- 第一步：统计次数相关字段
,express_times_stats AS (
    SELECT
        mobile_md5,
        back_date,
        {exp_times_fields}
    FROM base_express_layer
    GROUP BY mobile_md5, back_date
)
{exp_plat_cnt_sql}
,aggregated_data AS (
    SELECT
        ts.*,
        {pcs_fields_str}  -- 显式列出所有平台数字段，替代pcs.*
    FROM express_times_stats ts
    LEFT JOIN {plat_cte_name} pcs
    ON ts.mobile_md5 = pcs.mobile_md5 AND ts.back_date = pcs.back_date
)
insert overwrite table sms_bd_data.customer_test_express_feature_2026011301
SELECT
   *,
    {exp_ratio_fields}
FROM aggregated_data
"""

# Print the final SQL
print(final_sql)


-- CREATE TABLE sms_bd_data.customer_test_offline_express_feature_2026010901 AS
WITH cus_sample AS (
    SELECT 
        phone as mobile_md5,
        the_date as back_date
    FROM sms_bd_data.customer_test_sample_id
    group by phone, the_date
),customer_dwb_express AS (
    select
        mobile_md5, 
        back_date, 
        sign, 
        event_type,
        event_time
    from(
        select
            cus_smp.mobile_md5,
            cus_smp.back_date,
            dwb_express.sign,
            dwb_express.event_type,
            dwb_express.event_time
        from cus_sample cus_smp
        left join(
            select
                phone,
                sign,
                event_type,
                from_unixtime(unix_timestamp(the_date, 'yyyyMMdd'), 'yyyy-MM-dd') as event_time
            from sms_bd_data.sms_dwb_express_event_fdt
        )dwb_express
        on cus_smp.mobile_md5=dwb_express.phone 
        where cus_smp.back_date > dwb_express.event_time and dwb_e

In [35]:
# 3. Print how many fields were registered for each declared field type
print("\n===== Express模块字段类型统计 =====")
type_count = {}
for meta in field_mapping_dict.values():
    declared_type = meta["type"]
    if declared_type in type_count:
        type_count[declared_type] += 1
    else:
        # First field of this type: start its counter
        type_count[declared_type] = 1
for declared_type, total in type_count.items():
    print(f"{declared_type}类型字段数：{total}")


===== Express模块字段类型统计 =====
STRING类型字段数：1
DATE类型字段数：1
BIGINT类型字段数：25
INT类型字段数：33
DOUBLE类型字段数：28


In [36]:
import pandas as pd
# Turn the metadata dict into a table with one row per field name, then
# write it to an Excel workbook with the field name as the index column.
field_mapping_dataframe = pd.DataFrame(field_mapping_dict).transpose()
field_mapping_dataframe.to_excel(
    "express_field_mapping_output_v3.xlsx",
    index_label="field_name",
)