In [13]:
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
import baostock as bs
import os
import datetime
import math
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from pandarallel import pandarallel
# all_path holds the name of every file found in the raw-data folder.
# os.listdir() returns entries in arbitrary order, so the listing is sorted to
# make the slices of all_path taken later reproducible from run to run.
os.chdir('D:\\data\\raw\\')
all_path = sorted(os.listdir())
# Step back to the parent folder; the working directory stays there afterwards.
os.chdir('..')

# Start the pandarallel workers that back DataFrame/GroupBy.parallel_apply.
pandarallel.initialize()



INFO: Pandarallel will run on 14 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.

https://nalepae.github.io/pandarallel/troubleshooting/


In [14]:
# Strategy: Bollinger-band rebound (buy when the close crosses back above the lower band).
def process(df, loss=0.85, target=1.3, n_std=1.7):
    """Build Bollinger-band features for one stock and label every buy signal.

    Parameters
    ----------
    df : pandas.DataFrame
        Daily bars of a single stock. The columns read here are ``open``,
        ``high``, ``low``, ``close``, ``volume``, ``amount``, ``turn``,
        ``pctChg`` and ``isST``.
    loss : float, default 0.85
        Stop-loss level as a fraction of the buy price.
    target : float, default 1.3
        Take-profit level as a fraction of the buy price.
    n_std : float, default 1.7
        Band half-width in rolling standard deviations.

    Returns
    -------
    pandas.DataFrame
        If 10% or more of the rows close above 50, ``df`` is returned
        untouched (no feature columns are added). Otherwise a filtered,
        re-indexed frame with the feature columns plus ``once_profit``
        (float), ``reason`` (object) and ``sale_date`` (float, holding length
        in rows). Those three stay NaN on rows that are not a buy signal or
        whose position never met a sell rule.
    """
    # Imported locally, as in the original; presumably so the function is
    # self-contained when it runs inside pandarallel workers - TODO confirm.
    import pandas as pd

    # Price screen: only stocks whose close is above 50 on fewer than 10% of
    # all rows are processed. `not (x < 0.1)` keeps a NaN ratio on the
    # skip path, exactly like the original `if x < 0.1:` did.
    if not ((df['close'] > 50).sum() / len(df) < 0.1):
        return df

    # Keep only days that actually traded and were not flagged ST.
    df = df[(df['amount'] != 0) & (df['turn'] != 0) & (df['isST'] == 0)].reset_index(drop=True)

    # Rolling means of the close over 13 / 26 / 60 / 120 rows.
    close = df['close']
    for rolling_day in [13, 26, 60, 120]:
        df['MA' + str(rolling_day)] = close.rolling(window=rolling_day).mean()
    # Bands: MA +/- n_std * rolling std. The 26-row band is the main one and
    # carries no suffix. The std is computed once per window.
    for rolling_day, suffix in [(13, '_13'), (26, ''), (60, '_60'), (120, '_120')]:
        half_width = n_std * close.rolling(window=rolling_day).std()
        df['upper' + suffix] = df['MA' + str(rolling_day)] + half_width
        df['down' + suffix] = df['MA' + str(rolling_day)] - half_width
    # Signed distances from the close to the 26-row bands:
    # to_upper < 0 means the close is above the upper band,
    # to_down > 0 means the close is below the lower band.
    df['to_upper'] = df['upper'] - df['close']
    df['to_down'] = df['down'] - df['close']
    # Drops the warm-up rows (and any row with a NaN in any column).
    df = df.dropna().reset_index(drop=True)

    ###################################################################
    # Everything above is the band strategy itself; the columns below #
    # are extra features added for the downstream model.              #
    ###################################################################
    close = df['close']
    # Exponentially weighted means of the close; each one is computed once
    # and then lagged by `day` rows (day == 0 is the current row).
    ema_2_13 = close.ewm(alpha=2/13, adjust=False, ignore_na=True).mean()
    ema_2_27 = close.ewm(alpha=2/27, adjust=False, ignore_na=True).mean()
    ema_1_3 = close.ewm(alpha=1/3, adjust=False, ignore_na=True).mean()
    for day in [0, 1, 3, 5, 15, 30]:
        tag = str(day)
        df['2/13ema' + tag] = ema_2_13.shift(day)
        df['2/27ema' + tag] = ema_2_27.shift(day)
        df['1/3ema' + tag] = ema_1_3.shift(day)
        df['DIF' + tag] = df['2/13ema' + tag] - df['2/27ema' + tag]
        df['DEA' + tag] = df['DIF' + tag].ewm(alpha=1/5, adjust=False, ignore_na=True).mean()
        df['MACD' + tag] = 2 * (df['DIF' + tag] - df['DEA' + tag])
    # Rolling extremes. The columns are named close_min / close_max but are
    # taken from the `low` and `high` columns.
    for day in [3, 5, 15, 30, 60, 120, 180]:
        tag = str(day)
        df[tag + 'close_min'] = df['low'].rolling(window=day, min_periods=1).min()
        df[tag + 'close_min_rate'] = (df['close'] - df[tag + 'close_min']) / df[tag + 'close_min']
        df[tag + 'close_max'] = df['high'].rolling(window=day, min_periods=1).max()
        df[tag + 'close_max_rate'] = (df[tag + 'close_max'] - df['close']) / df['close']
    # Rolling mean / min / max of each series, relative to its current value.
    for index in ['volume', 'amount', 'turn', 'to_down', 'to_upper']:
        for day in [3, 5, 15, 30]:
            rolled = df[index].rolling(window=day, min_periods=1)
            df[str(day) + index + '_mean'] = (rolled.mean() - df[index]) / df[index]
            df[str(day) + index + '_min'] = (rolled.min() - df[index]) / df[index]
            df[str(day) + index + '_max'] = (rolled.max() - df[index]) / df[index]
    # Lagged values, relative to the current row (pctChg is lagged as-is).
    for day in [1, 2, 3, 4, 5]:
        tag = str(day)
        df['pctChg' + tag] = df['pctChg'].shift(day)
        df['to_down' + tag] = (df['to_down'].shift(day) - df['to_down']) / df['to_down']
        df['to_upper' + tag] = (df['to_upper'].shift(day) - df['to_upper']) / df['to_upper']
        df['close' + tag + 'rate'] = (df['close'].shift(day) - df['close']) / df['close']
        df['open' + tag + 'rate'] = (df['open'].shift(day) - df['open']) / df['open']
        df['high' + tag + 'rate'] = (df['high'].shift(day) - df['high']) / df['high']
        df['low' + tag + 'rate'] = (df['low'].shift(day) - df['low']) / df['low']
    df['to_down_before'] = df['to_down'].shift(1)

    # Result columns with explicit dtypes. A bare pd.Series() has a
    # version-dependent default dtype, which made the column types (and the
    # string assignment into `reason`) depend on the installed pandas.
    df['once_profit'] = pd.Series(index=df.index, dtype='float64')
    df['reason'] = pd.Series(index=df.index, dtype='object')
    df['sale_date'] = pd.Series(index=df.index, dtype='float64')

    # The index is 0..len-1 after reset_index, so labels equal positions and
    # the scan can read from plain arrays instead of scalar df.loc lookups.
    close_px = df['close'].to_numpy()
    to_upper = df['to_upper'].to_numpy()
    n_rows = len(df)

    # Buy signal: the previous close was below the lower band, today's close
    # is back above it, and today's change is positive.
    signal = (df['to_down_before'] > 0) & (df['to_down'] < 0) & (df['pctChg'] > 0)
    for i in df.index[signal.to_numpy()]:
        # Buy at the close of the signal row.
        buy = close_px[i]
        # NOTE(review): the scan starts at i + 2, exactly as the original loop
        # did (its counter was incremented before the first check), so row
        # i + 1 is never considered as a sell day - confirm this is intended.
        for j in range(i + 2, n_rows):
            if close_px[j] <= buy * loss:
                # Close at or below the stop-loss level.
                reason = '截面止损'
            elif close_px[j - 1] >= buy * target and close_px[j] <= close_px[j - 1]:
                # Previous close reached the target and today did not close higher.
                reason = '截面止盈'
            elif to_upper[j] < 0:
                # Close above the upper band.
                reason = '布林带上界止盈'
            else:
                continue
            df.loc[i, 'reason'] = reason
            # Holding length in rows.
            df.loc[i, 'sale_date'] = j - i
            # Return of this single trade, rounded to 4 decimals.
            df.loc[i, 'once_profit'] = round((close_px[j] - buy) / buy, 4)
            # The position is closed; stop scanning later rows.
            break
    return df
