# 📊 Liquidation Data Analysis
## Multi-Exchange Liquidation Tracking (Binance + Bybit, BTCUSDT)

This notebook provides real-time and historical analysis of liquidation data stored across four storage tiers:
- **Level 1:** In-Memory (last 60s)
- **Level 2:** Redis (last 1h, aggregated)
- **Level 3:** TimescaleDB (90 days, institutional events)
- **Level 4:** TimescaleDB Continuous Aggregates (pre-computed rollups)

In [None]:
# Import dependencies
import asyncio
import asyncpg
import redis
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import plotly.express as px
from datetime import datetime, timedelta
import json

# Configure plotting: `%matplotlib inline` is an IPython magic (not plain
# Python) that renders matplotlib figures inside the notebook output, then
# select the matplotlib style named 'seaborn-v0_8-darkgrid'.
%matplotlib inline
plt.style.use('seaborn-v0_8-darkgrid')

print("✅ Dependencies imported")

## 🔌 Connect to Data Sources

In [None]:
# Connect to Redis (Level 2), database index 1.
# decode_responses=False keeps keys and hash values as raw bytes; the
# Redis helpers in this notebook call .decode() and look up b'...' fields.
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    db=1,
    decode_responses=False,
)
print(f"✅ Redis connected: {redis_client.ping()}")

# Connect to TimescaleDB (Level 3/4)
async def get_db_connection():
    """Open and return a new asyncpg connection to the ``liquidations`` database.

    Each call creates its own connection; the caller must close it with
    ``await conn.close()``.
    """
    connect_kwargs = {
        'host': 'localhost',
        'port': 5432,
        'database': 'liquidations',
        'user': 'screener-m3',
    }
    return await asyncpg.connect(**connect_kwargs)

# Test connection
conn = await get_db_connection()
version = await conn.fetchval('SELECT version()')
print(f"✅ Database connected: {version.split(',')[0]}")
await conn.close()

## 📈 Level 2: Redis Analysis - Price Level Clusters

In [None]:
# Get all price level clusters from Redis
def get_redis_price_levels(symbol='BTCUSDT', client=None):
    """Return the price-level liquidation clusters stored in Redis for *symbol*.

    Scans keys matching ``liq:levels:{symbol}:*`` (layout
    ``liq:levels:<symbol>:<price>:<side>``, e.g. ``liq:levels:BTCUSDT:67200:LONG``)
    and reads each key's hash.

    Args:
        symbol: Trading pair whose clusters are loaded.
        client: Redis client to read from. Defaults to the notebook-level
            ``redis_client``. It must return raw bytes (``decode_responses=False``)
            because keys are decoded and hash fields are looked up as bytes.

    Returns:
        DataFrame with columns price_level, side, count, total_value,
        total_quantity, first_seen and last_seen, one row per non-empty hash.
        When nothing is found the frame is empty but still has those columns.
    """
    if client is None:
        client = redis_client

    columns = [
        'price_level', 'side', 'count', 'total_value',
        'total_quantity', 'first_seen', 'last_seen',
    ]
    clusters = []

    for key in client.scan_iter(match=f"liq:levels:{symbol}:*"):
        data = client.hgetall(key)
        # HGETALL returns an empty hash for a missing key (e.g. one that
        # expired between SCAN and HGETALL); skip those.
        if not data:
            continue

        # Price and side are the LAST two ':'-separated segments (indices 3
        # and 4), not indices 2 and 3 (index 2 is the symbol). Splitting from
        # the right keeps this correct regardless of the prefix.
        _, price_text, side = key.decode().rsplit(':', 2)

        clusters.append({
            'price_level': float(price_text),
            'side': side,
            'count': int(data.get(b'count', 0)),
            'total_value': float(data.get(b'total_value', 0)),
            'total_quantity': float(data.get(b'total_quantity', 0)),
            'first_seen': int(data.get(b'first_seen', 0)),
            'last_seen': int(data.get(b'last_seen', 0)),
        })

    return pd.DataFrame(clusters, columns=columns)

