# QuantLaxmi — Arb Observables Analysis

**Session**: Profile-1 Crypto (2-hour certified)

**Purpose**: Discover structural arbitrage signals that survive fees + latency

**Status**: SCAFFOLDING ONLY - DO NOT EXECUTE UNTIL SESSION COMPLETE

---

## Rule Zero
You are not allowed to invent conclusions tonight.
Tonight is about preparing the ground so that tomorrow's analysis is boring, deterministic, and correct.

## Constants (Frozen)

| Parameter | Value | Notes |
|-----------|-------|-------|
| Taker fee | 0.001 (0.1%) | Binance spot default |
| Latency windows | 20ms, 50ms, 100ms | Sensitivity test |
| Quote age cutoff | 200ms | Hard filter |
| Price exponent | -2 | Binance mantissa |
| Qty exponent | -8 | Binance mantissa |

### Profile-1 Symbols
**USDT pairs**: BTCUSDT, ETHUSDT, BNBUSDT, SOLUSDT, XRPUSDT

**Cross pairs**: ETHBTC, BNBBTC, SOLBTC

### Triangle Definitions
| Triangle | Leg A | Leg B | Leg C |
|----------|-------|-------|-------|
| ETH-BTC | BTCUSDT | ETHBTC | ETHUSDT |
| BNB-BTC | BTCUSDT | BNBBTC | BNBUSDT |
| SOL-BTC | BTCUSDT | SOLBTC | SOLUSDT |

---
## 0. Preconditions Check

Before computing anything, verify:
- [ ] Session is certified
- [ ] All symbols present for entire interval
- [ ] No symbol downgrade
- [ ] Depth + trades both available
- [ ] Timestamps monotonic per symbol

**If any fail → ABORT ANALYSIS**

In [None]:
# 0. Preconditions Check
import json
from pathlib import Path

SESSION_DIR = Path("/home/isoula/7hills/QuantLaxmi/data/sessions/_sealed/profile1_2h_20260122_2224")

# Load the session manifest and enforce the section-0 checklist items that the
# manifest can answer. A failed item raises ValueError so the analysis stops
# here ("If any fail → ABORT ANALYSIS") instead of reaching the success banner.
with open(SESSION_DIR / "session_manifest.json", encoding="utf-8") as f:
    manifest = json.load(f)

determinism = manifest["determinism"]

print("Session ID:", manifest["session_id"])
print("Certified:", determinism["certified"])
print("All symbols clean:", determinism["all_symbols_clean"])
print("Semantics:", determinism["semantics"])
print("Duration:", f"{manifest['duration_secs']:.1f}s ({manifest['duration_secs']/3600:.2f}h)")
print("\nSymbols:", manifest["symbols"])

# Checklist: "Session is certified"
if not determinism["certified"]:
    raise ValueError("ABORT: Session is not certified")
# NOTE(review): "all_symbols_clean" is printed but not enforced — confirm
# whether a false value should also abort the analysis.

# Verify Triangle A symbols present, each with a capture record that holds
# both depth events and trades (checklist: "Depth + trades both available").
triangle_a_symbols = ["BTCUSDT", "ETHUSDT", "ETHBTC"]
for sym in triangle_a_symbols:
    if sym not in manifest["symbols"]:
        raise ValueError(f"ABORT: Missing symbol {sym}")
    # First capture entry for this symbol; None instead of a bare StopIteration
    capture = next((c for c in manifest["captures"] if c["symbol"] == sym), None)
    if capture is None:
        raise ValueError(f"ABORT: No capture record for symbol {sym}")
    print(f"\n{sym}:")
    print(f"  Depth events: {capture['events_written']:,}")
    print(f"  Trades: {capture['trades_written']:,}")
    print(f"  Hash: {capture['depth_hash'][:16]}...")
    if capture["events_written"] <= 0 or capture["trades_written"] <= 0:
        raise ValueError(f"ABORT: {sym} has no depth events or no trades")

# NOTE(review): per-symbol timestamp monotonicity (last checklist item) is not
# verified by this cell.
print("\n✓ All preconditions satisfied for Triangle A")

---
## 1. Data Loading & Time Alignment

In [None]:
# 1. Data Loading & Time Alignment for Triangle A
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from dateutil import parser as dtparser

def load_depth_quotes(session_dir: Path, symbol: str) -> pd.DataFrame:
    """Load ``depth.jsonl`` for one symbol and return its best bid/ask quotes.

    Every non-blank line of ``<session_dir>/<symbol>/depth.jsonl`` is parsed
    as one JSON record. Records whose ``bids`` or ``asks`` list is missing or
    empty are skipped. Prices are the first level's mantissa scaled by
    ``10 ** price_exponent`` (``-2`` when the record carries no exponent).

    Args:
        session_dir: Session directory holding one sub-directory per symbol.
        symbol: Symbol name, used as the sub-directory name.

    Returns:
        DataFrame sorted by ``ts`` with columns ``ts`` (parsed timestamp),
        ``bid``, ``ask`` and ``ts_ns`` (int64 nanoseconds since the Unix
        epoch, UTC). If no record is usable, an empty frame with the same
        four columns is returned.
    """
    depth_file = session_dir / symbol / "depth.jsonl"

    records = []
    with open(depth_file, encoding="utf-8") as f:
        for line in f:
            # Tolerate blank lines (e.g. a trailing newline at end of file)
            if not line.strip():
                continue
            rec = json.loads(line)
            ts = dtparser.isoparse(rec["ts"])

            # bids and asks are lists of [price_mantissa, qty_mantissa]
            bids = rec.get("bids", [])
            asks = rec.get("asks", [])

            if not bids or not asks:
                continue

            scale = 10 ** rec.get("price_exponent", -2)

            # Assumes each side is ordered best-first (bids[0] is the highest
            # bid, asks[0] the lowest ask) — TODO confirm against the capture
            # format; the levels are not re-sorted here.
            records.append({
                "ts": ts,
                "bid": bids[0][0] * scale,
                "ask": asks[0][0] * scale,
            })

    if not records:
        # Keep the schema stable so callers get an empty frame, not a KeyError
        return pd.DataFrame({
            "ts": pd.Series(dtype="datetime64[ns, UTC]"),
            "bid": pd.Series(dtype="float64"),
            "ask": pd.Series(dtype="float64"),
            "ts_ns": pd.Series(dtype="int64"),
        })

    df = pd.DataFrame(records)
    # Stable sort: quotes sharing a timestamp keep their file order
    df = df.sort_values("ts", kind="mergesort").reset_index(drop=True)

    # Convert to UTC and force nanosecond units explicitly, so ts_ns is in
    # nanoseconds whatever datetime resolution pandas picked for the column.
    utc_naive = pd.to_datetime(df["ts"], utc=True).dt.tz_localize(None)
    df["ts_ns"] = utc_naive.to_numpy().astype("datetime64[ns]").astype("int64")
    return df

# Load the three Triangle A legs one after another, reporting progress and the
# time span covered by each leg's quotes.
def _load_leg(symbol: str) -> pd.DataFrame:
    """Load one leg via load_depth_quotes and print its size and time span."""
    print(f"Loading {symbol} depth...")
    quotes = load_depth_quotes(SESSION_DIR, symbol)
    first_ts = quotes["ts"].min()
    last_ts = quotes["ts"].max()
    print(f"  {len(quotes):,} quotes, {first_ts} to {last_ts}")
    return quotes


btcusdt = _load_leg("BTCUSDT")
ethusdt = _load_leg("ETHUSDT")
ethbtc = _load_leg("ETHBTC")

In [None]:
# 1b. Build Global Timeline with 200ms Staleness Cap

MAX_STALENESS_MS = 200
MAX_STALENESS_NS = MAX_STALENESS_MS * 1_000_000


def build_aligned_quotes(dfs: dict, max_staleness_ns: int) -> pd.DataFrame:
    """Align several quote streams on one shared timeline.

    The timeline is the sorted union of every leg's ``ts_ns`` values. For each
    timeline row, each leg contributes its most recent quote at or before that
    time (backward as-of join). A row is kept only if every leg's quote is at
    most ``max_staleness_ns`` old; rows that precede a leg's first quote have
    no match for that leg and are dropped as well.

    Args:
        dfs: Mapping of leg name -> DataFrame with columns ``ts_ns``, ``bid``
            and ``ask``. The name becomes the prefix of the output columns.
        max_staleness_ns: Maximum allowed quote age, in the units of ``ts_ns``.

    Returns:
        A copy of the surviving rows with ``ts_ns`` plus, per leg,
        ``<name>_bid``, ``<name>_ask``, ``<name>_quote_ts`` and
        ``<name>_staleness``.
    """
    # Sorted union of all quote timestamps (np.unique sorts and de-duplicates)
    global_ts = np.unique(
        np.concatenate([df["ts_ns"].to_numpy() for df in dfs.values()])
    )
    print(f"Global timeline: {len(global_ts):,} unique timestamps")

    result = pd.DataFrame({"ts_ns": global_ts})

    for name, df in dfs.items():
        leg = df[["ts_ns", "bid", "ask"]].copy()
        leg = leg.rename(columns={"bid": f"{name}_bid", "ask": f"{name}_ask"})
        # Remember when the matched quote was published, to measure its age
        leg[f"{name}_quote_ts"] = leg["ts_ns"]

        # `result` is already sorted. The leg is sorted with a stable
        # algorithm so quotes sharing a timestamp keep their input order.
        result = pd.merge_asof(
            result,
            leg.sort_values("ts_ns", kind="mergesort"),
            on="ts_ns",
            direction="backward",
        )

    # Quote age per leg; a missing match gives NaN, which fails the `<=` test
    fresh = pd.Series(True, index=result.index)
    for name in dfs:
        staleness = result["ts_ns"] - result[f"{name}_quote_ts"]
        result[f"{name}_staleness"] = staleness
        fresh &= staleness <= max_staleness_ns

    result_clean = result[fresh].copy()

    dropped = len(result) - len(result_clean)
    dropped_pct = 100 * dropped / len(result) if len(result) else 0.0
    # Report the cap actually applied (the argument), not the module constant
    cap_ms = max_staleness_ns / 1_000_000
    print(f"Dropped {dropped:,} rows ({dropped_pct:.2f}%) due to staleness > {cap_ms:g}ms")

    return result_clean

# Align the three Triangle A legs on the shared timeline, applying the
# staleness cap, then report how many snapshots survived and their time span.
triangle_a_legs = {
    "btcusdt": btcusdt,
    "ethusdt": ethusdt,
    "ethbtc": ethbtc,
}
aligned = build_aligned_quotes(triangle_a_legs, MAX_STALENESS_NS)

print(f"\nAligned dataset: {len(aligned):,} valid quote snapshots")
range_start = pd.to_datetime(aligned["ts_ns"].min())
range_end = pd.to_datetime(aligned["ts_ns"].max())
print(f"Time range: {range_start} to {range_end}")

# 2. Triangle A Residuals — Gross (No Fees)
#
# Triangle A: BTC-ETH-USDT
# Legs: BTCUSDT, ETHUSDT, ETHBTC
#
# Direction CW (USDT → BTC → ETH → USDT):
#   Buy BTC with USDT (pay BTCUSDT ask), buy ETH with BTC (pay ETHBTC ask),
#   sell ETH for USDT (get ETHUSDT bid)
#   ε_cw = log(ETHUSDT_bid) - log(BTCUSDT_ask) - log(ETHBTC_ask)
#
# Direction CCW (USDT → ETH → BTC → USDT):
#   Buy ETH with USDT (pay ETHUSDT ask), sell ETH for BTC (get ETHBTC bid),
#   sell BTC for USDT (get BTCUSDT bid)
#   ε_ccw = log(BTCUSDT_bid) + log(ETHBTC_bid) - log(ETHUSDT_ask)

# Take each quote column's log once, then combine them per direction
_log_px = {
    quote_col: np.log(aligned[quote_col])
    for quote_col in (
        "ethusdt_bid", "btcusdt_ask", "ethbtc_ask",
        "btcusdt_bid", "ethbtc_bid", "ethusdt_ask",
    )
}

aligned["epsilon_cw"] = (
    _log_px["ethusdt_bid"] - _log_px["btcusdt_ask"] - _log_px["ethbtc_ask"]
)
aligned["epsilon_ccw"] = (
    _log_px["btcusdt_bid"] + _log_px["ethbtc_bid"] - _log_px["ethusdt_ask"]
)

# Basic sanity check on the residual distributions
eps_cw = aligned["epsilon_cw"]
eps_ccw = aligned["epsilon_ccw"]

print("Residual Statistics (log scale):")
print(f"\nε_cw:  mean={eps_cw.mean():.6f}, std={eps_cw.std():.6f}")
print(f"       min={eps_cw.min():.6f}, max={eps_cw.max():.6f}")
print(f"\nε_ccw: mean={eps_ccw.mean():.6f}, std={eps_ccw.std():.6f}")
print(f"       min={eps_ccw.min():.6f}, max={eps_ccw.max():.6f}")

# Same ranges expressed in basis points for intuition
print(f"\nIn basis points (1 bp = 0.0001):")
print(f"ε_cw  range: [{eps_cw.min()*10000:.1f}, {eps_cw.max()*10000:.1f}] bp")
print(f"ε_ccw range: [{eps_ccw.min()*10000:.1f}, {eps_ccw.max()*10000:.1f}] bp")

In [None]:
# 2. Triangle Residuals Gross - DO NOT EXECUTE UNTIL SESSION COMPLETE

---
## 3. Triangle Residuals — Fee Adjusted

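ε_net = ε_gross + 3·ln(1 − f) ≈ ε_gross − 3f (where f = 0.001 is the taker fee, paid once on each of the three legs; the residuals are in log units, so the linear form is the first-order approximation)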

In [None]:
# 3. Fee Adjusted - DO NOT EXECUTE UNTIL SESSION COMPLETE

---
## 4. Latency Penalty Sensitivity

Test with T_exec = 20ms, 50ms, 100ms

In [None]:
# 4. Latency Penalty - DO NOT EXECUTE UNTIL SESSION COMPLETE

# 5. Triangle A Statistics — Minimal Required Metrics

def compute_run_durations(series: pd.Series, ts_ns: pd.Series, positive: bool = True) -> dict:
    """Summarise the durations of contiguous runs where a sign condition holds.

    A run is a maximal stretch of consecutive rows (by position) where
    ``series > 0`` (``positive=True``) or ``series < 0`` (``positive=False``).
    Its duration is the timestamp of its last row minus the timestamp of its
    first row, so a run made of a single row has duration 0. Rows are treated
    purely positionally: gaps between consecutive timestamps are not detected.

    Args:
        series: Values to test against zero.
        ts_ns: Timestamps in nanoseconds, positionally aligned with ``series``.
        positive: Test for strictly positive values if True, strictly
            negative values otherwise.

    Returns:
        Dict with ``median_ms``, ``p90_ms`` and ``max_ms`` (run durations in
        milliseconds) and ``count`` (number of runs). All four are 0 when no
        row satisfies the condition.
    """
    condition = (series > 0) if positive else (series < 0)
    flags = np.asarray(condition, dtype=bool)

    # Pad with zeros on both sides so a run touching either end still yields
    # a +1 (run starts) and a -1 (run has just ended) in the differences.
    edges = np.diff(np.concatenate(([0], flags.astype(np.int8), [0])))
    run_starts = np.flatnonzero(edges == 1)
    run_ends = np.flatnonzero(edges == -1) - 1  # inclusive last index of each run

    if run_starts.size == 0:
        return {"median_ms": 0, "p90_ms": 0, "max_ms": 0, "count": 0}

    timestamps = ts_ns.to_numpy()
    durations_ms = (timestamps[run_ends] - timestamps[run_starts]) / 1_000_000

    return {
        "median_ms": np.median(durations_ms),
        "p90_ms": np.percentile(durations_ms, 90),
        "max_ms": np.max(durations_ms),
        "count": int(durations_ms.size),
    }

# Compute statistics for both directions
print("=" * 60)
print("TRIANGLE A (BTC-ETH-USDT) — STATISTICS SUMMARY")
print("=" * 60)

# The spread-conditioned hit rate needs aligned["spread_sum"] and its median.
# Those are otherwise created only by the section-6 cell further down, so
# derive them here when missing; this lets the cell run in document order.
# Same formula and summation order as section 6.
if "spread_sum" not in aligned.columns:
    _rel_spread = {
        leg: (aligned[f"{leg}_ask"] - aligned[f"{leg}_bid"])
        / ((aligned[f"{leg}_ask"] + aligned[f"{leg}_bid"]) / 2)
        for leg in ("btcusdt", "ethusdt", "ethbtc")
    }
    aligned["spread_sum"] = (
        _rel_spread["btcusdt"] + _rel_spread["ethusdt"] + _rel_spread["ethbtc"]
    )
spread_sum_p50 = aligned["spread_sum"].median()

# "Tight" rows: total relative spread at or below its median (same for both
# directions, so computed once outside the loop)
tight_mask = aligned["spread_sum"] <= spread_sum_p50

for direction, col in [("CW", "epsilon_cw"), ("CCW", "epsilon_ccw")]:
    eps = aligned[col]

    # Hit rate (gross positive)
    hr = (eps > 0).mean()

    # P99 and max
    p99 = np.percentile(eps, 99)
    eps_max = eps.max()

    # Run durations
    runs = compute_run_durations(eps, aligned["ts_ns"], positive=True)

    # Spread-conditioned hit rate
    hr_tight = (eps[tight_mask] > 0).mean() if tight_mask.sum() > 0 else 0

    print(f"\n{direction} Direction:")
    print(f"  HR (gross):        {hr*100:.4f}%")
    print(f"  HR (tight spread): {hr_tight*100:.4f}%")
    print(f"  P99 residual:      {p99*10000:.2f} bp")
    print(f"  Max residual:      {eps_max*10000:.2f} bp")
    print(f"  Positive runs:     {runs['count']:,}")
    print(f"    Median duration: {runs['median_ms']:.1f} ms")
    print(f"    P90 duration:    {runs['p90_ms']:.1f} ms")
    print(f"    Max duration:    {runs['max_ms']:.1f} ms")

print("\n" + "=" * 60)

# Sanity check: ε_cw + ε_ccw equals the sum over the three legs of
# log(bid/ask), which is <= 0 whenever every book is uncrossed (bid <= ask).
# Both residuals can therefore never be positive at the same snapshot unless
# some leg's quote is crossed, so any non-zero share is worth a warning.
both_positive = ((aligned["epsilon_cw"] > 0) & (aligned["epsilon_ccw"] > 0)).mean()
print(f"\nSanity check: Both CW and CCW positive simultaneously: {both_positive*100:.4f}%")
if both_positive > 0:
    print("⚠️  WARNING: CW and CCW cannot both be positive with uncrossed books — check for crossed quotes and alignment logic!")

In [None]:
# 5. Health Metrics - DO NOT EXECUTE UNTIL SESSION COMPLETE

# 6. Spread-Residual Coupling — Context Series

# Relative spread per leg: (ask - bid) divided by the mid price, stored in
# aligned["<leg>_spread"]
for _leg in ("btcusdt", "ethusdt", "ethbtc"):
    _bid = aligned[f"{_leg}_bid"]
    _ask = aligned[f"{_leg}_ask"]
    aligned[f"{_leg}_spread"] = (_ask - _bid) / ((_ask + _bid) / 2)

# Total spread across the triangle (sum of the three relative spreads)
aligned["spread_sum"] = (
    aligned["btcusdt_spread"]
    + aligned["ethusdt_spread"]
    + aligned["ethbtc_spread"]
)

print("Spread Statistics (relative):")
print(f"\nBTCUSDT: mean={aligned['btcusdt_spread'].mean()*10000:.2f} bp, p50={aligned['btcusdt_spread'].median()*10000:.2f} bp")
print(f"ETHUSDT: mean={aligned['ethusdt_spread'].mean()*10000:.2f} bp, p50={aligned['ethusdt_spread'].median()*10000:.2f} bp")
print(f"ETHBTC:  mean={aligned['ethbtc_spread'].mean()*10000:.2f} bp, p50={aligned['ethbtc_spread'].median()*10000:.2f} bp")
print(f"\nSpread Sum: mean={aligned['spread_sum'].mean()*10000:.2f} bp, p50={aligned['spread_sum'].median()*10000:.2f} bp")

# Median total spread, kept as the threshold for later spread conditioning
spread_sum_p50 = aligned["spread_sum"].median()
print(f"\nSpread Sum p50 threshold: {spread_sum_p50*10000:.2f} bp")

In [None]:
# Plot 3: Residual vs Spread_Sum Scatter
#
# Scatter of the better of the two gross residuals against the total relative
# spread, both in basis points, with reference lines at ε = 0 and at the
# median spread sum.
# NOTE(review): `plt` is used here but no matplotlib import appears in the
# cells visible in this notebook — confirm `import matplotlib.pyplot as plt`
# is executed before this cell.
# Requires aligned["spread_sum"] and spread_sum_p50 to exist already.

# Use max of cw/ccw for y-axis (adds an "epsilon_max" column to `aligned`)
aligned["epsilon_max"] = aligned[["epsilon_cw", "epsilon_ccw"]].max(axis=1)

fig, ax = plt.subplots(figsize=(10, 6))

# Subsample for scatter (every 50th point)
scatter_data = aligned.iloc[::50].copy()

# Both axes are scaled by 10000 to convert to basis points
ax.scatter(
    scatter_data["spread_sum"] * 10000, 
    scatter_data["epsilon_max"] * 10000,
    alpha=0.3, s=5
)

# Reference lines: zero residual (horizontal) and median spread sum (vertical)
ax.axhline(y=0, color='r', linestyle='--', linewidth=1, label='ε=0')
ax.axvline(x=spread_sum_p50 * 10000, color='g', linestyle='--', linewidth=1, label=f'spread_sum p50 ({spread_sum_p50*10000:.1f} bp)')

ax.set_xlabel("Spread Sum (basis points)")
ax.set_ylabel("max(ε_cw, ε_ccw) (basis points)")
ax.set_title("Triangle A: Residual vs Total Spread")
ax.legend()

plt.tight_layout()
plt.show()

print("Plot 3: Residual vs spread scatter — check if positives require wide spreads")

In [None]:
# Plot 2: Residual Histograms (cw and ccw separately)
#
# Two side-by-side 100-bin histograms of the gross residuals in basis points,
# each with a dashed vertical line at zero.
# NOTE(review): `plt` is used here but no matplotlib import appears in the
# cells visible in this notebook — confirm `import matplotlib.pyplot as plt`
# is executed before this cell.

fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# CW histogram (left panel)
axes[0].hist(aligned["epsilon_cw"] * 10000, bins=100, alpha=0.7, edgecolor='black', linewidth=0.5)
axes[0].axvline(x=0, color='r', linestyle='--', linewidth=1, label='zero')
axes[0].set_xlabel("ε_cw (basis points)")
axes[0].set_ylabel("Frequency")
axes[0].set_title("Distribution of ε_cw (Clockwise)")
axes[0].legend()

# CCW histogram (right panel)
axes[1].hist(aligned["epsilon_ccw"] * 10000, bins=100, alpha=0.7, edgecolor='black', linewidth=0.5, color='orange')
axes[1].axvline(x=0, color='r', linestyle='--', linewidth=1, label='zero')
axes[1].set_xlabel("ε_ccw (basis points)")
axes[1].set_ylabel("Frequency")
axes[1].set_title("Distribution of ε_ccw (Counter-Clockwise)")
axes[1].legend()

plt.tight_layout()
plt.show()

print("Plot 2: Residual histograms — check tail behavior and typical magnitude")

In [None]:
# Final Summary Cell — To be filled after execution
#
# Prints the headline Triangle A numbers: gross hit rates, 99th-percentile
# residuals, longest positive runs, and hit rates split at the median spread.


def _hit_rate(column: str, rows=None):
    """Share of rows (optionally limited by a boolean mask) with a residual > 0."""
    residuals = aligned[column]
    if rows is not None:
        residuals = residuals[rows]
    return (residuals > 0).mean()


def _spread_condition(hr_tight, hr_wide) -> str:
    """Label the spread regime in which positive residuals mostly occur."""
    if hr_tight > hr_wide:
        return "low/normal"
    if hr_tight < hr_wide * 0.5:
        return "high (wide spreads)"
    return "normal (no strong dependency)"


banner = "=" * 70
print(banner)
print("TRIANGLE A FINAL REPORT")
print(banner)

# Gross hit rates over all snapshots
hr_cw = _hit_rate("epsilon_cw")
hr_ccw = _hit_rate("epsilon_ccw")

# Hit rates restricted to snapshots at or below the median spread sum
tight_mask = aligned["spread_sum"] <= spread_sum_p50
hr_cw_tight = _hit_rate("epsilon_cw", tight_mask)
hr_ccw_tight = _hit_rate("epsilon_ccw", tight_mask)

# 99th percentile of each residual
p99_cw = np.percentile(aligned["epsilon_cw"], 99)
p99_ccw = np.percentile(aligned["epsilon_ccw"], 99)

# Positive-run statistics (only the max duration is reported below)
runs_cw = compute_run_durations(aligned["epsilon_cw"], aligned["ts_ns"], positive=True)
runs_ccw = compute_run_durations(aligned["epsilon_ccw"], aligned["ts_ns"], positive=True)

print(f"\nHR(cw):  {hr_cw*100:.4f}%")
print(f"HR(ccw): {hr_ccw*100:.4f}%")
print(f"\np99(cw):  {p99_cw*10000:.2f} bp")
print(f"p99(ccw): {p99_ccw*10000:.2f} bp")
print(f"\nMax run duration (cw):  {runs_cw['max_ms']:.1f} ms")
print(f"Max run duration (ccw): {runs_ccw['max_ms']:.1f} ms")

# Hit rates restricted to snapshots above the median spread sum
wide_mask = aligned["spread_sum"] > spread_sum_p50
hr_cw_wide = _hit_rate("epsilon_cw", wide_mask)
hr_ccw_wide = _hit_rate("epsilon_ccw", wide_mask)

print(f"\nSpread conditioning:")
print(f"  HR_tight(cw):  {hr_cw_tight*100:.4f}%  |  HR_wide(cw):  {hr_cw_wide*100:.4f}%")
print(f"  HR_tight(ccw): {hr_ccw_tight*100:.4f}%  |  HR_wide(ccw): {hr_ccw_wide*100:.4f}%")

# Classify each direction by comparing its tight and wide hit rates
spread_cond_cw = _spread_condition(hr_cw_tight, hr_cw_wide)
spread_cond_ccw = _spread_condition(hr_ccw_tight, hr_ccw_wide)

print(f"\n→ Positive residuals (CW) occur mostly when spread_sum is: {spread_cond_cw}")
print(f"→ Positive residuals (CCW) occur mostly when spread_sum is: {spread_cond_ccw}")

print("\n" + banner)

In [None]:
# 6. Spread Coupling - DO NOT EXECUTE UNTIL SESSION COMPLETE

---
## 7. Stat-Arb Spread Diagnostics

In [None]:
# 7. Stat-Arb - DO NOT EXECUTE UNTIL SESSION COMPLETE

---
## 8. Summary (Fill after 02:00 IST)

1. Which triangles ever go gross-positive: [TBD]
2. Typical duration of positive windows: [TBD]
3. Whether cw/ccw are symmetric: [TBD]