In [5]:
import glob
import os
import sys
import warnings
from datetime import datetime
from pathlib import Path

# Make the sibling ``src`` directory (one level above the working directory)
# importable, without adding it twice when the cell is re-run.
current_dir = Path.cwd()
src_path = current_dir.parent / "src"
src_path_str = str(src_path)
if src_path_str not in sys.path:
    sys.path.append(src_path_str)

import polars as pl
from tqdm import tqdm
from config import TRADE_DIR, BBO_DIR, FEATURES_OUTPUT_DIR

In [6]:
# Collect every trade parquet file below TRADE_DIR, searching sub-directories too.
trade_pattern = str(TRADE_DIR / "**" / "*.parquet")
sample_trade_files = glob.glob(trade_pattern, recursive=True)

# Peek at the first match only: show its file name and load a 5-row preview.
f_trade = sample_trade_files[0]
print(Path(f_trade).name)
df_trade = pl.read_parquet(f_trade, n_rows=5)

2010-01-04-AAPL.OQ-trade.parquet


In [7]:
# Print the trade preview's column names, then its column -> dtype schema.
print(df_trade.columns, df_trade.schema, sep="\n")

['xltime', 'trade-price', 'trade-volume', 'trade-stringflag', 'trade-rawflag']
Schema({'xltime': Float64, 'trade-price': Float64, 'trade-volume': Int32, 'trade-stringflag': String, 'trade-rawflag': String})


In [8]:
# Collect every quote (BBO) parquet file below BBO_DIR, searching sub-directories too.
bbo_pattern = str(BBO_DIR / "**" / "*.parquet")
sample_bbo_files = glob.glob(bbo_pattern, recursive=True)

# Peek at the first match only: show its file name and load a 5-row preview.
f_bbo = sample_bbo_files[0]
print(Path(f_bbo).name)
df_bbo = pl.read_parquet(f_bbo, n_rows=5)

2010-01-04-AAPL.OQ-bbo.parquet


In [9]:
# Print the quote preview's column names, then its column -> dtype schema.
print(df_bbo.columns, df_bbo.schema, sep="\n")

['xltime', 'bid-price', 'bid-volume', 'ask-price', 'ask-volume']
Schema({'xltime': Float64, 'bid-price': Float64, 'bid-volume': Int32, 'ask-price': Float64, 'ask-volume': Int32})


In [10]:
# Display the 5-row quote preview. In this sample some rows carry a bid-price
# of 0.0 with bid-volume 0; process_ticker keeps a quote only when both its
# bid and ask are > 0, so such rows are dropped there.
df_bbo

xltime,bid-price,bid-volume,ask-price,ask-volume
f64,f64,i32,f64,i32
40182.500345,0.0,0,213.0,10
40182.50068,211.86,5,213.0,10
40182.500752,0.0,0,213.0,10
40182.500752,211.83,5,213.0,10
40182.500783,0.0,0,213.0,10


In [13]:
def _xltime_to_timestamp(column="xltime"):
    """Return an expression converting a serial day-number column to ``timestamp``.

    The serial number counts (fractional) days since 1899-12-30, so for example
    40182.5 becomes 2010-01-04 12:00:00.
    """
    base_date = datetime(1899, 12, 30)
    return (pl.lit(base_date) + pl.duration(days=pl.col(column))).alias("timestamp")