# Load the BTCUSDT price-level clusters from Redis and preview the first ten.
price_levels = get_redis_price_levels(symbol='BTCUSDT')
print(f"Found {len(price_levels)} price level clusters in Redis")
price_levels.head(n=10)

In [None]:
# Plot the 20 price levels with the most liquidations as grouped bars,
# one trace per side; each bar is labelled with its total value in $M.
if price_levels.empty:
    print("No price level data available yet. Start the aggregator and wait for data.")
else:
    top_levels = price_levels.nlargest(20, 'count')
    side_colors = {'LONG': 'red', 'SHORT': 'green'}

    fig = go.Figure()
    for side, color in side_colors.items():
        subset = top_levels[top_levels['side'] == side]
        value_labels = subset['total_value'].apply(lambda usd: f'${usd/1e6:.2f}M')
        bar = go.Bar(
            x=subset['price_level'],
            y=subset['count'],
            name=f'{side} Liquidations',
            marker_color=color,
            text=value_labels,
            textposition='auto',
        )
        fig.add_trace(bar)

    fig.update_layout(
        title='BTCUSDT Liquidation Heatmap - Top 20 Price Levels (Last 1h)',
        xaxis_title='Price Level',
        yaxis_title='Number of Liquidations',
        barmode='group',
        height=600,
    )
    fig.show()

## 💾 Level 3: TimescaleDB Analysis - Significant Liquidations

In [None]:
# Get recent significant liquidations from TimescaleDB
async def get_significant_liquidations(hours=24, min_value=100_000):
    """Fetch significant liquidations from the last *hours* hours.

    Reads up to 1000 rows from ``liquidations_significant`` whose
    ``value_usd >= min_value``, newest first.

    Args:
        hours: Look-back window in hours (int or float).
        min_value: Minimum ``value_usd`` for a row to be included.

    Returns:
        DataFrame with columns time, exchange, symbol, side, price, quantity,
        value_usd, is_cascade and risk_score. price, quantity, value_usd and
        risk_score are converted to float. risk_score is None only where the
        database value is NULL; a score of 0 stays 0.0. When nothing matches,
        the frame is empty but keeps these columns.
    """
    # The window is bound as an interval parameter. A placeholder written
    # inside a SQL string literal ('$1 hours') is not substituted by
    # PostgreSQL, so the original query could never run.
    query = """
    SELECT 
        time,
        exchange,
        symbol,
        side,
        price,
        quantity,
        value_usd,
        is_cascade,
        risk_score
    FROM liquidations_significant
    WHERE time >= NOW() - $1::interval
      AND value_usd >= $2
    ORDER BY time DESC
    LIMIT 1000
    """

    conn = await get_db_connection()
    try:
        rows = await conn.fetch(query, timedelta(hours=hours), min_value)
    finally:
        # Always release the connection, even if the query fails.
        await conn.close()

    columns = [
        'time', 'exchange', 'symbol', 'side', 'price',
        'quantity', 'value_usd', 'is_cascade', 'risk_score',
    ]
    records = [
        {
            'time': row['time'],
            'exchange': row['exchange'],
            'symbol': row['symbol'],
            'side': row['side'],
            'price': float(row['price']),
            'quantity': float(row['quantity']),
            'value_usd': float(row['value_usd']),
            'is_cascade': row['is_cascade'],
            # Compare against None: a legitimate score of 0 is falsy.
            'risk_score': float(row['risk_score']) if row['risk_score'] is not None else None,
        }
        for row in rows
    ]
    return pd.DataFrame(records, columns=columns)

# Get data
significant_liqs = await get_significant_liquidations(hours=24)
print(f"Found {len(significant_liqs)} significant liquidations (>$100K) in last 24h")
significant_liqs.head()

In [None]:
# Print headline statistics and per-exchange / per-side breakdowns for the
# significant liquidations loaded above.
if significant_liqs.empty:
    print("No data yet. Start the aggregator and wait for liquidation events.")
