In [7]:
import pandas as pd
import numpy as np
import _pickle as cPickle
import gzip
from collections import OrderedDict

In [8]:
from stock_helper import *
from stats import *
from imp import reload   
import stock_helper
import stats
reload(stock_helper)
reload(stats)

<module 'stats' from 'F:\\BigData\\my_code\\stats.py'>

In [9]:
# 警告过滤
import warnings
warnings.filterwarnings('ignore')

In [10]:
import os
CORE_NUM = int(os.environ['NUMBER_OF_PROCESSORS'])
CORE_NUM

8

In [11]:
# 函数主要用于实现并行计算，加速python算法
import functools
import dask
from dask import compute, delayed
def parLapply(CORE_NUM, iterable, func, *args, **kwargs): # 函数名（CPU核心数目，文件列表，函数，参数1，参数2...，对象1=参数a，对象2=参数b...）
    with dask.config.set(scheduler='processes', num_workers=CORE_NUM):
        f_par = functools.partial(func, *args, **kwargs)
        result = compute([delayed(f_par)(item) for item in iterable])[0]
    return result

In [12]:
# 加载数据函数load
def load(path):
    with gzip.open(path, 'rb', compresslevel=1) as file_object:
        raw_data = file_object.read()
    return cPickle.loads(raw_data)

In [13]:
# 保存数据函数save
def save(data, path):
    serialized = cPickle.dumps(data) # 将data对象序列化
    with gzip.open(path, 'wb', compresslevel=1) as file_object: # 以写二进制的模式打开文件
        file_object.write(serialized) # 将序列化的对象写入文件

In [14]:
HEAD_PATH = 'F:/BigData/data'
DATA_PATH = HEAD_PATH + '/stock pkl/'
SAVE_PATH = 'F:/BigData/data'
product_list = ["600276", "600535"] # 防止运行时间过长，只挑选两个股票品种

In [22]:
# 该函数用于除0引起的异常 当分母为0的时候zero_divide也是0
def zero_divide(x, y): 
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        res = np.divide(x,y)
    if hasattr(y, "__len__"):
        res[y == 0] = 0 
    elif y == 0:
        res = 0
        
    return res

In [23]:
# 计算指数移动平均
def ewma(x, halflife, init=0, adjust=False):
    init_s = pd.Series(data=init)
    s = init_s.append(x)
    if adjust:
        xx = range(len(x))
        lamb=1 - 0.5**(1 / halflife)
        aa=1-np.power(1-lamb, xx)*(1-lamb)
        bb=s.ewm(halflife=halflife, adjust=False).mean().iloc[1:]
        return bb/aa
    else:
        return s.ewm(halflife=halflife, adjust=False).mean().iloc[1:]

In [24]:
# #计算移动平均
def moving_average(a, n=3):
    ret_sum = np.cumsum(a, dtype=float) # 将张量按照轴向相加
    ret = a
    ret[n:] = (ret_sum[n:] - ret_sum[:-n])/n
    return ret

In [25]:
# 该函数的作用是在dataframe中增加了wpr这一列
def addWpr(date, product):
    data = load(DATA_PATH+"/"+product+"/"+date) # 将传入路径下的文件反序列化放到data对象中
    data["wpr"] = (data["bid1"]*data["ask1.qty"]+data["ask1"]*data["bid1.qty"])/(data["bid1.qty"]+data["ask1.qty"])
    outlier = (data["bid1"]<1e-6) | (data["ask1"]<1e-6) | (np.isnan(data["wpr"])) # numpy中 | 的意思就是 或 ，& 的意思就是 且
    data["wpr"][outlier] = data["price"][outlier]
    data["next.bid"] = data["bid1"].shift(-1)
    data["next.ask"] = data["ask1"].shift(-1)
    data["wpr"] = data["wpr"]*data["adjust"]
    data["wpr.ret"] = data["wpr"]-data["wpr"].shift(1)
    data["wpr.ret"][0] = 0
    data["ret"] = np.log(data["wpr"]) - np.log(data["wpr"]).shift(1)
    data["ret"][0] = 0
    save(data, DATA_PATH+"/"+product+"/"+date) # 数据处理之后，再将对象序列化到文件中

In [26]:
#lambda表达式
product = product_list[0]
all_dates = list(map(lambda x: x,os.listdir(DATA_PATH + product)))
all_dates

['2016.pkl', '2017.pkl', '2018.pkl', '2019.pkl', '2020.pkl']

In [27]:
%%time
for product in product_list:
    result = parLapply(CORE_NUM, all_dates, addWpr, product=product)
