In [None]:
import optuna
import pandas as pd

# Load the two parquet inputs from the current working directory.
# `df` is the frame every later cell reads and extends with indicator columns.
df = pd.read_parquet('cb_data.pq')
# NOTE(review): `index` is not referenced by any other visible cell; presumably
# benchmark/index data kept for later use - confirm or drop.
index = pd.read_parquet('index.pq')

In [None]:
# Basic settings
n_trials = 1000  # number of optimisation iterations (Optuna trials)
n_jobs = 10  # number of parallel jobs
start_date = '20220801'  # start date
end_date = '20240325'  # end date
num_factors = 6  # number of factors per combination
hold_num = 5  # number of holdings
threshold_num = 5  # rotation threshold
price_min = 100  # minimum price
price_max = 150  # maximum price

In [None]:
from cal_factor_util import simple_momentum, rsi, stochastic_oscillator, macd, momentum, adx, velocity, pvt, \
    volatility_breakout, trend_strength, dema

# Add the derived indicator columns to `df`, one family at a time.
# All helpers come from cal_factor_util; the call order (and therefore the
# column insertion order) is kept exactly as the hand-written version had it.
# NOTE(review): every helper receives the whole frame / the whole `close`
# column; if `df` stacks several instruments, confirm the helpers group by
# instrument internally.

for _period in (1, 3, 5, 7):
    df[f'pc{_period}'] = simple_momentum(df['close'], period=_period)

for _period in (1, 3, 5, 7):
    df[f'rsi{_period}'] = rsi(df, period=_period)

# (k_period, d_period) pairs; the column suffix is the 1-based position.
for _slot, (_k, _d) in enumerate(((3, 1), (7, 2), (14, 3)), start=1):
    df[f'stoch{_slot}'], df[f'stoch_signal{_slot}'] = stochastic_oscillator(df, k_period=_k, d_period=_d)

df['macd'], df['macd_signal'], df['macd_diff'] = macd(df, fast_period=12, slow_period=26, signal_period=9)

for _period in (7, 14):
    df[f'adx{_period}'] = adx(df, period=_period)

for _period in (3, 6, 12):
    df[f'momentum{_period}'] = momentum(df['close'], period=_period)

for _period in (3, 5, 7):
    df[f'velocity{_period}'] = velocity(df['close'], period=_period)

df['pvt'] = pvt(df)

for _period in (5, 10, 20):
    df[f'volatility_stk{_period}'] = volatility_breakout(df, period=_period)

df['trend_strength'] = trend_strength(df, short_window=12, long_window=26)

for _period in (5, 21):
    df[f'dema{_period}'] = dema(df, period=_period)

# Drop the loop helpers so the notebook namespace only gains the new columns.
del _period, _slot, _k, _d

In [None]:
# Search-space definition.
# A factor's position in this list is the id used by the combination encoding,
# so the order must not change.
factors = [
    # ids 0-25: columns that are not created in this notebook
    # (presumably already present in the loaded data - confirm).
    'pre_close', 'open', 'high', 'low', 'close', 'pct_chg', 'vol', 'amount',
    'volatility_stk', 'mod_conv_prem', 'remain_cap', 'conv_prem', 'turnover',
    'theory_value', 'option_value', 'dblow', 'theory_bias', 'ytm',
    'cap_mv_rate', 'pure_value', 'bond_prem', 'remain_size', 'theory_conv_prem',
    'pb', 'pe_ttm', 'ps_ttm',
    # ids 26-57: indicator columns added to `df` by the cal_factor_util helpers.
    'pc1', 'pc3', 'pc5', 'pc7',
    'rsi1', 'rsi3', 'rsi5', 'rsi7',
    'stoch1', 'stoch_signal1', 'stoch2', 'stoch_signal2', 'stoch3', 'stoch_signal3',
    'macd', 'macd_signal', 'macd_diff',
    'adx7', 'adx14',
    'momentum3', 'momentum6', 'momentum12',
    'velocity3', 'velocity5', 'velocity7',
    'pvt',
    'volatility_stk5', 'volatility_stk10', 'volatility_stk20',
    'trend_strength',
    'dema5', 'dema21',
]  # 58 names in total

In [None]:
import itertools
from more_factor_test_origin_code import cal_cagr


def decode_combination(encoded):
    """Map an iterable of factor ids to the corresponding names in `factors`.

    The names are returned as a list, in the same order as the ids.
    """
    names = []
    for factor_id in encoded:
        names.append(factors[factor_id])
    return names


import math
import operator
from collections.abc import Sequence