else:
    print("📊 SUMMARY STATISTICS (Last 24h, >$100K liquidations)")
    print("=" * 60)
    print(f"Total Events: {len(significant_liqs)}")
    usd = significant_liqs['value_usd']
    print(f"Total Volume: ${usd.sum():,.0f}")
    print(f"Average Size: ${usd.mean():,.0f}")
    print(f"Largest: ${usd.max():,.0f}")
    print()

    # Event count and USD volume per exchange, largest volume first.
    print("Exchange Breakdown:")
    per_exchange = significant_liqs.groupby('exchange')['value_usd'].agg(['count', 'sum'])
    print(per_exchange.sort_values('sum', ascending=False))
    print()

    print("Side Breakdown:")
    per_side = significant_liqs.groupby('side')['value_usd'].agg(['count', 'sum'])
    print(per_side)
    print()

    # is_cascade is used directly as a boolean row mask.
    cascade_mask = significant_liqs['is_cascade']
    print(f"Cascades Detected: {cascade_mask.sum()}")
    cascade_risk = significant_liqs.loc[cascade_mask, 'risk_score'].mean()
    print(f"Average Risk Score (cascades): {cascade_risk:.3f}")

In [None]:
# Time series plot: hourly liquidation volume per exchange.
if not significant_liqs.empty:
    # Bucket each event into its hour. The upper-case 'H' alias is deprecated
    # since pandas 2.2. Lower-case 'h' is accepted by both old and new pandas.
    significant_liqs['hour'] = significant_liqs['time'].dt.floor('h')
    hourly = significant_liqs.groupby(['hour', 'exchange'])['value_usd'].sum().reset_index()

    fig = px.line(
        hourly,
        x='hour',
        y='value_usd',
        color='exchange',
        title='Hourly Liquidation Volume by Exchange (Last 24h)',
        labels={'value_usd': 'Total Volume (USD)', 'hour': 'Time'}
    )
    fig.show()

## 🔥 Cascade Analysis

In [None]:
# Get cascade events
async def get_cascades(hours=24):
    """Fetch cascade events from the ``liquidation_cascades`` relation.

    Args:
        hours: Look-back window in hours (int or float), applied to the
            relation's ``hour`` column.

    Returns:
        DataFrame with one row per cascade: up to 100 rows, newest
        ``cascade_start`` first. asyncpg returns NUMERIC values as
        ``decimal.Decimal``. Those are converted to float so pandas treats the
        columns as numeric; object-dtype Decimal columns break ``nlargest`` and
        groupby means.
    """
    from decimal import Decimal

    # The window is bound as an interval parameter. A placeholder inside a
    # string literal ('$1 hours') would never be substituted.
    query = """
    SELECT 
        cascade_id::text,
        symbol,
        cascade_start,
        cascade_end,
        duration_seconds,
        event_count,
        total_value_usd,
        avg_risk_score,
        max_risk_score,
        exchanges_involved,
        exchange_count,
        long_count,
        short_count
    FROM liquidation_cascades
    WHERE hour >= NOW() - $1::interval
    ORDER BY cascade_start DESC
    LIMIT 100
    """

    conn = await get_db_connection()
    try:
        rows = await conn.fetch(query, timedelta(hours=hours))
    finally:
        # Always release the connection, even if the query fails.
        await conn.close()

    return pd.DataFrame([
        {key: float(value) if isinstance(value, Decimal) else value
         for key, value in row.items()}
        for row in rows
    ])

# Get cascades
cascades = await get_cascades(hours=24)
print(f"Found {len(cascades)} cascades in last 24h")
if not cascades.empty:
    cascades.head()

