In [89]:
import polars as pl
import numpy as np
import random

import pathlib
import os
from datetime import datetime, timezone

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Data layout relative to the kernel's working directory: <cwd>/../data/{2025, output}.
# Paths are kept as plain strings; `Path(".")` already is the cwd, so no `.parent` is needed.
this_dir = pathlib.Path(".").resolve()
data_dir = str(this_dir / ".." / "data")
trades_dir = str(pathlib.Path(data_dir) / "2025")
output_dir = str(pathlib.Path(data_dir) / "output")


In [151]:

def read_csvs(dir_path: str, random_sample: int | None = None, seed: int | None = None) -> pl.DataFrame:
    """Read every ``*.csv.gz`` file under ``dir_path`` (recursively) and concatenate them.

    Args:
        dir_path: Directory to search recursively for ``*.csv.gz`` files.
        random_sample: If given, read only this many randomly chosen files.
        seed: Optional seed for the file sampling. When ``None`` the module-level
            ``random`` state is used, so a prior ``random.seed(...)`` still applies.

    Returns:
        The rows of all selected files stacked vertically, in file order.

    Raises:
        ValueError: If no files are found, or ``random_sample`` exceeds the number found.
    """
    # rglob order is filesystem-dependent; sorting makes a seeded sample reproducible.
    csv_files = sorted(pathlib.Path(dir_path).rglob("*.csv.gz"))
    if not csv_files:
        raise ValueError(f"No *.csv.gz files found under {dir_path!r}")

    if random_sample is not None:
        if random_sample > len(csv_files):
            raise ValueError(
                f"random_sample={random_sample} exceeds the {len(csv_files)} files found under {dir_path!r}"
            )
        rng = random if seed is None else random.Random(seed)
        csv_files = rng.sample(csv_files, random_sample)

    dfs = [pl.read_csv(f) for f in csv_files]
    return pl.concat(dfs)

def parse_tickers(df: pl.DataFrame, ticker_col: str = "ticker", sip_col: str = "sip_timestamp") -> pl.DataFrame:
    """Decode option tickers into contract fields and convert SIP timestamps to datetimes.

    Args:
        df: Trades with a ticker column shaped like ``O:<root><YYMMDD><C|P><strike digits>``
            and an integer SIP timestamp column holding epoch nanoseconds.
        ticker_col: Name of the ticker column (kept in the output).
        sip_col: Name of the epoch-nanosecond timestamp column (dropped from the output).

    Returns:
        ``df`` with added columns ``root`` (str), ``cp`` (str, "C" or "P"),
        ``strike`` (Int64, the raw digits of the ticker's strike field),
        ``expiration`` (Date) and ``datetime`` (naive Datetime[ns]).
        Tickers that do not match the pattern get nulls in the parsed columns.
    """
    # Roots may contain digits (adjusted contracts such as "AMC1"). The split is still
    # unambiguous: C/P is the last letter, and the 6 digits before it are the date.
    pattern = r"^O:(?P<root>[A-Z0-9]+)(?P<date>\d{6})(?P<cp>[CP])(?P<strike>\d+)$"

    # One regex pass: a struct with one field per named group, unnested into the
    # columns root, date, cp, strike (appended after the existing columns).
    df = df.with_columns(
        pl.col(ticker_col).str.extract_groups(pattern).alias("_ticker_parts")
    ).unnest("_ticker_parts")

    # Turn the YYMMDD text into a Date and the strike digits into an integer.
    df = df.with_columns(
        pl.col("date").str.strptime(pl.Date, format="%y%m%d").alias("expiration"),
        pl.col("strike").cast(pl.Int64),
    ).drop("date")

    # Epoch nanoseconds -> Datetime[ns], natively in Polars (null-safe, no NumPy copy).
    # The result is timezone-naive; its wall time is UTC because the epoch is UTC.
    df = df.with_columns(
        pl.from_epoch(sip_col, time_unit="ns").alias("datetime")
    ).drop(sip_col)

    return df

