In [9]:
"""
该策略的整体逻辑如下:

1 数据获取与合并:
   - 通过Binance API获取过去一年的BTCUSDT永续合约1分钟K线(OHLCV+taker主动买入量).
   - 获取资金费率(8小时更新).
   - 将资金费率以前向填充方式对齐到每分钟级别,并与K线合并.

2 数据预处理与特征工程:
   - 对原始价格、成交量、taker买入量等进行补缺、归一化等操作.
   - 新增一些滚动指标,如对数收益、短期波动率等,来辅助机器学习/强化学习模型.

3 策略环境与强化学习:
   - 搭建一个自定义Gymnasium环境,每个step对应1分钟.
   - 动作可以是做多、做空、平仓,或基于连续空间指定仓位大小.
   - 在环境中集成资金费率的影响(如持有多头需付资金费等).
   - 使用stable-baselines3(PPO/A2C/SAC等)训练策略,并观察回测收益.

4 额外展示TensorFlow/PyTorch/scikit-learn的使用:
   - 用scikit-learn做特征缩放、降维.
   - 用TensorFlow写个小预测模型(可预测下个周期价格或收益).
   - 用PyTorch作为stable-baselines3的后端(自动满足了pytorch需求).

"""

'\n该策略的整体逻辑如下:\n\n1 数据获取与合并:\n   - 通过Binance API获取过去一年的BTCUSDT永续合约1分钟K线(OHLCV+taker主动买入量).\n   - 获取资金费率(8小时更新).\n   - 将资金费率以前向填充方式对齐到每分钟级别,并与K线合并.\n\n2 数据预处理与特征工程:\n   - 对原始价格、成交量、taker买入量等进行补缺、归一化等操作.\n   - 新增一些滚动指标,如对数收益、短期波动率等,来辅助机器学习/强化学习模型.\n\n3 策略环境与强化学习:\n   - 搭建一个自定义Gymnasium环境,每个step对应1分钟.\n   - 动作可以是做多、做空、平仓,或基于连续空间指定仓位大小.\n   - 在环境中集成资金费率的影响(如持有多头需付资金费等).\n   - 使用stable-baselines3(PPO/A2C/SAC等)训练策略,并观察回测收益.\n\n4 额外展示TensorFlow/PyTorch/scikit-learn的使用:\n   - 用scikit-learn做特征缩放、降维.\n   - 用TensorFlow写个小预测模型(可预测下个周期价格或收益).\n   - 用PyTorch作为stable-baselines3的后端(自动满足了pytorch需求).\n\n'

In [None]:
import pandas as pd
import numpy as np
import datetime,time
import requests
import time
from binance.client import Client
from binance.exceptions import BinanceAPIException
import talib

# 机器学习 & Pipeline
from sklearn.model_selection import TimeSeriesSplit, cross_val_score, train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import roc_auc_score
from sklearn.pipeline import Pipeline
from sklearn.ensemble import StackingClassifier, StackingRegressor
from sklearn.linear_model import LogisticRegression

# 用于可选的Boosting
import xgboost as xgb
import lightgbm as lgb
import catboost

# 可视化
import matplotlib.pyplot as plt
import seaborn as sns

# 强化学习相关
import gymnasium
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv

import lightgbm as lgb
import optuna
import joblib

# 设定要抓取的数据区间
start_dt = datetime.datetime(2024,3,1)
end_dt = datetime.datetime(2025,3,1)
sym = 'BTCUSDT'

In [11]:
"""
================== 第一部分: 数据获取与合并 ==================
将得到一个df,行索引是分钟级datetime,列包含:
 - open, high, low, close;volume(买卖方二者主动的合集)
 - taker_buy_base_volume (买方主动吃单的交易量;只统计买方发起的,且吃掉卖单的部分)
 - fundingRate资金费率 (前向填充到每分钟)
"""

def fetch_1m_klines_futures(symbol: str, start_dt: datetime.datetime, end_dt: datetime.datetime) -> pd.DataFrame:
    """
    通过fapi(futures api)/binance接口循环分页获取1分钟K线数据(含taker买入量),
    并拼接成一个DataFrame后返回.
    """
    # 用于存放所有K线行
    all_klines = []
    # 将datetime转为毫秒,并转换成api接受的int时间戳
    fetch_start = int(start_dt.timestamp() * 1000)
    final_end = int(end_dt.timestamp() * 1000)

    while fetch_start < final_end:
        # 请求URL和参数
        url = "https://fapi.binance.com/fapi/v1/klines"
        params = {
            "symbol": symbol,
            "interval": "1m",
            "startTime": fetch_start,
            "limit": 1500
        }
        
        # 发出请求并解析得到的json数据
        r = requests.get(url, params=params)
        data = r.json() # data:List
        if not data: # 直到没有更多的k线可以抓
            break

        # 扩展进all_klines:List
        all_klines.extend(data)

        # 最后一根K线的开盘时间,再往后+1分钟(ms)
        last_open_time = data[-1][0]
        fetch_start = last_open_time + 60 * 1000

        # 若不足1500条,说明到尽头了(因为手动给的limit都没用完)
        if len(data) < 1500:
            break
        # 为防止限频,短暂sleep
        time.sleep(0.15)

    # 转DataFrame
    cols = ["open_time","open","high","low","close","volume",
            "close_time","quote_volume","number_of_trades",
            "taker_buy_base_volume","taker_buy_quote_volume","ignore"]
    df = pd.DataFrame(all_klines, columns=cols)

    # 转换类型
    numeric_cols = ["open","high","low","close","volume","taker_buy_base_volume"]
    for c in numeric_cols:
        df[c] = df[c].astype(float)

    df["open_time"] = pd.to_datetime(df["open_time"], unit='ms') # int转换为datetime
    df.set_index("open_time", inplace=True) # 设置索引

    # 删除不必要列
    df.drop(["close_time","quote_volume","taker_buy_quote_volume","ignore"], axis=1, inplace=True)
    return df