def Bolinge(start,end):
    """Read the raw CSV files named in ``all_path[start:end]`` into one frame.

    Parameters
    ----------
    start, end : int
        Slice bounds into the module-level ``all_path`` list of file names.

    Returns
    -------
    pandas.DataFrame
        The rows of every file in the slice, stacked in list order with each
        file's own row index kept. An empty slice yields an empty, column-less
        DataFrame.
    """
    wanted_columns = ['date','code','open','high','low','close','volume','amount','turn','pctChg','peTTM','pbMRQ','psTTM','pcfNcfTTM','isST']
    # Collect the frames and concatenate once: DataFrame.append was removed in
    # pandas 2.0, and appending inside a loop re-copies the data every time.
    frames = [
        pd.read_csv('D:\\data\\raw\\'+code,parse_dates=['date'],usecols=wanted_columns)
        for code in all_path[start:end]
    ]
    if not frames:
        # pd.concat raises on an empty list; keep returning an empty frame.
        return pd.DataFrame()
    return pd.concat(frames)

In [15]:
# Load the raw files in `num` slices of all_path, one slice per worker.
# ThreadPoolExecutor runs the loads on threads (not separate processes).
# The futures are bound to task1..task{num} and the loaded frames to
# df1..df{num}, because later cells refer to those names.
num = 8
length = math.ceil(len(all_path)/num)
with ThreadPoolExecutor(max_workers=num) as executor:
    # Submit every slice first so the loads overlap ...
    for i in range(num):
        globals()[f'task{i+1}'] = executor.submit(Bolinge, length*i, length*(i+1))
    # ... then wait for each result in submission order.
    for i in range(num):
        globals()[f'df{i+1}'] = globals()[f'task{i+1}'].result()

