# Colab: Binance Futures Direction Model — **Streaming & Chunked** (LightGBM)

**Purpose:** Handle **very large** Binance parquet files **without OOM**.

### Key tactics
- Use **pyarrow** `ParquetFile.iter_batches()` to read **in chunks**.
- For tick/snapshot datasets (`trades`, `aggTrades`, `bookDepth`), we:
  - Stream batches → parse time → compute a 15‑min bin (`.floor('15min')`) per row.
  - Aggregate **within the chunk** by bin.
  - **Accumulate** into a per‑file 15m accumulator (one row per 15‑min bin — at most a few tens of thousands of rows).
  - Save a tiny **per‑file cache parquet** to Drive.
- Only after caching all heavy files do we merge the small 15m frames and train.

The notebook also auto‑detects the timestamp column of each source and, for klines without `open_time`, reconstructs it from `close_time`.


## 0) Setup & Mount

In [None]:
%pip -q install lightgbm==4.3.0 pyarrow==16.1.0 fastparquet==2024.5.0
import os, sys, json, gc, pickle
import numpy as np
import pandas as pd
from pathlib import Path
from google.colab import drive
# Mount Google Drive: every data, cache and artifact path used below lives under /content/drive.
drive.mount('/content/drive')
print('Drive mounted.')

## 1) Paths & Manifest

In [None]:
# Raw Binance parquet files (scanned recursively), per-file 15m caches, and trained model artifacts.
DATA_ROOT = Path('/content/drive/MyDrive/binance_data')
CACHE_DIR = Path('/content/drive/MyDrive/binance_cache')
ARTIFACT_DIR = Path('/content/drive/MyDrive/binance_models')
# Explicit exception instead of `assert` (asserts are stripped under `python -O`);
# checked first so nothing is created on Drive when the data folder is missing.
if not DATA_ROOT.exists():
    raise FileNotFoundError(f'Data root not found: {DATA_ROOT}')
CACHE_DIR.mkdir(parents=True, exist_ok=True)
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)

def build_manifest(root: Path) -> dict:
    """Group every ``*.parquet`` file below *root* by Binance dataset kind.

    Only the lower-cased file name (not the folders) is inspected. Rules are
    tried in order and the first match wins, which is why the mark / index /
    premium kline rules precede the generic kline rule and ``aggtrade``
    precedes ``trade``. Files matching no rule go to ``'other'``.

    Returns:
        dict with all nine kinds as keys (fixed order), each mapping to a list
        of file paths as strings in ``rglob`` order.
    """
    rules = (
        ('markpriceklines',    lambda fn: 'markprice' in fn and 'kline' in fn),
        ('indexpriceklines',   lambda fn: 'indexprice' in fn and 'kline' in fn),
        ('premiumindexklines', lambda fn: 'premiumindex' in fn and 'kline' in fn),
        ('klines',             lambda fn: 'kline' in fn),
        ('aggtrades',          lambda fn: 'aggtrade' in fn),
        ('bookdepth',          lambda fn: 'bookdepth' in fn or 'orderbook' in fn),
        ('metrics',            lambda fn: 'metrics' in fn or 'open_interest' in fn),
        ('trades',             lambda fn: 'trade' in fn),
    )
    kinds = ('klines', 'markpriceklines', 'indexpriceklines', 'premiumindexklines',
             'metrics', 'trades', 'aggtrades', 'bookdepth', 'other')
    grouped = {kind: [] for kind in kinds}
    for parquet_path in root.rglob('*.parquet'):
        lowered = parquet_path.name.lower()
        bucket = next((kind for kind, matches in rules if matches(lowered)), 'other')
        grouped[bucket].append(str(parquet_path))
    return grouped

# Scan DATA_ROOT once; `manifest` maps dataset kind -> list of parquet paths
# and is consumed by every loading step below. Print a per-kind file count.
manifest = build_manifest(DATA_ROOT)
for k,v in manifest.items():
    print(f"{k}: {len(v)} files")

## 2) Streaming helpers & robust time handling

In [None]:
import pyarrow as pa
import pyarrow.parquet as pq

BAR_FREQ = '15min'

def _cache_path(kind: str, src_path: str) -> Path:
    fn = Path(src_path).name.replace('.parquet', f'.{kind}.15m.parquet')
    return CACHE_DIR / fn