def fetch_funding_rate(symbol: str, start_dt: datetime.datetime, end_dt: datetime.datetime) -> pd.DataFrame:
    """
    分段获取资金费率历史数据,并返回包含[fundingTime, fundingRate]的DataFrame,索引为fundingTime.
    注意:fundingRate每8小时一条.

    资金费率是永续合约特有的机制,用于使期货价格锚定现货价格,正值时多头付费给空头,负值时反之.
    """
    start_ms = int(start_dt.timestamp() * 1000)
    end_ms = int(end_dt.timestamp() * 1000)
    all_fr = []
    fetch_start = start_ms

    # 和获取k线数据逻辑差不多,也是不断循环之后调整格式
    while fetch_start < end_ms:
        url = "https://fapi.binance.com/fapi/v1/fundingRate"
        params = {
            "symbol": symbol,
            "startTime": fetch_start,
            "limit": 1000
        }
        r = requests.get(url, params=params)

        # data是一个列表,每个元素形如{"symbol":"BTCUSDT", "fundingTime":1709222400000, "fundingRate":"0.00010"}的字典
        data = r.json()
        if not data:
            break
        all_fr.extend(data)
        last_time = data[-1]["fundingTime"]
        fetch_start = last_time + 1 # api查询是>=startTime,避免重复.资金费率数据每8小时更新一次，但更新时间点不一定严格对齐到特定时刻
        if len(data) < 1000:
            break
        time.sleep(0.15)

    df_f = pd.DataFrame(all_fr) # 自动转换为键为列名,值为表格中数值的df
    df_f["fundingTime"] = pd.to_datetime(df_f["fundingTime"], unit='ms')
    df_f["fundingRate"] = df_f["fundingRate"].astype(float)

    df_f.set_index("fundingTime", inplace=True) # 设置时间为索引
    df_f.sort_index(inplace=True) # 再次按时间排序(以防万一)

    return df_f[["fundingRate"]] # 其实只能用到这一列


df_1m = fetch_1m_klines_futures(sym, start_dt, end_dt)
print("1m K线获取完成:", df_1m.shape)

df_fr = fetch_funding_rate(sym, start_dt, end_dt)
print("资金费率获取完成:", df_fr.shape)


"""将资金费率前向填充到分钟频"""

# 先创建一个完整的分钟级索引full_index,正好对上k线的区间索引:
full_index = pd.date_range(start=df_1m.index.min(), end=df_1m.index.max(), freq="1min")

# 资金费率前向填充
df_fr_ffill = df_fr.reindex(full_index, method='ffill')
# 同时把1mK线也reindex,防止K线中有某些分钟缺失(以防万一,向后填充补齐)
df_1m = df_1m.reindex(full_index, method='ffill')
# 在主df中添加同索引的新列
df_1m["fundingRate"] = df_fr_ffill["fundingRate"]

# 若开头可能出现NaN,可后向填充(交易所维护或者不是整点的八小时推送)
df_1m["fundingRate"].fillna(method='bfill', inplace=True)

print("合并完成,最终行数:", df_1m.shape[0]) # -> tuple(行数 * 列数)
df_1m.to_csv("/Users/gaohongjin/Desktop/btcdrl/BTCUSDT_1m_with_funding.csv")
print("已保存到:/Users/gaohongjin/Desktop/btcdrl/BTCUSDT_1m_with_funding.csv")

1m K线获取完成: (526500, 7)
资金费率获取完成: (1170, 1)
合并完成,最终行数: 526500


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_1m["fundingRate"].fillna(method='bfill', inplace=True)
  df_1m["fundingRate"].fillna(method='bfill', inplace=True)


已保存到:/Users/gaohongjin/Desktop/btcdrl/BTCUSDT_1m_with_funding.csv


In [13]:
"""
================== 第二部分: 数据预处理与特征工程 ==================
主要内容
 - 填充缺失值
 - 生成一些衍生特征:对数收益、滚动波动率,或短期动量
 - 使用scikit-learn做标准化处理"
"""

df = pd.read_csv("/Users/gaohongjin/Desktop/btcdrl/BTCUSDT_1m_with_funding.csv")

# 第一列是时间列索引但没有列名，先设置列名
if 'open_time' not in df.columns:
    # 将第一列设置为索引，并命名为open_time
    df.rename(columns={df.columns[0]: 'open_time'}, inplace=True)
    df['open_time'] = pd.to_datetime(df['open_time'])
    df.set_index('open_time', inplace=True)

df = df[["open","high","low","close","volume","taker_buy_base_volume","fundingRate"]].copy()

# 再次以防万一
df.fillna(method='ffill', inplace=True)

