In [1]:
import os
import numpy as np
import pandas as pd
import polars as pl

In [6]:
# Lazy scan all orderbook files
orderbook_lf = pl.scan_parquet("data/raw/ready/ccxt/orderbook/**/*.parquet")

In [8]:
# Add derived features (all lazy - no execution yet)
orderbook_lf = orderbook_lf.with_columns([
    # Best bid/ask from nested struct arrays
    pl.col("bids").list.get(0).struct.field("price").alias("best_bid"),
    pl.col("asks").list.get(0).struct.field("price").alias("best_ask"),
])

orderbook_lf = orderbook_lf.with_columns([
    ((pl.col("best_bid") + pl.col("best_ask")) / 2).alias("mid_price"),
    (pl.col("best_ask") - pl.col("best_bid")).alias("spread"),
])

# Filter for specific symbol (still lazy)
btc_lf = orderbook_lf.filter(
    (pl.col("symbol") == "BTC-USD")
    & (pl.col("exchange") == "coinbaseadvanced")
    )

In [9]:
# Now collect to execute
result = btc_lf.drop("bids", "asks", "symbol", "exchange").collect()
result

collected_at,capture_ts,year,month,day,hour,timestamp,nonce,best_bid,best_ask,mid_price,spread
i64,"datetime[μs, UTC]",i32,i32,i32,i32,i64,i64,f64,f64,f64,f64
1765559042716,2025-12-12 17:04:02.715946 UTC,2025,12,12,17,1765559042672,,90136.01,90138.0,90137.005,1.99
1765559042810,2025-12-12 17:04:02.810648 UTC,2025,12,12,17,1765559042770,,90136.02,90138.0,90137.01,1.98
1765559042862,2025-12-12 17:04:02.862768 UTC,2025,12,12,17,1765559042821,,90136.02,90137.99,90137.005,1.97
1765559042910,2025-12-12 17:04:02.910881 UTC,2025,12,12,17,1765559042870,,90136.02,90137.99,90137.005,1.97
1765559042959,2025-12-12 17:04:02.959453 UTC,2025,12,12,17,1765559042920,,90136.04,90137.99,90137.015,1.95
…,…,…,…,…,…,…,…,…,…,…,…
1766198123489,2025-12-20 02:35:23.489190 UTC,2025,12,20,2,1766198123453,,88133.98,88133.99,88133.985,0.01
1766198123538,2025-12-20 02:35:23.537952 UTC,2025,12,20,2,1766198123501,,88133.98,88133.99,88133.985,0.01
1766198123591,2025-12-20 02:35:23.591319 UTC,2025,12,20,2,1766198123553,,88133.98,88133.99,88133.985,0.01
1766198123738,2025-12-20 02:35:23.738109 UTC,2025,12,20,2,1766198123701,,88133.98,88133.99,88133.985,0.01


## Universe Selection

In [1]:
import ccxt.pro as ccxtpro
import time
import pandas as pd
import numpy as np

In [2]:
e1 = ccxtpro.coinbaseadvanced()
e2 = ccxtpro.binanceus()
e3 = ccxtpro.kraken()

In [3]:
# Fetch tickers for all symbols to get volume data
tickers = await e3.fetch_tickers()

# Convert to list of tuples (symbol, volume) and sort by volume
volume_data = [(symbol, ticker.get('quoteVolume', 0)) for symbol, ticker in tickers.items() if ticker.get('quoteVolume')]
volume_data_sorted = sorted(volume_data, key=lambda x: x[1], reverse=True)

# Get top N symbols
N = 10000000
top_symbols = [symbol for symbol, volume in volume_data_sorted[:N]]

print(f"Top {N} symbols by 24h quote volume:")
for i, (symbol, volume) in enumerate(volume_data_sorted[:N], 1):
    print(f"{i}. {symbol}: ${volume:,.0f}")

Top 10000000 symbols by 24h quote volume:
1. BTC/USD: $322,993,564
2. USDT/USD: $267,667,174
3. ETH/USD: $162,346,716
4. USDC/EUR: $106,835,998
5. SOL/USD: $92,497,122
6. BTC/EUR: $66,292,379
7. USDC/USD: $66,033,869
8. EUR/USD: $54,426,888
9. USDT/EUR: $51,089,303
10. XRP/USD: $45,857,165
11. XMR/USD: $32,851,334
12. USDC/USDT: $32,229,453
13. ETH/EUR: $30,238,118
14. BTC/USDC: $28,446,542
15. DASH/USD: $25,317,608
16. SUI/USD: $22,011,569
17. BTC/JPY: $19,697,685
18. XMR/USDT: $19,279,475
19. ZEC/USD: $18,950,293
20. BTC/USDT: $17,646,357
21. LTC/USD: $15,235,481
22. SOL/EUR: $12,609,798
23. DOGE/USD: $12,059,357
24. ETH/USDC: $11,087,691
25. XRP/EUR: $10,547,699
26. ICP/USD: $10,232,376
27. USDC/GBP: $9,980,570
28. ETH/USDT: $9,878,240
29. GBP/USD: $9,797,884
30. SOL/USDT: $9,278,346
31. SOL/USDC: $8,942,754
32. FARTCOIN/USD: $8,924,330
33. ADA/USD: $8,432,785
34. USD/JPY: $8,129,082
35. TAO/USD: $5,881,874
36. BTC/GBP: $5,085,211
37. EUR/JPY: $5,082,479
38. LINK/USD: $4,858,533
39.

In [4]:
# Define universe size
universe_size = 200

# Filter to get symbols ending with /USDC for consistency
usdc_symbols = [(sym, vol) for sym, vol in volume_data_sorted if sym.endswith('/USD') and not sym.startswith("USD")]

# Define group sizes (proportions of universe_size)
tier1_size = 40  # Top 40 most liquid
tier2_size = 80  # Next 40 moderately liquid
tier3_size = 80  # Next 40 less liquid but still tradeable

