In [4]:
import os
import pandas as pd
import numpy as np
import multiprocessing
from datetime import datetime
import traceback
import warnings
# Silence library warnings for the whole run; the stdlib function is the
# all-lowercase `filterwarnings` (the camel-cased name does not exist).
warnings.filterwarnings('ignore')


def compute_hurst_series(log_returns):
    """Estimate the Hurst exponent of a window of returns via rescaled-range (R/S) analysis.

    The window is cut into non-overlapping blocks of length n = 10, 20, 40, ...
    (up to half the window length). For each n the mean R/S over the blocks is
    computed, and the exponent is the slope of log(R/S) against log(n).

    Args:
        log_returns: One-dimensional array-like of returns (a pandas Series,
            NumPy array or list).

    Returns:
        The fitted slope, or:
        * ``np.nan`` when the window has fewer than 20 observations, contains
          NaN, or is too short to provide at least two block sizes;
        * ``0.5`` when fewer than two block sizes yield a positive R/S value
          (e.g. a constant window).
    """
    x = np.asarray(log_returns, dtype=float)
    N = len(x)
    # A window with missing returns cannot give a meaningful estimate.
    if N < 20 or np.isnan(x).any():
        return np.nan

    # Block sizes double from n_min up to half the window.
    n_min = 10
    n_max = N // 2
    n_list = []
    n = n_min
    while n <= n_max:
        n_list.append(n)
        n *= 2
    if len(n_list) < 2:
        return np.nan

    # Keep log(n) and log(R/S) in lock-step so that a skipped scale can never
    # shift the remaining R/S values onto the wrong block size.
    valid_log_n = []
    valid_log_RS = []
    for n in n_list:
        A = N // n  # number of complete blocks; >= 2 because n <= N // 2
        x_sub = x[:A * n].reshape(A, n)
        M_a = np.mean(x_sub, axis=1)
        S_a = np.std(x_sub, axis=1, ddof=1)
        # Avoid division by zero for constant blocks.
        S_a = np.where(S_a == 0, 1e-10, S_a)
        dev = x_sub - M_a[:, None]
        X_ai = np.cumsum(dev, axis=1)
        R_a = np.max(X_ai, axis=1) - np.min(X_ai, axis=1)
        RS_n = np.mean(R_a / S_a)
        if RS_n > 0:
            valid_log_n.append(np.log(n))
            valid_log_RS.append(np.log(RS_n))

    if len(valid_log_RS) < 2:
        return 0.5  # fall back to the random-walk value
    slope, _intercept = np.polyfit(valid_log_n, valid_log_RS, 1)
    return slope

def _empty_backtest_result(time_frame_params, instrument, error):
    """Return the placeholder result row used when a backtest yields no metrics.

    Every performance metric is NaN, every counter is 0 and ``error`` carries
    the reason, so failed runs keep the same columns as successful ones.
    """
    return {
        'time_frame': time_frame_params['name'],
        'instrument': instrument['name'],
        'cumulative_return': np.nan,
        'annualized_return': np.nan,
        'sharpe_ratio': np.nan,
        'calmar_ratio': np.nan,
        'max_drawdown': np.nan,
        'total_fees': np.nan,
        'num_trades': 0,
        'num_h_not_nan': 0,
        'num_vix_not_nan': 0,
        'count_h_less_05': 0,
        'count_open_short_price': 0,
        'count_open_long_price': 0,
        'count_close_short_price': 0,
        'count_close_long_price': 0,
        'count_open_long_vix': 0,
        'count_open_short_vix': 0,
        'count_close_long_vix': 0,
        'count_close_short_vix': 0,
        'error': error
    }