def detect_time_col(df, candidates):
    for c in candidates:
        if c in df.columns: return c
    return None

def to_utc_index(df, preferred_time_cols=('open_time','time','timestamp','create_time','transact_time','close_time'), reconstruct_from_close=False):
    if df.empty: return df
    tcol = detect_time_col(df, preferred_time_cols)
    if reconstruct_from_close and (tcol is None or tcol == 'close_time'):
        if 'close_time' in df.columns:
            df = df.copy(); close_ts = pd.to_datetime(df['close_time'], utc=True, errors='coerce')
            df['open_time'] = close_ts - pd.Timedelta(BAR_FREQ); tcol = 'open_time'
        else:
            raise ValueError('Need close_time to reconstruct open_time.')
    if tcol is None:
        raise AssertionError(f'Expected one of {preferred_time_cols}, got {list(df.columns)[:10]} ...')
    df[tcol] = pd.to_datetime(df[tcol], utc=True, errors='coerce')
    df = df.dropna(subset=[tcol]).sort_values(tcol).drop_duplicates(tcol)
    return df.set_index(tcol).tz_localize('UTC') if df.index.tz is None else df.set_index(tcol).tz_convert('UTC')

def stream_parquet_batches(path, columns=None, batch_size=250_000):
    pf = pq.ParquetFile(path)
    for batch in pf.iter_batches(batch_size=batch_size, columns=columns):
        # Convert to pandas with minimal dtype inflation
        yield batch.to_pandas()

def _ensure_float32(df, cols):
    for c in cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors='coerce').astype('float32')
    return df

def _to_bool_series(s):
    if s.dtype == 'bool': return s
    mapping_true  = {True, 'true', 'True', 1, '1'}
    mapping_false = {False, 'false', 'False', 0, '0'}
    return s.map(lambda x: True if x in mapping_true else (False if x in mapping_false else False)).fillna(False).astype(bool)

# ---------- Chunked aggregators per file ----------
def aggregate_trades_file_streaming(path):
    acc = None
    for chunk in stream_parquet_batches(path, columns=['price','qty','quote_qty','time','is_buyer_maker']):
        if chunk.empty: continue
        tcol = detect_time_col(chunk, ('time','timestamp')) or 'time'
        chunk[tcol] = pd.to_datetime(chunk[tcol], utc=True, errors='coerce')
        chunk = chunk.dropna(subset=[tcol]).set_index(tcol)
        chunk = _ensure_float32(chunk, ['price','qty','quote_qty'])
        if 'is_buyer_maker' in chunk.columns:
            chunk['is_buyer_maker'] = _to_bool_series(chunk['is_buyer_maker'])
        else:
            chunk['is_buyer_maker'] = False
        chunk['bin'] = chunk.index.floor(BAR_FREQ)
        chunk['px_qty'] = (chunk['price'] * chunk['qty']).fillna(0.0).astype('float32')
        # vectorized groupby per chunk
        grp = chunk.groupby('bin', sort=False)
        qty_sum    = grp['qty'].sum(min_count=1)
        dollar_sum = grp['px_qty'].sum(min_count=1)
        trades_cnt = grp.size().astype('float32')
        vwap       = dollar_sum / (qty_sum.replace(0, np.nan))
        # OFI without apply
        buy_qty  = chunk.loc[~chunk['is_buyer_maker'], 'qty'].groupby(chunk['bin']).sum(min_count=1)
        sell_qty = chunk.loc[ chunk['is_buyer_maker'], 'qty'].groupby(chunk['bin']).sum(min_count=1)
        ofi      = (buy_qty - sell_qty) / (qty_sum + 1e-9)
        out = pd.DataFrame({
            'trades_count': trades_cnt.astype('float32'),
            'qty_sum':      qty_sum.astype('float32'),
            'dollar_sum':   dollar_sum.astype('float32'),
            'vwap':         vwap.astype('float32'),
            'buy_qty':      buy_qty.astype('float32'),
            'sell_qty':     sell_qty.astype('float32'),
            'ofi':          ofi.astype('float32'),
        })
        acc = out if acc is None else acc.add(out, fill_value=0)
        del chunk, grp, out; gc.collect()
    return acc if acc is not None else pd.DataFrame()

