# Online Algorithms Quickstart

This notebook walks through streaming (online) changepoint detection with three detectors — BOCPD (Bayesian online changepoint detection), CUSUM (cumulative sum), and Page-Hinkley — applied to a synthetic service-latency stream.

In [None]:
import numpy as np
import cpd

# matplotlib is treated as an optional dependency: record whether the import
# succeeded in HAS_MATPLOTLIB so the plotting cell can skip itself instead of
# failing with a NameError on `plt`.
try:
    import matplotlib.pyplot as plt
    HAS_MATPLOTLIB = True
except ImportError:
    # Only a missing package is tolerated; any other import failure still surfaces.
    HAS_MATPLOTLIB = False
    print("matplotlib is optional. Install with: python -m pip install matplotlib")


In [None]:
# Reproducible synthetic latency stream (fixed seed) made of three segments:
#   1. flat baseline around 95,
#   2. step shift up to around 112,
#   3. linear drift from 112 to 128,
# plus a few single-sample spikes layered on top.
rng = np.random.default_rng(2027)
n1, n2, n3 = 220, 160, 140

# Noise is drawn in segment order so the generator is consumed exactly as
# baseline -> shift -> drift.
seg1 = rng.normal(0.0, 1.8, size=n1) + 95.0
seg2 = rng.normal(0.0, 2.0, size=n2) + 112.0
seg3_base = np.linspace(112.0, 128.0, n3)
seg3 = rng.normal(0.0, 1.8, size=n3) + seg3_base
latency = np.concatenate((seg1, seg2, seg3)).astype(np.float64)

# Transient spikes: the indices are distinct, so a single indexed in-place add
# bumps each of them exactly once.
spike_indices = [75, 260, 310, 430]
latency[spike_indices] += 14.0

# Ground-truth segment boundaries: start of the shift and start of the drift.
true_change_points = [n1, n1 + n2]
(latency.shape, true_change_points)


In [None]:
def summarize_steps(name, steps, change_index):
    """Condense a detector's per-step results into a single summary row.

    Parameters
    ----------
    name : str
        Label stored under the ``"detector"`` key of the result.
    steps : iterable
        Per-observation step results. Each item must expose ``alert`` (truthy
        when the detector fired on that step) and ``p_change`` (numeric).
        The position of an item in the iteration order is used as its time
        index. One-shot iterables (e.g. generators) are accepted.
    change_index : int
        Position of the reference changepoint. Alerts at positions strictly
        below it count as ``pre_alerts``; all others count as ``post_alerts``.

    Returns
    -------
    dict
        ``detector``: ``name`` unchanged.
        ``first_alert``: position of the first alerting step, or ``None``
        when no step alerted.
        ``pre_alerts`` / ``post_alerts``: alert counts before / at-or-after
        ``change_index``.
        ``mean_p_change``: mean of ``p_change`` over all steps, rounded to
        4 decimals; ``nan`` when ``steps`` is empty.
    """
    # Materialise once: the summary needs several passes, and a generator
    # would otherwise be exhausted after the first one.
    steps = list(steps)

    alert_positions = [i for i, step in enumerate(steps) if step.alert]
    first_alert = alert_positions[0] if alert_positions else None
    pre_alerts = sum(1 for i in alert_positions if i < change_index)
    post_alerts = sum(1 for i in alert_positions if i >= change_index)

    p_changes = [step.p_change for step in steps]
    # np.mean([]) yields nan but also emits a RuntimeWarning; return nan
    # directly for an empty stream so the summary stays quiet.
    mean_p_change = float(np.mean(p_changes)) if p_changes else float("nan")

    return {
        "detector": name,
        "first_alert": first_alert,
        "pre_alerts": pre_alerts,
        "post_alerts": post_alerts,
        "mean_p_change": round(mean_p_change, 4),
    }


In [None]:
# BOCPD: configure the detector, feed it the whole stream in one batch call,
# then summarise its alerts relative to the first true changepoint.
# NOTE(review): the meaning of each option is defined by the cpd library —
# presumably `hazard` is the prior per-step change probability and
# `alert_policy` gates when a step is flagged; confirm against the cpd docs.
bocpd_config = {
    "model": "gaussian_nig",
    "hazard": 1.0 / 200.0,
    "max_run_length": 512,
    "alert_policy": {"threshold": 0.35, "cooldown": 6, "min_run_length": 10},
}
bocpd = cpd.Bocpd(**bocpd_config)
bocpd_steps = bocpd.update_many(latency)

bocpd_summary = summarize_steps(
    name="bocpd",
    steps=bocpd_steps,
    change_index=true_change_points[0],
)
bocpd_summary