class _LazyCombinations(Sequence):
    """Read-only, indexable view of every r-element combination of range(n).

    For len(), integer indexing, slicing and iteration this behaves like
    list(itertools.combinations(range(n), r)): tuples come out in the same
    lexicographic order.  Nothing is stored, though - the tuple for a given
    position is computed on demand - so memory use does not grow with the
    number of combinations.

    Raises:
        ValueError / TypeError: from math.comb when n or r is negative or
            not an integer.
    """

    def __init__(self, n, r):
        self._n = n
        self._r = r
        self._size = math.comb(n, r)

    def __len__(self):
        return self._size

    def __iter__(self):
        # Sequential access is cheapest through itertools itself.
        return itertools.combinations(range(self._n), self._r)

    def __getitem__(self, position):
        """Return the combination at `position` (negative indices and slices allowed)."""
        if isinstance(position, slice):
            return [self[i] for i in range(*position.indices(self._size))]
        position = operator.index(position)
        if position < 0:
            position += self._size
        if not 0 <= position < self._size:
            raise IndexError('combination index out of range')

        picked = []
        candidate = 0
        for slots_left in range(self._r, 0, -1):
            while True:
                # How many combinations continue with `candidate` as the next element.
                span = math.comb(self._n - candidate - 1, slots_left - 1)
                if position < span:
                    break
                # Skip that whole group and try the next larger element.
                position -= span
                candidate += 1
            picked.append(candidate)
            candidate += 1
        return tuple(picked)


# The number of combinations grows very fast (58 factors taken 6 at a time is
# 40,475,358 tuples), so they are decoded lazily instead of being stored in a
# list plus a dict copy.  Both names still support len() and [id] lookups.
# Unlike the former dict, `encoded_combinations` now iterates over tuples
# rather than ids and accepts negative indices.
combinations = _LazyCombinations(len(factors), num_factors)
encoded_combinations = combinations


def objective(trial):
    """Objective function handed to the Optuna study.

    Samples one factor combination (by its integer id) plus, for each of the
    `num_factors` slots, a weight in 1..5 and a sort direction, then evaluates
    the resulting ranking configuration with `cal_cagr`.

    Args:
        trial: the Optuna trial used to draw the parameters.

    Returns:
        Whatever `cal_cagr` returns for the sampled configuration (presumably
        the compound annual growth rate of the backtest - confirm).
    """
    combo_id = trial.suggest_int('encoded_id', 0, len(encoded_combinations) - 1)
    factor_names = decode_combination(encoded_combinations[combo_id])

    # Per slot the weight is suggested before the direction, and slots are
    # visited in ascending order, so parameters are drawn in a fixed sequence.
    rank_factors = [
        {
            'name': factor_names[slot],
            'weight': trial.suggest_categorical(f'factor{slot + 1}_weight', [1, 2, 3, 4, 5]),
            'ascending': trial.suggest_categorical(f'factor{slot + 1}_ascending', [True, False]),
        }
        for slot in range(num_factors)
    ]

    cagr = cal_cagr(df, start_date, end_date, hold_num, threshold_num, price_min, price_max, rank_factors)
    print("factor_combination:{}, cagr:{}".format(rank_factors, cagr))
    return cagr

In [None]:
# Create a study that maximises the objective, using a seeded TPESampler.
# NOTE(review): Optuna documents seeded samplers as reproducible only for
# sequential runs; with n_jobs > 1 results may differ between executions.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=1212),
)
study.optimize(objective, n_trials=n_trials, n_jobs=n_jobs)

In [None]:
# Print the best parameters found by the study and their objective value.
best_params, best_value = study.best_params, study.best_value
print("最优参数：", best_params)  # label: "best parameters:"
print("最优参数下的目标函数值：", best_value)  # label: "objective value at the best parameters:"

In [None]:
def flexible_decode_combination(encoded_params):
    """Turn a study parameter dict back into the detailed factor list.

    Args:
        encoded_params: mapping with an 'encoded_id' entry (position of the
            factor combination in `combinations`) and, for each 1-based slot
            k, 'factor{k}_weight' and 'factor{k}_ascending' entries.

    Returns:
        A list with one dict per factor in the combination, each holding the
        keys 'name', 'weight' and 'ascending'.
    """
    # Look up the tuple of factor ids behind the encoded combination id.
    chosen_ids = combinations[encoded_params['encoded_id']]
    # Pair every factor id with the weight/direction stored for its slot.
    return [
        {
            'name': factors[factor_id],
            'weight': encoded_params[f'factor{slot}_weight'],
            'ascending': encoded_params[f'factor{slot}_ascending'],
        }
        for slot, factor_id in enumerate(chosen_ids, start=1)
    ]

In [None]:
# Decode the best trial's parameters into the list of name/weight/ascending dicts.
factor_combination = flexible_decode_combination(best_params)
# Bare expression: the notebook shows the decoded list as this cell's output.
factor_combination