def backtest_instrument(time_frame_params, instrument):
    """Backtest the Hurst-switched index-futures strategy for one instrument and bar size.

    When the lagged Hurst exponent is below 0.5 the strategy trades price
    Bollinger bands (short above the upper band, long below the lower band,
    flat again once price crosses back over the band mean). Otherwise it trades
    the volatility index's Bollinger bands (long above the upper band, short
    below the lower band, flat again once the index crosses back over its mean).

    Args:
        time_frame_params: Dict with ``'name'`` (label), ``'bar_size'`` (pandas
            resample rule; ``'1min'`` means no resampling) and ``'N'`` (rolling
            window length for the Hurst exponent and the price bands).
        instrument: Dict with ``'name'``, ``'futures_path'`` (CSV whose first
            column is the timestamp and which has open/high/low/close/volume
            columns) and ``'vix_path'`` (CSV with ``date`` and ``vix`` columns).

    Returns:
        A dict of performance metrics and diagnostic counters. When the data is
        empty or any exception is raised, the metrics are NaN and an ``'error'``
        entry holds the reason (for exceptions, the formatted traceback).

    Side effects:
        Prints progress information and writes
        ``returns_<time_frame>_<instrument>.png`` to the working directory.
    """
    try:
        # Imported locally: this cell's import block does not bring pyplot into scope.
        import matplotlib.pyplot as plt

        bar_size = time_frame_params['bar_size']
        N = time_frame_params['N']

        # --- Volatility-index data and its 20-day Bollinger bands ---
        vix_df = pd.read_csv(instrument['vix_path'], parse_dates=['date'])
        # Each reading is applied to the next calendar day.
        # NOTE(review): a Friday reading therefore maps to Saturday and never
        # matches Monday's bars -- confirm whether a trading calendar is wanted.
        vix_df['trade_date'] = (vix_df['date'] + pd.Timedelta(days=1)).dt.date
        vix_df['vix_mean_bb'] = vix_df['vix'].rolling(window=20).mean()
        vix_df['vix_std_bb'] = vix_df['vix'].rolling(window=20).std()
        vix_df['vix_upper_bb'] = vix_df['vix_mean_bb'] + 2 * vix_df['vix_std_bb']
        vix_df['vix_lower_bb'] = vix_df['vix_mean_bb'] - 2 * vix_df['vix_std_bb']
        print(f"{instrument['name']} VIX 数据最小日期: {vix_df['date'].min()}, 最大日期: {vix_df['date'].max()}")

        # --- Futures bars ---
        df = pd.read_csv(instrument['futures_path'], index_col=0, parse_dates=[0])
        df.index.name = 'timestamp'
        if not isinstance(df.index, pd.DatetimeIndex):
            raise ValueError("索引未解析为 DatetimeIndex。请检查数据格式。")
        if 'close' not in df.columns:
            raise ValueError("未找到 'close' 列。")
        print(f"商品: {instrument['name']}, 索引最小日期: {df.index.min()}, 最大日期: {df.index.max()}, 列: {list(df.columns)}")

        # Clean the close column: numeric, non-missing, strictly positive.
        df['close'] = pd.to_numeric(df['close'], errors='coerce')
        df = df.dropna(subset=['close'])
        df = df[df.index.notna()]
        if (df['close'] <= 0).any():
            print(f"发现非正 close 值在 {instrument['futures_path']}")
            df = df[df['close'] > 0]

        # Restrict to the backtest date range.
        start_date = pd.to_datetime('2019-12-23')
        end_date = pd.to_datetime('2025-04-23')
        df = df[(df.index >= start_date) & (df.index <= end_date)]
        print(f"过滤后，最小日期: {df.index.min()}, 最大日期: {df.index.max()}, 行数: {len(df)}")
        if df.empty:
            return _empty_backtest_result(time_frame_params, instrument, '过滤后数据为空')

        # Resample to the requested bar size (1-minute data is used as-is).
        if bar_size != '1min':
            df = df.resample(bar_size).agg({'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'})
            df = df.ffill()  # bins without data inherit the previous bar
            if df.empty:
                return _empty_backtest_result(time_frame_params, instrument, '重采样后数据为空')
        else:
            # Copy so the column assignments below act on an independent frame.
            df = df[['open', 'high', 'low', 'close', 'volume']].copy()

        # Attach the volatility-index columns by calendar date.
        df['date'] = df.index.date
        df = pd.merge(df.reset_index(), vix_df[['trade_date', 'vix', 'vix_mean_bb', 'vix_upper_bb', 'vix_lower_bb']],
                      left_on='date', right_on='trade_date', how='left')
        df.set_index('timestamp', inplace=True)
        df.drop('trade_date', axis=1, inplace=True)
        print(f"VIX 合并后，非空 VIX 行数: {df['vix'].notna().sum()}")
        if df['vix'].isna().all():
            print(f"警告: 商品 {os.path.basename(instrument['futures_path'])} 无匹配 VIX 数据")

        # Indicators; each is shifted by one bar so bar t only sees earlier data.
        df['log_return'] = np.log(df['close'] / df['close'].shift(1))
        df['H'] = df['log_return'].rolling(N).apply(compute_hurst_series).shift(1)
        df['price_mean_bb'] = df['close'].rolling(N).mean().shift(1)
        df['price_std_bb'] = df['close'].rolling(N).std().shift(1)
        df['price_upper_bb'] = df['price_mean_bb'] + 2 * df['price_std_bb']
        df['price_lower_bb'] = df['price_mean_bb'] - 2 * df['price_std_bb']

        # Diagnostic counters.
        num_h_not_nan = df['H'].notna().sum()
        num_vix_not_nan = df['vix'].notna().sum()
        count_h_less_05 = 0
        count_open_short_price = 0
        count_open_long_price = 0
        count_close_short_price = 0
        count_close_long_price = 0
        count_open_long_vix = 0
        count_open_short_vix = 0
        count_close_long_vix = 0
        count_close_short_vix = 0

        # Work on plain arrays and write the position column once at the end:
        # chained `df[col].iloc[t] = ...` writes are slow and are not guaranteed
        # to reach the DataFrame.
        hurst = df['H'].to_numpy(dtype=float)
        close = df['close'].to_numpy(dtype=float)
        price_mean = df['price_mean_bb'].to_numpy(dtype=float)
        price_upper = df['price_upper_bb'].to_numpy(dtype=float)
        price_lower = df['price_lower_bb'].to_numpy(dtype=float)
        vix = df['vix'].to_numpy(dtype=float)
        vix_mean = df['vix_mean_bb'].to_numpy(dtype=float)
        vix_upper = df['vix_upper_bb'].to_numpy(dtype=float)
        vix_lower = df['vix_lower_bb'].to_numpy(dtype=float)
        position = np.zeros(len(df), dtype=int)

        for t in range(max(N, 20), len(df)):
            h_prev = hurst[t - 1]
            if np.isnan(h_prev):
                # No Hurst estimate yet: the position stays at its initial 0.
                continue
            prev_pos = position[t - 1]
            if h_prev < 0.5:
                # Price Bollinger-band rules.
                count_h_less_05 += 1
                if close[t] > price_upper[t] and prev_pos == 0:
                    position[t] = -1
                    count_open_short_price += 1
                elif close[t] < price_lower[t] and prev_pos == 0:
                    position[t] = 1
                    count_open_long_price += 1
                elif prev_pos == -1 and close[t] < price_mean[t]:
                    position[t] = 0
                    count_close_short_price += 1
                elif prev_pos == 1 and close[t] > price_mean[t]:
                    position[t] = 0
                    count_close_long_price += 1
                else:
                    position[t] = prev_pos
            elif np.isnan(vix[t]):
                # No volatility reading for this bar: hold the position.
                position[t] = prev_pos
            else:
                # Volatility-index Bollinger-band rules.
                if vix[t] > vix_upper[t] and prev_pos == 0:
                    position[t] = 1
                    count_open_long_vix += 1
                elif vix[t] < vix_lower[t] and prev_pos == 0:
                    position[t] = -1
                    count_open_short_vix += 1
                elif prev_pos == 1 and vix[t] < vix_mean[t]:
                    position[t] = 0
                    count_close_long_vix += 1
                elif prev_pos == -1 and vix[t] > vix_mean[t]:
                    position[t] = 0
                    count_close_short_vix += 1
                else:
                    position[t] = prev_pos
        df['position'] = position

        # Bar returns: the position held at the end of bar t-1 earns bar t's change.
        df['price_change'] = df['close'].pct_change()
        df['strategy_return'] = df['position'].shift(1) * df['price_change']
        # A trade is a change in position. The first bar has no predecessor, so
        # compare it with a flat position instead of NaN (which would count a
        # phantom trade and charge a fee for it).
        previous_position = df['position'].shift(1).fillna(0)
        df['trade'] = (df['position'] != previous_position).astype(int)
        df['fee'] = df['trade'] * 0.0001
        df['net_strategy_return'] = df['strategy_return'] - df['fee']
        df['net_strategy_return'] = df['net_strategy_return'].fillna(0)
        df = df.dropna(subset=['date'])

        # Aggregate to daily net returns and plot the compounded curve.
        daily_net_return = df.groupby('date')['net_strategy_return'].sum()
        cumulative_return_series = (1 + daily_net_return).cumprod()
        fig = plt.figure(figsize=(10, 6))
        try:
            cumulative_return_series.plot()
            plt.title(f"Cumulative Returns - {time_frame_params['name']} - {instrument['name']}")
            plt.xlabel("Date")
            plt.ylabel("Cumulative Return")
            plt.savefig(f"returns_{time_frame_params['name']}_{instrument['name']}.png")
        finally:
            plt.close(fig)  # release the figure even if saving fails

        # Performance metrics (252 periods per year).
        cumulative_return = (1 + daily_net_return).prod() - 1
        num_trading_days = len(daily_net_return)
        annualized_return = (1 + cumulative_return) ** (252 / num_trading_days) - 1 if num_trading_days > 0 else np.nan
        daily_std = daily_net_return.std()
        sharpe_ratio = daily_net_return.mean() / daily_std * np.sqrt(252) if daily_std > 0 else np.nan
        # Drawdown is measured on the bar-level compounded curve, as the
        # absolute distance below the running peak.
        # NOTE(review): not divided by the peak -- confirm this is intended.
        cumulative_factor = (1 + df['net_strategy_return']).cumprod()
        running_max = cumulative_factor.cummax()
        drawdown = running_max - cumulative_factor
        max_drawdown = drawdown.max() if not drawdown.empty else np.nan
        calmar_ratio = annualized_return / max_drawdown if max_drawdown > 0 else np.nan
        total_fees = df['fee'].sum()
        num_trades = df['trade'].sum()

        return {
            'time_frame': time_frame_params['name'],
            'instrument': instrument['name'],
            'cumulative_return': cumulative_return,
            'annualized_return': annualized_return,
            'sharpe_ratio': sharpe_ratio,
            'calmar_ratio': calmar_ratio,
            'max_drawdown': max_drawdown,
            'total_fees': total_fees,
            'num_trades': num_trades,
            'num_h_not_nan': num_h_not_nan,
            'num_vix_not_nan': num_vix_not_nan,
            'count_h_less_05': count_h_less_05,
            'count_open_short_price': count_open_short_price,
            'count_open_long_price': count_open_long_price,
            'count_close_short_price': count_close_short_price,
            'count_close_long_price': count_close_long_price,
            'count_open_long_vix': count_open_long_vix,
            'count_open_short_vix': count_open_short_vix,
            'count_close_long_vix': count_close_long_vix,
            'count_close_short_vix': count_close_short_vix,
        }
    except Exception:
        # Worker boundary: report the failure as a result row instead of
        # letting one task abort the whole pool.
        return _empty_backtest_result(time_frame_params, instrument, traceback.format_exc())

