# Trader Analysis Pipeline (Database)

Build the trader profiling pipeline connected to TimescaleDB.

## Setup
1. Run `just sample-data` to pull sample fills from cloud DB
2. Run `just db-local-up` to start local TimescaleDB
3. Run `just load-sample-local` to load sample into local DB
4. Run this notebook

## Tiers
1. **Tier 1**: Direct aggregations (volume, PnL, fees, maker%)
2. **Tier 2**: Position reconstruction (holding periods, win rate per trade)
3. **Tier 3**: Performance metrics (Sharpe, ROI, MTM/TV — realized PnL divided by traded volume)
4. **Tier 4**: Classification (Liquidator, HFT, Smart Directional, Basis, Retail)

In [None]:
import sys
sys.path.insert(0, '../src')

import polars as pl
import numpy as np
from vigil.db import execute_query, get_db_connection

# Smoke-test the database connection by counting rows in `fills`.
conn = get_db_connection()
try:
    result = execute_query("SELECT COUNT(*) as cnt FROM fills", conn)
finally:
    # Release the connection even if the query raises.
    conn.close()
print(f"Connected! Fills in database: {result['cnt'][0]:,}")

In [None]:
# Load a sample of fills for analysis.
# NOTE: PostgreSQL does not define which rows `LIMIT` returns without an
# `ORDER BY`, so this sample is arbitrary, may differ between runs, and can
# contain only part of a trader's history (e.g. closes whose opens were not
# loaded). Keep that in mind when reading the per-trade results below.
conn = get_db_connection()
try:
    fills = execute_query("""
    SELECT
        time, user_address, coin, px, sz, side, dir,
        start_position, closed_pnl, fee, crossed,
        twap_id, builder, liquidation::TEXT as liquidation
    FROM fills
    LIMIT 500000
""", conn)
finally:
    # Always release the connection; nothing below reuses it.
    conn.close()

print(f"Loaded {len(fills):,} fills")
print(f"Columns: {fills.columns}")
fills.head()

In [None]:
# Cast the string-typed numeric columns to Float64 under new names.
# Null PnL, fee and start-position values are replaced with 0; price and
# size are cast as-is.
fills = fills.with_columns(
    price=pl.col("px").cast(pl.Float64),
    size=pl.col("sz").cast(pl.Float64),
    pnl=pl.col("closed_pnl").cast(pl.Float64).fill_null(0),
    fee_num=pl.col("fee").cast(pl.Float64).fill_null(0),
    start_pos=pl.col("start_position").cast(pl.Float64).fill_null(0),
)

# Notional value of each fill (price * size), built from the casts above.
fills = fills.with_columns(notional=pl.col("price") * pl.col("size"))

# Sanity-check how many fills carry each optional annotation.
for label, column in (
    ("Liquidation fills", "liquidation"),
    ("TWAP fills", "twap_id"),
    ("Builder fills", "builder"),
):
    print(f"{label}: {fills[column].is_not_null().sum()}")

fills.head()

## Tier 1: Direct Aggregations

These are plain group-by aggregations, so they can later be materialized as continuous aggregates in TimescaleDB.

In [None]:
# Trader lifetime stats: one row per user_address over the loaded sample.
trader_stats = fills.group_by("user_address").agg([
    # Activity
    pl.len().alias("total_trades"),
    pl.col("notional").sum().alias("total_volume"),
    pl.col("coin").n_unique().alias("unique_coins"),
    pl.col("time").min().alias("first_trade"),
    pl.col("time").max().alias("last_trade"),

    # PnL
    pl.col("pnl").sum().alias("realized_pnl"),
    pl.col("fee_num").sum().alias("fees_paid"),

    # Maker/Taker: share of fills with `crossed` set, used as the taker share.
    pl.col("crossed").mean().alias("taker_pct"),

    # Win rate inputs: a fill counts as "closing" when it realized non-zero PnL.
    (pl.col("pnl") > 0).sum().alias("winning_fills"),
    (pl.col("pnl") < 0).sum().alias("losing_fills"),
    (pl.col("pnl") != 0).sum().alias("closing_fills"),

    # Liquidations
    pl.col("liquidation").is_not_null().sum().alias("liquidation_fills"),

    # TWAP
    pl.col("twap_id").is_not_null().sum().alias("twap_fills"),
])