# 计算对数收益
df["log_return"] = np.log(df["close"]).diff()
# 计算1分钟的买卖差(主动买量 vs 总量)
df["taker_sell_base_volume"] = df["volume"] - df["taker_buy_base_volume"]
df["buy_sell_diff"] = df["taker_buy_base_volume"] - df["taker_sell_base_volume"]

# 计算过去30分钟的滚动标准差
df["roll_vol_30"] = df["log_return"].rolling(30).std()

# 简单去掉最开始30行NaN
df = df.dropna()

# 标准化,假设要对一些feature columns做scaling
feat_cols = ["close","volume","taker_buy_base_volume","fundingRate","log_return","buy_sell_diff","roll_vol_30"]
scaler = StandardScaler() # 实例类以创建标准化工具
df_scaled_part = scaler.fit_transform(df[feat_cols]) # 调用实例的方法,返回二维np
df_scaled = pd.DataFrame(df_scaled_part, columns=[f+"_scaled" for f in feat_cols], index=df.index) # 转换为df

# 将标准化列拼回去
df_final = pd.concat([df, df_scaled], axis=1)

print("完成特征工程后的DataFrame:", df_final.shape)
df_final.to_csv("/Users/gaohongjin/Desktop/btcdrl/preprocessed.csv")
print("保存特征工程结果:/Users/gaohongjin/Desktop/btcdrl/preprocessed.csv")

  df.fillna(method='ffill', inplace=True)


完成特征工程后的DataFrame: (526470, 18)
保存特征工程结果:/Users/gaohongjin/Desktop/btcdrl/preprocessed.csv


In [14]:
"""
对已经保存的 preprocessed.csv 继续读取,并添加:
    CFO (Chande Forecast Oscillator),
    PWMA (Pascal Weighted MA) 
    RVI (Relative Vigor Index)
"""

df_path = "/Users/gaohongjin/Desktop/btcdrl/preprocessed.csv"
df = pd.read_csv(df_path, parse_dates=["open_time"], index_col="open_time")

# CFO(Chande Forecast Oscillator)
def chande_forecast_oscillator(prices, period=14):
    """
    通过线性回归预测价格趋势。它计算当前价格与预测价格之间的百分比差异，
    以衡量价格偏离预期趋势的程度。正值表示价格高于预期（可能超买），负值表示价格低于预期（可能超卖）。
    CFO 基于线性回归预测，能够识别价格动量的变化和潜在的趋势反转点，在交易策略中常用于确认趋势方向和强度。
    """
    c = np.asarray(prices, dtype=float)
    length = len(c)
    CFO = np.full(length, np.nan)
    S = np.zeros(length + 1, dtype=float) # cumsum
    T = np.zeros(length + 1, dtype=float) # weighted cumsum

    for i in range(length):
        S[i+1] = S[i] + c[i]
        T[i+1] = T[i] + i*c[i]

    sum_x = (period - 1)*period/2.0
    sum_x2 = (period - 1)*period*(2*period-1)/6.0

    for i in range(period-1, length):
        sum_y = S[i+1] - S[i+1 - period]
        sum_xy = (T[i+1] - T[i+1 - period]) - (i+1-period)*sum_y
        numerator = period*sum_xy - sum_x*sum_y
        denominator = period*sum_x2 - sum_x*sum_x
        slope = numerator/denominator
        intercept = (sum_y - slope*sum_x)/period
        forecast = intercept + slope*(period-1)
        CFO[i] = (c[i] - forecast)/c[i]*100.0
    return CFO

df["CFO"] = chande_forecast_oscillator(df["close"].values, period=14)

# PWMA (Pascal Weighted MA)
from math import comb
"""
    Pascal Weighted Moving Average (PWMA) 是一种特殊的加权移动平均线，使用帕斯卡三角形的系数作为权重。
    这种加权方式赋予中间数据点最大权重，两端数据点权重逐渐减小，形成对称的钟形分布。
    与传统移动平均线相比，PWMA 能更好地平滑价格波动同时保留重要趋势信息，减少滞后性，
    在识别中期趋势和过滤市场噪音方面表现出色。
"""
def pascal_weights(n):
    return np.array([comb(n-1, k) for k in range(n)], dtype=float)

def pascal_weighted_moving_average(prices, window):
    w = pascal_weights(window)
    weighted_sum = np.convolve(prices, w, mode='valid')
    pwma_vals = weighted_sum / w.sum()
    result = np.full_like(prices, np.nan, dtype=float)
    result[window-1:] = pwma_vals
    return result

df["pwma_10"] = pascal_weighted_moving_average(df["close"].values, window=10)