if __name__ == '__main__':
    # Instruments to backtest: each pairs a futures CSV with its volatility-index CSV.
    # The second instrument is named 'IM' (not 'IM W'): that is the name shown in the
    # recorded run output and used in the results file name, and it also ends up in
    # the per-instrument plot file names.
    instruments = [
        {
            'name': 'IF',
            'futures_path': '/home/u2024210271/jupyterlab/赫斯特指数/IMIF/沪深.csv',
            'vix_path': '/home/u2024210271/jupyterlab/赫斯特指数/沪深300.csv'
        },
        {
            'name': 'IM',
            'futures_path': '/home/u2024210271/jupyterlab/赫斯特指数/IMIF/中证.csv',
            'vix_path': '/home/u2024210271/jupyterlab/赫斯特指数/中证1000.csv'
        }
    ]
    # Bar size and rolling-window length for each time frame.
    time_frame_params_list = [
        {'name': 'ultra_short', 'bar_size': '1min', 'N': 120},
        {'name': 'short', 'bar_size': '5min', 'N': 120},
        {'name': 'medium', 'bar_size': '30min', 'N': 100},
        {'name': 'long', 'bar_size': '30min', 'N': 200},
    ]
    # One task per (time frame, instrument) pair, run in parallel worker processes.
    tasks = [(time_frame_params, instrument) for time_frame_params in time_frame_params_list for instrument in instruments]
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.starmap(backtest_instrument, tasks)
    # Save the results to CSV.
    results_df = pd.DataFrame(results)
    results_df.to_csv('backtest_results_IF_IM.csv', index=False)
    # Print one summary line per run; failed runs print their error text instead.
    for _, row in results_df.iterrows():
        print(f"{row['time_frame']} - {row['instrument']}: ", end='')
        if 'error' in row and pd.notna(row['error']):
            print(f"error=\n{row['error']}")
        else:
            print(f"cumulative_return={row['cumulative_return']:.4f}, annualized_return={row['annualized_return']:.4f}, sharpe_ratio={row['sharpe_ratio']:.2f}, calmar_ratio={row['calmar_ratio']:.2f}, max_drawdown={row['max_drawdown']:.4f}, total_fees={row['total_fees']:.4f}, num_trades={row['num_trades']}, num_h_not_nan={row['num_h_not_nan']}, num_vix_not_nan={row['num_vix_not_nan']}, count_h_less_05={row['count_h_less_05']}, count_open_short_price={row['count_open_short_price']}, count_open_long_price={row['count_open_long_price']}, count_close_short_price={row['count_close_short_price']}, count_close_long_price={row['count_close_long_price']}, count_open_long_vix={row['count_open_long_vix']}, count_open_short_vix={row['count_open_short_vix']}, count_close_long_vix={row['count_close_long_vix']}, count_close_short_vix={row['count_close_short_vix']}")

