In [46]:
from fastai.tabular.all import *
from multiprocessing import Pool
import lightgbm as lgb
from sklearn.model_selection import GroupKFold


In [17]:
data_dir = Path('../input/optiver-realized-volatility-prediction')

In [24]:
# Function to calculate first WAP
def calc_wap1(df):
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap

# Function to calculate second WAP
def calc_wap2(df):
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap

# Function to calculate the log of the return
# Remember that logb(x / y) = logb(x) - logb(y)
def log_return(series):
    return np.log(series).diff()

# Calculate the realized volatility
def realized_volatility(series):
    return np.sqrt(np.sum(series**2))

# Function to count unique elements of a series
def count_unique(series):
    return len(np.unique(series))

# Function to read our base train and test set
def read_train_test():
    train = pd.read_csv('../input/optiver-realized-volatility-prediction/train.csv')
    test = pd.read_csv('../input/optiver-realized-volatility-prediction/test.csv')
    # Create a key to merge with book and trade data
    train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
    test['row_id'] = test['stock_id'].astype(str) + '-' + test['time_id'].astype(str)
    print(f'Our training set has {train.shape[0]} rows')
    return train, test

# Function to preprocess book data (for each stock id)
def book_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    # Calculate Wap
    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)
    # Calculate log returns
    df['log_return1'] = df.groupby(['time_id'])['wap1'].apply(log_return)
    df['log_return2'] = df.groupby(['time_id'])['wap2'].apply(log_return)
    # Calculate wap balance
    df['wap_balance'] = abs(df['wap1'] - df['wap2'])
    # Calculate spread
    df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1']) / 2)
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))
    
    # Dict for aggregations
    create_feature_dict = {
        'wap1': [np.sum, np.mean, np.std],
        'wap2': [np.sum, np.mean, np.std],
        'log_return1': [np.sum, realized_volatility, np.mean, np.std],
        'log_return2': [np.sum, realized_volatility, np.mean, np.std],
        'wap_balance': [np.sum, np.mean, np.std],
        'price_spread':[np.sum, np.mean, np.std],
        'bid_spread':[np.sum, np.mean, np.std],
        'ask_spread':[np.sum, np.mean, np.std],
        'total_volume':[np.sum, np.mean, np.std],
        'volume_imbalance':[np.sum, np.mean, np.std]
    }
    
    # Function to get group stats for different windows (seconds in bucket)
    def get_stats_window(seconds_in_bucket, add_suffix = False):
        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(create_feature_dict).reset_index()
        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature
    
    # Get the stats for different windows
    df_feature = get_stats_window(seconds_in_bucket = 0, add_suffix = False)
    df_feature_450 = get_stats_window(seconds_in_bucket = 450, add_suffix = True)
    df_feature_300 = get_stats_window(seconds_in_bucket = 300, add_suffix = True)
    df_feature_150 = get_stats_window(seconds_in_bucket = 150, add_suffix = True)
    
    # Merge all
    df_feature = df_feature.merge(df_feature_450, how = 'left', left_on = 'time_id_', right_on = 'time_id__450')
    df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
    df_feature = df_feature.merge(df_feature_150, how = 'left', left_on = 'time_id_', right_on = 'time_id__150')
    # Drop unnecesary time_ids
    df_feature.drop(['time_id__450', 'time_id__300', 'time_id__150'], axis = 1, inplace = True)
    
    # Create row_id so we can merge
    stock_id = str(file_path).split('=')[1]
    df_feature['row_id'] = df_feature['time_id_'].apply(lambda x: f'{stock_id}-{x}')
    df_feature.drop(['time_id_'], axis = 1, inplace = True)
    return df_feature