# 对指定路径下的文件都进行addWpr函数中的操作

CPU times: total: 31.2 ms
Wall time: 46.2 s


In [28]:
def add_min_max(file, period_list): # period_list = [1024, 2048, 4096]
    data = load(file)
    data = data.reset_index(drop=True)
    for period in period_list: # 这里一共添加了三个周期的最大最小值
        data["min."+str(period)] = data["wpr"].rolling(period).min() # numpy中的滑动窗口
        data.loc[:period-1, ("min."+str(period))] = data["wpr"][0] # 滑动窗口对第一个周期的数据无法赋值，所以让这些没有赋值的数据直接等于data["wpr"][0]
        
        data["max."+str(period)] = data["wpr"].rolling(period).max() # 这里处理逻辑和上面一样
        data.loc[:period-1, ("max."+str(period))] = data["wpr"][0]
    save(data, file) # 最后保存好添加完最大值和最小值的数据文件

In [29]:
%%time
for product in product_list:
    file_list = list(map(lambda x: DATA_PATH+product+"/"+x, all_dates))
    parLapply(CORE_NUM, file_list, add_min_max, period_list=[1024, 2048, 4096])

CPU times: total: 0 ns
Wall time: 48.4 s


## 以下操作是为了在数据中加入good这个column
- 由于很多代码错误沿用了期货量化的代码，各段代码中有较高的耦合性，导致后面代码用到good之后会频繁报错。
- good的含义，如果是最后一天的数据或者第一天的数据，good值为false，否则值为true。
- 事先加入good是为防止后面代码报错，加入good对后面代码的运行结果并无影响。

In [30]:
# 查看原始数据
data = load('F:/BigData/course/stock pkl/000538/2016.pkl')
data

Unnamed: 0,date.time,price,traded.num,turnover,qty,bid1,bid2,bid3,bid4,bid5,...,wpr.ret,ret,next.bid,next.ask,min.1024,max.1024,min.2048,max.2048,min.4096,max.4096
0,2016-01-04 09:25:00,72.69,27,676017.0,93,72.69,72.68,72.66,72.65,72.63,...,0.000000,0.000000,72.72,72.96,72.699951,72.699951,72.699951,72.699951,72.699951,72.699951
1,2016-01-04 09:30:01,72.70,7,109106.0,15,72.72,72.71,72.70,72.69,72.68,...,0.027841,0.000383,72.90,72.96,72.699951,72.699951,72.699951,72.699951,72.699951,72.699951
2,2016-01-04 09:30:05,72.90,1,14580.0,2,72.90,72.73,72.72,72.71,72.70,...,0.173392,0.002381,72.90,72.95,72.699951,72.699951,72.699951,72.699951,72.699951,72.699951
3,2016-01-04 09:30:09,72.96,1,14592.0,2,72.90,72.73,72.72,72.71,72.70,...,0.002566,0.000035,72.69,72.95,72.699951,72.699951,72.699951,72.699951,72.699951,72.699951
4,2016-01-04 09:30:12,72.69,37,1046946.0,144,72.69,72.68,72.66,72.65,72.63,...,-0.001850,-0.000025,72.69,72.95,72.699951,72.699951,72.699951,72.699951,72.699951,72.699951
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
328019,2016-12-30 14:54:51,76.15,1,22848.0,3,76.15,76.14,76.13,76.12,76.11,...,0.000000,0.000000,76.15,0.00,69.224898,76.150000,69.224898,76.150000,69.224898,76.150000
328020,2016-12-30 14:56:18,76.15,1,30448.0,4,76.15,76.14,76.13,76.12,76.11,...,0.000000,0.000000,76.15,0.00,69.224898,76.150000,69.224898,76.150000,69.224898,76.150000
328021,2016-12-30 14:56:36,76.15,1,15232.0,2,76.15,76.14,76.13,76.12,76.11,...,0.000000,0.000000,76.15,0.00,69.224898,76.150000,69.224898,76.150000,69.224898,76.150000
328022,2016-12-30 14:56:57,76.15,1,1637232.0,215,76.15,76.14,76.13,76.12,76.11,...,0.000000,0.000000,76.15,0.00,69.224898,76.150000,69.224898,76.150000,69.224898,76.150000


In [31]:
data['date.time'] = pd.to_datetime(data['date.time'])
data['date.time']

0        2016-01-04 09:25:00
1        2016-01-04 09:30:01
2        2016-01-04 09:30:05
3        2016-01-04 09:30:09
4        2016-01-04 09:30:12
                 ...        
