In [None]:
# 导入必要的库
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from datetime import datetime, timedelta

# 设置绘图样式
plt.style.use('ggplot')
sns.set_style('whitegrid')
%matplotlib inline


In [None]:
# 加载Excel数据
def load_data_from_excel(file_path):
    """从Excel文件加载数据，自动识别股票代码列和日期列"""
    try:
        # 读取Excel文件
        df = pd.read_excel(file_path)
        
        # 显示列名，帮助识别
        print("Excel文件列名：")
        for col in df.columns:
            print(f"- {col}")
        
        # 自动识别股票代码列和日期列
        date_col = None
        code_col = None
        
        for col in df.columns:
            col_lower = str(col).lower()
            # 识别日期列
            if any(keyword in col_lower for keyword in ['date', '日期', 'time', '时间']):
                date_col = col
            # 识别股票代码列
            elif any(keyword in col_lower for keyword in ['code', '代码', 'stock', '股票', 'ticker', 'symbol']):
                code_col = col
        
        if date_col is None or code_col is None:
            print("警告：无法自动识别日期列或股票代码列，请手动指定")
            return df, None, None
        
        print(f"识别到的日期列: {date_col}")
        print(f"识别到的股票代码列: {code_col}")
        
        # 确保日期列是日期类型
        df[date_col] = pd.to_datetime(df[date_col])
        
        return df, date_col, code_col
    except Exception as e:
        print(f"加载数据时出错: {e}")
        return None, None, None

# 加载示例数据
file_path = "因子分析模版.xlsx"  # 请确保文件路径正确
data, date_col, code_col = load_data_from_excel(file_path)

if data is not None:
    # 显示数据前几行
    display(data.head())


In [None]:
def prepare_factor_data(df, date_col, code_col, forward_days=1):
    """准备因子分析数据，计算未来收益率"""
    if df is None or date_col is None or code_col is None:
        print("数据或列名未正确加载")
        return None
    
    # 复制数据以避免修改原始数据
    data = df.copy()
    
    # 识别可能的价格列
    price_cols = []
    for col in data.columns:
        col_lower = str(col).lower()
        if any(keyword in col_lower for keyword in ['price', '价格', 'close', '收盘']):
            price_cols.append(col)
    
    if not price_cols:
        print("警告：未找到价格列，无法计算未来收益率")
        return None
    
    price_col = price_cols[0]  # 使用第一个找到的价格列
    print(f"使用 '{price_col}' 列计算未来收益率")
    
    # 按股票代码和日期排序
    data = data.sort_values([code_col, date_col])
    
    # 计算未来收益率
    data['future_return'] = data.groupby(code_col)[price_col].pct_change(forward_days).shift(-forward_days)
    
    # 识别因子列（排除日期、代码、价格和未来收益列）
    factor_cols = [col for col in data.columns if col not in [date_col, code_col, price_col, 'future_return']]
    
    print(f"识别到的因子列: {len(factor_cols)}个")
    if len(factor_cols) > 0:
        print(f"前5个因子: {factor_cols[:5]}")
    
    # 删除缺失值
    data = data.dropna(subset=['future_return'] + factor_cols)
    
    return data, factor_cols

# 准备因子数据
processed_data, factor_cols = prepare_factor_data(data, date_col, code_col, forward_days=1)

if processed_data is not None:
    print(f"\n处理后的数据形状: {processed_data.shape}")
    display(processed_data.head())


In [None]:
def calculate_ic(data, factor_cols, date_col):
    """计算每个因子的IC值"""
    if data is None or not factor_cols:
        print("数据或因子列未正确加载")
        return None
    
    # 存储每个因子的IC值
    ic_dict = {}
    
    # 按日期分组计算IC
    for factor in factor_cols:
        # 计算每个日期的IC值
        daily_ic = data.groupby(date_col).apply(
            lambda x: x[factor].corr(x['future_return'], method='spearman')
        )
        
        # 计算平均IC
        avg_ic = daily_ic.mean()
        ic_dict[factor] = {
            'avg_ic': avg_ic,
            'ic_series': daily_ic
        }
    
    # 创建IC摘要DataFrame
    ic_summary = pd.DataFrame({
        'factor': list(ic_dict.keys()),
        'avg_ic': [ic_dict[f]['avg_ic'] for f in ic_dict]
    })
    
    # 按平均IC值排序
    ic_summary = ic_summary.sort_values('avg_ic', ascending=False)
    
    return ic_summary, ic_dict

# 计算IC值
ic_summary, ic_dict = calculate_ic(processed_data, factor_cols, date_col)

if ic_summary is not None:
    print("因子IC值摘要（按IC值降序排列）：")
    display(ic_summary)


