# Benchmark Stress Test


In [15]:
import asyncio
import random
import time
from datetime import datetime, timezone

import httpx
import pandas as pd
import plotly.express as px


## Fit Benchmark


In [None]:
# Config for the fit benchmark; the cells below read these module-level constants.
BASE_URL = "http://localhost:8000"  # prefix joined with each endpoint path to form request URLs
ENDPOINT = "/fit/{series_id}"  # path template, filled in with str.format(series_id=...)

WORKERS = 100  # semaphore size and httpx connection-pool limits for the fit run
TOTAL_CALLS = 100  # number of pre-generated (timed) fit requests
REQUEST_TIMEOUT_SECONDS = 60.0  # passed as `timeout` to the fit httpx.AsyncClient
FIT_WARMUP_CALLS = 5  # sequential warm-up POSTs sent before the timed requests (capped at TOTAL_CALLS)


{'base_url': 'http://localhost:8000', 'workers': 100, 'total_calls': 100, 'timeout_s': 60.0, 'warmup_calls': 5}


In [17]:
# Timestamp tag (UTC, second resolution) embedded in every series id of this run.
run_id = datetime.now(tz=timezone.utc).strftime("%Y%m%d_%H%M%S")

# Size the connection pool explicitly from WORKERS so that a small default
# pool does not become the bottleneck being measured.
fit_limits = httpx.Limits(max_keepalive_connections=WORKERS, max_connections=WORKERS)


def build_payload(seed: int, n_points: int = 6) -> dict:
    """Build a synthetic training payload: increasing timestamps + non-constant values.

    Args:
        seed: Offsets the first timestamp (``1700000000 + seed * 10``) and
            selects the base level (``10.0 + seed % 5``). It does NOT seed the
            random generator; the jitter is drawn from the global ``random``
            state.
        n_points: Number of observations to generate. Defaults to 6, the size
            that was previously hard-coded.

    Returns:
        A dict with "timestamps" (``n_points`` consecutive integers) and
        "values" (``n_points`` floats: the base level plus uniform jitter
        drawn from [-0.8, 0.8]).

    Raises:
        ValueError: If ``n_points`` is below 2, because a series that short
            cannot contain non-constant values.
    """
    if n_points < 2:
        raise ValueError("n_points must be at least 2 to produce non-constant values")

    start_ts = 1700000000 + seed * 10
    timestamps = [start_ts + offset for offset in range(n_points)]
    base = 10.0 + (seed % 5)
    # One random draw per point, in order, exactly as the unrolled version did.
    values = [base + random.uniform(-0.8, 0.8) for _ in range(n_points)]

    # Avoid accidental all-equal values (compared at 6 decimal places).
    if len({round(v, 6) for v in values}) == 1:
        values[-1] += 0.001

    return {"timestamps": timestamps, "values": values}


# Build every request up front: the timed section should reflect API/network
# time only, not payload construction on the client side.
fit_requests = []
for i in range(TOTAL_CALLS):
    series_id = f"benchmark_stress_{run_id}_{i}"
    fit_requests.append(
        dict(
            call_index=i,
            series_id=series_id,
            url=BASE_URL + ENDPOINT.format(series_id=series_id),
            payload=build_payload(i),
        )
    )