IM VIX 数据最小日期: 2022-07-22 00:00:00, 最大日期: 2025-04-24 00:00:00IF VIX 数据最小日期: 2019-12-23 00:00:00, 最大日期: 2025-04-22 00:00:00IF VIX 数据最小日期: 2019-12-23 00:00:00, 最大日期: 2025-04-22 00:00:00IM VIX 数据最小日期: 2022-07-22 00:00:00, 最大日期: 2025-04-24 00:00:00IM VIX 数据最小日期: 2022-07-22 00:00:00, 最大日期: 2025-04-24 00:00:00IM VIX 数据最小日期: 2022-07-22 00:00:00, 最大日期: 2025-04-24 00:00:00IF VIX 数据最小日期: 2019-12-23 00:00:00, 最大日期: 2025-04-22 00:00:00IF VIX 数据最小日期: 2019-12-23 00:00:00, 最大日期: 2025-04-22 00:00:00







商品: IM, 索引最小日期: 2022-07-25 09:31:00, 最大日期: 2025-04-22 15:00:00, 列: ['open', 'high', 'low', 'close', 'volume']商品: IM, 索引最小日期: 2022-07-25 09:31:00, 最大日期: 2025-04-22 15:00:00, 列: ['open', 'high', 'low', 'close', 'volume']商品: IM, 索引最小日期: 2022-07-25 09:31:00, 最大日期: 2025-04-22 15:00:00, 列: ['open', 'high', 'low', 'close', 'volume']商品: IM, 索引最小日期: 2022-07-25 09:31:00, 最大日期: 2025-04-22 15:00:00, 列: ['open', 'high', 'low', 'close', 'volume']



过滤后，最小日期: 2022-07-25 09:31:00, 最大日期: 2025-04-22 15:00:00, 行数: 15

In [None]:
import os
import pandas as pd
import numpy as np
import multiprocessing
from datetime import datetime, time
import traceback
import warnings
warnings.filterwarnings('ignore')
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

def compute_hurst_series(log_returns):
    """Estimate the Hurst exponent of a return window with rescaled-range (R/S) analysis.

    Block lengths start at 4 and double while they fit into half the window.
    For every block length the window is cut into complete, non-overlapping
    blocks, the mean R/S over those blocks is taken, and the exponent is the
    slope of log(R/S) regressed on log(block length).

    Args:
        log_returns: pandas Series of returns (its ``.values`` array is used).

    Returns:
        ``np.nan`` if the window contains NaN or has fewer than 20 values,
        ``0.5`` if fewer than two block lengths give a positive R/S,
        otherwise the fitted slope.
    """
    if np.isnan(log_returns).any():
        return np.nan

    values = log_returns.values
    total = len(values)
    if total < 20:
        return np.nan

    # Candidate block lengths: 4, 8, 16, ... up to half the window.
    block_sizes = []
    size = 4
    half_window = total // 2
    while size <= half_window:
        block_sizes.append(size)
        size *= 2
    if len(block_sizes) < 2:
        return 0.5

    log_sizes, log_rs = [], []
    for size in block_sizes:
        n_blocks = total // size
        if n_blocks < 2:
            continue
        blocks = values[:n_blocks * size].reshape(n_blocks, size)
        block_std = np.std(blocks, axis=1, ddof=1)
        # Constant blocks would divide by zero; substitute a tiny spread.
        block_std = np.where(block_std == 0, 1e-10, block_std)
        centred = blocks - np.mean(blocks, axis=1)[:, None]
        walk = np.cumsum(centred, axis=1)
        spread = np.max(walk, axis=1) - np.min(walk, axis=1)
        mean_rs = np.mean(spread / block_std)
        if mean_rs > 0:
            log_sizes.append(np.log(size))
            log_rs.append(np.log(mean_rs))

    if len(log_rs) < 2:
        return 0.5
    return np.polyfit(log_sizes, log_rs, 1)[0]

