# Binance Coin-Margined Futures Data Exploration

This notebook demonstrates how to load gzip-compressed Binance Coin-Margined (CM) futures market data, captured by the Rust collector, into pandas for exploratory analysis. Each `.gz` file contains newline-delimited records. Each record is a nanosecond receive timestamp followed by the JSON payload emitted by the Binance websocket stream. We stream these files line by line, without decompressing them to disk, and build a few example visualizations as a starting point for analysis.
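The minimal sketch below makes the record format concrete. It writes two synthetic records in the `<recv_timestamp_ns> <json_payload>` layout to a temporary `.gz` file and streams them back with `gzip.open(..., 'rt')`. The payload fields mimic a Binance trade event, but the file name, symbol and all values are invented for illustration.

```python
import gzip
import json
import tempfile
from pathlib import Path

# Hypothetical records: (receive time in ns, websocket payload). All values are made up.
records = [
    (1_700_000_000_123_456_789, {'e': 'trade', 'E': 1_700_000_000_120, 's': 'BTCUSD_PERP', 'p': '37000.1', 'q': '5'}),
    (1_700_000_000_223_456_789, {'e': 'trade', 'E': 1_700_000_000_220, 's': 'BTCUSD_PERP', 'p': '37000.3', 'q': '2'}),
]

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / 'btcusd_perp_20231114.gz'

    # Write one '<recv_timestamp_ns> <json_payload>' line per event as gzip-compressed text.
    with gzip.open(path, 'wt', encoding='utf-8') as fh:
        for recv_ns, payload in records:
            fh.write(f'{recv_ns} {json.dumps(payload)}\n')

    # Stream the archive back line by line without decompressing it to disk.
    parsed = []
    with gzip.open(path, 'rt', encoding='utf-8') as fh:
        for line in fh:
            ts_str, payload_json = line.rstrip('\n').split(' ', 1)
            parsed.append((int(ts_str), json.loads(payload_json)))

print(len(parsed), parsed[0][1]['p'])  # → 2 37000.1
```

Note that the price is still the string `'37000.1'` at this point. The helpers in section 3 convert such fields to numbers.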

## 1. Environment setup

We import the core Python libraries used for data handling and visualization.
- **pandas**: tabular data handling
- **numpy**: numerical helpers
- **matplotlib**/**seaborn**: charting utilities
- **pathlib**: convenient filesystem navigation
- **gzip**/**json** (standard library): reading the compressed, JSON-encoded collector output

In [None]:
from pathlib import Path
import gzip
import json

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Configure plotting aesthetics
sns.set_theme(style='darkgrid', context='talk')
plt.rcParams['figure.figsize'] = (14, 6)

## 2. Locate available market data files

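The collector organises files under `/data/binance_cm/<symbol>/<symbol>_YYYYMMDD.gz`. The cell below checks that the data directory exists and collects every `.gz` archive beneath it. It then groups the archives by symbol, using the upper-cased name of each file's parent directory, and lists the first three file names per symbol.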
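The grouping relies only on each archive's parent directory name. The sketch below rebuilds it on a throwaway directory tree. The symbol directories (`btcusd_perp`, `ETHUSD_PERP`) are hypothetical and deliberately use mixed case. They show why remembering the on-disk directory name is safer than assuming every directory is lower-case.

```python
import tempfile
from collections import defaultdict
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    # Hypothetical layout <root>/<symbol>/<symbol>_YYYYMMDD.gz with mixed-case directory names.
    for sym_dir, day in [('btcusd_perp', '20240101'), ('btcusd_perp', '20240102'), ('ETHUSD_PERP', '20240101')]:
        (root / sym_dir).mkdir(exist_ok=True)
        (root / sym_dir / f'{sym_dir.lower()}_{day}.gz').touch()  # empty placeholder files

    # Group archives by upper-cased directory name, remembering the directory as it exists on disk.
    by_symbol: dict[str, list[Path]] = defaultdict(list)
    dir_for_symbol: dict[str, str] = {}
    for path in sorted(root.rglob('*.gz')):
        symbol = path.parent.name.upper()
        by_symbol[symbol].append(path)
        dir_for_symbol[symbol] = path.parent.name

    counts = {symbol: len(paths) for symbol, paths in by_symbol.items()}

print(sorted(counts.items()))          # → [('BTCUSD_PERP', 2), ('ETHUSD_PERP', 1)]
print(sorted(dir_for_symbol.items()))  # → [('BTCUSD_PERP', 'btcusd_perp'), ('ETHUSD_PERP', 'ETHUSD_PERP')]
```

Keeping a mapping like `dir_for_symbol`, or resolving names case-insensitively, avoids assuming that `DATA_DIR / symbol.lower()` exists on a case-sensitive filesystem.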

In [None]:
from collections import defaultdict

DATA_DIR = Path('/data/binance_cm')

if not DATA_DIR.exists():
    raise FileNotFoundError(
        f'Data directory {DATA_DIR} not found. Ensure the path is mounted inside the environment before running the notebook.'
    )

# Recursively collect every archive, sorted for deterministic ordering.
gz_files = sorted(DATA_DIR.rglob('*.gz'))
print(f'Found {len(gz_files)} gzip file(s) under {DATA_DIR}.')

# Group archives by symbol, taken from the upper-cased parent directory name.
files_by_symbol: dict[str, list[Path]] = defaultdict(list)
for path in gz_files:
    symbol = path.parent.name.upper()
    files_by_symbol[symbol].append(path)

# Preview the first three archives per symbol.
for symbol, paths in files_by_symbol.items():
    print(f'Symbol: {symbol}')
    for path in paths[:3]:
        print(f'  - {path.name}')

## 3. Helpers to parse collector output

Collector files store each websocket event as `<recv_timestamp_ns> <json_payload>`. The helpers below stream gzip archives line by line and unwrap combined-stream envelopes (`{"stream": ..., "data": ...}`). They optionally filter by event type (e.g. `trade`, `depthUpdate`) and assemble a clean, time-indexed `DataFrame`. Binance encodes numeric fields such as trade price and quantity as strings, so these are converted to numbers for convenience.
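The same transformation can be traced on a few in-memory lines. One line is a bare event and one is wrapped in the `{"stream": ..., "data": ...}` envelope that Binance uses for combined streams. The third line is malformed. The stream name and all values are invented. The sketch skips the bad line, unwraps the envelope, converts the string-encoded `p`/`q` fields with `pd.to_numeric`, and indexes rows by UTC receive time.

```python
import json

import pandas as pd

# Hypothetical collector lines: one bare event, one combined-stream envelope, one malformed line.
lines = [
    '1700000000123456789 {"e": "trade", "E": 1700000000120, "p": "37000.1", "q": "5"}',
    '1700000000223456789 {"stream": "btcusd_perp@trade", "data": {"e": "trade", "E": 1700000000220, "p": "37000.3", "q": "2"}}',
    'not-a-timestamp {}',
]

rows = []
for line in lines:
    ts_str, payload_json = line.split(maxsplit=1)
    if not ts_str.isdigit():
        continue  # skip lines whose first token is not a nanosecond timestamp
    payload = json.loads(payload_json)
    event = payload.get('data', payload)  # unwrap combined-stream envelopes
    rows.append({**event, 'recv_timestamp_ns': int(ts_str)})

df = pd.DataFrame(rows)
df.index = pd.DatetimeIndex(
    pd.to_datetime(df.pop('recv_timestamp_ns'), unit='ns', utc=True), name='timestamp'
)
df['price'] = pd.to_numeric(df['p'])     # '37000.1' -> 37000.1
df['quantity'] = pd.to_numeric(df['q'])  # '5' -> 5

print(df[['price', 'quantity']])
```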

In [None]:
from collections import Counter


def resolve_symbol_dir(symbol: str) -> Path:
    """Return the directory holding a symbol's archives, matching its name case-insensitively.

    Section 2 upper-cases symbols, but the collector may write directories in any case.
    """

    for candidate in (symbol, symbol.lower(), symbol.upper()):
        path = DATA_DIR / candidate
        if path.is_dir():
            return path
    for path in DATA_DIR.iterdir():
        if path.is_dir() and path.name.lower() == symbol.lower():
            return path
    raise FileNotFoundError(f'Could not find directory for symbol {symbol!r} under {DATA_DIR}.')


def parse_collector_line(line: str) -> tuple[int, dict] | None:
    """Parse one collector line into (receive timestamp in ns, event payload dict).

    Returns None for blank or malformed lines so callers can simply skip them.
    """

    line = line.strip()
    if not line:
        return None

    # Expected layout: '<recv_timestamp_ns> <json_payload>'.
    parts = line.split(maxsplit=1)
    if len(parts) != 2:
        return None
    ts_str, payload_json = parts

    try:
        timestamp_ns = int(ts_str)
    except ValueError:
        return None

    try:
        payload = json.loads(payload_json)
    except json.JSONDecodeError:
        return None

    # Combined-stream messages wrap the event as {"stream": ..., "data": {...}};
    # single-stream messages are the event itself.
    if isinstance(payload, dict):
        data = payload.get('data', payload)
        if isinstance(data, dict):
            return timestamp_ns, data

    return None


def count_event_types(symbol: str, limit_messages: int | None = 100_000) -> pd.Series:
    """Return event-type counts for a symbol, stopping after `limit_messages` parsed messages (None = all)."""

    symbol_dir = resolve_symbol_dir(symbol)

    counter: Counter[str] = Counter()
    messages_seen = 0

    for path in sorted(symbol_dir.glob('*.gz')):
        with gzip.open(path, 'rt') as fh:
            for raw_line in fh:
                parsed = parse_collector_line(raw_line)
                if parsed is None:
                    continue
                _, data = parsed
                # Binance tags every event with its type in the 'e' field.
                counter[str(data.get('e', 'unknown'))] += 1
                messages_seen += 1
                if limit_messages is not None and messages_seen >= limit_messages:
                    break
        # Propagate the early exit out of the per-file loop as well.
        if limit_messages is not None and messages_seen >= limit_messages:
            break

    if not counter:
        raise ValueError(f'No messages found for symbol {symbol!r}.')

    return pd.Series(counter).sort_values(ascending=False)


def load_symbol_events(
    symbol: str,
    event_type: str | None = 'trade',
    limit_messages: int | None = None,
) -> pd.DataFrame:
    """Load events for a symbol into a DataFrame indexed by UTC receive time.

    `event_type=None` keeps every event; `limit_messages` caps the number of matching rows.
    """

    symbol_dir = resolve_symbol_dir(symbol)

    files = sorted(symbol_dir.glob('*.gz'))
    if not files:
        raise FileNotFoundError(f'No gzip files found for symbol {symbol!r}.')

    events: list[dict] = []
    messages_loaded = 0

    for path in files:
        with gzip.open(path, 'rt') as fh:
            for raw_line in fh:
                parsed = parse_collector_line(raw_line)
                if parsed is None:
                    continue
                recv_timestamp_ns, data = parsed
                if event_type is not None and data.get('e') != event_type:
                    continue

                record = dict(data)
                record['recv_timestamp_ns'] = recv_timestamp_ns
                events.append(record)

                messages_loaded += 1
                if limit_messages is not None and messages_loaded >= limit_messages:
                    break
        if limit_messages is not None and messages_loaded >= limit_messages:
            break

    if not events:
        raise ValueError(
            f'No events matched symbol={symbol!r} event_type={event_type!r} with the provided limits.'
        )

    df = pd.DataFrame(events)

    # Keep the raw receive timestamp as the first column and index rows by it (UTC).
    recv_timestamp_ns = df.pop('recv_timestamp_ns')
    df.insert(0, 'recv_timestamp_ns', recv_timestamp_ns)
    df.index = pd.DatetimeIndex(
        pd.to_datetime(recv_timestamp_ns, unit='ns', utc=True), name='timestamp'
    )

    # 'E' is Binance's event time in milliseconds.
    if 'E' in df.columns:
        df.insert(1, 'event_time', pd.to_datetime(df['E'], unit='ms', utc=True, errors='coerce'))

    # Convert string-encoded numeric columns; leave symbols, flags and nested lists untouched.
    # (pd.to_numeric(errors='ignore') is deprecated in recent pandas, hence the try/except.)
    for column in df.select_dtypes(include='object').columns:
        try:
            df[column] = pd.to_numeric(df[column])
        except (ValueError, TypeError):
            pass

    if {'p', 'q'}.issubset(df.columns):
        df['price'] = pd.to_numeric(df['p'], errors='coerce')
        df['quantity'] = pd.to_numeric(df['q'], errors='coerce')
        # COIN-M quantities are contract counts with a fixed USD face value per contract,
        # so price * quantity is NOT a USD or coin notional; it is kept only as a raw diagnostic.
        df['price_x_quantity'] = df['price'] * df['quantity']

    return df.sort_index()

## 4. Load a sample symbol

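By default `sample_symbol` is the first symbol found in section 2; set it to any other key of `files_by_symbol` to switch markets. We first inspect the mix of captured event types (trades, depth updates, etc.) and then load trade prints into a tidy DataFrame. If the counts show aggregate-trade events (`aggTrade`) rather than `trade`, pass that event type to `load_symbol_events` instead.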
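Depending on the streams the collector subscribed to, trades may arrive as `trade` events or as aggregate `aggTrade` events. A small helper can choose between them from the sampled event counts instead of hard-coding `'trade'`. `pick_trade_event_type` is hypothetical, not part of the collector or of pandas, and the counts below are invented.

```python
import pandas as pd


def pick_trade_event_type(
    event_counts: pd.Series, preferred: tuple[str, ...] = ('trade', 'aggTrade')
) -> str:
    """Return the first preferred trade event type present in `event_counts` (hypothetical helper)."""
    for event_type in preferred:
        if event_counts.get(event_type, 0) > 0:
            return event_type
    raise ValueError(f'None of {preferred} found; captured types: {list(event_counts.index)}')


# Made-up counts shaped like the output of count_event_types().
counts = pd.Series({'depthUpdate': 180_000, 'aggTrade': 20_000})
print(pick_trade_event_type(counts))  # → aggTrade
```

In the notebook this could feed the loader directly, e.g. `load_symbol_events(sample_symbol, event_type=pick_trade_event_type(event_counts), limit_messages=250_000)`.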

In [None]:
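from IPython.display import display

# Default to the first symbol found on disk; override with any key of `files_by_symbol`.
sample_symbol = next(iter(files_by_symbol)) if files_by_symbol else None
if sample_symbol is None:
    raise RuntimeError('No symbols detected. Ensure the dataset directory contains gzip files.')

print(f'Using sample symbol: {sample_symbol}')

# Sample up to 200k messages to see which event types the collector captured.
event_counts = count_event_types(sample_symbol, limit_messages=200_000)
display(event_counts)  # only a cell's last expression is rendered automatically

# Load up to 250k trade events into a time-indexed DataFrame.
cm_trades = load_symbol_events(sample_symbol, event_type='trade', limit_messages=250_000)
cm_trades.head()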

### Summary statistics

In [None]:
cm_trades.describe().T

## 5. Resample to higher timeframes

High-frequency data can be noisy. We aggregate trades into 1-minute candles (open, high, low, close) plus traded volume in contracts to make plotting tractable. Minutes without any trades are dropped rather than forward-filled.
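pandas also provides `Resampler.ohlc()`, which produces the same four price columns in one call. The sketch below checks the equivalence on a few synthetic trades with invented timestamps and prices. It also shows that a minute with no trades yields a NaN price row with zero volume, which is the row the notebook's `.dropna()` removes.

```python
import pandas as pd

# Synthetic trades spanning three minutes, with no trades during 00:01.
index = pd.to_datetime(
    ['2024-01-01 00:00:05', '2024-01-01 00:00:40', '2024-01-01 00:00:55', '2024-01-01 00:02:10'],
    utc=True,
)
trades = pd.DataFrame({'price': [100.0, 101.5, 99.0, 102.0], 'quantity': [3, 1, 2, 4]}, index=index)

# Approach used in the notebook: dict aggregation, then flatten the column MultiIndex.
agg = trades.resample('1min').agg({'price': ['first', 'max', 'min', 'last'], 'quantity': 'sum'})
agg.columns = ['open', 'high', 'low', 'close', 'volume']

# Equivalent: the dedicated OHLC resampler plus a separate volume sum.
candles = trades['price'].resample('1min').ohlc()
candles['volume'] = trades['quantity'].resample('1min').sum()

print(candles)
```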

In [None]:
resampled = cm_trades[['price', 'quantity']].dropna()
resampled = resampled.resample('1min').agg({
    'price': ['first', 'max', 'min', 'last'],
    'quantity': 'sum'
}).dropna()

# Flatten column MultiIndex
resampled.columns = ['open', 'high', 'low', 'close', 'volume']
resampled.head()

## 6. Visualizations

We plot both price action and traded volume over time. Adjust the resampling window or columns to suit your analysis needs.
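The volume panel counts contracts, not coins. On Binance Coin-Margined futures each contract has a fixed USD face value, listed per symbol as its contract size in the exchange information. A contract count therefore converts to a USD notional by multiplying by that face value, and to a coin notional by further dividing by price. The sketch assumes a hypothetical face value of 100 USD; look up the actual contract size for your symbol before relying on the numbers.

```python
import pandas as pd


def contract_notional(
    contracts: pd.Series, price: pd.Series, contract_size_usd: float
) -> pd.DataFrame:
    """Convert COIN-M contract counts to USD and coin notionals.

    `contract_size_usd` is the USD face value of one contract; the caller must
    supply it for the symbol in question (it is an assumption, not looked up here).
    """
    usd = contracts * contract_size_usd
    # Dividing by a bar's close price is an approximation; converting each trade
    # with its own price (q * size / p) before summing is exact.
    coin = usd / price
    return pd.DataFrame({'usd_notional': usd, 'coin_notional': coin})


# Hypothetical 1-minute bars: 50 contracts at 40,000 and 20 contracts at 50,000.
bars = pd.DataFrame({'close': [40_000.0, 50_000.0], 'volume': [50, 20]})
notional = contract_notional(bars['volume'], bars['close'], contract_size_usd=100.0)
print(notional)
```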

In [None]:
fig, axes = plt.subplots(2, 1, sharex=True, figsize=(16, 10))

# Top panel: last traded price in each minute.
resampled['close'].plot(ax=axes[0], color='dodgerblue')
axes[0].set_title(f'{sample_symbol} 1-Minute Close Price')
axes[0].set_ylabel('Price')

# Bottom panel: contracts traded per minute.
resampled['volume'].plot(ax=axes[1], color='salmon')
axes[1].set_title(f'{sample_symbol} Traded Contracts per Minute')
axes[1].set_ylabel('Contracts')
axes[1].set_xlabel('Timestamp (UTC)')

plt.tight_layout()

### Rolling volatility

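Rolling volatility, here the standard deviation of 1-minute simple returns over a trailing 30-bar window, is a useful diagnostic for regime changes. Multiplying by √30 rescales the per-minute figure to a 30-minute horizon, which assumes returns are roughly independent.
Adjust the window size to match your strategy horizon.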
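Because minutes without trades are dropped during resampling, a 30-bar window can span more than 30 minutes, and the first return after a gap covers several minutes. The sketch below compares the count-based window with pandas' time-based `rolling('30min')` window. It uses synthetic prices with an artificial 10-minute gap; the prices, seed and gap are invented. The `np.sqrt(30)` factor is kept on both only for comparability.

```python
import numpy as np
import pandas as pd

# Synthetic 1-minute closes; bars 40-49 are removed to mimic minutes without trades.
rng = np.random.default_rng(0)
index = pd.date_range('2024-01-01', periods=100, freq='1min', tz='UTC')
close = pd.Series(100 * np.exp(np.cumsum(rng.normal(0, 1e-3, size=100))), index=index)
close = close.drop(index[40:50])

returns = close.pct_change()

# Count-based window (as in the notebook): the last 30 observations, whatever time they span.
vol_bars = returns.rolling(window=30, min_periods=15).std() * np.sqrt(30)

# Time-based window: only returns stamped within the trailing 30 minutes, (t - 30min, t].
vol_time = returns.rolling('30min', min_periods=15).std() * np.sqrt(30)

comparison = pd.DataFrame({'30_bars': vol_bars, '30_minutes': vol_time})
print(comparison.iloc[40:46])  # rows just after the gap, where the two windows differ
```

Away from gaps the two windows contain the same observations and agree. Right after a gap the time-based window holds fewer returns, which is why `min_periods` matters there.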

In [None]:
# 1-minute simple returns of the close price.
returns = resampled['close'].pct_change()

# Std of the last 30 returns (at least 15 required), scaled by sqrt(30) to a 30-minute horizon.
rolling_vol = returns.rolling(window=30, min_periods=15).std() * np.sqrt(30)

ax = rolling_vol.plot(color='mediumseagreen')
ax.set_title(f'{sample_symbol} Rolling Volatility (30-bar window, 30-minute horizon)')
ax.set_ylabel('Volatility (σ)')
ax.set_xlabel('Timestamp (UTC)')
plt.tight_layout()

## 7. Next steps

- Compare multiple symbols by repeating the workflow for each directory.
- Join with funding-rate or open-interest data to understand market structure.
- Export aggregated DataFrames with `DataFrame.to_parquet()` (requires the `pyarrow` or `fastparquet` package) for downstream backtesting pipelines.
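For joining with funding-rate data, `pd.merge_asof` can attach to each candle the most recent funding observation at or before its timestamp. The funding table below is invented for illustration. A real one would come from a separate funding-rate feed, assumed here to provide `timestamp` and `funding_rate` columns. Both frames must be sorted on the join key and share the same datetime dtype.

```python
import pandas as pd

# Hypothetical 1-minute candles around a funding timestamp (close prices invented).
candles = pd.DataFrame(
    {'close': [100.0, 100.5, 101.0, 100.8]},
    index=pd.date_range('2024-01-01 07:58', periods=4, freq='1min', tz='UTC', name='timestamp'),
)

# Hypothetical funding observations every 8 hours; column names are assumptions.
funding = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01 00:00', periods=2, freq='480min', tz='UTC'),
    'funding_rate': [0.0001, 0.00015],
})

# Backward as-of join: each candle gets the latest funding value at or before its timestamp.
joined = pd.merge_asof(
    candles.reset_index(),
    funding,
    on='timestamp',
    direction='backward',
).set_index('timestamp')

print(joined)
```

The same pattern applies to open-interest snapshots or to aligning candles from several symbols on a common clock.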

This notebook can serve as a starting point for deeper feature engineering and signal research on Binance Coin-Margined futures markets.