In [None]:
import pandas as pd 
import numpy as np
import os
import json
from scipy.stats import pearsonr, spearmanr

In [None]:
# Load the data files.
# input : folder path
# output : for the first two subfolders of the folder, the first three files of each subfolder
def get_files_by_subfolder(folder_path, max_subfolders=2, max_files=3):
    """Load JSON files from the leading subfolders of ``folder_path``.

    Subfolders are taken in sorted path order; inside each subfolder the
    regular files are taken in sorted path order and parsed with
    ``pd.read_json``.

    Parameters
    ----------
    folder_path : str or path-like
        Directory whose immediate subdirectories are scanned.
    max_subfolders : int or None, default 2
        How many subfolders to read; ``None`` reads all of them.
    max_files : int or None, default 3
        How many files to read per subfolder; ``None`` reads all of them.

    Returns
    -------
    list of list of pandas.DataFrame
        One inner list per subfolder, one DataFrame per file.
    """
    with os.scandir(folder_path) as entries:
        subfolders = sorted(entry.path for entry in entries if entry.is_dir())

    folder_files = []
    for root in subfolders[:max_subfolders]:
        # Keep regular files only: a nested directory sorted among the first
        # entries would otherwise be handed to pd.read_json and raise.
        with os.scandir(root) as entries:
            file_paths = sorted(entry.path for entry in entries if entry.is_file())
        folder_files.append([pd.read_json(path) for path in file_paths[:max_files]])
    return folder_files

In [9]:
# Input folders, one per data feed.
# NOTE(review): later cells read 'timestamp' / trade price-volume columns from the
# df1_list frames and 'flag' / 'price' / 'volume' from the df3_list frames — confirm
# that the order/trade folders are assigned to the intended variables.
folder_path_1, folder_path_2, folder_path_3 = (
    'quote/order_in_hour',
    'quote/snap_in_hour',
    'quote/trade_in_hour',
)

# Load the three folders in the order 1, 2, 3; each result is a list (one entry
# per subfolder) of lists of DataFrames.
df1_list, df2_list, df3_list = (
    get_files_by_subfolder(path)
    for path in (folder_path_1, folder_path_2, folder_path_3)
)

In [None]:
# Summarise the loaded snapshot frames (one shape per file, grouped by subfolder)
# instead of rendering every full DataFrame in the cell output.
[[frame.shape for frame in frames] for frames in df2_list]

In [11]:
def cal_wei_price(df):
    """Return the volume-weighted average of ``trade_price`` over ``df``.

    The numerator is the sum of ``trade_price * trade_volume`` over all rows;
    the denominator is the sum of ``trade_volume`` over rows whose
    ``trade_price`` is not 0 (zero-price rows add nothing to the numerator).

    Parameters
    ----------
    df : pandas.DataFrame
        Must have ``trade_price`` and ``trade_volume`` columns; may be empty.

    Returns
    -------
    number
        The weighted price, or ``0`` when the total volume is 0 (including an
        empty frame). Missing values propagate to the result, as they did with
        the row-by-row accumulation this replaces.
    """
    prices = df['trade_price']
    volumes = df['trade_volume']

    # Column arithmetic instead of a per-row ``df.iloc[i][...]`` loop, which
    # built a full row Series for every single trade.
    total_sum = (prices * volumes).sum(skipna=False)
    # Positional boolean mask, so a non-unique index cannot trigger alignment.
    sum_vol = volumes[(prices != 0).to_numpy()].sum(skipna=False)

    if sum_vol == 0:
        return 0
    return total_sum / sum_vol

In [12]:
def _snapshot_for(df_2, cur_time):
    """Return the snapshot row for ``cur_time``: exact match, else the last earlier row, else None."""
    snap_times = df_2['market_time']
    exact = df_2[snap_times == cur_time]
    if not exact.empty:
        return exact.iloc[0]
    earlier = df_2[snap_times < cur_time]
    if earlier.empty:
        return None
    # Relies on df_2 being in ascending market_time order, as the original did.
    return earlier.iloc[-1]


