In [1]:
import sys
import os 
module_path = os.path.abspath(os.path.join('..')) 
if module_path not in sys.path: 
    sys.path.append(module_path)

import pandas as pd  
import numpy as np  

import tools.Sample_Tools as smpl
import tools.Pretreat_Tools as pretreat
from tools.Cacher import (CACHE_TYPE, save_cache,load_cache_adv,load_cache)

from base.JuUnits import parallal_task,task_chunk_split
from base.JuUnits import excute_for_multidates


from QUANTAXIS.QAUtil import DATABASE
from QUANTAXIS.QAUtil import  trade_date_sse
from QUANTAXIS.QAUtil.QADate_trade import (
    QA_util_get_pre_trade_date,
    QA_util_get_next_trade_date,
    QA_util_if_tradetime
)

import inspect
import ind.Alpha101 as toys

from sklearn import linear_model

%load_ext autoreload
%autoreload 2
%aimport tools.Cacher

# def assemble_stocks_by_codes(codes):
#     files = list(map(lambda x:x+'_train_qfq',l))
#     return pd.concat(list(map(lambda file:load_cache(file,cache_type=CACHE_TYPE.STOCK),files))).sort_index(level=0)

def pretreate_data(data):
    """Add return, vwap and industry-neutralised columns to a quote frame.

    The input is not modified; a new frame is built by concatenation.

    Steps, in order:
      1. Append ``returns`` and ``ret_forward`` computed from ``close`` by
         ``smpl.get_current_return`` / ``smpl.get_forward_return``.
      2. Add ``vwap = amount / (volume * 100)``.
      3. Rename ``open`` -> ``Open``, ``market_value`` -> ``cap``,
         ``industry`` -> ``ind``, then divide ``cap`` by ``close``.
      4. Neutralise several price/volume series against ``ind`` via
         ``pretreat.neutralize`` and append each result as ``<name>_ind``.

    Parameters
    ----------
    data : pandas.DataFrame
        Must provide the columns read below: ``close``, ``amount``,
        ``volume``, ``high``, ``low`` and (after renaming) ``Open``, ``cap``
        and ``ind``.

    Returns
    -------
    pandas.DataFrame
        The enriched frame followed by the eleven ``*_ind`` series,
        concatenated along ``axis=1``.
    """

    def _named(series, label):
        # Setting .name makes the series land under that column after concat.
        series.name = label
        return series

    def _neutral(values, industry, label):
        # Neutralise `values` against the industry series and label the result.
        return _named(pretreat.neutralize(values, industry, categorical=['ind']), label)

    def _adv_neutral(frame, window, label):
        # Rolling mean of volume over `window` rows, evaluated through
        # excute_for_multidates (level=1 presumably selects the index level to
        # group on - confirm in base.JuUnits), then neutralised by industry.
        rolled = excute_for_multidates(
            frame.volume, lambda x: x.rolling(window).agg('mean'), level=1
        )
        # Drop rows where the rolling window is not yet full (or ind is missing).
        joined = pd.concat([rolled, frame['ind']], axis=1).dropna()
        return _neutral(joined.volume, joined['ind'], label)

    current_ret = _named(smpl.get_current_return(data, 'close'), 'returns')
    forward_ret = _named(smpl.get_forward_return(data, 'close'), 'ret_forward')

    # Column set referenced downstream:
    # {'Open', 'cap', 'close', 'high', 'ind', 'low', 'returns', 'volume', 'vwap'}
    frame = pd.concat([data, current_ret, forward_ret], axis=1)
    # NOTE(review): the factor 100 presumably converts volume from lots to
    # shares - confirm against the data source.
    frame = frame.assign(vwap=frame.amount / (frame.volume * 100))
    frame = frame.rename(
        columns={"open": "Open", 'market_value': 'cap', 'industry': 'ind'}
    )
    # The fetched value is market capitalisation, so it is divided by close.
    frame['cap'] = frame['cap'] / frame['close']

    derived = {}

    # Plain columns neutralised against industry.
    for column, label in (
        ('close', 'close_ind'),
        ('vwap', 'vwap_ind'),
        ('high', 'high_ind'),
        ('low', 'low_ind'),
        ('volume', 'volume_ind'),
    ):
        derived[label] = _neutral(frame[column], frame['ind'], label)

    # Average volume over 20 / 40 / 81 rows, neutralised against industry.
    for window, label in ((20, 'adv20_ind'), (40, 'adv40_ind'), (81, 'adv81_ind')):
        derived[label] = _adv_neutral(frame, window, label)

    # Weighted blends of two columns: first * w + second * (1 - w).
    for label, first, second, weight in (
        ('co_mixed_ind', 'close', 'Open', 0.60733),
        ('oh_mixed_ind', 'Open', 'high', 0.868128),
        ('lv_mixed_ind', 'low', 'vwap', 0.721001),
    ):
        blended = (frame[first] * weight) + (frame[second] * (1 - weight))
        derived[label] = _neutral(blended, frame['ind'], label)

    # Output column order (note: low_ind precedes high_ind).
    output_order = (
        'close_ind', 'vwap_ind', 'low_ind', 'high_ind', 'volume_ind',
        'adv20_ind', 'adv40_ind', 'adv81_ind',
        'co_mixed_ind', 'oh_mixed_ind', 'lv_mixed_ind',
    )
    return pd.concat([frame] + [derived[label] for label in output_order], axis=1)





