In [42]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
pd.options.display.max_rows = 300
pd.options.display.max_columns = 5

import os
import glob

In [43]:
def calc_wap(df):
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1'])/(df['bid_size1'] + df['ask_size1'])
    return wap
def calc_wap2(df):
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2'])/(df['bid_size2'] + df['ask_size2'])
    return wap

In [44]:
def log_return(list_stock_prices):
    return np.log(list_stock_prices).diff() 
def realized_volatility(series):
    return np.sqrt(np.sum(series**2))
def count_unique(series):
    return len(np.unique(series))

In [45]:
# data directory
data_dir = '/optiver-realized-volatility-prediction/'

In [46]:
book_train = pd.read_parquet("book_train.parquet/stock_id=15")
book_train.head()

Unnamed: 0,time_id,seconds_in_bucket,...,bid_size2,ask_size2
0,5,0,...,2,12
1,5,1,...,100,20
2,5,2,...,400,20
3,5,3,...,1,20
4,5,4,...,400,20


In [53]:
def preprocessor_book(file_path):
    df = pd.read_parquet(file_path)
    #calculate return etc
    df['wap'] = calc_wap(df)
    df['log_return'] = df.groupby('time_id')['wap'].apply(log_return).values
    
    df['wap2'] = calc_wap2(df)
    df['log_return2'] = df.groupby('time_id')['wap2'].apply(log_return).values
    
    df['wap_balance'] = abs(df['wap'] - df['wap2'])
    
    df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1'])/2)
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))

    #dict for aggregate
    create_feature_dict = {
        'log_return':[realized_volatility],
        'log_return2':[realized_volatility],
        'wap_balance':[np.mean],
        'price_spread':[np.mean],
        'bid_spread':[np.mean],
        'ask_spread':[np.mean],
        'volume_imbalance':[np.mean],
        'total_volume':[np.mean],
        'wap':[np.mean],
            }

    #####groupby / all seconds
    df_feature = pd.DataFrame(df.groupby(['time_id']).agg(create_feature_dict)).reset_index()
    
    df_feature.columns = ['_'.join(col) for col in df_feature.columns] #time_id is changed to time_id_
        
    ######groupby / last XX seconds
    last_seconds = [300]
    
    for second in last_seconds:
        second = 600 - second 
    
        df_feature_sec = pd.DataFrame(df.query(f'seconds_in_bucket >= {second}').groupby(['time_id']).agg(create_feature_dict)).reset_index()

        df_feature_sec.columns = ['_'.join(col) for col in df_feature_sec.columns] #time_id is changed to time_id_
     
        df_feature_sec = df_feature_sec.add_suffix('_' + str(second))

        df_feature = pd.merge(df_feature,df_feature_sec,how='left',left_on='time_id_',right_on=f'time_id__{second}')
        df_feature = df_feature.drop([f'time_id__{second}'],axis=1)
    
    #create row_id
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['time_id_'].apply(lambda x:f'{stock_id}-{x}')
    df_feature = df_feature.drop(['time_id_'],axis=1)
    
    return df_feature

In [57]:
%%time
file_path = "book_train.parquet/stock_id=0"
preprocessor_book(file_path)

  df_feature = pd.DataFrame(df.groupby(['time_id']).agg(create_feature_dict)).reset_index()


CPU times: total: 3.12 s
Wall time: 3.53 s


  df_feature_sec = pd.DataFrame(df.query(f'seconds_in_bucket >= {second}').groupby(['time_id']).agg(create_feature_dict)).reset_index()


Unnamed: 0,log_return_realized_volatility,log_return2_realized_volatility,...,wap_mean_300,row_id
0,0.004499,0.006999,...,1.003753,0-5
1,0.001204,0.002476,...,1.000397,0-11
2,0.002369,0.004801,...,0.998685,0-16
3,0.002574,0.003637,...,0.998436,0-31
4,0.001894,0.003257,...,0.999488,0-62
...,...,...,...,...,...
3825,0.002579,0.003821,...,0.997519,0-32751
3826,0.002206,0.002847,...,1.000682,0-32753
3827,0.002913,0.003266,...,1.000111,0-32758
3828,0.003046,0.005105,...,1.002277,0-32763


In [29]:
trade_train = pd.read_parquet(data_dir + "trade_train.parquet/stock_id=0")
trade_train.head(15)

Unnamed: 0,time_id,seconds_in_bucket,price,size,order_count
0,5,21,1.002301,326,12
1,5,46,1.002778,128,4
2,5,50,1.002818,55,1
3,5,57,1.003155,121,5
4,5,68,1.003646,4,1
5,5,78,1.003762,134,5
6,5,122,1.004207,102,3
7,5,127,1.004577,1,1
8,5,144,1.00437,6,1
9,5,147,1.003964,233,4