# Derived metrics.
# Ratios are guarded explicitly: dividing by a zero Float64 yields NaN/inf,
# not null, so a trailing `fill_null` would not catch those rows.
trader_stats = trader_stats.with_columns([
    (1 - pl.col("taker_pct")).alias("maker_pct"),
    (pl.col("realized_pnl") - pl.col("fees_paid")).alias("net_pnl"),
    # Fraction of closing fills that were profitable; 0 when nothing was closed.
    pl.when(pl.col("closing_fills") > 0)
    .then(pl.col("winning_fills") / pl.col("closing_fills").cast(pl.Float64))
    .otherwise(0.0)
    .alias("win_rate"),
    # MTM/TV: realized PnL per unit of traded notional; null when volume is 0.
    pl.when(pl.col("total_volume") != 0)
    .then(pl.col("realized_pnl") / pl.col("total_volume"))
    .otherwise(None)
    .alias("mtm_tv"),
    # total_trades is at least 1 for every group, so this division is safe.
    (pl.col("liquidation_fills") / pl.col("total_trades").cast(pl.Float64)).fill_null(0).alias("liquidation_pct"),
])

print(f"Computed stats for {len(trader_stats):,} traders")
trader_stats.sort("total_volume", descending=True).head(20)

In [None]:
# Per-trader, per-day aggregation used as the input to the Sharpe calculation.
# The bucketing below assumes `time` is an integer millisecond timestamp
# (TODO confirm against the fills schema): flooring to a multiple of
# MS_PER_DAY maps every fill onto the start of its day.
MS_PER_DAY = 86400000