# Create the three tiers
tier1_institutional = [sym for sym, vol in usdc_symbols[:tier1_size]]
tier2_competitive = [sym for sym, vol in usdc_symbols[tier1_size:tier1_size + tier2_size]]
tier3_niche = [sym for sym, vol in usdc_symbols[tier1_size + tier2_size:universe_size]]

# print(f"Universe Size: {universe_size}")
# print(f"\n{'='*80}")
# print(f"TIER 1 - INSTITUTIONAL (Top {tier1_size} by volume)")
# print(f"{'='*80}")
# print(f"Characteristics: Highest liquidity, tight spreads, heavy competition")
# print(f"Volume range: ${usdc_symbols[0][1]:,.0f} - ${usdc_symbols[tier1_size-1][1]:,.0f}")
# for i, (sym, vol) in enumerate(usdc_symbols[:tier1_size], 1):
#     print(f"{i:2d}. {sym:20s} ${vol:>15,.0f}")

# print(f"\n{'='*80}")
# print(f"TIER 2 - COMPETITIVE (Next {tier2_size} by volume)")
# print(f"{'='*80}")
# print(f"Characteristics: Good liquidity, reasonable spreads, moderate competition")
# print(f"Volume range: ${usdc_symbols[tier1_size][1]:,.0f} - ${usdc_symbols[tier1_size + tier2_size-1][1]:,.0f}")
# for i, (sym, vol) in enumerate(usdc_symbols[tier1_size:tier1_size + tier2_size], 1):
#     print(f"{i:2d}. {sym:20s} ${vol:>15,.0f}")

# print(f"\n{'='*80}")
# print(f"TIER 3 - NICHE/EDGE (Next {tier3_size} by volume)")
# print(f"{'='*80}")
# print(f"Characteristics: Adequate liquidity, wider spreads, potential edge opportunities")
# print(f"Volume range: ${usdc_symbols[tier1_size + tier2_size][1]:,.0f} - ${usdc_symbols[universe_size-1][1]:,.0f}")
# for i, (sym, vol) in enumerate(usdc_symbols[tier1_size + tier2_size:universe_size], 1):
#     print(f"{i:2d}. {sym:20s} ${vol:>15,.0f}")

# Create final symbols list for the universe
symbols = tier1_institutional + tier2_competitive + tier3_niche

print(f"\n{'='*80}")
print(f"SUMMARY")
print(f"{'='*80}")
print(f"Total symbols in universe: {len(symbols)}")
print(f"Tier 1 (Institutional): {len(tier1_institutional)}")
print(f"Tier 2 (Competitive): {len(tier2_competitive)}")
print(f"Tier 3 (Niche/Edge): {len(tier3_niche)}")


SUMMARY
Total symbols in universe: 200
Tier 1 (Institutional): 40
Tier 2 (Competitive): 80
Tier 3 (Niche/Edge): 80


In [5]:
len(symbols)

200

In [8]:
print(symbols[:25])

['BTC/USD', 'ETH/USD', 'SOL/USD', 'EUR/USD', 'XRP/USD', 'XMR/USD', 'DASH/USD', 'SUI/USD', 'ZEC/USD', 'LTC/USD', 'DOGE/USD', 'ICP/USD', 'GBP/USD', 'FARTCOIN/USD', 'ADA/USD', 'TAO/USD', 'LINK/USD', 'BCH/USD', 'PAXG/USD', 'PUMP/USD', 'PEPE/USD', 'CC/USD', 'RENDER/USD', 'XAUT/USD', 'WIF/USD']


In [3]:
import json

# Read NDJSON file
ndjson_data = []
fpath = "C:/Users/longp/FluxForge/data/raw/ready/ccxt_binanceus/segment_20251209T00_00001.ndjson"
with open(fpath, 'r') as f:
    for line in f:
        ndjson_data.append(json.loads(line))

In [4]:
orderbook_data = [entry for entry in ndjson_data if entry['type'] == 'orderbook']

In [6]:
orderbook_entry = orderbook_data[0]
orderbook_entry