async def call_train(client: httpx.AsyncClient, req: dict) -> dict:
    """POST one pre-built fit request and return a timing/result record.

    Args:
        client: Client used to send the request; only ``client.post`` is called.
        req: Request description with the keys "call_index", "series_id",
            "url" and "payload" (the payload is sent as the JSON body).

    Returns:
        A dict with "call_index", "series_id", "status_code" (``None`` when no
        response was obtained), "ok" (status code was 200), "delta_ms"
        (perf_counter latency), "start_epoch_ms" / "end_epoch_ms" (wall-clock
        epoch milliseconds), "error" and "body_preview" (first 500 characters
        of the body, kept for non-200 responses only).

        Any ``Exception`` raised while sending or reading the response is
        caught and reported through "error" instead of propagating.
    """
    t0 = time.perf_counter()
    start_epoch_ms = int(time.time() * 1000)

    try:
        response = await client.post(req["url"], json=req["payload"])
        ok = response.status_code == 200
        # Only decode and store body previews for failed responses.
        body = None if ok else response.text[:500]
        error = None
    except Exception as exc:
        response = None
        ok = False
        body = None
        # Some exceptions (timeouts in particular) can stringify to an empty
        # message; always record the class name so the failure stays
        # identifiable in the results table.
        detail = str(exc)
        error = f"{type(exc).__name__}: {detail}" if detail else type(exc).__name__

    end_epoch_ms = int(time.time() * 1000)
    delta_ms = (time.perf_counter() - t0) * 1000.0

    return {
        "call_index": req["call_index"],
        "series_id": req["series_id"],
        "status_code": response.status_code if response is not None else None,
        "ok": ok,
        "delta_ms": delta_ms,
        "start_epoch_ms": start_epoch_ms,
        "end_epoch_ms": end_epoch_ms,
        "error": error,
        "body_preview": body,
    }


async def run_fit_benchmark() -> list[dict]:
    """Send the warm-up calls, then issue every pre-built fit request.

    Reads the module-level configuration (WORKERS, TOTAL_CALLS,
    FIT_WARMUP_CALLS, REQUEST_TIMEOUT_SECONDS, fit_limits) and the
    pre-generated ``fit_requests`` list.

    Returns:
        One ``call_train`` record per entry of ``fit_requests``, in the same
        order as that list.
    """
    gate = asyncio.Semaphore(WORKERS)
    n_warmup = min(FIT_WARMUP_CALLS, TOTAL_CALLS)

    async def throttled(client: httpx.AsyncClient, req: dict) -> dict:
        # At most WORKERS calls hold the semaphore at any moment.
        async with gate:
            return await call_train(client, req)

    async with httpx.AsyncClient(
        limits=fit_limits,
        timeout=REQUEST_TIMEOUT_SECONDS,
        trust_env=False,
    ) as train_client:
        # Warm up a few calls to stabilize connection reuse before timing
        # starts. They run one after another and their results are discarded.
        for warmup_index in range(n_warmup):
            warmup_series_id = f"benchmark_stress_{run_id}_warmup_{warmup_index}"
            await train_client.post(
                BASE_URL + ENDPOINT.format(series_id=warmup_series_id),
                json=build_payload(100000 + warmup_index),
            )

        pending = (throttled(train_client, req) for req in fit_requests)
        return await asyncio.gather(*pending)


In [None]:
# Run benchmark
# Wall-clock timer around the whole run: it also covers client setup and the
# warm-up calls made inside run_fit_benchmark(), unlike the per-call delta_ms.
wall_start = time.perf_counter()

# In notebooks, use top-level await instead of asyncio.run()
results = await run_fit_benchmark()
wall_delta_s = time.perf_counter() - wall_start

# Reports the configured call/worker counts, not values derived from `results`.
print(f"Finished {TOTAL_CALLS} calls with {WORKERS} workers in {wall_delta_s:.2f}s")


Finished 100 calls with 100 workers in 1.16s


In [19]:
# Tabulate the per-call records (ordered by call index), preview them, and
# summarise success counts and latency statistics in milliseconds.
df = (
    pd.DataFrame(results)
    .sort_values(by="call_index")
    .reset_index(drop=True)
)

display(df.head())

summary = dict(
    total_calls=int(len(df)),
    success_count=int(df["ok"].sum()),
    error_count=int((~df["ok"]).sum()),
    min_ms=float(df["delta_ms"].min()),
    p50_ms=float(df["delta_ms"].quantile(0.50)),
    p95_ms=float(df["delta_ms"].quantile(0.95)),
    max_ms=float(df["delta_ms"].max()),
    mean_ms=float(df["delta_ms"].mean()),
)
summary

