In [4]:
import mnemosyne as ms
from mnemosyne.engines import ReturnsEngine, MetadataEngine
from datetime import datetime as Datetime, date as Date
from timedelta_isoformat import timedelta as Timedelta
import polars as pl
from pathlib import Path
from tqdm.auto import tqdm 
from mnemosyne.dataset import ByDateDataset

# Class-based example

In [5]:
# Backend dataset for the class-based example: Binance spot trades with peg
# symbol 'USDT' and a 5-second grid interval. The '*.parquet' pattern
# presumably selects which parquet files are read — confirm against
# BinanceLastTradesGrid.
grid_dataset = ms.binance.BinanceLastTradesGrid(
    dataset_type=ms.DatasetType.BinanceSpotTrades,
    peg_symbol='USDT',
    grid_interval=Timedelta(seconds=5),
    parquet_names='*.parquet',
)

# Metadata engine backed by the dataset above, using a single worker.
# NOTE(review): `path` is an absolute, machine-specific location (presumably
# where the engine keeps its results — confirm); adjust it per environment.
metadata_engine = MetadataEngine(
    backend_dataset=grid_dataset,
    path='/data/midas/mnemosyne/notebooks/workflows/dev/metadata',
    num_workers=1,
)

In [6]:
# Trigger the engine's `compute()` step. Presumably this materializes the
# metadata that is read back in the next cell — confirm against MetadataEngine.
metadata_engine.compute()

In [7]:
# Fetch the engine's result via `lazyframe()` and collect it so the notebook
# displays the resulting table.
metadata_engine.lazyframe().collect()