# RVI(Relative Vigor Index)
# 需要 open high low close
def calc_rvi(df_in, window=10):
    """
    采用( (close - open)+2*(close.shift1 - open.shift1)+2*(close.shift2 - open.shift2)+(close.shift3 - open.shift3) )/6
    除以( (high - low)+2*(high.shift1 - low.shift1)+2*(high.shift2 - low.shift2)+(high.shift3 - low.shift3) )/6 的滚动平均.
    并再平滑出 rvi_signal.

    Relative Vigor Index (RVI) 通过比较价格变动的强度与波动性来衡量市场活力。
    它计算收盘价与开盘价之差（价格方向）与最高价和最低价之差（波动范围）的比率，
    并对这些值进行加权平滑处理。RVI 值高表示上涨动能强劲，值低表示下跌动能强劲。
    与MACD类似，RVI 通常与其信号线一起使用，信号线交叉可以生成买入和卖出信号，
    有效识别市场趋势转变和潜在的交易机会。
    """
    df_out = df_in.copy()

    num = (
        (df_out["close"] - df_out["open"]) 
        + 2*(df_out["close"].shift(1) - df_out["open"].shift(1))
        + 2*(df_out["close"].shift(2) - df_out["open"].shift(2))
        + (df_out["close"].shift(3) - df_out["open"].shift(3))
    )/6.0

    den = (
        (df_out["high"] - df_out["low"])
        + 2*(df_out["high"].shift(1) - df_out["low"].shift(1))
        + 2*(df_out["high"].shift(2) - df_out["low"].shift(2))
        + (df_out["high"].shift(3) - df_out["low"].shift(3))
    )/6.0

    # rolling mean
    num_sma = num.rolling(window).mean()
    den_sma = den.rolling(window).mean()

    df_out["RVI"] = num_sma / den_sma

    # rvi_signal
    rvi_signal = (
        df_out["RVI"]
        + 2*df_out["RVI"].shift(1)
        + 2*df_out["RVI"].shift(2)
        + df_out["RVI"].shift(3)
    )/6.0
    df_out["RVI_signal"] = rvi_signal

    return df_out

df = calc_rvi(df, window=10)

df.dropna(inplace=True)

print("已添加3类额外因子(CFO, PWMA, RVI).")
df.to_csv("/Users/gaohongjin/Desktop/btcdrl/df_factors.csv")
print("已保存到 /Users/gaohongjin/Desktop/btcdrl/df_factors.csv, shape:", df.shape)

已添加3类额外因子(CFO, PWMA, RVI).
已保存到 /Users/gaohongjin/Desktop/btcdrl/df_factors.csv, shape: (526450, 22)


In [17]:
"""
cta_like因子:
用(1) taker 交易量、(2) 资金费率 (fundingRate)、(3) buy-sell 差值 做逻辑替换(短线投机 vs 中线持仓)

移植自本人原先的CTA策略思路,但考虑到Binance 只提供最近 30 天的 5 分钟 OI 历史,做出如下假设:
	1.	资金费率的滚动平均可在一定程度上替代 OI 增减
        •(有时资金费率正但 OI 并不一定上升，也可能是流动性薄弱或价格短时被推高；不过统计上资金费率极端正值确实常伴随更多多头杠杆。)
    2.	buy_sell_diff 的滚动均值可辅助衡量多空净主动力量在更长时间（比如1天）的累计方向。(主动挂单买 - 主动挂单卖)
        •这仍是对“持仓延续”的近似，毕竟真正的 T+1 还需要 OI 确定，但至少能捕捉“多数成交都在做多”这种迹象。
    3.	taker_ratio = taker_buy_base_volume / volume 用来表示当下(1分钟内) T+0 短线投机多头情绪。
        •无法区分“真的做多并平仓当日”还是“多头开仓后隔夜”；但主动买量高度集中确实常意味着短期看涨情绪。
         有时 buy-sell 差值突增是日内来回震荡，也可能不带来真正持仓规模变化。但在高频层面，这类突变通常对应情绪面临极端(短期大单把市场推到另一极端)。
	4.	DOV 替代：形成 cta_dov = abs(Δ buy_sell_diff)/Δ volume，用来衡量市场 瞬间(短线) 的情绪波动强度。

"""

"""
==================【优化后的CTA因子: 动态加权 & 资金费率插值 & 波动率状态化处理】==================
本单元格替代之前的简易CTA因子构建方式,兼顾:
1) 短线 / 中线 情绪因子(移植原CTA的T+0/T+1思路)
2) 动态加权(波动率分区 + 信号一致性 + 资金费率极端性)
3) 资金费率稀疏性处理:三次样条插值 + 指数加权(EWMA)平滑
4) 结果保存在 df_cta_optimized.csv
"""

# df_factors.csv 中应包含:
#   - buy_sell_diff (净主动买入-卖出量)
#   - roll_vol_30   (30分钟滚动波动率,用来判别市场波动水平)
#   - fundingRate   (每8小时公布,前向填充后)
#   - taker_buy_base_volume, volume, 等基础列
df_path = "/Users/gaohongjin/Desktop/btcdrl/df_factors.csv"
df = pd.read_csv(df_path, parse_dates=["open_time"], index_col="open_time")

# ========== 1) 对资金费率做一点滚动平滑 ==========
# 资金费率在8小时换挡时常有瞬时跳变,我们选个小窗口(4)做简单mean,
# 以减弱尖刺,同时不会过度扭曲原始数据.
df['funding_smooth'] = df['fundingRate'].rolling(window=4, min_periods=1).mean().fillna(method='ffill')

# ========== 2) 构造短线情绪(近5分钟买卖差均值) ==========
# ofr = (taker_buy_base_volume - taker_sell_base_volume) / volume => [-1,1]
# 代表这一分钟主动买卖力量对比.
df['taker_sell_base_volume'] = df['volume'] - df['taker_buy_base_volume']
df['ofr'] = (df['taker_buy_base_volume'] - df['taker_sell_base_volume']) / (df['volume']+1e-9)