def _aggressive_type(cur_flag, cur_price, cur_vol, snapshot):
    """Return 'type1'/'type2' (flag 'B') or 'type7'/'type8' (other flags), or None if no rule matches."""
    is_buy = cur_flag == 'B'
    levels = snapshot['sell_delegations'] if is_buy else snapshot['buy_delegations']
    if len(levels) == 0:
        # Nothing on the opposite side to compare against.
        return None
    best = levels[0]
    # Every rule requires the order volume to exceed the level-1 volume.
    if not cur_vol > best['volume']:
        return None
    if cur_price == best['price']:
        return 'type2' if is_buy else 'type8'
    if is_buy:
        return 'type1' if cur_price > best['price'] else None
    return 'type7' if cur_price < best['price'] else None


def _price_window(df_1, cur_time):
    """Return ``[{j: cal_wei_price(rows at cur_time + j s)}]`` for j = -10 .. 20."""
    trade_times = df_1['timestamp']
    return [
        {j: cal_wei_price(df_1[trade_times == cur_time + np.timedelta64(j, 's')])}
        for j in range(-10, 21)
    ]


def cal_win(list):
    """Label aggressive orders and collect the per-second price window around each.

    Parameters
    ----------
    list : sequence of three pandas.DataFrame
        ``(df_1, df_2, df_3)``. The parameter name shadows the builtin; it is
        kept so existing callers keep working.

        * ``df_1`` – reads ``timestamp`` and is passed, filtered per second,
          to ``cal_wei_price``. Needs index label 0.
        * ``df_2`` – reads ``market_time``, ``buy_delegations`` and
          ``sell_delegations``; each delegation cell is indexed as
          ``[0]['price']`` / ``[0]['volume']``.
        * ``df_3`` – reads ``market_time``, ``flag``, ``price``, ``volume``
          and must be indexed 0..n-1. It is modified in place: the columns
          ``type_agg`` and ``time_win`` are created (if missing) and filled.

    Returns
    -------
    list of pandas.DataFrame
        Four tables in the order type1, type2, type7, type8. Each is indexed
        by the order's ``market_time``, has the raw ``time_win`` column plus
        one column per offset ``'-10'`` .. ``'20'``, and is reduced with
        ``groupby(level=0).min()``.
    """
    df_1, df_2, df_3 = list[0], list[1], list[2]
    agg_types = ('type1', 'type2', 'type7', 'type8')

    # Only orders between (first timestamp + 910 s) and (last timestamp - 201 s)
    # are considered. NOTE(review): the rationale for 910 / 201 is not visible here.
    start_time = df_1.loc[0]['timestamp'] + np.timedelta64(910, 's')
    end_time = df_1.iloc[-1]['timestamp'] + np.timedelta64(-201, 's')

    # Create the output columns up front so the lookups below cannot raise
    # KeyError when no order ends up being classified.
    for column in ('type_agg', 'time_win'):
        if column not in df_3.columns:
            df_3[column] = None

    # Pass 1: classify every in-window order against the snapshot in force.
    for i in range(df_3.shape[0]):
        cur_time = df_3.at[i, 'market_time']
        if cur_time < start_time or cur_time > end_time:
            continue
        # The original tested isin(['cur_time']) (a string literal), so a
        # snapshot taken exactly at the order time was never selected.
        snapshot = _snapshot_for(df_2, cur_time)
        if snapshot is None:
            continue
        label = _aggressive_type(
            df_3.at[i, 'flag'], df_3.at[i, 'price'], df_3.at[i, 'volume'], snapshot
        )
        if label is not None:
            df_3.at[i, 'type_agg'] = label

    # Pass 2: attach the -10 s .. +20 s weighted-price window to labelled orders.
    for i in range(df_3.shape[0]):
        cur_time = df_3.at[i, 'market_time']
        if cur_time < start_time or cur_time > end_time:
            continue
        if df_3.at[i, 'type_agg'] in agg_types:
            df_3.at[i, 'time_win'] = _price_window(df_1, cur_time)

    # One table per type: expand the window into one column per offset and
    # collapse orders that share a timestamp.
    type_win = []
    for label in agg_types:
        frame = df_3.loc[
            df_3['type_agg'] == label, ['market_time', 'time_win']
        ].set_index('market_time')
        for j in range(-10, 21):
            frame[str(j)] = frame['time_win'].apply(
                lambda window, j=j: window[j + 10][j]
            )
        # NOTE(review): min() also runs over the raw 'time_win' lists; how that
        # behaves for duplicated timestamps depends on the pandas version.
        type_win.append(frame.groupby(level=0).min())
    return type_win