def aggregate_aggtrades_file_streaming(path):
    acc = None
    for chunk in stream_parquet_batches(path, columns=['price','quantity','transact_time']):
        if chunk.empty: continue
        tcol = detect_time_col(chunk, ('transact_time','time','timestamp')) or 'transact_time'
        chunk[tcol] = pd.to_datetime(chunk[tcol], utc=True, errors='coerce')
        chunk = chunk.dropna(subset=[tcol]).set_index(tcol)
        chunk.rename(columns={'quantity':'qty'}, inplace=True)
        chunk = _ensure_float32(chunk, ['price','qty'])
        chunk['bin'] = chunk.index.floor(BAR_FREQ)
        chunk['px_qty'] = (chunk['price'] * chunk['qty']).fillna(0.0).astype('float32')
        grp = chunk.groupby('bin', sort=False)
        qty_sum    = grp['qty'].sum(min_count=1)
        dollar_sum = grp['px_qty'].sum(min_count=1)
        agg_count  = grp.size().astype('float32')
        vwap       = dollar_sum / (qty_sum.replace(0, np.nan))
        out = pd.DataFrame({
            'agg_count':      agg_count,
            'agg_qty_sum':    qty_sum.astype('float32'),
            'agg_dollar_sum': dollar_sum.astype('float32'),
            'agg_vwap':       vwap.astype('float32'),
        })
        acc = out if acc is None else acc.add(out, fill_value=0)
        del chunk, grp, out; gc.collect()
    return acc if acc is not None else pd.DataFrame()

def aggregate_bookdepth_file_streaming(path):
    acc = None
    for chunk in stream_parquet_batches(path, columns=['timestamp','percentage','depth','notional']):
        if chunk.empty: continue
        tcol = detect_time_col(chunk, ('timestamp','time')) or 'timestamp'
        chunk[tcol] = pd.to_datetime(chunk[tcol], utc=True, errors='coerce')
        chunk = chunk.dropna(subset=[tcol]).set_index(tcol)
        chunk = _ensure_float32(chunk, ['percentage','depth','notional'])
        chunk['bin'] = chunk.index.floor(BAR_FREQ)
        grp = chunk.groupby('bin', sort=False)
        out = pd.DataFrame({
            'bd_notional_sum': grp['notional'].sum(min_count=1).astype('float32'),
            'bd_depth_sum':    grp['depth'].sum(min_count=1).astype('float32'),
        })
        acc = out if acc is None else acc.add(out, fill_value=0)
        del chunk, grp, out; gc.collect()
    return acc if acc is not None else pd.DataFrame()

def cache_or_build(kind, path, builder_func):
    """Return the 15m aggregate of *path*, from its cache file or by building and caching it.

    Args:
        kind: dataset kind, used to name the cache file.
        path: source parquet path.
        builder_func: callable(path) -> DataFrame indexed by bin.

    Returns:
        The aggregate with its index named 'time' and sorted; an empty
        DataFrame when the builder produced nothing (nothing is cached then).
    """
    cpath = _cache_path(kind, path)
    if cpath.exists():
        try:
            return pd.read_parquet(cpath)
        except (OSError, ValueError) as err:
            # An unreadable cache (e.g. truncated by an interrupted write) is rebuilt.
            print('Rebuilding unreadable cache', cpath, err)
    df = builder_func(path)
    if df is None or df.empty:
        return pd.DataFrame()
    df.index.name = 'time'
    df.sort_index(inplace=True)
    # Write under a temporary name and rename into place, so a runtime disconnect
    # mid-write never leaves a partial file under the final cache name.
    tmp_path = cpath.with_name(cpath.name + '.tmp')
    df.to_parquet(tmp_path)
    tmp_path.replace(cpath)
    return df


## 3) Load sources (stream + cache heavy files)

In [None]:
# Light sources
def load_concat(files, columns=None):
    """Read and row-concatenate parquet *files*, skipping (and reporting) unreadable ones.

    When *columns* is given and a file lacks some of them, the file is still
    loaded with the requested columns it does have. For example, klines
    without ``open_time`` keep ``close_time``, so
    ``to_utc_index(..., reconstruct_from_close=True)`` can rebuild the open
    time. A file with none of the requested columns is skipped.

    Returns:
        The concatenation with a fresh RangeIndex, or an empty DataFrame when
        nothing could be loaded.
    """
    frames = []
    for f in files:
        try:
            frames.append(_read_parquet_subset(f, columns))
        except Exception as e:  # best effort: one bad file must not abort the whole load
            print('Failed to load', f, e)
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