Unnamed: 0,call_index,series_id,status_code,ok,delta_ms,start_epoch_ms,end_epoch_ms,error,body_preview
0,0,benchmark_stress_20260218_222137_0,200,True,89.859333,1771453298154,1771453298244,,
1,1,benchmark_stress_20260218_222137_1,200,True,140.851777,1771453298155,1771453298296,,
2,2,benchmark_stress_20260218_222137_2,200,True,181.090353,1771453298156,1771453298337,,
3,3,benchmark_stress_20260218_222137_3,200,True,203.900478,1771453298156,1771453298360,,
4,4,benchmark_stress_20260218_222137_4,200,True,213.786689,1771453298157,1771453298371,,


{'total_calls': 100,
 'success_count': 100,
 'error_count': 0,
 'min_ms': 89.85933299982207,
 'p50_ms': 694.0394534999541,
 'p95_ms': 834.2810473498957,
 'max_ms': 871.7442649999612,
 'mean_ms': 606.7127993299755}

In [None]:
# Latency distribution of the fit calls: a 30-bin histogram first, then a
# box plot that also draws every individual observation.
fig_hist = px.histogram(
    data_frame=df,
    x="delta_ms",
    labels={"delta_ms": "Latency (ms)"},
    title="Training API Latency Distribution",
    nbins=30,
)
fig_hist.show()

fig_box = px.box(
    data_frame=df,
    y="delta_ms",
    labels={"delta_ms": "Latency (ms)"},
    title="Training API Latency (Box Plot)",
    points="all",
)
fig_box.show()

In [None]:
# Fit API status code distribution.
# Calls without a response have no status code; they are shown as "-1".
# Codes are converted to strings so the axis treats them as categories.
df_status_df = df.fillna({"status_code": -1}).assign(
    status_code=lambda frame: frame["status_code"].astype(int).astype(str)
)

fig_status = px.histogram(
    data_frame=df_status_df,
    x="status_code",
    labels={"status_code": "HTTP Status Code"},
    title="Fit API Status Code Distribution",
)
fig_status.show()


## Cooling Down

In [None]:
# Cooldown between tests so server/client state settles before predict load
COOLDOWN_SECONDS = 60  # pause length in seconds, passed to time.sleep below
print(f"Cooling down for {COOLDOWN_SECONDS}s before starting predict benchmark...")
# time.sleep blocks this cell for the whole cooldown.
time.sleep(COOLDOWN_SECONDS)


Cooling down for 60s before starting predict benchmark...


## Predict Benchmark


In [30]:
# Predict config; the predict cells below read these module-level constants.
PREDICT_ENDPOINT = "/predict/{series_id}"  # path template, filled in with str.format(series_id=...)
PREDICT_WORKERS = 100  # semaphore size and httpx connection-pool limits for the predict run
PREDICT_TOTAL_CALLS = 100  # number of pre-generated (timed) predict requests
PREDICT_TIMEOUT_SECONDS = 30.0  # httpx timeout for the setup fit call and the predict client
PREDICT_WARMUP_CALLS = 5  # sequential warm-up POSTs before the timed requests (capped at PREDICT_TOTAL_CALLS)
PREDICT_MODEL_VERSION = "0"  # sent as the `version` query parameter on every predict call


In [None]:
# Fit one dedicated series; every predict request below targets this series id.
predict_run_id = datetime.now(tz=timezone.utc).strftime("%Y%m%d_%H%M%S")
predict_series_id = f"benchmark_predict_{predict_run_id}"
fit_url = BASE_URL + ENDPOINT.format(series_id=predict_series_id)

# Eight consecutive timestamps paired with eight fixed, non-constant values.
fit_payload = dict(
    timestamps=list(range(1705000000, 1705000008)),
    values=[10.0, 10.2, 10.1, 9.9, 10.3, 10.15, 9.95, 10.25],
)

with httpx.Client(trust_env=False, timeout=PREDICT_TIMEOUT_SECONDS) as client:
    fit_response = client.post(fit_url, json=fit_payload)

print("predict_series_id:", predict_series_id)
print("fit status:", fit_response.status_code)

# A non-200 fit is only reported here (first 500 characters of the body);
# the notebook keeps running afterwards.
if fit_response.status_code != 200:
    print("fit body:", fit_response.text[:500])