# 对最近5分钟ofr求平均,作为短线情绪sent_short
short_window = 5
df['sent_short'] = df['ofr'].rolling(short_window, min_periods=1).mean()

# ========== 3) 构造中线情绪(近2小时,120分钟) ==========
# 资金费率 * buy_sell_diff, 通过两者乘积来表达"若funding依然偏正 & buy_sell_diff>0 => 多头延续"
# 这里用roll_funding_120, roll_bs_120均值后再做softsign(x/(1+|x|)) => [-1,1], 避免值无限放大
df['roll_funding_120'] = df['funding_smooth'].rolling(120, min_periods=1).mean()
df['roll_bs_120'] = df['buy_sell_diff'].rolling(120, min_periods=1).mean()

def softsign(x):
    '''
    softsign函数，将数据平滑映射到[-1,1]区间。
    
    相比于tanh或sigmoid：
    1. 计算效率更高（无需指数运算）
    2. 在大值区域衰减更慢，保留更多信号强度差异
    3. 梯度消失问题较轻，对极端值反应更平滑
    4. 特别适合金融时间序列中需要保留信号强度但又要限制范围的情况
    1e-9用于数值稳定性，防止除零错误
    '''
    return x/(1+np.abs(x)+1e-9)

df['t1_raw'] = df['roll_funding_120'] * df['roll_bs_120']
df['sent_medium'] = softsign(df['t1_raw'])  # 作为中线因子

# ========== 4) 动态权重:根据波动率高低分区,并检查短中线同向时增强 ==========
# 波动率越高(roll_vol_30大),越依赖短线因子 => (0.7,0.3)
# 波动率越低,越依赖中线 => (0.3,0.7)
high_q = df['roll_vol_30'].quantile(0.67)
low_q  = df['roll_vol_30'].quantile(0.33)

df['w_short'] = 0.5
df['w_medium'] = 0.5

# 高波动 => w_short=0.7, w_medium=0.3
df.loc[df['roll_vol_30']>high_q, 'w_short']  = 0.7
df.loc[df['roll_vol_30']>high_q, 'w_medium'] = 0.3

# 低波动 => w_short=0.3, w_medium=0.7
df.loc[df['roll_vol_30']<low_q,  'w_short']  = 0.3
df.loc[df['roll_vol_30']<low_q,  'w_medium'] = 0.7

# 先线性合成
df['cta_base'] = df['w_short']*df['sent_short'] + df['w_medium']*df['sent_medium']

# 同向增强(align boost): 若短线与中线信号同号,说明多周期共振,放大该方向信号
def same_sign(x,y):
    return np.where(np.sign(x*y)>0, 1, 0) 

df['align_mask'] = same_sign(df['sent_short'], df['sent_medium'])
align_boost_val = 0.4  # 放大的幅度(40%)

df['cta_factor_temp'] = df['cta_base'].copy()
df.loc[df['align_mask']>0, 'cta_factor_temp'] *= (1 + align_boost_val)

# ========== 5) DOV 替代: abs(Δ buy_sell_diff)/Δ volume, 并截断极值 ==========
df['delta_bs'] = df['buy_sell_diff'].diff().abs()
df['delta_vol'] = df['volume'].diff().abs() + 1e-9
df['cta_dov'] = df['delta_bs']/df['delta_vol']
# 防止极端分钟的跳变(可能是脏数据或极端事件)
df['cta_dov_optimized'] = df['cta_dov'].clip(upper=10)

# ========== 6) 对资金费率的极端值进行小幅反向修正 ==========
# 很多数字货币的策略在做
# 当funding_smooth过正 => 多头过于拥挤,适度下调cta_factor_temp
# 当funding_smooth过负 => 空头过拥挤,上调因子(表示可能反弹)
pos_thr = 0.0002
neg_thr = -0.0002
df['cta_factor_optimized'] = df['cta_factor_temp'].copy() # 复制到新列(默认深copy)

df['excess_pos'] = (df['funding_smooth'] - pos_thr).clip(lower=0)
df['excess_neg'] = (df['funding_smooth'] - neg_thr).clip(upper=0)

# 这里选0.1作为修正系数,可回测调参
df['cta_factor_optimized'] -= 0.1*df['excess_pos']/pos_thr
df['cta_factor_optimized'] += 0.1*(df['excess_neg']/neg_thr)

# ========== 7) 清理 NaN 并保存 ==========
df.dropna(inplace=True)
out_path = "/Users/gaohongjin/Desktop/btcdrl/df_cta_optimized.csv"
df.to_csv(out_path)
print("改进的CTA情绪因子已生成 => cta_factor_optimized, cta_dov_optimized")
print("保存到:", out_path, "shape:", df.shape)

  df['funding_smooth'] = df['fundingRate'].rolling(window=4, min_periods=1).mean().fillna(method='ffill')


改进的CTA情绪因子已生成 => cta_factor_optimized, cta_dov_optimized
保存到: /Users/gaohongjin/Desktop/btcdrl/df_cta_optimized.csv shape: (526449, 41)


In [None]:
# ————————————————————————————数据处理的内容以上完毕,接下来是实现策略的内容——————————————————————————-
'''预测未来短期（5-30分钟）价格变动概率分布，并根据信号强度、波动率和资金费率动态调整头寸大小'''