In [None]:
def plot_ic_bars(ic_summary, top_n=10):
    """绘制因子IC值的柱状图"""
    if ic_summary is None or len(ic_summary) == 0:
        print("没有IC值数据可供绘图")
        return
    
    # 选择前N个和后N个因子
    top_factors = ic_summary.head(top_n)
    bottom_factors = ic_summary.tail(top_n)
    
    # 合并为一个DataFrame
    plot_data = pd.concat([top_factors, bottom_factors])
    
    # 绘制柱状图
    plt.figure(figsize=(12, 8))
    bars = plt.bar(plot_data['factor'], plot_data['avg_ic'])
    
    # 为正负IC值设置不同颜色
    for i, bar in enumerate(bars):
        if plot_data['avg_ic'].iloc[i] > 0:
            bar.set_color('green')
        else:
            bar.set_color('red')
    
    plt.axhline(y=0, color='black', linestyle='-', alpha=0.3)
    plt.title(f'Top {top_n} and Bottom {top_n} Factors by IC Value', fontsize=14)
    plt.xlabel('Factor')
    plt.ylabel('Average IC')
    plt.xticks(rotation=90)
    plt.tight_layout()
    plt.show()

# 绘制IC值柱状图
plot_ic_bars(ic_summary, top_n=10)


In [None]:
def factor_quantile_analysis(data, factor, date_col, n_quantiles=5):
    """按因子值将股票分组，分析各组的未来收益"""
    if data is None:
        print("没有数据可供分析")
        return None
    
    # 复制数据
    df = data.copy()
    
    # 按日期分组，计算因子分位数
    df['quantile'] = df.groupby(date_col)[factor].transform(
        lambda x: pd.qcut(x, n_quantiles, labels=False, duplicates='drop')
    )
    
    # 计算每个分位数的平均未来收益
    quantile_returns = df.groupby(['quantile', date_col])['future_return'].mean().reset_index()
    avg_returns = quantile_returns.groupby('quantile')['future_return'].mean()
    
    return avg_returns

# 选择IC值最高的因子进行分组分析
if ic_summary is not None and len(ic_summary) > 0:
    top_factor = ic_summary.iloc[0]['factor']
    print(f"对IC值最高的因子进行分组分析: {top_factor}")
    
    quantile_returns = factor_quantile_analysis(processed_data, top_factor, date_col)
    
    if quantile_returns is not None:
        # 绘制分位数收益图
        plt.figure(figsize=(10, 6))
        quantile_returns.plot(kind='bar')
        plt.title(f'{top_factor} Factor Quantile Returns', fontsize=14)
        plt.xlabel('Quantile (0 = Lowest Factor Value, 4 = Highest)')
        plt.ylabel('Average Future Return')
        plt.axhline(y=0, color='black', linestyle='-', alpha=0.3)
        plt.tight_layout()
        plt.show()


In [None]:
def create_multi_factor(data, factor_cols, ic_summary, top_n=5):
    """创建多因子组合"""
    if data is None or ic_summary is None:
        print("数据或IC摘要未正确加载")
        return None
    
    # 选择IC值最高的N个因子
    top_factors = ic_summary.head(top_n)['factor'].tolist()
    print(f"选择的顶级因子: {top_factors}")
    
    # 复制数据
    df = data.copy()
    
    # 标准化因子值
    for factor in top_factors:
        df[f'{factor}_zscore'] = df.groupby(date_col)[factor].transform(lambda x: (x - x.mean()) / x.std())
    
    # 根据IC值的符号调整因子方向
    weights = {}
    for factor in top_factors:
        ic_value = ic_summary[ic_summary['factor'] == factor]['avg_ic'].values[0]
        # 如果IC为负，则反转因子方向
        weights[factor] = np.sign(ic_value)
    
    # 创建综合因子
    df['multi_factor'] = 0
    for factor in top_factors:
        df['multi_factor'] += df[f'{factor}_zscore'] * weights[factor]
    
    return df

# 创建多因子组合
if ic_summary is not None and len(ic_summary) >= 5:
    multi_factor_data = create_multi_factor(processed_data, factor_cols, ic_summary, top_n=5)
    
    if multi_factor_data is not None:
        # 计算多因子的IC值
        multi_factor_ic = multi_factor_data.groupby(date_col).apply(
            lambda x: x['multi_factor'].corr(x['future_return'], method='spearman')
        ).mean()
        
        print(f"多因子组合的平均IC值: {multi_factor_ic:.4f}")
        
        # 进行分组分析
        quantile_returns = factor_quantile_analysis(multi_factor_data, 'multi_factor', date_col)
        
        if quantile_returns is not None:
            # 绘制分位数收益图
            plt.figure(figsize=(10, 6))
            quantile_returns.plot(kind='bar')
            plt.title('Multi-Factor Quantile Returns', fontsize=14)
            plt.xlabel('Quantile (0 = Lowest Factor Value, 4 = Highest)')
            plt.ylabel('Average Future Return')
            plt.axhline(y=0, color='black', linestyle='-', alpha=0.3)
            plt.tight_layout()
            plt.show()


