In [1]:
import os
import glob
from joblib import Parallel, delayed
import pandas as pd
import numpy as np
import scipy as sc
from scipy.stats import skew, kurtosis, median_absolute_deviation
from sklearn.model_selection import KFold
import lightgbm as lgb
import warnings
warnings.filterwarnings('ignore')
pd.set_option('max_columns', 300)
data_dir = '../../data/'

In [4]:
# Function to calculate first WAP
def calc_wap1(df):
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap

# Function to calculate second WAP
def calc_wap2(df):
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap

# Function to calculate the log of the return
# Remember that logb(x / y) = logb(x) - logb(y)
def log_return(series):
    return np.log(series).diff()

# Calculate the realized volatility
def realized_volatility(series):
    return np.sqrt(np.sum(series**2))

# Function to count unique elements of a series
def count_unique(series):
    return len(np.unique(series))

# Function to read our base train and test set
def read_train_test():
    train = pd.read_csv(data_dir + 'train.csv')
    test = pd.read_csv(data_dir + 'test.csv')
    # Create a key to merge with book and trade data
    train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
    test['row_id'] = test['stock_id'].astype(str) + '-' + test['time_id'].astype(str)
    print(f'Our training set has {train.shape[0]} rows')
    return train, test

# Function to preprocess book data (for each stock id)
def book_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    # Calculate Wap
    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)
    # Calculate log returns
    df['log_return1'] = df.groupby(['time_id'])['wap1'].apply(log_return).fillna(0)
    df['log_return2'] = df.groupby(['time_id'])['wap2'].apply(log_return).fillna(0)
    # Calculate wap balance
    df['wap_balance'] = abs(df['wap1'] - df['wap2'])
    # Calculate spread
    df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1']) / 2)
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))
    
    # Dict for aggregations
    create_feature_dict = {
        'bid_price1': [np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'ask_price1': [np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'bid_price2': [np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'ask_price2': [np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'wap1': [np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'wap2': [np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'log_return1': [realized_volatility, np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'log_return2': [np.sum, realized_volatility, np.mean, np.std, skew, kurtosis, median_absolute_deviation],
        'wap_balance': [np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'price_spread': [np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'bid_spread': [np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'ask_spread': [np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'total_volume':[np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'volume_imbalance':[np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
    }
    
    # Function to get group stats for different windows (seconds in bucket)
    def get_stats_window(seconds_in_bucket, add_suffix = False):
        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(create_feature_dict).reset_index()
        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature
    
    # Get the stats for different windows
    df_feature = get_stats_window(seconds_in_bucket = 0, add_suffix = False)
    
    # Create row_id so we can merge
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['time_id_'].apply(lambda x: f'{stock_id}-{x}')
    df_feature.drop(['time_id_'], axis = 1, inplace = True)
    return df_feature

# Function to preprocess trade data (for each stock id)
def trade_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    df['log_return'] = df.groupby('time_id')['price'].apply(log_return).fillna(0)
    
    # Dict for aggregations
    create_feature_dict = {
        'log_return':[realized_volatility, np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'seconds_in_bucket':[count_unique],
        'size': [np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
        'order_count':[np.min, np.max, np.mean, np.median, np.std, np.sum, skew, median_absolute_deviation],
    }
    
    # Function to get group stats for different windows (seconds in bucket)
    def get_stats_window(seconds_in_bucket, add_suffix = False):
        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(create_feature_dict).reset_index()
        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature
    
    # Get the stats for different windows
    df_feature = get_stats_window(seconds_in_bucket = 0, add_suffix = False)
    
    df_feature = df_feature.add_prefix('trade_')
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['trade_time_id_'].apply(lambda x:f'{stock_id}-{x}')
    df_feature.drop(['trade_time_id_'], axis = 1, inplace = True)
    return df_feature

    
# Funtion to make preprocessing function in parallel (for each stock id)
def preprocessor(list_stock_ids, is_train = True):
    
    # Parrallel for loop
    def for_joblib(stock_id):
        # Train
        if is_train:
            file_path_book = data_dir + "book_train.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)
        # Test
        else:
            file_path_book = data_dir + "book_test.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_test.parquet/stock_id=" + str(stock_id)
    
        # Preprocess book and trade data and merge them
        df_tmp = pd.merge(book_preprocessor(file_path_book), trade_preprocessor(file_path_trade), on = 'row_id', how = 'left')
        
        # Return the merge dataframe
        return df_tmp
    
    # Use parallel api to call paralle for loop
    df = Parallel(n_jobs = -1, verbose = 1)(delayed(for_joblib)(stock_id) for stock_id in list_stock_ids)
    # Concatenate all the dataframes that return from Parallel
    df = pd.concat(df, ignore_index = True)
    return df

# Function to calculate the root mean squared percentage error
def rmspe(y_true, y_pred):
    return np.sqrt(np.mean(np.square((y_true - y_pred) / y_true)))

# Function to early stop with root mean squared percentage error
def feval_rmspe(y_pred, lgb_train):
    y_true = lgb_train.get_label()
    return 'RMSPE', rmspe(y_true, y_pred), False

def train_and_evaluate(train, test):
    # Hyperparammeters (just basic)
    params = {
      'objective': 'rmse',  
      'boosting_type': 'gbdt',
      'num_leaves': 100,
      'n_jobs': -1,
      'learning_rate': 0.1,
      'feature_fraction': 0.8,
      'bagging_fraction': 0.8,
      'verbose': -1
    }
    
    # Split features and target
    x = train.drop(['row_id', 'target', 'time_id'], axis = 1)
    
    y = train['target']
    x_test = test.drop(['row_id', 'time_id'], axis = 1)
    
    print("Training dataset shape: ", train.shape)
    # Transform stock id to a numeric value
    x['stock_id'] = x['stock_id'].astype(int)
    x_test['stock_id'] = x_test['stock_id'].astype(int)
    
    # Create out of folds array
    oof_predictions = np.zeros(x.shape[0])
    # Create test array to store predictions
    test_predictions = np.zeros(x_test.shape[0])
    # Create a KFold object
    
    f_importances = []
    
    kfold = KFold(n_splits = 5, random_state = 66, shuffle = True)
    # Iterate through each fold
    for fold, (trn_ind, val_ind) in enumerate(kfold.split(x)):
        print(f'Training fold {fold + 1}')
        x_train, x_val = x.iloc[trn_ind], x.iloc[val_ind]
        y_train, y_val = y.iloc[trn_ind], y.iloc[val_ind]
        # Root mean squared percentage error weights
        train_weights = 1 / np.square(y_train)
        val_weights = 1 / np.square(y_val)
        train_dataset = lgb.Dataset(x_train, y_train, weight = train_weights, categorical_feature = ['stock_id'])
        val_dataset = lgb.Dataset(x_val, y_val, weight = val_weights, categorical_feature = ['stock_id'])
        model = lgb.train(params = params, 
                          train_set = train_dataset, 
                          valid_sets = [train_dataset, val_dataset], 
                          num_boost_round = 10000, 
                          early_stopping_rounds = 50, 
                          verbose_eval = 50,
                          feval = feval_rmspe)
        # Add predictions to the out of folds array
        oof_predictions[val_ind] = model.predict(x_val)
        # Predict the test set
        test_predictions += model.predict(x_test) / 5
        f_importances.append(model.feature_importance(importance_type="gain"))
        
    rmspe_score = rmspe(y, oof_predictions)
    print(f'Our out of folds RMSPE is {rmspe_score}')
    # Return test predictions
    return test_predictions, f_importances

# Read train and test
train, test = read_train_test()

# Get unique stock ids 
train_stock_ids = train['stock_id'].unique()
# Preprocess them using Parallel and our single stock id functions
train_ = preprocessor(train_stock_ids, is_train = True)
train = train.merge(train_, on = ['row_id'], how = 'left').dropna()

# Get unique stock ids 
test_stock_ids = test['stock_id'].unique()
# Preprocess them using Parallel and our single stock id functions
test_ = preprocessor(test_stock_ids, is_train = False)
test = test.merge(test_, on = ['row_id'], how = 'left')


# Traing and evaluate
# feature_importances is ndarray
test_predictions, feature_importances = train_and_evaluate(train, test)
# Save test predictions
test['target'] = test_predictions
test[['row_id', 'target']].to_csv('submission.csv',index = False)

Our training set has 428932 rows


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=-1)]: Done  18 tasks      | elapsed:  2.3min
[Parallel(n_jobs=-1)]: Done 112 out of 112 | elapsed:  8.1min finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    0.1s finished


Training dataset shape:  (420621, 142)
Training fold 1
Training until validation scores don't improve for 50 rounds
[50]	training's rmse: 0.0004678	training's RMSPE: 0.218788	valid_1's rmse: 0.000504259	valid_1's RMSPE: 0.233776
[100]	training's rmse: 0.000448625	training's RMSPE: 0.20982	valid_1's rmse: 0.000501697	valid_1's RMSPE: 0.232588
[150]	training's rmse: 0.000436407	training's RMSPE: 0.204105	valid_1's rmse: 0.000501192	valid_1's RMSPE: 0.232354
Early stopping, best iteration is:
[140]	training's rmse: 0.000438574	training's RMSPE: 0.205119	valid_1's rmse: 0.000500681	valid_1's RMSPE: 0.232117
Training fold 2
Training until validation scores don't improve for 50 rounds
[50]	training's rmse: 0.000469341	training's RMSPE: 0.218936	valid_1's rmse: 0.000494197	valid_1's RMSPE: 0.231528
[100]	training's rmse: 0.000450115	training's RMSPE: 0.209968	valid_1's rmse: 0.000489786	valid_1's RMSPE: 0.229462
[150]	training's rmse: 0.000437815	training's RMSPE: 0.20423	valid_1's rmse: 0.00

In [3]:
f_i = pd.DataFrame()
for model in models:
    feature_importance = feature_importance.merge(model.feature_importance(), on=[])

142

In [5]:
feature_importances[0]

array([1.35249198e+04, 2.40152162e+01, 4.58709981e+01, 4.97444927e+01,
       1.12189130e+01, 7.93384120e+01, 3.82348950e+02, 1.02328694e+02,
       4.64841319e+01, 4.56175892e+01, 2.47017560e+01, 1.52302420e+01,
       2.39184192e+01, 1.11273730e+02, 4.09671437e+02, 8.37795531e+01,
       8.49422371e+01, 3.23437839e+01, 3.04708359e+01, 1.61440470e+01,
       2.10521741e+01, 8.21642724e+01, 1.86239649e+02, 1.41207875e+02,
       5.03543310e+01, 3.64080545e+01, 7.64349420e+01, 1.39386492e+01,
       3.37915808e+01, 1.51009350e+02, 4.27122683e+02, 9.97189777e+01,
       5.37505744e+01, 3.55982858e+01, 3.50211899e+01, 4.60330701e+00,
       1.03099200e+01, 5.03938670e+01, 1.59529124e+02, 1.05833455e+02,
       6.43741977e+01, 2.93169805e+01, 3.69461438e+01, 1.68317129e+01,
       9.06367827e+01, 2.92182587e+02, 1.07715222e+02, 9.91297313e+01,
       3.19641718e+02, 3.22436371e+05, 5.29715665e+02, 6.31493432e+02,
       4.90370741e+01, 2.59214442e+01, 5.01252868e+02, 1.57388002e+02,
      