In [1]:
import random as rd
from math import exp, cos, pi
from copy import deepcopy
import numpy as np
import pandas as pd

## 基于飞蛾扑火优化算法的股票组合权重优化

导入数据

In [2]:
# 指数权重前20的成分股近3年每天的数据
df=pd.read_excel("df_all_top_20.xlsx",index_col=0)
# 指数权重前20的成分股近3年收盘时序数据
df1=pd.pivot_table(df,index=["trade_date","ts_code"],values=["pct_chg"])
df2=pd.pivot_table(df,index=["trade_date","ts_code"],values=["close","pre_close"])
# 沪深300指数基本数据
df300=pd.read_excel("df_300_w.xlsx",index_col=0)
df20=df300.loc[:19,:] # 指数权重前20的成分股基本数据
# 沪深300指数近3年每天的数据
df300_daily=pd.read_excel('沪深300指数近3年.xlsx')

ts_code_20=df20['ts_code'].tolist()
close300=df300_daily[['日期Date','收盘Close']]
index2codes=ts_code_20
closei=df2

  warn("Workbook contains no default style, apply openpyxl's default")


定义mfo

In [14]:
# 基于飞蛾扑火优化算法的股票组合权重优化
class MFO:
    def __init__(self,n,iter,dim,ub,lb,chgi,chg300,index2codes,p,choose_obj):
        self.choose_obj=choose_obj # 选择什么目标函数
        self.pop=n # 种群个数
        # self.func=func
        self.iter=iter
        self.dim=dim # 个体维度
        self.ub=ub
        self.lb=lb
        self.chgi=chgi # 成分股当日涨跌幅, df透视表类型，index=[日期，股票代码]，values=[chg]
        self.chg300=chg300 # 沪深300指数当日涨跌幅,df类型[日期，chg]
        self.sigma=1 # 惩罚参数
        self.p=p # 惩罚因子
        self.index2codes=index2codes # 20支股票代码的列表

        # 算法参数
        self.b=1 # 对数螺旋形状常数
        self.N = 10; # 火焰的最大数量

    def ini_population(self):
        # 初始化种群
        # 都是np.array类型
        population=np.empty((self.pop,self.dim))
        for i in range(self.pop):
            moth=np.empty((self.dim,),dtype=list)
            # sum_i=0
            for j in range(self.dim):
                moth[j]=rd.uniform(0,0.1) # 随机
                # sum_i+=moth[j]
            population[i,:]=moth
        return population

    def getFitness(self,moths):
        # fitness = np.empty((len(moths),),dtype=list)
        # for i in range(len(moths)):
        if self.choose_obj==1:
            fitness=self.objFunction(moths)
        else:
            fitness=self.objFunction2(moths)
        # print('第',i,'个已完成')
        # fitness=np.array(fitness)
        return fitness

    def objFunction(self,moth):
        # 最小化涨跌幅绝对误差之和
        days=len(self.chg300)
        abs_error=0
        for i in range(days):
            # 计算20支股票组合的当日涨跌幅
            chg_i=0
            date=self.chg300.loc[i,'日期Date']
            str_query='trade_date == ['+ str(date)+']'
            df_i=self.chgi.query(str_query)
            df_i=df_i.reset_index()
            for j in range(len(moth)):
                # 根据索引找到股票代码，然后找到当日chg
                item=df_i[df_i['ts_code']==self.index2codes[j]]
                if not item.empty:
                    chg_i_j=np.sum(item['pct_chg']/100)
                else:
                    # 这只股票当天没数据的话就等于0
                    chg_i_j=0
                chg_i+=chg_i_j*moth[j]
            # 找到当日沪深300的涨跌幅
            chg300_i=self.chg300.loc[i,'涨跌幅(%)Change(%)']/100
            # 当日涨跌幅绝对误差(单位：1)
            abs_error_i=np.abs(chg300_i-chg_i)

            # 求和
            abs_error+=abs_error_i

            # print('days=',date)

        # 加惩罚项：moth求和=1
        P=0.5*self.sigma*np.square(np.sum(moth)-1)

        objFunctionValue=abs_error+P

        return objFunctionValue

    def objFunction2(self,x):
        # 最小化跟踪误差
        days=len(close300)
        track_error=0
        earning_rate=[]
        for i in range(days):
            if i==0:
                continue
            # 计算20支股票组合的当日收益率：(当日收盘价 - 前一日收盘价) / 前一日收盘价
            earning_rate_i=0
            date=close300.loc[i,'日期Date']
            str_query='trade_date == ['+ str(date)+']'
            df_i=closei.query(str_query)
            df_i=df_i.reset_index()
            for j in range(len(x)):
                # 根据索引找到股票代码，然后计算当日收益率
                item=df_i[df_i['ts_code']==index2codes[j]]
                if not item.empty:
                    earning_rate_i_j=np.sum((item['close']-item['pre_close'])/item['pre_close'])
                else:
                    # 这只股票当天没数据的话就等于0
                    earning_rate_i_j=0
                earning_rate_i+=earning_rate_i_j*x[j]
            # 找到当日沪深300的收益率
            earning_rate300_i=(close300.loc[i,'收盘Close']-close300.loc[i-1,'收盘Close'])/close300.loc[i-1,'收盘Close']
            # 当日跟踪误差误差(单位：1)
            earning_rate.append(earning_rate_i-earning_rate300_i)
        # 求标准差
        track_error=np.std(earning_rate)        
        # 加惩罚项：moth求和=1
        P=0.5*self.sigma*np.square(np.sum(x)-1)
        objFunctionValue=track_error+P
        return objFunctionValue

    def check(self,x):
        if x < 0:
            return 0
        elif x > 1:
            return 1
        else:
            return x

    def run(self):
        n=self.pop # 种群个数
        dim=self.dim
        b=self.b
        N=self.N
        iterx, max_iter = 0, self.iter

        X0=self.ini_population() # 初始种群位置

        fit = np.empty((n,),dtype=list)
        for i in range(n):
            fit[i]=self.getFitness(X0[i,:]) # 初始每个个体的适应值
            # print('第',i,'个已完成')

        Moth_pos = X0 # 记录更新后的飞蛾位置
        curve = np.empty((max_iter,),dtype=list) # 记录每一代的最佳适应度
        best_pos = np.zeros((max_iter,dim)) # 记录每一代飞蛾能达到的最佳位置

        while iterx < max_iter:
            flame_no=rd.randint(1,int(N-iterx*(N-1)/max_iter)) # 计算本次迭代的火焰数量

            if iterx == 0:
                index = np.argsort(fit)
                fit= fit[index]; # 求最小值按照ascend的排序
                flame_pos = np.zeros((flame_no,dim))
                flame_fit = np.empty((flame_no,),dtype=list)
                for i in range(flame_no):
                    flame_pos[i,:]=X0[index[i],:] # 记录第一代火焰位置
                    flame_fit[i] = self.getFitness(X0[index[i],:])
            
            # 更新飞蛾位置
            for i in range(n):
                for j in range(dim):
                    a=-1+iterx*((-1)/max_iter)
                    t = rd.uniform(a, 1)
                    if i <flame_no:
                        distance=abs(flame_pos[i][j] - Moth_pos[i][j])
                        Moth_pos[i][j]=distance * exp(b * t) * cos(2 * pi * t) + flame_pos[i][j]
                        Moth_pos[i][j]=self.check(Moth_pos[i][j])
                    else:
                        distance=abs(flame_pos[0][j] - Moth_pos[i][j])
                        Moth_pos[i][j]=distance * exp(b * t) * cos(2 * pi * t) + flame_pos[0][j]
                        Moth_pos[i][j]=self.check(Moth_pos[i][j])
                # 更新飞蛾适应度
                fit[i]=self.getFitness(Moth_pos[i,:])
            
            # 将更新后的飞蛾位置与火焰位置的适应度重新排序
            double_pop=np.vstack((Moth_pos,flame_pos))
            double_fitness=np.hstack((fit,flame_fit))
            # 排序
            double_index = np.argsort(double_fitness)
            double_fitness_sorted=double_fitness[double_index]
            double_sorted_pop= double_pop[double_index,:]
            # 更新下一代火焰位置
            flame_fit=double_fitness_sorted[:N]
            flame_pos = double_sorted_pop[:N,:]

            curve[iterx] = flame_fit[0] # 记录当代最佳适应度
            best_pos[iterx,:] =flame_pos[0,:] # 记录当代最佳位置
            best_score = curve[-1]# 记录最佳适应值

            print("迭代第",iterx,"次，best_score=",curve[iterx] )

            iterx+=1
            # 更新惩罚参数
            self.sigma=self.p*self.sigma

        return best_pos,best_score,curve