symbol,time,date,last_event_time,liquidity_1d,sqrtliq_1d,excess_buy_ratio_1d,trade_count_1d,liquidity_7d,sqrtliq_7d,excess_buy_ratio_7d,trade_count_7d,daily_returns_drift_7d_lookback,daily_vol_7d_lookback,vol_ssize_7d_lookback,daily_returns_drift_30d_lookback,daily_vol_30d_lookback,vol_ssize_30d_lookback,liquidity_1d_q,liquidity_7d_q,daily_vol_7d_lookback_q,daily_vol_30d_lookback_q
enum,datetime[μs],date,datetime[μs],f64,f64,f64,u64,f64,f64,f64,u64,f64,f64,u64,f64,f64,u64,f64,f64,f64,f64
"""1INCH""",2022-01-01 01:00:00,2022-01-01,2022-01-01 00:59:46.473,461963.1408,679.678704,0.145341,2578,461963.1408,679.678704,0.145341,2578,0.299437,0.031425,4,0.299437,0.031425,4,0.705706,0.705706,0.38253,0.38253
"""AAVE""",2022-01-01 01:00:00,2022-01-01,2022-01-01 00:59:59.207,1.9722e6,1404.345523,0.215496,10411,1.9722e6,1404.345523,0.215496,10411,0.572448,0.063986,4,0.572448,0.063986,4,0.894895,0.894895,0.813253,0.813253
"""ACM""",2022-01-01 01:00:00,2022-01-01,2022-01-01 00:59:33.783,60102.7902,245.158704,0.098034,397,60102.7902,245.158704,0.098034,397,0.228686,0.007803,4,0.228686,0.007803,4,0.33033,0.33033,0.033133,0.033133
"""ADA""",2022-01-01 01:00:00,2022-01-01,2022-01-01 00:59:58.613,5.6464e6,2376.225047,0.18526,9254,5.6464e6,2376.225047,0.18526,9254,0.353817,0.015707,4,0.353817,0.015707,4,0.948949,0.948949,0.087349,0.087349
"""ADADOWN""",2022-01-01 01:00:00,2022-01-01,2022-01-01 00:58:43.535,72833.738578,269.877266,-0.21072,396,72833.738578,269.877266,-0.21072,396,-0.677815,0.067786,4,-0.677815,0.067786,4,0.384384,0.384384,0.837349,0.837349
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""ZIL""",2025-10-22 23:00:00,2025-10-22,2025-10-22 22:54:22.045624,1.5447e6,1242.862427,-0.199955,12037,6.7418e6,2596.493307,-0.050147,64611,-0.014623,0.048778,1008,-0.006546,0.096182,4320,0.430876,0.260369,0.205069,0.223502
"""ZK""",2025-10-22 23:00:00,2025-10-22,2025-10-22 22:59:59.573118,4.6756e6,2162.308817,-0.074665,80466,2.9939e7,5471.621105,-0.023599,563328,-0.019417,0.062789,1008,-0.004525,0.131191,4320,0.707373,0.656682,0.605991,0.520737
"""ZKC""",2025-10-22 23:00:00,2025-10-22,2025-10-22 22:59:43.552853,9.0517e6,3008.60374,0.057722,62978,1.7326e8,13162.774378,0.017276,1535698,0.02882,0.166723,1008,-0.028198,0.14544,4320,0.813364,0.891705,0.988479,0.633641
"""ZRO""",2025-10-22 23:00:00,2025-10-22,2025-10-22 22:59:14.069030,4.7691e6,2183.831338,0.00182,42300,3.8418e7,6198.230371,-0.01955,350613,-0.014994,0.0578,1008,0.003,0.116789,4320,0.709677,0.714286,0.451613,0.391705


# Debugging (dev): step-by-step metadata computation

In [None]:
def default_metadata(returns_interval: Timedelta):
    """Build the default collection of metadata aggregation expressions.

    Args:
        returns_interval: Spacing of the returns grid. It determines how many
            intervals make up one day, which scales the per-interval mean and
            standard deviation of the `return` column to a daily figure.

    Returns:
        A dict with two entries. Each maps a window length (`Timedelta`) to a
        list of aliased polars expressions:

        * ``'by_symbol_index'``: liquidity, square-root liquidity, excess buy
          ratio and trade count for 1-day and 7-day windows.
        * ``'accum_returns'``: daily-scaled returns drift, daily-scaled
          volatility and the count of `return` values for 7-day and 30-day
          windows.
    """
    # Number of `returns_interval` steps that fit into one day.
    intervals_per_day = Timedelta(days=1) / returns_interval

    # Traded value (price * volume) and order-flow expressions.
    notional = (pl.col('vwap_price') * pl.col('volume')).sum()
    sqrt_notional = notional.pow(0.5)
    net_taker_volume = pl.col('taker_buy_volume') - pl.col('taker_sell_volume')
    buy_imbalance = net_taker_volume.sum() / pl.col('volume').sum()
    n_trades = pl.col('trade_count').sum()

    # Return statistics: the mean scales linearly with the number of intervals
    # per day, the standard deviation with its square root.
    drift = pl.col('return').mean() * intervals_per_day
    vol = pl.col('return').std() * intervals_per_day ** 0.5
    sample_size = pl.col('return').count().alias('vol_ssize')

    # Aggregations keyed by window length, built from price/volume columns.
    by_symbol_index = {
        Timedelta(days=1): [
            notional.alias('liquidity_1d'),
            sqrt_notional.alias('sqrtliq_1d'),
            buy_imbalance.alias('excess_buy_ratio_1d'),
            n_trades.alias('trade_count_1d'),
        ],
        Timedelta(days=7): [
            notional.alias('liquidity_7d'),
            sqrt_notional.alias('sqrtliq_7d'),
            buy_imbalance.alias('excess_buy_ratio_7d'),
            n_trades.alias('trade_count_7d'),
        ],
    }

    # Aggregations keyed by lookback length, built from the `return` column.
    accum_returns = {
        Timedelta(days=7): [
            drift.alias('daily_returns_drift_7d_lookback'),
            vol.alias('daily_vol_7d_lookback'),
            sample_size.alias('vol_ssize_7d_lookback'),
        ],
        Timedelta(days=30): [
            drift.alias('daily_returns_drift_30d_lookback'),
            vol.alias('daily_vol_30d_lookback'),
            sample_size.alias('vol_ssize_30d_lookback'),
        ],
    }

    return {
        'by_symbol_index': by_symbol_index,
        'accum_returns': accum_returns,
    }

# Backend configuration for the step-by-step walkthrough below.
# NOTE(review): the peg symbol here is 'USDC' while the class-based example
# above uses 'USDT' — confirm the difference is intended.
dataset_type = ms.DatasetType.BinanceSpotTrades
backend_grid_interval = Timedelta(seconds=5)
peg_symbol = 'USDC'

backend_dataset = ms.binance.BinanceLastTradesGrid(
    dataset_type=dataset_type,
    grid_interval=backend_grid_interval,
    peg_symbol=peg_symbol,
    parquet_names='*.parquet',
)

In [None]:
# Arguments and preparation
returns_engine_kwargs = {}
returns_query_kwargs = {
    'filter_by_query_dates': True,
}

# Class initialization argument (whole dataset): `backend_dataset`, defined in
# the previous cell.
# It should support initializing a ReturnsEngine.
# It supplies both (1) the symbol-date universe (via .universe()) and (2) the
# source for metadata computation.

# Some more arguments. Users should pass these in manually.
last_event_time_expr = pl.col('last_event_time')

# returns_interval should ideally partition grid_interval.
# Vol and drift are "essentially" for returns over this range.
returns_interval = Timedelta(minutes=10)

# Spacing of the output grid. grid_interval **must** be larger than
# returns_interval, so fail early here rather than produce a misaligned grid.
grid_interval = Timedelta(hours=1)
assert grid_interval > returns_interval, (
    'grid_interval must be larger than returns_interval'
)

# These are applied on grid_interval (coarser compared to returns) grids before
# joining with returns.
# Directly applied to backend_dataset's db (match the schema there), grouped by
# symbol and grid timestamp.
# Indexed by the rolling window duration over which they are calculated.
metadata_exprs = default_metadata(returns_interval)

# Columns matching these patterns receive an additional `_q` column in the
# final cell (rank divided by count within each grid_time).
quantile_expand_exprs = pl.col('^daily_vol.*$', '^liquidity.*$')

In [None]:
# Initialization: build the lazy backend table and the returns engine that
# later queries run against.
backend_db = backend_dataset.lazyframe()
returns_engine = ReturnsEngine(backend_db, **returns_engine_kwargs)

# Longest lookback windows requested for returns-based and index-based metadata.
max_returns_lookback = max(metadata_exprs['accum_returns'])
max_metadata_lookback = max(metadata_exprs['by_symbol_index'])

# Symbol-date universe of the backend dataset. The returns grid is built from
# this frame and later filtered laxly around the requested date range.
backend_universe_df = backend_dataset.cast_symbol_col_to_enum(backend_dataset.universe())

# For every (symbol, date) row, generate `returns_interval`-spaced timestamps
# covering [date, date + 1 day), then explode to one row per timestamp.
returns_grid_times = pl.datetime_ranges(
    pl.col('date'),
    pl.col('date').dt.offset_by('1d'),
    interval=returns_interval,
    closed='left',
)
returns_grid = (
    backend_universe_df
    .with_columns(returns_grid_time=returns_grid_times)
    .explode('returns_grid_time')
    .sort('symbol', 'returns_grid_time')
    .lazy()
)

In [None]:
# Now, the arguments for computing metadata over a subset of dates.
# The output window is half-open: the grid_time filters further down use
# is_between(start_date, end_date, closed='left').
start_date = Date(2025, 1, 1)
end_date = Date(2025, 2, 1) # Inclusive on the left (start_date), exclusive on the right (end_date)

In [None]:
## Step 1: compute returns on the `returns_interval` grid
# The query index is the returns grid restricted to the dates needed: from
# `max_returns_lookback` before start_date up to and including end_date
# (deliberately lax; the exact window is applied later on grid_time).
earliest_returns_date = start_date - max_returns_lookback
returns_query = (
    returns_grid
    .filter(
        (pl.col('date') >= earliest_returns_date)
        & (pl.col('date') <= pl.lit(end_date))
    )
    .sort('symbol', 'returns_grid_time')
)

# Ask the returns engine for returns starting at each grid timestamp, with the
# mark duration and tick-lag tolerance both set to `returns_interval`.
index_with_returns = returns_engine.query(
    returns_query,
    start_time_expr=pl.col('returns_grid_time'),
    mark_duration=returns_interval,
    tick_lag_tolerance=returns_interval,
    append_lag=True,
    **returns_query_kwargs,
).sort('symbol', 'returns_grid_time')

index_with_returns.collect()

In [None]:
# Key columns that the rolling aggregates below are concatenated next to.
returns_keys = index_with_returns.select('symbol', 'returns_grid_time')

# One frame of aggregates per lookback window. Each is sorted the same way as
# the key columns and stripped of its keys so the frames can be concatenated
# horizontally.
returns_rolling_frames = [
    index_with_returns
    .rolling(pl.col('returns_grid_time'), period=interval, closed='left', group_by='symbol')
    .agg(cols)
    .sort('symbol', 'returns_grid_time')
    .drop('symbol', 'returns_grid_time')
    for interval, cols in metadata_exprs['accum_returns'].items()
]

returns_metadata = (
    pl.concat([returns_keys] + returns_rolling_frames, how='horizontal')
    .with_columns(
        # Add grid_interval to make grid_time point-in-time as well
        grid_time=pl.col('returns_grid_time').dt.truncate(grid_interval) + grid_interval
    )
    .drop('returns_grid_time')
    .filter(
        # Filtering early is fine: this frame joins directly into the final result
        pl.col('grid_time').is_between(start_date, end_date, closed='left')
    )
    # Keep the last row that falls into each (symbol, grid_time) bucket
    .group_by('symbol', 'grid_time')
    .agg(pl.all().last())
    .sort('symbol', 'grid_time')
)
returns_metadata.collect()

In [None]:
# Collect the rolling database of metadata.
# Only the last entry per (symbol, grid_time) bucket is kept at the end.
# The date filter is deliberately lax: it reaches `max_metadata_lookback`
# before start_date so the rolling windows have enough history.
inrange_db = backend_db.filter(
    (pl.col('date') >= start_date - max_metadata_lookback) &
    (pl.col('date') <= end_date)
).sort('symbol', last_event_time_expr)  # sort by the configurable time expression, as the rest of the cell does

# Key columns that the rolling aggregates below are concatenated next to.
metadata_keys = inrange_db.select('symbol', last_event_time_expr)

# One frame of aggregates per rolling window. Each is sorted the same way as
# the key columns and stripped of its keys so the frames can be concatenated
# horizontally.
metadata_rolling_frames = [
    inrange_db.rolling(
        last_event_time_expr, period=interval, closed='left', group_by='symbol'
    ).agg(cols).sort('symbol', last_event_time_expr).drop('symbol', last_event_time_expr)
    for interval, cols in metadata_exprs['by_symbol_index'].items()
]

rolling_metadata = pl.concat(
    [metadata_keys] + metadata_rolling_frames, how='horizontal'
).with_columns(
    # Add grid_interval to make grid_time point-in-time as well
    grid_time = last_event_time_expr.dt.truncate(grid_interval) + grid_interval
).filter(
    # Filtering early is fine: this frame joins directly into the final result
    pl.col('grid_time').is_between(start_date, end_date, closed='left')
).group_by('symbol', 'grid_time').agg(pl.all().last()).sort('symbol', 'grid_time')
rolling_metadata.collect()

In [None]:
# Rank divided by count for every column selected by `quantile_expand_exprs`,
# computed within each grid_time and stored under the original name plus '_q'.
quantile_rank_exprs = (
    quantile_expand_exprs.rank('average') / quantile_expand_exprs.count()
).name.suffix('_q').over('grid_time')

# Join the index-based and returns-based metadata on (symbol, grid_time), add
# the quantile columns, and materialize the result with the streaming engine.
final_metadata = (
    rolling_metadata
    .join(returns_metadata, on=['symbol', 'grid_time'])
    .with_columns(quantile_rank_exprs)
    .sort('symbol', last_event_time_expr)
    .collect(engine='streaming')
)

In [None]:
# Display the collected metadata table.
final_metadata

In [None]:
# Inspect column names and dtypes of the collected result.
final_metadata.schema