day_start = (pl.col("time") // MS_PER_DAY) * MS_PER_DAY

daily_stats = (
    fills
    .with_columns(day_start.alias("day"))
    .group_by(["user_address", "day"])
    .agg(
        pl.len().alias("trades"),
        pl.col("notional").sum().alias("volume"),
        pl.col("pnl").sum().alias("daily_pnl"),
        pl.col("fee_num").sum().alias("daily_fees"),
    )
)

print(f"Daily stats: {len(daily_stats):,} trader-days")
daily_stats.head()

## Tier 2: Position Reconstruction

Track open→close cycles to compute holding periods.

In [None]:
# Inspect which 'dir' labels occur and how often. Position reconstruction
# below only acts on labels starting with "Open" or "Close".
dir_counts = fills.group_by("dir").agg(pl.len().alias("count"))
dir_counts.sort("count", descending=True)

In [None]:
def _match_open_close(rows, size_tol: float = 1e-9) -> list:
    """Pair opening and closing fills into completed-trade records.

    ``rows`` is an iterable of dicts with keys ``user_address``, ``coin``,
    ``time``, ``dir``, ``size`` and ``pnl``. It must already be ordered by
    (user_address, coin, time). One record is emitted per closing fill, so a
    position closed in several pieces yields several records that share the
    same entry time.

    A position counts as flat once its remaining size is at most
    ``size_tol`` times its size before the close (floored at 1.0). This
    absorbs float residue such as ``0.1 + 0.2 - 0.3 > 0``, which would
    otherwise keep a stale entry time alive forever.
    """
    open_positions = {}
    completed = []

    for row in rows:
        direction = row["dir"]
        if not direction:
            continue

        key = (row["user_address"], row["coin"])
        position = open_positions.get(key)

        if direction.startswith("Open"):
            if position is None or position["size"] <= 0:
                open_positions[key] = {
                    "entry_time": row["time"],
                    "size": row["size"],
                    "side": "long" if "Long" in direction else "short",
                }
            else:
                # Adding to an open position keeps the first entry time.
                position["size"] += row["size"]

        elif direction.startswith("Close"):
            # A close with no tracked open (e.g. the position was opened before
            # the first fill in the loaded sample) cannot be timed; skip it.
            if position is None or position["size"] <= 0:
                continue

            holding_ms = row["time"] - position["entry_time"]
            completed.append({
                "user_address": row["user_address"],
                "coin": row["coin"],
                "side": position["side"],
                "holding_period_ms": holding_ms,
                "holding_period_hours": holding_ms / 3600000,
                "closed_pnl": row["pnl"],
                "exit_time": row["time"],
            })

            size_before = position["size"]
            position["size"] = size_before - row["size"]
            if position["size"] <= size_tol * max(1.0, size_before):
                # Fully closed: forget the position so the next open starts fresh.
                del open_positions[key]

    return completed


def reconstruct_positions(fills_df: "pl.DataFrame", size_tol: float = 1e-9) -> "pl.DataFrame":
    """
    Track open->close cycles for each (user, coin) pair.

    Returns completed trades with holding period and PnL: one row per closing
    fill with columns user_address, coin, side, holding_period_ms,
    holding_period_hours, closed_pnl and exit_time. Requires the derived
    ``size`` and ``pnl`` columns. ``size_tol`` is the relative tolerance used
    to decide that a position has been fully closed.
    """
    ordered = fills_df.sort(["user_address", "coin", "time"])
    records = _match_open_close(ordered.iter_rows(named=True), size_tol=size_tol)
    return pl.DataFrame(records)

# Position reconstruction is a row-by-row Python loop, so restrict it to the
# 100 highest-volume traders instead of the whole sample.
top_traders = (
    trader_stats
    .sort("total_volume", descending=True)
    .head(100)
    .get_column("user_address")
    .to_list()
)
fills_subset = fills.filter(pl.col("user_address").is_in(top_traders))
print(f"Processing {len(fills_subset):,} fills for top 100 traders...")

trades = reconstruct_positions(fills_subset)
print(f"Found {len(trades):,} completed trades")
trades.head()

In [None]:
# Holding period stats per trader (only traders with reconstructed trades).
if len(trades) > 0:
    holding_stats = trades.group_by("user_address").agg([
        pl.len().alias("completed_trades"),
        pl.col("holding_period_hours").mean().alias("avg_hold_hours"),
        pl.col("holding_period_hours").median().alias("median_hold_hours"),
        pl.col("closed_pnl").sum().alias("total_pnl"),
        # Share of closing fills with positive realized PnL.
        (pl.col("closed_pnl") > 0).mean().alias("trade_win_rate"),
    ])

    print("Holding period distribution:")
    print(trades["holding_period_hours"].describe())
    print()
    # Jupyter only auto-displays a cell's final top-level expression; inside
    # this `if` a bare expression would be silently discarded, so print it.
    print(holding_stats.sort("completed_trades", descending=True).head(20))
else:
    print("No completed trades found in sample")
    holding_stats = pl.DataFrame()

## Tier 3: Performance Metrics

Sharpe ratio, turnover, etc.

In [None]:
def calculate_sharpe(
    daily_pnl: np.ndarray,
    risk_free: float = 0,
    periods_per_year: float = 365,
) -> float:
    """Annualized Sharpe ratio from a daily PnL series.

    Args:
        daily_pnl: Per-period PnL values (any 1-D array-like of numbers).
        risk_free: Amount subtracted from every period's PnL before the ratio
            is computed; must be in the same units as ``daily_pnl``.
        periods_per_year: Annualization factor (default 365 calendar days).

    Returns:
        ``mean(excess) / std(excess) * sqrt(periods_per_year)``, using the
        population standard deviation (ddof=0). Returns 0.0 when there are
        fewer than two observations or the series is numerically constant.
    """
    excess = np.asarray(daily_pnl, dtype=np.float64) - risk_free
    if excess.size < 2:
        return 0.0
    mean = excess.mean()
    std = excess.std()
    # An exact `std == 0` test misses constant float series: rounding in the
    # mean leaves a residual std around 1e-17 * |mean|, which would blow the
    # ratio up to ~1e16. Treat anything that small as zero volatility.
    if std <= 1e-12 * max(abs(mean), 1.0):
        return 0.0
    return float(mean / std * np.sqrt(periods_per_year))


# Calculate Sharpe for each trader with at least two trading days.
# NOTE: this uses gross daily PnL (fees are in `daily_fees`, not subtracted),
# and only days with at least one fill appear in daily_stats, so idle days
# are not counted as zero-PnL days.
sharpe_results = []

# One sorted pass plus partition_by replaces a per-user filter over the whole
# frame, which was O(users x rows).
for user_daily in daily_stats.sort(["user_address", "day"]).partition_by("user_address"):
    if len(user_daily) < 2:
        continue
    sharpe_results.append({
        "user_address": user_daily["user_address"][0],
        "sharpe_ratio": calculate_sharpe(user_daily["daily_pnl"].to_numpy()),
        "trading_days": len(user_daily),
    })

# An explicit schema keeps the columns present (and join-compatible) even when
# no trader qualifies; otherwise the sort below would fail on an empty frame.
sharpe_df = pl.DataFrame(
    sharpe_results,
    schema={
        "user_address": daily_stats.schema["user_address"],
        "sharpe_ratio": pl.Float64,
        "trading_days": pl.Int64,
    },
)
print(f"Calculated Sharpe for {len(sharpe_df):,} traders")
sharpe_df.sort("sharpe_ratio", descending=True).head(20)

## Tier 4: Classification

Combine all metrics and classify traders.

In [None]:
# Merge all metrics into one profile row per trader. Left joins keep every
# trader from trader_stats and leave nulls where a metric is unavailable.
profiles = trader_stats

has_sharpe = len(sharpe_df) > 0
has_holding = len(trades) > 0 and len(holding_stats) > 0

if has_sharpe:
    profiles = profiles.join(sharpe_df, on="user_address", how="left")

if has_holding:
    hold_cols = holding_stats.select(["user_address", "avg_hold_hours"])
    profiles = profiles.join(hold_cols, on="user_address", how="left")

profiles.head()

In [None]:
def classify_trader(row: dict) -> str:
    """
    Classify trader based on behavioral metrics.

    Rules are checked in priority order and the first match wins:
    LIQUIDATOR -> HFT -> SMART_DIRECTIONAL -> BASIS -> RETAIL.

    Args:
        row: One profile row as a dict (e.g. from ``iter_rows(named=True)``).
            Missing keys, ``None`` and NaN are all treated as "no data".

    Returns:
        One of "LIQUIDATOR", "HFT", "SMART_DIRECTIONAL", "BASIS", "RETAIL".
    """
    import math

    def metric(name, default):
        # Absent, null and NaN metrics are all treated as missing.
        value = row.get(name)
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return default
        return value

    maker_pct = metric("maker_pct", 0)
    mtm_tv = metric("mtm_tv", 0)
    net_pnl = metric("net_pnl", 0)
    sharpe = metric("sharpe_ratio", 0)
    liquidation_pct = metric("liquidation_pct", 0)
    # Holding periods are only reconstructed for a subset of traders, so a
    # missing value means "unknown", not "long hold"; a genuine 0.0 must also
    # stay 0.0 (an `or 999` fallback turned both into BASIS).
    avg_hold = metric("avg_hold_hours", None)

    # LIQUIDATOR: a large share of fills carry liquidation data
    if liquidation_pct >= 0.20:
        return "LIQUIDATOR"

    # HFT: High maker%, low edge per unit of volume
    if maker_pct >= 0.70 and abs(mtm_tv) <= 0.001:
        return "HFT"

    # Smart Directional: High PnL, good risk-adjusted returns
    if net_pnl >= 10000 and mtm_tv >= 0.001 and sharpe >= 1.0:
        return "SMART_DIRECTIONAL"

    # Basis: Long holds (only when holding data exists)
    if avg_hold is not None and avg_hold >= 24:
        return "BASIS"

    return "RETAIL"


# Label every profile; iter_rows(named=True) yields the dicts classify_trader expects.
trader_types = pl.Series(
    "trader_type",
    [classify_trader(profile_row) for profile_row in profiles.iter_rows(named=True)],
)
profiles = profiles.with_columns(trader_types)

# Summarize how many traders, how much volume and PnL fall into each type.
print("Trader type distribution:")
type_summary = (
    profiles
    .group_by("trader_type")
    .agg(
        pl.len().alias("count"),
        pl.col("total_volume").sum().alias("total_volume"),
        pl.col("net_pnl").sum().alias("total_pnl"),
        pl.col("liquidation_fills").sum().alias("liquidation_fills"),
    )
    .sort("count", descending=True)
)
type_summary

In [None]:
# Show the five highest-volume traders for each non-retail type.
wanted_cols = [
    "user_address", "total_trades", "total_volume", "net_pnl",
    "maker_pct", "mtm_tv", "win_rate", "liquidation_pct",
]

for trader_type in ["HFT", "SMART_DIRECTIONAL", "LIQUIDATOR", "BASIS"]:
    print(f"\n=== {trader_type} ===")
    subset = profiles.filter(pl.col("trader_type") == trader_type)
    if len(subset) == 0:
        print("No traders in this category")
        continue
    # Only select the columns that actually exist in this frame.
    present_cols = [col for col in wanted_cols if col in subset.columns]
    top_five = subset.select(present_cols).sort("total_volume", descending=True).head(5)
    print(top_five)

## Liquidation Analysis

In [None]:
import json

# Parse liquidation events from the JSON text in the `liquidation` column.
liq_fills = fills.filter(pl.col("liquidation").is_not_null())
print(f"Total liquidation fills: {len(liq_fills):,}")

if len(liq_fills) > 0:
    liq_events = []
    parse_failures = 0
    for row in liq_fills.iter_rows(named=True):
        try:
            data = json.loads(row["liquidation"])
            liquidated_user = data.get("liquidatedUser")
            mark_price = float(data.get("markPx", 0))
            method = data.get("method")
        # ValueError covers json.JSONDecodeError and a non-numeric markPx;
        # AttributeError covers payloads that are not JSON objects.
        except (ValueError, TypeError, AttributeError):
            parse_failures += 1
            continue

        fill_user = row["user_address"]
        # The filling user is not a liquidator when they are the liquidated
        # account itself. Addresses are compared case-insensitively.
        is_liquidated_side = (
            isinstance(liquidated_user, str)
            and isinstance(fill_user, str)
            and liquidated_user.lower() == fill_user.lower()
        )
        liq_events.append({
            "time": row["time"],
            "coin": row["coin"],
            # User on this fill; see `role` for which side of the liquidation it is.
            "liquidator": fill_user,
            "liquidated_user": liquidated_user,
            "mark_price": mark_price,
            "size": row["size"],
            "notional": row["notional"],
            "method": method,
            "role": "liquidated" if is_liquidated_side else "liquidator",
        })

    print(f"Parsed {len(liq_events):,} liquidation events")
    if parse_failures:
        print(f"Skipped {parse_failures:,} unparseable liquidation payloads")

    if liq_events:
        liq_df = pl.DataFrame(liq_events)

        # Top liquidated coins
        print("\nTop coins by liquidation volume:")
        print(liq_df.group_by("coin").agg([
            pl.len().alias("count"),
            pl.col("notional").sum().alias("total_notional"),
        ]).sort("total_notional", descending=True).head(10))

        # Top liquidators (excluding fills where the user is the liquidated account)
        print("\nTop liquidators:")
        print(liq_df.filter(pl.col("role") == "liquidator").group_by("liquidator").agg([
            pl.len().alias("count"),
            pl.col("notional").sum().alias("total_notional"),
        ]).sort("total_notional", descending=True).head(10))

## Summary & Export

In [None]:
from pathlib import Path

# Print the final profile schema
print("Final trader_profiles schema:")
print(profiles.schema)

# Save profiles for reference. Create the directory if needed, and report the
# path that was actually written rather than a hand-typed one.
profiles_path = Path("../data/sample_profiles.parquet")
profiles_path.parent.mkdir(parents=True, exist_ok=True)
profiles.write_parquet(profiles_path)
print(f"\nSaved {len(profiles):,} profiles to {profiles_path}")

## Next Steps

1. Push schema to cloud DB: `just db-migrate`
2. Refresh continuous aggregates: `just db-refresh-aggregates`
3. Run full analysis: `just analyze`