In [None]:
def analyze_ic_stability(ic_dict, top_n=3):
    """分析顶级因子的IC稳定性"""
    if ic_dict is None or not ic_dict:
        print("没有IC数据可供分析")
        return
    
    # 选择IC值最高的N个因子
    top_factors = ic_summary.head(top_n)['factor'].tolist()
    
    # 创建IC时间序列DataFrame
    ic_ts_data = {factor: ic_dict[factor]['ic_series'] for factor in top_factors}
    ic_ts_df = pd.DataFrame(ic_ts_data)
    
    # 绘制IC时间序列
    plt.figure(figsize=(14, 7))
    for factor in top_factors:
        plt.plot(ic_ts_df.index, ic_ts_df[factor], label=factor)
    
    plt.axhline(y=0, color='black', linestyle='-', alpha=0.3)
    plt.title('Factor IC Time Series', fontsize=14)
    plt.xlabel('Date')
    plt.ylabel('IC Value')
    plt.legend()
    plt.tight_layout()
    plt.show()
    
    # 计算IC稳定性指标
    ic_stats = {}
    for factor in top_factors:
        ic_series = ic_ts_df[factor].dropna()
        ic_stats[factor] = {
            'mean': ic_series.mean(),
            'std': ic_series.std(),
            'ir': ic_series.mean() / ic_series.std() if ic_series.std() > 0 else 0,
            'positive_ratio': (ic_series > 0).mean()
        }
    
    # 创建统计摘要DataFrame
    stats_df = pd.DataFrame(ic_stats).T
    stats_df.columns = ['平均IC', '标准差', 'IR比率', 'IC为正的比例']
    
    return stats_df

# 分析IC稳定性
if ic_dict is not None and ic_summary is not None and len(ic_summary) >= 3:
    ic_stability_stats = analyze_ic_stability(ic_dict, top_n=3)
    
    if ic_stability_stats is not None:
        print("因子IC稳定性统计：")
        display(ic_stability_stats)


In [None]:
# 股票因子分析完整示例

本笔记本演示如何使用 Alphalens 进行专业的股票因子分析，包括：
- 从 Excel 文件加载数据
- 构建和测试股票因子
- 使用 Alphalens 进行因子分析
- 解释分析结果