In [None]:
import multiprocessing
from itertools import zip_longest
def process_rows(row_data):
    """Zip three parallel sequences into ``[val1, val2, val3]`` triples.

    Parameters
    ----------
    row_data : sequence of three iterables
        ``(row1, row2, row3)``.

    Returns
    -------
    list of list
        One three-element list per position; shorter inputs are padded with
        ``None``.
    """
    # The original unpacked into ``rowq`` and then read the undefined ``row1``.
    row1, row2, row3 = row_data
    return [
        [val1, val2, val3]
        for val1, val2, val3 in zip_longest(row1, row2, row3, fillvalue=None)
    ]


def _cal_win_if_complete(triple):
    """Run ``cal_win`` on a full triple; return None when padding left a gap."""
    if any(item is None for item in triple):
        return None
    return cal_win(triple)


def parallel_process(list1,list2,list3):
    """Run ``cal_win`` for every aligned triple of the three nested lists.

    Parameters
    ----------
    list1, list2, list3 : list of list
        Nested lists with the same layout; element ``[r][k]`` of each is
        combined into one ``cal_win`` input.

    Returns
    -------
    list of list
        ``result[r][k]`` is the ``cal_win`` output for position ``[r][k]``, or
        ``None`` when one of the three inputs is missing at that position.

    Notes
    -----
    ``cal_win`` runs in worker processes, so its in-place changes to the
    third frame are not visible to the caller; only the return value is.
    The worker functions must be importable by the workers (with the
    ``spawn`` start method, notebook-defined functions are not).
    """
    # Building the triples is cheap, so it stays in the parent process.
    grouped = [
        process_rows(row)
        for row in zip_longest(list1, list2, list3, fillvalue=[])
    ]
    # The original never created ``mat2``, passed the whole ``result`` list to
    # cal_win instead of each ``row``, and only used the pool for the zipping.
    with multiprocessing.Pool() as pool:
        mat2 = [pool.map(_cal_win_if_complete, triples) for triples in grouped]
    return mat2
        

In [6]:
# Collect the per-type window tables for every file of every subfolder.
#   typeN_code       flat list, one table per processed file (all subfolders)
#   typeN_code_date  one list per subfolder, holding only that subfolder's tables
type1_code=[]
type2_code=[]
type7_code=[]
type8_code=[]
type1_code_date=[]
type2_code_date=[]
type7_code_date=[]
type8_code_date=[]
for row_1,row_2,row_3 in zip(df1_list,df2_list,df3_list):
    # Fresh lists per subfolder. Appending the shared flat lists here (as the
    # cell did before) made every typeN_code_date entry the same growing list.
    row_type1, row_type2, row_type7, row_type8 = [], [], [], []
    for df_1,df_2,df_3 in zip(row_1,row_2,row_3):
        # cal_win holds the classification + window extraction that was
        # previously copy-pasted inline in this cell.
        type_1_gr, type_2_gr, type_7_gr, type_8_gr = cal_win([df_1, df_2, df_3])

        type1_code.append(type_1_gr)
        type2_code.append(type_2_gr)
        type7_code.append(type_7_gr)
        type8_code.append(type_8_gr)

        row_type1.append(type_1_gr)
        row_type2.append(type_2_gr)
        row_type7.append(type_7_gr)
        row_type8.append(type_8_gr)
    type1_code_date.append(row_type1)
    type2_code_date.append(row_type2)
    type7_code_date.append(row_type7)
    type8_code_date.append(row_type8)