def _read_parquet_subset(path, columns):
    """Read *path*, restricted to those of *columns* (if any) that the file actually has."""
    if not columns:
        return pd.read_parquet(path)
    try:
        return pd.read_parquet(path, columns=list(columns))
    except (KeyError, ValueError):
        # A requested column is missing (pyarrow raises ArrowInvalid, a ValueError):
        # fall back to the full file and keep only the requested columns present.
        full = pd.read_parquet(path)
        present = [c for c in columns if c in full.columns]
        if not present:
            raise ValueError(f'none of {list(columns)} present in {path}')
        return full[present]

# KLINES (base)
# Load all kline files, index them by UTC bar time (rebuilding open_time from
# close_time when needed) and keep OHLCV as float32. kl_df is the 15m spine that
# every other source is left-joined onto in section 4.
kl_raw = load_concat(manifest['klines'], columns=['open_time','close_time','open','high','low','close','volume'])
print('klines raw:', kl_raw.shape)
kl_df = to_utc_index(kl_raw, preferred_time_cols=('open_time','time','timestamp','close_time'), reconstruct_from_close=True)
keep = [c for c in ['open','high','low','close','volume'] if c in kl_df.columns]
kl_df = kl_df[keep].astype('float32')
print('klines aligned:', kl_df.shape)

# MARK / INDEX / PREMIUM
# Mark / index / premium-index klines are parsed with to_utc_index's defaults
# (no close_time reconstruction), so each file needs one of its default time
# columns; load_concat reports and skips files it cannot read.
mk_df = to_utc_index(load_concat(manifest['markpriceklines'], columns=['open_time','close_time','open','high','low','close','volume']))
ix_df = to_utc_index(load_concat(manifest['indexpriceklines'], columns=['open_time','close_time','open','high','low','close','volume']))
pr_df = to_utc_index(load_concat(manifest['premiumindexklines'], columns=['open_time','close_time','open','high','low','close','volume']))
def sel_bars(df):
    """Return the OHLCV columns present in *df* as float32; an empty *df* is returned unchanged."""
    return df[[c for c in ['open','high','low','close','volume'] if c in df.columns]].astype('float32') if not df.empty else df
# Only the 'close' column of these frames is joined into the base table (section 4).
mark_k, index_k, prem_k = sel_bars(mk_df), sel_bars(ix_df), sel_bars(pr_df)

# METRICS (ffill to 15min)
# Metrics rows are indexed by create_time and put on the BAR_FREQ grid with
# resample(...).ffill() (values carried forward, not summed). mt_15 is an empty
# frame when no metrics could be loaded.
mt_raw = load_concat(manifest['metrics'], columns=['create_time','sum_open_interest','sum_open_interest_value','sum_toptrader_long_short_ratio','sum_taker_long_short_vol_ratio'])
mt_df  = to_utc_index(mt_raw, preferred_time_cols=('create_time','time','timestamp'))
mt_15  = mt_df[['sum_open_interest','sum_open_interest_value','sum_toptrader_long_short_ratio','sum_taker_long_short_vol_ratio']].astype('float32').resample(BAR_FREQ).ffill() if not mt_df.empty else pd.DataFrame()

# HEAVY SOURCES — STREAM & CACHE PER FILE
# These lists are streamed file by file (never concatenated raw) by process_list below.
tr_list = manifest['trades']
ag_list = manifest['aggtrades']
bd_list = manifest['bookdepth']

def process_list(kind, files, builder):
    outs = []
    for p in files:
        out = cache_or_build(kind, p, builder)
        if not out.empty:
            outs.append(out)
    if not outs:
        return pd.DataFrame()
    df_all = pd.concat(outs).sort_index()
    return df_all.groupby(df_all.index).sum()

# Build (or reuse cached) per-file 15m aggregates for each heavy source and merge them.
tr_15 = process_list('trades', tr_list, aggregate_trades_file_streaming)
ag_15 = process_list('aggtrades', ag_list, aggregate_aggtrades_file_streaming)
bd_15 = process_list('bookdepth', bd_list, aggregate_bookdepth_file_streaming)