In [None]:
"""
1 label_create:
从 df_cta_optimized.csv 中读取数据, 生成 5分钟后、15分钟后、30分钟后的对数收益/方向标签.
可灵活改成分类(上涨=1,下跌=0)或多分类(涨/平/跌).
y_5min, y_15min, y_30min(值{0,1})
"""

df = pd.read_csv("/Users/gaohongjin/Desktop/btcdrl/df_cta_optimized.csv", parse_dates=["open_time"], index_col="open_time")

# 若 'close'列已存在就拿它当作价格. 否则你有别的收盘价列,请改名.
price_col = 'close'
df['log_price'] = np.log(df[price_col].clip(lower=1e-6))

# 准备 label: future log-return over 5,15,30 min, 对数收益率
df['fwd_ret_5']  = df['log_price'].shift(-5)  - df['log_price']
df['fwd_ret_15'] = df['log_price'].shift(-15) - df['log_price']
df['fwd_ret_30'] = df['log_price'].shift(-30) - df['log_price']

# 二分
df['y_5min']  = (df['fwd_ret_5']  >0).astype(int)
df['y_15min'] = (df['fwd_ret_15'] >0).astype(int)
df['y_30min'] = (df['fwd_ret_30'] >0).astype(int)

# 去除末尾无效行(因为shift(-30))
df.dropna(inplace=True)

# 输出特征列X,和三个目标列y_5min, y_15min, y_30min
# 这里示例选择了 CTA与技术因子等. 你可自由增删.
feature_cols = [
    'cta_factor_optimized','cta_dov_optimized','funding_smooth','ofr','sent_short','sent_medium',
    'CFO','pwma_10','RVI','RVI_signal','roll_vol_30','close','volume','buy_sell_diff'
    # ...可以继续加你想要的特征
]

X = df[feature_cols].copy()
Y = df[['y_5min','y_15min','y_30min']].copy()

# 保存一下
X.to_csv("/Users/gaohongjin/Desktop/btcdrl/X_features.csv", index=True)
Y.to_csv("/Users/gaohongjin/Desktop/btcdrl/Y_labels.csv", index=True)

print("Label creation done. X shape:", X.shape, "Y shape:", Y.shape)

"""
之后可能还想换成回归目标(直接预测 fwd_ret 的数值)，或者做三分类(涨/平/跌)——只需在此处改 label 即可。
"""


Label creation done. X shape: (526419, 14) Y shape: (526419, 3)


'\n之后可能还想换成回归目标(直接预测 fwd_ret 的数值)，或者做三分类(涨/平/跌)——只需在此处改 label 即可。\n'

In [11]:
"""
2. model_training_optuna:

替代 skopt 的 BayesSearchCV, 采用 optuna 进行超参搜索 + TimeSeriesSplit 评估.

Optuna相比其他超参优化库的优势:
1) 更高效的搜索算法 - 支持TPE、CMA-ES等多种采样器，比传统贝叶斯优化收敛更快
2) 灵活的参数定义 - 支持条件参数和嵌套搜索空间，适合复杂模型结构
3) 内置剪枝功能 - 自动停止表现不佳的trial，节省计算资源
4) 并行优化支持 - 可利用多核或分布式环境加速搜索
5) 直观的可视化 - 提供丰富的可视化工具分析超参影响
6) 活跃维护 - 持续更新并兼容最新科学计算库，而skopt已停止维护

具体流程:
1) 读取X_features.csv 和 Y_labels.csv
2) 对y_5min / y_15min / y_30min分别做LightGBM二分类训练
   - 在Objective函数里用TimeSeriesSplit做多折滚动评估
   - 优化目标为平均AUC(或F1, Accuracy) => 'score'
   - 选出best trial后,用其params训练final model
3) 保存3个模型pkl.
"""
# 添加警告过滤器忽略此警告
# 训练时用了带列名的df,预测时使用无列明的np数组,顺序一样,为了性能
import warnings
warnings.filterwarnings('ignore', category=UserWarning, 
                       message='X does not have valid feature names')


X = pd.read_csv("/Users/gaohongjin/Desktop/btcdrl/X_features.csv", index_col="open_time")
Y = pd.read_csv("/Users/gaohongjin/Desktop/btcdrl/Y_labels.csv", index_col="open_time")

tscv = TimeSeriesSplit(n_splits=5) # 训练集不断变大,测试集不断向后推

