In [13]:
%load_ext line_profiler
import numpy as np
import pandas as pd
from glob import glob
from numba import njit
from multiprocessing import Pool

column_names = [
    "time_id",           # 0
    "seconds_in_bucket", # 1
    "bid_price1",        # 2
    "ask_price1",        # 3
    "bid_price2",        # 4
    "ask_price2",        # 5
    "bid_size1",         # 6
    "ask_size1",         # 7
    "bid_size2",         # 8
    "ask_size2"          # 9
]

@njit
def fill_array(book_data, filled_data):
    filled_data[0] = book_data[0]
    last_read_idx = 0
    for row_idx in range(1, 600):
        # print(row_idx, last_read_idx, int(book_data[last_read_idx + 1][1]), int(book_data[last_read_idx + 1][1]) == row_idx)
        if int(book_data[last_read_idx + 1][1]) == row_idx:
            last_read_idx += 1
        filled_data[row_idx] = book_data[last_read_idx]
        filled_data[row_idx][1] = row_idx

@njit
def calculate_features(filled_data):
    filled_data = filled_data.transpose()

    trade_vols1 = filled_data[6] + filled_data[7]
    trade_vols2 = filled_data[8] + filled_data[9]
    trade_diffs1 = filled_data[7] - filled_data[6]
    trade_diffs2 = filled_data[9] - filled_data[8]

    spreads1 = (filled_data[2] / filled_data[3]) - 1
    spreads2 = (filled_data[4] / filled_data[5]) - 1

    waps1 = (filled_data[2] * filled_data[7] + filled_data[3] * filled_data[6]) / (filled_data[6] + filled_data[7])
    waps2 = (filled_data[4] * filled_data[9] + filled_data[5] * filled_data[8]) / (filled_data[8] + filled_data[9])

    logs1 = np.diff(np.log(waps1))
    logs2 = np.diff(np.log(waps2))

    return [
        waps1.mean(),
        waps2.mean(),
        waps1[300:].mean(),
        waps2[300:].mean(),
        waps1.std(),
        waps2.std(),
        waps1[300:].std(),
        waps2[300:].std(),
        logs1.mean(),
        logs2.mean(),
        logs1[300:].mean(),
        logs2[300:].mean(),
        logs1.std(), # Essentially volatility1
        logs2.std(), # Essentially volatility2
        trade_vols1.mean(),
        trade_vols2.mean(),
        trade_vols1[300:].mean(),
        trade_vols2[300:].mean(),
        trade_diffs1.mean(),
        trade_diffs2.mean(),
        trade_diffs1[300:].mean(),
        trade_diffs2[300:].mean(),
        int(filled_data[0][0])
    ]

@njit
def calculate_features(filled_data):
    filled_data = filled_data.transpose()
    
    trade_vols1 = filled_data[6] + filled_data[7]
    trade_vols2 = filled_data[8] + filled_data[9]
    trade_diffs1 = filled_data[7] - filled_data[6]
    trade_diffs2 = filled_data[9] - filled_data[8]
    
    spreads1 = (filled_data[2] / filled_data[3]) - 1
    spreads2 = (filled_data[4] / filled_data[5]) - 1
    
    waps1 = (filled_data[2] * filled_data[7] + filled_data[3] * filled_data[6]) / (filled_data[6] + filled_data[7])
    waps2 = (filled_data[4] * filled_data[9] + filled_data[5] * filled_data[8]) / (filled_data[8] + filled_data[9])
    
    logs1 = np.diff(np.log(waps1))
    logs2 = np.diff(np.log(waps2))
    
    return [
        waps1.mean(), 
        waps2.mean(),
        waps1[300:].mean(),
        waps2[300:].mean(),
        waps1.std(),
        waps2.std(),
        waps1[300:].std(),
        waps2[300:].std(),
        logs1.mean(),
        logs2.mean(),
        logs1[300:].mean(),
        logs2[300:].mean(),
        logs1.std(), # Essentially volatility1
        logs2.std(), # Essentially volatility2
        trade_vols1.mean(),
        trade_vols2.mean(),
        trade_vols1[300:].mean(),
        trade_vols2[300:].mean(),
        trade_diffs1.mean(),
        trade_diffs2.mean(),
        trade_diffs1[300:].mean(),
        trade_diffs2[300:].mean(),
        int(filled_data[0][0])
    ]

feature_columns = [
    "wap1", "wap2", "wap1l", "wap2l", "wap1_std", "wap2_std", "wap1l_std", "wap2l_std", "log1", "log2", "log1l", "log2l",
    "volume1", "volume2", "volume1l", "volume2l", "diff1", "diff2", "diff1l", "diff2l", "vol1", "vol2", "time_id", "stock_id"
]