328019   2016-12-30 14:54:51
328020   2016-12-30 14:56:18
328021   2016-12-30 14:56:36
328022   2016-12-30 14:56:57
328023   2016-12-30 15:00:03
Name: date.time, Length: 328024, dtype: datetime64[ns]

In [32]:
firstday = pd.to_datetime(data['date.time'][:1].values)
firstday = str(firstday)
firstday = firstday[16:26]+' 23:59:59'
firstday = pd.to_datetime(firstday)
data['firstday'] = firstday
data['firstday'] = pd.to_datetime(data['firstday'])
data['date.time'] > data['firstday']

0         False
1         False
2         False
3         False
4         False
          ...  
328019     True
328020     True
328021     True
328022     True
328023     True
Length: 328024, dtype: bool

In [33]:
lastday = pd.to_datetime(data['date.time'][-1:].values)
lastday = str(lastday)
lastday = lastday[16:26]
data['lastday'] = lastday
data['lastday'] = pd.to_datetime(data['lastday'])
data['date.time'] < data['lastday']

0          True
1          True
2          True
3          True
4          True
          ...  
328019    False
328020    False
328021    False
328022    False
328023    False
Length: 328024, dtype: bool

In [34]:
(data['date.time'] > firstday)&(data['date.time'] < lastday)

0         False
1         False
2         False
3         False
4         False
          ...  
328019    False
328020    False
328021    False
328022    False
328023    False
Name: date.time, Length: 328024, dtype: bool

In [35]:
sum((data['date.time'] > data['firstday'])&(data['date.time'] < data['lastday']))

325641

In [36]:
data['good'] = (data['date.time'] > firstday)&(data['date.time'] < lastday)

In [37]:
data

Unnamed: 0,date.time,price,traded.num,turnover,qty,bid1,bid2,bid3,bid4,bid5,...,next.ask,min.1024,max.1024,min.2048,max.2048,min.4096,max.4096,firstday,lastday,good
0,2016-01-04 09:25:00,72.69,27,676017.0,93,72.69,72.68,72.66,72.65,72.63,...,72.96,72.699951,72.699951,72.699951,72.699951,72.699951,72.699951,2016-01-04 23:59:59,2016-12-30,False
1,2016-01-04 09:30:01,72.70,7,109106.0,15,72.72,72.71,72.70,72.69,72.68,...,72.96,72.699951,72.699951,72.699951,72.699951,72.699951,72.699951,2016-01-04 23:59:59,2016-12-30,False
2,2016-01-04 09:30:05,72.90,1,14580.0,2,72.90,72.73,72.72,72.71,72.70,...,72.95,72.699951,72.699951,72.699951,72.699951,72.699951,72.699951,2016-01-04 23:59:59,2016-12-30,False
3,2016-01-04 09:30:09,72.96,1,14592.0,2,72.90,72.73,72.72,72.71,72.70,...,72.95,72.699951,72.699951,72.699951,72.699951,72.699951,72.699951,2016-01-04 23:59:59,2016-12-30,False
4,2016-01-04 09:30:12,72.69,37,1046946.0,144,72.69,72.68,72.66,72.65,72.63,...,72.95,72.699951,72.699951,72.699951,72.699951,72.699951,72.699951,2016-01-04 23:59:59,2016-12-30,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
328019,2016-12-30 14:54:51,76.15,1,22848.0,3,76.15,76.14,76.13,76.12,76.11,...,0.00,69.224898,76.150000,69.224898,76.150000,69.224898,76.150000,2016-01-04 23:59:59,2016-12-30,False
328020,2016-12-30 14:56:18,76.15,1,30448.0,4,76.15,76.14,76.13,76.12,76.11,...,0.00,69.224898,76.150000,69.224898,76.150000,69.224898,76.150000,2016-01-04 23:59:59,2016-12-30,False
328021,2016-12-30 14:56:36,76.15,1,15232.0,2,76.15,76.14,76.13,76.12,76.11,...,0.00,69.224898,76.150000,69.224898,76.150000,69.224898,76.150000,2016-01-04 23:59:59,2016-12-30,False
328022,2016-12-30 14:56:57,76.15,1,1637232.0,215,76.15,76.14,76.13,76.12,76.11,...,0.00,69.224898,76.150000,69.224898,76.150000,69.224898,76.150000,2016-01-04 23:59:59,2016-12-30,False


In [38]:
HEAD_PATH = 'F:/BigData/data'
DATA_PATH = HEAD_PATH + '/stock pkl/'

In [39]:
from datetime import datetime