def process_ticker(ticker_name, trade_path_glob, bbo_path_glob, tau=100):
    """Build rolling microstructure features for one ticker.

    Trades and quotes are scanned lazily, cleaned, joined (each trade gets the
    most recent quote at or before its timestamp), signed against the mid
    quote, and aggregated into overlapping 30-minute windows that start every
    5 minutes.

    Parameters
    ----------
    ticker_name : str
        Label written to the ``ticker`` column of the result.
    trade_path_glob : str
        Path or glob of the trade parquet files (columns ``xltime``,
        ``trade-price``, ``trade-volume``, ``trade-stringflag``).
    bbo_path_glob : str
        Path or glob of the quote parquet files (columns ``xltime``,
        ``bid-price``, ``ask-price``).
    tau : int, default 100
        Horizon of the response function, counted in signed trades: the mid
        price ``tau`` signed trades later is compared with the current mid.

    Returns
    -------
    polars.DataFrame or None
        One row per window with columns ``timestamp`` (window start),
        ``R_tau``, ``volatility``, ``avg_spread``, ``trade_count``,
        ``mid_5m``, ``ret_5m`` and ``ticker``. ``None`` is returned when any
        step fails; in that case a ``RuntimeWarning`` describing the error is
        emitted instead of the failure being swallowed silently.
    """
    try:
        q_trades = (
            pl.scan_parquet(trade_path_glob)
            .select([
                _xltime_to_timestamp(),
                pl.col("trade-price").alias("price"),
                pl.col("trade-volume").alias("volume"),
                pl.col("trade-stringflag").alias("flag"),
            ])
            .filter(pl.col("price") > 0)
            # Remove off-market prints.
            # NOTE(review): a null flag makes this predicate null, and such rows
            # are then dropped by the filter as well - confirm that is intended.
            .filter(pl.col("flag").str.contains("marketclosed").not_())
        )

        q_bbo = (
            pl.scan_parquet(bbo_path_glob)
            .select([
                _xltime_to_timestamp(),
                pl.col("bid-price").alias("bid"),
                pl.col("ask-price").alias("ask"),
            ])
            # One-sided / empty quotes show up with a zero price; drop them.
            .filter((pl.col("bid") > 0) & (pl.col("ask") > 0))
        )

        # Keep only market hours: 09:30 (inclusive) up to 16:00 (exclusive).
        # NOTE(review): the hour/minute are read from the converted timestamp
        # with no timezone handling - confirm xltime is exchange-local time.
        q_trades = q_trades.filter(
            (pl.col("timestamp").dt.hour() > 9)
            | ((pl.col("timestamp").dt.hour() == 9) & (pl.col("timestamp").dt.minute() >= 30))
        ).filter(
            pl.col("timestamp").dt.hour() < 16
        )

        # join_asof needs both sides ordered on the join key.
        q_trades = q_trades.sort("timestamp")
        q_bbo = q_bbo.sort("timestamp")

        # Attach to each trade the latest quote at or before the trade time.
        combined = q_trades.join_asof(q_bbo, on="timestamp", strategy="backward")

        combined = combined.with_columns([
            ((pl.col("bid") + pl.col("ask")) / 2).alias("mid_price"),
            (pl.col("ask") - pl.col("bid")).alias("spread"),
        ])

        # Sign trades against the mid: above -> buy (+1), below -> sell (-1).
        # Trades exactly at the mid (and trades with no matched quote, whose
        # mid is null) get sign 0 and are removed.
        combined = combined.with_columns(
            pl.when(pl.col("price") > pl.col("mid_price")).then(1)
              .when(pl.col("price") < pl.col("mid_price")).then(-1)
              .otherwise(0).alias("trade_sign")
        ).filter(pl.col("trade_sign") != 0)

        # Look `tau` signed trades ahead *within the same calendar day*. The
        # input spans many daily files, so an unpartitioned shift would pair
        # the last trades of one session with mids from the next session and
        # leak the overnight gap into the response. The final `tau` trades of
        # each day get a null future mid and are ignored by the mean below.
        trading_day = pl.col("timestamp").dt.date()
        combined = combined.with_columns(
            pl.col("mid_price").shift(-tau).over(trading_day).alias("future_mid")
        )

        combined = combined.with_columns(
            (pl.col("trade_sign") * (pl.col("future_mid") - pl.col("mid_price"))).alias("price_impact")
        )

        # Overlapping windows: a new 30-minute window starts every 5 minutes
        # and is labelled with its start time. `mid_5m` is the last mid seen
        # inside the window; `ret_5m` is the log change of that value between
        # consecutive windows.
        features = combined.group_by_dynamic(
            "timestamp",
            every="5m",
            period="30m",
            closed="left",
            label="left"
        ).agg([
            pl.col("price_impact").mean().alias("R_tau"),
            pl.col("price").std().alias("volatility"),
            pl.col("spread").mean().alias("avg_spread"),
            pl.len().alias("trade_count"),
            pl.col("mid_price").last().alias("mid_5m"),
        ]).with_columns([
            (pl.col("mid_5m").log().diff()).alias("ret_5m")
        ])

        # NOTE(review): the saved run shows a warning pointing at this collect
        # call; if it is polars' deprecation of `streaming=`, switch to
        # `collect(engine="streaming")` once the installed version is confirmed.
        return features.with_columns(pl.lit(ticker_name).alias("ticker")).collect(streaming=True)

    except Exception as exc:
        # Best effort per ticker: keep the batch running, but surface the cause
        # so dropped tickers can be diagnosed instead of vanishing silently.
        warnings.warn(
            f"process_ticker failed for {ticker_name!r}: {type(exc).__name__}: {exc}",
            RuntimeWarning,
            stacklevel=2,
        )
        return None

In [14]:
# Every sub-directory of TRADE_DIR is treated as one ticker.
ticker_paths = sorted(p for p in TRADE_DIR.iterdir() if p.is_dir())
print(f"{len(ticker_paths)} of tickers")

all_features_list = []

pbar = tqdm(ticker_paths)
for ticker_path in pbar:
    ticker = ticker_path.name
    pbar.set_description(f"Processing {ticker}")

    bbo_ticker_dir = BBO_DIR / ticker
    trade_glob = str(TRADE_DIR / ticker / "*.parquet")
    bbo_glob = str(bbo_ticker_dir / "*.parquet")

    # Without a matching quote directory there is nothing to join against.
    if not bbo_ticker_dir.exists():
        continue

    df_feat = process_ticker(ticker, trade_glob, bbo_glob)

    # process_ticker returns None on failure; empty results are skipped too.
    if df_feat is None or df_feat.is_empty():
        continue
    all_features_list.append(df_feat)

print(f"Processed {len(all_features_list)} tickers successfully.")

101 of tickers


  return features.with_columns(pl.lit(ticker_name).alias("ticker")).collect(streaming=True)
Processing XOM.OQ: 100%|██████████| 101/101 [02:01<00:00,  1.20s/it] 

Processed 89 tickers successfully.





In [16]:
# Stack the per-ticker feature frames and order rows by window start, then ticker.
final_df = pl.concat(all_features_list).sort(["timestamp", "ticker"])

# Persist the combined feature table as parquet in the configured output directory.
output_file = FEATURES_OUTPUT_DIR / "features_2010.parquet"
final_df.write_parquet(output_file)

In [17]:
# Keep windows whose start falls in 14:30-14:45 on 2010-05-06, both ends inclusive.
# NOTE(review): the bounds are compared with the stored timestamps as-is;
# confirm their timezone matches the intended wall-clock interval.
window_start = pl.datetime(2010, 5, 6, 14, 30, 0)
window_end = pl.datetime(2010, 5, 6, 14, 45, 0)
window_ts = pl.col("timestamp")
crash_subset = final_df.filter((window_ts >= window_start) & (window_ts <= window_end))

# Average the response and volatility columns over every row in that subset.
crash_subset.select([
    pl.col("R_tau").mean().alias("Mean_Fragility"),
    pl.col("volatility").mean().alias("Mean_Volatility"),
])

Mean_Fragility,Mean_Volatility
f64,f64
0.009621,0.144385