In [69]:
        # NOTE(review): this cell starts indented and appends to cor_mat_* lists that
        # are not created in any visible cell, so it looks like the tail of a loop body
        # pasted on its own — confirm where it belongs before running it.
        #
        # For each order type, rank-correlate the price change up to an offset with
        # the price change that follows it, using the per-offset columns
        # ('-1', '0', ..., '20') of the type_*_gr tables. The full spearmanr result
        # object is stored for every pair, not just the coefficient.

        # --- type8 ---
        cor_8 = []
        # First entry: change from offset -1 to 0 versus change from 0 to 1.
        vec_1 = type_8_gr[str(0)] -type_8_gr['-1'] 
        vec_2 = type_8_gr[str(1)] -type_8_gr[str(0)]
        cor_8.append(spearmanr(vec_1, vec_2))
        # Then, for t = 0..15: change from -1 to t versus change from t to t+5
        # (t+5 reaches 20, the last column used).
        for t in range(0,16):
            vec_1 = type_8_gr[str(t)] -type_8_gr['-1'] 
            vec_2 = type_8_gr[str(t+5)] -type_8_gr[str(t)]
            cor_8.append(spearmanr(vec_1, vec_2))
        cor_mat_8.append(cor_8)

        # --- type7: same computation as above ---
        cor_7 = []
        vec_1 = type_7_gr[str(0)] -type_7_gr['-1'] 
        vec_2 = type_7_gr[str(1)] -type_7_gr[str(0)]
        cor_7.append(spearmanr(vec_1, vec_2))
        for t in range(0,16):
            vec_1 = type_7_gr[str(t)] -type_7_gr['-1'] 
            vec_2 = type_7_gr[str(t+5)] -type_7_gr[str(t)]
            cor_7.append(spearmanr(vec_1, vec_2))
        cor_mat_7.append(cor_7)

        # --- type2: same computation as above ---
        cor_2 = []
        vec_1 = type_2_gr[str(0)] -type_2_gr['-1'] 
        vec_2 = type_2_gr[str(1)] -type_2_gr[str(0)]
        cor_2.append(spearmanr(vec_1, vec_2))
        for t in range(0,16):
            vec_1 = type_2_gr[str(t)] -type_2_gr['-1'] 
            vec_2 = type_2_gr[str(t+5)] -type_2_gr[str(t)]
            cor_2.append(spearmanr(vec_1, vec_2))
        cor_mat_2.append(cor_2)

        # --- type1: same computation as above ---
        cor_1 = []
        vec_1 = type_1_gr[str(0)] -type_1_gr['-1'] 
        vec_2 = type_1_gr[str(1)] -type_1_gr[str(0)]
        cor_1.append(spearmanr(vec_1, vec_2))
        for t in range(0,16):
            vec_1 = type_1_gr[str(t)] -type_1_gr['-1'] 
            vec_2 = type_1_gr[str(t+5)] -type_1_gr[str(t)]
            cor_1.append(spearmanr(vec_1, vec_2))
        cor_mat_1.append(cor_1)
    # Contains all the stocks.
    # NOTE(review): cor_mat_3_ and cor_mat_4_ receive the type7 and type8 matrices,
    # so the numeric suffix does not match the type number — confirm this is intended.
    cor_mat_1_.append(cor_mat_1)
    cor_mat_2_.append(cor_mat_2)
    cor_mat_3_.append(cor_mat_7)
    cor_mat_4_.append(cor_mat_8)

[]