# Function to preprocess trade data (for each stock id)
def trade_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    df['log_return'] = df.groupby('time_id')['price'].apply(log_return)
    
    # Dict for aggregations
    create_feature_dict = {
        'log_return':[realized_volatility],
        'seconds_in_bucket':[count_unique],
        'size':[np.sum],
        'order_count':[np.mean],
    }
    
    # Function to get group stats for different windows (seconds in bucket)
    def get_stats_window(seconds_in_bucket, add_suffix = False):
        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(create_feature_dict).reset_index()
        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature
    
    # Get the stats for different windows
    df_feature = get_stats_window(seconds_in_bucket = 0, add_suffix = False)
    df_feature_450 = get_stats_window(seconds_in_bucket = 450, add_suffix = True)
    df_feature_300 = get_stats_window(seconds_in_bucket = 300, add_suffix = True)
    df_feature_150 = get_stats_window(seconds_in_bucket = 150, add_suffix = True)

    # Merge all
    df_feature = df_feature.merge(df_feature_450, how = 'left', left_on = 'time_id_', right_on = 'time_id__450')
    df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
    df_feature = df_feature.merge(df_feature_150, how = 'left', left_on = 'time_id_', right_on = 'time_id__150')
    # Drop unnecesary time_ids
    df_feature.drop(['time_id__450', 'time_id__300', 'time_id__150'], axis = 1, inplace = True)
    
    df_feature = df_feature.add_prefix('trade_')
    stock_id = str(file_path).split('=')[1]
    df_feature['row_id'] = df_feature['trade_time_id_'].apply(lambda x:f'{stock_id}-{x}')
    df_feature.drop(['trade_time_id_'], axis = 1, inplace = True)
    return df_feature