# Sanity check: every source's shape before the merge (empty sources show as (0, 0) or ()).
print('Shapes — kl:', kl_df.shape,
      'mark:', getattr(mark_k,'shape',()), 'index:', getattr(index_k,'shape',()), 'prem:', getattr(prem_k,'shape',()))
print('Shapes — metrics15:', mt_15.shape, 'trades15:', tr_15.shape, 'aggtrades15:', ag_15.shape, 'bookdepth15:', bd_15.shape)

## 4) Merge unified 15min base

In [None]:
base = kl_df.copy()

def safe_join(left, right):
    """Left-join *right* onto *left* by index; return *left* unchanged when *right* is empty."""
    return left if right.empty else left.join(right, how='left')

# Only the close of the mark / index / premium-index klines is used, renamed so it
# does not clash with the trade klines' own 'close'.
if not mark_k.empty:  base = safe_join(base, mark_k[['close']].rename(columns={'close':'mark_close'}))
if not index_k.empty: base = safe_join(base, index_k[['close']].rename(columns={'close':'index_close'}))
if not prem_k.empty:  base = safe_join(base, prem_k[['close']].rename(columns={'close':'premium_close'}))
base = safe_join(base, mt_15)
base = safe_join(base, tr_15)
base = safe_join(base, ag_15)
base = safe_join(base, bd_15)

# Gaps are filled from the past only, then leading gaps with 0. `.ffill()` replaces
# the deprecated `fillna(method='ffill')` (pandas >= 2.1).
# NOTE(review): this also carries trade counts/volumes forward into bars without
# trade data; 0 may be the better fill for additive columns.
base = (base.sort_index()
            .replace([np.inf, -np.inf], np.nan)
            .ffill()
            .fillna(0))
print('Unified base shape:', base.shape)
base.tail()

## 5) Labels — Triple‑Barrier

In [None]:
# Triple-barrier settings: N_HORIZON = forward window in bars (16 x 15min = 4h),
# K_UP / K_DN = barrier distance in ATR multiples, ATR_LEN = ATR smoothing span.
N_HORIZON = 16; K_UP = 1.5; K_DN = 1.5; ATR_LEN = 14

def compute_atr(df, atr_len=14):
    """Average true range of *df* (needs 'high', 'low', 'close').

    True range per bar is the largest of high-low, |high - previous close| and
    |low - previous close| (the first bar, without a previous close, uses
    high-low). It is smoothed with an EMA of span *atr_len* (``adjust=False``,
    alpha = 2 / (atr_len + 1)), not Wilder's 1/atr_len smoothing.
    """
    prev_close = df['close'].shift()
    true_range = pd.concat(
        [(df['high'] - df['low']).abs(),
         (df['high'] - prev_close).abs(),
         (df['low'] - prev_close).abs()],
        axis=1,
    ).max(axis=1)
    return true_range.ewm(span=atr_len, adjust=False).mean()

def triple_barrier_labels(df, n=16, k_up=1.5, k_dn=1.5, atr_len=14):
    """Label each bar 1 (up), 0 (down) or 2 (neither) with the triple-barrier method.

    For bar i the barriers are close_i * (1 + k_up * ATR_i / close_i) and
    close_i * (1 - k_dn * ATR_i / close_i). Over bars i+1 .. i+n the label is
    1 if a high reaches the upper barrier strictly before any low reaches the
    lower one, 0 in the mirrored case, and 2 if neither is hit or both are
    first hit on the same bar. The last *n* bars lack a full forward window and
    keep the placeholder 2.

    Returns:
        int8 Series named 'label', aligned with ``df.index``.
    """
    atr = compute_atr(df, atr_len=atr_len)
    close = df['close']
    # NaN barrier widths take the next valid value (`.bfill()` replaces the
    # deprecated fillna(method='bfill')).
    up_frac = (k_up * atr / close).bfill().to_numpy()
    dn_frac = (k_dn * atr / close).bfill().to_numpy()
    price, highs, lows = close.to_numpy(), df['high'].to_numpy(), df['low'].to_numpy()
    y = np.full(len(df), 2, dtype=np.int8)
    for i in range(max(0, len(df) - n)):
        upper = price[i] * (1 + up_frac[i])
        lower = price[i] * (1 - dn_frac[i])
        hit_up = np.flatnonzero(highs[i + 1:i + n + 1] >= upper)
        hit_dn = np.flatnonzero(lows[i + 1:i + n + 1] <= lower)
        # Offset n means "not hit inside the window".
        first_up = hit_up[0] if hit_up.size else n
        first_dn = hit_dn[0] if hit_dn.size else n
        if first_up < first_dn:
            y[i] = 1
        elif first_dn < first_up:
            y[i] = 0
    return pd.Series(y, index=df.index, name='label')