In [40]:
# 每个文件加入good
for product in product_list:
    for date in all_dates:
        data = load(DATA_PATH+product+'/'+date)
        
        firstday = pd.to_datetime(data['date.time'][:1].values)
        firstday = str(firstday)
        firstday = firstday[16:26]+' 23:59:59'
        # firstday = datetime.strptime(firstday, '%Y-%m-%d %H:%M:%S')
        data['firstday'] = firstday
        data['firstday'] = pd.to_datetime(data['firstday'])
        
        lastday = pd.to_datetime(data['date.time'][-1:].values)
        lastday = str(lastday)
        lastday = lastday[16:26]+' 00:00:00' 
        # lastday = datetime.strptime(lastday, '%Y-%m-%d %H:%M:%S')
        data['lastday'] = lastday
        data['lastday'] = pd.to_datetime(data['lastday'])
        
        data['good'] = ((data['date.time'] > data['firstday'])&(data['date.time'] < data['lastday']))
        save(data, DATA_PATH+product+'/'+date)

In [41]:
def get_good(date, product, HEAD_PATH, SAVE_PATH):
    data = load(HEAD_PATH+'/stock pkl/'+product+'/'+date)
    good = data['good']
    save(good, SAVE_PATH+'/good pkl/'+product+'/'+date)

In [42]:
os.makedirs(SAVE_PATH+'/good pkl', exist_ok=True)
for product in product_list:
    os.makedirs(SAVE_PATH+'/good pkl/'+product, exist_ok=True)

In [43]:
%%time
for product in product_list:
    parLapply(CORE_NUM, all_dates, get_good, product=product, HEAD_PATH=HEAD_PATH, SAVE_PATH=SAVE_PATH)

CPU times: total: 0 ns
Wall time: 15.4 s


In [44]:
# 数据所有columns
data.columns

Index(['date.time', 'price', 'traded.num', 'turnover', 'qty', 'bid1', 'bid2',
       'bid3', 'bid4', 'bid5', 'ask1', 'ask2', 'ask3', 'ask4', 'ask5',
       'bid1.qty', 'bid2.qty', 'bid3.qty', 'bid4.qty', 'bid5.qty', 'ask1.qty',
       'ask2.qty', 'ask3.qty', 'ask4.qty', 'ask5.qty', 'adjust', 'wpr',
       'wpr.ret', 'ret', 'next.bid', 'next.ask', 'min.1024', 'max.1024',
       'min.2048', 'max.2048', 'min.4096', 'max.4096', 'firstday', 'lastday',
       'good'],
      dtype='object')

各列数据的实际含义：
- price：价格  
- trade_num：总成交单数  
- turnover：最新的成交额  
- qty：最新的成交量  
- bid：买一价  
- ask：卖一价  
- bid.qty：买一量  
- ask.qty：卖一量  
- ret：最新一笔行情的对数收益率  
- wpr：挂单量加权平均价  
- wpr.ret：wpr 的价格变化

In [45]:
type(data)

pandas.core.frame.DataFrame

In [46]:
# 生成文件目录
for product in product_list:
    os.makedirs(SAVE_PATH + "/tmp pkl/" + product, exist_ok=True)

## 生成因子 上述因子保存到原始文件中

In [1]:
def create_signal_path(signal_list, product, HEAD_PATH):
    keys = list(signal_list.params.keys())
    for cartesian in itertools.product(*signal_list.params.values()):
        signal_name = signal_list.factor_name
        for i in range(len(cartesian)):
            signal_name = signal_name.replace(keys[i], str(cartesian[i]))
        
        os.makedirs(HEAD_PATH+"/tmp pkl/"+product+"/"+signal_name, exist_ok=True)
        print(HEAD_PATH+"/tmp pkl/"+product+"/"+signal_name)

# str.period

In [48]:
# 计算波动幅度
def get_atr(file, product, period_list):
    data = load(file)
    for period in period_list:
        S = (data["max."+str(period)]-data["min."+str(period)])/data["wpr"] # 最大值减去最小值除以wpr就得到了波动幅度 但是计算得到的数据并没有和原始数据一起保存
        save(S, SAVE_PATH+"/tmp pkl/"+product+"/atr."+str(period)+"/" + file[-8:]) # 根据路径保存

In [49]:
# 为波动率因子创建文件夹
for product in product_list:
    for period in [1024,2048,4096]:
        os.makedirs(SAVE_PATH + "/tmp pkl/" + product + "/atr." + str(period), exist_ok=True)

In [50]:
%%time
for product in product_list:
    file_list = list(map(lambda x: DATA_PATH+product+"/"+x, os.listdir(DATA_PATH + product)))
    parLapply(CORE_NUM, file_list, get_atr, product=product,period_list=np.power(2, range(10,13)))  # 传入函数发生变化