def calculate_signals(df, N, lower_threshold, upper_threshold):
    """Add a ``signal`` column (+1 long, -1 short, 0 none) driven by a Hurst regime switch.

    Starting at row ``max(N, 20)``, every row with a non-NaN ``H`` first
    updates the regime and then emits a signal from the volatility index:

    * The first usable ``H`` selects the regime: ``'junxian'`` (mean-line
      crossover) if ``H < 0.5``, else ``'bulindai'`` (Bollinger band).
    * Afterwards the regime only flips with hysteresis: ``'junxian'`` ->
      ``'bulindai'`` when ``H > upper_threshold`` and ``'bulindai'`` ->
      ``'junxian'`` when ``H < lower_threshold``.
    * ``'junxian'``: -1 when ``vix`` crosses above ``vix_mean_bb`` between the
      previous and the current row, +1 when it crosses below.
    * ``'bulindai'``: +1 when ``vix`` is above ``vix_upper_bb``, -1 when it is
      below ``vix_lower_bb``.

    Rows whose ``H`` is NaN get signal 0 and leave the regime untouched.

    Args:
        df: DataFrame with columns ``H``, ``vix``, ``vix_mean_bb``,
            ``vix_upper_bb`` and ``vix_lower_bb``. Modified in place.
        N: Rolling-window length; rows before ``max(N, 20)`` get no signal.
        lower_threshold: ``H`` level below which the regime returns to ``'junxian'``.
        upper_threshold: ``H`` level above which the regime moves to ``'bulindai'``.

    Returns:
        The same DataFrame, with the ``signal`` column set.
    """
    hurst = df['H'].to_numpy(dtype=float)
    vix = df['vix'].to_numpy(dtype=float)
    vix_mean = df['vix_mean_bb'].to_numpy(dtype=float)
    vix_upper = df['vix_upper_bb'].to_numpy(dtype=float)
    vix_lower = df['vix_lower_bb'].to_numpy(dtype=float)

    # Signals are collected in an array and written to the frame once; chained
    # `df['signal'].iloc[t] = ...` writes are slow and are not guaranteed to
    # reach the DataFrame.
    signal = np.zeros(len(df), dtype=int)
    signal_type = None

    for t in range(max(N, 20), len(df)):
        h = hurst[t]
        if np.isnan(h):
            continue

        # Regime update with hysteresis between the two thresholds.
        if signal_type is None:
            signal_type = 'junxian' if h < 0.5 else 'bulindai'
        elif signal_type == 'junxian':
            if h > upper_threshold:
                signal_type = 'bulindai'
        elif h < lower_threshold:
            signal_type = 'junxian'

        # Comparisons involving NaN are False, so a missing vix value or band
        # simply produces no signal in either regime.
        if signal_type == 'junxian':
            if vix[t] > vix_mean[t] and vix[t - 1] <= vix_mean[t - 1]:
                signal[t] = -1  # bearish
            elif vix[t] < vix_mean[t] and vix[t - 1] >= vix_mean[t - 1]:
                signal[t] = 1  # bullish
        else:
            if vix[t] > vix_upper[t]:
                signal[t] = 1  # bullish
            elif vix[t] < vix_lower[t]:
                signal[t] = -1  # bearish

    df['signal'] = signal
    return df

def manage_positions(df, hold_days):
    """Turn per-bar signals into positions and trade actions with a fixed holding period.

    A signal on bar ``t`` takes effect on bar ``t + 1``:

    * With no position, or with a position of the opposite sign, the signal
      opens a new position (a reversal is recorded as the opening action only).
    * A signal of the same sign as the open position restarts the holding period.
    * A position is closed on the first bar whose trading-day index reaches
      the opening (or restart) day index plus ``hold_days``.
    * Any position still open after the loop is closed on the last bar.

    Args:
        df: DataFrame with a ``date`` column (trading date of each bar) and a
            ``signal`` column. Modified in place; any index type is accepted
            because rows are addressed by position.
        hold_days: Number of trading days to hold a position.

    Returns:
        The same DataFrame with ``trading_day_index``, ``position`` and
        ``action`` columns added. ``action`` is one of ``''``, ``'open_long'``,
        ``'open_short'``, ``'close_long'``, ``'close_short'``.
    """
    # Number the distinct trading dates 0, 1, 2, ... in chronological order.
    trading_days = sorted(df['date'].unique())
    trading_day_map = {date: idx for idx, date in enumerate(trading_days)}
    df['trading_day_index'] = df['date'].map(trading_day_map)

    # Positional arrays: `df[col][t]` would look `t` up as an index *label*,
    # which is wrong for the timestamp-indexed frames passed in here.
    day_index = df['trading_day_index'].to_numpy()
    signals = df['signal'].to_numpy()

    n_rows = len(df)
    positions = [0] * n_rows
    actions = [''] * n_rows
    current_position = 0
    planned_close_trading_day = None

    for t in range(n_rows - 1):
        next_trading_day = day_index[t + 1]
        signal_today = signals[t]

        # Default: carry the current position into the next bar.
        positions[t + 1] = current_position

        # Close when the holding period has run out.
        if (current_position != 0 and planned_close_trading_day is not None
                and next_trading_day >= planned_close_trading_day):
            actions[t + 1] = 'close_long' if current_position > 0 else 'close_short'
            positions[t + 1] = 0
            current_position = 0
            planned_close_trading_day = None

        if signal_today != 0:
            if current_position == 0 or np.sign(signal_today) != np.sign(current_position):
                # Open a new position (replacing an opposite one, if any).
                current_position = int(signal_today)
                positions[t + 1] = current_position
                actions[t + 1] = 'open_long' if current_position > 0 else 'open_short'
            # A new or repeated signal (re)starts the holding period.
            planned_close_trading_day = next_trading_day + hold_days

    # Close whatever is still open on the final bar.
    if current_position != 0:
        actions[-1] = 'close_long' if current_position > 0 else 'close_short'
        positions[-1] = 0

    df['position'] = positions
    df['action'] = actions
    return df