In [61]:
def preprocessor_trade(file_path):
    df = pd.read_parquet(file_path)
    df['log_return'] = df.groupby('time_id')['price'].apply(log_return).values
    
    
    aggregate_dictionary = {
        'log_return':[realized_volatility],
        'seconds_in_bucket':[count_unique],
        'size':[np.sum],
        'order_count':[np.mean],
    }
    
    df_feature = df.groupby('time_id').agg(aggregate_dictionary)
    
    df_feature = df_feature.reset_index()
    df_feature.columns = ['_'.join(col) for col in df_feature.columns]

    
    ######groupby / last XX seconds
    last_seconds = [300]
    
    for second in last_seconds:
        second = 600 - second
    
        df_feature_sec = df.query(f'seconds_in_bucket >= {second}').groupby('time_id').agg(aggregate_dictionary)
        df_feature_sec = df_feature_sec.reset_index()
        
        df_feature_sec.columns = ['_'.join(col) for col in df_feature_sec.columns]
        df_feature_sec = df_feature_sec.add_suffix('_' + str(second))
        
        df_feature = pd.merge(df_feature,df_feature_sec,how='left',left_on='time_id_',right_on=f'time_id__{second}')
        df_feature = df_feature.drop([f'time_id__{second}'],axis=1)
    
    df_feature = df_feature.add_prefix('trade_')
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['trade_time_id_'].apply(lambda x:f'{stock_id}-{x}')
    df_feature = df_feature.drop(['trade_time_id_'],axis=1)
    
    return df_feature

In [62]:
%%time
file_path ="trade_train.parquet/stock_id=0"
preprocessor_trade(file_path)

  df_feature = df.groupby('time_id').agg(aggregate_dictionary)
  df_feature = df.groupby('time_id').agg(aggregate_dictionary)


CPU times: total: 1.59 s
Wall time: 1.79 s


  df_feature_sec = df.query(f'seconds_in_bucket >= {second}').groupby('time_id').agg(aggregate_dictionary)
  df_feature_sec = df.query(f'seconds_in_bucket >= {second}').groupby('time_id').agg(aggregate_dictionary)


Unnamed: 0,trade_log_return_realized_volatility,trade_seconds_in_bucket_count_unique,...,trade_order_count_mean_300,row_id
0,0.002006,40,...,2.571429,0-5
1,0.000901,30,...,2.250000,0-11
2,0.001961,25,...,3.166667,0-16
3,0.001561,15,...,5.111111,0-31
4,0.000871,22,...,4.909091,0-62
...,...,...,...,...,...
3825,0.001519,52,...,3.257143,0-32751
3826,0.001411,28,...,4.250000,0-32753
3827,0.001521,36,...,3.727273,0-32758
3828,0.001794,53,...,1.920000,0-32763


In [63]:
def preprocessor(list_stock_ids, is_train = True):
    from joblib import Parallel, delayed # parallel computing to save time
    df = pd.DataFrame()
    
    def for_joblib(stock_id):
        if is_train:
            file_path_book = "book_train.parquet/stock_id=" + str(stock_id)
            file_path_trade = "trade_train.parquet/stock_id=" + str(stock_id)
        else:
            file_path_book = "book_test.parquet/stock_id=" + str(stock_id)
            file_path_trade = "trade_test.parquet/stock_id=" + str(stock_id)
            
        df_tmp = pd.merge(preprocessor_book(file_path_book),preprocessor_trade(file_path_trade),on='row_id',how='left')
     
        return pd.concat([df,df_tmp],axis=0)
    
    for stock_id in list_stock_ids:
        df = for_joblib(stock_id)
    
 #   df =  pd.concat(df,ignore_index = True)
    return df

In [64]:
list_stock_ids = [0,1]
preprocessor(list_stock_ids, is_train = True)

  df_feature = pd.DataFrame(df.groupby(['time_id']).agg(create_feature_dict)).reset_index()
  df_feature_sec = pd.DataFrame(df.query(f'seconds_in_bucket >= {second}').groupby(['time_id']).agg(create_feature_dict)).reset_index()
  df_feature = df.groupby('time_id').agg(aggregate_dictionary)
  df_feature = df.groupby('time_id').agg(aggregate_dictionary)
  df_feature_sec = df.query(f'seconds_in_bucket >= {second}').groupby('time_id').agg(aggregate_dictionary)
  df_feature_sec = df.query(f'seconds_in_bucket >= {second}').groupby('time_id').agg(aggregate_dictionary)
  df_feature = pd.DataFrame(df.groupby(['time_id']).agg(create_feature_dict)).reset_index()
  df_feature_sec = pd.DataFrame(df.query(f'seconds_in_bucket >= {second}').groupby(['time_id']).agg(create_feature_dict)).reset_index()
  df_feature = df.groupby('time_id').agg(aggregate_dictionary)
  df_feature = df.groupby('time_id').agg(aggregate_dictionary)
  df_feature_sec = df.query(f'seconds_in_bucket >= {second}').groupby('time_id

Unnamed: 0,log_return_realized_volatility,log_return2_realized_volatility,...,trade_size_sum_300,trade_order_count_mean_300
0,0.004499,0.006999,...,1587.0,2.571429
1,0.001204,0.002476,...,900.0,2.250000
2,0.002369,0.004801,...,1189.0,3.166667
3,0.002574,0.003637,...,1556.0,5.111111
4,0.001894,0.003257,...,1219.0,4.909091
...,...,...,...,...,...
3825,0.003723,0.004996,...,1889.0,3.608696
3826,0.010829,0.012168,...,30858.0,8.136364
3827,0.003135,0.004268,...,980.0,2.727273
3828,0.003750,0.005773,...,8274.0,2.701754