# Function to get group stats for the stock_id and time_id
def get_time_stock(df):
    # Get realized volatility columns
    vol_cols = ['log_return1_realized_volatility', 'log_return2_realized_volatility', 'log_return1_realized_volatility_450', 'log_return2_realized_volatility_450', 
                'log_return1_realized_volatility_300', 'log_return2_realized_volatility_300', 'log_return1_realized_volatility_150', 'log_return2_realized_volatility_150', 
                'trade_log_return_realized_volatility', 'trade_log_return_realized_volatility_450', 'trade_log_return_realized_volatility_300', 'trade_log_return_realized_volatility_150']

    # Group by the stock id
    df_stock_id = df.groupby(['stock_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
    # Rename columns joining suffix
    df_stock_id.columns = ['_'.join(col) for col in df_stock_id.columns]
    df_stock_id = df_stock_id.add_suffix('_' + 'stock')

    # Group by the stock id
    df_time_id = df.groupby(['time_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
    # Rename columns joining suffix
    df_time_id.columns = ['_'.join(col) for col in df_time_id.columns]
    df_time_id = df_time_id.add_suffix('_' + 'time')
    
    # Merge with original dataframe
    df = df.merge(df_stock_id, how = 'left', left_on = ['stock_id'], right_on = ['stock_id__stock'])
    df = df.merge(df_time_id, how = 'left', left_on = ['time_id'], right_on = ['time_id__time'])
    df.drop(['stock_id__stock', 'time_id__time'], axis = 1, inplace = True)
    return df
    


In [25]:
def preprocess_one_stock(stock_id, typ='train'):
    file_path_book = data_dir / f'book_{typ}.parquet/stock_id={stock_id}'
    file_path_trade = data_dir / f'trade_{typ}.parquet/stock_id={stock_id}'
    return pd.merge(book_preprocessor(file_path_book), trade_preprocessor(file_path_trade), on = 'row_id', how = 'left')
        

In [26]:

def preprocess_all(list_stock_ids, typ='train'):
    pool = Pool(16)
    df = pool.starmap(preprocess_one_stock, zip(list_stock_ids, [typ]*len(list_stock_ids)))
    df = pd.concat(df, ignore_index = True)
    return df

In [40]:
train, test = read_train_test()
all_stock_ids = train['stock_id'].unique()

Our training set has 428932 rows


In [28]:
%%time
train_features = preprocess_all(all_stock_ids, 'train')

CPU times: user 1.88 s, sys: 538 ms, total: 2.42 s
Wall time: 2min 9s


In [41]:
train = train.merge(train_features, on = ['row_id'], how = 'left')

In [42]:
test_stock_ids = test['stock_id'].unique()
test_features = preprocess_all(test_stock_ids, 'test')
test = test.merge(test_features, on = ['row_id'], how = 'left')

In [43]:
train = get_time_stock(train)
test = get_time_stock(test)

In [55]:
train[train.wap1_std_450.isna()]

Unnamed: 0,stock_id,time_id,target,row_id,wap1_sum,wap1_mean,wap1_std,wap2_sum,wap2_mean,wap2_std,...,trade_log_return_realized_volatility_450_max_time,trade_log_return_realized_volatility_450_min_time,trade_log_return_realized_volatility_300_mean_time,trade_log_return_realized_volatility_300_std_time,trade_log_return_realized_volatility_300_max_time,trade_log_return_realized_volatility_300_min_time,trade_log_return_realized_volatility_150_mean_time,trade_log_return_realized_volatility_150_std_time,trade_log_return_realized_volatility_150_max_time,trade_log_return_realized_volatility_150_min_time
115584,33,5658,0.002988,33-5658,42.056143,1.001337,0.000821,42.067893,1.001617,0.000718,...,0.003732,0.0,0.001084,0.00079,0.005296,0.0,0.001354,0.000941,0.006211,6.3e-05
132867,37,22042,0.002851,37-22042,48.055167,1.001149,0.00084,48.079744,1.001661,0.001137,...,0.00221,0.0,0.000994,0.000426,0.003011,0.0,0.001246,0.000502,0.00332,0.0


In [35]:
# Function to calculate the root mean squared percentage error
def rmspe(y_true, y_pred):
    return np.sqrt(np.mean(np.square((y_true - y_pred) / y_true)))

# Function to early stop with root mean squared percentage error
def feval_rmspe(y_pred, lgb_train):
    y_true = lgb_train.get_label()
    return 'RMSPE', rmspe(y_true, y_pred), False

def train_and_evaluate(train, test):
    # Hyperparammeters (just basic)
    params = {
      'objective': 'rmse',  
      'boosting_type': 'gbdt',
      'num_leaves': 100,
      'n_jobs': -1,
      'learning_rate': 0.1,
      'feature_fraction': 0.8,
      'bagging_fraction': 0.8,
      'verbose': -1
    }
    
    # Split features and target
    x = train.drop(['row_id', 'target', 'time_id'], axis = 1)
    y = train['target']
    x_test = test.drop(['row_id', 'time_id'], axis = 1)
    # Transform stock id to a numeric value
    x['stock_id'] = x['stock_id'].astype(int)
    x_test['stock_id'] = x_test['stock_id'].astype(int)
    
    # Create out of folds array
    oof_predictions = np.zeros(x.shape[0])
    # Create test array to store predictions
    test_predictions = np.zeros(x_test.shape[0])
    # Create a KFold object
    kfold = KFold(n_splits = 5, random_state = 66, shuffle = True)
    # Iterate through each fold
    for fold, (trn_ind, val_ind) in enumerate(kfold.split(x)):
        print(f'Training fold {fold + 1}')
        x_train, x_val = x.iloc[trn_ind], x.iloc[val_ind]
        y_train, y_val = y.iloc[trn_ind], y.iloc[val_ind]
        # Root mean squared percentage error weights
        train_weights = 1 / np.square(y_train)
        val_weights = 1 / np.square(y_val)
        train_dataset = lgb.Dataset(x_train, y_train, weight = train_weights, categorical_feature = ['stock_id'])
        val_dataset = lgb.Dataset(x_val, y_val, weight = val_weights, categorical_feature = ['stock_id'])
        model = lgb.train(params = params, 
                          train_set = train_dataset, 
                          valid_sets = [train_dataset, val_dataset], 
                          num_boost_round = 10000, 
                          early_stopping_rounds = 50, 
                          verbose_eval = 50,
                          feval = feval_rmspe)
        # Add predictions to the out of folds array
        oof_predictions[val_ind] = model.predict(x_val)
        # Predict the test set
        test_predictions += model.predict(x_test) / 5
        
    rmspe_score = rmspe(y, oof_predictions)
    print(f'Our out of folds RMSPE is {rmspe_score}')
    # Return test predictions
    return test_predictions

In [45]:
test_predictions = train_and_evaluate(train, test)

Training fold 1




Training until validation scores don't improve for 50 rounds
[50]	training's rmse: 0.000437695	training's RMSPE: 0.202721	valid_1's rmse: 0.000468736	valid_1's RMSPE: 0.216291
[100]	training's rmse: 0.000404404	training's RMSPE: 0.187303	valid_1's rmse: 0.000447445	valid_1's RMSPE: 0.206467
[150]	training's rmse: 0.000386182	training's RMSPE: 0.178863	valid_1's rmse: 0.000441498	valid_1's RMSPE: 0.203722
[200]	training's rmse: 0.000371746	training's RMSPE: 0.172176	valid_1's rmse: 0.000439708	valid_1's RMSPE: 0.202896
[250]	training's rmse: 0.000360491	training's RMSPE: 0.166964	valid_1's rmse: 0.000436703	valid_1's RMSPE: 0.20151
[300]	training's rmse: 0.000351347	training's RMSPE: 0.162729	valid_1's rmse: 0.000435978	valid_1's RMSPE: 0.201175
[350]	training's rmse: 0.000343081	training's RMSPE: 0.1589	valid_1's rmse: 0.000435605	valid_1's RMSPE: 0.201003
[400]	training's rmse: 0.000335613	training's RMSPE: 0.155441	valid_1's rmse: 0.000435845	valid_1's RMSPE: 0.201114
Early stopping,



Training until validation scores don't improve for 50 rounds
[50]	training's rmse: 0.000437079	training's RMSPE: 0.202178	valid_1's rmse: 0.000455373	valid_1's RMSPE: 0.211199
[100]	training's rmse: 0.000404658	training's RMSPE: 0.187181	valid_1's rmse: 0.000435852	valid_1's RMSPE: 0.202145
[150]	training's rmse: 0.000386611	training's RMSPE: 0.178834	valid_1's rmse: 0.000431531	valid_1's RMSPE: 0.200141
[200]	training's rmse: 0.000372167	training's RMSPE: 0.172152	valid_1's rmse: 0.000427041	valid_1's RMSPE: 0.198059
[250]	training's rmse: 0.000361305	training's RMSPE: 0.167128	valid_1's rmse: 0.000424855	valid_1's RMSPE: 0.197045
[300]	training's rmse: 0.000351938	training's RMSPE: 0.162795	valid_1's rmse: 0.00042387	valid_1's RMSPE: 0.196588
[350]	training's rmse: 0.000343607	training's RMSPE: 0.158941	valid_1's rmse: 0.000424168	valid_1's RMSPE: 0.196726
Early stopping, best iteration is:
[313]	training's rmse: 0.000349602	training's RMSPE: 0.161715	valid_1's rmse: 0.000423492	vali



Training until validation scores don't improve for 50 rounds
[50]	training's rmse: 0.000438414	training's RMSPE: 0.20292	valid_1's rmse: 0.000457371	valid_1's RMSPE: 0.211609
[100]	training's rmse: 0.00040464	training's RMSPE: 0.187287	valid_1's rmse: 0.000436721	valid_1's RMSPE: 0.202055
[150]	training's rmse: 0.000385302	training's RMSPE: 0.178337	valid_1's rmse: 0.000428476	valid_1's RMSPE: 0.19824
[200]	training's rmse: 0.000372146	training's RMSPE: 0.172248	valid_1's rmse: 0.00042642	valid_1's RMSPE: 0.197289
[250]	training's rmse: 0.000361012	training's RMSPE: 0.167094	valid_1's rmse: 0.00042447	valid_1's RMSPE: 0.196387
[300]	training's rmse: 0.000351913	training's RMSPE: 0.162883	valid_1's rmse: 0.000423726	valid_1's RMSPE: 0.196043
[350]	training's rmse: 0.000343777	training's RMSPE: 0.159117	valid_1's rmse: 0.000425037	valid_1's RMSPE: 0.196649
Early stopping, best iteration is:
[311]	training's rmse: 0.000350061	training's RMSPE: 0.162025	valid_1's rmse: 0.000423527	valid_1'



Training until validation scores don't improve for 50 rounds
[50]	training's rmse: 0.000437357	training's RMSPE: 0.202107	valid_1's rmse: 0.000472618	valid_1's RMSPE: 0.220056
[100]	training's rmse: 0.000404156	training's RMSPE: 0.186765	valid_1's rmse: 0.000451593	valid_1's RMSPE: 0.210267
[150]	training's rmse: 0.0003861	training's RMSPE: 0.178421	valid_1's rmse: 0.000444464	valid_1's RMSPE: 0.206948
[200]	training's rmse: 0.000371469	training's RMSPE: 0.17166	valid_1's rmse: 0.000439703	valid_1's RMSPE: 0.204731
[250]	training's rmse: 0.000360705	training's RMSPE: 0.166685	valid_1's rmse: 0.000438431	valid_1's RMSPE: 0.204138
[300]	training's rmse: 0.00035159	training's RMSPE: 0.162474	valid_1's rmse: 0.000437656	valid_1's RMSPE: 0.203777
Early stopping, best iteration is:
[266]	training's rmse: 0.000357593	training's RMSPE: 0.165248	valid_1's rmse: 0.00043743	valid_1's RMSPE: 0.203672
Training fold 5




Training until validation scores don't improve for 50 rounds
[50]	training's rmse: 0.000437719	training's RMSPE: 0.202829	valid_1's rmse: 0.000457484	valid_1's RMSPE: 0.210691
[100]	training's rmse: 0.000403501	training's RMSPE: 0.186974	valid_1's rmse: 0.000435485	valid_1's RMSPE: 0.20056
[150]	training's rmse: 0.000385006	training's RMSPE: 0.178403	valid_1's rmse: 0.000428976	valid_1's RMSPE: 0.197563
[200]	training's rmse: 0.00037173	training's RMSPE: 0.172251	valid_1's rmse: 0.000425177	valid_1's RMSPE: 0.195813
[250]	training's rmse: 0.000360851	training's RMSPE: 0.167211	valid_1's rmse: 0.000422936	valid_1's RMSPE: 0.19478
[300]	training's rmse: 0.000351332	training's RMSPE: 0.1628	valid_1's rmse: 0.000421686	valid_1's RMSPE: 0.194205
[350]	training's rmse: 0.00034373	training's RMSPE: 0.159277	valid_1's rmse: 0.000421806	valid_1's RMSPE: 0.19426
[400]	training's rmse: 0.000336268	training's RMSPE: 0.155819	valid_1's rmse: 0.000421895	valid_1's RMSPE: 0.194301
Early stopping, bes

In [6]:
# Save test predictions
test['target'] = test_predictions
test[['row_id', 'target']].to_csv('submission.csv',index = False)

Our training set has 428932 rows


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=-1)]: Done  18 tasks      | elapsed:   44.4s
[Parallel(n_jobs=-1)]: Done 112 out of 112 | elapsed:  2.3min finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    0.2s finished


Training fold 1
Training until validation scores don't improve for 50 rounds
[50]	training's rmse: 0.000437695	training's RMSPE: 0.202721	valid_1's rmse: 0.000468736	valid_1's RMSPE: 0.216291
[100]	training's rmse: 0.000404404	training's RMSPE: 0.187303	valid_1's rmse: 0.000447445	valid_1's RMSPE: 0.206467
[150]	training's rmse: 0.000386182	training's RMSPE: 0.178863	valid_1's rmse: 0.000441498	valid_1's RMSPE: 0.203722
[200]	training's rmse: 0.000371746	training's RMSPE: 0.172176	valid_1's rmse: 0.000439708	valid_1's RMSPE: 0.202896
[250]	training's rmse: 0.000360491	training's RMSPE: 0.166964	valid_1's rmse: 0.000436703	valid_1's RMSPE: 0.20151
[300]	training's rmse: 0.000351347	training's RMSPE: 0.162729	valid_1's rmse: 0.000435978	valid_1's RMSPE: 0.201175
[350]	training's rmse: 0.000343081	training's RMSPE: 0.1589	valid_1's rmse: 0.000435605	valid_1's RMSPE: 0.201003
[400]	training's rmse: 0.000335613	training's RMSPE: 0.155441	valid_1's rmse: 0.000435845	valid_1's RMSPE: 0.201114

In [47]:
train.to_csv('train_with_features.csv', index=False)