def _failed_backtest_result(time_frame_params, instrument, error):
    """Return the result row used when a backtest produces no metrics (all NaN plus the reason)."""
    return {
        'time_frame': time_frame_params['name'],
        'instrument': instrument['name'],
        'cumulative_return': np.nan,
        'annualized_return': np.nan,
        'sharpe_ratio': np.nan,
        'calmar_ratio': np.nan,
        'max_drawdown': np.nan,
        'error': error
    }


def _trading_session_mask(index):
    """Return a boolean mask selecting Monday-Friday timestamps in 09:31-11:30 or 13:01-15:00."""
    clock = index.time
    weekday = index.weekday < 5
    morning_session = (clock >= time(9, 31)) & (clock <= time(11, 30))
    afternoon_session = (clock >= time(13, 1)) & (clock <= time(15, 0))
    return weekday & (morning_session | afternoon_session)


def backtest_instrument(time_frame_params, instrument):
    """Backtest the Hurst-switched volatility-index strategy, using trading-session bars only.

    Pipeline: load the volatility index and build its 20-day Bollinger bands,
    load and clean the futures bars, keep only session timestamps, resample to
    the requested bar size, merge the volatility columns by date, compute the
    rolling Hurst exponent, derive signals (``calculate_signals``) and
    positions (``manage_positions``), then compute returns and metrics.

    Args:
        time_frame_params: Dict with ``'name'``, ``'bar_size'`` (``'1min'``, an
            intraday pandas rule such as ``'5min'``, or ``'1D'``), ``'N'``
            (Hurst window length) and optional ``'hold_days'`` (default 3).
        instrument: Dict with ``'name'``, ``'futures_path'`` (CSV whose first
            column is the timestamp, with open/high/low/close/volume columns)
            and ``'vix_path'`` (CSV with ``date`` and ``vix`` columns).

    Returns:
        Dict with ``time_frame``, ``instrument``, ``cumulative_return``,
        ``annualized_return``, ``sharpe_ratio``, ``calmar_ratio`` and
        ``max_drawdown``. When the data is empty or an exception is raised the
        metrics are NaN and an ``'error'`` entry holds the reason.

    Side effects:
        Prints progress information and writes
        ``backtest_details_<time_frame>_<instrument>.csv`` and
        ``returns_<time_frame>_<instrument>.png`` to the working directory.
    """
    try:
        bar_size = time_frame_params['bar_size']
        N = time_frame_params['N']
        hold_days = time_frame_params.get('hold_days', 3)  # default: hold for 3 days
        lower_threshold = 0.45
        upper_threshold = 0.55
        is_daily = bar_size == '1D'

        # --- Volatility-index data and its 20-day Bollinger bands ---
        vix_df = pd.read_csv(instrument['vix_path'], parse_dates=['date'])
        print(f"vix_df columns: {vix_df.columns}")
        if is_daily:
            vix_df['trade_date'] = vix_df['date'].dt.date
        else:
            # Intraday bars use the reading of the previous calendar day.
            # NOTE(review): a Friday reading maps to Saturday and never matches
            # Monday's bars -- confirm whether a trading calendar is wanted.
            vix_df['trade_date'] = (vix_df['date'] + pd.Timedelta(days=1)).dt.date
        vix_df['vix_mean_bb'] = vix_df['vix'].rolling(window=20).mean()
        vix_df['vix_std_bb'] = vix_df['vix'].rolling(window=20).std()
        vix_df['vix_upper_bb'] = vix_df['vix_mean_bb'] + 2 * vix_df['vix_std_bb']
        vix_df['vix_lower_bb'] = vix_df['vix_mean_bb'] - 2 * vix_df['vix_std_bb']
        print(f"{instrument['name']} VIX 数据最小日期: {vix_df['date'].min()}, 最大日期: {vix_df['date'].max()}")

        # --- Futures bars ---
        df = pd.read_csv(instrument['futures_path'], index_col=0, parse_dates=[0])
        df.index.name = 'timestamp'
        if not isinstance(df.index, pd.DatetimeIndex):
            raise ValueError("索引未解析为 DatetimeIndex。")
        if 'close' not in df.columns:
            raise ValueError("未找到 'close' 列。")
        print(f"商品: {instrument['name']}, 索引最小日期: {df.index.min()}, 最大日期: {df.index.max()}, 列: {list(df.columns)}")

        # Clean the close column: numeric, non-missing, strictly positive.
        df['close'] = pd.to_numeric(df['close'], errors='coerce')
        df = df.dropna(subset=['close'])
        df = df[df.index.notna()]
        if (df['close'] <= 0).any():
            print(f"发现非正 close 值在 {instrument['futures_path']}")
            df = df[df['close'] > 0]

        # Restrict to the backtest date range.
        start_date = pd.to_datetime('2019-12-23')
        end_date = pd.to_datetime('2025-04-23')
        df = df[(df.index >= start_date) & (df.index <= end_date)]
        print(f"过滤日期后，行数: {len(df)}")

        # Keep trading-session minutes only.
        df = df[_trading_session_mask(df.index)]
        print(f"过滤交易时间后，行数: {len(df)}, close 中 NaN 数量: {df['close'].isna().sum()}")
        if df.empty:
            return _failed_backtest_result(time_frame_params, instrument, '过滤交易时间后数据为空')

        # Resample to the requested bar size.
        if bar_size != '1min':
            agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
            if is_daily:
                df = df.resample('D').agg(agg_dict)
            else:
                # The session filter above keeps 09:31-11:30 and 13:01-15:00, i.e. each
                # minute bar is stamped with its end time. Right-closed, right-labelled
                # bins keep the aggregated bars inside the sessions (09:31-10:00 becomes
                # the 10:00 bar). With the default left-closed bins the 09:30 and 13:00
                # bars would be dropped by the session filter below, leaving one-minute
                # stub bars at 11:30 and 15:00.
                df = df.resample(bar_size, closed='right', label='right').agg(agg_dict)
            df = df.dropna(subset=['close'])  # discard bins with no trading
            df = df.ffill()
            print(f"重采样到 {bar_size} 后，行数: {len(df)}, close 中 NaN 数量: {df['close'].isna().sum()}")
        else:
            df = df[['open', 'high', 'low', 'close', 'volume']]
            df = df.ffill()
            print(f"选择 1min 数据后，行数: {len(df)}, close 中 NaN 数量: {df['close'].isna().sum()}")

        # Intraday bars are checked against the session windows once more.
        if not is_daily:
            df = df[_trading_session_mask(df.index)]
            print(f"重采样后再次过滤交易时间，行数: {len(df)}, close 中 NaN 数量: {df['close'].isna().sum()}")
        else:
            print(f"bar_size is '1D', no need to filter trading hours")
        if df.empty:
            return _failed_backtest_result(time_frame_params, instrument, '重采样后数据为空')

        # Attach the volatility-index columns by calendar date.
        df = df.copy()  # independent frame, so the column assignments below are safe
        df['date'] = df.index.date
        df = pd.merge(df.reset_index(), vix_df[['trade_date', 'vix', 'vix_mean_bb', 'vix_upper_bb', 'vix_lower_bb']],
                      left_on='date', right_on='trade_date', how='left')
        df.set_index('timestamp', inplace=True)
        df.drop('trade_date', axis=1, inplace=True)
        print(f"VIX 合并后，非空 VIX 行数: {df['vix'].notna().sum()}")
        if df['vix'].isna().all():
            print(f"警告: 商品 {os.path.basename(instrument['futures_path'])} 无匹配 VIX 数据")
        # Daily bars: keep only days that have a volatility reading.
        if is_daily:
            df = df[df['vix'].notna()].copy()
            print(f"'1D' 过滤交易日后，行数: {len(df)}")

        # Log returns and rolling Hurst exponent (lagged one bar for intraday data).
        df['log_return'] = np.log(df['close'] / df['close'].shift(1))
        print(f"log_return 中 NaN 数量: {df['log_return'].isna().sum()}")
        hurst = df['log_return'].rolling(N).apply(compute_hurst_series)
        df['H'] = hurst if is_daily else hurst.shift(1)
        print(f"赫斯特指数非空数量: {df['H'].notna().sum()}")

        # Signals and positions.
        df = calculate_signals(df, N, lower_threshold, upper_threshold)
        df = manage_positions(df, hold_days)

        # Per-bar returns.
        if is_daily:
            df['bar_return'] = (df['open'].shift(-1) - df['open']) / df['open']
        else:
            # NOTE(review): open-to-close only; price gaps between consecutive
            # bars (including overnight) are not attributed to the position.
            df['bar_return'] = (df['close'] - df['open']) / df['open']
        df['strategy_return'] = df['position'] * df['bar_return']
        # A trade is a change in position. The first bar has no predecessor, so
        # compare it with a flat position instead of NaN (which would count a
        # phantom trade and charge a fee for it).
        previous_position = df['position'].shift(1).fillna(0)
        df['trade'] = (df['position'] != previous_position).astype(int)
        df['fee'] = df['trade'] * 0.0001
        df['net_strategy_return'] = df['strategy_return'] - df['fee']
        df['net_strategy_return'] = df['net_strategy_return'].fillna(0)

        # Save the bar-level details.
        df.to_csv(f"backtest_details_{time_frame_params['name']}_{instrument['name']}.csv", index=True)

        # Metrics are computed on one return per trading day: the bar returns
        # themselves for daily bars, their per-date sum for intraday bars.
        if is_daily:
            period_returns = df['net_strategy_return']
        else:
            period_returns = df.groupby('date')['net_strategy_return'].sum()

        cumulative_return_series = (1 + period_returns).cumprod()
        fig = plt.figure(figsize=(10, 6))
        try:
            cumulative_return_series.plot()
            plt.title(f"Cumulative Returns - {time_frame_params['name']} - {instrument['name']}")
            plt.xlabel("Date")
            plt.ylabel("Cumulative Return")
            plt.savefig(f"returns_{time_frame_params['name']}_{instrument['name']}.png")
        finally:
            plt.close(fig)  # release the figure even if saving fails

        cumulative_return = (1 + period_returns).prod() - 1
        num_trading_days = len(period_returns)
        annualized_return = (1 + cumulative_return) ** (252 / num_trading_days) - 1 if num_trading_days > 0 else np.nan
        # Sharpe ratio over a 2.5% annual risk-free rate, 252 periods per year.
        period_std = period_returns.std()
        sharpe_ratio = (period_returns.mean() - 0.025 / 252) / period_std * np.sqrt(252) if period_std > 0 else np.nan
        # Largest absolute distance of the compounded curve below its running peak.
        # NOTE(review): not divided by the peak -- confirm this is intended.
        max_drawdown = (cumulative_return_series.cummax() - cumulative_return_series).max()
        calmar_ratio = annualized_return / max_drawdown if max_drawdown > 0 else np.nan

        return {
            'time_frame': time_frame_params['name'],
            'instrument': instrument['name'],
            'cumulative_return': cumulative_return,
            'annualized_return': annualized_return,
            'sharpe_ratio': sharpe_ratio,
            'calmar_ratio': calmar_ratio,
            'max_drawdown': max_drawdown
        }
    except Exception:
        # Worker boundary: report the failure as a result row instead of
        # letting one task abort the whole pool.
        return _failed_backtest_result(time_frame_params, instrument, traceback.format_exc())