CPU times: total: 0 ns
Wall time: 21.8 s


# nr.period

In [16]:
# 构造的第一个因子
class foctor_nr_period(factor_template):
    factor_name = "nr.period"
    params = OrderedDict([
        ("period", np.power(2, range(10,13))) # 有序字典：{"period": [1024 2048 4096]} 
    ])
    def formula(self, data, period): # data["ret"]的指数移动平均除以data["ret"]绝对值的指数移动平均
        return zero_divide(ewma(data["ret"], period, adjust=True), ewma(data["ret"].abs(), period, adjust=True)).values 
    # adjust=True 会对结果进行调整，防止开头数据异常 会增加一定的计算量

In [17]:
# 实例化类
x3 = foctor_nr_period()

In [18]:
# 该函数在stock_helper中 函数作用是创建名称为nr.xxxx文件夹 文件夹创建好会输出到下方
for product in product_list:
    create_signal_path(x3, product, SAVE_PATH); 

F:/BigData/data/tmp pkl/600276/nr.1024
F:/BigData/data/tmp pkl/600276/nr.2048
F:/BigData/data/tmp pkl/600276/nr.4096
F:/BigData/data/tmp pkl/600535/nr.1024
F:/BigData/data/tmp pkl/600535/nr.2048
F:/BigData/data/tmp pkl/600535/nr.4096


In [5]:
!pip install pandas==1.5.3

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Collecting pandas==1.5.3
  Downloading https://pypi.tuna.tsinghua.edu.cn/packages/c2/45/801ecd8434eef0b39cc02795ffae273fe3df3cfcb3f6fff215efbe92d93c/pandas-1.5.3-cp39-cp39-win_amd64.whl (10.9 MB)
     ---------------------------------------- 0.0/10.9 MB ? eta -:--:--
      --------------------------------------- 0.2/10.9 MB 4.6 MB/s eta 0:00:03
      --------------------------------------- 0.2/10.9 MB 4.6 MB/s eta 0:00:03
      --------------------------------------- 0.2/10.9 MB 4.6 MB/s eta 0:00:03
     - -------------------------------------- 0.3/10.9 MB 1.6 MB/s eta 0:00:07
     - -------------------------------------- 0.3/10.9 MB 1.6 MB/s eta 0:00:07
     - -------------------------------------- 0.3/10.9 MB 1.2 MB/s eta 0:00:09
     - -------------------------------------- 0.4/10.9 MB 1.5 MB/s eta 0:00:08
     - -------------------------------------- 0.5/10.9 MB 1.3 MB/s eta 0:00:09
     - --------------------------------

ERROR: Could not install packages due to an OSError: [WinError 5] 拒绝访问。: 'D:\\Anaconda3\\envs\\tensorflow-gpu\\Lib\\site-packages\\~andas.libs\\msvcp140-ef6047a69b174ada5cb2eff1d2bc9a62.dll'
Consider using the `--user` option or check the permissions.



In [19]:
%%time 
for product in product_list:
    file_list = list(map(lambda x: DATA_PATH+product+"/"+x, os.listdir(DATA_PATH + product))) ## files of each day map和lambda的配合使用
    parLapply(CORE_NUM, file_list, build_composite_signal,signal_list=x3, product=product, HEAD_PATH=SAVE_PATH, n=8) # 这里为n指定了值为8
# file_list中的每一项是传入函数的第一个参数，传入函数名后面的参数全都是传入函数的参数，parLapply函数的作用是让CPU核心并行工作，提高效率。
# 第二个参数一般是一个列表，是把列表中的每个元素分别传给函数的，并不是把整个列表传给函数
# 生成第一个因子

CPU times: total: 0 ns
Wall time: 30.4 s


# dbook.period

In [55]:
# 这个因子和挂单量有关 这个因子利用了state.py中的因子模版
class foctor_dbook_period(factor_template):
    factor_name = "dbook.period" # 因子名称
    
    params = OrderedDict([
        ("period", np.power(2, range(10,13))) # 三个周期 1024 2048 4096
    ])
    
    def formula(self, data, period):
        bid_qty = data["bid1.qty"]+data["bid2.qty"]+data["bid3.qty"]+data["bid4.qty"]+data["bid5.qty"] # 5个列的加和
        diff_bid_qty = bid_qty - bid_qty.shift() # shift将数据向下移动移动一位 具体来说就是第一个时间节点的数据减去第二个时间节点的数据,减出来的有正值有负值
        diff_bid_qty[0] = 0 # 由于第一个数据被shift掉了，所以赋值为零 原来是nan
        ask_qty = data["ask1.qty"]+data["ask2.qty"]+data["ask3.qty"]+data["ask4.qty"]+data["ask5.qty"]
        diff_ask_qty = ask_qty - ask_qty.shift()
        diff_ask_qty[0] = 0 # 下面这段代码和上面的代码很类似
        return ewma(np.sign(diff_bid_qty)-np.sign(diff_ask_qty), period, adjust=True).values 
    # np.sign抽取每个元素的符号1 -1 是负数返回-1 是正数返回1 是0就还返回0

