In [1]:
import numpy as np
import pandas as pd

In [None]:
# 将数据转换为字典形式,加快计算
def change_df2dict(df):
    """Split ``df`` into a dict that maps each ``code`` value to its rows.

    Groups that turn out to be empty are left out of the result.
    """
    return {
        key: group
        for key, group in df.groupby("code")
        if len(group) > 0
    }

In [None]:
#%% 
'''
通过给的clouddata更新全天股票的数据，转化为指定格式，包括tick，order和trans三种
实盘中通过实际的交易接口进行更新和维护
'''
# Tick数据
def update_stock_total_tick_data(date):
    """Convert one day of cloud tick snapshots to the local layout and save it.

    The parquet file at ``cloud_file`` is loaded, its columns are renamed and
    its prices rescaled, unwanted rows are filtered out, and the result is
    written to ``local_file`` as a zstd-compressed feather file.

    ``cloud_file``, ``local_file``, ``name`` and ``log_info`` are not bound
    inside this function; they must be resolvable from an enclosing scope.

    Any exception raised while processing is caught and reported through
    ``log_info``. Nothing is returned.
    """
    def scale_price(raw):
        # Raw prices are divided by 10000 and rounded to two decimals.
        return round(raw / 10000, 2)

    def get_valid_tick_data(tick_data):
        """Drop the snapshots that are not wanted downstream."""
        tick_time = tick_data['time']
        # 09:26-09:30: only snapshots with zero volume are removed.
        idle_before_open = (
            (tick_time >= 92600000)
            & (tick_time < 93000000)
            & (tick_data['volume'] == 0)
        )
        # 11:30-13:00 (both ends exclusive): every snapshot is removed.
        midday_break = (tick_time > 113000000) & (tick_time < 130000000)
        return tick_data[~(idle_before_open | midday_break)]

    # (local column, cloud column) pairs, listed in output column order.
    price_fields = (
        ('preclose', 'pre_close_price'),
        ('open', 'open_price'),
        ('high', 'high_price'),
        ('low', 'low_price'),
        ('close', 'last_price'),
    )
    passthrough_fields = (
        ('volume', 'volume'),
        ('amount', 'turnover'),
        ('cjbs_sum', 'num_trades'),
        ('volume_sum', 'total_volume'),
        ('amount_sum', 'total_turnover'),
    )

    try:
        # Load the full-day snapshot data from the cloud file.
        snapshot = pd.read_parquet(cloud_file)

        converted = pd.DataFrame(index=snapshot.index)
        converted['date'] = date
        converted['code'] = snapshot['security_id'].map(lambda sid: sid[:6])
        converted['time'] = snapshot['trade_time']
        for target, source in price_fields:
            converted[target] = scale_price(snapshot[source])
        # Per-snapshot trade count: difference of the running count per security.
        converted['cjbs'] = snapshot.groupby('security_id')['num_trades'].diff().fillna(0)
        for target, source in passthrough_fields:
            converted[target] = snapshot[source]

        # Ten book levels normally; only two for call-auction data, to save space.
        depth = 10
        if 'Call' in name:
            depth = 2
        for src_level in range(depth):
            out_level = src_level + 1
            converted[f'bp{out_level}'] = scale_price(snapshot[f'bid{src_level}'])
            converted[f'sp{out_level}'] = scale_price(snapshot[f'ask{src_level}'])
            converted[f'bv{out_level}'] = snapshot[f'bid_volume{src_level}']
            converted[f'sv{out_level}'] = snapshot[f'ask_volume{src_level}']

        # Average resting price and total resting volume on each side.
        converted['bp_avg'] = scale_price(snapshot['bid_avg_price'])
        converted['sp_avg'] = scale_price(snapshot['ask_avg_price'])
        converted['bv_sum'] = snapshot['total_bid_volume']
        converted['sv_sum'] = snapshot['total_ask_volume']

        # Filter, renumber the rows, and write the result.
        cleaned = get_valid_tick_data(converted).reset_index(drop=True)
        cleaned.to_feather(local_file, compression='zstd')

    except Exception as e:
        log_info(name, date, str(e))


In [None]:

# Order数据
def update_stock_total_order_data(date):
    """Build the local order table for one trading day and write it to disk.

    Reads the day's order and trade parquet files plus the already converted
    tick data, rebuilds a per-stock order stream (see
    ``get_order_data_by_code``), concatenates all stocks and writes the result
    to ``local_file`` as a zstd-compressed feather file.

    ``stock_order_data_path``, ``stock_trans_data_path``,
    ``read_local_data_daily``, ``change_df2dict``, ``local_file``, ``name`` and
    ``log_info`` are not bound inside this function; they must be resolvable
    from an enclosing scope.

    Any exception raised while processing is caught and reported through
    ``log_info``. Nothing is returned.
    """
    def collapse_by_orderid(orders, price_agg):
        """Merge rows sharing an ``orderid``: summed volume, ``price_agg`` price."""
        return orders.groupby('orderid').agg({
            'ds': 'first',
            'code': 'first',
            'time': 'first',
            'ordertype': 'first',
            'bsFlag': 'first',
            'price': price_agg,
            'volume': 'sum',
            'orderidorigin': 'first',
        }).reset_index()

    def get_order_data_by_code(code, order_data_dict, trans_data_dict, tick_data_dict):
        """Return the local-format order rows for a single stock.

        SZ and SH stocks are handled separately (codes starting with '6' take
        the SH branch):

        * SZ: cancellations are taken from the trade data (``transType == 0``),
          matched to their original order to recover its fields, and inserted
          into the order stream using the trade id as ``orderid``.
        * SH: trades from continuous trading are added to the order data as
          new orders, then rows are collapsed per ``orderid``.

        An empty DataFrame is returned when the stock has no tick data,
        because the previous close cannot be supplied then.
        """
        if code[:6] not in tick_data_dict:
            return pd.DataFrame()

        tick_data = tick_data_dict[code[:6]]
        # Private copy: the frame in the dict comes from a groupby and must not
        # be mutated (avoids chained-assignment warnings too).
        order_data = order_data_dict[code].copy()
        # Extra column kept for compatibility with the cancellation rows below.
        order_data['orderidorigin'] = order_data['orderid']

        if code not in trans_data_dict:
            # No trade data: the orders are used unchanged.
            order_data_new = order_data
        elif code[0] != '6':
            # SZ: insert cancellations into the order stream.
            trans_data = trans_data_dict[code]
            cancel_data = trans_data[trans_data['transType'] == 0].copy()
            cancel_data['orderid'] = cancel_data['bidID'] + cancel_data['askID']
            cancel_data = cancel_data.loc[:, ['orderid', 'time', 'volume', 'transID']]
            cancel_data = cancel_data.merge(
                order_data.drop(['time', 'volume'], axis=1), on=['orderid'], how='left')
            # Keep only cancellations whose original order was found.
            cancel_data = cancel_data[cancel_data['bsFlag'].notna()].copy()
            cancel_data['ordertype'] = 'C'  # mark as cancellation
            cancel_data['orderidorigin'] = cancel_data['orderid']  # id of the original order
            # Use the trade id as order id so the insertion position is exact.
            cancel_data['orderid'] = cancel_data['transID']
            cancel_data = cancel_data.drop('transID', axis=1)
            # pd.concat replaces DataFrame.append, which pandas 2.0 removed.
            order_data_new = pd.concat([order_data, cancel_data])
            order_data_new = order_data_new.sort_values('orderid', ascending=True)
        else:
            # SH: restore traded volume to the original orders, then treat
            # new orders and cancellations differently.
            trans_data = trans_data_dict[code]
            in_continuous_session = ((trans_data['time'] >= 93000000) &
                                     (trans_data['time'] <= 145800000))
            trans_data_use = trans_data[in_continuous_session].copy()
            # The larger of the two ids is taken as the aggressor's order id.
            trans_data_use['orderid'] = trans_data_use[['askID', 'bidID']].max(axis=1)
            trans_data_use['ordertype'] = 'A'  # treated as a new order
            trans_data_use['orderidorigin'] = trans_data_use['orderid']
            trans_data_use = trans_data_use.reindex(order_data.columns, axis=1)
            # pd.concat replaces DataFrame.append, which pandas 2.0 removed.
            order_data_new = pd.concat([order_data, trans_data_use])
            # Normalise numeric type codes now so the column never mixes
            # ints and strings when it is used as a sort key below.
            order_data_new['ordertype'] = order_data_new['ordertype'].replace({3: 'A', 4: 'D'})

            buys = order_data_new[order_data_new['bsFlag'] == 1]
            sells = order_data_new[order_data_new['bsFlag'] == -1]
            buy_adds = buys[buys['ordertype'] == 'A']
            buy_cancels = buys[buys['ordertype'] == 'D']
            sell_adds = sells[sells['ordertype'] == 'A']
            sell_cancels = sells[sells['ordertype'] == 'D']

            # Buy orders take the highest price seen, sell orders the lowest.
            buy_adds = collapse_by_orderid(buy_adds, 'max')
            sell_adds = collapse_by_orderid(sell_adds, 'min')

            order_data_new = pd.concat(
                [buy_adds, buy_cancels, sell_adds, sell_cancels], axis=0)

            # SH cancellations carry the original order id, so orderid alone
            # is not a unique sort key; 'A' sorts before 'D', which places an
            # order before its cancellation.
            order_data_new = order_data_new.sort_values(
                ['time', 'orderid', 'ordertype'], ascending=[True, True, True])

        # Format adjustments.
        order_data_new['code'] = order_data_new['code'].map(lambda x: x[:6])
        order_data_new['ordertype'] = order_data_new['ordertype'].replace({3: 'A', 4: 'D'}).map(str)
        # Cancellations ('C'/'D') get a negative volume.
        order_data_new['volumesignal'] = order_data_new['ordertype'].map(
            lambda x: -1 if x in ['C', 'D'] else 1)
        order_data_new['volume'] = order_data_new['volume'] * order_data_new['volumesignal']
        order_data_new['preclose'] = tick_data['preclose'].iloc[0]  # previous close from tick data

        # Assemble the output columns.
        local_data_sub = pd.DataFrame(index=order_data_new.index)
        local_data_sub['date'] = date
        local_data_sub['code'] = order_data_new['code']
        local_data_sub['time'] = order_data_new['time']
        local_data_sub['preclose'] = order_data_new['preclose']
        local_data_sub['orderId'] = order_data_new['orderid']
        local_data_sub['orderType'] = order_data_new['ordertype']
        local_data_sub['orderSide'] = order_data_new['bsFlag'].replace({1: 'B', -1: 'S', 0: 'C'}).astype(str)
        local_data_sub['orderPrice'] = round(order_data_new['price'] / 10000, 2)
        local_data_sub['orderVolume'] = round(order_data_new['volume']).astype(np.int64)  # volumes are stored as integers
        local_data_sub['orderAmount'] = local_data_sub['orderPrice'] * local_data_sub['orderVolume']
        local_data_sub['orderIdOrigin'] = order_data_new['orderidorigin']

        return local_data_sub

    try:
        order_file = f'{stock_order_data_path}/{date}.parquet'
        trans_file = f'{stock_trans_data_path}/{date}.parquet'
        order_data_dict = change_df2dict(pd.read_parquet(order_file))  # full-day order data
        trans_data_dict = change_df2dict(pd.read_parquet(trans_file))  # full-day trade data
        # Full-day tick data, used only to supply the previous close.
        tick_data_dict = change_df2dict(read_local_data_daily('StockTotalTickData', date))
        code_list = sorted(order_data_dict)

        local_data_list = [
            get_order_data_by_code(code, order_data_dict, trans_data_dict, tick_data_dict)
            for code in code_list
        ]
        del order_data_dict, trans_data_dict, tick_data_dict  # release memory before the big concat
        local_data = pd.concat(local_data_list).reset_index(drop=True)

        # Write the result.
        local_data.to_feather(local_file, compression='zstd')

    except Exception as e:
        log_info(name, date, str(e))