# 定义一个目标函数，告诉optuna如何评价参数好不好（试验的参数组合）--从‘告诉模型’这个角度来理解
def objective_lgbm(trial, X_array, y_array):
    """
    Objective函数: 给定trial, 先采样一批超参, 然后做TimeSeriesSplit,
    训练若干次, 取平均AUC作为score => 返回 -meanAUC (因为optuna默认最小化)
    """
    # 从 optuna 试验对象 trial 中随机挑一组参数，供模型使用
    param = {
        'objective': 'binary',           # 定义问题类型为二分
        'metric': 'auc',                 # 使用AUC作为评估指标,数值越高越好
        'boosting_type': 'gbdt',         # 使用梯度提升决策树算法
        'n_estimators': 3000,            # 最大树的数量(迭代次数)，设置较大以配合早停(不进步了)
        'learning_rate': trial.suggest_float('learning_rate', 1e-3, 0.1, log=True),  # 学习率，在1e-3到0.1之间取对数均匀分布的值(随机选一个对数尺度的,保证小区间也能均匀探索)
        'num_leaves': trial.suggest_int('num_leaves', 8, 128),                  # 单棵树的最大叶子数，影响模型复杂度
        'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),    # 叶子节点最少要有多少个数据样本，防止树学得太复杂而过拟合
        'subsample': trial.suggest_float('subsample', 0.5, 1.0),              # 随机抽取样本比例（0.5~1.0之间），提高模型的泛化能力
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0), # 每棵树随机抽取的特征比例（0.5~1.0之间），让每棵树不完全一样
    }

    aucs = [] # 存放不同split的验证结果

    if trial.number == 0:  # 只在第一个trial打印，避免重复信息
        print(f"\n数据总长度: {len(X_array)}")
        print("TimeSeriesSplit索引示例:")
        for i, (train_idx, test_idx) in enumerate(tscv.split(X_array)):
            print(f"split{i+1}:")
            print(f"  训练集: {len(train_idx)}个样本, 索引范围[{min(train_idx)}-{max(train_idx)}]")
            print(f"  测试集: {len(test_idx)}个样本, 索引范围[{min(test_idx)}-{max(test_idx)}]")
            # 只打印前几个索引示例
            print(f"  训练索引前5个: {train_idx[:5]}...")
            print(f"  测试索引前5个: {test_idx[:5]}...")
            print()


    for train_idx, test_idx in tscv.split(X_array):  # 按时间顺序逐次取训练集和测试集
        X_train, X_test = X_array[train_idx], X_array[test_idx]  # 切分特征数据为训练和测试
        y_train, y_test = y_array[train_idx], y_array[test_idx]  # 切分标签数据为训练和测试

        # 创建一个LightGBM分类模型，解包传入刚刚挑的参数
        model = lgb.LGBMClassifier(**param, verbosity=-1)

        # 训练模型，同时给一个验证集，防止模型训练得太久（early stopping）
        model.fit(
            X_train, y_train,
            eval_set=[(X_test, y_test)],  # 验证数据（用的对应的测试集）告诉模型什么时候停止训练
            eval_metric='auc',            # 用 AUC 来评估模型效果
            callbacks=[
                lgb.early_stopping(stopping_rounds=100, verbose=False),  # 早停
                lgb.log_evaluation(period=0)                             # 关闭日志
            ]
        )

        # 预测测试集数据，得到每个样本属于正类的概率
        y_pred_prob = model.predict_proba(X_test)[:, 1]
        # 计算这一split交叉验证的 AUC 得分
        fold_auc = roc_auc_score(y_test, y_pred_prob)
        # 将这一split的得分记录下来
        aucs.append(fold_auc)

    mean_auc = np.mean(aucs)  # 计算所有split的 AUC 得分的平均值
    return -mean_auc  # 因为 optuna 只能最小化，我们取负数，让它最大化平均 AUC

# 定义一个函数专门训练 LightGBM 模型并且自动调参
def train_lgbm_optuna(X, Y, y_col, n_trials=88):
    y_array = Y[y_col].values  # 从标签数据中提取目标列（比如 y_5min）
    X_array = X.values  # 特征数据转换为 numpy 数组，更快更方便计算

    def callback(study, trial):
        # 获取当前最佳值
        current_best = -study.best_value
        current_trial_value = -trial.value if trial.value is not None else None
        
        # 只有在第一个trial或发现更好结果时才打印详细信息
        if trial.number == 0 or (current_trial_value is not None and current_trial_value >= current_best):
            params_str = ", ".join([f"{k}={v:.4f}" if isinstance(v, float) else f"{k}={v}" 
                                for k, v in trial.params.items()])
            
            if current_trial_value is not None:
                print(f"Trial {trial.number:3d} | AUC: {current_trial_value:.4f} | {params_str}")
            
            # 如果这是新的最佳结果，特别标记
            if current_trial_value is not None and current_trial_value >= current_best:
                print(f"★ 新的最佳结果! AUC: {current_trial_value:.4f} ★")
        
        # 每10个trials显示一次进度摘要
        if (trial.number + 1) % 10 == 0:
            print(f"完成 {trial.number + 1}/{n_trials} 次试验 | 当前最佳 AUC: {current_best:.4f}")


    # 创建 optuna 的优化对象，目标是最小化（其实是最大化 AUC，因为我们取了负数）
    study = optuna.create_study(direction='minimize')

    # 运行 optuna 来自动搜索最佳参数组合
    study.optimize(
        lambda trial: objective_lgbm(trial, X_array, y_array), 
        n_trials=n_trials,     # 总共试验30组参数组合
        callbacks=[callback],  # 每次试验后的回调函数，这里我啥也没做.(就是前面那一大串)
        show_progress_bar=True # 显示进度条，知道还有多久完成
    )

    # 得到最好的参数组合和最佳得分（负的平均 AUC）
    best_params = study.best_params
    best_value = study.best_value

    print(f"[{y_col}] Best params:", best_params)
    print(f"[{y_col}] Best -meanAUC:", best_value, "=> meanAUC=", -best_value)

    # 用best_params训练final
    final_model = lgb.LGBMClassifier(
        objective='binary', 
        metric='auc',
        boosting_type='gbdt',
        n_estimators=3000,
        **best_params
    )

    # 用全数据X_array,y_array再训练(可再加early_stop对Val,看你情况)
    final_model.fit(X_array, y_array)
    return final_model