## 目录
1. [环境设置和数据加载](#环境设置)
2. [数据预处理](#数据预处理)
3. [因子构建](#因子构建)
4. [Alphalens 因子分析](#Alphalens因子分析)
5. [结果解释](#结果解释)


In [None]:
## 1. 环境设置和数据加载 {#环境设置}


In [1]:
# 导入必要的库
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
from datetime import datetime, timedelta

# Alphalens 相关
import alphalens as al
from alphalens.utils import get_clean_factor_and_forward_returns
from alphalens.tears import create_full_tear_sheet

# 设置显示和警告
plt.rcParams['font.sans-serif'] = ['SimHei']  # 中文字体
plt.rcParams['axes.unicode_minus'] = False     # 负号显示
warnings.filterwarnings('ignore')
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)

print("环境设置完成！")
print(f"Alphalens 版本: {al.__version__}")
print(f"Pandas 版本: {pd.__version__}")


ModuleNotFoundError: No module named 'alphalens'

In [None]:
# 加载 Excel 数据
# 请将您的 Excel 文件路径替换为实际路径
excel_file_path = "./data/因子分析模版.xlsx"  # 修改为您的文件路径

def load_stock_data(file_path):
    """
    从 Excel 文件加载股票数据
    数据格式：
    - 第一列(A列)：股票代码
    - 第二列(B列)：时间戳
    - 后续列(C列开始)：各种因子数据
    """
    try:
        # 读取 Excel 文件
        df = pd.read_excel(file_path, header=[0, 1, 2, 3])  # 读取多级表头
        print(f"成功加载数据，共 {len(df)} 行 {len(df.columns)} 列")
        
        # 获取第一行作为因子名称
        factor_names = []
        for col in df.columns:
            if isinstance(col, tuple) and len(col) > 0:
                factor_names.append(col[0])
            else:
                factor_names.append(col)
                
        print("\n因子名称：")
        print(factor_names)
        
        # 重命名列，使用第一行文字作为因子名称
        df.columns = factor_names
        
        print("\n数据预览：")
        print(df.head())
        return df
    except FileNotFoundError:
        print(f"文件 {file_path} 不存在，使用模拟数据")
        return create_sample_data()
    except Exception as e:
        print(f"加载数据时出错: {e}")
        print("使用模拟数据")
        return create_sample_data()

def create_sample_data():
    """
    创建示例数据用于演示
    """
    # 创建日期范围
    dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
    
    # 创建股票代码
    stocks = ['000001.SZ', '000002.SZ', '600000.SH', '600036.SH', '000858.SZ']
    
    # 模拟因子名称，与图片中的列名一致
    factor_names = [
        '股票代码', '时间', 
        '前收盘价', '收盘价', '最高价', '最低价', '成交额', '成交笔数', '涨跌',
        'RSI相对强弱指标', 'MACD指数平滑移动平均', 'MA简单移动平均', 'MA简单移动平均',
        'MTM动力指标', 'ROC变动速率', 'RC变化率指数', 'PVT量价趋势指标', 
        'PRICEOSC价格振荡指标', 'ADTM动态买卖气指标', 'ATR真实波幅', 'BBI多空指数', 'BBIBOLL多空布林线'
    ]
    
    data = []
    
    for date in dates:
        for stock in stocks:
            # 模拟股票数据
            price = 10 + np.random.randn() * 2
            volume = np.random.randint(1000, 10000)
            
            row_data = {
                '股票代码': stock,
                '时间': date,
                '前收盘价': price - np.random.randn() * 0.1,
                '收盘价': price,
                '最高价': price + abs(np.random.randn()) * 0.2,
                '最低价': price - abs(np.random.randn()) * 0.2,
                '成交额': volume * price,
                '成交笔数': np.random.randint(100, 1000),
                '涨跌': np.random.randn() * 0.05,
                'RSI相对强弱指标': 50 + np.random.randn() * 10,
                'MACD指数平滑移动平均': np.random.randn() * 0.5,
                'MA简单移动平均': price + np.random.randn() * 0.3,
                'MTM动力指标': np.random.randn() * 2,
                'ROC变动速率': np.random.randn() * 5,
                'RC变化率指数': np.random.randn() * 3,
                'PVT量价趋势指标': volume * np.random.randn(),
                'PRICEOSC价格振荡指标': np.random.randn() * 2,
                'ADTM动态买卖气指标': np.random.randn() * 0.8,
                'ATR真实波幅': abs(np.random.randn()) * 0.3,
                'BBI多空指数': price + np.random.randn() * 0.2,
                'BBIBOLL多空布林线': price + np.random.randn() * 0.4
            }
            data.append(row_data)
    
    df = pd.DataFrame(data)
    print("创建了模拟数据用于演示")
    return df

# 加载数据
raw_data = load_stock_data(excel_file_path)


In [None]:
## 2. 数据预处理 {#数据预处理}


In [None]:
def preprocess_data(df):
    """
    数据预处理函数
    """
    # 复制数据避免修改原始数据
    data = df.copy()
    
    # 确保时间列是 datetime 类型
    time_col = '时间'
    stock_col = '股票代码'
    
    if time_col in data.columns:
        data[time_col] = pd.to_datetime(data[time_col])
    else:
        # 假设第二列是时间
        data.rename(columns={data.columns[1]: time_col}, inplace=True)
        data[time_col] = pd.to_datetime(data[time_col])
    
    # 确保有股票代码列
    if stock_col not in data.columns:
        # 假设第一列是股票代码
        data.rename(columns={data.columns[0]: stock_col}, inplace=True)
    
    # 计算涨跌幅（如果没有）
    if '涨跌幅' not in data.columns and '收盘价' in data.columns and '前收盘价' in data.columns:
        data['涨跌幅'] = (data['收盘价'] / data['前收盘价'] - 1) * 100
        print("计算了涨跌幅因子")
    
    # 设置多重索引
    data = data.set_index([time_col, stock_col])
    
    # 删除缺失值过多的列
    data = data.dropna(axis=1, thresh=len(data)*0.5)
    
    # 填充缺失值
    data = data.fillna(method='ffill').fillna(method='bfill')
    
    print(f"预处理后数据形状: {data.shape}")
    print(f"时间范围: {data.index.get_level_values(time_col).min()} 到 {data.index.get_level_values(time_col).max()}")
    print(f"股票数量: {len(data.index.get_level_values(stock_col).unique())}")
    
    return data

# 预处理数据
processed_data = preprocess_data(raw_data)
processed_data.head(10)


In [None]:
## 3. 因子构建 {#因子构建}


In [None]:
def calculate_future_returns(data, periods=[1, 5, 10]):
    """
    计算未来收益率
    """
    # 确定价格列和股票代码列
    price_col = '收盘价' if '收盘价' in data.columns else None
    target_col = '涨跌' if '涨跌' in data.columns else None
    stock_col = '股票代码'
    
    if not price_col:
        print("警告: 未找到收盘价列，使用第一个数值列")
        price_col = data.select_dtypes(include=[np.number]).columns[0]
    
    returns = pd.DataFrame(index=data.index)
    
    def forward_returns(group):
        """计算个股的未来收益率"""
        stock_returns = pd.DataFrame(index=group.index)
        
        for period in periods:
            # 计算未来 period 天的收益率
            future_price = group[price_col].shift(-period)
            stock_returns[f'{period}D'] = (future_price / group[price_col] - 1) * 100  # 转为百分比
        
        # 如果有涨跌列，直接使用
        if target_col:
            stock_returns['1D_actual'] = group[target_col]
            
        return stock_returns
    
    # 使用正确的分组键
    returns = data.groupby(level=stock_col).apply(forward_returns).droplevel(0)
    
    print(f"计算了 {periods} 天的未来收益率")
    print(f"收益率数据形状: {returns.shape}")
    
    return returns

# 计算未来收益率
forward_returns = calculate_future_returns(processed_data)
forward_returns.head()


In [None]:
def calculate_technical_factors(data):
    """
    计算技术分析因子
    """
    factors = pd.DataFrame(index=data.index)
    
    # 确保我们有基本的价格数据
    if 'close' in data.columns:
        price_col = 'close'
    else:
        # 如果没有 close 列，使用第一个数值列作为价格
        price_col = data.select_dtypes(include=[np.number]).columns[0]
    
    if 'volume' in data.columns:
        volume_col = 'volume'
    else:
        # 如果没有 volume 列，尝试找到相关列
        volume_candidates = [col for col in data.columns if 'vol' in col.lower()]
        volume_col = volume_candidates[0] if volume_candidates else None
    
    print(f"使用价格列: {price_col}")
    print(f"使用成交量列: {volume_col}")
    
    # 1. 动量因子
    def momentum_factor(group):
        """计算动量因子（过去20日收益率）"""
        if len(group) < 21:
            return pd.Series(index=group.index, dtype=float)
        return group[price_col].pct_change(20)
    
    factors['momentum_20d'] = (processed_data.groupby('code')
                              .apply(momentum_factor)
                              .droplevel(0))
    
    # 2. 反转因子
    def reversal_factor(group):
        """计算反转因子（过去5日收益率的负值）"""
        if len(group) < 6:
            return pd.Series(index=group.index, dtype=float)
        return -group[price_col].pct_change(5)
    
    factors['reversal_5d'] = (processed_data.groupby('code')
                             .apply(reversal_factor)
                             .droplevel(0))
    
    # 3. 波动率因子
    def volatility_factor(group):
        """计算波动率因子（过去20日收益率标准差）"""
        if len(group) < 21:
            return pd.Series(index=group.index, dtype=float)
        returns = group[price_col].pct_change()
        return returns.rolling(20).std()
    
    factors['volatility_20d'] = (processed_data.groupby('code')
                                .apply(volatility_factor)
                                .droplevel(0))
    
    # 4. 成交量因子（如果有成交量数据）
    if volume_col:
        def volume_factor(group):
            """计算成交量因子（过去20日平均成交量比率）"""
            if len(group) < 21:
                return pd.Series(index=group.index, dtype=float)
            avg_volume = group[volume_col].rolling(20).mean()
            return group[volume_col] / avg_volume
        
        factors['volume_ratio_20d'] = (processed_data.groupby('code')
                                     .apply(volume_factor)
                                     .droplevel(0))
    
    # 5. 价值因子（如果有相关数据）
    if 'pe_ratio' in data.columns:
        factors['pe_factor'] = -data['pe_ratio']  # PE 越低越好，所以取负值
    
    if 'pb_ratio' in data.columns:
        factors['pb_factor'] = -data['pb_ratio']  # PB 越低越好，所以取负值
    
    # 删除缺失值
    factors = factors.dropna()
    
    print(f"\n计算出 {len(factors.columns)} 个因子：")
    print(factors.columns.tolist())
    print(f"因子数据形状: {factors.shape}")
    
    return factors

# 计算因子
factor_data = calculate_technical_factors(processed_data)
factor_data.head()


In [None]:
## 4. Alphalens 因子分析 {#Alphalens因子分析}


In [None]:
def analyze_factors_ic(factor_data, forward_returns):
    """
    计算因子IC值（信息系数）
    """
    print("开始计算因子IC值...")
    
    # 确保索引一致
    common_index = factor_data.index.intersection(forward_returns.index)
    factors = factor_data.loc[common_index]
    returns = forward_returns.loc[common_index]
    
    # 创建结果DataFrame
    ic_results = pd.DataFrame(
        index=factor_data.columns,
        columns=['IC_1D', 'IC_5D', 'IC_10D', 'IC_Rank_1D', 'IC_Rank_5D', 'IC_Rank_10D']
    )
    
    # 计算每个因子的IC值
    for factor_name in factors.columns:
        factor_series = factors[factor_name].dropna()
        
        # 对于每个预测期
        for period in ['1D', '5D', '10D']:
            if period in returns.columns:
                return_series = returns[period].dropna()
                
                # 确保数据对齐
                common_idx = factor_series.index.intersection(return_series.index)
                if len(common_idx) > 0:
                    f = factor_series.loc[common_idx]
                    r = return_series.loc[common_idx]
                    
                    # 计算Pearson相关系数（线性IC）
                    ic = f.corr(r)
                    ic_results.loc[factor_name, f'IC_{period}'] = ic
                    
                    # 计算Spearman秩相关系数（秩IC）
                    rank_ic = f.corr(r, method='spearman')
                    ic_results.loc[factor_name, f'IC_Rank_{period}'] = rank_ic
    
    # 按1日IC绝对值排序
    ic_results = ic_results.sort_values('IC_1D', key=abs, ascending=False)
    
    print("因子IC计算完成！")
    return ic_results

# 计算因子IC
ic_results = analyze_factors_ic(factor_data, forward_returns)
ic_results


In [None]:
def visualize_ic_results(ic_results):
    """
    可视化因子IC结果
    """
    # 设置图表大小
    plt.figure(figsize=(14, 8))
    
    # 选择前10个因子
    top_factors = ic_results.head(10).index
    
    # 绘制1日IC柱状图
    plt.subplot(1, 2, 1)
    ic_1d = ic_results.loc[top_factors, 'IC_1D'].sort_values()
    bars = plt.barh(ic_1d.index, ic_1d.values, color=['r' if x < 0 else 'g' for x in ic_1d.values])
    plt.axvline(x=0, color='k', linestyle='-', alpha=0.3)
    plt.title('前10个因子的1日IC值', fontsize=12)
    plt.xlabel('IC值')
    plt.grid(axis='x', linestyle='--', alpha=0.5)
    
    # 为每个柱状图添加数值标签
    for bar in bars:
        width = bar.get_width()
        label_x_pos = width if width > 0 else width - 0.03
        plt.text(label_x_pos, bar.get_y() + bar.get_height()/2, f'{width:.4f}', 
                 va='center', ha='left' if width > 0 else 'right', fontsize=9)
    
    # 绘制5日IC柱状图
    plt.subplot(1, 2, 2)
    ic_5d = ic_results.loc[top_factors, 'IC_5D'].sort_values()
    bars = plt.barh(ic_5d.index, ic_5d.values, color=['r' if x < 0 else 'g' for x in ic_5d.values])
    plt.axvline(x=0, color='k', linestyle='-', alpha=0.3)
    plt.title('前10个因子的5日IC值', fontsize=12)
    plt.xlabel('IC值')
    plt.grid(axis='x', linestyle='--', alpha=0.5)
    
    # 为每个柱状图添加数值标签
    for bar in bars:
        width = bar.get_width()
        label_x_pos = width if width > 0 else width - 0.03
        plt.text(label_x_pos, bar.get_y() + bar.get_height()/2, f'{width:.4f}', 
                 va='center', ha='left' if width > 0 else 'right', fontsize=9)
    
    plt.tight_layout()
    plt.show()

# 可视化IC结果
visualize_ic_results(ic_results)


In [None]:
def analyze_factor_with_alphalens(factor_data, forward_returns, factor_name):
    """
    使用Alphalens进行单因子分析
    """
    print(f"使用Alphalens分析因子: {factor_name}")
    
    # 获取因子数据
    factor_series = factor_data[factor_name]
    
    # 获取价格数据（用于Alphalens）
    if '收盘价' in processed_data.columns:
        price_series = processed_data['收盘价']
    else:
        price_series = processed_data.select_dtypes(include=[np.number]).columns[0]
        price_series = processed_data[price_series]
    
    try:
        # 准备Alphalens格式的数据
        factor_data_clean = get_clean_factor_and_forward_returns(
            factor=factor_series,
            prices=price_series,
            periods=(1, 5, 10),
            quantiles=5,
            binning_by_group=False,
            max_loss=0.35
        )
        
        print("Alphalens数据准备完成，生成分析报告...")
        
        # 创建完整的因子分析报告
        create_full_tear_sheet(factor_data_clean)
        
        return factor_data_clean
        
    except Exception as e:
        print(f"Alphalens分析出错: {e}")
        print("尝试手动计算分位数收益...")
        
        # 手动计算分位数收益
        factor_quantiles = pd.qcut(factor_series, 5, labels=False) + 1
        
        # 合并因子分位数和收益率
        combined = pd.DataFrame({
            'factor': factor_series,
            'quantile': factor_quantiles,
            'return_1d': forward_returns['1D']
        }).dropna()
        
        # 计算各分位数的平均收益
        quantile_returns = combined.groupby('quantile')['return_1d'].mean()
        
        # 绘制分位数收益图
        plt.figure(figsize=(10, 6))
        quantile_returns.plot(kind='bar', color='skyblue')
        plt.title(f'{factor_name} 因子分位数收益')
        plt.xlabel('分位数')
        plt.ylabel('平均收益率 (%)')
        plt.grid(axis='y', linestyle='--', alpha=0.5)
        plt.show()
        
        return quantile_returns

# 选择IC值最高的因子进行Alphalens分析
if not ic_results.empty:
    top_factor = ic_results.index[0]
    print(f"选择IC值最高的因子进行详细分析: {top_factor}")
    alphalens_result = analyze_factor_with_alphalens(factor_data, forward_returns, top_factor)
else:
    print("没有有效的因子IC结果")


In [None]:
## 5. 结果解释 {#结果解释}


In [None]:
def interpret_ic_results(ic_results):
    """
    解释IC结果
    """
    print("因子IC分析结果解释:")
    print("=" * 50)
    
    # 筛选有效的因子（IC值不为NaN）
    valid_factors = ic_results.dropna(subset=['IC_1D'])
    
    if valid_factors.empty:
        print("没有找到有效的因子IC结果")
        return
    
    # 1. 最佳因子
    best_factor = valid_factors.iloc[0].name
    best_ic = valid_factors.iloc[0]['IC_1D']
    best_rank_ic = valid_factors.iloc[0]['IC_Rank_1D']
    
    print(f"1. 最佳因子: {best_factor}")
    print(f"   - 1日IC值: {best_ic:.4f}")
    print(f"   - 1日秩IC值: {best_rank_ic:.4f}")
    
    # 评估IC值
    if abs(best_ic) > 0.05:
        print("   - 评价: 🏆 优秀 (|IC| > 0.05)")
    elif abs(best_ic) > 0.02:
        print("   - 评价: 👍 良好 (|IC| > 0.02)")
    else:
        print("   - 评价: ⚠️ 一般 (|IC| < 0.02)")
    
    # 2. 因子稳定性分析
    print("\n2. 因子稳定性分析:")
    for i, (factor, row) in enumerate(valid_factors.head(5).iterrows()):
        ic_1d = row['IC_1D']
        ic_5d = row['IC_5D']
        ic_10d = row['IC_10D'] if 'IC_10D' in row and not pd.isna(row['IC_10D']) else None
        
        print(f"   {i+1}. {factor}:")
        print(f"      - 1日IC: {ic_1d:.4f}")
        print(f"      - 5日IC: {ic_5d:.4f}")
        if ic_10d is not None:
            print(f"      - 10日IC: {ic_10d:.4f}")
        
        # 判断因子稳定性
        if ic_5d is not None and not pd.isna(ic_5d):
            if np.sign(ic_1d) == np.sign(ic_5d) and abs(ic_5d) > 0.01:
                print("      - 稳定性: ✓ 良好 (预测方向一致)")
            else:
                print("      - 稳定性: ✗ 不稳定 (预测方向不一致或衰减快)")
    
    # 3. 投资建议
    print("\n3. 投资建议:")
    top_positive = valid_factors[valid_factors['IC_1D'] > 0].head(3)
    top_negative = valid_factors[valid_factors['IC_1D'] < 0].head(3)
    
    if not top_positive.empty:
        print("   做多策略:")
        for factor, row in top_positive.iterrows():
            print(f"   - 选择 {factor} 因子值较高的股票 (IC={row['IC_1D']:.4f})")
    
    if not top_negative.empty:
        print("   做空策略:")
        for factor, row in top_negative.iterrows():
            print(f"   - 选择 {factor} 因子值较低的股票 (IC={row['IC_1D']:.4f})")
    
    # 4. 综合评价
    print("\n4. 综合评价:")
    avg_abs_ic = valid_factors['IC_1D'].abs().mean()
    
    if avg_abs_ic > 0.03:
        print(f"   整体预测能力: 👍 良好 (平均|IC|={avg_abs_ic:.4f})")
    else:
        print(f"   整体预测能力: ⚠️ 一般 (平均|IC|={avg_abs_ic:.4f})")
    
    # 建议使用的因子组合
    print("\n5. 建议使用的因子组合:")
    
    # 选择IC绝对值大于0.02的因子
    good_factors = valid_factors[valid_factors['IC_1D'].abs() > 0.02]
    if not good_factors.empty:
        print(f"   推荐使用以下 {len(good_factors)} 个因子构建多因子模型:")
        for factor, row in good_factors.iterrows():
            direction = "正向" if row['IC_1D'] > 0 else "反向"
            print(f"   - {factor}: {direction} (IC={row['IC_1D']:.4f})")
    else:
        print("   没有找到预测能力足够强的因子，建议尝试其他因子或组合方法")

# 解释IC结果
interpret_ic_results(ic_results)


In [None]:
def create_multi_factor_model(factor_data, forward_returns, ic_results, threshold=0.02):
    """
    创建简单的多因子模型
    """
    print("创建多因子模型...")
    
    # 选择IC绝对值大于阈值的因子
    good_factors = ic_results[ic_results['IC_1D'].abs() > threshold].index.tolist()
    
    if not good_factors:
        print("没有找到足够强的因子，降低阈值重试")
        threshold = 0.01
        good_factors = ic_results[ic_results['IC_1D'].abs() > threshold].index.tolist()
    
    if not good_factors:
        print("没有找到有效的因子，无法创建多因子模型")
        return None
    
    print(f"选择了 {len(good_factors)} 个因子: {good_factors}")
    
    # 确保索引一致
    common_index = factor_data.index.intersection(forward_returns.index)
    selected_factors = factor_data.loc[common_index, good_factors]
    returns = forward_returns.loc[common_index, '1D']
    
    # 创建多因子得分
    scores = pd.DataFrame(index=selected_factors.index)
    
    # 根据IC符号确定因子方向
    for factor in good_factors:
        ic = ic_results.loc[factor, 'IC_1D']
        # 如果IC为正，因子值越大越好；如果IC为负，因子值越小越好
        direction = np.sign(ic)
        scores[factor] = selected_factors[factor] * direction
    
    # 计算综合得分（简单平均）
    scores['total_score'] = scores.mean(axis=1)
    
    # 按日期分组，计算分位数
    def calculate_quantiles(group):
        return pd.qcut(group['total_score'], 5, labels=False) + 1
    
    # 按日期分组计算分位数
    quantiles = scores.groupby(level=0).apply(
        lambda x: pd.Series(calculate_quantiles(x), index=x.index)
    ).droplevel(0)
    
    scores['quantile'] = quantiles
    
    # 合并得分和收益率
    result = pd.DataFrame({
        'score': scores['total_score'],
        'quantile': scores['quantile'],
        'return': returns
    })
    
    # 计算各分位数的平均收益
    quantile_returns = result.groupby('quantile')['return'].mean()
    
    # 计算多空组合收益
    long_short_return = quantile_returns.iloc[-1] - quantile_returns.iloc[0]
    
    print(f"\n各分位数平均收益:")
    for q, ret in quantile_returns.items():
        print(f"分位数 {q}: {ret:.4f}%")
    
    print(f"\n多空组合收益: {long_short_return:.4f}%")
    
    # 绘制分位数收益图
    plt.figure(figsize=(10, 6))
    quantile_returns.plot(kind='bar', color='skyblue')
    plt.title('多因子模型分位数收益')
    plt.xlabel('分位数')
    plt.ylabel('平均收益率 (%)')
    plt.grid(axis='y', linestyle='--', alpha=0.5)
    plt.show()
    
    return result

# 创建多因子模型
multi_factor_result = create_multi_factor_model(factor_data, forward_returns, ic_results)


In [None]:
## 总结

本笔记本演示了如何使用 Excel 文件中的列作为因子进行股票因子分析，主要步骤包括：

1. **数据加载与预处理**：
   - 从 Excel 文件加载数据，使用第一行作为因子名称
   - 设置股票代码和时间为多重索引
   - 处理缺失值和异常值

2. **因子准备**：
   - 直接使用 Excel 文件中的列作为因子
   - 对因子进行标准化处理
   - 计算未来收益率作为预测目标

3. **因子分析**：
   - 计算因子 IC 值（信息系数）
   - 可视化因子 IC 结果
   - 使用 Alphalens 进行详细分析

4. **结果解释**：
   - 评估因子预测能力
   - 分析因子稳定性
   - 提供投资建议

5. **多因子模型**：
   - 选择 IC 值较高的因子构建模型
   - 计算综合因子得分
   - 分析分位数收益表现

### IC 值解释

IC（信息系数）是衡量因子预测能力的关键指标：

- **|IC| > 0.05**：优秀因子，预测能力强
- **0.02 < |IC| < 0.05**：良好因子，有一定预测能力
- **|IC| < 0.02**：一般因子，预测能力较弱

### 后续优化方向

1. **因子处理**：
   - 中性化处理（行业、市值中性）
   - 极值处理（Winsorize）
   - 因子正交化

2. **多因子组合**：
   - IC 加权
   - 风险平价
   - 最大信息比率

3. **回测验证**：
   - 加入交易成本
   - 考虑流动性约束
   - 设计完整交易策略