In [None]:
# CUSUM: the reference level is the empirical mean of the first n1 samples,
# i.e. the segment before the first true changepoint.
cusum_reference = latency[:n1]
cusum_config = {
    "drift": 0.05,
    "threshold": 8.0,
    "target_mean": float(cusum_reference.mean()),
    "alert_policy": {"threshold": 0.95, "cooldown": 6, "min_run_length": 10},
}
cusum = cpd.Cusum(**cusum_config)

# Run the detector over the full stream and summarise against the first changepoint.
cusum_steps = cusum.update_many(latency)
cusum_summary = summarize_steps(
    name="cusum",
    steps=cusum_steps,
    change_index=true_change_points[0],
)
cusum_summary


In [None]:
# Page-Hinkley: seeded with the empirical mean of the first n1 samples,
# i.e. the segment before the first true changepoint.
page_hinkley_reference = latency[:n1]
page_hinkley_config = {
    "delta": 0.02,
    "threshold": 8.0,
    "initial_mean": float(page_hinkley_reference.mean()),
    "alert_policy": {"threshold": 0.95, "cooldown": 6, "min_run_length": 10},
}
page_hinkley = cpd.PageHinkley(**page_hinkley_config)

# Run the detector over the full stream and summarise against the first changepoint.
page_hinkley_steps = page_hinkley.update_many(latency)
page_hinkley_summary = summarize_steps(
    name="page_hinkley",
    steps=page_hinkley_steps,
    change_index=true_change_points[0],
)
page_hinkley_summary


In [None]:
# Side-by-side comparison: one text line per detector, with the detector name
# left-aligned in a 13-character column so the fields line up.
summaries = [bocpd_summary, cusum_summary, page_hinkley_summary]
for summary in summaries:
    line = (
        f"{summary['detector']:<13} first_alert={summary['first_alert']} "
        f"pre_alerts={summary['pre_alerts']} post_alerts={summary['post_alerts']} "
        f"mean_p_change={summary['mean_p_change']}"
    )
    print(line)


In [None]:
# Plot the latency stream, the true changepoints, and every detector's alerts.
# The whole cell is skipped when matplotlib could not be imported.
if HAS_MATPLOTLIB:
    fig, ax = plt.subplots(figsize=(12, 4))
    ax.plot(latency, color="black", linewidth=1.0, alpha=0.85, label="latency")

    # Dashed green verticals mark the ground-truth changepoints. Only the first
    # line carries a legend label so the legend explains the marker exactly
    # once; "_nolegend_" keeps the remaining lines out of the legend.
    for position, cp in enumerate(true_change_points):
        ax.axvline(
            cp,
            color="tab:green",
            linestyle="--",
            linewidth=1.5,
            alpha=0.8,
            label="true change point" if position == 0 else "_nolegend_",
        )

    detector_steps = {
        "bocpd": bocpd_steps,
        "cusum": cusum_steps,
        "page_hinkley": page_hinkley_steps,
    }
    marker_colors = {"bocpd": "tab:blue", "cusum": "tab:orange", "page_hinkley": "tab:red"}
    for name, steps in detector_steps.items():
        # Alerts are drawn on the curve itself: x is the step position, y is
        # the latency observed at that position.
        alert_idx = [i for i, step in enumerate(steps) if step.alert]
        ax.scatter(alert_idx, latency[alert_idx], s=28, alpha=0.85, color=marker_colors[name], label=name)

    ax.set_title("Online detector alerts on service-latency stream")
    ax.set_xlabel("time step")
    ax.set_ylabel("latency (ms)")
    ax.legend(loc="upper left")
    plt.show()
else:
    print("Skipping plot (matplotlib not installed).")


In [None]:
# Optional reliability check: serialise a BOCPD detector part-way through the
# stream, load that state into a brand-new detector, and let the new detector
# process the remainder of the stream.
def _new_bocpd():
    """Return a fresh BOCPD detector; every call builds its own alert-policy dict."""
    return cpd.Bocpd(
        model="gaussian_nig",
        hazard=1.0 / 200.0,
        max_run_length=512,
        alert_policy={"threshold": 0.35, "cooldown": 6, "min_run_length": 10},
    )


split_idx = 300
stream_head = latency[:split_idx]
stream_tail = latency[split_idx:]

# First detector sees only the head of the stream, then its state is captured.
checkpoint_detector = _new_bocpd()
_ = checkpoint_detector.update_many(stream_head)
state = checkpoint_detector.save_state(format="json")

# Second detector starts from the saved state and continues with the tail.
restored = _new_bocpd()
restored.load_state(state, format="json")
restored_tail = restored.update_many(stream_tail)

# Show how many steps the restored detector produced and the `t` value
# reported on its first step (None if it produced no steps).
first_restored_t = restored_tail[0].t if restored_tail else None
(len(restored_tail), first_restored_t)