In [None]:
# Size the connection pool from PREDICT_WORKERS so predict concurrency is not
# bound by the client pool.
predict_limits = httpx.Limits(
    max_keepalive_connections=PREDICT_WORKERS,
    max_connections=PREDICT_WORKERS,
)

# Build all payloads ahead of time so that the measured latency excludes the
# client-side random/data setup work.
predict_requests = []
for i in range(PREDICT_TOTAL_CALLS):
    predict_requests.append(
        dict(
            call_index=i,
            series_id=predict_series_id,
            url=BASE_URL + PREDICT_ENDPOINT.format(series_id=predict_series_id),
            params=dict(version=PREDICT_MODEL_VERSION),
            payload=dict(
                timestamp=str(1705001000 + i),
                value=10.0 + random.uniform(-1.5, 3.0),
            ),
        )
    )


async def call_predict(client: httpx.AsyncClient, req: dict) -> dict:
    """POST one pre-built predict request and return a timing/result record.

    Args:
        client: Client used to send the request; only ``client.post`` is called.
        req: Request description with the keys "call_index", "series_id",
            "url", "params" (sent as query parameters) and "payload" (sent as
            the JSON body).

    Returns:
        A dict with "call_index", "series_id", "status_code" (``None`` when no
        response was obtained), "ok" (status code was 200), "delta_ms"
        (perf_counter latency), "start_epoch_ms" / "end_epoch_ms" (wall-clock
        epoch milliseconds), "error" and "body_preview" (first 500 characters
        of the body, kept for non-200 responses only).

        Any ``Exception`` raised while sending or reading the response is
        caught and reported through "error" instead of propagating.
    """
    t0 = time.perf_counter()
    start_epoch_ms = int(time.time() * 1000)

    try:
        response = await client.post(
            req["url"],
            params=req["params"],
            json=req["payload"],
        )
        ok = response.status_code == 200

        # Keep full body parsing off the hot path for successful requests
        body = None if ok else response.text[:500]
        error = None
    except Exception as exc:
        response = None
        ok = False
        body = None
        # Some exceptions (timeouts in particular) can stringify to an empty
        # message; always record the class name so the failure stays
        # identifiable in the results table.
        detail = str(exc)
        error = f"{type(exc).__name__}: {detail}" if detail else type(exc).__name__

    end_epoch_ms = int(time.time() * 1000)
    delta_ms = (time.perf_counter() - t0) * 1000.0

    return {
        "call_index": req["call_index"],
        "series_id": req["series_id"],
        "status_code": response.status_code if response is not None else None,
        "ok": ok,
        "delta_ms": delta_ms,
        "start_epoch_ms": start_epoch_ms,
        "end_epoch_ms": end_epoch_ms,
        "error": error,
        "body_preview": body,
    }


async def run_predict_benchmark() -> list[dict]:
    """Send the warm-up calls, then issue every pre-built predict request.

    Reads the module-level configuration (PREDICT_WORKERS,
    PREDICT_TOTAL_CALLS, PREDICT_WARMUP_CALLS, PREDICT_TIMEOUT_SECONDS,
    predict_limits) and the pre-generated ``predict_requests`` list.

    Returns:
        One ``call_predict`` record per entry of ``predict_requests``, in the
        same order as that list.
    """
    gate = asyncio.Semaphore(PREDICT_WORKERS)
    n_warmup = min(PREDICT_WARMUP_CALLS, PREDICT_TOTAL_CALLS)

    async def throttled(client: httpx.AsyncClient, req: dict) -> dict:
        # At most PREDICT_WORKERS calls hold the semaphore at any moment.
        async with gate:
            return await call_predict(client, req)

    async with httpx.AsyncClient(
        limits=predict_limits,
        timeout=PREDICT_TIMEOUT_SECONDS,
        trust_env=False,
    ) as predict_client:
        # Warm up a few predict calls so the measured run has stable
        # keep-alive behavior. They run one after another and their results
        # are discarded.
        for warmup_index in range(n_warmup):
            await predict_client.post(
                BASE_URL + PREDICT_ENDPOINT.format(series_id=predict_series_id),
                params={"version": PREDICT_MODEL_VERSION},
                json={
                    "timestamp": str(1705009000 + warmup_index),
                    "value": 10.0,
                },
            )

        pending = (throttled(predict_client, req) for req in predict_requests)
        return await asyncio.gather(*pending)