In [56]:
x4 = foctor_dbook_period()

In [57]:
# 创建因子目录
for product in product_list:
    create_signal_path(x4, product, SAVE_PATH)

F:/BigData/data/tmp pkl/600276/dbook.1024
F:/BigData/data/tmp pkl/600276/dbook.2048
F:/BigData/data/tmp pkl/600276/dbook.4096
F:/BigData/data/tmp pkl/600535/dbook.1024
F:/BigData/data/tmp pkl/600535/dbook.2048
F:/BigData/data/tmp pkl/600535/dbook.4096


In [58]:
%%time
for product in product_list:
    file_list = list(map(lambda x: DATA_PATH+product+"/"+x, os.listdir(DATA_PATH + product)))
    parLapply(CORE_NUM, file_list, build_composite_signal,signal_list=x4, product=product, HEAD_PATH=SAVE_PATH,n=8)
# 生成因子

AttributeError: 'Series' object has no attribute 'append'

Traceback
---------
  File "D:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\dask\local.py", line 225, in execute_task
    result = _execute_task(task, data)
  File "D:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\dask\core.py", line 127, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "F:\BigData\my_code\stats.py", line 137, in build_composite_signal
    S = signal_list.formula(data, *cartesian)
  File "C:\Users\ASUS\AppData\Local\Temp\ipykernel_15648\3001410274.py", line 16, in formula
  File "C:\Users\ASUS\AppData\Local\Temp\ipykernel_15648\2228110220.py", line 4, in ewma
  File "D:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\pandas\core\generic.py", line 6296, in __getattr__
    return object.__getattribute__(self, name)


# range.pos.period

In [59]:
def get_range_pos(wpr, min_period, max_period, period):
    return ewma(zero_divide(wpr-min_period, max_period-min_period), period, adjust=True) - 0.5

In [60]:
class foctor_range_pos_period(factor_template):
    factor_name = "range.pos.period"
    
    params = OrderedDict([
        ("period", np.power(2, range(10,13)))
    ])
    
    def formula(self, data, period):
        return get_range_pos(data["wpr"], data["min."+str(period)], data["max."+str(period)], period).values

In [61]:
x5 = foctor_range_pos_period()

In [62]:
for product in product_list:
    create_signal_path(x5, product, SAVE_PATH)

F:/BigData/data/tmp pkl/600276/range.pos.1024
F:/BigData/data/tmp pkl/600276/range.pos.2048
F:/BigData/data/tmp pkl/600276/range.pos.4096
F:/BigData/data/tmp pkl/600535/range.pos.1024
F:/BigData/data/tmp pkl/600535/range.pos.2048
F:/BigData/data/tmp pkl/600535/range.pos.4096


In [63]:
%%time
for product in product_list:
    file_list = list(map(lambda x: DATA_PATH+product+"/"+x, os.listdir(DATA_PATH + product)))
    parLapply(CORE_NUM, file_list, build_composite_signal,signal_list=x5, product=product, HEAD_PATH=SAVE_PATH,n=8)

AttributeError: 'Series' object has no attribute 'append'

Traceback
---------
  File "D:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\dask\local.py", line 225, in execute_task
    result = _execute_task(task, data)
  File "D:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\dask\core.py", line 127, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "F:\BigData\my_code\stats.py", line 137, in build_composite_signal
    S = signal_list.formula(data, *cartesian)
  File "C:\Users\ASUS\AppData\Local\Temp\ipykernel_15648\456807279.py", line 9, in formula
  File "C:\Users\ASUS\AppData\Local\Temp\ipykernel_15648\2460966742.py", line 2, in get_range_pos
  File "C:\Users\ASUS\AppData\Local\Temp\ipykernel_15648\2228110220.py", line 4, in ewma
  File "D:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\pandas\core\generic.py", line 6296, in __getattr__
    return object.__getattribute__(self, name)


# ma.diff.period