In [None]:
# Trans数据
def update_stock_total_trans_data(date):
    """Convert one day of trade records to the local layout and save it.

    The day's trade parquet file is read, only rows with ``transType == 1``
    are kept, columns are renamed and prices rescaled, and the result is
    written as a zstd-compressed feather file.

    Returns ``None`` in every case. The function returns early when the
    output file already exists or when the input parquet file is missing.
    Exceptions raised during processing are caught and reported through
    ``log_info``. A failure while resolving the dataset name is not caught,
    because there is no name to log it under.
    """
    # Resolved before the try block so the except handler can always use it.
    name = get_update_function_name(sys._getframe().f_code.co_name)
    try:
        local_file = get_local_file(name, date)
        trans_file = f'{stock_trans_data_path}/{date}.parquet'

        if os.path.exists(local_file):
            # Already converted; skip silently.
            return None

        if not os.path.exists(trans_file):
            log_info(name, date, f'{trans_file} Not Exsits!')
            return None

        trans_data = pd.read_parquet(trans_file)  # full-day trade data
        log_info(name, date, "Read Data, Done")

        # Cancellations are excluded here; per the original note they are
        # already carried by the order data.
        trans_data = trans_data[trans_data['transType'] == 1]

        # Assemble the output columns. The truncated code is written straight
        # into the output frame instead of back onto the filtered slice.
        local_data = pd.DataFrame(index=trans_data.index)
        local_data['date'] = date  # str
        local_data['code'] = trans_data['code'].map(lambda x: x[:6])
        local_data['time'] = trans_data['time']
        local_data['transId'] = trans_data['transID']
        local_data['transType'] = trans_data['transType'].map(str)  # same format as the order table
        local_data['transSide'] = trans_data['bsFlag'].replace({1: 'B', -1: 'S', 0: 'C'}).astype(str)
        local_data['transPrice'] = round(trans_data['price'] / 10000, 2)
        local_data['transVolume'] = trans_data['volume']
        local_data['transAmount'] = local_data['transPrice'] * local_data['transVolume']
        local_data['askId'] = trans_data['askID']
        local_data['bidId'] = trans_data['bidID']
        local_data = local_data.reset_index(drop=True)
        log_info(name, date, "Proc Data, Done")

        # Write the result.
        local_data.to_feather(local_file, compression='zstd')
        log_info(name, date, "Save Data, Done")

    except Exception as e:
        log_info(name, date, str(e))
