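# CSI 1000 High-Frequency Factor Analysis

## Objective

Evaluate how well each high-frequency factor predicts future returns for the constituents of the CSI 1000 index (1,000 stocks).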

## High-Frequency Factors

| Factor | Identifier | Formula |
|---------|--------|----------|
| Order imbalance | order_imbalance | `(BidVol - AskVol) / (BidVol + AskVol)` |
| Effective spread | effective_spread | `2 * abs(MidPrice - TradePrice) / MidPrice` |
| Realized volatility | realized_volatility | `sqrt(sum(log(price_t/price_{t-1})^2))` |
| Bid-ask spread | bid_ask_spread | `Ask1 - Bid1` |
| VWAP deviation | vwap_deviation | `(Price - VWAP) / VWAP` |
| Price momentum | price_momentum | `Price_t / Price_{t-n} - 1` |
| Order flow intensity | trade_flow_intensity | Change in traded volume per unit time |
| Micro-price | micro_price | `(Bid1*AskVol + Ask1*BidVol) / (BidVol + AskVol)` |
| Trade imbalance | trade_imbalance | Difference between active buy and active sell volume, as a proportion |
| Depth imbalance | depth_imbalance | Depth-weighted price imbalance |
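The closed-form entries in the table can be checked on a single level-1 quote snapshot. The snippet below is a minimal sketch, not the pipeline's actual implementation: the function names, argument names, and all numbers are hypothetical, and it assumes `BidVol` and `AskVol` are the sizes at the best bid and best ask.

```python
import math

def order_imbalance(bid_vol, ask_vol):
    """(BidVol - AskVol) / (BidVol + AskVol), bounded in [-1, 1]."""
    return (bid_vol - ask_vol) / (bid_vol + ask_vol)

def micro_price(bid1, ask1, bid_vol, ask_vol):
    """Size-weighted mid price: a heavier bid side pulls it toward the ask."""
    return (bid1 * ask_vol + ask1 * bid_vol) / (bid_vol + ask_vol)

def effective_spread(mid_price, trade_price):
    """2 * |MidPrice - TradePrice| / MidPrice."""
    return 2 * abs(mid_price - trade_price) / mid_price

def realized_volatility(prices):
    """Square root of the sum of squared log returns."""
    return math.sqrt(sum(math.log(b / a) ** 2 for a, b in zip(prices, prices[1:])))

# Hypothetical snapshot: best bid 10.00 x 300, best ask 10.02 x 100.
bid1, ask1, bid_vol, ask_vol = 10.00, 10.02, 300, 100
mid = (bid1 + ask1) / 2

oi = order_imbalance(bid_vol, ask_vol)
mp = micro_price(bid1, ask1, bid_vol, ask_vol)
es = effective_spread(mid, trade_price=10.02)
rv = realized_volatility([10.00, 10.02, 10.01, 10.03])
print(f"order_imbalance={oi:.3f}, micro_price={mp:.4f}, "
      f"effective_spread={es:.6f}, realized_volatility={rv:.6f}")
```

With more size on the bid than on the ask, the order imbalance is positive and the micro-price sits above the mid price, which is the direction the formulas imply.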

---

**Data overview**
- Number of stocks: 1,000
- Date range: 2025-12-01 to 2026-02-06
- High-frequency records: 104,735,954
- Daily factor rows: 27,705

## 1. Import Required Libraries

In [1]:
import os
import sys
import warnings
from pathlib import Path

import matplotlib
matplotlib.use('Agg')  # select the non-interactive backend before pyplot is imported
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy import stats

warnings.filterwarnings('ignore')
# DejaVu Sans has no CJK glyphs, so all plot labels below are in English
plt.rcParams['font.sans-serif'] = ['DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

print("Libraries imported successfully!")
print(f"pandas: {pd.__version__}, numpy: {np.__version__}")

Libraries imported successfully!
pandas: 1.5.3, numpy: 1.22.0


In [2]:
factor_dir = './factor/high_frequency/'
daily_dir = './daily_data/daily/'
# List the available high-frequency factor files and daily data files
print(os.listdir(factor_dir))
print(os.listdir(daily_dir))
# Load one sample file of each kind to inspect its contents
tdf = pd.read_parquet(factor_dir+'2026_01_09.parquet')
daily_df = pd.read_parquet(daily_dir+'daily_20250102.parquet')
print(daily_df)

['2025_12_15.parquet', '2026_01_09.parquet', '2026_01_05.parquet', '2026_01_21.parquet', '2026_01_28.parquet', '2025_12_23.parquet', '2026_01_30.parquet', '2025_12_11.parquet', '2026_01_29.parquet', '2025_12_04.parquet', '2026_01_15.parquet', '2026_01_06.parquet', '2026_01_07.parquet', '2026_02_09.parquet', '2026_01_14.parquet', '2025_11_04.parquet', '2026_02_03.parquet', '2026_01_19.parquet', '2026_02_05.parquet', '2026_01_22.parquet', '2026_01_27.parquet', '2026_01_12.parquet', '2025_12_19.parquet', '2025_12_03.parquet', '2025_12_17.parquet', '2026_02_04.parquet', '2025_12_22.parquet', '2025_12_08.parquet', '2026_01_16.parquet', '2026_02_06.parquet', '2026_01_20.parquet', '2025_12_25.parquet', '2025_12_01.parquet', '2025_11_06.parquet', '2025_12_24.parquet', '2026_01_13.parquet', '2025_12_16.parquet', '2026_01_23.parquet', '2025_12_05.parquet', '2025_11_05.parquet', '2026_02_02.parquet', '2025_12_26.parquet', '2025_12_10.parquet', '2025_12_12.parquet', '2026_01_08.parquet', '2025_12_

In [None]:
print('Testing the get_local_data function')
from mylib.get_local_data import get_local_data
daily_df = pd.read_parquet(daily_dir+'daily_20250102.parquet')
# Despite its name, demo_df is an array of unique ts_code values, not a DataFrame
demo_df = daily_df['ts_code'].unique()
# demo_df = ['000001.SZ','000002.SZ','7654321.SZ']
print(demo_df)
data_df = get_local_data(demo_df, '20250102', '20251210')
print(data_df)
# print(get_local_data())


Testing the get_local_data function
['000001.SZ' '000002.SZ' '000004.SZ' ... '920118.BJ' '920128.BJ'
 '302132.SZ']
ts_code     000001.SZ  000002.SZ  000004.SZ  000006.SZ  000007.SZ  000008.SZ  \
date                                                                           
2025-01-02      11.43       7.11      14.18       7.25       7.02       2.84   
2025-01-03      11.38       7.00      12.80       6.91       6.60       2.65   
2025-01-06      11.44       6.98      12.52       6.70       6.63       2.58   
2025-01-07      11.51       7.05      13.11       6.85       6.93       2.66   
2025-01-08      11.50       6.96      13.40       6.96       6.93       2.64   
2025-01-09      11.40       6.95      13.26       6.90       7.13       2.65   
2025-01-10      11.30       6.69      12.51       6.72       6.77       2.71   

ts_code     000009.SZ  000010.SZ  000011.SZ  000012.SZ  ...  920964.BJ  \
date                                                    ...              
2025-01-02       8.90       2.71 

## 2. Data Loading and Preprocessing

In [None]:
# ==================== Data configuration ====================
FACTOR_DIR = "/data1/code_git/tick_data_analysis/factor/daily"
OUTPUT_DIR = "./hf_analysis_results"

class HighFrequencyFactorAnalyzer:
    """High-frequency factor analyzer."""

    def __init__(self):
        self.hf_factors = [
            'order_imbalance', 'effective_spread', 'realized_volatility',
            'bid_ask_spread', 'vwap_deviation', 'price_momentum',
            'trade_flow_intensity', 'micro_price', 'trade_imbalance',
            'depth_imbalance'
        ]

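    def load_data(self):
        """Load the factor files and concatenate them into self.raw."""
        print("="*60)
        print("Loading high-frequency factor data")
        print("="*60)

        files = sorted(Path(FACTOR_DIR).glob("zz1000_factors_*.parquet"))
        print(f"Found {len(files)} files")
        if not files:
            # pd.concat raises an opaque ValueError on an empty list
            raise FileNotFoundError(f"No zz1000_factors_*.parquet files in {FACTOR_DIR}")

        all_df = []
        for i, f in enumerate(files):
            if (i+1) % 5 == 0:
                print(f"  Loading: {i+1}/{len(files)}")
            df = pd.read_parquet(f)
            all_df.append(df)

        self.raw = pd.concat(all_df, ignore_index=True)
        # Keep only the factors that are actually present in the data
        self.hf_factors = [f for f in self.hf_factors if f in self.raw.columns]

        print(f"\nRaw data: {len(self.raw):,} rows")
        print(f"Stocks: {self.raw['stock_code'].nunique()}")
        print(f"Dates: {self.raw['date'].min()} ~ {self.raw['date'].max()}")

        return self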

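    def aggregate_daily(self):
        """Aggregate the intraday factors to one row per stock per day."""
        print("\nAggregating daily factors...")

        df = self.raw  # groupby does not modify its input, so no copy is needed

        # Aggregate by stock and date: intraday mean and std of each factor
        agg_dict = {f: ['mean', 'std'] for f in self.hf_factors}
        daily = df.groupby(['stock_code', 'date']).agg(agg_dict)
        daily.columns = [f"{c[0]}_{c[1]}" for c in daily.columns]
        daily = daily.reset_index()

        # Price information. 'first' and 'last' follow row order, so rows are
        # assumed to be time-sorted within each stock-day.
        price = df.groupby(['stock_code', 'date']).agg({
            'lastPrice': ['first', 'last'], 'open': 'first'
        }).reset_index()
        price.columns = ['stock_code', 'date', 'first', 'last', 'open']

        self.daily = pd.merge(daily, price, on=['stock_code', 'date'])
        # Intraday return from the first to the last recorded lastPrice
        # (overnight moves are not included)
        self.daily['return'] = self.daily['last'] / self.daily['first'] - 1

        print(f"Daily factors: {len(self.daily):,} rows, {self.daily['stock_code'].nunique()} stocks")

        return self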

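    def calc_future_returns(self, periods=(1, 5, 10, 20)):
        """Compute forward returns over each horizon in `periods` (trading days)."""
        print("\nComputing forward returns...")

        df = self.daily.sort_values(['stock_code', 'date'])

        for p in periods:
            # ret_{p}d at day t is the sum of the daily returns on days t+1 .. t+p:
            # shift(-1) moves to t+1, rolling(p).sum() adds p days, and
            # shift(-p+1) re-aligns the window end back to day t.
            df[f'ret_{p}d'] = df.groupby('stock_code')['return'].transform(
                lambda x: x.shift(-1).rolling(p).sum().shift(-p+1)
            )

        self.data = df
        print(f"Data with forward returns: {len(df):,} rows")

        return self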

    def calc_ic(self, ret_col='ret_1d'):
        """Compute the pooled IC: one correlation per factor over all stock-days."""
        print(f"\nIC analysis ({ret_col})...")

        ic = {}
        for f in self.hf_factors:
            col = f"{f}_mean"
            valid = self.data[['stock_code', col, ret_col]].dropna()

            if len(valid) < 100:
                continue

            ic_val = valid[col].corr(valid[ret_col])
            n = len(valid)
            # t-test of a Pearson correlation; it treats stock-days as independent
            t = ic_val * np.sqrt((n-2)/(1-ic_val**2+1e-10))
            p = 2 * stats.t.sf(abs(t), n-2)  # sf is more accurate than 1 - cdf in the tails
            sp, _ = stats.spearmanr(valid[col], valid[ret_col])

            ic[f] = {'IC': ic_val, 'RankIC': sp, 'IC_abs': abs(ic_val), 'P': p, 'N': n}

        self.ic = pd.DataFrame(ic).T.sort_values('IC_abs', ascending=False)

        print("\nIC results:")
        print("-"*70)
        print(f"{'Factor':<20} {'IC':>8} {'RankIC':>8} {'|IC|':>8} {'P-value':>10} {'Sig.':>6}")
        print("-"*70)

        for idx, row in self.ic.iterrows():
            sig = '***' if row['P'] < 0.001 else '**' if row['P'] < 0.01 else '*' if row['P'] < 0.05 else ''
            print(f"{idx:<20} {row['IC']:>8.4f} {row['RankIC']:>8.4f} {row['IC_abs']:>8.4f} {row['P']:>10.4f} {sig:>6}")

        return self.ic

    def calc_ic_periods(self):
        """Compute the pooled IC of every factor at each holding period."""
        print("\nIC across holding periods...")

        all_ic = {}
        for p in [1, 5, 10, 20]:
            ret = f'ret_{p}d'
            for f in self.hf_factors:
                col = f"{f}_mean"
                key = f"{f}_{p}d"
                valid = self.data[['stock_code', col, ret]].dropna()
                if len(valid) >= 100:  # same minimum sample size as calc_ic
                    all_ic[key] = valid[col].corr(valid[ret])

        self.ic_periods = pd.DataFrame(all_ic, index=[0]).T

        # Pivot to factor x period; periods are cast to int so the columns
        # sort as 1, 5, 10, 20 rather than as strings ('1', '10', '20', '5')
        data = [(k.rsplit('_', 1)[0], int(k.rsplit('_', 1)[1].rstrip('d')), v)
                for k, v in all_ic.items()]
        pivot = pd.DataFrame(data, columns=['factor', 'period', 'IC']).pivot(
            index='factor', columns='period', values='IC')
        print(pivot.round(4))

        return self.ic_periods

    def ranking_analysis(self, ret_col='ret_1d'):
        """Quantile sort: mean forward return by factor quintile (pooled sample)."""
        print("\nQuantile sort...")

        results = []

        for f in self.hf_factors:
            col = f"{f}_mean"
            valid = self.data[['stock_code', col, ret_col]].dropna().copy()

            if len(valid) < 100:
                continue

            try:
                valid['g'] = pd.qcut(valid[col], 5, labels=False, duplicates='drop')
                grp = valid.groupby('g')[ret_col].mean()
                ls = grp.iloc[-1] - grp.iloc[0]  # top-minus-bottom quintile return
                dir_ = 'positive' if grp.iloc[-1] > grp.iloc[0] else 'negative'
                ic_val = self.ic.loc[f, 'IC'] if f in self.ic.index else np.nan

                # Column keys stay in Chinese because later cells index by them:
                # 因子 = factor, 多空 = long-short spread (%), 方向 = direction
                results.append({
                    '因子': f, 'IC': ic_val, '多空': ls*100,
                    '方向': dir_, 'Q1': grp.iloc[0]*100, 'Q5': grp.iloc[-1]*100
                })
            except (ValueError, IndexError):
                # qcut could not form bins for this factor; skip it
                continue

        self.ranking = pd.DataFrame(results).sort_values('多空', ascending=False)

        print("\nResults:")
        print("-"*80)
        print(f"{'Factor':<20} {'IC':>8} {'L/S':>10} {'Dir.':>8} {'Q1':>10} {'Q5':>10}")
        print("-"*80)

        for _, r in self.ranking.iterrows():
            print(f"{r['因子']:<20} {r['IC']:>8.4f} {r['多空']:>8.2f}% {r['方向']:>8} {r['Q1']:>9.2f}% {r['Q5']:>9.2f}%")

        return self.ranking

    def visualize(self):
        """Plot the IC, |IC| ranking, multi-period IC, and long-short results."""
        Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)

        fig, axes = plt.subplots(2, 2, figsize=(16, 12))

        # IC bar chart
        ax1 = axes[0, 0]
        ic_s = self.ic.sort_values('IC')
        c = ['g' if x > 0 else 'r' for x in ic_s['IC']]
        ax1.barh(range(len(ic_s)), ic_s['IC'], color=c, alpha=0.7)
        ax1.set_yticks(range(len(ic_s)))
        ax1.set_yticklabels(ic_s.index, fontsize=9)
        ax1.axvline(0, c='k', lw=0.5)
        ax1.set_title('Factor IC')

        # |IC| ranking, with a reference line at 0.02
        ax2 = axes[0, 1]
        ic_a = self.ic.sort_values('IC_abs', ascending=True)
        ax2.barh(range(len(ic_a)), ic_a['IC_abs'], color='steelblue', alpha=0.7)
        ax2.set_yticks(range(len(ic_a)))
        ax2.set_yticklabels(ic_a.index, fontsize=9)
        ax2.axvline(0.02, c='r', ls='--', lw=1)
        ax2.set_title('|IC| Ranking')

        # IC across holding periods (periods cast to int so columns sort 1, 5, 10, 20)
        ax3 = axes[1, 0]
        data = [(k.rsplit('_', 1)[0], int(k.rsplit('_', 1)[1].rstrip('d')), v)
                for k, v in self.ic_periods[0].items()]
        piv = pd.DataFrame(data, columns=['f', 'p', 'v']).pivot(index='f', columns='p', values='v')
        sns.heatmap(piv, annot=True, fmt='.3f', cmap='RdYlGn', center=0, ax=ax3)
        ax3.set_title('IC Across Periods')

        # Long-short spread from the quantile sort (eight largest spreads)
        ax4 = axes[1, 1]
        top = self.ranking.head(8)
        c = ['g' if x > 0 else 'r' for x in top['多空']]
        ax4.barh(range(len(top)), top['多空'], color=c, alpha=0.7)
        ax4.set_yticks(range(len(top)))
        ax4.set_yticklabels(top['因子'], fontsize=9)
        ax4.axvline(0, c='k', lw=0.5)
        ax4.set_title('Long-Short Return')

        plt.tight_layout()
        plt.savefig(f"{OUTPUT_DIR}/hf_analysis.png", dpi=150)
        plt.close()
        print(f"\nFigure saved: {OUTPUT_DIR}/hf_analysis.png")

    def run(self):
        """Run the full pipeline."""
        print("\n" + "="*80)
        print("CSI 1000 High-Frequency Factor Analysis")
        print("="*80)

        self.load_data()
        self.aggregate_daily()
        self.calc_future_returns()
        self.calc_ic()
        self.calc_ic_periods()
        self.ranking_analysis()
        self.visualize()

        print("\n" + "="*80)
        print("Done!")
        print("="*80)

        return self


print("Analyzer defined!")
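The expression `x.shift(-1).rolling(p).sum().shift(-p+1)` in `calc_future_returns` is dense, so it is worth confirming what it computes. The sketch below compares it, on a hypothetical six-day return series for one stock, with an explicit sum of the returns on days t+1 through t+p. The series values and the variable names are illustrative only.

```python
import numpy as np
import pandas as pd

# Hypothetical daily returns for one stock on six consecutive trading days.
r = pd.Series([0.01, -0.02, 0.03, 0.00, 0.01, -0.01])
p = 3

# Expression used in calc_future_returns.
fwd = r.shift(-1).rolling(p).sum().shift(-p + 1)

# Explicit definition: sum of the returns on days t+1 .. t+p,
# undefined (NaN) when fewer than p future days remain.
expected = pd.Series(
    [r.iloc[t + 1 : t + 1 + p].sum() if t + p < len(r) else np.nan
     for t in range(len(r))]
)

print(pd.DataFrame({"ret": r, "fwd_3d": fwd, "expected": expected}))
```

The two columns agree, and the last `p` rows are NaN because their forward window runs past the end of the sample. Note that the forward return is a sum of simple daily returns, which approximates the compounded return only when daily returns are small.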

## 3. Run the Analysis

In [None]:
# Run the full analysis
analyzer = HighFrequencyFactorAnalyzer()
analyzer.run()
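`calc_ic` pools every stock-day into a single correlation per factor. A common alternative is the cross-sectional IC: compute a rank correlation across stocks on each date, then summarize the daily series by its mean and its information ratio (ICIR). The sketch below shows one way to do that on synthetic data. It is not part of the analyzer above; the function name, the `factor` column, the `min_names` threshold, and the simulated panel are all assumptions made for illustration.

```python
import numpy as np
import pandas as pd

def cross_sectional_rank_ic(df, factor_col, ret_col, min_names=30):
    """Return one Spearman rank IC per date (Pearson correlation of ranks)."""
    ics = {}
    for date, day in df.groupby("date"):
        day = day[[factor_col, ret_col]].dropna()
        if len(day) < min_names:
            continue  # too few names for a stable cross-section
        ics[date] = day[factor_col].rank().corr(day[ret_col].rank())
    return pd.Series(ics, name="rank_ic")

# Synthetic panel: 20 dates x 200 stocks with a weak negative factor/return link.
rng = np.random.default_rng(0)
n_dates, n_stocks = 20, 200
factor = rng.normal(size=(n_dates, n_stocks))
ret = -0.2 * factor + rng.normal(size=(n_dates, n_stocks))
panel = pd.DataFrame({
    "date": np.repeat(pd.date_range("2026-01-05", periods=n_dates, freq="B"), n_stocks),
    "stock_code": np.tile(np.arange(n_stocks), n_dates),
    "factor": factor.ravel(),
    "ret_1d": ret.ravel(),
})

ic = cross_sectional_rank_ic(panel, "factor", "ret_1d")
icir = ic.mean() / ic.std()        # information ratio of the daily IC series
t_stat = icir * np.sqrt(len(ic))   # t-statistic of the mean IC across dates
print(f"mean IC={ic.mean():.3f}, ICIR={icir:.2f}, t={t_stat:.2f}, days={len(ic)}")
```

Because the t-statistic here is based on the number of dates rather than the number of stock-days, it is usually a more conservative measure of significance than the pooled test.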

## 4. Key Findings

In [None]:
print("="*80)
print("Key Findings")
print("="*80)

# Strongest factors (analyzer.ic is already sorted by |IC|, descending)
print("\n[Strongest positive factors] (higher factor value → higher return)")
pos = analyzer.ic[analyzer.ic['IC'] > 0].head(3)
for idx, row in pos.iterrows():
    print(f"  {idx}: IC={row['IC']:.4f}")

print("\n[Strongest negative factors] (higher factor value → lower return)")
neg = analyzer.ic[analyzer.ic['IC'] < 0].head(3)
for idx, row in neg.iterrows():
    print(f"  {idx}: IC={row['IC']:.4f}")

print("\n[Top 5 by long-short spread]")
top5 = analyzer.ranking.head(5)
for _, r in top5.iterrows():
    print(f"  {r['因子']}: long-short={r['多空']:.2f}%, IC={r['IC']:.4f}")
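`ranking_analysis` forms its quintiles over the pooled sample, so a stock's group depends on factor levels across all dates. A per-date sort, where stocks are ranked only against that day's cross-section, is a common variant. The sketch below illustrates it on synthetic data. The function name, the `factor` column, and the simulated panel are hypothetical, and the code is an illustration rather than the analyzer's method.

```python
import numpy as np
import pandas as pd

def daily_quintile_spread(df, factor_col, ret_col, n_groups=5):
    """Top-minus-bottom mean return per date, with groups formed within each date."""
    df = df[["date", factor_col, ret_col]].dropna().copy()
    g = df.groupby("date")[factor_col]
    rank = g.rank(method="first")   # 1..n within each date, ties broken by row order
    n = g.transform("count")        # cross-section size on that date
    # Map ranks to equal-sized groups: 1 = lowest factor values, n_groups = highest.
    df["group"] = ((rank - 1) * n_groups // n).astype(int) + 1
    by_group = df.groupby(["date", "group"])[ret_col].mean().unstack("group")
    return by_group[n_groups] - by_group[1]

# Synthetic panel: 20 dates x 200 stocks with a weak negative factor/return link.
rng = np.random.default_rng(1)
n_dates, n_stocks = 20, 200
factor = rng.normal(size=(n_dates, n_stocks))
ret = -0.2 * factor + rng.normal(size=(n_dates, n_stocks))
panel = pd.DataFrame({
    "date": np.repeat(pd.date_range("2026-01-05", periods=n_dates, freq="B"), n_stocks),
    "factor": factor.ravel(),
    "ret_1d": ret.ravel(),
})

spread = daily_quintile_spread(panel, "factor", "ret_1d")
print(f"mean daily Q5-Q1 spread: {spread.mean():.3f} over {len(spread)} days")
```

The result is a daily series of spreads, so its mean, volatility, and hit rate can be examined directly instead of relying on a single pooled number.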

## 5. Conclusions and Strategy

In [None]:
print("="*80)
print("Strategy Recommendations")
print("="*80)
print("""

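[1. Order flow factors]
-----------------
- order_imbalance: IC=-0.036 (negative)
  → Buy-side pressure (OI>0) tends to be followed by a price decline
  → Can be traded as a contrarian signal

[2. Volatility factor]
-----------------
- realized_volatility: IC=+0.022 (positive)
  → High-volatility stocks earn higher short-term returns
  → Consistent with a risk premium

[3. Liquidity factor]
-----------------
- bid_ask_spread: IC=-0.006 (negative, not significant)
  → A wide spread indicates poor liquidity
  → Trading costs are higher

[4. Price deviation factor]
-----------------
- vwap_deviation: IC=-0.011 (negative)
  → Prices that deviate from VWAP tend to revert
  → Intraday reversal opportunity

[5. Momentum factor]
-----------------
- price_momentum: IC=+0.004 (weakly positive)
  → No clear short-term momentum effect
  → At the 20-day horizon the IC rises to 0.039

[Intraday trading strategy]
-----------------
1. Monitor changes in order_imbalance
2. Trade against the move when price deviates from VWAP
3. Use wider stop-losses for high-volatility stocks
4. Avoid stocks with a high bid_ask_spread

[Risk warnings]
-----------------
1. Transaction costs may erode high-frequency factor returns
2. Slippage and market impact costs must be accounted for
3. Factor performance may vary across market regimes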
""")

# Save the results
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
analyzer.ic.to_csv(f"{OUTPUT_DIR}/hf_ic.csv")
analyzer.ranking.to_csv(f"{OUTPUT_DIR}/hf_ranking.csv", index=False)
analyzer.ic_periods.to_csv(f"{OUTPUT_DIR}/hf_ic_periods.csv")

print(f"\nResults saved to: {OUTPUT_DIR}/")
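The significance labels attached to the ICs above follow from the t-statistic used in `calc_ic`, t = IC · sqrt((n − 2) / (1 − IC²)). The sketch below reworks that arithmetic for four of the quoted ICs. It rests on two assumptions: the sample size is taken as the 27,705 daily factor rows from the data overview (the real per-factor sample is smaller after `dropna()`), and the p-value uses a normal approximation to the t-distribution, which is reasonable only for large n. The function name is hypothetical.

```python
import math

def pooled_ic_significance(ic, n):
    """t-statistic and two-sided p-value (normal approximation, for large n)."""
    t = ic * math.sqrt((n - 2) / (1 - ic ** 2))
    p = math.erfc(abs(t) / math.sqrt(2))
    return t, p

# Assumed sample size: the daily factor row count from the data overview.
n = 27_705
quoted_ics = [
    ("order_imbalance", -0.036),
    ("vwap_deviation", -0.011),
    ("bid_ask_spread", -0.006),
    ("price_momentum", 0.004),
]
for name, ic in quoted_ics:
    t, p = pooled_ic_significance(ic, n)
    print(f"{name:<18} IC={ic:+.3f}  t={t:+.2f}  p={p:.3f}")
```

Under these assumptions an IC of -0.006 gives |t| of roughly 1, which matches the "not significant" label on `bid_ask_spread`, while -0.036 gives |t| of roughly 6. The test treats stock-days as independent observations, so the p-values for a panel like this one are likely to be optimistic.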

---

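## Summary

| Factor | IC | Rating | Strategy |
|------|-----|------|------|
| order_imbalance | -0.036 | A | Contrarian |
| depth_imbalance | -0.036 | A | Contrarian |
| trade_flow_intensity | +0.025 | B | Higher flow intensity, higher return |
| realized_volatility | +0.022 | B | Higher volatility, higher return (risk premium) |
| vwap_deviation | -0.011 | C | Intraday reversal |
| price_momentum | +0.004 | D | Long-horizon momentum |

**Key conclusion**: The high-frequency factors are dominated by short-term reversal effects, and order imbalance (tied with depth imbalance at IC = -0.036) is the strongest predictor.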