In [64]:
class foctor_ma_diff_period(factor_template):
    factor_name = "ma.dif.10.period"
    
    params = OrderedDict([
        ("period", np.power(2, range(10,13)))
    ])
    
    def formula(self, data, period):
        return zero_divide(ewma(data["wpr"], round(period/10), adjust=True) - ewma(data["wpr"], period, adjust=True), data["wpr"]).values

In [65]:
x6 = foctor_ma_diff_period()

In [66]:
for product in product_list:
    create_signal_path(x6, product, SAVE_PATH)

F:/BigData/data/tmp pkl/600276/ma.dif.10.1024
F:/BigData/data/tmp pkl/600276/ma.dif.10.2048
F:/BigData/data/tmp pkl/600276/ma.dif.10.4096
F:/BigData/data/tmp pkl/600535/ma.dif.10.1024
F:/BigData/data/tmp pkl/600535/ma.dif.10.2048
F:/BigData/data/tmp pkl/600535/ma.dif.10.4096


In [67]:
%%time
for product in product_list:
    file_list = list(map(lambda x: DATA_PATH+product+"/"+x, os.listdir(DATA_PATH + product)))
    parLapply(CORE_NUM, file_list, build_composite_signal,signal_list=x6, product=product, HEAD_PATH=SAVE_PATH, n=8)

AttributeError: 'Series' object has no attribute 'append'

Traceback
---------
  File "D:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\dask\local.py", line 225, in execute_task
    result = _execute_task(task, data)
  File "D:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\dask\core.py", line 127, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "F:\BigData\my_code\stats.py", line 137, in build_composite_signal
    S = signal_list.formula(data, *cartesian)
  File "C:\Users\ASUS\AppData\Local\Temp\ipykernel_15648\2248539299.py", line 9, in formula
  File "C:\Users\ASUS\AppData\Local\Temp\ipykernel_15648\2228110220.py", line 4, in ewma
  File "D:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\pandas\core\generic.py", line 6296, in __getattr__
    return object.__getattribute__(self, name)


# price.osci.period

In [68]:
class foctor_price_osci_period(factor_template):
    factor_name = "price.osci.period"
    
    params = OrderedDict([
        ("period", np.power(2, range(10,13)))
    ])
    
    def formula(self, data, period):
        return zero_divide(ewma(data["wpr"], round(period/20), adjust=True)-ewma(data["wpr"], period, adjust=True), data["max."+str(period)]-data["min."+str(period)]).values

In [69]:
x7 = foctor_price_osci_period()

In [None]:
for product in product_list:
    create_signal_path(x7, product, SAVE_PATH)

In [None]:
%%time
for product in product_list:
    file_list = list(map(lambda x: DATA_PATH+product+"/"+x, os.listdir(DATA_PATH + product)))
    parLapply(CORE_NUM, file_list, build_composite_signal,signal_list=x7, product=product, HEAD_PATH=SAVE_PATH, n=8)

# kdj.k.period

In [None]:
class foctor_kdj_k_period(factor_template):
    factor_name = "kdj.k.period"
    
    params = OrderedDict([
        ("period", np.power(2, range(10,13)))
    ])
    
    def formula(self, data, period):
        return ewma((zero_divide(data["wpr"]-data["min."+str(period)], data["max."+str(period)]-data["min."+str(period)])-0.5)*2, round(period/5), adjust=True).values

In [None]:
x8 = foctor_kdj_k_period()

In [None]:
for product in product_list:
    create_signal_path(x8, product, SAVE_PATH)

In [None]:
%%time
for product in product_list:
    file_list = list(map(lambda x: DATA_PATH+product+"/"+x, os.listdir(DATA_PATH + product)))
    parLapply(CORE_NUM, file_list, build_composite_signal,signal_list=x8, product=product, HEAD_PATH=SAVE_PATH, n=8)

# kdj.j.period

In [None]:
class foctor_kdj_j_period(factor_template):
    factor_name = "kdj.j.period"
    
    params = OrderedDict([
        ("period", np.power(2, range(10,13)))
    ])
    
    def formula(self, data, period):
        return ewma(ewma((zero_divide(data["wpr"]-data["min."+str(period)], data["max."+str(period)]-data["min."+str(period)])-0.5)*2, round(period/5),
                        adjust=True), round(period/5), adjust=True).values

In [None]:
x9 = foctor_kdj_j_period()

In [None]:
for product in product_list:
    create_signal_path(x9, product, SAVE_PATH)

In [None]:
%%time
for product in product_list:
    file_list = list(map(lambda x: DATA_PATH+product+"/"+x, os.listdir(DATA_PATH + product)))
    parLapply(CORE_NUM, file_list, build_composite_signal,signal_list=x9, product=product, HEAD_PATH=SAVE_PATH, n=8)

