# Neutron: Advanced Market Data Pipeline

Welcome to the **Neutron** tutorial. This notebook demonstrates how to build a production-grade market data pipeline capable of downloading, storing, and analyzing massive datasets from crypto exchanges.

## Key Capabilities
1.  **Multi-Exchange Support**: Architecture designed for Binance, Bitstamp, and more.
2.  **Diverse Data Types**: 
    *   **OHLCV** (Candlesticks)
    *   **Trades** (Tick-level data)
    *   **Aggregated Trades** (Compressed ticks)
    *   **Order Book Tickers** (Best Bid/Ask snapshots)
    *   **Metrics** (Open Interest, Long/Short Ratios)
    *   **Advanced Klines**: Mark Price, Index Price, Premium Index
    *   **Order Book Depth**: Level 2 snapshots
3.  **Smart Parallelization**: 
    *   **Stability First**: OHLCV data is downloaded with per-exchange parallelism but sequential symbol processing to ensure zero gaps.
    *   **Speed Second**: High-frequency data (Trades, Tickers) is downloaded with massive parallelism using a configurable worker pool.
4.  **Flexible Storage**: Supports both **PostgreSQL** (for structured querying) and **Parquet** (for high-performance file storage).
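
The "Stability First" policy above can be pictured with a small sketch: exchanges run in parallel, while the symbols of each exchange are processed one after another. This is not Neutron's actual implementation. `fetch_ohlcv`, `download_exchange`, `download_all`, and the extra symbols in `plan` are hypothetical names used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_ohlcv(exchange, symbol):
    """Hypothetical stand-in for a real download call."""
    return f"{exchange}:{symbol}"


def download_exchange(exchange, symbols):
    # Within one exchange, symbols are handled sequentially.
    return [fetch_ohlcv(exchange, symbol) for symbol in symbols]


def download_all(plan, max_workers=8):
    # Each exchange gets its own task, so exchanges run in parallel.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            exchange: pool.submit(download_exchange, exchange, symbols)
            for exchange, symbols in plan.items()
        }
        return {exchange: future.result() for exchange, future in futures.items()}


# Illustrative plan only; the symbols beyond BTC/USDT are assumptions.
plan = {"binance": ["BTC/USDT", "ETH/USDT"], "bitstamp": ["BTC/USD"]}
results = download_all(plan)
print(results)
```

High-frequency data types could instead submit one task per (exchange, symbol, time chunk) to the same pool, which trades ordering guarantees for throughput.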

---

In [None]:
import sys
import os
import logging
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

# Add src to path so we can import neutron
sys.path.append(os.path.abspath('../src'))

from neutron.core.downloader import Downloader
from neutron.core.crawler import DataCrawler
from neutron.core.config import NeutronConfig, StorageConfig, TaskConfig

# Setup logging to see what's happening under the hood
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
sns.set_theme(style="darkgrid")

## 1. Configuration: The Blueprint

We will configure a set of download tasks for **BTC/USDT** on both the **Spot** and **Futures (Swap)** markets of Binance.
Kline tasks cover one day at `1h` resolution, while high-frequency data (aggregated trades, metrics) is limited to a **2-minute** sample to keep this demo quick and manageable.

In [None]:
# Define Configuration
config = NeutronConfig(
    storage=StorageConfig(type='database'), 
    data_state_path='data_state.json',
    exchange_state_path='exchange_state.json',
    max_workers=8, # High parallelism for multiple data types
    
    tasks=[
        # --- SPOT DATA ---
        # OHLCV (1 Day - Foundation)
        TaskConfig(
            type='backfill_ohlcv',
            params={'timeframe': '1h', 'start_date': '2024-11-15T00:00:00', 'end_date': '2024-11-16T00:00:00', 'rewrite': True},
            exchanges={'binance': {'spot': {'symbols': ['BTC/USDT']}}}
        ),
        # Aggregated Trades (2 Minutes)
        TaskConfig(
            type='backfill_agg_trades',
            params={'start_date': '2024-11-15T00:00:00', 'end_date': '2024-11-15T00:02:00', 'rewrite': True},
            exchanges={'binance': {'spot': {'symbols': ['BTC/USDT']}}}
        ),
        
        # --- FUTURES DATA (UM) ---
        # Mark Price Klines (1 Day)
        TaskConfig(
            type='backfill_mark_price_klines',
            params={'timeframe': '1h', 'start_date': '2024-11-15T00:00:00', 'end_date': '2024-11-16T00:00:00', 'rewrite': True},
            exchanges={'binance': {'swap': {'symbols': ['BTC/USDT']}}}
        ),
        # Index Price Klines (1 Day)
        TaskConfig(
            type='backfill_index_price_klines',
            params={'timeframe': '1h', 'start_date': '2024-11-15T00:00:00', 'end_date': '2024-11-16T00:00:00', 'rewrite': True},
            exchanges={'binance': {'swap': {'symbols': ['BTC/USDT']}}}
        ),
        # Premium Index Klines (1 Day)
        TaskConfig(
            type='backfill_premium_index_klines',
            params={'timeframe': '1h', 'start_date': '2024-11-15T00:00:00', 'end_date': '2024-11-16T00:00:00', 'rewrite': True},
            exchanges={'binance': {'swap': {'symbols': ['BTC/USDT']}}}
        ),
        # Metrics (Open Interest) (2 Minutes)
        TaskConfig(
            type='backfill_metrics',
            params={'start_date': '2024-11-15T00:00:00', 'end_date': '2024-11-15T00:02:00', 'rewrite': True},
            exchanges={'binance': {'swap': {'symbols': ['BTC/USDT']}}}
        )
    ]
)
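
The three futures kline tasks above differ only in their `type`. One way to cut the repetition is to generate the task arguments from a small helper. The sketch below builds plain dictionaries only. `kline_task_spec`, `WINDOW`, and `KLINE_TYPES` are hypothetical names, and expanding each dictionary with `TaskConfig(**spec)` assumes `TaskConfig` accepts exactly the keyword arguments shown in the cell above.

```python
# Shared one-day window used by the kline tasks in this demo.
WINDOW = {"start_date": "2024-11-15T00:00:00", "end_date": "2024-11-16T00:00:00"}

KLINE_TYPES = [
    "backfill_mark_price_klines",
    "backfill_index_price_klines",
    "backfill_premium_index_klines",
]


def kline_task_spec(task_type, timeframe="1h"):
    """Return keyword arguments for one futures kline task (hypothetical helper)."""
    return {
        "type": task_type,
        "params": {"timeframe": timeframe, **WINDOW, "rewrite": True},
        "exchanges": {"binance": {"swap": {"symbols": ["BTC/USDT"]}}},
    }


futures_specs = [kline_task_spec(task_type) for task_type in KLINE_TYPES]
for spec in futures_specs:
    print(spec["type"], spec["params"]["timeframe"])
```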

## 2. Execution
Run the downloader. Watch the logs to see parallel execution in action.

In [None]:
downloader = Downloader(config=config, log_file='neutron_demo.log')
print("🚀 Starting Comprehensive Download...")
downloader.run()
print("✅ Download Complete!")
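
Once the download finishes, the "zero gaps" goal for OHLCV data can be checked by comparing stored candle times against the expected grid. The sketch below assumes the frame has a timezone-naive `time` column holding candle open times, as the plots later in this notebook suggest. `find_missing_candles` is a hypothetical helper, and the sample frame is synthetic.

```python
import pandas as pd


def find_missing_candles(df, start, end, freq="1h", time_col="time"):
    """Return expected candle open times in [start, end) that are absent from df."""
    expected = pd.date_range(start=start, end=end, freq=freq, inclusive="left")
    actual = pd.DatetimeIndex(pd.to_datetime(df[time_col]))
    return expected.difference(actual)


# Synthetic example: 24 hourly candles with the 05:00 candle removed.
sample = pd.DataFrame(
    {"time": pd.date_range("2024-11-15", periods=24, freq="1h")}
).drop(index=[5])

missing = find_missing_candles(sample, "2024-11-15", "2024-11-16")
print(list(missing))
```

An empty result means every expected candle is present for the requested window.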

## 3. Analysis
Let's retrieve and visualize the different data types.

In [None]:
crawler = DataCrawler(storage_type='database')

### A. Price Analysis (Spot vs Mark vs Index)
Compare the Spot price against Futures Mark and Index prices.

In [None]:
df_spot = crawler.get_ohlcv('binance', 'BTC/USDT', '1h', '2024-11-15', '2024-11-16', instrument_type='spot')
df_mark = crawler.get_mark_price_klines('binance', 'BTC/USDT', '1h', '2024-11-15', '2024-11-16', instrument_type='swap')
df_index = crawler.get_index_price_klines('binance', 'BTC/USDT', '1h', '2024-11-15', '2024-11-16', instrument_type='swap')

if not df_spot.empty and not df_mark.empty:
    plt.figure(figsize=(12, 6))
    plt.plot(df_spot['time'], df_spot['close'], label='Spot Close', alpha=0.7)
    plt.plot(df_mark['time'], df_mark['close'], label='Mark Price', linestyle='--')
    if not df_index.empty:
        plt.plot(df_index['time'], df_index['close'], label='Index Price', linestyle=':')
        
    plt.title('BTC/USDT: Spot vs Mark vs Index Price')
    plt.legend()
    plt.show()
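
Beyond the visual comparison, the gap between the mark and spot prices can be quantified as a basis in basis points. This is a minimal sketch, assuming both frames share a `time` column and a `close` column as used in the plot above. `mark_spot_basis` is a hypothetical helper, and the numbers in the example are synthetic, not market data.

```python
import pandas as pd


def mark_spot_basis(df_spot, df_mark, time_col="time"):
    """Join spot and mark closes on time and compute the basis in basis points."""
    merged = df_spot[[time_col, "close"]].merge(
        df_mark[[time_col, "close"]], on=time_col, suffixes=("_spot", "_mark")
    )
    merged["basis_bps"] = (merged["close_mark"] / merged["close_spot"] - 1.0) * 1e4
    return merged


# Synthetic prices chosen for easy arithmetic.
times = pd.date_range("2024-11-15", periods=2, freq="1h")
spot = pd.DataFrame({"time": times, "close": [100.0, 200.0]})
mark = pd.DataFrame({"time": times, "close": [101.0, 199.0]})

basis = mark_spot_basis(spot, mark)
print(basis[["time", "basis_bps"]])
```

With real data, the same call would be `mark_spot_basis(df_spot, df_mark)`. A positive basis means the mark price sits above spot.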

### B. Premium Index
Plot the premium index, which tracks how far the perpetual contract trades above or below the index price.

In [None]:
df_premium = crawler.get_premium_index_klines('binance', 'BTC/USDT', '1h', '2024-11-15', '2024-11-16', instrument_type='swap')

if not df_premium.empty:
    plt.figure(figsize=(12, 4))
    plt.plot(df_premium['time'], df_premium['close'], color='magenta', label='Premium Index')
    plt.title('BTC/USDT Premium Index')
    plt.legend()
    plt.show()

### C. Open Interest
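Open interest, the number of outstanding contracts, shows how much positioning sits in the swap market. The metrics task above downloaded only a 2-minute window, so expect few data points here.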

In [None]:
df_oi = crawler.get_data('metrics', 'binance', 'BTC/USDT', '2024-11-15', '2024-11-16', instrument_type='swap')

if not df_oi.empty:
    plt.figure(figsize=(12, 4))
    plt.plot(df_oi['time'], df_oi['value'], color='cyan')
    plt.title('Open Interest (BTC/USDT)')
    plt.show()

### D. Aggregated Trades (2 Minutes)
Split the 2-minute sample into taker buy and taker sell volume.

In [None]:
df_agg = crawler.get_data(
    data_type='aggTrades', exchange='binance', symbol='BTC/USDT',
    start_date='2024-11-15T00:00:00', end_date='2024-11-15T00:02:00'
)

if not df_agg.empty:
    print(f"Retrieved {len(df_agg)} aggregated trades in 2 minutes.")
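    # Notional value of each trade in the quote currency (USDT)
    df_agg['volume_usd'] = df_agg['price'] * df_agg['qty']
    # is_buyer_maker == True means the buyer was the passive side, so the taker sold
    buy_vol = df_agg[df_agg['is_buyer_maker'] == False]['volume_usd'].sum()
    sell_vol = df_agg[df_agg['is_buyer_maker'] == True]['volume_usd'].sum()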

    plt.figure(figsize=(8, 5))
    plt.bar(['Buy Volume', 'Sell Volume'], [buy_vol, sell_vol], color=['green', 'red'])
    plt.title(f'Buy vs Sell Volume (2 Mins) - Total: ${buy_vol+sell_vol:,.0f}')
    plt.ylabel('Volume (USD)')
    plt.show()
    display(df_agg.head())
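
The same trade sample also supports two compact summary numbers: the volume-weighted average price (VWAP) and a signed flow imbalance between taker buys and taker sells. This is a sketch assuming the `price`, `qty`, and `is_buyer_maker` columns used above. `flow_summary` is a hypothetical helper, and the two trades in the example are synthetic.

```python
import pandas as pd


def flow_summary(trades):
    """Return VWAP and taker flow imbalance in [-1, 1] for a trade frame."""
    notional = trades["price"] * trades["qty"]
    # When the buyer is the maker, the taker (aggressor) is selling.
    is_taker_sell = trades["is_buyer_maker"].astype(bool)
    buy = notional[~is_taker_sell].sum()
    sell = notional[is_taker_sell].sum()
    total = buy + sell
    return {
        "vwap": notional.sum() / trades["qty"].sum(),
        "imbalance": (buy - sell) / total if total else 0.0,
    }


# Synthetic trades: one taker buy at 100, one taker sell at 102.
sample_trades = pd.DataFrame(
    {"price": [100.0, 102.0], "qty": [1.0, 1.0], "is_buyer_maker": [False, True]}
)
summary = flow_summary(sample_trades)
print(summary)
```

With real data, `flow_summary(df_agg)` gives one number per metric for the whole window. An imbalance near +1 means almost all volume came from taker buys.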