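# Feature Engineering Pipeline: A Deep Dive

This notebook walks step by step through the project's complete feature engineering flow, from raw data to generated features.

## Architecture Overview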

```
Raw Data (CSV) → YAML Config → UniProcess Operations → Final Features
     ↓              ↓              ↓                    ↓
data/train/    config/feat.yml   env/UniProcess/    Processed DataFrame
```

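## Workflow

1. **Load the raw data**: load sample data from the `data/train/` directory
2. **Parse the YAML config**: read `config/feat.yml` and parse it into a structured object
3. **Build the operation hub**: create the OP_HUB lookup table
4. **Run feature engineering**: apply each operation in turn
5. **Inspect the final result**: analyze the processed features

## Step 1: Environment Setup and Imports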

In [1]:
import os
import sys
import json
from functools import partial
from datetime import datetime
from hashlib import md5
from typing import Any, Dict, List, Union

import pandas as pd
import yaml
import numpy as np
from pprint import pprint
from glob import glob

print("Base libraries imported")

# Add the project's env directory to the import path
project_root = os.getcwd()
env_path = os.path.join(project_root, 'env')
if env_path not in sys.path:
    sys.path.insert(0, env_path)
    print(f"Added environment path: {env_path}")

print("\nEnvironment setup complete!")


Base libraries imported
Added environment path: /Users/main/Documents/02推荐算法/02同花顺实习/02ainvest-push-recall-group-master_wf/env

Environment setup complete!


## Step 2: Load the Raw Data

In [2]:
data_path = 'data/train/*.csv'
# Use glob to collect every matching CSV file path
csv_files = glob(data_path)
if not csv_files:
    raise ValueError(f"No CSV files found in {data_path}")

# Read all CSV files and concatenate them into one DataFrame
df_raw = pd.concat([pd.read_csv(f) for f in csv_files], ignore_index=True)

# Print the first value of every column to show what the data looks like
for col in df_raw.columns:
    print(f"{col}: {df_raw[col].iloc[0]}")

user_id: 1800001088
create_time: 2025-05-31 08:39:07
log_type: PR
watchlists: nan
holdings: nan
country: Germany
prefer_bid: nan
user_propernoun: germany#3.06|mid-america#1.02
push_title: Ainvest Newswire
push_content: Hims & Hers Health Lays Off 4% of Staff Amid Strategy Shift
item_code: [{"market":"169","score":0,"code":"HIMS","tagId":"U000012934","name":"Hims & Hers Health","type":0,"parentId":"US_ROBOT0f37d7fd3fca6a41"},{"market":"169","score":0,"code":"NVO","tagId":"U000002999","name":"Novo Nordisk","type":0,"parentId":"US_ROBOT0f37d7fd3fca6a41"}]
item_tags: [{"score":0.7803922295570374,"tagId":"51510","name":"us_high_importance","type":4,"parentId":"US_ROBOT0f37d7fd3fca6a41"},{"score":0.7803922295570374,"tagId":"57967","name":"Fusion","type":4,"parentId":"US_ROBOT0f37d7fd3fca6a41"},{"tagId":"1002","name":"no_penny_stock","type":4,"parentId":"US_ROBOT0f37d7fd3fca6a41"}]
submit_type: autoFlash
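
The `item_code` and `item_tags` columns hold JSON arrays serialized as strings, so they have to be parsed before any field can serve as a feature. The following is a minimal sketch using only the standard library, with the input shortened from the sample row above. The helper name `extract_field` is hypothetical and is not part of the project.

```python
import json
from typing import List


def extract_field(raw, key: str, default: str = "null") -> List[str]:
    """Parse a JSON array of objects and collect one field from each object."""
    try:
        data = json.loads(raw)
    except (TypeError, ValueError):
        # NaN cells are floats (TypeError); malformed strings raise ValueError
        return [default]
    if not isinstance(data, list):
        return [default]
    return [str(obj.get(key, default)) for obj in data if isinstance(obj, dict)]


# Shortened copy of the item_code value from the sample row
item_code = (
    '[{"market":"169","code":"HIMS","name":"Hims & Hers Health"},'
    '{"market":"169","code":"NVO","name":"Novo Nordisk"}]'
)
codes = extract_field(item_code, "code")
print(codes)  # ['HIMS', 'NVO']

# A missing cell (NaN) falls back to the placeholder
missing = extract_field(float("nan"), "code")
print(missing)  # ['null']
```

Returning a one-element placeholder list for missing input keeps the column list-valued for every row, which simplifies later padding and hashing.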


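## Step 3: Load and Parse the YAML Config
This is the "blueprint" for feature engineering: it defines how each feature is extracted and transformed from the raw data.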

In [4]:
feat_config_path = 'config/feat.yml'

with open(feat_config_path, 'r', encoding='utf-8') as f:
    feat_config = yaml.safe_load(f)

print(feat_config)

{'exclude_features': {'current': 'default', 'default': [], 'exclude_user_behavior': ['user_watch_stk_code', 'prefer_bid_code', 'hold_bid_code', 'user_propernoun'], 'exclude_user_propernoun': ['user_propernoun']}, 'pipelines': [{'embedding_dim': 8, 'feat_name': 'hour', 'feat_type': 'sparse', 'input_sample': '2024-08-02 00:44:05', 'operations': [{'col_in': 'create_time', 'col_out': 'create_time', 'func_name': 'fillna', 'func_parameters': {'na_value': '2024-08-02 00:16:34'}}, {'col_in': 'create_time', 'col_out': 'hour', 'func_name': 'to_hour', 'func_parameters': {}}], 'vocabulary_size': 24}, {'embedding_dim': 8, 'feat_name': 'weekday', 'feat_type': 'sparse', 'input_sample': '2024-08-02 00:44:05', 'operations': [{'col_in': 'create_time', 'col_out': 'weekday', 'func_name': 'to_weekday', 'func_parameters': {}}], 'vocabulary_size': 7}, {'embedding_dim': 8, 'feat_name': 'user_watch_stk_code_hash', 'feat_type': 'varlen_sparse', 'input_sample': 'AAPL_185 & TSLA_185', 'operations': [{'col_in': 'w

In [5]:
# Inspect one complete pipeline config as an example
example_pipeline = feat_config['pipelines'][0]  # take the first pipeline
print(f"Example pipeline: {example_pipeline['feat_name']}")
pprint(example_pipeline)

Example pipeline: hour
{'embedding_dim': 8,
 'feat_name': 'hour',
 'feat_type': 'sparse',
 'input_sample': '2024-08-02 00:44:05',
 'operations': [{'col_in': 'create_time',
                 'col_out': 'create_time',
                 'func_name': 'fillna',
                 'func_parameters': {'na_value': '2024-08-02 00:16:34'}},
                {'col_in': 'create_time',
                 'col_out': 'hour',
                 'func_name': 'to_hour',
                 'func_parameters': {}}],
 'vocabulary_size': 24}
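
To make the `col_in`/`col_out` flow concrete, the two operations of the `hour` pipeline can be traced by hand on a single record. This is only a sketch: the record is a plain dict rather than a DataFrame row, and `fill_missing`, `hour_of`, and `trace` are simplified stand-ins written for this example, not the UniProcess functions.

```python
from datetime import datetime

# The operations list copied from the `hour` pipeline printed above
operations = [
    {"col_in": "create_time", "col_out": "create_time", "func_name": "fillna",
     "func_parameters": {"na_value": "2024-08-02 00:16:34"}},
    {"col_in": "create_time", "col_out": "hour", "func_name": "to_hour",
     "func_parameters": {}},
]


def fill_missing(x, na_value):
    """Stand-in for fillna: only None and '' count as missing here."""
    return na_value if x is None or x == "" else x


def hour_of(x):
    """Stand-in for to_hour: parse a fixed timestamp format."""
    return datetime.strptime(x, "%Y-%m-%d %H:%M:%S").hour


stand_ins = {"fillna": fill_missing, "to_hour": hour_of}


def trace(record):
    """Apply the operations in order, writing each result to col_out."""
    record = dict(record)
    for op in operations:
        func = stand_ins[op["func_name"]]
        record[op["col_out"]] = func(record[op["col_in"]], **op["func_parameters"])
    return record


present = trace({"create_time": "2025-05-31 08:39:07"})
absent = trace({"create_time": None})
print(present["hour"])  # 8
print(absent["hour"])   # 0, taken from the configured na_value
```

The first operation overwrites `create_time` in place, while the second writes a new `hour` column, so the order of operations inside a pipeline matters.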


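## Step 4: Define the Feature Operation Functions

Because the UniProcess library cannot be imported directly here, we re-implement some of its key feature operations by hand.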


In [6]:
# Values treated as missing
MISSING_VALUE = [None, '', 'null', 'NULL', 'None', np.nan]

def fillna(x: Union[float, int, str], na_value: Union[float, int, str]) -> Union[float, int, str]:
    """Replace a missing value with na_value."""
    if x in MISSING_VALUE or (isinstance(x, float) and pd.isna(x)):
        return na_value
    return x

def split(x: str, sep: str) -> List[str]:
    """Split a string on sep."""
    return str(x).split(sep)

def seperation(x: List[str], sep: str) -> List[List[str]]:
    """Split every element of a list a second time (name kept as spelled in the config)."""
    if not isinstance(x, list):
        return []
    return [str(item).split(sep) for item in x]

def list_get(x: List[List[Any]], item_index: int) -> List[Any]:
    """Take the element at item_index from each sublist of a nested list."""
    if not isinstance(x, list):
        return []
    result = []
    for sublist in x:
        if isinstance(sublist, list) and len(sublist) > item_index:
            result.append(sublist[item_index])
        else:
            result.append('null')
    return result

def remove_items(x: List[str], target_values: List[str]) -> List[str]:
    """Remove the given values from a list."""
    if not isinstance(x, list):
        return []
    return [item for item in x if item not in target_values]

def padding(x: List[Any], pad_value: Union[str, float, int], max_len: int) -> List[Any]:
    """Truncate or pad a list to exactly max_len elements."""
    if not isinstance(x, list):
        x = []
    if len(x) >= max_len:
        return x[:max_len]
    else:
        return x + [pad_value] * (max_len - len(x))

def list_hash(x: List[str], vocabulary_size: int) -> List[int]:
    """Hash every element of a list into [0, vocabulary_size)."""
    if not isinstance(x, list):
        return []
    result = []
    for item in x:
        hash_val = int(md5(str(item).encode()).hexdigest(), 16) % vocabulary_size
        result.append(hash_val)
    return result

def str_hash(x: str, vocabulary_size: int) -> int:
    """Hash a string into [0, vocabulary_size)."""
    return int(md5(str(x).encode()).hexdigest(), 16) % vocabulary_size

def to_hour(x: str) -> int:
    """Extract the hour from a timestamp; return 0 if it is missing or cannot be parsed."""
    try:
        dt = pd.to_datetime(x)
        # pd.to_datetime maps missing input to NaT, whose .hour is NaN
        return 0 if pd.isna(dt) else int(dt.hour)
    except Exception:
        return 0

def to_weekday(x: str) -> int:
    """Extract the weekday (Monday=0) from a timestamp; return 0 if it is missing or cannot be parsed."""
    try:
        dt = pd.to_datetime(x)
        # Guard against NaT, for which weekday() does not return an integer
        return 0 if pd.isna(dt) else int(dt.weekday())
    except Exception:
        return 0

def list_len(x: List) -> int:
    """Length of a list; 0 for anything that is not a list."""
    if isinstance(x, list):
        return len(x)
    return 0

def int_max(x: int, max_value: int) -> int:
    """Cap an integer at max_value."""
    return min(int(x), max_value)

def json_object_to_list(x: str, key: str) -> List[str]:
    """Extract the value of key from each object in a JSON array string."""
    try:
        data = json.loads(x)
        if isinstance(data, list):
            return [item.get(key, 'null') for item in data if isinstance(item, dict)]
        return ['null']
    except (TypeError, ValueError):
        # TypeError: non-string input such as NaN; ValueError: malformed JSON
        return ['null']

def map_to_int(x: Union[str, List], map_dict: Dict[str, int], default_code: int = 0) -> Union[List[int], int]:
    """Map a value, or every element of a list, to an integer code."""
    if isinstance(x, list):
        return [map_dict.get(item, default_code) for item in x]
    else:
        return map_dict.get(str(x), default_code)
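
These functions are meant to be chained. The sketch below runs one plausible chain on the `user_propernoun` value from the sample row. The order of operations, the pad value `'null'`, `max_len=5`, and `vocabulary_size=10000` are assumptions made for illustration, since the corresponding entry of `config/feat.yml` is not shown in full. The helpers are compact restatements of the functions above so that the block runs on its own.

```python
from hashlib import md5
from typing import Any, List


def split(x: str, sep: str) -> List[str]:
    return str(x).split(sep)


def seperation(x: List[str], sep: str) -> List[List[str]]:
    return [str(item).split(sep) for item in x]


def list_get(x: List[List[Any]], item_index: int) -> List[Any]:
    return [sub[item_index] if len(sub) > item_index else "null" for sub in x]


def padding(x: List[Any], pad_value: Any, max_len: int) -> List[Any]:
    return x[:max_len] + [pad_value] * max(0, max_len - len(x))


def list_hash(x: List[Any], vocabulary_size: int) -> List[int]:
    return [int(md5(str(i).encode()).hexdigest(), 16) % vocabulary_size for i in x]


raw = "germany#3.06|mid-america#1.02"      # sample user_propernoun value
pairs = seperation(split(raw, "|"), "#")   # name/weight pairs
names = list_get(pairs, 0)                 # keep only the names
padded = padding(names, "null", 5)         # assumed pad value and length
hashed = list_hash(padded, 10000)          # assumed vocabulary size

print(pairs)   # [['germany', '3.06'], ['mid-america', '1.02']]
print(padded)  # ['germany', 'mid-america', 'null', 'null', 'null']
print(hashed)  # five bucket indices; the last three are identical
```

Because padding happens before hashing in this chain, every pad slot ends up in the same bucket, so a model sees one consistent index for "no value".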


In [7]:
# Build the operation hub (OP_HUB)
OP_HUB = {
    'fillna': fillna,
    'split': split,
    'seperation': seperation,
    'list_get': list_get,
    'remove_items': remove_items,
    'padding': padding,
    'list_hash': list_hash,
    'str_hash': str_hash,
    'to_hour': to_hour,
    'to_weekday': to_weekday,
    'list_len': list_len,
    'int_max': int_max,
    'json_object_to_list': json_object_to_list,
    'map_to_int': map_to_int
}

print(f"OP_HUB built with {len(OP_HUB)} operation functions")
print(f"Available functions: {list(OP_HUB.keys())}")


OP_HUB built with 14 operation functions
Available functions: ['fillna', 'split', 'seperation', 'list_get', 'remove_items', 'padding', 'list_hash', 'str_hash', 'to_hour', 'to_weekday', 'list_len', 'int_max', 'json_object_to_list', 'map_to_int']


## Step 5: Implement the Feature Engineering Execution Engine

This is the core part: the `run_one_op` function, which executes a single operation.


In [8]:
def run_one_op(df: pd.DataFrame, operation: dict) -> pd.DataFrame:
    """Apply a single feature operation to the DataFrame."""
    # Read the operation config
    col_in = operation['col_in']
    col_out = operation['col_out']
    func_name = operation['func_name']
    parameters = operation.get('func_parameters', {})
    
    # Skip silently if the function is not registered in OP_HUB
    if func_name not in OP_HUB:
        return df
    
    # Skip silently if any input column is missing
    input_cols = [col_in] if isinstance(col_in, str) else col_in
    if not all(col in df.columns for col in input_cols):
        return df
    
    # Bind the configured parameters to the transformation function
    transform_func = partial(OP_HUB[func_name], **parameters)
    
    # Apply it row-wise for several input columns, element-wise for a single one
    if isinstance(col_in, list):
        df[col_out] = df[col_in].apply(lambda row: transform_func(*row), axis=1)
    else:
        df[col_out] = df[col_in].apply(transform_func)
    
    return df
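
The dispatch logic itself does not depend on pandas. As an illustration, the sketch below applies the same steps (look up `func_name`, bind `func_parameters` with `partial`, then call the function with one or several inputs) to a list of dict rows. Swapping the DataFrame for plain dicts is a simplification for this example, and unlike `run_one_op` this variant raises on an unknown function or a missing column instead of returning the data unchanged. `apply_op`, `DEMO_OPS`, `clip_max`, and `join_cols` are hypothetical names.

```python
from functools import partial
from typing import Any, Callable, Dict, List


def clip_max(x, max_value: int) -> int:
    return min(int(x), max_value)


def join_cols(a, b, sep: str) -> str:
    return f"{a}{sep}{b}"


# Hypothetical registry for this example only
DEMO_OPS: Dict[str, Callable] = {"int_max": clip_max, "concat": join_cols}


def apply_op(rows: List[Dict[str, Any]], operation: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Apply one configured operation to every row and return new rows."""
    func_name = operation["func_name"]
    if func_name not in DEMO_OPS:
        raise KeyError(f"unknown operation: {func_name}")
    func = partial(DEMO_OPS[func_name], **operation.get("func_parameters", {}))
    col_in = operation["col_in"]
    cols = [col_in] if isinstance(col_in, str) else list(col_in)
    result = []
    for row in rows:
        absent = [c for c in cols if c not in row]
        if absent:
            raise KeyError(f"missing input columns: {absent}")
        new_row = dict(row)
        # One positional argument per input column, as in the row-wise branch
        new_row[operation["col_out"]] = func(*(row[c] for c in cols))
        result.append(new_row)
    return result


rows = [{"title_len": 12, "country": "Germany", "log_type": "PR"}]
rows = apply_op(rows, {"col_in": "title_len", "col_out": "title_len_capped",
                       "func_name": "int_max", "func_parameters": {"max_value": 10}})
rows = apply_op(rows, {"col_in": ["country", "log_type"], "col_out": "country_log",
                       "func_name": "concat", "func_parameters": {"sep": "_"}})
print(rows[0]["title_len_capped"], rows[0]["country_log"])  # 10 Germany_PR
```

Failing loudly makes configuration typos visible immediately; the silent-skip behaviour of `run_one_op` is more forgiving but can hide a feature that was never produced.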

## Step 6: Run the Complete Feature Engineering Pipeline

We now iterate over all feature pipelines and execute their operations one by one.


In [9]:
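def process_feature_pipelines(df_raw: pd.DataFrame, feat_config: dict) -> tuple[pd.DataFrame, list]:
    """Run every feature pipeline defined in the config."""
    # Work on a copy so the raw data stays untouched
    df = df_raw.copy()
    
    # Pipelines to process
    pipelines = feat_config['pipelines']

    # Names of the features whose pipelines were run
    processed_features = []
    
    # Run each feature pipeline
    for pipeline in pipelines:
        feat_name = pipeline['feat_name']
        operations = pipeline['operations']
        
        # Run each operation of the pipeline, in order
        for operation in operations:
            df = run_one_op(df, operation)

        # Record the feature name. run_one_op skips unknown functions and missing
        # columns without raising, so check df.columns before relying on this list
        processed_features.append(feat_name)
    
    return df, processed_features

# Run the feature pipelines
df_processed, processed_features = process_feature_pipelines(df_raw, feat_config)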

## Step 7: Analyze the Results

In [10]:
# Compare the data structure before and after processing
print("Data structure comparison:")
print(f"Original column count: {len(df_raw.columns)}")
print(f"Processed column count: {len(df_processed.columns)}")
print(f"New column count: {len(df_processed.columns) - len(df_raw.columns)}")

print("\nOriginal columns:")
print(list(df_raw.columns))

print("\nNew columns:")
new_columns = [col for col in df_processed.columns if col not in df_raw.columns]
print(new_columns)

Data structure comparison:
Original column count: 13
Processed column count: 30
New column count: 17

Original columns:
['user_id', 'create_time', 'log_type', 'watchlists', 'holdings', 'country', 'prefer_bid', 'user_propernoun', 'push_title', 'push_content', 'item_code', 'item_tags', 'submit_type']

New columns:
['hour', 'weekday', 'user_watch_stk_code', 'user_watch_stk_code_hash', 'country_hash', 'prefer_bid_code', 'prefer_bid_code_hash', 'hold_bid_code', 'hold_bid_code_hash', 'user_propernoun_code', 'user_propernoun_hash', 'push_title_hash', 'title_len', 'item_code_hash', 'submit_type_hash', 'tagIds', 'tag_id_hash']


In [11]:
# Inspect the features that were actually generated
print("Generated feature details:")
for feat_name in processed_features:
    if feat_name in df_processed.columns:
        sample_data = df_processed[feat_name].iloc[0]
        data_type = type(sample_data).__name__
        print(f"  {feat_name}: {data_type} = {sample_data}")


Generated feature details:
  hour: int64 = 8
  weekday: int64 = 5
  user_watch_stk_code_hash: list = [8381, 8381, 8381, 8381, 8381]
  country_hash: int64 = 71
  prefer_bid_code_hash: list = [8381, 8381, 8381, 8381, 8381]
  hold_bid_code_hash: list = [8381, 8381, 8381, 8381, 8381]
  user_propernoun_hash: list = [178, 417, 8381, 8381, 8381]
  push_title_hash: int64 = 7
  title_len: int64 = 12
  item_code_hash: list = [6837, 3491, 8381, 8381, 8381]
  submit_type_hash: int64 = 6
  tag_id_hash: list = [8139, 8993, 880]
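
Twelve features are listed here, yet 17 columns were added to the DataFrame. The difference consists of intermediate columns written by earlier operations in multi-step pipelines. A small check, using the two lists copied from the notebook outputs:

```python
# New columns reported by the structure comparison
new_columns = [
    "hour", "weekday", "user_watch_stk_code", "user_watch_stk_code_hash",
    "country_hash", "prefer_bid_code", "prefer_bid_code_hash", "hold_bid_code",
    "hold_bid_code_hash", "user_propernoun_code", "user_propernoun_hash",
    "push_title_hash", "title_len", "item_code_hash", "submit_type_hash",
    "tagIds", "tag_id_hash",
]

# Final features reported in the feature details
processed_features = [
    "hour", "weekday", "user_watch_stk_code_hash", "country_hash",
    "prefer_bid_code_hash", "hold_bid_code_hash", "user_propernoun_hash",
    "push_title_hash", "title_len", "item_code_hash", "submit_type_hash",
    "tag_id_hash",
]

final = set(processed_features)
intermediate = [col for col in new_columns if col not in final]
print(intermediate)
# ['user_watch_stk_code', 'prefer_bid_code', 'hold_bid_code', 'user_propernoun_code', 'tagIds']
```

Intermediate columns are useful for debugging a pipeline step by step, but they can be dropped before the data is handed to a model.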


In [12]:
# Preview part of the final processed result
print("Preview of the final processed result:")
display_cols = ['user_id', 'log_type'] + processed_features  # identifier columns plus every processed feature
display_cols = [col for col in display_cols if col in df_processed.columns]

df_processed[display_cols].head()


Preview of the final processed result:


| | user_id | log_type | hour | weekday | user_watch_stk_code_hash | country_hash | prefer_bid_code_hash | hold_bid_code_hash | user_propernoun_hash | push_title_hash | title_len | item_code_hash | submit_type_hash | tag_id_hash |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1800001088 | PR | 8 | 5 | [8381, 8381, 8381, 8381, 8381] | 71 | [8381, 8381, 8381, 8381, 8381] | [8381, 8381, 8381, 8381, 8381] | [178, 417, 8381, 8381, 8381] | 7 | 12 | [6837, 3491, 8381, 8381, 8381] | 6 | [8139, 8993, 880] |
| 1 | 1800001417 | PR | 22 | 5 | [8381, 8381, 8381, 8381, 8381] | 145 | [8381, 8381, 8381, 8381, 8381] | [8381, 8381, 8381, 8381, 8381] | [8381, 8381, 8381, 8381, 8381] | 7 | 10 | [7762, 6902, 6157, 1986, 5551] | 4 | [4634, 2106, 9827] |
| 2 | 1800001501 | PC | 10 | 5 | [1895, 8808, 1021, 8381, 8381] | 145 | [8381, 8381, 8381, 8381, 8381] | [8381, 8381, 8381, 8381, 8381] | [323, 9351, 3453, 8381, 8381] | 5 | 17 | [9724, 8381, 8381, 8381, 8381] | 1 | [2601, 4380, 4797] |
| 3 | 1800001501 | PR | 22 | 5 | [1895, 8808, 1021, 8381, 8381] | 145 | [8381, 8381, 8381, 8381, 8381] | [8381, 8381, 8381, 8381, 8381] | [323, 9351, 3453, 8381, 8381] | 7 | 10 | [7762, 6902, 6157, 1986, 5551] | 4 | [4634, 2106, 9827] |
| 4 | 1800001819 | PR | 21 | 5 | [8381, 8381, 8381, 8381, 8381] | 145 | [8381, 8381, 8381, 8381, 8381] | [8381, 8381, 8381, 8381, 8381] | [8381, 8381, 8381, 8381, 8381] | 4 | 11 | [916, 8381, 8381, 8381, 8381] | 1 | [9593, 4380, 8381] |
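
In the preview, the same index (8381) fills the tail of most list features. That is what one would expect if a shared pad token is hashed after padding, although the config that would confirm this is not shown. The sketch below illustrates two properties of the MD5-based hash used here: the same token always lands in the same bucket, and distinct tokens collide once there are more tokens than buckets. The pad token `"null"` and both vocabulary sizes are assumptions for the example.

```python
from collections import Counter
from hashlib import md5


def str_hash(x, vocabulary_size: int) -> int:
    """Same hashing scheme as the notebook's str_hash."""
    return int(md5(str(x).encode()).hexdigest(), 16) % vocabulary_size


# Determinism: repeated calls give the same bucket. MD5 does not depend on
# the per-process salt that Python's built-in hash() applies to strings.
repeated = [str_hash("null", 10000) for _ in range(3)]
stable = len(set(repeated)) == 1
print(stable)  # True

# Collisions: 1000 distinct tokens cannot fit into 100 buckets one-to-one
tokens = [f"token_{i}" for i in range(1000)]
buckets = Counter(str_hash(t, 100) for t in tokens)
collisions = len(tokens) - len(buckets)
print(collisions >= 900)  # True
```

The `vocabulary_size` of a hashed feature is therefore a trade-off between embedding table size and how many distinct raw values end up sharing an index.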


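## Summary

This notebook has demonstrated the full execution of the feature engineering pipeline:

### 🎯 Core Flow

1. **Data loading**: load the raw data from CSV files
2. **Config parsing**: parse the YAML config file into an executable sequence of operations
3. **Operation execution**: look up each feature transformation function in OP_HUB and run it
4. **Result generation**: produce the final feature data

### 🔧 Key Components

- **OP_HUB**: the registry of operation functions, linking each `func_name` in the config file to the actual Python function
- **run_one_op**: the execution engine, which applies a single operation to the DataFrame
- **Pipeline**: a sequence of operations that defines the complete transformation for one feature

### 📊 Feature Types

- **sparse**: sparse features, typically hashed values of categorical variables
- **varlen_sparse**: variable-length sparse features, used for list-valued data
- **dense**: dense features, i.e. numeric features

### 💡 Design Advantages

This pipeline design makes feature engineering:
- **Highly configurable**: feature transformations are defined in a YAML file
- **Modular**: each operation function is independent
- **Reusable**: operation functions can be reused across pipelines
- **Easy to extend**: a new operation function only needs to be registered in OP_HUB

This architecture is valuable in production because it lets data scientists experiment quickly with different feature engineering strategies by editing the config, without modifying the core code.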
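
As an illustration of the extension point, registration can be wrapped in a small decorator so that a new operation is defined and registered in one place. This is a sketch of one possible convention, not how the notebook or UniProcess does it: the notebook builds `OP_HUB` as a dict literal, and `register_op` and `to_lower` are hypothetical names.

```python
from typing import Callable, Dict

# A fresh registry for this example; the notebook's OP_HUB is a dict literal
OP_HUB: Dict[str, Callable] = {}


def register_op(name: str) -> Callable[[Callable], Callable]:
    """Return a decorator that stores the function in OP_HUB under `name`."""
    def decorator(func: Callable) -> Callable:
        if name in OP_HUB:
            raise ValueError(f"operation already registered: {name}")
        OP_HUB[name] = func
        return func
    return decorator


@register_op("to_lower")
def to_lower(x) -> str:
    """Example operation: lowercase the string form of the input."""
    return str(x).lower()


print(sorted(OP_HUB))                 # ['to_lower']
print(OP_HUB["to_lower"]("Germany"))  # germany
```

Once registered, the operation can be referenced from the YAML config by its `func_name` like any built-in one.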


In [13]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Feature analysis module with tabular output.
Computes summary statistics for the engineered features and prints them as tables.
"""

import pandas as pd
import numpy as np
from typing import Dict, List, Any
from collections import Counter
import warnings
warnings.filterwarnings('ignore')

class FeatureTableAnalyzer:
    """Feature analyzer that reports its results as tables."""
    
    def __init__(self, df: pd.DataFrame, feature_list: List[str]):
        self.df = df
        self.features = feature_list
        self.analysis_results = {}
    
    def analyze_all_features(self) -> None:
        """Analyze all features and print a tabular report."""
        print("🔍 Starting feature analysis...")
        
        # 1. Feature overview table
        self.create_feature_overview_table()
        
        # 2. Detailed analysis of numeric features
        self.analyze_numeric_features_table()
        
        # 3. Detailed analysis of list features
        self.analyze_list_features_table()
        
        # 4. Data quality report
        self.create_data_quality_table()
        
        print("\n✅ Feature analysis complete!")
    
    def create_feature_overview_table(self) -> None:
        """Build the feature overview table."""
        print("\n" + "="*80)
        print("📊 Feature overview")
        print("="*80)
        
        overview_data = []
        for feature in self.features:
            if feature in self.df.columns:
                # The type is inferred from the first row only
                sample_value = self.df[feature].iloc[0]
                feature_type = "list" if isinstance(sample_value, list) else "numeric"
                
                # Basic statistics (nunique is not defined for list values)
                unique_count = self.df[feature].nunique() if not isinstance(sample_value, list) else "N/A"
                missing_count = self.df[feature].isnull().sum()
                missing_rate = f"{missing_count/len(self.df)*100:.2f}%" if missing_count > 0 else "0%"
                
                # Sample value, shortened for display
                if isinstance(sample_value, list):
                    sample_str = f"[{', '.join(map(str, sample_value[:3]))}...]" if len(sample_value) > 3 else str(sample_value)
                    if len(sample_str) > 50:
                        sample_str = sample_str[:47] + "..."
                else:
                    sample_str = str(sample_value)
                
                overview_data.append({
                    'feature': feature,
                    'type': feature_type,
                    'samples': len(self.df),
                    'unique': unique_count,
                    'missing': missing_rate,
                    'sample': sample_str
                })
        
        overview_df = pd.DataFrame(overview_data)
        print(overview_df.to_string(index=False))
    
    def analyze_numeric_features_table(self) -> None:
        """Analyze numeric features and print the result as tables."""
        print("\n" + "="*80)
        print("📈 Numeric feature statistics")
        print("="*80)
        
        numeric_features = []
        stats_data = []
        
        for feature in self.features:
            if feature in self.df.columns:
                sample_value = self.df[feature].iloc[0]
                if not isinstance(sample_value, list):
                    numeric_features.append(feature)
                    data = self.df[feature]
                    
                    stats_data.append({
                        'feature': feature,
                        'min': data.min(),
                        'max': data.max(),
                        'mean': f"{data.mean():.4f}",
                        'median': data.median(),
                        'std': f"{data.std():.4f}",
                        'q25': data.quantile(0.25),
                        'q75': data.quantile(0.75),
                        'skew': f"{data.skew():.4f}"
                    })
        
        if stats_data:
            stats_df = pd.DataFrame(stats_data)
            print(stats_df.to_string(index=False))
            
            # Value distribution of the numeric features
            print("\n📊 Numeric feature distribution (top 5 most frequent values)")
            print("-"*80)
            for feature in numeric_features[:5]:  # only the first 5 features, to keep the output short
                data = self.df[feature]
                value_counts = data.value_counts().head(5)
                print(f"\n{feature}:")
                dist_data = []
                for value, count in value_counts.items():
                    dist_data.append({
                        'value': value,
                        'count': count,
                        'share': f"{count/len(data)*100:.2f}%"
                    })
                dist_df = pd.DataFrame(dist_data)
                print(dist_df.to_string(index=False))
        else:
            print("No numeric features found")
    
    def analyze_list_features_table(self) -> None:
        """Analyze list features and print the result as tables."""
        print("\n" + "="*80)
        print("📋 List feature statistics")
        print("="*80)
        
        list_features = []
        list_stats_data = []
        
        for feature in self.features:
            if feature in self.df.columns:
                sample_value = self.df[feature].iloc[0]
                if isinstance(sample_value, list):
                    list_features.append(feature)
                    data = self.df[feature]
                    
                    # Length statistics
                    lengths = [len(x) if isinstance(x, list) else 0 for x in data]
                    
                    # Element statistics
                    all_elements = []
                    for item in data:
                        if isinstance(item, list):
                            all_elements.extend(item)
                    
                    element_counter = Counter(all_elements)
                    
                    list_stats_data.append({
                        'feature': feature,
                        'avg_len': f"{np.mean(lengths):.2f}",
                        'min_len': min(lengths),
                        'max_len': max(lengths),
                        'total_elements': len(all_elements),
                        'unique_elements': len(element_counter),
                        'repetition_rate': f"{(1-len(element_counter)/len(all_elements))*100:.2f}%" if all_elements else "0%"
                    })
        
        if list_stats_data:
            list_stats_df = pd.DataFrame(list_stats_data)
            print(list_stats_df.to_string(index=False))
            
            # List length distribution
            print("\n📏 List length distribution")
            print("-"*80)
            for feature in list_features:
                data = self.df[feature]
                lengths = [len(x) if isinstance(x, list) else 0 for x in data]
                length_dist = Counter(lengths)
                
                print(f"\n{feature}:")
                length_data = []
                for length, count in sorted(length_dist.items()):
                    length_data.append({
                        'length': length,
                        'samples': count,
                        'share': f"{count/len(data)*100:.2f}%"
                    })
                length_df = pd.DataFrame(length_data)
                print(length_df.to_string(index=False))
                
                # Most frequent elements
                print(f"\n{feature} - most frequent elements (top 8):")
                all_elements = []
                for item in data:
                    if isinstance(item, list):
                        all_elements.extend(item)
                
                if all_elements:
                    element_counter = Counter(all_elements)
                    element_data = []
                    for element, count in element_counter.most_common(8):
                        element_data.append({
                            'element': element,
                            'count': count,
                            'share': f"{count/len(all_elements)*100:.2f}%"
                        })
                    element_df = pd.DataFrame(element_data)
                    print(element_df.to_string(index=False))
        else:
            print("No list features found")
    
    def create_data_quality_table(self) -> None:
        """Build the data quality report."""
        print("\n" + "="*80)
        print("🔍 Data quality report")
        print("="*80)
        
        quality_data = []
        for feature in self.features:
            if feature in self.df.columns:
                data = self.df[feature]
                missing_count = data.isnull().sum()
                
                # Consistency check
                sample_value = data.iloc[0]
                if isinstance(sample_value, list):
                    # List features: rate by the variance of the list lengths
                    lengths = [len(x) if isinstance(x, list) else 0 for x in data]
                    length_variance = np.var(lengths)
                    consistency = "high" if length_variance < 1 else "medium" if length_variance < 4 else "low"
                else:
                    # Numeric features: rate by the coefficient of variation (std / mean)
                    cv = data.std() / data.mean() if data.mean() != 0 else 0
                    consistency = "high" if cv < 0.1 else "medium" if cv < 0.5 else "low"
                
                quality_data.append({
                    'feature': feature,
                    'completeness': f"{(1-missing_count/len(data))*100:.1f}%",
                    'missing_count': missing_count,
                    'consistency': consistency,
                    'type': "list" if isinstance(sample_value, list) else "numeric",
                    'usable': "✅" if missing_count < len(data) * 0.5 else "⚠️"
                })
        
        # Nothing to report; also avoids a division by zero below
        if not quality_data:
            print("No features found for the quality report")
            return
        
        quality_df = pd.DataFrame(quality_data)
        print(quality_df.to_string(index=False))
        
        # Overall quality assessment
        total_features = len(quality_data)
        usable_features = sum(1 for item in quality_data if item['usable'] == "✅")
        avg_completeness = np.mean([float(item['completeness'].strip('%')) for item in quality_data])
        
        print(f"\n📋 Overall assessment:")
        print(f"   • Total features: {total_features}")
        print(f"   • Usable features: {usable_features} ({usable_features/total_features*100:.1f}%)")
        print(f"   • Average completeness: {avg_completeness:.1f}%")
        print(f"   • Data quality: {'excellent' if avg_completeness > 95 else 'good' if avg_completeness > 85 else 'needs improvement'}")

# Convenience wrapper
def run_feature_analysis(df_processed: pd.DataFrame, processed_features: List[str]) -> None:
    """Run the feature analysis on the processed DataFrame."""
    analyzer = FeatureTableAnalyzer(df_processed, processed_features)
    analyzer.analyze_all_features()

print("📊 Feature table analysis module loaded!")
print("Usage: run_feature_analysis(df_processed, processed_features)")


📊 Feature table analysis module loaded!
Usage: run_feature_analysis(df_processed, processed_features)
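
The list-feature statistics computed by the analyzer reduce to a few simple formulas: the repetition rate is 1 - unique / total over all elements, and the length variance is a population variance (the default of `np.var`). A small worked example with the standard library only; `list_feature_stats` is a hypothetical helper and assumes a non-empty input:

```python
from collections import Counter
from statistics import mean, pvariance
from typing import Any, Dict, List


def list_feature_stats(values: List[List[Any]]) -> Dict[str, float]:
    """Summarize a list-valued feature; `values` must contain at least one list."""
    lengths = [len(v) for v in values]
    elements = [e for v in values for e in v]
    unique = len(Counter(elements))
    return {
        "avg_len": mean(lengths),
        "len_variance": pvariance(lengths),  # population variance, like np.var
        "total": len(elements),
        "unique": unique,
        "repetition_rate": 1 - unique / len(elements) if elements else 0.0,
    }


sample = [[1, 2, 2], [2, 3], [1, 1, 1]]
stats = list_feature_stats(sample)
print(stats["total"], stats["unique"])  # 8 3
print(stats["repetition_rate"])         # 0.625
```

With lengths 3, 2, and 3, the length variance is 2/9, which the analyzer's thresholds would rate as high consistency (variance below 1).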


In [14]:
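# ===============================
# 🚀 Run the full feature analysis
# ===============================
# Note: run this cell only after the feature engineering cells above
# have produced df_processed and processed_features.

def run_comprehensive_feature_analysis():
    """Run the full feature analysis; call it once feature engineering has finished."""
    try:
        # Check whether feature engineering has already been run
        if 'df_processed' in globals() and 'processed_features' in globals():
            print("🎯 Starting the full feature analysis...")
            print("="*80)
            
            # Run the tabular feature analysis
            run_feature_analysis(df_processed, processed_features)
            
            print("\n" + "="*80)
            print("🎉 Feature analysis complete! The report above shows detailed statistics for every feature")
            
        else:
            print("⚠️  Run the feature engineering pipeline first to create df_processed and processed_features")
            print("   Recommended order:")
            print("   1. Run the data loading cell")
            print("   2. Run the YAML config parsing cell") 
            print("   3. Run the feature engineering cell")
            print("   4. Come back here and run the feature analysis")
            
    except Exception as e:
        print(f"❌ An error occurred during analysis: {e}")
        print("   Make sure the earlier feature engineering steps ran correctly")

# Print usage instructions
print("📋 How to use the feature analysis:")
print("   After feature engineering, run: run_comprehensive_feature_analysis()")
print("   Or call directly: run_feature_analysis(df_processed, processed_features)")
print("\n💡 Tip: run this analysis only after feature engineering has been executed")


📋 How to use the feature analysis:
   After feature engineering, run: run_comprehensive_feature_analysis()
   Or call directly: run_feature_analysis(df_processed, processed_features)

💡 Tip: run this analysis only after feature engineering has been executed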


In [15]:
run_comprehensive_feature_analysis()

🎯 Starting the full feature analysis...
🔍 Starting feature analysis...

📊 Feature overview
                 feature    type  samples unique missing                sample
                    hour numeric    50000     24      0%                     8
                 weekday numeric    50000      6      0%                     5
user_watch_stk_code_hash    list    50000    N/A      0% [8381, 8381, 8381...]
            country_hash numeric    50000    120      0%                    71
    prefer_bid_code_hash    list    50000    N/A      0% [8381, 8381, 8381...]
      hold_bid_code_hash    list    50000    N/A      0% [8381, 8381, 8381...]
    user_propernoun_hash    list    50000    N/A      0%   [178, 417, 8381...]
         push_title_hash numeric    50000      8      0%                     7
               title_len numeric    50000     27      0%                    12
          item_code_hash    list    50000    N/A      0% [6837, 3491, 8381...]
        submit_type_hash numeric    50000      3      0%                     6
             tag_id_hash    list    50000    N/A      0%     [8139, 8993, 880]

📈 Numeric feature statistics
         feature  min  max     mean  median      std  q25  q75     skew
            hour    0

# 08 Tree Model Definition, Training, and Evaluation

In [16]:
import yaml
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

def prepare_features(df_processed, processed_features, max_list_length=5):
    """Expand each list feature into separate columns named <feature>_<i>."""
    df_tree = df_processed[processed_features].copy()
    
    for feat in processed_features:
        # The first row decides whether a feature is treated as list-valued
        if isinstance(df_tree[feat].iloc[0], list):
            expanded = df_tree[feat].apply(pd.Series).iloc[:, :max_list_length]
            expanded.columns = [f"{feat}_{i}" for i in range(expanded.shape[1])]
            df_tree = df_tree.drop(columns=[feat]).join(expanded)
    
    return df_tree

def train_model(X, y, train_params):
    """Train a LightGBM model with early stopping on a held-out validation split."""
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
    
    # Work on a copy and pull out the training-control settings up front,
    # so the caller's dict is not mutated inside the lgb.train call
    params = dict(train_params)
    num_boost_round = params.pop('num_iterations', 1000)
    stopping_rounds = params.pop('early_stopping_rounds', 100)
    
    train_data = lgb.Dataset(X_train, y_train)
    val_data = lgb.Dataset(X_val, y_val, reference=train_data)
    
    model = lgb.train(
        params,
        train_data,
        num_boost_round=num_boost_round,
        callbacks=[lgb.early_stopping(stopping_rounds)],
        valid_sets=[train_data, val_data],
        valid_names=['train', 'valid']
    )
    
    return model, X_train, X_val, y_train, y_val

def evaluate_model(model, X_train, X_val, y_train, y_val):
    """Print train and validation AUC and return gain-based feature importance."""
    y_train_pred = model.predict(X_train, num_iteration=model.best_iteration)
    y_val_pred = model.predict(X_val, num_iteration=model.best_iteration)
    
    train_auc = roc_auc_score(y_train, y_train_pred)
    val_auc = roc_auc_score(y_val, y_val_pred)
    
    print(f"Train AUC: {train_auc:.4f}")
    print(f"Validation AUC: {val_auc:.4f}")
    
    return pd.DataFrame({
        'feature': model.feature_name(),
        'importance': model.feature_importance(importance_type='gain')
    }).sort_values('importance', ascending=False)
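
The column naming used by `prepare_features` (`<feature>_<i>`) explains names such as `user_propernoun_hash_2` in the feature importance table. The expansion can be sketched without pandas as follows. `expand_list_feature` is a hypothetical helper, and `None` stands in for the `NaN` that pandas would use for lists shorter than the widest one.

```python
from typing import Any, Dict, List, Optional


def expand_list_feature(
    name: str, values: List[List[Any]], max_len: int = 5
) -> Dict[str, List[Optional[Any]]]:
    """Turn one list-valued feature into up to max_len positional columns."""
    widest = max((len(v) for v in values), default=0)
    width = min(max_len, widest)
    columns: Dict[str, List[Optional[Any]]] = {f"{name}_{i}": [] for i in range(width)}
    for v in values:
        for i in range(width):
            # Short lists are filled with None at the missing positions
            columns[f"{name}_{i}"].append(v[i] if i < len(v) else None)
    return columns


expanded = expand_list_feature(
    "tag_id_hash", [[8139, 8993, 880], [4634, 2106, 9827], [9593, 4380]]
)
print(list(expanded))             # ['tag_id_hash_0', 'tag_id_hash_1', 'tag_id_hash_2']
print(expanded["tag_id_hash_2"])  # [880, 9827, None]
```

Each position becomes its own numeric column, so a tree model treats the hash indices as ordered numbers; whether that is desirable for hashed categorical values is a modelling choice outside the scope of this sketch.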

In [17]:
# Main flow
with open('config/config.yml', 'r', encoding='utf-8') as f:
    train_config = yaml.safe_load(f)

if 'log_type' in df_processed.columns:
    # Prepare the data: log_type 'PC' is the positive class, everything else is negative
    df_processed['label'] = (df_processed['log_type'] == 'PC').astype(int)
    X = prepare_features(df_processed, processed_features)
    y = df_processed['label']
    
    # Train the model
    train_params = {**train_config['train'], 'verbose': -1, 'n_jobs': -1, 'seed': 42}
    model, X_train, X_val, y_train, y_val = train_model(X, y, train_params)
    
    # Evaluate and print the results
    feature_importance = evaluate_model(model, X_train, X_val, y_train, y_val)
    print("\nFeature importance (Top 20):")
    print(feature_importance.head(20))
else:
    print("Error: column 'log_type' not found; cannot train the model.")

Training until validation scores don't improve for 50 rounds
Did not meet early stopping. Best iteration is:
[300]	train's auc: 0.863989	valid's auc: 0.847059
Train AUC: 0.8640
Validation AUC: 0.8471

Feature importance (Top 20):
                       feature     importance
23      user_propernoun_hash_2  346906.059675
21      user_propernoun_hash_0  116131.237506
2                 country_hash   60277.225478
22      user_propernoun_hash_1   55774.542730
1                      weekday   24687.482483
0                         hour    9143.383609
5             submit_type_hash    5356.176477
10  user_watch_stk_code_hash_4    4607.537248
3              push_title_hash    3455.920555
6   user_watch_stk_code_hash_0    3359.792433
4                    title_len    3198.852174
7   user_watch_stk_code_hash_1    2804.459620
8   user_watch_stk_code_hash_2    2645.200194
33               tag_id_hash_2    2603.419945
9   user_watch_stk_code_hash_3    2591.612250
32               tag_id_hash_1    2223.176195
11      pr
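
For reading the AUC values above: AUC can be interpreted as the probability that a randomly chosen positive sample receives a higher score than a randomly chosen negative one. The brute-force sketch below computes exactly that over all positive/negative pairs, counting ties as one half. It is meant for intuition on small inputs, is independent of scikit-learn, and `pairwise_auc` is a hypothetical name.

```python
from typing import Sequence


def pairwise_auc(y_true: Sequence[int], y_score: Sequence[float]) -> float:
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    pos = [s for y, s in zip(y_true, y_score) if y == 1]
    neg = [s for y, s in zip(y_true, y_score) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative sample")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a correct ranking
    return wins / (len(pos) * len(neg))


auc = pairwise_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
print(auc)  # 0.75
```

In this toy example three of the four positive/negative pairs are ordered correctly, which gives 0.75; the pairwise loop is quadratic, so it is not a substitute for `roc_auc_score` on 50,000 rows.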

# 09 Deep Model End-to-End: A Hugging Face-Style Framework, Built on TensorFlow