# Full Limit Order Book Snapshots at One-Minute Intervals

This notebook demonstrates how to capture limit order book depth (the best `max_depth` price levels on each side of the book) at regular intervals from ITCH 5.0 data.

Full LOB snapshots capture multiple price levels on both bid and ask sides, providing:
- Complete market depth information
- Order book imbalance metrics
- Liquidity distribution across price levels
- Data for sophisticated trading algorithms

In [None]:
from pathlib import Path
import pandas as pd
import time
from meatpy.itch50 import ITCH50MessageReader, ITCH50MarketProcessor

# Where the ITCH 5.0 input lives
data_dir = Path("data")
file_path = data_dir.joinpath("S081321-v50.txt.gz")

# Tickers whose order books are snapshotted
symbols = ["AAPL", "SPY"]

# Snapshot cadence (minutes) and number of price levels kept per side
interval_minutes = 1
max_depth = 5

# Echo the configuration so the run is self-describing
print(
    f"Input file: {file_path}",
    f"Target symbols: {symbols}",
    f"Snapshot interval: {interval_minutes} minute(s)",
    f"Maximum depth: {max_depth} levels per side",
    sep="\n",
)

# Report whether the input file is available before any processing starts
if file_path.exists():
    print(f"✅ Input file found: {file_path}")
    print(f"File size: {file_path.stat().st_size / (1024**3):.2f} GB")
else:
    print(f"❌ Input file not found: {file_path}")
    print(
        "Please place an ITCH 5.0 file (e.g., S081321-v50.txt.gz) in the data/ directory"
    )

In [None]:
def capture_lob_snapshot(lob, symbol, timestamp, max_depth):
    """Capture the best price levels on both sides of a limit order book.

    Args:
        lob: Order book object exposing ``bids`` and ``asks`` mappings of
            price -> level, where each level has ``size`` and ``count``
            attributes.
        symbol: Symbol string stored in the snapshot.
        timestamp: Timestamp stored in the snapshot.
        max_depth: Maximum number of price levels to capture per side.

    Returns:
        A flat dictionary with ``timestamp``, ``symbol``, the per-level
        ``{bid,ask}_{price,size,count}_{i}`` entries (``None`` for levels
        the book does not have), and summary statistics computed over the
        captured levels only. Returns ``None`` if either side of the book
        is empty. ``volume_imbalance`` is ``None`` when the captured levels
        hold zero total volume, since the ratio is undefined there.
    """
    if not lob.bids or not lob.asks:
        return None

    snapshot = {
        "timestamp": timestamp,
        "symbol": symbol,
    }

    # Best levels first: highest-priced bids, lowest-priced asks.
    bid_levels = sorted(lob.bids.items(), key=lambda x: x[0], reverse=True)[:max_depth]
    ask_levels = sorted(lob.asks.items(), key=lambda x: x[0])[:max_depth]

    # Bids are written before asks so the resulting column order is stable.
    for side, levels in (("bid", bid_levels), ("ask", ask_levels)):
        for i, (price, level) in enumerate(levels, start=1):
            snapshot[f"{side}_price_{i}"] = price
            snapshot[f"{side}_size_{i}"] = level.size
            snapshot[f"{side}_count_{i}"] = level.count

        # Pad missing levels with None so every snapshot has the same keys.
        for i in range(len(levels) + 1, max_depth + 1):
            snapshot[f"{side}_price_{i}"] = None
            snapshot[f"{side}_size_{i}"] = None
            snapshot[f"{side}_count_{i}"] = None

    # Summary statistics (skipped when no levels were captured, e.g. max_depth=0)
    if bid_levels and ask_levels:
        best_bid = bid_levels[0][0]
        best_ask = ask_levels[0][0]
        total_bid_volume = sum(level.size for _, level in bid_levels)
        total_ask_volume = sum(level.size for _, level in ask_levels)
        total_volume = total_bid_volume + total_ask_volume

        snapshot["spread"] = best_ask - best_bid
        snapshot["mid_price"] = (best_bid + best_ask) / 2
        snapshot["total_bid_volume"] = total_bid_volume
        snapshot["total_ask_volume"] = total_ask_volume
        # Guard the ratio: zero captured volume would divide by zero.
        snapshot["volume_imbalance"] = (
            (total_bid_volume - total_ask_volume) / total_volume
            if total_volume
            else None
        )
        snapshot["total_bid_orders"] = sum(level.count for _, level in bid_levels)
        snapshot["total_ask_orders"] = sum(level.count for _, level in ask_levels)

    return snapshot

In [None]:
def extract_lob_snapshots(file_path, symbols, interval_minutes=1, max_depth=5):
    """Extract limit order book snapshots at regular intervals.

    Every message in the file is fed to the market processor. The first
    message carrying a ``timestamp`` anchors the schedule; from then on a
    snapshot of each requested symbol is taken whenever a message's
    timestamp reaches the next scheduled boundary. Snapshots are stamped
    with the timestamp of the triggering message, which has already been
    applied to the book when the snapshot is taken.

    Args:
        file_path: Path to ITCH file
        symbols: List of symbols to track
        interval_minutes: Interval between snapshots in minutes (must be > 0)
        max_depth: Maximum number of price levels to capture

    Returns:
        List of snapshot dictionaries

    Raises:
        ValueError: If ``interval_minutes`` is not positive.
    """
    if interval_minutes <= 0:
        raise ValueError(f"interval_minutes must be positive, got {interval_minutes!r}")

    # NOTE(review): assumes ITCH50MarketProcessor can be built without
    # arguments and exposes get_lob(symbol) - confirm against the installed
    # meatpy version.
    processor = ITCH50MarketProcessor()
    snapshots = []

    # Convert interval to nanoseconds (timestamps are treated as ns counts)
    interval_ns = interval_minutes * 60 * 1_000_000_000
    next_snapshot_time = None

    # Report snapshot progress each time another batch of this size is reached
    progress_every = 20
    next_progress_at = progress_every

    message_count = 0
    start_time = time.time()

    print(
        f"🚀 Processing file to extract {interval_minutes}-minute LOB snapshots (depth: {max_depth})...\n"
    )

    with ITCH50MessageReader(file_path) as reader:
        for message in reader:
            message_count += 1
            processor.process_message(message)

            # Only messages that carry a timestamp can drive the schedule
            if hasattr(message, "timestamp"):
                timestamp = message.timestamp

                if next_snapshot_time is None:
                    # First timestamped message anchors the schedule
                    next_snapshot_time = timestamp + interval_ns
                    print(f"📅 First timestamp: {timestamp} ns")
                    print(f"⏰ Next snapshot at: {next_snapshot_time} ns\n")
                elif timestamp >= next_snapshot_time:
                    # Take snapshot for each symbol
                    for symbol in symbols:
                        lob = processor.get_lob(symbol)
                        if lob:
                            snapshot = capture_lob_snapshot(
                                lob, symbol, timestamp, max_depth
                            )
                            if snapshot:
                                snapshots.append(snapshot)

                    # Advance past the current timestamp. Stepping only one
                    # interval would leave the schedule behind after a gap
                    # in the feed and trigger a burst of near-identical
                    # snapshots on the following messages.
                    while timestamp >= next_snapshot_time:
                        next_snapshot_time += interval_ns

                    # Progress update, once per batch of captured snapshots
                    if len(snapshots) >= next_progress_at:
                        elapsed = time.time() - start_time
                        print(
                            f"📸 Captured {len(snapshots)} LOB snapshots after {elapsed:.1f}s"
                        )
                        next_progress_at = (
                            len(snapshots) // progress_every + 1
                        ) * progress_every

            # General progress indicator
            if message_count % 5_000_000 == 0:
                elapsed = time.time() - start_time
                rate = message_count / elapsed if elapsed > 0 else 0
                print(f"   Processed {message_count:,} messages ({rate:,.0f} msg/s)")

    elapsed = time.time() - start_time
    print(f"\n✅ Processing complete in {elapsed:.1f} seconds")
    print(f"   Total messages processed: {message_count:,}")
    print(f"   LOB snapshots captured: {len(snapshots)}")

    return snapshots

In [None]:
# Extract LOB snapshots and print a summary plus one fully rendered sample
if file_path.exists():
    snapshots = extract_lob_snapshots(file_path, symbols, interval_minutes, max_depth)

    if snapshots:
        # One row per (snapshot time, symbol)
        df = pd.DataFrame(snapshots)

        print("\n📊 LOB Snapshot Summary:")
        print(f"   Total snapshots: {len(df)}")
        print(f"   Symbols covered: {sorted(df['symbol'].unique())}")
        print(f"   Depth levels: {max_depth} per side")
        print(f"   Columns captured: {len(df.columns)}")

        # Show column structure
        print("\n📋 Data structure:")
        bid_cols = [c for c in df.columns if c.startswith("bid_")]
        ask_cols = [c for c in df.columns if c.startswith("ask_")]
        # Build the exclusion set once instead of re-concatenating per column
        non_summary_cols = {*bid_cols, *ask_cols, "timestamp", "symbol"}
        summary_cols = [c for c in df.columns if c not in non_summary_cols]

        print(
            f"   Bid columns ({len(bid_cols)}): {bid_cols[:6]}{'...' if len(bid_cols) > 6 else ''}"
        )
        print(
            f"   Ask columns ({len(ask_cols)}): {ask_cols[:6]}{'...' if len(ask_cols) > 6 else ''}"
        )
        print(f"   Summary columns: {summary_cols}")

        # Show sample data - first snapshot with good data
        print("\n🔍 Sample snapshot data (first complete snapshot):")
        pd.set_option("display.precision", 4)
        pd.set_option("display.max_columns", None)
        pd.set_option("display.width", None)

        # Find first complete snapshot (no None values in top level)
        complete_snapshots = df[
            (df["bid_price_1"].notna()) & (df["ask_price_1"].notna())
        ]
        if not complete_snapshots.empty:
            sample = complete_snapshots.iloc[0]
            print(f"\n   Symbol: {sample['symbol']}")
            print(f"   Timestamp: {sample['timestamp']}")

            for heading, side in (
                ("📈 Bid Side (best to worst):", "bid"),
                ("📉 Ask Side (best to worst):", "ask"),
            ):
                print(f"\n   {heading}")
                for i in range(1, max_depth + 1):
                    price = sample[f"{side}_price_{i}"]
                    if pd.notna(price):
                        # A column with any missing level is stored as float;
                        # cast back so shares/orders print as whole numbers.
                        size = int(sample[f"{side}_size_{i}"])
                        count = int(sample[f"{side}_count_{i}"])
                        print(
                            f"      Level {i}: ${price:.4f} | {size:,} shares | {count} orders"
                        )

            print("\n   📊 Summary Stats:")
            print(f"      Spread: ${sample['spread']:.4f}")
            print(f"      Mid Price: ${sample['mid_price']:.4f}")
            print(f"      Total Bid Volume: {sample['total_bid_volume']:,} shares")
            print(f"      Total Ask Volume: {sample['total_ask_volume']:,} shares")
            print(f"      Volume Imbalance: {sample['volume_imbalance']:.4f}")

    else:
        print("❌ No LOB snapshots were captured")

else:
    print("⚠️  Cannot run LOB snapshot extraction without input file")

In [None]:
# Persist the snapshots, then summarise imbalance, spread and completeness.
# Runs only if an earlier cell produced a non-empty `df`.
if "df" in locals() and not df.empty:
    # CSV name encodes the depth and interval used for this run
    output_csv = data_dir / f"lob_snapshots_{max_depth}levels_{interval_minutes}min.csv"
    df.to_csv(output_csv, index=False)

    print(f"\n💾 LOB snapshots saved to: {output_csv}")
    print(f"   File size: {output_csv.stat().st_size / 1024:.1f} KB")

    # Same set of descriptive statistics for both per-symbol tables
    stat_names = ["count", "mean", "std", "min", "max"]

    # Per-symbol distribution of volume imbalance
    print("\n📈 Volume Imbalance Analysis:")
    imbalance_by_symbol = df.groupby("symbol")["volume_imbalance"]
    imbalance_stats = imbalance_by_symbol.agg(stat_names).round(4)
    display(imbalance_stats)

    # Per-symbol distribution of the bid-ask spread
    print("\n💰 Spread Analysis:")
    spread_by_symbol = df.groupby("symbol")["spread"]
    spread_stats = spread_by_symbol.agg(stat_names).round(6)
    display(spread_stats)

    # Share of each symbol's snapshots that have a best bid and a best ask
    print("\n✅ Data Completeness:")
    for symbol in df["symbol"].unique():
        symbol_data = df[df["symbol"] == symbol]
        has_both_sides = (
            symbol_data["bid_price_1"].notna() & symbol_data["ask_price_1"].notna()
        )
        complete_snapshots = symbol_data[has_both_sides]
        completeness = len(complete_snapshots) / len(symbol_data) * 100
        print(
            f"   {symbol}: {len(complete_snapshots)}/{len(symbol_data)} snapshots complete ({completeness:.1f}%)"
        )

## Understanding the Data Structure

Each LOB snapshot contains:

### Price Level Data
- `bid_price_1` to `bid_price_N`: Bid prices from best (highest) to worst
- `bid_size_1` to `bid_size_N`: Total shares available at each bid level
- `bid_count_1` to `bid_count_N`: Number of orders at each bid level
- `ask_price_1` to `ask_price_N`: Ask prices from best (lowest) to worst
- `ask_size_1` to `ask_size_N`: Total shares available at each ask level
- `ask_count_1` to `ask_count_N`: Number of orders at each ask level

### Summary Statistics
- `spread`: Best ask - best bid
- `mid_price`: (Best bid + best ask) / 2
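- `total_bid_volume`: Sum of bid sizes across the captured levels (the top `max_depth` levels, not the entire book)
- `total_ask_volume`: Sum of ask sizes across the captured levels (the top `max_depth` levels, not the entire book)
- `volume_imbalance`: (Bid volume - Ask volume) / (Bid volume + Ask volume), computed from the two captured-level totals above; ranges from -1 (all ask volume) to +1 (all bid volume)
- `total_bid_orders`: Sum of bid order counts across the captured levels
- `total_ask_orders`: Sum of ask order counts across the captured levels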

## Advanced Analysis Examples

### Order Book Imbalance
```python
# Analyze how volume imbalance relates to future price movements
df['next_mid_price'] = df.groupby('symbol')['mid_price'].shift(-1)
df['price_change'] = df['next_mid_price'] - df['mid_price']

# Correlation between imbalance and next price change
correlation = df[['volume_imbalance', 'price_change']].corr()
```

### Liquidity Analysis
```python
# Calculate average order size at each level
for level in range(1, max_depth + 1):
    df[f'avg_bid_order_size_{level}'] = df[f'bid_size_{level}'] / df[f'bid_count_{level}']
    df[f'avg_ask_order_size_{level}'] = df[f'ask_size_{level}'] / df[f'ask_count_{level}']
```

### Market Depth Visualization
```python
import matplotlib.pyplot as plt

# Plot order book shape for a specific snapshot
snapshot = df.iloc[0]
bid_prices = [snapshot[f'bid_price_{i}'] for i in range(1, max_depth+1) if pd.notna(snapshot[f'bid_price_{i}'])]
bid_sizes = [snapshot[f'bid_size_{i}'] for i in range(1, max_depth+1) if pd.notna(snapshot[f'bid_size_{i}'])]
ask_prices = [snapshot[f'ask_price_{i}'] for i in range(1, max_depth+1) if pd.notna(snapshot[f'ask_price_{i}'])]
ask_sizes = [snapshot[f'ask_size_{i}'] for i in range(1, max_depth+1) if pd.notna(snapshot[f'ask_size_{i}'])]

plt.barh(bid_prices, bid_sizes, alpha=0.7, color='green', label='Bids')
plt.barh(ask_prices, [-size for size in ask_sizes], alpha=0.7, color='red', label='Asks')
plt.xlabel('Volume')
plt.ylabel('Price')
plt.title('Order Book Snapshot')
plt.legend()
```

## Key Applications

- **Market Making**: Understanding order book depth for liquidity provision
- **Execution Algorithms**: Optimizing trade execution based on market depth
- **Risk Management**: Monitoring market liquidity and depth changes
- **Academic Research**: Studying market microstructure and price formation
- **Machine Learning**: Features for price prediction models