{'type': 'orderbook',
 'exchange': 'binanceus',
 'symbol': 'BTC/USDT',
 'method': 'watchOrderBook',
 'data': {'bids': [[90390.05, 2.23167],
   [90390.04, 0.1275],
   [90389.98, 0.10912],
   [90389.82, 0.94748],
   [90368.32, 0.00163],
   [90282.94, 0.1001],
   [90282.93, 0.12979],
   [90282.92, 0.00082],
   [90268.75, 0.00011],
   [90236.1, 0.27416],
   [90202.86, 0.0001],
   [90191.0, 0.32148],
   [90163.7, 0.33093],
   [90129.95, 0.00037],
   [90052.63, 0.0016],
   [90048.7, 0.10849],
   [90000.0, 0.00549],
   [89964.7, 0.11121],
   [89919.82, 0.00234],
   [89892.53, 7e-05],
   [89886.91, 0.00158],
   [89879.1, 0.10731],
   [89842.2, 0.10638],
   [89833.12, 0.00025],
   [89810.37, 0.00011],
   [89780.13, 0.00089],
   [89771.76, 0.00043],
   [89744.45, 0.00083],
   [89714.33, 7e-05],
   [89686.82, 4e-05],
   [89671.73, 0.00127],
   [89580.68, 0.00037],
   [89556.03, 0.00014],
   [89534.74, 0.00073],
   [89516.66, 0.00079],
   [89510.78, 0.00044],
   [89500.0, 0.00138],
   [89457.24, 0

In [2]:
def extract_orderbook_features(orderbook_entry):
    """
    Extract comprehensive orderbook features from a single orderbook snapshot.
    Returns a dictionary of features with predictive power for modeling.
    """
    data = orderbook_entry['data']
    bids = np.array(data['bids'])  # [[price, size], ...]
    asks = np.array(data['asks'])  # [[price, size], ...]
    
    features = {
        'timestamp': orderbook_entry['collected_at'],
        'symbol': orderbook_entry['symbol'],
        'exchange': orderbook_entry['exchange'],
    }
    
    # ===== BASIC FEATURES =====
    # Best bid/ask
    features['best_bid'] = bids[0][0] if len(bids) > 0 else np.nan
    features['best_ask'] = asks[0][0] if len(asks) > 0 else np.nan
    features['mid_price'] = (features['best_bid'] + features['best_ask']) / 2
    features['spread'] = features['best_ask'] - features['best_bid']
    features['spread_bps'] = (features['spread'] / features['mid_price']) * 10000
    
    # Volume at best
    features['bid_size_level_0'] = bids[0][1] if len(bids) > 0 else 0
    features['ask_size_level_0'] = asks[0][1] if len(asks) > 0 else 0
    
    # ===== DEPTH FEATURES =====
    # Cumulative volume across levels
    n_levels = min(len(bids), len(asks), 10)
    for i in range(n_levels):
        if i < len(bids):
            features[f'bid_size_level_{i}'] = bids[i][1]
            features[f'bid_price_level_{i}'] = bids[i][0]
        if i < len(asks):
            features[f'ask_size_level_{i}'] = asks[i][1]
            features[f'ask_price_level_{i}'] = asks[i][0]
    
    # Total volume by depth
    for depth in [5, 10, 20, 50]:
        depth = min(depth, len(bids), len(asks))
        features[f'bid_volume_{depth}'] = np.sum(bids[:depth, 1])
        features[f'ask_volume_{depth}'] = np.sum(asks[:depth, 1])
        features[f'total_volume_{depth}'] = features[f'bid_volume_{depth}'] + features[f'ask_volume_{depth}']
        features[f'volume_imbalance_{depth}'] = (features[f'bid_volume_{depth}'] - features[f'ask_volume_{depth}']) / features[f'total_volume_{depth}']
    
    # ===== IMBALANCE FEATURES =====
    # Order flow imbalance at different levels
    features['ofi_level_0'] = (features['bid_size_level_0'] - features['ask_size_level_0']) / (features['bid_size_level_0'] + features['ask_size_level_0'])
    
    # Weighted order imbalance
    bid_notional_5 = np.sum(bids[:5, 0] * bids[:5, 1])
    ask_notional_5 = np.sum(asks[:5, 0] * asks[:5, 1])
    features['weighted_oi_5'] = (bid_notional_5 - ask_notional_5) / (bid_notional_5 + ask_notional_5)
    
    # ===== PRICE LEVEL FEATURES =====
    # Distance between levels
    features['bid_level_spacing_mean'] = np.mean(np.diff(bids[:10, 0])) if len(bids) >= 2 else 0
    features['ask_level_spacing_mean'] = np.mean(np.diff(asks[:10, 0])) if len(asks) >= 2 else 0
    
    # Price impact (how much price moves per unit volume)
    features['bid_depth_10bps'] = np.sum(bids[bids[:, 0] >= features['best_bid'] * 0.999, 1]) if len(bids) > 0 else 0
    features['ask_depth_10bps'] = np.sum(asks[asks[:, 0] <= features['best_ask'] * 1.001, 1]) if len(asks) > 0 else 0
    
    # ===== LIQUIDITY FEATURES =====
    # Volume-weighted average price
    features['vwap_bid_5'] = np.sum(bids[:5, 0] * bids[:5, 1]) / np.sum(bids[:5, 1]) if len(bids) >= 5 else np.nan
    features['vwap_ask_5'] = np.sum(asks[:5, 0] * asks[:5, 1]) / np.sum(asks[:5, 1]) if len(asks) >= 5 else np.nan
    features['vwap_spread'] = features['vwap_ask_5'] - features['vwap_bid_5']
    
    # Microprice (fair value estimate)
    features['microprice'] = (features['best_bid'] * features['ask_size_level_0'] + features['best_ask'] * features['bid_size_level_0']) / (features['bid_size_level_0'] + features['ask_size_level_0'])
    
    # ===== SHAPE FEATURES =====
    # Distribution of liquidity
    bid_volume_cumsum = np.cumsum(bids[:, 1])
    ask_volume_cumsum = np.cumsum(asks[:, 1])
    
    # Find where 50% and 90% of volume is
    total_bid_vol = bid_volume_cumsum[-1] if len(bid_volume_cumsum) > 0 else 0
    total_ask_vol = ask_volume_cumsum[-1] if len(ask_volume_cumsum) > 0 else 0
    
    features['bid_50pct_depth'] = np.argmax(bid_volume_cumsum >= total_bid_vol * 0.5) if total_bid_vol > 0 else 0
    features['ask_50pct_depth'] = np.argmax(ask_volume_cumsum >= total_ask_vol * 0.5) if total_ask_vol > 0 else 0
    
    # Concentration (Herfindahl index)
    if total_bid_vol > 0:
        bid_shares = bids[:10, 1] / total_bid_vol
        features['bid_concentration'] = np.sum(bid_shares ** 2)
    else:
        features['bid_concentration'] = 0
        
    if total_ask_vol > 0:
        ask_shares = asks[:10, 1] / total_ask_vol
        features['ask_concentration'] = np.sum(ask_shares ** 2)
    else:
        features['ask_concentration'] = 0
    
    # ===== PRESSURE FEATURES =====
    # Volume ratio at different depths
    for depth in [3, 5, 10]:
        bid_vol = np.sum(bids[:depth, 1])
        ask_vol = np.sum(asks[:depth, 1])
        features[f'volume_ratio_{depth}'] = bid_vol / ask_vol if ask_vol > 0 else np.inf
    
    # Notional value pressure
    bid_notional_10 = np.sum(bids[:10, 0] * bids[:10, 1])
    ask_notional_10 = np.sum(asks[:10, 0] * asks[:10, 1])
    features['notional_pressure_10'] = (bid_notional_10 - ask_notional_10) / (bid_notional_10 + ask_notional_10)
    
    # ===== ADVANCED FEATURES =====
    # Smart depth (volume-adjusted by distance from mid)
    bid_distances = (features['mid_price'] - bids[:, 0]) / features['mid_price']
    ask_distances = (asks[:, 0] - features['mid_price']) / features['mid_price']
    
    features['smart_bid_depth'] = np.sum(bids[:, 1] * np.exp(-100 * bid_distances))
    features['smart_ask_depth'] = np.sum(asks[:, 1] * np.exp(-100 * ask_distances))
    features['smart_depth_imbalance'] = (features['smart_bid_depth'] - features['smart_ask_depth']) / (features['smart_bid_depth'] + features['smart_ask_depth'])
    
    # Kyle's lambda (price impact coefficient approximation)
    features['kyle_lambda_bid'] = features['spread'] / (2 * features['bid_volume_5']) if features['bid_volume_5'] > 0 else np.nan
    features['kyle_lambda_ask'] = features['spread'] / (2 * features['ask_volume_5']) if features['ask_volume_5'] > 0 else np.nan
    
    # Amihud illiquidity measure (approximation)
    features['amihud_illiquidity'] = features['spread'] / (features['bid_volume_10'] + features['ask_volume_10'])
    
    # Order book slope (price change per volume)
    if len(bids) >= 5:
        bid_prices_5 = bids[:5, 0]
        bid_volumes_cumsum = np.cumsum(bids[:5, 1])
        features['bid_slope'] = (bid_prices_5[-1] - bid_prices_5[0]) / bid_volumes_cumsum[-1] if bid_volumes_cumsum[-1] > 0 else 0
    else:
        features['bid_slope'] = 0
        
    if len(asks) >= 5:
        ask_prices_5 = asks[:5, 0]
        ask_volumes_cumsum = np.cumsum(asks[:5, 1])
        features['ask_slope'] = (ask_prices_5[-1] - ask_prices_5[0]) / ask_volumes_cumsum[-1] if ask_volumes_cumsum[-1] > 0 else 0
    else:
        features['ask_slope'] = 0
    
    # Volume-weighted spread
    total_vol_10 = features['bid_volume_10'] + features['ask_volume_10']
    features['vw_spread'] = (features['vwap_ask_5'] - features['vwap_bid_5']) if not np.isnan(features['vwap_ask_5']) else features['spread']
    
    # Relative spread (normalized by mid price)
    features['relative_spread'] = features['spread'] / features['mid_price']
    
    # Effective spread (considering volume)
    features['effective_spread'] = 2 * abs(features['microprice'] - features['mid_price'])
    
    return features

In [5]:
extract_orderbook_features(orderbook_data[0])

{'timestamp': 1765241057511,
 'symbol': 'BTC/USDT',
 'exchange': 'binanceus',
 'best_bid': 90390.05,
 'best_ask': 90698.75,
 'mid_price': 90544.4,
 'spread': 308.6999999999971,
 'spread_bps': 34.093770570018364,
 'bid_size_level_0': 2.23167,
 'ask_size_level_0': 0.01966,
 'bid_price_level_0': 90390.05,
 'ask_price_level_0': 90698.75,
 'bid_size_level_1': 0.1275,
 'bid_price_level_1': 90390.04,
 'ask_size_level_1': 0.1001,
 'ask_price_level_1': 90698.76,
 'bid_size_level_2': 0.10912,
 'bid_price_level_2': 90389.98,
 'ask_size_level_2': 2e-05,
 'ask_price_level_2': 90698.82,
 'bid_size_level_3': 0.94748,
 'bid_price_level_3': 90389.82,
 'ask_size_level_3': 0.12896,
 'ask_price_level_3': 90698.85,
 'bid_size_level_4': 0.00163,
 'bid_price_level_4': 90368.32,
 'ask_size_level_4': 0.057,
 'ask_price_level_4': 90699.0,
 'bid_size_level_5': 0.1001,
 'bid_price_level_5': 90282.94,
 'ask_size_level_5': 0.01058,
 'ask_price_level_5': 90699.14,
 'bid_size_level_6': 0.12979,
 'bid_price_level_6': 

In [None]:
# CCXT Implementation Plan
# 1. Add CcxtConfig to config/config.py
# 2. Create ingestion/collectors/ccxt_collector.py
# 3. Update ingestion/orchestrators/ingestion_pipeline.py
# 4. Update config/config.examples.yaml

In [1]:
import sys
import os
import duckdb
import logging
from tqdm import tqdm
import polars as pl
import pandas as pd
import numpy as np
from pathlib import Path
from config import load_config
import pyarrow.parquet as pq
import pyarrow as pa

logger = logging.getLogger(__name__)

config = load_config("config/config.yaml")

logging.basicConfig(
        level=getattr(logging, config.log_level),
        format=config.log_format
    )

  class DatabentoConfig(BaseModel):


In [None]:
pl_df = pl.scan_parquet("F:/processed/coinbase/ticker/**/*.parquet")
df = pl_df.filter(pl.col("product_id") == "BTC-USD").collect()
df

## Compaction

In [None]:
from etl.repartitioner import ParquetCompactor

compactor = ParquetCompactor(
    dataset_dir="F:/processed/coinbase/level2/",  # Directory containing the dataset
    target_file_size_mb=100,  # Target 100MB files
)

stats = compactor.compact(
    min_file_count=2,          # Only compact partitions with 2+ files
    target_file_count=1,       # Consolidate to exactly 1 file per partition
    delete_source_files=True,  # Delete original files after compaction
    dry_run=False,
)
stats

## Syncing

In [1]:
import logging
from config import load_config
from storage.sync import StorageSync
from storage.factory import create_sync_source_storage, create_sync_destination_storage

logger = logging.getLogger(__name__)
config = load_config("config/config.yaml")
logging.basicConfig(
        level=getattr(logging, config.log_level),
        format=config.log_format
    )

In [2]:
s3_storage = create_sync_destination_storage(config)  # s3
local_storage = create_sync_source_storage(config)  # local


source_storage = local_storage # s3_storage
destination_storage = s3_storage # local_storage

sync = StorageSync(
    source=source_storage,
    destination=destination_storage,
)

2026-01-02 11:45:05,133 - storage.factory - INFO - [sync_destination] Initializing S3 storage: market-data-vault
2026-01-02 11:45:05,400 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2026-01-02 11:45:05,646 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2026-01-02 11:45:05,724 - storage.factory - INFO - [sync_source] Initializing local storage: ./data
2026-01-02 11:45:05,724 - storage.sync - INFO - [StorageSync] Initialized: local:./data → s3:market-data-vault


In [None]:
SYNC_PATHS = [
    "raw/ready/ccxt/ticker/",
    "raw/ready/ccxt/trades/",
    "raw/ready/ccxt/orderbook/",
    # "processed/ccxt/ticker/",
    # "processed/ccxt/trades/",
    # "processed/ccxt/orderbook/hf/",
    # "processed/ccxt/orderbook/bars/"
]
for sync_path in SYNC_PATHS:
    print(f"Syncing path: {sync_path}")
    # Sync processed parquet files
    stats = sync.sync(
        source_path=sync_path,
        dest_path=sync_path,
        pattern="**/*.parquet",
        recursive_list_files=False,
        delete_after_transfer=False,
        max_workers=42,
        skip_existing=True,
        dry_run=True,
    )
    print(stats)

## Deleting

In [4]:
from storage.base import batch_delete_files

In [6]:
storage = sync.destination
storage

<storage.base.S3Storage at 0x273a6382d80>

### Delete Non-Partitioned Files Only

To delete only the non-partitioned parquet files (in immediate directory) while keeping partitioned ones (in subdirectories), use `recursive=False`:

In [8]:
# Get only non-partitioned files (immediate directory, not recursive)
# This will match: raw/ready/ccxt/ticker/*.parquet
# But NOT: raw/ready/ccxt/ticker/exchange=binanceus/**/*.parquet
non_partitioned_files = storage.list_files(
    path="raw/ready/ccxt/ticker/",
    pattern="*.parquet",
    recursive=False  # Key: only immediate directory
)

print(f"Found {len(non_partitioned_files)} non-partitioned files")

Found 0 non-partitioned files


In [36]:
result = batch_delete_files(
    storage, 
    paths=[f["path"] for f in non_partitioned_files],
    dry_run=False
)
result

2025-12-20 02:28:50,184 - storage.base - INFO - Batch delete complete: 170 deleted, 0 failed out of 170 files


{'deleted': 170,
 'failed': 0,
 'errors': [],
 'files': ['raw/ready/ccxt/trades/segment_20251212T17_00001.parquet',
  'raw/ready/ccxt/trades/segment_20251212T17_00007.parquet',
  'raw/ready/ccxt/trades/segment_20251212T17_00010.parquet',
  'raw/ready/ccxt/trades/segment_20251212T17_00011.parquet',
  'raw/ready/ccxt/trades/segment_20251212T17_00014.parquet',
  'raw/ready/ccxt/trades/segment_20251212T18_00002.parquet',
  'raw/ready/ccxt/trades/segment_20251212T18_00005.parquet',
  'raw/ready/ccxt/trades/segment_20251212T18_00006.parquet',
  'raw/ready/ccxt/trades/segment_20251212T18_00007.parquet',
  'raw/ready/ccxt/trades/segment_20251212T18_00008.parquet',
  'raw/ready/ccxt/trades/segment_20251212T18_00009.parquet',
  'raw/ready/ccxt/trades/segment_20251212T18_00010.parquet',
  'raw/ready/ccxt/trades/segment_20251212T19_00001.parquet',
  'raw/ready/ccxt/trades/segment_20251212T19_00002.parquet',
  'raw/ready/ccxt/trades/segment_20251212T19_00003.parquet',
  'raw/ready/ccxt/trades/segme

In [None]:
# To actually delete (remove dry_run=True)
# result = batch_delete_files(
#     storage, 
#     paths=[f["path"] for f in non_partitioned_files],
#     dry_run=False
# )
# print(f"Deleted {result['deleted']} files, {result['failed']} failed")

### Delete Multiple Channels at Once

In [None]:
# Delete non-partitioned files across multiple channels
channels = ["ticker", "trades", "orderbook"]
all_files_to_delete = []

for channel in channels:
    files = storage.list_files(
        path=f"raw/ready/ccxt/{channel}/",
        pattern="*.parquet",
        recursive=False  # Only immediate directory
    )
    all_files_to_delete.extend([f["path"] for f in files])
    print(f"{channel}: {len(files)} non-partitioned files")

print(f"\nTotal: {len(all_files_to_delete)} files to delete")

# Dry run
result = batch_delete_files(storage, paths=all_files_to_delete, dry_run=True)
print(f"Would delete {len(result['files'])} files")

# To actually delete:
# result = batch_delete_files(storage, paths=all_files_to_delete, dry_run=False)
# print(f"Deleted {result['deleted']} files, {result['failed']} failed")

## Upload Local & Cloud

In [None]:
import logging
from pathlib import Path
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from config import load_config
from storage.factory import (
    create_etl_storage_input,
    create_etl_storage_output,
)
from etl.job import ETLJob

logger = logging.getLogger(__name__)

config = load_config("config/config.yaml")

logging.basicConfig(
        level=getattr(logging, config.log_level),
        format=config.log_format
    )

# Create storage backends
storage_input = create_etl_storage_input(config)
storage_output = create_etl_storage_output(config)

logger.info(f"Storage Input:  {storage_input.backend_type} @ {storage_input.base_path}")
logger.info(f"Storage Output: {storage_output.backend_type} @ {storage_output.base_path}")


SOURCE_PATHS = [
    "processed/coinbase/market_trades/",
    "processed/coinbase/ticker/",
    "processed/coinbase/level2/",
]

for upload_path in SOURCE_PATHS:
    print(f"\nUploading files from {upload_path}...")
    files_to_upload = storage_input.list_files(upload_path, pattern="**/*.parquet")
    print(f"Found {len(files_to_upload)} files to upload.")

    def upload_file(file_info):
        """Upload a single file and delete it locally"""
        fpath = file_info['path']
        try:
            storage_output.write_file(
                local_path=storage_input.get_full_path(fpath),
                remote_path=fpath
            )
            storage_input.delete(fpath)
            return {'success': True, 'path': fpath}
        except Exception as e:
            return {'success': False, 'path': fpath, 'error': str(e)}

    # Parallelize uploads with thread pool
    max_workers = 10  # Adjust based on your bandwidth and system
    failed_uploads = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(upload_file, f): f for f in files_to_upload}
        
        for future in tqdm(as_completed(futures), total=len(files_to_upload), desc="Uploading files"):
            result = future.result()
            if not result['success']:
                failed_uploads.append(result)
                print(f"Failed to upload {result['path']}: {result['error']}")

    if failed_uploads:
        print(f"\nFailed to upload {len(failed_uploads)} files")
    else:
        print(f"\nSuccessfully uploaded all {len(files_to_upload)} files")

## ETL

In [1]:
from etl.parquet_etl_pipeline import ParquetETLPipeline, ParquetETLConfig

# Configure
config = ParquetETLConfig(
    horizons=[5, 15, 60, 300, 900],
    bar_durations=[60, 300, 900, 3600],
    max_levels=20,
    ofi_levels=10,
    mode='hybrid',
)

pipeline = ParquetETLPipeline(config)


In [None]:
# Process ticker (fully vectorized)
ticker_df = pipeline.process_ticker(
    input_path="data/raw/ready/ccxt/ticker",
    output_path="data/silver/ticker",
)
print(f"Processed {len(ticker_df)} ticker records")

In [None]:
# Process trades (fully vectorized)
trades_df = pipeline.process_trades(
    input_path="data/raw/ready/ccxt/trades",
    output_path="data/silver/trades",
)
print(f"Processed {len(trades_df)} trade records")

In [None]:
# Process orderbook with trades (hybrid)
hf_df, bars_df = pipeline.process_orderbook_with_trades(
    orderbook_path="data/raw/ready/ccxt/orderbook",
    trades_path="data/raw/ready/ccxt/trades",
    output_hf_path="data/silver/orderbook/hf",
    output_bars_path="data/silver/orderbook/bars",
)
print(f"Produced {len(hf_df)} HF feature rows")
print(f"Produced {len(bars_df)} bar aggregates")

In [14]:
import logging
import sys
from pathlib import Path
from datetime import datetime, timedelta
import os

from config import load_config
from storage.factory import (
    create_etl_storage_input,
    create_etl_storage_output,
    get_etl_input_path,
    get_etl_output_path,
    get_processing_path
)
from etl.job import ETLJob

logger = logging.getLogger(__name__)

config = load_config("config/config.yaml")

logging.basicConfig(
        level=getattr(logging, config.log_level),
        format=config.log_format
    )

In [15]:
# Create storage backends
storage_input = create_etl_storage_input(config)
storage_output = create_etl_storage_output(config)

# Get paths from config
input_path = get_etl_input_path(config, "coinbase")
output_path = get_etl_output_path(config, "coinbase")
processing_path = get_processing_path(config, "coinbase")

logger.info(f"Storage Input:  {storage_input.backend_type} @ {storage_input.base_path}")
logger.info(f"Storage Output: {storage_output.backend_type} @ {storage_output.base_path}")
logger.info(f"Input path: {input_path}")
logger.info(f"Output path: {output_path}")
logger.info(f"Processing path: {processing_path}")

2025-12-04 10:56:44,371 - storage.factory - INFO - [etl_input] Initializing local storage: F:/
2025-12-04 10:56:44,373 - storage.factory - INFO - [etl_output] Initializing S3 storage: market-data-vault
2025-12-04 10:56:44,455 - __main__ - INFO - Storage Input:  local @ F:/
2025-12-04 10:56:44,457 - __main__ - INFO - Storage Output: s3 @ market-data-vault
2025-12-04 10:56:44,457 - __main__ - INFO - Input path: raw/ready/coinbase
2025-12-04 10:56:44,460 - __main__ - INFO - Output path: processed/coinbase
2025-12-04 10:56:44,461 - __main__ - INFO - Processing path: raw/processing/coinbase


In [16]:
channel_config = None
if hasattr(config.etl, 'channels') and config.etl.channels:
    channel_config = {
        channel_name: {
            "partition_cols": channel_cfg.partition_cols,
            "processor_options": channel_cfg.processor_options,
        }
        for channel_name, channel_cfg in config.etl.channels.items()
        if channel_cfg.enabled
    }
channel_config

{'level2': {'partition_cols': ['product_id', 'date'],
  'processor_options': {'reconstruct_lob': False,
   'compute_features': False,
   'add_derived_fields': True}},
 'market_trades': {'partition_cols': ['product_id', 'date'],
  'processor_options': {'add_derived_fields': True, 'infer_aggressor': False}},
 'ticker': {'partition_cols': ['product_id', 'date'],
  'processor_options': {'add_derived_fields': True}}}

In [17]:
job = ETLJob(
    storage_input=storage_input,
    storage_output=storage_output,
    input_path=input_path,
    output_path=output_path,
    delete_after_processing=config.etl.delete_after_processing,
    processing_path=processing_path,
    channel_config=channel_config,
    )

2025-12-04 10:57:14,545 - etl.processors.coinbase.level2_processor - INFO - [CoinbaseLevel2Processor] Initialized: reconstruct_lob=False, compute_features=False
2025-12-04 10:57:14,546 - etl.processors.raw_parser - INFO - [RawParser] Initialized for source=coinbase, channel_filter=level2
2025-12-04 10:57:14,546 - etl.writers.parquet_writer - INFO - [ParquetWriter] Initialized: storage=s3, compression=snappy
2025-12-04 10:57:14,548 - etl.orchestrators.pipeline - INFO - [ETLPipeline] Initialized: reader=NDJSONReader, processors=RawParser → Level2Processor, writer=ParquetWriter
2025-12-04 10:57:14,548 - etl.processors.coinbase.trades_processor - INFO - [CoinbaseTradesProcessor] Initialized: add_derived_fields=True, infer_aggressor=False
2025-12-04 10:57:14,550 - etl.processors.raw_parser - INFO - [RawParser] Initialized for source=coinbase, channel_filter=market_trades
2025-12-04 10:57:14,551 - etl.writers.parquet_writer - INFO - [ParquetWriter] Initialized: storage=s3, compression=snappy

In [18]:
job.process_all()

2025-12-04 10:58:46,494 - etl.job - INFO - [ETLJob] Scanning for segments in raw/ready/coinbase
2025-12-04 10:58:46,496 - etl.job - INFO - [ETLJob] Found 3 segment(s) to process
2025-12-04 10:58:46,509 - etl.job - INFO - [ETLJob] Processing segment: segment_20251204T18_00011.ndjson
2025-12-04 10:58:46,510 - etl.orchestrators.coinbase_segment_pipeline - INFO - [CoinbaseSegmentPipeline] Processing segment: segment_20251204T18_00011.ndjson
2025-12-04 10:58:46,510 - etl.orchestrators.pipeline - INFO - [ETLPipeline] Executing: F:\raw\processing\coinbase\segment_20251204T18_00011.ndjson → processed/coinbase/level2 (partition_cols=['product_id', 'date'])
2025-12-04 10:58:56,576 - etl.readers.ndjson_reader - INFO - [NDJSONReader] Read 107200 records from segment_20251204T18_00011.ndjson (0 errors)
2025-12-04 10:59:00,366 - etl.writers.parquet_writer - INFO - [ParquetWriter] Wrote 10755 records to processed/coinbase/level2/product_id=ADA-USD/date=2025-12-04/part_20251204T10_114a4663.parquet (91

## Test Startup Migration (active/ → ready/)

In [None]:
# Simulate orphan files in active/ directory by creating test files
import shutil
from pathlib import Path
from storage.factory import create_etl_storage_input
from config import load_config

config = load_config()
storage = create_etl_storage_input(config)

# Create test orphan file in active/
test_active_path = Path(storage.base_path) / "raw/active/ccxt/ticker/exchange=test/symbol=TEST-USD/year=2025/month=1/day=19/hour=14"
test_active_path.mkdir(parents=True, exist_ok=True)
test_file = test_active_path / "test_orphan.parquet"
test_file.write_text("fake parquet content")

print(f"Created test orphan file: {test_file}")
print(f"File exists: {test_file.exists()}")

In [None]:
# Now test the StreamingParquetWriter startup migration
from ingestion.writers.parquet_writer import StreamingParquetWriter
import asyncio

# Create writer instance
writer = StreamingParquetWriter(
    storage=storage,
    active_path="raw/active/ccxt/",
    ready_path="raw/ready/ccxt/",
    source_name="test",
    batch_size=100,
    flush_interval_seconds=5,
    queue_maxsize=10000,
    segment_max_mb=100,
    partition_by=["exchange", "symbol"]
)

# Start the writer - this should trigger migration
await writer.start()

# Check if file was migrated
test_ready_path = Path(storage.base_path) / "raw/ready/ccxt/ticker/exchange=test/symbol=TEST-USD/year=2025/month=1/day=19/hour=14"
migrated_file = test_ready_path / "test_orphan.parquet"

print(f"\n--- Migration Test Results ---")
print(f"Original file exists in active/: {test_file.exists()}")
print(f"Migrated file exists in ready/: {migrated_file.exists()}")

if migrated_file.exists():
    print("✅ SUCCESS: Startup migration working!")
else:
    print("❌ FAILED: File was not migrated")

# Cleanup
await writer.stop()
if migrated_file.exists():
    migrated_file.unlink()
    # Clean up empty dirs
    for parent in migrated_file.parents:
        if parent.exists() and not any(parent.iterdir()):
            parent.rmdir()

## Fix Duplicated S3 Paths from NDJSON Migration

In [3]:
from storage.factory import create_sync_destination_storage
from config import load_config
from tqdm import tqdm
import logging

logger = logging.getLogger(__name__)
config = load_config("config/config.yaml")

# Get S3 storage
s3_storage = create_sync_destination_storage(config)

print(f"Storage backend: {s3_storage.backend_type}")
print(f"Base path: {s3_storage.base_path}")

# Define the channels with duplicated paths
CHANNELS = ["ticker", "trades", "orderbook"]
BASE_PATH = "raw/ready/ccxt"

# First, scan for files in duplicated paths
all_moves = []
for channel in CHANNELS:
    duplicated_path = f"{BASE_PATH}/{channel}/{BASE_PATH}/{channel}/"
    correct_path_prefix = f"{BASE_PATH}/{channel}/"
    
    print(f"\n{'='*80}")
    print(f"Scanning: {duplicated_path}")
    print(f"{'='*80}")
    
    try:
        # List all files recursively in the duplicated path
        files = s3_storage.list_files(duplicated_path, pattern="**/*", recursive=True)
        print(f"Found {len(files)} files in duplicated path")
        
        for file_info in files:
            old_path = file_info['path']
            
            # Extract the part after the duplicated prefix
            # e.g., "raw/ready/ccxt/orderbook/raw/ready/ccxt/orderbook/exchange=binanceus/..."
            # should become "raw/ready/ccxt/orderbook/exchange=binanceus/..."
            if duplicated_path in old_path:
                # Remove the duplicated prefix
                relative_path = old_path.replace(duplicated_path, "", 1)
                new_path = correct_path_prefix + relative_path
                
                all_moves.append({
                    'old_path': old_path,
                    'new_path': new_path,
                    'channel': channel,
                    'size': file_info.get('size', 0)
                })
        
    except Exception as e:
        print(f"Error scanning {channel}: {e}")

print(f"\n{'='*80}")
print(f"SUMMARY")
print(f"{'='*80}")
print(f"Total files to move: {len(all_moves)}")
total_size_mb = sum(m['size'] for m in all_moves) / (1024 * 1024)
print(f"Total size: {total_size_mb:.2f} MB")

# Group by channel
for channel in CHANNELS:
    channel_files = [m for m in all_moves if m['channel'] == channel]
    if channel_files:
        channel_size_mb = sum(m['size'] for m in channel_files) / (1024 * 1024)
        print(f"  {channel}: {len(channel_files)} files ({channel_size_mb:.2f} MB)")

2025-12-29 12:46:13,044 - storage.factory - INFO - [sync_destination] Initializing S3 storage: market-data-vault


Storage backend: s3
Base path: market-data-vault

Scanning: raw/ready/ccxt/ticker/raw/ready/ccxt/ticker/
Found 23414 files in duplicated path

Scanning: raw/ready/ccxt/trades/raw/ready/ccxt/trades/
Found 11888 files in duplicated path

Scanning: raw/ready/ccxt/orderbook/raw/ready/ccxt/orderbook/
Found 128273 files in duplicated path

SUMMARY
Total files to move: 163575
Total size: 8625.25 MB
  ticker: 23414 files (420.73 MB)
  trades: 11888 files (379.95 MB)
  orderbook: 128273 files (7824.57 MB)


In [4]:
# DRY RUN: Verify moves without executing
print(f"\n{'='*80}")
print("DRY RUN - Verifying move operations")
print(f"{'='*80}")

dry_run_errors = []
for move in tqdm(all_moves[:5], desc="Dry run check"):  # Check first 5
    old_path = move['old_path']
    new_path = move['new_path']
    
    # Check if source exists
    if not s3_storage.exists(old_path):
        dry_run_errors.append(f"Source doesn't exist: {old_path}")
    
    # Check if destination already exists
    if s3_storage.exists(new_path):
        dry_run_errors.append(f"Destination already exists: {new_path}")

if dry_run_errors:
    print(f"\n⚠️ Found {len(dry_run_errors)} potential issues:")
    for error in dry_run_errors[:10]:  # Show first 10
        print(f"  - {error}")
else:
    print("✅ Dry run passed - no issues detected")


DRY RUN - Verifying move operations


Dry run check: 100%|██████████| 5/5 [00:02<00:00,  2.16it/s]

✅ Dry run passed - no issues detected





In [5]:
# EXECUTE: Move files from duplicated paths to correct paths
from concurrent.futures import ThreadPoolExecutor, as_completed

print(f"\n{'='*80}")
print("EXECUTING FILE MOVES")
print(f"{'='*80}")
print(f"Moving {len(all_moves)} files...")
print("This may take a while depending on file count and size.")

successful_moves = []
failed_moves = []

def move_s3_file(move_info):
    """Move a single file using S3 copy + delete."""
    old_path = move_info['old_path']
    new_path = move_info['new_path']
    
    try:
        # Get S3 keys (remove any leading slashes)
        source_key = s3_storage._get_s3_key(old_path)
        dest_key = s3_storage._get_s3_key(new_path)
        
        # Use S3 server-side copy (no data transfer)
        copy_source = {
            'Bucket': s3_storage.bucket,
            'Key': source_key
        }
        
        s3_storage.s3_client.copy_object(
            CopySource=copy_source,
            Bucket=s3_storage.bucket,
            Key=dest_key
        )
        
        # Delete original after successful copy
        s3_storage.delete(old_path)
        
        return {'success': True, 'move': move_info}
        
    except Exception as e:
        return {'success': False, 'move': move_info, 'error': str(e)}

# Use ThreadPoolExecutor for parallel moves
max_workers = 50  # Can handle more since it's server-side copy
print(f"Using {max_workers} parallel workers for S3 server-side copy...")

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = {executor.submit(move_s3_file, move): move for move in all_moves}
    
    for future in tqdm(as_completed(futures), total=len(all_moves), desc="Moving files"):
        result = future.result()
        if result['success']:
            successful_moves.append(result['move'])
        else:
            failed_moves.append(result)

print(f"\n{'='*80}")
print("RESULTS")
print(f"{'='*80}")
print(f"✅ Successfully moved: {len(successful_moves)} files")
print(f"❌ Failed: {len(failed_moves)} files")

if failed_moves:
    print(f"\nFailed moves:")
    for failure in failed_moves[:10]:  # Show first 10
        print(f"  - {failure['move']['old_path']}")
        print(f"    Error: {failure['error']}")


EXECUTING FILE MOVES
Moving 163575 files...
This may take a while depending on file count and size.
Using 50 parallel workers for S3 server-side copy...


Moving files: 100%|██████████| 163575/163575 [16:25<00:00, 165.95it/s]


RESULTS
✅ Successfully moved: 163575 files
❌ Failed: 0 files





In [6]:
# CLEANUP: Remove empty duplicated path directories
print(f"\n{'='*80}")
print("CLEANUP - Removing empty duplicated directories")
print(f"{'='*80}")

for channel in CHANNELS:
    duplicated_path = f"{BASE_PATH}/{channel}/{BASE_PATH}/{channel}/"
    
    try:
        # Check if path still has files
        remaining_files = s3_storage.list_files(duplicated_path, pattern="**/*", recursive=True)
        
        if not remaining_files:
            print(f"✅ {channel}: No files remain in duplicated path")
            # Note: S3 doesn't have directories, so no explicit cleanup needed
        else:
            print(f"⚠️ {channel}: Still has {len(remaining_files)} files in duplicated path")
            
    except Exception as e:
        print(f"Error checking {channel}: {e}")

print(f"\n{'='*80}")
print("MIGRATION COMPLETE")
print(f"{'='*80}")


CLEANUP - Removing empty duplicated directories
✅ ticker: No files remain in duplicated path
✅ trades: No files remain in duplicated path
✅ orderbook: No files remain in duplicated path

MIGRATION COMPLETE
