# Cost / Slippage Stress Grid (BTCUSDT 1h)

**Goal:** stress-test a retrieval-based trading policy over a grid of `fee` and `slippage` assumptions.

Artifacts:
- heatmap: performance vs (fee, slippage)
- robustness zone: where performance remains positive

Notes:
- This uses the **retrieved episode distribution** at many anchors (the same approach as notebooks 04/05/06).
- It is not a true historical market replay; it answers “if the retrieved analog futures are representative, how sensitive is the strategy to friction?”.

Terminology:
- `feePct` and `slippagePct` are **percent per entry** (e.g. 0.04 = 0.04%).
- Delay (optional): use `suggestedAction` from step `DELAY_STEPS` and apply returns from that step onward.


In [1]:
import os
import time
from pathlib import Path
from datetime import datetime, timezone

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

from aipricepatterns import Client

# Widen pandas' display limits (up to 200 columns, 200 characters per line)
# so the result tables below are shown without being cut off.
for _option, _value in (("display.max_columns", 200), ("display.width", 200)):
    pd.set_option(_option, _value)

def safe_float(x, default=np.nan) -> float:
    """Best-effort conversion of ``x`` to ``float``.

    Returns ``float(x)`` when the conversion succeeds; on any conversion
    error returns ``float(default)`` instead (NaN unless overridden).
    """
    try:
        converted = float(x)
    except Exception:
        # Deliberately broad: any value that cannot be converted maps to the fallback.
        converted = float(default)
    return converted

def map_suggested_action_to_pos(x) -> int:
    """Map a ``suggestedAction`` value to a position: -1 (short), 0 (flat) or 1 (long).

    Accepted inputs:
    - ``None`` -> 0.
    - Numbers (``int``/``float``/``bool``), truncated toward zero with ``int()``:
      -1, 0 and 1 map to themselves; 2 maps to -1 (the 0/1/2 encoding where
      1 is long and 2 is short); any other value maps to 0. NaN and infinite
      floats cannot be converted and map to 0 instead of raising.
    - Anything else is compared as a lower-cased, stripped string:
      long/buy/bull/up -> 1, short/sell/bear/down -> -1, everything else
      (including hold/flat/none/neutral/wait) -> 0.
    """
    if x is None:
        return 0
    if isinstance(x, (int, float)):
        try:
            code = int(x)
        except (ValueError, OverflowError):
            # int(nan) raises ValueError and int(+-inf) raises OverflowError;
            # an unusable action code means "no position".
            return 0
        return {-1: -1, 0: 0, 1: 1, 2: -1}.get(code, 0)
    label = str(x).strip().lower()
    if label in ("long", "buy", "bull", "up"):
        return 1
    if label in ("short", "sell", "bear", "down"):
        return -1
    # hold / flat / none / neutral / wait and every unrecognised label are flat.
    return 0

def episode_gross_pnl_with_delay(ep: dict, horizon: int, delay_steps: int) -> tuple[int, float]:
    """Compute ``(pos, grossPnL)`` for one episode under a delayed entry.

    The position is taken from ``suggestedAction`` of the transition at index
    ``delay_steps`` (clamped into the available transitions). Gross PnL is the
    position times the sum of per-step returns (``"ret"``, falling back to
    ``"return"``) from that index up to ``horizon`` (exclusive), capped at the
    number of transitions. Returns ``(0, 0.0)`` when ``ep["transitions"]`` is
    missing, not a list, or empty. No fee or slippage is applied here.
    """
    transitions = ep.get("transitions")
    if not (isinstance(transitions, list) and transitions):
        return 0, 0.0

    # Clamp the decision step into [0, len(transitions) - 1].
    first = min(int(max(0, delay_steps)), len(transitions) - 1)
    # Exclusive end of the holding window.
    last = min(int(horizon), len(transitions))

    decision = transitions[first]
    action = decision.get("suggestedAction") if isinstance(decision, dict) else None
    pos = map_suggested_action_to_pos(action)

    direction = float(pos)
    total = 0.0
    # max(first, last) keeps the window empty when the horizon ends before the entry step.
    for step in transitions[first:max(first, last)]:
        fields = step if isinstance(step, dict) else {}
        raw_return = fields.get("ret", fields.get("return", 0.0))
        total += direction * safe_float(raw_return, 0.0)
    return int(pos), float(total)


## Parameters
Defaults: BTCUSDT 1h, 300 anchors, 120 days lookback.
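Grid: you can override the fee/slippage ranges via the env vars `AIPP_FEE_MIN`, `AIPP_FEE_MAX`, `AIPP_FEE_STEPS`, `AIPP_SLIP_MIN`, `AIPP_SLIP_MAX` and `AIPP_SLIP_STEPS`.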


In [2]:
def env_flag(name: str, default: str = "0") -> bool:
    """Read a boolean switch from environment variable ``name``.

    Integer strings keep their historical meaning (``"0"`` -> False, any other
    integer -> True). The words true/yes/y/on and false/no/n/off (any case,
    surrounding whitespace ignored) are accepted as well; an empty value is
    False. Any other value raises ``ValueError``.
    """
    raw = os.getenv(name, default).strip().lower()
    try:
        return bool(int(raw))
    except ValueError:
        pass
    if raw in ("true", "yes", "y", "on"):
        return True
    if raw in ("false", "no", "n", "off", ""):
        return False
    raise ValueError(f"{name} must be a boolean flag (0/1/true/false), got {raw!r}")


# API endpoint and credentials.
BASE_URL = os.getenv("AIPP_BASE_URL", "https://aipricepatterns.com/api/rust")
API_KEY = os.getenv("AIPP_API_KEY")

# Market to study.
SYMBOL = os.getenv("AIPP_RL_SYMBOL", "BTCUSDT")
INTERVAL = os.getenv("AIPP_RL_INTERVAL", "1h")

# Sweep size: number of anchor timestamps spread over the lookback window.
ANCHOR_POINTS = int(os.getenv("AIPP_SWEEP_ANCHORS", "300"))
LOOKBACK_DAYS = int(os.getenv("AIPP_SWEEP_LOOKBACK_DAYS", "120"))

# Episode retrieval settings passed to the API for every anchor.
FORECAST_HORIZON = int(os.getenv("AIPP_RL_HORIZON", "24"))
EPISODES_PER_ANCHOR = int(os.getenv("AIPP_SWEEP_EPISODES_PER_ANCHOR", "60"))
MIN_SIMILARITY = float(os.getenv("AIPP_RL_MIN_SIMILARITY", "0.70"))
SAMPLING_STRATEGY = os.getenv("AIPP_RL_SAMPLING_STRATEGY", "uniform")

# Confidence gating threshold (trade only if similarity >= this)
CONF_THR = float(os.getenv("AIPP_RL_SUGGESTED_MIN_SIMILARITY", "0.90"))

# Delay model: use suggestedAction at this step, and apply returns from this step onward
DELAY_STEPS = int(os.getenv("AIPP_DELAY_STEPS", "0"))

# Stress grid in percent units (e.g. 0.04 means 0.04%)
FEE_MIN = float(os.getenv("AIPP_FEE_MIN", "0.00"))
FEE_MAX = float(os.getenv("AIPP_FEE_MAX", "0.20"))
FEE_STEPS = int(os.getenv("AIPP_FEE_STEPS", "21"))
SLIP_MIN = float(os.getenv("AIPP_SLIP_MIN", "0.00"))
SLIP_MAX = float(os.getenv("AIPP_SLIP_MAX", "0.20"))
SLIP_STEPS = int(os.getenv("AIPP_SLIP_STEPS", "21"))

# Cost model: apply entry cost once; if you want round-trip, set ROUND_TRIP=1
# (AIPP_ROUND_TRIP also accepts true/false, yes/no, on/off).
ROUND_TRIP = env_flag("AIPP_ROUND_TRIP", "0")

# Pause between API requests, in seconds.
SWEEP_SLEEP_SEC = float(os.getenv("AIPP_SWEEP_SLEEP_SEC", "0.05"))

# Cache: keep _cache next to this notebook by default; also support legacy nested cache path
# (the "notebook directory" is taken to be the current working directory).
NOTEBOOK_DIR = Path.cwd()
DEFAULT_CACHE_DIR = NOTEBOOK_DIR / "_cache"
CACHE_DIR = Path(os.getenv("AIPP_RESEARCH_CACHE_DIR", str(DEFAULT_CACHE_DIR)))
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# NOTE(review): the file name encodes only symbol, interval, anchor count and delay.
# Changing LOOKBACK_DAYS, FORECAST_HORIZON, EPISODES_PER_ANCHOR, MIN_SIMILARITY or
# SAMPLING_STRATEGY maps to the same file name, so a file left over from a run with
# different settings would be picked up — delete it when changing those settings.
CACHE_NAME = f"07_cost_grid_eps_{SYMBOL}_{INTERVAL}_{ANCHOR_POINTS}_d{DELAY_STEPS}.csv"
CACHE_PATH = CACHE_DIR / CACHE_NAME
ALT_CACHE_PATH = NOTEBOOK_DIR / "python-sdk" / "research" / "_cache" / CACHE_NAME
if not CACHE_PATH.exists() and ALT_CACHE_PATH.exists():
    print("Using legacy nested cache path:", ALT_CACHE_PATH)
    CACHE_PATH = ALT_CACHE_PATH
    CACHE_DIR = ALT_CACHE_PATH.parent

# Echo the effective configuration so the run is reproducible from its output.
print("Base URL:", BASE_URL)
print(f"Symbol: {SYMBOL}  Interval: {INTERVAL}")
print(f"Anchors: {ANCHOR_POINTS}  LookbackDays: {LOOKBACK_DAYS}")
print(f"Episodes/anchor: {EPISODES_PER_ANCHOR}  minSimilarity: {MIN_SIMILARITY:.2f}  horizon: {FORECAST_HORIZON}")
print(f"CONF_THR: {CONF_THR:.2f}  DELAY_STEPS: {DELAY_STEPS}  ROUND_TRIP: {ROUND_TRIP}")
print(f"Fee grid: {FEE_MIN:.3f}%..{FEE_MAX:.3f}% steps={FEE_STEPS}")
print(f"Slip grid: {SLIP_MIN:.3f}%..{SLIP_MAX:.3f}% steps={SLIP_STEPS}")
print("Cache:", str(CACHE_PATH))


Base URL: https://aipricepatterns.com/api/rust
Symbol: BTCUSDT  Interval: 1h
Anchors: 300  LookbackDays: 120
Episodes/anchor: 60  minSimilarity: 0.70  horizon: 24
CONF_THR: 0.90  DELAY_STEPS: 0  ROUND_TRIP: False
Fee grid: 0.000%..0.200% steps=21
Slip grid: 0.000%..0.200% steps=21
Cache: /Users/serg/projects/prod/ai_patterns/python-sdk/research/_cache/07_cost_grid_eps_BTCUSDT_1h_300_d0.csv


## Build episode dataset (cached)
We cache per-episode gross PnL (no fee/slippage) + position, so the cost grid evaluation is fast.


In [3]:
# Columns of the per-episode table (and header of the cache CSV). Passing them
# explicitly keeps the frame well-formed even when no episode was retrieved.
EPS_COLUMNS = ["anchorTs", "anchorDtUtc", "similarity", "pos", "grossPnL"]

if CACHE_PATH.exists():
    # Reuse a previous sweep instead of calling the API again.
    eps_df = pd.read_csv(CACHE_PATH)
    print("loaded cache:", str(CACHE_PATH), "rows:", len(eps_df))
else:
    client = Client(base_url=BASE_URL, api_key=API_KEY)
    now_ms = int(time.time() * 1000)
    start_ms = now_ms - LOOKBACK_DAYS * 24 * 60 * 60 * 1000
    # Evenly spaced anchor timestamps (epoch milliseconds) from start_ms to now_ms.
    anchors = np.linspace(start_ms, now_ms, num=ANCHOR_POINTS, dtype=np.int64).tolist()
    rows = []
    for idx, anchor_ts in enumerate(anchors, start=1):
        res = client.get_rl_episodes(
            symbol=SYMBOL,
            interval=INTERVAL,
            anchor_ts=int(anchor_ts),
            forecast_horizon=FORECAST_HORIZON,
            num_episodes=EPISODES_PER_ANCHOR,
            min_similarity=MIN_SIMILARITY,
            include_actions=True,
            reward_type="returns",
            sampling_strategy=SAMPLING_STRATEGY,
        )
        eps = res.get("episodes") if isinstance(res, dict) else None
        if isinstance(eps, list) and eps:
            anchor_dt = datetime.fromtimestamp(anchor_ts / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M")
            for ep in eps:
                if not isinstance(ep, dict):
                    # Skip malformed entries rather than aborting the whole sweep.
                    continue
                sim = safe_float(ep.get("similarity"), np.nan)
                pos, gross = episode_gross_pnl_with_delay(ep, horizon=FORECAST_HORIZON, delay_steps=DELAY_STEPS)
                rows.append({
                    "anchorTs": int(anchor_ts),
                    "anchorDtUtc": anchor_dt,
                    "similarity": float(sim),
                    "pos": int(pos),
                    "grossPnL": float(gross),
                })
        # Progress report and throttle run for every request, including
        # anchors that returned no episodes, so the API is never hit back-to-back.
        if idx % 20 == 0:
            print(f"{idx}/{len(anchors)} anchors... rows={len(rows)}")
        time.sleep(SWEEP_SLEEP_SEC)
    # Rows without a usable similarity cannot be gated later, so drop them.
    eps_df = (
        pd.DataFrame(rows, columns=EPS_COLUMNS)
        .dropna(subset=["similarity"])
        .reset_index(drop=True)
    )
    if eps_df.empty:
        # Do not pin an empty result: a cached empty file would be reloaded on every later run.
        print("no episodes retrieved; cache not written:", str(CACHE_PATH))
    else:
        eps_df.to_csv(CACHE_PATH, index=False)
        print("wrote cache:", str(CACHE_PATH), "rows:", len(eps_df))

print("episode rows:", len(eps_df))
eps_df.head()


20/300 anchors... rows=1200
40/300 anchors... rows=2400
60/300 anchors... rows=3600
80/300 anchors... rows=4800
100/300 anchors... rows=6000
120/300 anchors... rows=7200
140/300 anchors... rows=8400
160/300 anchors... rows=9600
180/300 anchors... rows=10800
200/300 anchors... rows=12000
220/300 anchors... rows=13200
240/300 anchors... rows=14400
260/300 anchors... rows=15600
280/300 anchors... rows=16800
300/300 anchors... rows=17994
wrote cache: /Users/serg/projects/prod/ai_patterns/python-sdk/research/_cache/07_cost_grid_eps_BTCUSDT_1h_300_d0.csv rows: 17994
episode rows: 17994


Unnamed: 0,anchorTs,anchorDtUtc,similarity,pos,grossPnL
0,1755798702697,2025-08-21 17:51,0.9248,-1,15.6328
1,1755798702697,2025-08-21 17:51,0.9212,0,0.0
2,1755798702697,2025-08-21 17:51,0.9186,0,0.0
3,1755798702697,2025-08-21 17:51,0.9148,0,0.0
4,1755798702697,2025-08-21 17:51,0.9147,0,0.0


## Evaluate cost/slippage grid
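We evaluate a simple gated strategy: trade only if `similarity >= CONF_THR`.
Net PnL = grossPnL - entryCost, where entryCost = |pos| * (feePct + slippagePct) / 100 (doubled if `ROUND_TRIP` is set). The division by 100 converts the percent inputs to a fraction, so this assumes `grossPnL` is expressed as a fractional return.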


In [4]:
# Axes of the stress grid, in percent units.
fees = np.linspace(FEE_MIN, FEE_MAX, num=FEE_STEPS)
slips = np.linspace(SLIP_MIN, SLIP_MAX, num=SLIP_STEPS)

# Confidence gate: only episodes at or above CONF_THR are considered "active".
m = eps_df["similarity"] >= float(CONF_THR)
active = eps_df.loc[m].copy()
coverage = float(m.mean())
print("coverage:", coverage, "active rows:", len(active))

pos_abs = active["pos"].abs().to_numpy(dtype=float)
gross = active["grossPnL"].to_numpy(dtype=float)
rt_mult = 2.0 if ROUND_TRIP else 1.0


def _grid_point(fee_pct, slip_pct) -> dict:
    """Evaluate the gated strategy for one (fee, slippage) pair and return one grid row."""
    # NOTE(review): dividing by 100 turns the percent inputs into a fraction; that is
    # only consistent with grossPnL if the per-step returns are fractional — confirm units.
    friction = (float(fee_pct) + float(slip_pct)) / 100.0
    # Friction is charged only where a position is held (|pos| is 0 for holds).
    net = gross - rt_mult * pos_abs * friction
    has_rows = bool(len(net))
    # NOTE(review): the per-"trade" average and winrate below run over all gated rows,
    # including those with pos == 0 — confirm that this is the intended denominator.
    return {
        "feePct": float(fee_pct),
        "slippagePct": float(slip_pct),
        "coverage": coverage,
        # Overall: gated-out episodes count as 0 PnL, hence the full-dataset denominator.
        "overallAvgPnL": float(net.sum() / len(eps_df)),
        "tradeAvgPnL": float(net.mean()) if has_rows else float("nan"),
        "winrate": float((net > 0).mean()) if has_rows else float("nan"),
    }


# Fee varies slowest, slippage fastest.
rows = [_grid_point(fee, slip) for fee in fees for slip in slips]

grid = pd.DataFrame(rows)
grid.head()


coverage: 0.8039902189618762 active rows: 14467


Unnamed: 0,feePct,slippagePct,coverage,overallAvgPnL,tradeAvgPnL,winrate
0,0.0,0.0,0.80399,0.261918,0.325772,0.18912
1,0.0,0.01,0.80399,0.261894,0.325743,0.18912
2,0.0,0.02,0.80399,0.261871,0.325714,0.18912
3,0.0,0.03,0.80399,0.261848,0.325685,0.18912
4,0.0,0.04,0.80399,0.261825,0.325657,0.18912


In [5]:
# Reshape the long grid into a matrix: one row per fee, one column per slippage.
heat = grid.pivot(index="feePct", columns="slippagePct", values="overallAvgPnL")
heat = heat.sort_index(ascending=True)

heat_title = f"OverallAvgPnL heatmap (gated @ sim≥{CONF_THR:.2f}, delay={DELAY_STEPS}, roundTrip={ROUND_TRIP})"
heat_labels = {"x": "slippagePct", "y": "feePct", "color": "overallAvgPnL"}

# origin="lower" places the smallest fee at the bottom of the plot.
fig = px.imshow(heat, origin="lower", aspect="auto", title=heat_title, labels=heat_labels)
fig.update_layout(height=520)
fig


## Robustness zone
We mark the region where overallAvgPnL remains positive.


In [6]:
# Flag every grid cell whose overall average PnL stays above zero.
grid2 = grid.copy()
grid2["isPositive"] = grid2["overallAvgPnL"] > 0

# Among the positive cells, report the one with the largest total cost (fee + slip).
pos = grid2[grid2["isPositive"]].copy()
if len(pos) > 0:
    pos["totalCostPct"] = pos["feePct"] + pos["slippagePct"]
    ranked = pos.sort_values(["totalCostPct"], ascending=False)
    best = ranked.iloc[0]
    print("Max total cost with positive overallAvgPnL:")
    print(best[["feePct","slippagePct","totalCostPct","overallAvgPnL","tradeAvgPnL","winrate","coverage"]])
else:
    print("No positive region in the evaluated grid. Reduce costs or revisit CONF_THR/delay.")

# Boolean fee x slippage matrix of the positive region, drawn as a heatmap.
heat_pos = grid2.pivot(index="feePct", columns="slippagePct", values="isPositive")
heat_pos = heat_pos.sort_index(ascending=True)
fig = px.imshow(
    heat_pos,
    origin="lower",
    aspect="auto",
    title="Robustness zone: overallAvgPnL > 0",
    labels={"x":"slippagePct","y":"feePct","color":"positive"},
)
fig.update_layout(height=520)
fig


Max total cost with positive overallAvgPnL:
feePct                0.2
slippagePct           0.2
totalCostPct          0.4
overallAvgPnL    0.260988
tradeAvgPnL      0.324616
winrate          0.188774
coverage          0.80399
Name: 440, dtype: object


## What to conclude
- If the positive region disappears with small costs, the strategy is likely not realistic for your market/venue.
- If there is a wide positive region, you have a strong argument that the signal survives fees/slippage.
- If delay kills the positive region, the edge may be too short-lived or depends on unrealistically fast execution.