In [None]:
# Cascade summary: headline statistics followed by the five largest cascades.
if not cascades.empty:
    print("🚨 CASCADE SUMMARY (Last 24h)")
    print("=" * 60)
    print(f"Total Cascades: {len(cascades)}")
    cross_exchange = (cascades['exchange_count'] > 1).sum()
    print(f"Cross-Exchange Cascades: {cross_exchange}")
    volume = cascades['total_value_usd']
    print(f"Total Cascade Volume: ${volume.sum():,.0f}")
    print(f"Average Cascade Size: ${volume.mean():,.0f}")
    print(f"Average Events per Cascade: {cascades['event_count'].mean():.1f}")
    print(f"Average Duration: {cascades['duration_seconds'].mean():.1f}s")
    print(f"Average Risk Score: {cascades['avg_risk_score'].mean():.3f}")
    print(f"Max Risk Score: {cascades['max_risk_score'].max():.3f}")

    # Five largest cascades by USD volume
    summary_columns = ['symbol', 'event_count', 'total_value_usd', 'max_risk_score', 'exchange_count']
    print("\nTop 5 Largest Cascades:")
    print(cascades.nlargest(5, 'total_value_usd')[summary_columns])

## 🎯 Real-Time Monitoring - Check Current Status

In [None]:
# Check cascade status in Redis
def check_cascade_status(symbol='BTCUSDT'):
    """Return the current cascade-status hash for *symbol*, or None.

    Reads the Redis hash ``liq:cascade:status:{symbol}`` and decodes every
    field name and value from bytes to str. Returns None when the hash is
    missing or empty.
    """
    raw_fields = redis_client.hgetall(f"liq:cascade:status:{symbol}")
    if raw_fields:
        return {field.decode(): value.decode() for field, value in raw_fields.items()}
    return None

# Report whether a BTCUSDT cascade is currently active.
status = check_cascade_status('BTCUSDT')
if not status:
    print("✅ No active cascade at the moment")
else:
    print("🚨 ACTIVE CASCADE DETECTED!")
    print(f"Cascade ID: {status.get('cascade_id')}")
    print(f"Event Count: {status.get('event_count')}")
    total_value = float(status.get('total_value', 0))
    print(f"Total Value: ${total_value:,.0f}")
    print(f"Risk Score: {status.get('risk_score')}")
    print(f"Exchanges: {status.get('exchanges')}")

## 📊 Exchange Comparison

In [None]:
# Get exchange comparison from continuous aggregate
async def get_exchange_comparison(hours=24):
    """Fetch per-period, per-exchange liquidation statistics.

    Reads ``liquidation_exchange_comparison`` rows whose ``period`` falls in
    the last *hours* hours, newest first.

    Args:
        hours: Look-back window in hours (int or float).

    Returns:
        DataFrame with one row per returned record. asyncpg returns NUMERIC
        values as ``decimal.Decimal``. Those are converted to float so the
        columns are numeric; the object-dtype Decimal columns otherwise make
        the downstream groupby ``mean``/``round`` fail.
    """
    from decimal import Decimal

    # The window is bound as an interval parameter. A placeholder inside a
    # string literal ('$1 hours') would never be substituted.
    query = """
    SELECT 
        period,
        symbol,
        exchange,
        event_count,
        total_value_usd,
        avg_value_usd,
        cascade_count,
        long_count,
        short_count
    FROM liquidation_exchange_comparison
    WHERE period >= NOW() - $1::interval
    ORDER BY period DESC
    """

    conn = await get_db_connection()
    try:
        rows = await conn.fetch(query, timedelta(hours=hours))
    finally:
        # Always release the connection, even if the query fails.
        await conn.close()

    return pd.DataFrame([
        {key: float(value) if isinstance(value, Decimal) else value
         for key, value in row.items()}
        for row in rows
    ])

# Get comparison
comparison = await get_exchange_comparison(hours=24)
if not comparison.empty:
    print(f"Exchange comparison data: {len(comparison)} periods")
    
    # Aggregate by exchange
    exchange_totals = comparison.groupby('exchange').agg({
        'event_count': 'sum',
        'total_value_usd': 'sum',
        'avg_value_usd': 'mean',
        'cascade_count': 'sum'
    }).round(2)
    
    print(exchange_totals)
else:
    print("No comparison data yet")

## 🔄 Refresh this notebook periodically to see updated data

Run all cells again to get the latest liquidation data and analysis.