def process_single_stock(file_path):
    book = pd.read_parquet(file_path, engine="pyarrow").to_numpy(dtype=np.float32)
    group_features = process_groups(book, int(file_path.split('=')[1]))
    return group_features

def preprocess_data(train_path):
    worker_pool = Pool(processes=None)
    full_feature_list_matrix = worker_pool.map(process_single_stock, train_files)
    worker_pool.close()
    worker_pool.join()
    return_feature_list = []
    for feature_list in full_feature_list_matrix:
        return_feature_list += feature_list
        
    return pd.DataFrame(return_feature_list, columns=feature_columns)

The line_profiler extension is already loaded. To reload it, use:
  %reload_ext line_profiler


In [14]:
@njit
def process_groups(dataset, stock_id):
    ret_lis = []
    last_split_pos = 0
    filled_data = np.zeros((600, 10), dtype=np.float32)
    for split_pos in np.nonzero(np.diff(dataset[:,0]))[0][:-1]:
        data_split = dataset[last_split_pos:split_pos]
        fill_array(data_split, filled_data)
        features = calculate_features(filled_data)
        ret_lis.append(features + [stock_id])
        last_split_pos = split_pos
    return ret_lis

In [17]:
train_targets = pd.read_csv("../data/train.csv")
train_targets['row_id'] = train_targets['stock_id'].astype(str) + '-' + train_targets['time_id'].astype(str)
train_targets = train_targets[['row_id','target']].set_index("row_id")
train_files = glob("../data/book_train.parquet/*")

In [18]:
preprocessed_data = preprocess_data(train_files)
display(preprocessed_data)

Unnamed: 0,wap1,wap2,wap1l,wap2l,wap1_std,wap2_std,wap1l_std,wap2l_std,log1,log2,...,volume1l,volume2l,diff1,diff2,diff1l,diff2l,vol1,vol2,time_id,stock_id
0,1.003173,1.003192,1.002904,1.002947,7.948226e-04,0.000797,5.555469e-04,5.639873e-04,0.000002,9.314862e-07,...,158.491669,143.378326,134.893326,115.629997,39.828335,23.731667,18.126667,-14.803333,5.0,84.0
1,1.003072,1.002741,1.003070,1.002734,1.430511e-06,0.000006,0.000000e+00,0.000000e+00,0.000000,0.000000e+00,...,152.000000,150.000000,152.000000,150.000000,-102.000000,100.000000,-102.000000,100.000000,5.0,84.0
2,0.997537,0.997411,0.997538,0.997407,1.549721e-06,0.000001,2.086163e-06,2.682209e-06,0.000000,0.000000e+00,...,325.000000,125.000000,325.000000,125.000000,-125.000000,75.000000,-125.000000,75.000000,11.0,84.0
3,0.999083,0.998943,0.999081,0.998944,4.827976e-06,0.000002,3.278255e-06,5.960464e-07,0.000000,0.000000e+00,...,200.000000,348.000000,200.000000,348.000000,-50.000000,252.000000,-50.000000,252.000000,16.0,84.0
4,0.998209,0.998071,0.998209,0.998074,1.668930e-06,0.000001,2.086163e-06,4.053116e-06,0.000000,0.000000e+00,...,104.000000,50.000000,104.000000,50.000000,-96.000000,0.000000,-96.000000,0.000000,31.0,84.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
428703,0.992824,0.992778,0.992828,0.992780,1.430511e-06,0.000003,2.443790e-06,1.251698e-06,0.000000,0.000000e+00,...,345.000000,230.000000,345.000000,230.000000,-145.000000,-30.000000,-145.000000,-30.000000,32746.0,69.0
428704,0.998621,0.998656,0.998624,0.998655,1.788139e-07,0.000004,2.980232e-06,2.741814e-06,0.000000,0.000000e+00,...,403.000000,800.000000,403.000000,800.000000,197.000000,-200.000000,197.000000,-200.000000,32748.0,69.0
428705,0.999548,0.999578,0.999546,0.999583,5.960464e-07,0.000006,3.397465e-06,4.172325e-07,0.000000,0.000000e+00,...,430.000000,300.000000,430.000000,300.000000,30.000000,-100.000000,30.000000,-100.000000,32750.0,69.0
428706,0.996425,0.996419,0.996429,0.996417,2.861023e-06,0.000003,1.430511e-06,1.132488e-06,0.000000,0.000000e+00,...,399.000000,391.000000,399.000000,391.000000,-131.000000,-9.000000,-131.000000,-9.000000,32751.0,69.0


In [None]:
%timeit preprocessed_data = preprocess_data(train_files)