In [2]:
# l = smpl.get_codes_from_blockname('沪深300', sse='all')
# df_all =  assemble_stocks_by_codes(l) # files have since been merged; no longer applicable
# df_treated = pretreate_data(df_all)

# Read the cached price-adjusted (qfq) quotes and preprocess them.
# `tail` picks the cache: 'all_tail_qfq' when True, 'all_train_qfq' otherwise.
tail = False
df_all = load_cache(
    'all_tail_qfq' if tail else 'all_train_qfq',
    cache_type=CACHE_TYPE.STOCK,
).sort_index()
# The return value is discarded; presumably this adjusts dtypes in place -
# TODO confirm in tools.Sample_Tools.
smpl.optimize_data_type(df_all)
df_treated = pretreate_data(df_all)

In [4]:
# ids = [11,24,38,41,42,47,57,69,80,82,83,88,93,97]
# ids = [11,24]

# Alpha ids 1..100 in random order. No seed is set, so the order differs
# between runs.
# NOTE(review): the range stops at 100; if ind.Alpha101 also defines alpha101
# it is never generated - confirm this is intended.
ids = np.random.permutation(np.arange(1, 101))
def generate_alpha_factors(fun_ids, df=None, type_tail=False):
    """Compute the requested Alpha101 factors and store each one in the cache.

    For every id ``i`` in ``fun_ids`` the function ``alpha<i>`` is looked up in
    ``ind.Alpha101``. Its parameter names are used as column names: each chunk
    that ``excute_for_multidates`` hands over supplies copies of those columns
    as positional arguments. The result is named ``alpha<i>`` and saved with
    ``save_cache`` under ``alpha<i>_tail`` or ``alpha<i>_train``.

    Parameters
    ----------
    fun_ids : iterable
        Alpha numbers to generate.
    df : pandas.DataFrame
        Preprocessed data containing every column an alpha function names as
        a parameter. The default ``None`` is not usable.
    type_tail : bool
        Selects the ``_tail`` cache suffix when true, ``_train`` otherwise.

    Returns
    -------
    None
        Results are written to the factor cache only.
    """
    # Imports are kept inside the function; it is passed to parallal_task,
    # presumably so each worker process can resolve them on its own.
    import inspect
    import ind.Alpha101 as a101
    from tools.Cacher import (CACHE_TYPE, save_cache,load_cache_adv,load_cache)
    from base.JuUnits import excute_for_multidates

    cache_name = '{}_tail' if type_tail else '{}_train'

    for alpha_id in fun_ids:
        fun_name = 'alpha' + str(alpha_id)
        alpha_fn = getattr(a101, fun_name)
        arg_names = list(inspect.signature(alpha_fn).parameters.keys())

        def _evaluate(chunk):
            # Pass copies so the alpha implementation cannot alter the chunk.
            return alpha_fn(*[chunk[name].copy() for name in arg_names])

        factor = excute_for_multidates(df, _evaluate, level=1)
        factor.name = fun_name
        save_cache(cache_name.format(fun_name), factor, cache_type=CACHE_TYPE.FACTOR)

# Worker count: used both to split the id list and as the first argument of
# parallal_task.
worker=6
# Presumably partitions the shuffled ids into `worker` chunks - confirm in
# base.JuUnits.
task = task_chunk_split(ids, worker)
# Run generate_alpha_factors over the chunks, forwarding the preprocessed frame
# and the tail/train switch. generate_alpha_factors returns None, so `results`
# carries no factor data; the factors are written to the cache instead.
results = parallal_task(worker, generate_alpha_factors, task, df=df_treated, type_tail=tail)


Now in the main code. Process name is: base.JuUnits
base.JuUnits, subpid:18900  pid:3432


  0%|          | 0/6 [00:00<?, ?it/s]