if __name__ == '__main__':
    # Data locations: one futures CSV and one volatility-index CSV per instrument.
    data_dir = '/home/u2024210271/jupyterlab/赫斯特指数'
    instruments = [
        {
            'name': 'IF',
            'futures_path': data_dir + '/IMIF/沪深.csv',
            'vix_path': data_dir + '/沪深300.csv',
        },
        {
            'name': 'IM',
            'futures_path': data_dir + '/IMIF/中证.csv',
            'vix_path': data_dir + '/中证1000.csv',
        },
    ]

    # Bar size, Hurst window and holding period for every time frame.
    time_frame_params_list = [
        {'name': 'ultra_short', 'bar_size': '1min', 'N': 120, 'hold_days': 3},
        {'name': 'short', 'bar_size': '5min', 'N': 120, 'hold_days': 3},
        {'name': 'medium', 'bar_size': '30min', 'N': 100, 'hold_days': 3},
        {'name': 'long', 'bar_size': '30min', 'N': 200, 'hold_days': 3},
        {'name': 'daily', 'bar_size': '1D', 'N': 20, 'hold_days': 3},
    ]

    # Every time frame is paired with every instrument (time frame varies slowest).
    tasks = []
    for params in time_frame_params_list:
        for inst in instruments:
            tasks.append((params, inst))

    # Run the backtests in parallel, one worker process per CPU core.
    worker_count = multiprocessing.cpu_count()
    with multiprocessing.Pool(processes=worker_count) as pool:
        results = pool.starmap(backtest_instrument, tasks)

    # Persist the summary table.
    results_df = pd.DataFrame(results)
    results_df.to_csv('backtest_results_IF_IM.csv', index=False)

    # One line per run: the error text for failed runs, headline metrics otherwise.
    for _, row in results_df.iterrows():
        failed = 'error' in row and pd.notna(row['error'])
        if failed:
            summary = f"error=\n{row['error']}"
        else:
            summary = f"cumulative_return={row['cumulative_return']:.4f}, annualized_return={row['annualized_return']:.4f}, sharpe_ratio={row['sharpe_ratio']:.2f}"
        print(f"{row['time_frame']} - {row['instrument']}: ", end='')
        print(summary)