# One label per bar of the unified base (1 up / 0 down / 2 neither); show the class balance.
labels = triple_barrier_labels(base, n=N_HORIZON, k_up=K_UP, k_dn=K_DN, atr_len=ATR_LEN)
labels.value_counts()

## 6) Features

In [None]:
def engineer_features(df):
    """Build the float32 feature matrix from the unified 15m base frame *df*.

    Every value at row t is computed from rows <= t of *df*: returns, ATR,
    EMA spread, RSI, plus optional mark/index/premium, open-interest,
    taker-ratio and order-flow features when those columns exist.
    Infinities become NaN; gaps are then filled from earlier rows only and any
    leading gap with 0.
    """
    close = df['close']
    X = pd.DataFrame(index=df.index)
    # fill_method=None: no implicit pad (deprecated default in pandas >= 2.1).
    X['ret_1'] = close.pct_change(fill_method=None)
    for lag in [2, 4, 8, 16, 32, 64]:
        X[f'ret_{lag}'] = close.pct_change(lag, fill_method=None)
    X['atr14p'] = compute_atr(df, atr_len=14) / close
    X['ema_diff'] = close.ewm(span=8).mean() - close.ewm(span=21).mean()
    delta = close.diff()
    gains, losses = delta.clip(lower=0), -delta.clip(upper=0)
    rs = gains.ewm(span=14).mean() / (losses.ewm(span=14).mean() + 1e-9)
    X['rsi14'] = 100 - 100 / (1 + rs)
    if 'mark_close' in df.columns:  X['mark_spread_p']  = (df['mark_close']  - close) / close
    if 'index_close' in df.columns: X['index_spread_p'] = (df['index_close'] - close) / close
    if 'premium_close' in df.columns:
        # The premium index is a small rate that crosses zero, so a percentage change
        # explodes or divides by zero; the absolute change is used instead.
        X['premium_chg'] = df['premium_close'].diff()
    if 'sum_open_interest' in df.columns: X['oi_chg_p'] = df['sum_open_interest'].pct_change(fill_method=None).fillna(0)
    if 'sum_taker_long_short_vol_ratio' in df.columns: X['taker_ls_chg_p'] = df['sum_taker_long_short_vol_ratio'].pct_change(fill_method=None).fillna(0)
    if 'ofi' in df.columns: X['ofi'] = df['ofi']
    if 'trades_count' in df.columns: X['trades_count'] = df['trades_count']
    if 'vwap' in df.columns: X['vwap_close_spread'] = (df['vwap'] - close) / close
    if 'bd_notional_sum' in df.columns: X['bd_notional_sum'] = df['bd_notional_sum']
    # Forward fill only: the former back-fill copied *future* values into earlier
    # rows (e.g. the first 64 rows of ret_64), leaking look-ahead information that
    # is never available at prediction time.
    X = X.replace([np.inf, -np.inf], np.nan).ffill().fillna(0)
    return X.astype('float32')

# Feature matrix aligned row-for-row with `labels` (both indexed like `base`).
X = engineer_features(base)
print('X shape:', X.shape)

## 7) Train LightGBM + Isotonic calibration

In [None]:
from sklearn.preprocessing import LabelEncoder
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import brier_score_loss
import lightgbm as lgb

# The last N_HORIZON bars have no complete forward window, so triple_barrier_labels
# gives them a placeholder 2 rather than an observed outcome: exclude them from
# training and calibration.
n_labeled = max(len(X) - N_HORIZON, 0)
X_lab, y_lab = X.iloc[:n_labeled], labels.iloc[:n_labeled]

