**Purpose**: This notebook imports raw market data from Databento, processing order book and trade records, and engineering features for analysis. It includes functions for filtering, merging, and aggregating the raw data to create a clean dataset with state-level and order-level features. Finally, it generates summary statistics and prepares train, validation, and test datasets for subsequent modeling.

In [4]:
import databento as db
import datetime
import io
import json
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import pickle
import pytz
import requests
import statsmodels.api as sm
import sys
import warnings
from statsmodels.tsa.ar_model import AutoReg

import cupy as cp
import cudf
from cuml.preprocessing import StandardScaler
from cuml.linear_model import Ridge, ElasticNet
from cuml.ensemble import RandomForestRegressor as cuRF
from lightgbm import LGBMRegressor
import xgboost as xgb

from sklearn.ensemble import RandomForestRegressor

import gc

warnings.filterwarnings("ignore", category=DeprecationWarning)

### Prepare the Orderbook Data

In [14]:
def import_raw_mbp_data(ticker, year, month, day, exchange):
    """
    Imports mbp-10 data for a given day and ticker
    """
    client = db.Historical(#API_KEY)
    if exchange == 'xnas':
        try:
            df = db.DBNStore.from_file(f'/data/workspace_files/mbp-10/{ticker}/xnas-itch-{year}{month}{day}.mbp-10.{ticker}.dbn.zst').to_df()
        except:
            data = client.timeseries.get_range(
                    dataset="XNAS.ITCH",
                    schema="mbp-10",
                    stype_in="raw_symbol",
                    symbols=[ticker],
                    start=f"{year}-{month}-{day}T08:00:00",
                    end=f"{year}-{month}-{day}T24:00:00",
                )
            df = data.to_df()
    else:
        try:
            df = db.DBNStore.from_file(f'/data/workspace_files/mbp-10/{ticker}/xnys-pillar-{year}{month}{day}.mbp-10.{ticker}.dbn.zst').to_df()
        except:
            data = client.timeseries.get_range(
                    dataset="XNAS.ITCH",
                    schema="mbp-10",
                    stype_in="raw_symbol",
                    symbols=[ticker],
                    start=f"{year}-{month}-{day}T08:00:00",
                    end=f"{year}-{month}-{day}T24:00:00",
                )
            df = data.to_df()
            
    df['ts_event_local'] = pd.to_datetime(df['ts_event'], utc=True).dt.tz_convert('America/New_York') # convert time-zone
    df = df.reset_index(drop=True)
    df = df.set_index('ts_event_local', drop=False)                                                                                       
    df = df[(df['ts_event_local'].dt.time >= datetime.time(9, 40)) & 
            (df['ts_event_local'].dt.time <= datetime.time(15, 50))
        ]
    df['ts_event'] = df['ts_event_local']
    df = df.drop(columns=['rtype', 'publisher_id', 'instrument_id', 'action', 'side',
                        'depth', 'price', 'size', 'flags', 'ts_in_delta', 'sequence',
                        'symbol', 'ts_event_local'])
    return df


def import_raw_mbo_data(ticker, year, month, day, exchange):
    """
    Imports mbo data for a given day and ticker
    """
    client = db.Historical(API_KEY)
    if exchange == 'xnas':
        try:
            df = db.DBNStore.from_file(f'/data/workspace_files/mbo/{ticker}/xnas-itch-{year}{month}{day}.mbo.{ticker}.dbn.zst').to_df()
        except:
            data = client.timeseries.get_range(
                    dataset="XNAS.ITCH",
                    schema="mbo",
                    stype_in="raw_symbol",
                    symbols=[ticker],
                    start=f"{year}-{month}-{day}T08:00:00",
                    end=f"{year}-{month}-{day}T24:00:00",
                )
            df = data.to_df()

    else:
        try:
            df = db.DBNStore.from_file(f'/data/workspace_files/mbo/{ticker}/xnys-pillar-{year}{month}{day}.mbo.{ticker}.dbn.zst').to_df()
        except:
            data = client.timeseries.get_range(
                    dataset="XNYS.PILLAR",
                    schema="mbo",
                    stype_in="raw_symbol",
                    symbols=[ticker],
                    start=f"{year}-{month}-{day}T08:00:00",
                    end=f"{year}-{month}-{day}T24:00:00",
                )
            df = data.to_df()

    df['ts_event_local'] = pd.to_datetime(df['ts_event'], utc=True).dt.tz_convert('America/New_York') # convert time-zone
    df = df.reset_index(drop=True)
    df = df.set_index('ts_event_local', drop=False)                                                                                       
    df = df[(df['ts_event_local'].dt.time >= datetime.time(9, 40)) & 
            (df['ts_event_local'].dt.time <= datetime.time(15, 50))
        ]
    df['ts_event'] = df['ts_event_local']
    df = df.drop(columns=['rtype', 'publisher_id', 'instrument_id', 'channel_id',
                            'flags', 'ts_in_delta', 'sequence', 'ts_event_local'])

    return df

In [15]:
def filter_order_actions(df):
    """
    Given a DataFrame with columns 'ts_event', 'order_id', and 'action',
    drop rows where action is 'F' or 'C' at any timestamp (ts_event) where a 'T' action also appears.
    
    Parameters:
        df (pd.DataFrame): DataFrame containing 'ts_event', 'order_id', and 'action' columns.
        
    Returns:
        pd.DataFrame: Modified DataFrame with the undesired rows dropped.
    """
    # Identify ts_event timestamps where a 'T' action appears.
    ts_events_with_T = df.loc[df['action'] == 'T', 'ts_event'].unique()
    
    # Create a mask to drop rows with actions 'F' or 'C' at those ts_events.
    mask = ~(df['ts_event'].isin(ts_events_with_T) & df['action'].isin(['F', 'C']))
    
    # Return the filtered DataFrame.
    return df[mask].copy()


def combine_T_rows(df):
    """
    Some trades occur at the exact same timestamp with the same side. Treat these as one trade.
    Parameters:
        df (pd.DataFrame): DataFrame
    
    Returns:
        pd.DataFrame: Modified DataFrame with combined 'T' rows, sorted by ts_event.
    """
    # Separate rows with action 'T' from those with other actions
    df_T = df[df['action'] == 'T'].copy()
    df_non_T = df[df['action'] != 'T'].copy()
    
    # Function to combine rows in each group
    def combine_group(group):
        total_size = group['size'].sum()
        weighted_price = np.average(group['price'], weights=group['size'])
        # Use the first row as a template for the group
        combined = group.iloc[0].copy()
        combined['price'] = weighted_price
        combined['size'] = total_size
        return combined

    # Group by order_id, ts_event, and side, then combine rows within each group
    df_T_combined = (
        df_T.groupby(['order_id', 'ts_event', 'side'], as_index=False)
            .apply(combine_group)
            .reset_index(drop=True)
    )
    
    # Concatenate the combined 'T' rows with the non-'T' rows
    df_result = pd.concat([df_T_combined, df_non_T], ignore_index=True)
    
    # Resort the merged DataFrame by the ts_event column
    df_result = df_result.sort_values('ts_event').reset_index(drop=True)
    
    return df_result


def create_p_plus_minus(ob, mbp):
    """
    Merge the 'ob' dataframe with the 'mbp' dataframe based on the 'ts_event' column.
    For each row in 'ob', the corresponding minus row from 'mbp' is the one with the greatest
    'ts_event' that is strictly less than the 'ts_event' in 'ob'. The corresponding plus row 
    from 'mbp' is the one with the smallest 'ts_event' that is strictly greater than the 'ts_event' 
    in 'ob'. 

    Parameters:
        ob (pd.DataFrame): The orderbook dataframe with a 'ts_event' column.
        mbp (pd.DataFrame): The mbp dataframe with a 'ts_event' column.

    Returns:
        pd.DataFrame: The merged dataframe.
    """

    ob = ob.sort_values('ts_event')
    mbp = mbp.sort_values('ts_event')

    # Append the suffix 'minus' to all column names in 'mbp' except 'ts_event'
    mbp_renamed = mbp.rename(columns={col: f"{col}_minus" for col in mbp.columns if col != 'ts_event'})

    # Perform the merge-asof operation
    merged_df = pd.merge_asof(
        ob, 
        mbp_renamed, 
        on='ts_event', 
        direction='backward',  # Ensures the greatest time in 'mbp' that is strictly less than 'trades'
        allow_exact_matches=False
    )

    # Append the suffix 'minus' to all column names in 'mbp' except 'ts_event'
    mbp_renamed = mbp.rename(columns={col: f"{col}_plus" for col in mbp.columns if col != 'ts_event'})

    # Perform the merge-asof operation
    merged_df = pd.merge_asof(
        merged_df, 
        mbp_renamed, 
        on='ts_event', 
        direction='forward',  # Ensures the greatest time in 'mbp' that is strictly less than 'trades'
        allow_exact_matches=True
    )

    return merged_df


# Lee & Ready algo
def assign_side(df):
    def apply_tick_test(df, index):
        if index == 0:
            return 'B'  # Default to 'B' if no previous trade exists
        for i in range(index - 1, -1, -1):
            if df.loc[i, 'action'] != 'T':
                continue
            if df.loc[i, 'price'] != df.loc[index, 'price']:
                if df.loc[index, 'price'] > df.loc[i, 'price']:
                    return 'B'  
                else:
                    return 'A'  
        return 'B'  # Default to 'B' if no differing trade price is found

    # Apply the rules to update the 'side' column
    for index, row in df.iterrows():
        if row['side'] == 'N':
            if row['price'] > row['mid_px_00_minus']:
                df.at[index, 'side'] = 'B'
            elif row['price'] < row['mid_px_00_minus']:
                df.at[index, 'side'] = 'A'
            else:
                df.at[index, 'side'] = apply_tick_test(df, index)
    return df


def calculate_time_blocks(df, ts_event_col='ts_event', start_time='09:30:00', end_time='16:00:00', num_blocks=6):
    """
    Adds a 'block' column to the dataframe, dividing the time between start_time and end_time
    into roughly equal periods.

    Parameters:
        df (pd.DataFrame): The input dataframe.
        ts_event_col (str): The name of the timestamp column (default: 'ts_event').
        start_time (str): The start time of the period (default: '09:30:00').
        end_time (str): The end time of the period (default: '16:00:00').
        num_blocks (int): The number of blocks to divide the time into (default: 6).

    Returns:
        pd.DataFrame: The dataframe with the added 'block' column.
    """
    # Ensure the timestamp column is timezone-aware
    if not pd.api.types.is_datetime64tz_dtype(df[ts_event_col]):
        raise ValueError(f"The column '{ts_event_col}' must be timezone-aware.")

    # Get the timezone of the ts_event column
    tz = df[ts_event_col].dt.tz

    start_time = pd.to_datetime(start_time).time()
    end_time = pd.to_datetime(end_time).time()
    df['date'] = df[ts_event_col].dt.date
    df['start_datetime'] = pd.to_datetime(df['date'].astype(str) + ' ' + start_time.strftime('%H:%M:%S')).dt.tz_localize(tz)
    df['end_datetime'] = pd.to_datetime(df['date'].astype(str) + ' ' + end_time.strftime('%H:%M:%S')).dt.tz_localize(tz)

    # Calculate the total time range in seconds
    total_seconds = (df['end_datetime'] - df['start_datetime']).dt.total_seconds()

    # Divide the time range into equal blocks
    df['block'] = pd.cut(
        (df[ts_event_col] - df['start_datetime']).dt.total_seconds(),
        bins=num_blocks,
        labels=range(1, num_blocks + 1)
    )

    df.drop(columns=['date', 'start_datetime', 'end_datetime'], inplace=True)

    return df

In [16]:
# Functions to define order-level features used in the paper

def bbo_moving_trade(df):
    df['bbo_moving_trade'] = np.where(((df['action'] == 'T') & (df['side'] == 'A') & (df['size'] >= df['bid_sz_00_minus'])) |
                                      ((df['action'] == 'T') & (df['side'] == 'B') & (df['size'] >= df['ask_sz_00_minus'])),
                                      1, 0)
    return df


def non_bbo_moving_trade(df):
    df['non_bbo_moving_trade'] = np.where(((df['action'] == 'T') & (df['side'] == 'A') & (df['size'] < df['bid_sz_00_minus'])) |
                                          ((df['action'] == 'T') & (df['side'] == 'B') & (df['size'] < df['ask_sz_00_minus'])),
                                          1, 0)
    return df


def bbo_improving_limit(df):
    df['bbo_improving_limit'] = np.where(((df['action'] == 'A') & (df['side'] == 'A') & (df['price'] < df['ask_px_00_minus'])) |
                                         ((df['action'] == 'A') & (df['side'] == 'B') & (df['price'] > df['bid_px_00_minus'])),
                                         1, 0)
    return df


def bbo_worsening_cancel(df):
    df['bbo_worsening_cancel'] = np.where(((df['action'] == 'C') & (df['side'] == 'A') & (df['price'] <= df['ask_px_00_minus']) &
                                                (df['size'] >= df['ask_sz_00_minus'])) |
                                          ((df['action'] == 'C') & (df['side'] == 'B') & (df['price'] >= df['bid_px_00_minus']) &
                                                (df['size'] >= df['bid_sz_00_minus'])),
                                          1, 0)
    return df


def bbo_depth_add_limit(df):
    df['bbo_depth_add_limit'] = np.where(((df['action'] == 'A') & (df['side'] == 'A') & (df['price'] == df['ask_px_00_minus'])) |
                                         ((df['action'] == 'A') & (df['side'] == 'B') & (df['price'] == df['bid_px_00_minus'])),
                                          1, 0)
    return df


def bbo_depth_remove_cancel(df):
    df['bbo_depth_remove_cancel'] = np.where(((df['action'] == 'C') & (df['side'] == 'A') & (df['price'] == df['ask_px_00_minus']) &
                                                (df['size'] < df['ask_sz_00_minus'])) |
                                             ((df['action'] == 'C') & (df['side'] == 'B') & (df['price'] == df['bid_px_00_minus']) &
                                                (df['size'] < df['bid_sz_00_minus'])),
                                             1, 0)
    return df


def non_bbo_depth_add_limit(df):
    df['non_bbo_depth_add_limit'] = np.where(((df['action'] == 'A') & (df['side'] == 'A') & (df['price'] > df['ask_px_00_minus']) &
                                              (df['price'] < df['ask_px_05_minus'])) |
                                             ((df['action'] == 'A') & (df['side'] == 'B') & (df['price'] < df['bid_px_00_minus']) &
                                              (df['price'] > df['bid_px_05_minus'])),
                                             1, 0)
    return df


def non_bbo_depth_remove_cancel(df):
    df['non_bbo_depth_remove_cancel'] = np.where(((df['action'] == 'C') & (df['side'] == 'A') & (df['price'] > df['ask_px_00_minus']) &
                                                  (df['price'] < df['ask_px_05_minus'])) |
                                                 ((df['action'] == 'C') & (df['side'] == 'B') & (df['price'] < df['bid_px_00_minus']) &
                                                  (df['price'] > df['bid_px_05_minus'])),
                                                 1, 0)
    return df


def non_bbo_deep_depth_add_limit(df):
    df['non_bbo_deep_depth_add_limit'] = np.where(((df['action'] == 'A') & (df['side'] == 'A') & (df['price'] >= df['ask_px_05_minus'])) |
                                                  ((df['action'] == 'A') & (df['side'] == 'B') & (df['price'] < df['bid_px_05_minus'])),
                                                  1, 0)
    return df


def non_bbo_deep_depth_remove_cancel(df):
    df['non_bbo_deep_depth_remove_cancel'] = np.where(((df['action'] == 'C') & (df['side'] == 'A') & (df['price'] > df['ask_px_05_minus'])) |
                                                 ((df['action'] == 'C') & (df['side'] == 'B') & (df['price'] < df['bid_px_05_minus'])),
                                                 1, 0)
    return df


import pandas as pd


def lagged_ob_features(df):
    ob_features = ['bbo_moving_trade', 'non_bbo_moving_trade', 'bbo_improving_limit',
                   'bbo_worsening_cancel', 'bbo_depth_add_limit', 'bbo_depth_remove_cancel',
                   'non_bbo_depth_add_limit', 'non_bbo_depth_remove_cancel', 
                   'non_bbo_deep_depth_add_limit', 'non_bbo_deep_depth_remove_cancel']
    
    new_cols = {}
    for feat in ob_features:
        for r in range(1, 6):
            col_name = f'{feat}_{r}'
            new_cols[col_name] = df[feat].shift(1).rolling(r).sum().fillna(0)
    
    new_cols_df = pd.DataFrame(new_cols, index=df.index)
    df = pd.concat([df, new_cols_df], axis=1)
    
    return df


def add_ob_features(df):
    df = bbo_moving_trade(df)
    df = non_bbo_moving_trade(df)
    df = bbo_improving_limit(df)
    df = bbo_worsening_cancel(df)
    df = bbo_depth_add_limit(df)
    df = bbo_depth_remove_cancel(df)
    df = non_bbo_depth_add_limit(df)
    df = non_bbo_depth_remove_cancel(df)
    df = non_bbo_deep_depth_add_limit(df)
    df = non_bbo_deep_depth_remove_cancel(df)
    df = lagged_ob_features(df)

    return df

In [17]:
# Functions to define state-level features used in the paper

def add_mid_px(df):
    df['mid_px_00'] = (df['bid_px_00'] + df['ask_px_00']) / 2
    return df


def spread(df):
    df['spread'] = (df['ask_px_00'] - df['bid_px_00']) / df['mid_px_00']
    return df


def fill_missing(df):
    lvls = ['00', '01', '02', '03', '04', 
            '05', '06', '07', '08', '09']
    
    bid_fillers = [f'bid_sz_{i}' for i in lvls]
    ask_fillers = [f'ask_sz_{i}' for i in lvls]
    fillers = bid_fillers + ask_fillers

    df[fillers] = df[fillers].fillna(0)
    return df


def bbo_depth_imbalance(df):
    df['bbo_depth_imbalance'] = df['bid_sz_00'] - df['ask_sz_00']
    return df


def non_bbo_depth_imbalance(df):
    df['non_bbo_depth_imbalance'] = (df['bid_sz_00'] + df['bid_sz_01'] + df['bid_sz_02'] + df['bid_sz_03'] + df['bid_sz_04']) - \
                                    (df['ask_sz_00'] + df['ask_sz_01'] + df['ask_sz_02'] + df['ask_sz_03'] + df['ask_sz_04'])
    return df


def non_bbo_deep_depth_imbalance(df):
    df['non_bbo_deep_depth_imbalance'] = (df['bid_sz_05'] + df['bid_sz_06'] + df['bid_sz_07'] + df['bid_sz_08'] + df['bid_sz_09']) - \
                                         (df['ask_sz_05'] + df['ask_sz_06'] + df['ask_sz_07'] + df['ask_sz_08'] + df['ask_sz_09'])
    return df


def bbo_queue_length_immbalance(df):
    df['bbo_queue_length_immbalance'] = df['bid_ct_00'] - df['ask_ct_00']
    return df


def add_state_features(df):
    df = add_mid_px(df)
    df = spread(df)
    df = fill_missing(df)
    df = bbo_depth_imbalance(df)
    df = non_bbo_depth_imbalance(df)
    df = non_bbo_deep_depth_imbalance(df)
    df = bbo_queue_length_immbalance(df)
    return df

In [18]:
def add_px_impact(df):
    for i in [0, 1, 5, 10, 20]:
        if i == 0:
            df[f'px_imp_{i}'] = np.log(df['mid_px_00_plus'] / df['mid_px_00_minus'])
        else:
            df[f'px_imp_{i}'] = np.log(df['mid_px_00_plus'].shift(-i) / df['mid_px_00_minus'])
    
    return df

In [19]:
def process_day_of_data(ticker, year, month, day, exchange):
    ob = import_raw_mbo_data(ticker, year, month, day, exchange)
    mbp = import_raw_mbp_data(ticker, year, month, day, exchange)

    ob = filter_order_actions(ob)
    ob = combine_T_rows(ob)
    mbp = add_state_features(mbp)

    df = create_p_plus_minus(ob, mbp)
    df = assign_side(df)
    df = calculate_time_blocks(df)
    df = pd.get_dummies(df, columns=['block'], prefix='block', drop_first=True)
    df['bid'] = np.where(df['side'] == 'B', 1, 0)
    df = add_ob_features(df)
    df = add_px_impact(df)

    df = df.dropna(subset=['px_imp_0', 'px_imp_1', 'px_imp_5', 'px_imp_10', 'px_imp_20'])

    return df


def process_batch_of_data(tickers, years, months, days, exchanges):
    df_list = []

    for ticker in tickers:
        for year in years:
            for month in months:
                for day in days:
                    df0 = process_day_of_data(ticker, year, month, day, exchanges[0])
                    df1 = process_day_of_data(ticker, year, month, day, exchanges[1])
                    df2 = pd.concat([df0, df1])
                    df2.to_pickle(f'/data/workspace_files/{ticker}-{year}{month}{day}-df.pkl')
                    # df_list.append(df2)
        
    # dfs = pd.concat(df_list, ignore_index=True)
    # get_data_statistics(dfs)

    # return dfs

In [20]:
def get_data_statics(df):
    # List of event indicator columns
    list1 = ['bbo_moving_trade', 'non_bbo_moving_trade', 'bbo_improving_limit', 
             'bbo_worsening_cancel', 'bbo_depth_add_limit', 'bbo_depth_remove_cancel', 
             'non_bbo_depth_add_limit', 'non_bbo_depth_remove_cancel',  
             'non_bbo_deep_depth_add_limit', 'non_bbo_deep_depth_remove_cancel']
    

    data = []
    for feat in list1:
        # Filter rows where the event occurred (assuming an indicator value of 1)
        df_event = df[df[feat] == 1]
        num = len(df_event)
        median_size = df_event['size'].median()
        max_size = df_event['size'].max()
        avg_price = df_event['price'].mean()
        p_imp0 = np.abs(df_event['px_imp_0']).mean()
        p_imp20 = np.abs(df_event['px_imp_20']).mean()
        
        
        data.append({
            'Event': feat,
            'Count': num,
            'Median Size': median_size,
            'Max Size': max_size,
            'Avg Price': avg_price,
            'Avg Price Impact': p_imp0,
            'Avg Permanent Price Impact': p_imp20
        })

    event_metrics_df = pd.DataFrame(data)
    
    avg_spread = df['spread_minus'].mean()
    spread_df = pd.DataFrame({
        'Metric': ['Average Spread'],
        'Value': [avg_spread]
    })
    
    return event_metrics_df, spread_df

### Get Train and Test Data

In [8]:
tickers = ['JPM', 'HPE', 'LNT', 'LULU', 'CCL', 'BBIO', 'NTNX', 'KO', 'NFE', 'MVST']
years = ['2024']
months = ['10', '11', '12']

train_list_oct = [
    '01', '02', '03', '04',  
    '07', '08', '09', '10', '11',
    '14', '15', '16', '17', '18'  
]


val_list_oct = [ 
    '21', '22', '23', '24', '25', 
    '28', '29', '30', '31'  
]


train_list_nov = [
    '01',                     
    '04', '05', '06', '07', '08',  
    '11', '12', '13', '14', '15'
]


val_list_nov = [
    '18', '19', '20', '21', '22',  
    '25', '26', '27',       '29'   
]


test_list_dec = [
    '02', '03', '04', '05', '06',  
    '09', '10', '11', '12', '13'  
]

exchanges = ['xnas', 'nyse']

In [0]:
process_batch_of_data(tickers, years, ['11'], train_list_nov, exchanges)

In [None]:
process_batch_of_data(tickers, years, ['11'], val_list_nov, exchanges)

In [None]:
process_batch_of_data(tickers, years, ['12'], test_list_dec, exchanges)

In [5]:
df = pd.read_parquet('/data/workspace_files/train_oct/first_week_df.parquet')

In [11]:
get_data_statics(df)