def compute_cumulative_volume(df: pl.DataFrame, time_col: str = "datetime", group_col: str = "ticker", volume_col: str = "size") -> pl.DataFrame:
    """Sort trades by time and add per-group running volume and trade count.

    Args:
        df: Trades table.
        time_col: Column to order trades by.
        group_col: Column defining the groups (e.g. one option contract per ticker).
        volume_col: Column summed into the running volume.

    Returns:
        ``df`` sorted by ``time_col``, with ``cumulative_volume`` (running sum of
        ``volume_col`` within each group) and ``num_trades`` (1-based position of the
        row within its group).
    """
    # maintain_order=True makes the sort stable: trades sharing a timestamp keep their
    # input order, so running totals at tied timestamps are identical across runs.
    # (Assumes the input order reflects trade sequence; TODO confirm for the source files.)
    df = df.sort(time_col, maintain_order=True)

    df = df.with_columns(
        pl.col(volume_col).cum_sum().over(group_col).alias("cumulative_volume"),
        # pl.len() / pl.int_range replace the deprecated pl.count() / pl.arange.
        pl.int_range(1, pl.len() + 1).over(group_col).alias("num_trades"),
    )

    return df


In [152]:
# Seed the global RNG so the randomly sampled file, and every figure below it, can be
# reproduced under Restart & Run All (given a stable file listing). Change SAMPLE_SEED
# to draw a different file.
SAMPLE_SEED = 42
random.seed(SAMPLE_SEED)

test = (
    read_csvs(trades_dir, random_sample=1)
    .pipe(parse_tickers)
    .pipe(compute_cumulative_volume)
)


`pl.count()` is deprecated. Please use `pl.len()` instead.
(Deprecated in version 0.20.5)



In [153]:
# Total traded `size` per ticker, most active first. The ticker acts as a tie-breaker:
# group_by output order is arbitrary, so equal totals would otherwise rank randomly
# and change which contract gets picked for plotting.
vol = (
    test.group_by("ticker")
    .agg(pl.col("size").sum().alias("total_volume"))
    .sort(["total_volume", "ticker"], descending=[True, False])
)

In [None]:
# Which of the top-20 most traded contracts to inspect (0 = highest volume).
TICKER_RANK = 2
tick_testers = vol.head(20)['ticker'].to_list()
TT = tick_testers[TICKER_RANK]

# One title per row, matching the trace drawn in that row.
fig = make_subplots(
    rows=4, cols=1, shared_xaxes=True, vertical_spacing=0.1,
    subplot_titles=(
        "Trade Price Over Time",
        "Cumulative Volume Over Time",
        "Number of Trades Over Time",
        "Trade Size Over Time",
    ),
)

# Stable sort keeps the tie order used by compute_cumulative_volume, so the cumulative
# and count traces stay monotonic along the x-axis.
ticker_data = test.filter(pl.col("ticker") == TT).sort("datetime", maintain_order=True)

fig.add_trace(go.Scatter(x=ticker_data['datetime'], y=ticker_data['price'], name='Price'), row=1, col=1)
fig.add_trace(go.Scatter(x=ticker_data['datetime'], y=ticker_data['cumulative_volume'], name='Cumulative Volume'), row=2, col=1)
fig.add_trace(go.Scatter(x=ticker_data['datetime'], y=ticker_data['num_trades'], name='Number of Trades', opacity=0.5), row=3, col=1)
fig.add_trace(go.Bar(x=ticker_data['datetime'], y=ticker_data['size'], name='Trade Size'), row=4, col=1)

# Read the contract attributes from the columns parse_tickers already extracted instead of
# slicing TT by position. Note: TT[-15] is the first digit of the expiration date, not the
# call/put flag, which is why position-based slicing labelled every contract "Put".
symbol = ticker_data['root'][0]
# OCC symbology encodes the strike as price x 1000 (e.g. 00500000 -> 500.0).
strike = ticker_data['strike'][0] / 1000
expiration = ticker_data['expiration'][0]
cp = ticker_data['cp'][0]  # 'C' for call, 'P' for put
cp_text = "Call" if cp == "C" else "Put"
fig.update_layout(
    title_text=f"Trade Data for {symbol} {cp_text}<br>Strike: {strike}, Expiration: {expiration}"
)

fig.show(renderer="browser")

In [157]:
# Cumulative volume against trade count for the contract selected above. Each step right
# is one trade, so the rise between consecutive points is that trade's size: a straight
# line means steady trade sizes, and jumps mark unusually large trades.
fig = px.scatter(x=ticker_data['num_trades'], y=ticker_data['cumulative_volume'])
fig.update_layout(
    title_text=f"Cumulative Volume vs. Number of Trades: {TT}",
    xaxis_title="Number of trades",
    yaxis_title="Cumulative volume",
)
fig.show(renderer="browser")