# std.period

In [None]:
# 该函数用于std.period因子的计算 该函数可以快速计算方差
def fast_roll_var(x, period):
      x_ma = cum(x,period)/period
      x2 = x*x
      x2_ma = cum(x2,period)/period
      var_x = x2_ma-x_ma*x_ma
      return(var_x)

In [None]:
class foctor_std_period(factor_template): # 计算波动性因子 波动性因子区别于前面说的带有方向的因子
    factor_name = "std.period"
    
    params = OrderedDict([
        ("period", np.power(2, range(10,13)))
    ])
    
    def formula(self, data, period):
        return np.sqrt(fast_roll_var(data["wpr"], period)) # 该因子的计算公式

In [None]:
x10 = foctor_std_period()

In [None]:
for product in product_list:
    create_signal_path(x10, product, SAVE_PATH)

In [None]:
%%time
for product in product_list:
    file_list = list(map(lambda x: DATA_PATH+product+"/"+x, os.listdir(DATA_PATH + product)))
    parLapply(CORE_NUM, file_list, build_composite_signal,signal_list=x10, product=product, HEAD_PATH=SAVE_PATH,n=8)

# range.period

In [None]:
class foctor_range_period(factor_template): # 波动范围因子
    factor_name = "range.period"
    
    params = OrderedDict([
        ("period", np.power(2, range(10,13)))
    ])
    
    def formula(self, data, period):
        return data["max."+str(period)]-data["min."+str(period)]

In [None]:
x11 = foctor_range_period()

In [None]:
for product in product_list:
    create_signal_path(x11, product, SAVE_PATH)

In [None]:
%%time
for product in product_list:
    file_list = list(map(lambda x: DATA_PATH+product+"/"+x, os.listdir(DATA_PATH + product)))
    parLapply(CORE_NUM, file_list, build_composite_signal,signal_list=x11, product=product, HEAD_PATH=SAVE_PATH,n=8)

# trend.index.period

In [None]:
class foctor_trend_index_period(factor_template): # 趋势度因子 衡量波动性
    factor_name = "trend.index.period"
    
    params = OrderedDict([
        ("period", np.power(2, range(10,13)))
    ])
    
    def formula(self, data, period):
        aa = zero_divide(abs(data["wpr"]-data["wpr"].shift(period)), data["max."+str(period)]-data["min."+str(period)])
        aa[0:period]=0
        return aa

In [None]:
x12 = foctor_trend_index_period()

In [None]:
for product in product_list:
    create_signal_path(x12, product, SAVE_PATH)

In [None]:
%%time
for product in product_list:
    file_list = list(map(lambda x: DATA_PATH+product+"/"+x, os.listdir(DATA_PATH + product)))
    parLapply(CORE_NUM, file_list, build_composite_signal,signal_list=x12, product=product, HEAD_PATH=SAVE_PATH,n=8)

# 批量生成组合因子

- 预测因子：带方向的因子
- 波动率因子：不带方向的因子
- 方向因子*不带方向的因子 => 新的带方向的因子
- 两个带方向的因子相乘没有意义

In [None]:
# 创建组合因子
def construct_composite_signal(dire_signal, range_signal, period_list, product_list, HEAD_PATH):
    from collections import OrderedDict
    class foctor_xx_period(factor_template):
        factor_name = dire_signal+"."+range_signal+".period" # 新因子名称
        params = OrderedDict([
            ("period", period_list)
        ]) 
        def formula(self, data, period):
            return (data[dire_signal+"."+str(period)]*data[range_signal+"."+str(period)]).values # 因子计算
    xx = foctor_xx_period()
    for product in product_list:
        create_signal_path(xx, product, HEAD_PATH)
        file_list = list(map(lambda x: DATA_PATH+product+"/"+x, os.listdir(DATA_PATH + product)))
        parLapply(CORE_NUM, file_list, build_composite_signal,signal_list=xx, product=product, HEAD_PATH=HEAD_PATH, n=8)

In [None]:
dire_signal_list = ["nr", "dbook", "range.pos", "price.osci", "ma.dif.10", "kdj.k", "kdj.j"] # 方向性因子
range_signal_list = ["range", "std", "trend.index"] # 波动性因子
period_list = np.power(2, range(10, 13)) # 周期列表

In [None]:
%%time
for range_signal in range_signal_list:
    for dire_signal in dire_signal_list:
        construct_composite_signal(dire_signal, range_signal, period_list, product_list, SAVE_PATH)