### 1. 最小化涨跌幅绝对误差之和

In [6]:
# 方法：飞蛾扑火优化算法
lb=np.array([0]*20) # 下界
ub=np.array([1]*20) # 上界
max_i=10
dim=20
pop=50
chgi=df1
chg300=df300_daily[['日期Date','涨跌幅(%)Change(%)']]
p=10 # 惩罚因子
index2codes=ts_code_20

mymfo=MFO(pop,max_i,dim,ub,lb,chgi,chg300,index2codes,p,1)

best_pos,best_score,curve=mymfo.run()

print('best_pos:',best_pos)
print('best_score:',best_score)
print('curve:',curve)

迭代第 0 次，best_score= 2.272675092485652
迭代第 1 次，best_score= 2.2089247500433142
迭代第 2 次，best_score= 2.2007322185104248
迭代第 3 次，best_score= 2.2007322185104248
迭代第 4 次，best_score= 2.2007322185104248
迭代第 5 次，best_score= 2.2007322185104248
迭代第 6 次，best_score= 2.2007322185104248
迭代第 7 次，best_score= 2.2007322185104248
迭代第 8 次，best_score= 2.2007322185104248
迭代第 9 次，best_score= 2.2007322185104248
best_pos: [[0.05926205 0.13512036 0.06878236 0.1031403  0.06799468 0.14960229
  0.03727958 0.06869446 0.10841577 0.0631211  0.06593863 0.01358416
  0.02646112 0.         0.03227461 0.         0.         0.05354724
  0.04738318 0.01454129]
 [0.06216798 0.12957148 0.07519792 0.02380877 0.04605627 0.13981376
  0.02937324 0.11003972 0.03203279 0.06145386 0.07813051 0.01322412
  0.02520018 0.         0.0419789  0.         0.         0.11664168
  0.04805863 0.00924589]
 [0.06167974 0.13508307 0.07584419 0.00526664 0.05117638 0.14300095
  0.03125425 0.10783151 0.00564901 0.04431741 0.08270373 0.01985895
  0.042

评价指标

In [7]:
def get_total_return(x):
    # 计算收益率
    days=len(close300)
    total_return=0
    daily_returns=[]
    track_error=0
    earning_rate=[]
    for i in range(days):
        if i==0:
            continue
        # 计算20支股票组合的当日收益率：(当日收盘价 - 前一日收盘价) / 前一日收盘价
        earning_rate_i=0
        date=close300.loc[i,'日期Date']
        str_query='trade_date == ['+ str(date)+']'
        df_i=closei.query(str_query)
        df_i=df_i.reset_index()
        for j in range(len(x)):
            # 根据索引找到股票代码，然后计算当日收益率
            item=df_i[df_i['ts_code']==index2codes[j]]
            if not item.empty:
                earning_rate_i_j=np.sum((item['close']-item['pre_close'])/item['pre_close'])
            else:
                # 这只股票当天没数据的话就等于0
                earning_rate_i_j=0
            earning_rate_i+=earning_rate_i_j*x[j]

        # 找到当日沪深300的收益率
        earning_rate300_i=(close300.loc[i,'收盘Close']-close300.loc[i-1,'收盘Close'])/close300.loc[i-1,'收盘Close']
        # 当日跟踪误差误差(单位：1)
        earning_rate.append(earning_rate_i-earning_rate300_i)
        total_return+=earning_rate_i
        daily_returns.append(earning_rate_i)
    daily_returns=np.array(daily_returns)
    # 求标准差
    track_error=np.std(earning_rate)
    return total_return,daily_returns,track_error

In [9]:
def get_index(x):
    eva_index={}
    # 该组合跟踪误差
    total_return,daily_returns,track_error=get_total_return(x)
    eva_index['跟踪误差']=track_error
    
    # 年化收益率
    n=3 # 取的是近3年的数据
    annualized_return = (1 + total_return) ** (1 / n) - 1
    eva_index['年化收益率']=annualized_return
    
    # 年化波动率
    # 计算日波动率（日收益率的标准差）
    daily_volatility=np.std(daily_returns, ddof=1)
    # 将日波动率年化
    annualized_volatility=daily_volatility * np.sqrt(252)
    eva_index['年化波动率']=annualized_volatility
    
    # 夏普比率=（策略年化收益率 - 无风险年化收益率 ） / 策略年化波动率
    R_p = annualized_return  # 投资组合年化回报率
    R_f = 0.0125  # 无风险利率：3年期国债收益率
    sigma_p = annualized_volatility  # 投资组合的年化波动率
    sharpe_ratio = (R_p - R_f) / sigma_p
    eva_index['夏普比率']=sharpe_ratio
    return eva_index

In [10]:
x=best_pos[-1]
eva_index=get_index(x)

In [11]:
eva_index

{'跟踪误差': 0.003973507353727158,
 '年化收益率': 0.065654417917276,
 '年化波动率': 0.1715380885782075,
 '夏普比率': 0.30986947772268014}

In [12]:
sum(x)

0.9743971002578969

### 2. 最小化跟踪误差

In [15]:
mymfo=MFO(pop,max_i,dim,ub,lb,chgi,chg300,index2codes,p,2)

best_pos2,best_score2,curve2=mymfo.run()

print('best_pos:',best_pos2)
print('best_score:',best_score2)
print('curve:',curve2)

迭代第 0 次，best_score= 0.004486362561372005
迭代第 1 次，best_score= 0.004486362561372005
迭代第 2 次，best_score= 0.004486362561372005
迭代第 3 次，best_score= 0.004486362561372005
迭代第 4 次，best_score= 0.004486362561372005
迭代第 5 次，best_score= 0.004486362561372005
迭代第 6 次，best_score= 0.004486362561372005
迭代第 7 次，best_score= 0.004486362561372005
迭代第 8 次，best_score= 0.004486362561372005
迭代第 9 次，best_score= 0.004486362561372005
best_pos: [[0.00246808 0.09610023 0.08574116 0.06107258 0.06212572 0.01112134
  0.0817503  0.07705903 0.13927416 0.06998187 0.03694651 0.
  0.07654787 0.02045957 0.         0.0703855  0.         0.09456023
  0.         0.01671482]
 [0.00246808 0.09610023 0.08574116 0.06107258 0.06212572 0.01112134
  0.0817503  0.07705903 0.13927416 0.06998187 0.03694651 0.
  0.07654787 0.02045957 0.         0.0703855  0.         0.09456023
  0.         0.01671482]
 [0.00246808 0.09610023 0.08574116 0.06107258 0.06212572 0.01112134
  0.0817503  0.07705903 0.13927416 0.06998187 0.03694651 0.
  0.076547

评价指标

In [16]:
x=best_pos2[-1]
eva_index=get_index(x)

In [17]:
eva_index

{'跟踪误差': 0.00448369694341344,
 '年化收益率': 0.05395312450361889,
 '年化波动率': 0.15553480168185776,
 '夏普比率': 0.2665199303009376}

In [18]:
sum(x)

1.0023089469281752

总结：由于有等式约束，惩罚因子设置影响优化结果，因此采用随机优化算法可能不太合适