# 分别为不同的时间尺度训练并保存三个模型
model_5min  = train_lgbm_optuna(X, Y, 'y_5min')
model_15min = train_lgbm_optuna(X, Y, 'y_15min')
model_30min = train_lgbm_optuna(X, Y, 'y_30min')

# 保留模型的完整状态（包括内部参数、树结构等）(不用再次训练)
joblib.dump(model_5min,  "/Users/gaohongjin/Desktop/btcdrl/lgb_optuna_5min.pkl")
joblib.dump(model_15min, "/Users/gaohongjin/Desktop/btcdrl/lgb_optuna_15min.pkl")
joblib.dump(model_30min, "/Users/gaohongjin/Desktop/btcdrl/lgb_optuna_30min.pkl")

print("3个周期的LightGBM Optuna模型都已训练完成并保存。")


[I 2025-03-27 23:18:48,174] A new study created in memory with name: no-name-2bf8f53b-29ae-439a-a9e9-a1853a97168b


  0%|          | 0/88 [00:00<?, ?it/s]


数据总长度: 526419
TimeSeriesSplit索引示例:
split1:
  训练集: 87739个样本, 索引范围[0-87738]
  测试集: 87736个样本, 索引范围[87739-175474]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [87739 87740 87741 87742 87743]...

split2:
  训练集: 175475个样本, 索引范围[0-175474]
  测试集: 87736个样本, 索引范围[175475-263210]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [175475 175476 175477 175478 175479]...

split3:
  训练集: 263211个样本, 索引范围[0-263210]
  测试集: 87736个样本, 索引范围[263211-350946]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [263211 263212 263213 263214 263215]...

split4:
  训练集: 350947个样本, 索引范围[0-350946]
  测试集: 87736个样本, 索引范围[350947-438682]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [350947 350948 350949 350950 350951]...

split5:
  训练集: 438683个样本, 索引范围[0-438682]
  测试集: 87736个样本, 索引范围[438683-526418]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [438683 438684 438685 438686 438687]...

[I 2025-03-27 23:19:01,474] Trial 0 finished with value: -0.521835585117658 and parameters: {'learning_rate': 0.01902929125999233, 'num_leaves': 44, 'min_child_samples': 73, 'subsample': 0.990

[I 2025-03-27 23:36:57,023] A new study created in memory with name: no-name-17bb4a08-7199-4948-9baa-a2cf3d17bc86


  0%|          | 0/88 [00:00<?, ?it/s]


数据总长度: 526419
TimeSeriesSplit索引示例:
split1:
  训练集: 87739个样本, 索引范围[0-87738]
  测试集: 87736个样本, 索引范围[87739-175474]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [87739 87740 87741 87742 87743]...

split2:
  训练集: 175475个样本, 索引范围[0-175474]
  测试集: 87736个样本, 索引范围[175475-263210]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [175475 175476 175477 175478 175479]...

split3:
  训练集: 263211个样本, 索引范围[0-263210]
  测试集: 87736个样本, 索引范围[263211-350946]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [263211 263212 263213 263214 263215]...

split4:
  训练集: 350947个样本, 索引范围[0-350946]
  测试集: 87736个样本, 索引范围[350947-438682]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [350947 350948 350949 350950 350951]...

split5:
  训练集: 438683个样本, 索引范围[0-438682]
  测试集: 87736个样本, 索引范围[438683-526418]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [438683 438684 438685 438686 438687]...

[I 2025-03-27 23:37:27,102] Trial 0 finished with value: -0.5249106328503887 and parameters: {'learning_rate': 0.020218042532383695, 'num_leaves': 67, 'min_child_samples': 93, 'subsample': 0.9

[I 2025-03-27 23:59:39,372] A new study created in memory with name: no-name-a0b1071e-3ca3-4e66-aac0-8ec1edcdb2bc


  0%|          | 0/88 [00:00<?, ?it/s]


数据总长度: 526419
TimeSeriesSplit索引示例:
split1:
  训练集: 87739个样本, 索引范围[0-87738]
  测试集: 87736个样本, 索引范围[87739-175474]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [87739 87740 87741 87742 87743]...

split2:
  训练集: 175475个样本, 索引范围[0-175474]
  测试集: 87736个样本, 索引范围[175475-263210]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [175475 175476 175477 175478 175479]...

split3:
  训练集: 263211个样本, 索引范围[0-263210]
  测试集: 87736个样本, 索引范围[263211-350946]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [263211 263212 263213 263214 263215]...

split4:
  训练集: 350947个样本, 索引范围[0-350946]
  测试集: 87736个样本, 索引范围[350947-438682]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [350947 350948 350949 350950 350951]...

split5:
  训练集: 438683个样本, 索引范围[0-438682]
  测试集: 87736个样本, 索引范围[438683-526418]
  训练索引前5个: [0 1 2 3 4]...
  测试索引前5个: [438683 438684 438685 438686 438687]...

[I 2025-03-27 23:59:48,784] Trial 0 finished with value: -0.5227453883643488 and parameters: {'learning_rate': 0.08470913357951602, 'num_leaves': 34, 'min_child_samples': 81, 'subsample': 0.76