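### Q4: Optimising Asset Selection and Allocation with a Genetic Algorithm

Q4 uses a genetic algorithm (GA) to search an asset pool for the optimal asset allocation.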

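#### 1. Research question

The question studied here is: given a large asset pool, select a fixed number of assets so that the resulting portfolio is the optimal asset allocation. The problem has two decision variables, asset selection and weight allocation. This experiment tries to solve it by combining a multi-objective genetic algorithm (NSGA-II) with a risk budgeting model.

In the experiment, asset selection is encoded as the chromosome (01010100...), where 1 means the asset is selected and 0 means it is not. During evolution, the `evaluate()` function relies on the risk budgeting model: the model first picks the assets indicated by the chromosome, then computes their weights, and the return, volatility and Sharpe ratio are computed from those weights. These three values are the objectives that drive the evolution. After evolution finishes, the maximum-return and minimum-volatility solutions on the Pareto front are taken as the GA model's predicted weights, labelled 'RB-GA-Max-Ret' and 'RB-GA-Min-Vol', and backtested against equal weighting.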
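
The chromosome-to-objectives step can be sketched as follows. This is a minimal illustration, not the code in `ga_tool.py`: the function name `evaluate_chromosome`, the 252-day annualisation, and the inverse-volatility weights (used here only as a stand-in for the risk budgeting model) are all assumptions.

```python
import numpy as np
import pandas as pd

def evaluate_chromosome(chromosome, prices, risk_free_rate=0.02, periods_per_year=252):
    """Score one 0/1 chromosome; returns (annual return, annual volatility, Sharpe)."""
    mask = np.asarray(chromosome, dtype=bool)
    if not mask.any():
        # An empty selection is not a portfolio, so give it the worst scores.
        return -np.inf, np.inf, -np.inf

    # Daily returns of the selected assets only.
    returns = prices.loc[:, mask].pct_change().dropna()

    # Stand-in for the risk budgeting step (assumption): inverse-volatility weights.
    inv_vol = 1.0 / returns.std()
    weights = (inv_vol / inv_vol.sum()).to_numpy()

    portfolio = returns.to_numpy() @ weights
    annual_return = portfolio.mean() * periods_per_year
    annual_vol = portfolio.std(ddof=1) * np.sqrt(periods_per_year)
    sharpe = (annual_return - risk_free_rate) / annual_vol
    return annual_return, annual_vol, sharpe

# Synthetic prices for five hypothetical assets A..E.
rng = np.random.default_rng(0)
prices = pd.DataFrame(
    100 * np.exp(np.cumsum(rng.normal(0, 0.01, size=(250, 5)), axis=0)),
    columns=list("ABCDE"),
)
ret, vol, sharpe = evaluate_chromosome([1, 0, 1, 1, 0], prices)
```

Returning the worst possible scores for an all-zero chromosome is one simple way to keep invalid individuals out of the Pareto front; a repair step that forces at least one gene to 1 would be an alternative.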

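#### 2. Test data

This experiment uses A-share daily closing prices; the file is at [data/aidx_eod_prices.csv](data/aidx_eod_prices.csv).

Ten asset combinations are drawn at random for the experiment (5 to 10 assets each, with pairwise correlation below 50%).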

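#### 3. Summary of findings
- On a small asset pool (5 to 10 assets), the genetic algorithm can improve on the risk budgeting model.
- On a large asset pool, the genetic algorithm may not find the optimal asset allocation effectively, in terms of both run time and accuracy.
- The genetic algorithm can serve as a solver for the risk budgeting problem. Compared with SLSQP it can find better solutions, but it is far too slow and the resulting risk budgeting model performs about the same.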

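#### 4. Notes
- The notebook cannot save interactive figures; rerun the code to get interactive results. Legend entries in the interactive figures are clickable and show or hide the corresponding line.
- The experiment takes about one hour to run (10 random asset combinations).
- This problem can be classified as MINLP (Mixed-Integer Nonlinear Programming); a conventional genetic algorithm may not solve this kind of problem directly with much success.
- The `ga_test/` directory contains several problems solved with a GA that mimic this kind of problem and may offer some ideas.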
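
One way to read the MINLP note is as a two-level problem. The formulation below is a sketch under assumptions that the notebook does not state: $z$ is the 0/1 selection vector over a pool of $n$ assets, $w$ the weight vector, $\Sigma$ the covariance matrix of asset returns, $b_i(z)$ the risk budget of asset $i$ (for example equal budgets over the selected assets, $b_i(z) = z_i / \sum_j z_j$), and $R$, $\sigma$, $S$ the portfolio return, volatility and Sharpe ratio.

$$
\begin{aligned}
\text{outer (GA):}\quad & \max_{z \in \{0,1\}^n}\ \bigl(R(w^*(z)),\ -\sigma(w^*(z)),\ S(w^*(z))\bigr) \\
\text{inner (risk budgeting):}\quad & w^*(z) = \arg\min_{w}\ \sum_{i=1}^{n} \left( \frac{w_i\,(\Sigma w)_i}{w^\top \Sigma w} - b_i(z) \right)^2 \\
\text{subject to}\quad & \sum_{i=1}^{n} w_i = 1, \qquad 0 \le w_i \le z_i, \quad i = 1,\dots,n
\end{aligned}
$$

The binary vector $z$ makes the problem mixed-integer, and the risk-contribution term $w_i(\Sigma w)_i / w^\top \Sigma w$ makes it nonlinear, which is why handing $z$ and $w$ to a single plain GA is likely to struggle.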

In [None]:
import tool, ga_tool

import random
import scipy.stats  # import the submodule explicitly; sample() calls scipy.stats.spearmanr
import pandas as pd
import numpy as np
from collections import defaultdict

%matplotlib widget
import matplotlib.pyplot as plt
from ipywidgets import interact, fixed, IntSlider, IntText

import warnings
warnings.filterwarnings('ignore')

In [None]:
"""
Data import
"""
asset_index = pd.read_csv("data/aidx_eod_prices.csv")

# keep assets with at least 800 rows; parse and sort their trade dates
grouped_asset = asset_index.groupby("S_IRDCODE")
# copy each group so the in-place edits below do not touch a slice of asset_index
asset_dfs = {ird_code: group.copy() for ird_code, group in grouped_asset if len(group) >= 800}
for ird_code, grouped_df in asset_dfs.items():
    grouped_df['TRADE_DT'] = pd.to_datetime(grouped_df['TRADE_DT'], format='%Y%m%d')
    grouped_df.sort_values(by='TRADE_DT', inplace=True)

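#### Parameter input

The code below exposes three groups of tunable parameters:

- Model parameters: target return, risk-free rate
- Experiment settings: number of random draws, limits on the number of assets per combination, limit on correlation within a combination
- Test subjects: rebalancing frequency, models under test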

In [None]:
"""
Parameters
"""

BACKTEST_DAY = 30 # lookback period (not used)
TARGET_RETURN = 0.0 # target return
RISK_FREE_RATE = 0.02 # risk-free rate

NUM_ITERATION = 10 # number of random asset combinations to test
NUM_LIMIT = (5, 10) # range for the number of assets per combination
CORR_LIMIT = 0.5 # upper limit on pairwise correlation between assets

REBALANCE_DAYS = [210] # rebalancing intervals (in trading days) to test
# MODEL_TYPES = ['RB-SLSQP', 'RB-GA'] # GA as RB problem solver
MODEL_TYPES = ['RB-SLSQP', 'RB-GA-Max-Ret', 'RB-GA-Min-Vol'] # GA and RB

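#### How the genetic algorithm and the risk budgeting model are combined

In each iteration of the experiment, the GA model takes the randomly drawn asset combination as its asset pool (5 to 10 assets) and combines the genetic algorithm with the risk budgeting model to optimise both selection and weight allocation. Within one iteration the chromosome length equals the number of assets in the pool, and each gene (0/1) marks an asset as not selected or selected. The chromosome identifies the assets; for this new portfolio the risk budgeting model computes the weights, and the return, volatility and Sharpe ratio are computed from those weights. With these three objectives, the GA model evolves using the NSGA-II multi-objective algorithm with a population of 50 over 10 generations, producing a set of optimal solutions. From that set, the maximum-return and minimum-volatility solutions are taken as the model's predicted weights.

This approach decouples the two variables of the problem: asset selection is left to the genetic algorithm, while weight allocation is computed by the risk budgeting model. First, this sidesteps the difficulty genetic algorithms have in solving such problems directly. Second, it leans on the effectiveness of the risk budgeting model itself, which is where the improvement comes from. The same logic extends to larger pools, but it is limited by the efficiency of the genetic algorithm and needs further study.

See [ga_tool.py](ga_tool.py) for the implementation.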
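
The final selection step, taking the maximum-return and minimum-volatility solutions from the Pareto front, can be illustrated with a small non-dominated filter. This is a sketch and not the code in `ga_tool.py`: the helper names `pareto_front` and `pick_extremes` and the sample objective values are hypothetical, and a full NSGA-II implementation also needs non-dominated sorting into ranks and crowding distance.

```python
import numpy as np

def pareto_front(objectives):
    """Indices of non-dominated rows; columns are (return, volatility, sharpe)."""
    # Flip the sign of volatility so that every column is "larger is better".
    scores = np.asarray(objectives, dtype=float) * np.array([1.0, -1.0, 1.0])
    front = []
    for i, candidate in enumerate(scores):
        # A row dominates the candidate if it is no worse everywhere and better somewhere.
        dominated = np.any(
            np.all(scores >= candidate, axis=1) & np.any(scores > candidate, axis=1)
        )
        if not dominated:
            front.append(i)
    return front

def pick_extremes(objectives):
    """Pick the max-return and min-volatility individuals from the Pareto front."""
    objectives = np.asarray(objectives, dtype=float)
    front = pareto_front(objectives)
    max_ret = max(front, key=lambda i: objectives[i, 0])
    min_vol = min(front, key=lambda i: objectives[i, 1])
    return {"RB-GA-Max-Ret": max_ret, "RB-GA-Min-Vol": min_vol}

# Hypothetical (return, volatility, sharpe) values for four individuals.
population_scores = [
    (0.10, 0.20, 0.40),
    (0.06, 0.10, 0.40),
    (0.05, 0.15, 0.20),  # dominated by the second individual
    (0.08, 0.12, 0.50),
]
front = pareto_front(population_scores)
chosen = pick_extremes(population_scores)
```

Restricting the two picks to the front guarantees that neither chosen portfolio is beaten on all three objectives by another individual in the final population.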

In [None]:
"""
Model rebalancing function
"""

def rebalance(asset_index, rebalance_day, weight_constraints, model_type):
    predicts = []
    actuals = []
    realities = []
    
    for i in range(rebalance_day, len(asset_index), rebalance_day):
        
        if i+rebalance_day >= len(asset_index):
            break
        
        historical_data = asset_index[i-rebalance_day:i]
        future_data = asset_index[i:i+rebalance_day]
        
        if 'RB-GA-' in model_type:
            gamodel = ga_tool.GAModel(historical_data, future_data, model_type)
            ga_result = gamodel.main()
            
            predict_results, actual_results, weight_results, select_results = ga_result
            predict, actual = ga_tool.evaluate(predict_results, actual_results, model_type)
        else:
            predict, actual, _ = tool.evaluate(historical_data, future_data, weight_constraints, model_type, TARGET_RETURN, RISK_FREE_RATE)
        
        predicts.append(predict)
        actuals.append(actual)
        
        # equal-weight benchmark
        reality = tool.check([1 / len(asset_index.columns) for _ in range(len(asset_index.columns))], future_data, RISK_FREE_RATE)
        realities.append(reality)
    
    return predicts, actuals, realities



In [None]:
"""
Asset sampling
"""

def sample(num_limit, asset_dfs, corr_limit):
    index_list = random.sample(list(asset_dfs.keys()), num_limit)
    
    def is_non_related(index_list):
        for i in range(0, len(index_list)):
            for j in range(i+1, len(index_list)):
                i_df = asset_dfs[index_list[i]]
                j_df = asset_dfs[index_list[j]]
                min_length = min(len(i_df['PCHG']), len(j_df['PCHG']))
                corr, _ = scipy.stats.spearmanr(i_df['PCHG'].iloc[:min_length], j_df['PCHG'].iloc[:min_length])
                if corr > corr_limit:
                    return False
        return True
    
    while not is_non_related(index_list):
        index_list = random.sample(list(asset_dfs.keys()), num_limit)
    
    return index_list

# sample(20, asset_dfs, 0.5)

In [None]:
"""
Different Models with the same assets (randomly generated) and different rebalancing days
"""

def asset_rebalance(asset, num_limit, model_types, rebalancing_days, asset_dfs, corr_limit):
    
    asset_index = asset.copy()
    
    # randomly select assets; np.random.randint excludes the upper bound, so add 1 to allow num_limit[1]
    actual_num_limit = np.random.randint(num_limit[0], num_limit[1] + 1)
    index_list = sample(actual_num_limit, asset_dfs, corr_limit)
    asset_index['TRADE_DT'] = pd.to_datetime(asset_index['TRADE_DT'], format='%Y%m%d')
    asset_index.sort_values(by='TRADE_DT', inplace=True)
    asset_index.set_index('TRADE_DT', inplace=True)
    asset_index = asset_index.pivot(columns='S_IRDCODE', values='CLOSE').ffill()[index_list].dropna()
    
    # weight constraints
    n = len(index_list)
    index_min_weight = [0 for _ in range(n)]
    index_max_weight = [1 for _ in range(n)]
    weight_constraints = list(zip(index_min_weight, index_max_weight))
    
    # start iteration
    results = {}
    for model_type in model_types:
        for rebalance_day in rebalancing_days:
            _, actuals, realities = rebalance(asset_index, rebalance_day, weight_constraints, model_type)
            results[(model_type, rebalance_day)] = list(zip(*actuals))
            results[('EW', rebalance_day)] = list(zip(*realities))
    
    return results, index_list

# results, index_list = asset_rebalance(asset_index, NUM_LIMIT, MODEL_TYPES, [300, 600], asset_dfs, CORR_LIMIT)
# results = dict(sorted(results.items(), key=lambda item: item[0][1]))
# print(results)

In [None]:
"""
Calculate the average for one assets combination with each (model, rebalance_day)
"""

def calculate_averages(data, exclude_model='EW'):
    grouped_data = defaultdict(dict)

    # Group data by the second key of the tuple
    for (model, period), values in data.items():
        grouped_data[period][model] = values

    results = {}
    draws = defaultdict(dict)

    # Perform division and calculate averages
    for period, models in grouped_data.items():
        ew_values = models.get(exclude_model)
        if ew_values is None:
            continue  # Skip if 'EW' data is not present

        for model, values in models.items():
            if model != exclude_model:
                modified_values = []
                for index, (value, ew_value) in enumerate(zip(values, ew_values)):
                    if index == 0:  # For the first set, use subtraction (return)
                        result = [v - ew for v, ew in zip(value, ew_value)]
                    else:  # For the other sets, use division (volatility & sharpe)
                        result = [v / ew if ew != 0 else 0 for v, ew in zip(value, ew_value)]
                    modified_values.append(result)
                    
                averages = [sum(value) / len(value) for value in modified_values]
                results[(model, period)] = averages
                
                for i, value_set in enumerate(modified_values):
                    draws[i][(model, period)] = value_set

    return results, draws

# t = calculate_averages(results)
# print(t)

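#### Running the experiment

In each iteration, the experiment draws a random asset combination subject to the correlation limit and the asset-count limit. Within one full iteration, the predictions for every (model, rebalancing frequency) pair of that combination are computed by the `asset_rebalance()` function above.

The experiment compares the performance of each model and rebalancing frequency (weights produced by the model vs. equal weighting) on three metrics:

- Return: model weights (from data before T) x realised returns (after T) - equal weights x realised returns (after T)
- Volatility: realised volatility under the model weights / realised volatility under equal weights
- Sharpe: Sharpe ratio under the model weights / Sharpe ratio under equal weights

Notes:
- T is the rebalancing time, i.e. the moment the model makes its prediction; "before T" refers to historical return data and "after T" to the realised future returns.
- For the statistics, see the [check()](tool.py#L40) function in [tool.py](tool.py) and the `calculate_averages()` function above.
- If a model fails to produce a prediction, equal weights are used in place of the model's weights.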
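
The three comparison metrics can be written compactly with NumPy. The sketch below assumes one `(return, volatility, sharpe)` row per rebalancing period, which differs from the per-metric tuples that `calculate_averages()` receives; the function name `relative_metrics` and the sample numbers are hypothetical.

```python
import numpy as np

def relative_metrics(model_stats, ew_stats):
    """Average model-vs-equal-weight metrics over all rebalancing periods.

    Each input holds one (return, volatility, sharpe) row per period.
    """
    model = np.asarray(model_stats, dtype=float)
    ew = np.asarray(ew_stats, dtype=float)

    # Return is compared by difference, volatility and Sharpe by ratio.
    excess_return = model[:, 0] - ew[:, 0]
    with np.errstate(divide="ignore", invalid="ignore"):
        # Fall back to 0 where the equal-weight value is 0, as calculate_averages() does.
        vol_ratio = np.where(ew[:, 1] != 0, model[:, 1] / ew[:, 1], 0.0)
        sharpe_ratio = np.where(ew[:, 2] != 0, model[:, 2] / ew[:, 2], 0.0)

    return excess_return.mean(), vol_ratio.mean(), sharpe_ratio.mean()

# Two hypothetical rebalancing periods.
model_stats = [(0.12, 0.10, 1.0), (0.08, 0.20, 0.30)]
ew_stats = [(0.10, 0.20, 0.4), (0.10, 0.25, 0.32)]
avg_excess, avg_vol_ratio, avg_sharpe_ratio = relative_metrics(model_stats, ew_stats)
```

An excess return above 0 and a volatility ratio below 1 both favour the model. The Sharpe ratio of ratios is harder to read when the equal-weight Sharpe is negative, because the sign of the quotient flips.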

In [None]:
"""
Iteration start
"""

final_results = []
final_draws = []
index_lists = []
for i in range(0, NUM_ITERATION):
    results, index_list = asset_rebalance(asset_index, NUM_LIMIT, MODEL_TYPES, REBALANCE_DAYS, asset_dfs, CORR_LIMIT)
    results = dict(sorted(results.items(), key=lambda item: item[0][1]))
    results, draws = calculate_averages(results)
    final_results.append(results)
    final_draws.append(draws)
    index_lists.append(index_list)

print(final_results)

In [None]:
"""
Display one complete backtest under each (model, period) for a single asset combination
"""

def display_draws(final_draws, index_lists, idx):
    print(index_lists[idx])
    draws = final_draws[idx]
    num_plots = len(draws)
    plot_width = max(6, num_plots * 4)  # Adjust width dynamically based on number of plots
    fig, axes = plt.subplots(1, num_plots, figsize=(plot_width, 4))
    titles = ['Return', 'Volatility', 'Sharpe Ratio']
    
    # If there's only one plot, make axes iterable
    if num_plots == 1:
        axes = [axes]
    
    line_groups = {}
    line_to_label_map = {}

    # Plotting
    for i, (ax, title) in enumerate(zip(axes, titles)):
        for (model, period), value_set in draws[i].items():
            line, = ax.plot(value_set)
            label = f'{model}, {period}'
            if label not in line_groups:
                line_groups[label] = []
            line_groups[label].append(line)
        ax.set_title(f'{title}')
    
    legend = fig.legend([list(group)[0] for group in line_groups.values()], line_groups.keys(), loc='lower center', bbox_to_anchor=(0.5, -0.01), ncol=8)

    # Update line_to_label_map
    for leg_line, text in zip(legend.get_lines(), legend.get_texts()):
        line_to_label_map[leg_line] = text.get_text()

    # Make legend clickable
    def on_legend_click(event):
        leg_line = event.artist
        label = line_to_label_map.get(leg_line)
        if label:
            lines = line_groups[label]
            visible = not lines[0].get_visible()  # Toggle based on the first line's visibility
            for line in lines:
                line.set_visible(visible)
            leg_line.set_alpha(1.0 if visible else 0.2)
        fig.canvas.draw()

    for leg_line in legend.get_lines():
        leg_line.set_picker(5)  # 5 pts tolerance
    
    fig.canvas.mpl_connect('pick_event', on_legend_click)

    plt.tight_layout()
    plt.subplots_adjust(bottom=0.2)
    plt.show()


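from ipywidgets import BoundedIntText  # IntText has no min/max; the bounded variant enforces them

interact(display_draws, 
         final_draws=fixed(final_draws), 
         index_lists=fixed(index_lists), 
         idx=BoundedIntText(value=0, description='Index:', min=0, max=len(final_draws)-1))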

In [None]:
"""
Output
"""

def aggregate_results(dicts):
    aggregated_results = {}

    # Initialize aggregated_results with empty lists for each key
    for key in dicts[0].keys():
        aggregated_results[key] = []

    # Iterate over each dictionary
    for d in dicts:
        print(d)
        for key, values in d.items():
            # Assuming all dictionaries have the same structure
            for i, value in enumerate(values):
                if len(aggregated_results[key]) <= i:
                    aggregated_results[key].append([])
                aggregated_results[key][i].append(value)

    # Convert lists of values to tuples
    for key in aggregated_results:
        aggregated_results[key] = [tuple(lst) for lst in aggregated_results[key]]

    return aggregated_results

aggregated_results = aggregate_results(final_results)
print(aggregated_results)

In [None]:
"""
Visualisation
"""

def asset_display(data, i=0):
    line_styles = ['-', '--', ':']
    colors = plt.cm.viridis(np.linspace(0, 1, len(data)))

    line_style = line_styles[i]

    fig, ax = plt.subplots(figsize=(10, 6))
    lines = []

    for (key, lists), color in zip(data.items(), colors):
        if i < len(lists):
            lst = lists[i]
            l = np.array(lst)
            l = l[~np.isnan(l)]
            line, = ax.plot(l, line_style, color=color, label=f'{key}')
            lines.append(line)
    
    if i == 0:
        ax.axhline(y=0, color='red', linestyle=':')
    elif i == 1:
        ax.axhline(y=1, color='red', linestyle=':')
        
    plt.subplots_adjust(right=0.7)
    leg = ax.legend(fancybox=True, shadow=True, loc='center left', bbox_to_anchor=(1, 0.5))
    
    lined = {}
    for legline, origline in zip(leg.get_lines(), lines):
        legline.set_picker(5)
        lined[legline] = origline
        
    for legline, line in zip(leg.get_lines(), lines):
        legline.set_alpha(0.2)
        line.set_visible(False)

    avg_text_objects = {}
    
    def on_pick(event):
        legline = event.artist
        origline = lined[legline]
        visible = not origline.get_visible()
        origline.set_visible(visible)
        
        if visible:
            if origline not in avg_text_objects:
                display_position = (0.05, 0.95 - 0.05 * len(avg_text_objects))
                avg_value = np.nanmean(origline.get_ydata())
                avg_text_objects[origline] = ax.text(display_position[0], display_position[1], 
                                                     f'Avg {legline.get_label()}: {avg_value:.4f}', 
                                                     transform=ax.transAxes, color=origline.get_color(),
                                                     fontsize=9, verticalalignment='top')
            else:
                avg_text_objects[origline].set_visible(True)
        else:
            if origline in avg_text_objects:
                avg_text_objects[origline].set_visible(False)
                del avg_text_objects[origline]

        legline.set_alpha(1.0 if visible else 0.2)
        fig.canvas.draw()

    fig.canvas.mpl_connect('pick_event', on_pick)

    titles = {0 : 'Return', 1 : 'Volatility', 2 : 'Sharpe Ratio'}
    ax.set_title(titles.get(i, 'Q4 Line Plots'))
    ax.set_xlabel('Test Index')
    ax.set_ylabel('Value')

    plt.show()


interact(asset_display,
         data=fixed(aggregated_results),
         i=IntSlider(min=0, max=2, step=1, value=0))

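### Q4 Analysis of results

#### Genetic algorithm combined with the risk budgeting model

As the figures below show, the genetic algorithm can improve the risk budgeting model to some extent by picking a better asset combination. Relative to equal weighting, the GA model's minimum-volatility solution shows the best return together with a very low level of volatility. The approach could potentially serve as an optimisation layer that strengthens other predictive models. The genetic algorithm does have a limitation, though: the direction of evolution depends on the objectives chosen. In this experiment the objectives are maximum return, maximum Sharpe ratio and minimum volatility, and all three are computed from historical return data, so whether they are reasonable objectives still needs further study.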

<div style="text-align: left;">
    <img src="img/ga_rb_ret.png" style="width: 45%; margin-right: 5px;" />
    <img src="img/ga_rb_vol.png" style="width: 45%;" />
</div>

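#### Genetic algorithm as a solver for the risk budgeting problem

As the figures below show, solving the risk budgeting problem with the genetic algorithm rather than SLSQP makes little difference to the model's performance. The genetic algorithm can reach a better solution, but it is too slow, so using it as the solver for the risk budgeting problem is not recommended.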

<div style="text-align: left;">
    <img src="img/rb_ga_solver_ret.png" style="width: 45%; margin-right: 5px;" />
    <img src="img/rb_ga_solver_vol.png" style="width: 45%;" />
</div>
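
For reference, a risk budgeting problem solved with SLSQP can look like the sketch below. It is not the implementation in `tool.py`: the least-squares objective on variance shares, the function name `risk_budget_weights`, and the two-asset covariance matrix are assumptions made for illustration.

```python
import numpy as np
from scipy.optimize import minimize

def risk_budget_weights(cov, budgets):
    """Long-only weights whose variance shares match the given risk budgets."""
    cov = np.asarray(cov, dtype=float)
    budgets = np.asarray(budgets, dtype=float)
    n = len(budgets)

    def objective(w):
        port_var = w @ cov @ w
        # Share of total portfolio variance contributed by each asset.
        risk_share = w * (cov @ w) / port_var
        return np.sum((risk_share - budgets) ** 2)

    result = minimize(
        objective,
        x0=np.full(n, 1.0 / n),           # start from equal weights
        method="SLSQP",
        bounds=[(0.0, 1.0)] * n,          # long-only, no leverage
        constraints=[{"type": "eq", "fun": lambda w: np.sum(w) - 1.0}],
        options={"ftol": 1e-12},
    )
    return result.x

# Two uncorrelated hypothetical assets with volatilities 10% and 20%.
cov = np.diag([0.10 ** 2, 0.20 ** 2])
weights = risk_budget_weights(cov, budgets=[0.5, 0.5])
```

With uncorrelated assets and equal budgets the solution is proportional to inverse volatility, so the weights here should come out close to 2/3 and 1/3. A GA-based solver would search the same objective by evolving `w` directly, which needs many more objective evaluations than a gradient-based method on a smooth problem of this size.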