In [16]:
# Run the strategy on every stock of every loaded chunk (df1..df{num}) and
# keep only complete rows: dropna() removes every row that has a NaN in any
# column, which includes rows where no trade was recorded.
processed_chunks = []
for i in range(num):
    raw_chunk = globals()[f'df{i+1}']
    # Bolinge returns an empty, column-less frame for an empty slice of
    # all_path; groupby('code') would raise KeyError on it, so skip it.
    if raw_chunk.empty:
        continue
    processed_chunks.append(raw_chunk.groupby('code').parallel_apply(process).dropna())
df = pd.concat(processed_chunks)
# Release the raw chunks; only the merged result is used from here on.
del raw_chunk, processed_chunks
for i in range(num):
    del globals()[f'df{i+1}']
# isST was only needed as a filter inside process().
del df['isST']

In [17]:
df = df.reset_index(drop=True)
# label is 1 for a profitable trade and 0 otherwise (including once_profit <= 0).
df['label'] = (df['once_profit'] > 0).astype('int64')
# Keep only trades bought after 2010-01-03.
df = df[df['date']>'2010-01-03'].reset_index(drop=True)

# Every trade is written to mlinput.csv:
#   date        - buy date
#   code        - stock code
#   once_profit - profit or loss of the single trade
#   reason      - why the position was sold
#   sale_date   - holding length
#   label       - 1 if the trade made money, 0 if it lost money
# `reason` holds Chinese text. The file is written as UTF-8 with a BOM
# ('utf-8-sig') so that spreadsheet tools detect the encoding instead of
# showing garbled characters.

df.to_csv('D:\\data\\mlinput.csv',index=False,encoding='utf-8-sig')


In [18]:
# Headline numbers: win rate (share of trades with label == 1), the sum of
# the per-trade returns, and the number of trades.
win_rate = df['label'].mean()
total_profit = df['once_profit'].sum()
trade_count = len(df)
print('胜率为:',win_rate)
print('收益率为'+str(total_profit))
print('买入次数为'+str(trade_count))

# Per-trade view with the date column parsed as datetime.
# NOTE(review): df_ is built here, but the yearly loop below groups `df`,
# not `df_` - confirm which frame was meant.
trade_columns = ['code','date','sale_date','reason','once_profit']
df_ = df.loc[df['reason']!=0, trade_columns].dropna()
df_['date']  = pd.to_datetime(df_['date'],format="%Y-%m-%d")

# Sum of per-trade returns for each buy year.
profit_by_year = df.groupby(df['date'].dt.year)['once_profit']
for year,group in profit_by_year:
    print(year,group.sum())

胜率为: 0.7527259055019426
收益率为372.6502
买入次数为7979
2022 372.6502