n = len(X_lab); cal_size = max(int(n * 0.15), 500); split_idx = n - cal_size
# Purge gap: bar i's label looks N_HORIZON bars ahead, so the last N_HORIZON
# training bars would otherwise be labelled with prices from the calibration window.
train_end = split_idx - N_HORIZON
if train_end <= 0:
    raise ValueError(
        f'Only {n} labeled rows: not enough for a {cal_size}-row calibration window '
        f'plus a {N_HORIZON}-row purge gap.')
X_tr, y_tr = X_lab.iloc[:train_end], y_lab.iloc[:train_end]
X_cal, y_cal = X_lab.iloc[split_idx:], y_lab.iloc[split_idx:]

le = LabelEncoder(); y_tr_enc = le.fit_transform(y_tr.values); y_cal_enc = le.transform(y_cal.values)

# NOTE(review): num_class=3 assumes all three labels occur in y_tr.
clf = lgb.LGBMClassifier(
    objective='multiclass', num_class=3,
    learning_rate=0.03, n_estimators=4000,
    num_leaves=64,
    # LightGBM only bags rows when subsample_freq > 0; without it subsample=0.8 is ignored.
    subsample=0.8, subsample_freq=1,
    colsample_bytree=0.8, random_state=42, n_jobs=-1,
)
clf.fit(
    X_tr, y_tr_enc,
    eval_set=[(X_cal, y_cal_enc)], eval_metric='multi_logloss',
    callbacks=[lgb.early_stopping(stopping_rounds=200, verbose=False)]
)

# NOTE(review): cv='prefit' is deprecated in newer scikit-learn releases; confirm the installed version.
cal = CalibratedClassifierCV(clf, method='isotonic', cv='prefit')
cal.fit(X_cal, y_cal_enc)

probs_cal = cal.predict_proba(X_cal)
classes = list(le.classes_)
up_idx = classes.index(1) if 1 in classes else None
if up_idx is None:
    print('No UP (label 1) rows in training data; Brier score skipped.')
else:
    # UP's encoded value is its position in le.classes_, which is also its probability column.
    # X_cal drove early stopping and fitted the isotonic calibrator, so this score is
    # in-sample for the calibrated model, not a true holdout estimate.
    print('Calibration Brier (UP, in-sample on calibration window):',
          brier_score_loss((y_cal_enc == up_idx).astype(int), probs_cal[:, up_idx]))

## 8) Save artifacts & predict helper

In [None]:
# Persist the calibrated model, the label encoder and the training feature order.
# Pickle files execute code when loaded: only unpickle artifacts you produced yourself.
with open(ARTIFACT_DIR/'lgbm_calibrated.pkl', 'wb') as f: pickle.dump(cal, f)
with open(ARTIFACT_DIR/'label_encoder.pkl', 'wb') as f: pickle.dump(le, f)
# Feature columns must be supplied in this exact order at prediction time.
with open(ARTIFACT_DIR/'feature_columns.json', 'w') as f: f.write(json.dumps(list(X.columns)))
print('Saved artifacts to', ARTIFACT_DIR)

def predict_live(latest_row: pd.Series, calibrated_model, label_encoder, feature_columns):
    """Return calibrated class probabilities for a single feature row.

    Args:
        latest_row: one row of engineered features, e.g. ``X.iloc[-1]``.
        calibrated_model: fitted model exposing ``predict_proba``; its columns
            are assumed to follow ``label_encoder.classes_`` order (true when it
            was fitted on ``label_encoder``-encoded targets).
        label_encoder: the LabelEncoder used at training time.
        feature_columns: feature names in training order.

    Returns:
        dict with 'P_down', 'P_up' and 'P_neutral' (labels 0, 1, 2); NaN for a
        label the encoder never saw.
    """
    cols = list(feature_columns)
    # A named one-row float32 DataFrame (not a bare, possibly object-dtype array) lets
    # the model check feature names against training and avoids the sklearn warning.
    row = pd.DataFrame([latest_row[cols].to_numpy(dtype='float32')], columns=cols)
    proba = calibrated_model.predict_proba(row)[0]
    classes = list(label_encoder.classes_)
    return {
        name: float(proba[classes.index(label)]) if label in classes else np.nan
        for name, label in (('P_down', 0), ('P_up', 1), ('P_neutral', 2))
    }

# Smoke test on the newest bar in X (its triple-barrier outcome is not known yet).
print('Latest probabilities:', predict_live(X.iloc[-1], cal, le, X.columns))