vix_df columns: Index(['date', 'vix'], dtype='object')vix_df columns: Index(['date', 'vix'], dtype='object')vix_df columns: Index(['date', 'vix'], dtype='object')vix_df columns: Index(['date', 'vix'], dtype='object')vix_df columns: Index(['date', 'vix'], dtype='object')vix_df columns: Index(['date', 'vix'], dtype='object')vix_df columns: Index(['date', 'vix'], dtype='object')vix_df columns: Index(['date', 'vix'], dtype='object')vix_df columns: Index(['date', 'vix'], dtype='object')vix_df columns: Index(['date', 'vix'], dtype='object')









IM VIX 数据最小日期: 2022-07-22 00:00:00, 最大日期: 2025-04-24 00:00:00
IM VIX 数据最小日期: 2022-07-22 00:00:00, 最大日期: 2025-04-24 00:00:00IF VIX 数据最小日期: 2019-12-23 00:00:00, 最大日期: 2025-04-22 00:00:00

IF VIX 数据最小日期: 2019-12-23 00:00:00, 最大日期: 2025-04-22 00:00:00
IF VIX 数据最小日期: 2019-12-23 00:00:00, 最大日期: 2025-04-22 00:00:00IM VIX 数据最小日期: 2022-07-22 00:00:00, 最大日期: 2025-04-24 00:00:00IF VIX 数据最小日期: 2019-12-23 00:00:00, 最大日期: 2025-04-22 00:00:00
IF VIX 数据最小日期: 201