In [None]:
# Run predict benchmark
# Wall-clock timer around the whole run: it also covers client setup and the
# warm-up calls made inside run_predict_benchmark(), unlike the per-call delta_ms.
predict_wall_start = time.perf_counter()

# In notebooks, use top-level await instead of asyncio.run()
predict_results = await run_predict_benchmark()
predict_wall_delta_s = time.perf_counter() - predict_wall_start
# Reports the configured call/worker counts, not values derived from `predict_results`.
print(f"Finished {PREDICT_TOTAL_CALLS} predict calls with {PREDICT_WORKERS} workers in {predict_wall_delta_s:.2f}s")


Finished 100 predict calls with 100 workers in 0.72s


In [27]:
# Tabulate the per-call predict records (ordered by call index), preview them,
# and summarise success counts and latency statistics in milliseconds.
predict_df = (
    pd.DataFrame(predict_results)
    .sort_values(by="call_index")
    .reset_index(drop=True)
)
display(predict_df.head())

predict_summary = dict(
    total_calls=int(len(predict_df)),
    success_count=int(predict_df["ok"].sum()),
    error_count=int((~predict_df["ok"]).sum()),
    min_ms=float(predict_df["delta_ms"].min()),
    p50_ms=float(predict_df["delta_ms"].quantile(0.50)),
    p95_ms=float(predict_df["delta_ms"].quantile(0.95)),
    max_ms=float(predict_df["delta_ms"].max()),
    mean_ms=float(predict_df["delta_ms"].mean()),
)
predict_summary


Unnamed: 0,call_index,series_id,status_code,ok,delta_ms,start_epoch_ms,end_epoch_ms,error,body_preview
0,0,benchmark_predict_20260218_222239,200,True,99.669332,1771453359616,1771453359715,,
1,1,benchmark_predict_20260218_222239,200,True,129.614077,1771453359617,1771453359746,,
2,2,benchmark_predict_20260218_222239,200,True,134.503062,1771453359617,1771453359752,,
3,3,benchmark_predict_20260218_222239,200,True,149.021248,1771453359618,1771453359767,,
4,4,benchmark_predict_20260218_222239,200,True,165.642525,1771453359619,1771453359784,,


{'total_calls': 100,
 'success_count': 100,
 'error_count': 0,
 'min_ms': 99.66933200030326,
 'p50_ms': 292.71208399995885,
 'p95_ms': 521.6899290498986,
 'max_ms': 566.3763170000493,
 'mean_ms': 323.3056336100026}

In [28]:
# Predict latency plots: a 30-bin histogram first, then a box plot that also
# draws every individual observation.
fig_predict_hist = px.histogram(
    data_frame=predict_df,
    x="delta_ms",
    labels={"delta_ms": "Latency (ms)"},
    title="Predict API Latency Distribution",
    nbins=30,
)
fig_predict_hist.show()

fig_predict_box = px.box(
    data_frame=predict_df,
    y="delta_ms",
    labels={"delta_ms": "Latency (ms)"},
    title="Predict API Latency (Box Plot)",
    points="all",
)
fig_predict_box.show()



In [29]:
# Predict API status code distribution.
# Calls without a response have no status code; they are shown as "-1".
# Codes are converted to strings so the axis treats them as categories.
predict_df_status_df = predict_df.fillna({"status_code": -1}).assign(
    status_code=lambda frame: frame["status_code"].astype(int).astype(str)
)

fig_status = px.histogram(
    data_frame=predict_df_status_df,
    x="status_code",
    labels={"status_code": "HTTP Status Code"},
    title="Predict API Status Code Distribution",
)
fig_status.show()
