# Notebook 04: Streaming Monitor (Real-Time Simulation)

This notebook simulates real-time anomaly detection on a temperature stream:

1. Load (or simulate) a temperature time series with ground-truth anomaly labels.
2. Maintain a rolling buffer of the most recent points.
3. Apply several detectors at each step:
   - Z-score rule
   - Rolling slope (drift)
   - Rolling standard deviation (jitter)
   - Isolation Forest (anomalous windowed patterns)
4. Combine them with an OR rule: an alert is raised when any detector fires.
5. Evaluate precision, recall and F1 against the ground truth, and plot the results.


In [None]:

# Setup cell: imports, project-path wiring and plotting defaults.
# NOTE(review): `time` appears unused in the visible cells.
import os, sys, json, time
from collections import deque

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Make the project's `src` package importable. This resolves the parent of the
# current working directory, so it assumes the notebook is started from a
# directory one level below the project root (e.g. notebooks/) — TODO confirm.
# The relative "../data/..." paths used later rely on the same assumption.
PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), ".."))
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

from sklearn.ensemble import IsolationForest
from sklearn.metrics import precision_recall_fscore_support

# Project-local helpers; these must stay below the sys.path tweak above.
from src.sim import simulate_temperature_series
from src.features import make_windowed_matrix

# Wide, short figures suit long time-series plots.
plt.rcParams['figure.figsize'] = (12, 4)


In [None]:

# Detector configuration. Defaults below may be overridden by tuned values
# saved in best_params.json (if present).
CONF = {
    "buffer_len": 200,         # rolling buffer length (points) seen by every detector
    "z_thresh": 4.0,           # |z| threshold for the Z-score rule
    "if_window": 50,           # number of latest points scored by the Isolation Forest
    "if_contamination": 0.03,  # IsolationForest `contamination`
    "if_n_estimators": 200,    # IsolationForest `n_estimators`
    "jitter_window": 80,       # window for the rolling std (jitter detector)
    "jitter_mult": 4.0,        # tau_jitter = median + jitter_mult * MAD of warm-up rolling std
}

try:
    with open("../data/processed/best_params.json", "r", encoding="utf-8") as f:
        best = json.load(f)
    # Parse every value before touching CONF so a malformed or incomplete
    # file cannot leave CONF half-updated.
    tuned = {
        "if_window": int(best["iforest"]["window"]),
        "if_contamination": float(best["iforest"]["contam"]),
        "if_n_estimators": int(best["iforest"]["n_estimators"]),
        "z_thresh": float(best["rule"]["thresh"]),
        "buffer_len": max(CONF["buffer_len"], int(best["rule"]["window"])),
    }
except FileNotFoundError:
    print("No tuned params found, using defaults")
except (KeyError, TypeError, ValueError) as err:  # ValueError covers json.JSONDecodeError
    print(f"Ignoring malformed best_params.json ({err!r}), using defaults")
else:
    CONF.update(tuned)
    print("Loaded tuned params from best_params.json")

# The Isolation Forest scores the last `if_window` buffered points, so the
# buffer must be at least that long or that detector never runs.
CONF["buffer_len"] = max(CONF["buffer_len"], CONF["if_window"])

CONF


In [None]:

USE_SAVED = False

# Data source: a previously saved CSV when USE_SAVED is set, otherwise a
# freshly simulated series (fixed seed=42).
df = (
    pd.read_csv("../data/processed/simulated_with_flags.csv")
    if USE_SAVED
    else simulate_temperature_series(n=5000, base_temp=100.0, noise_sigma=0.2, seed=42)
)

# Ground-truth labels as a 0/1 integer array aligned with the rows of `df`;
# `df` must provide an `is_anomaly` column.
y_true = df["is_anomaly"].astype(int).to_numpy()

def stream_rows(frame):
    """Emulate a live feed by yielding ``(t, temp)`` pairs row by row.

    Rows are produced lazily, in the frame's row order; only the ``t`` and
    ``temp`` columns are emitted.
    """
    yield from zip(frame["t"], frame["temp"])


In [None]:

def slope_over_last(x):
    """Return the ordinary least-squares slope of ``x`` against its sample index.

    The values are regressed on t = 0, 1, ..., len(x) - 1, so the slope is in
    units of ``x`` per step.

    Parameters
    ----------
    x : array-like of float
        Values in time order (oldest first). NumPy arrays, lists and other
        sequences are accepted.

    Returns
    -------
    float
        cov(t, x) / var(t), or 0.0 when fewer than two points are given
        (a slope is undefined there).
    """
    x = np.asarray(x, dtype=float)
    if x.size < 2:
        return 0.0
    t = np.arange(len(x), dtype=float)
    # Population covariance over population variance == OLS slope.
    return float(np.mean((t - t.mean()) * (x - x.mean())) / np.var(t))

def robust_med_mad(values):
    """Return the median and median absolute deviation (MAD) of the finite entries.

    NaN and +/-inf entries are ignored (e.g. the leading NaNs of a pandas
    rolling window). The MAD is unscaled: median(|v - median(v)|), with no
    normal-consistency factor applied.

    Parameters
    ----------
    values : array-like of float
        Input samples; NumPy arrays, lists and pandas Series are accepted.

    Returns
    -------
    tuple
        ``(median, mad)``, or ``(nan, nan)`` when no finite values remain.
    """
    arr = np.asarray(values, dtype=float)
    finite = arr[np.isfinite(arr)]
    if finite.size == 0:
        return np.nan, np.nan
    med = np.median(finite)
    mad = np.median(np.abs(finite - med))
    return med, mad


In [None]:

# Streaming state, filled one row at a time by the streaming loop.
BUF = deque(maxlen=CONF["buffer_len"])  # rolling window of the latest temperatures
times, temps = [], []

# Per-row detector flags (0/1): one list per detector plus the OR-combined result.
flags_rule, flags_slope, flags_jitter, flags_iforest, flags_final = [], [], [], [], []

# Warm-up segment used to fit the Isolation Forest and calibrate the jitter
# threshold: at least one full buffer, 1000 rows by default (overridable via
# an optional CONF["warmup_len"]).
warmup = max(CONF["buffer_len"], int(CONF.get("warmup_len", 1000)))
warm_df = df.iloc[:warmup]
warm_temps = warm_df["temp"].to_numpy()

# NOTE(review): assumes make_windowed_matrix returns one row per sliding
# window of length if_window (shape[0] == 0 => series too short) — confirm.
X_warm = make_windowed_matrix(warm_temps, window=CONF["if_window"])
iforest = None
if X_warm.shape[0] > 0:
    iforest = IsolationForest(
        n_estimators=CONF["if_n_estimators"],
        contamination=CONF["if_contamination"],
        random_state=42,
    ).fit(X_warm)
else:
    print(f"Warm-up too short for if_window={CONF['if_window']}; Isolation Forest disabled")

# Jitter threshold: median + jitter_mult * MAD of the warm-up rolling std.
# ddof=0 (population std) matches the local std used in the streaming loop;
# leading NaNs from incomplete windows are dropped by robust_med_mad.
roll_std = pd.Series(warm_temps).rolling(CONF["jitter_window"]).std(ddof=0).to_numpy()
med_std, mad_std = robust_med_mad(roll_std)
tau_jitter = med_std + CONF["jitter_mult"] * mad_std
if not np.isfinite(tau_jitter):
    # A NaN threshold makes every `local_std >= tau_jitter` comparison False,
    # i.e. the jitter detector would be silently disabled.
    print(f"Jitter threshold undefined (warm-up shorter than jitter_window={CONF['jitter_window']}?)")

tau_jitter


In [None]:

# Row index (a count, not a value of the `t` column) from which the detectors
# are active; rows before it only fill the buffer.
START_AT = warmup

# Reset streaming state so re-running this cell does not append to, and
# mis-align, the histories from a previous run.
BUF = deque(maxlen=CONF["buffer_len"])
times, temps = [], []
flags_rule, flags_slope, flags_jitter, flags_iforest, flags_final = [], [], [], [], []

# Loop-invariant settings.
buffer_len = CONF["buffer_len"]
z_thresh = CONF["z_thresh"]
jitter_window = CONF["jitter_window"]
if_window = CONF["if_window"]
use_iforest = iforest is not None

for step, (t, x) in enumerate(stream_rows(df)):
    BUF.append(x)
    times.append(t)
    temps.append(x)

    r = s = j = f = 0

    if len(BUF) >= buffer_len and step >= START_AT:
        arr = np.array(BUF)

        # Spike: z-score of the newest point against the whole buffer
        # (newest point included in mean/std; 1e-8 avoids division by zero).
        mean, std = arr.mean(), arr.std() + 1e-8
        z = (arr[-1] - mean) / std
        r = int(abs(z) >= z_thresh)

        # Drift: least-squares slope over the buffer vs. a std-scaled threshold.
        slope = slope_over_last(arr)
        slope_tau = 3.0 * (std / len(arr))
        s = int(abs(slope) >= slope_tau)

        # Jitter: population std (ddof=0) of the most recent points, i.e. the
        # last value of a rolling std over the buffer, computed directly.
        jw = min(jitter_window, len(arr))
        local_std = arr[-jw:].std()
        j = int(local_std >= tau_jitter)

        # Pattern: Isolation Forest on the latest if_window points.
        if use_iforest and if_window <= len(arr):
            pred = iforest.predict(arr[-if_window:].reshape(1, -1))[0]
            f = int(pred == -1)

    # OR rule: alert when any detector fires.
    final = int(r or s or j or f)

    flags_rule.append(r)
    flags_slope.append(s)
    flags_jitter.append(j)
    flags_iforest.append(f)
    flags_final.append(final)

print("Streaming finished.")


In [None]:

def prf1(y_true, y_pred):
    """Return ``(precision, recall, f1)`` for binary 0/1 labels.

    Ratios with a zero denominator (no predicted or no actual positives) are
    reported as 0 instead of triggering a warning.
    """
    scores = precision_recall_fscore_support(
        y_true,
        y_pred,
        average="binary",
        zero_division=0,
    )
    # scores is (precision, recall, f-score, support); support is unused here.
    return tuple(scores[:3])

# Ground truth aligned with the streamed rows (each flag list has one entry per row).
y_true_np = y_true[:len(flags_final)]
if len(y_true_np) != len(flags_final):
    raise ValueError(
        f"{len(flags_final)} flags but only {len(y_true_np)} labels; "
        "re-run the warm-up and streaming cells from a clean state."
    )

# The streaming loop does not run the detectors before the warm-up boundary
# START_AT, so those rows always carry flag 0 and any anomaly there would be a
# guaranteed miss. Score only the region where detection was actually active.
scored = slice(START_AT, None)
detector_flags = {
    "rule": flags_rule,
    "slope": flags_slope,
    "jitter": flags_jitter,
    "iforest": flags_iforest,
    "final": flags_final,
}
results = {
    name: prf1(y_true_np[scored], np.asarray(flags)[scored])
    for name, flags in detector_flags.items()
}
results


In [None]:

ts = np.array(times)
ys = np.array(temps)
# Rows where the combined (OR) detector raised an alert.
alerted = np.array(flags_final) == 1

plt.figure()
plt.plot(ts, ys, label="temp")
plt.scatter(ts[alerted], ys[alerted], s=10, label="final flags")
plt.legend()
plt.show()


In [None]:

# Persist the per-row trace: input series, each detector's flag, the combined
# alert and the ground-truth label.
out_dir = "../data/processed"
results_table = {
    "t": ts,
    "temp": ys,
    "flag_rule": flags_rule,
    "flag_slope": flags_slope,
    "flag_jitter": flags_jitter,
    "flag_iforest": flags_iforest,
    "flag_final": flags_final,
    "is_anomaly": y_true_np,
}
out = pd.DataFrame(results_table)
os.makedirs(out_dir, exist_ok=True)
csv_path = f"{out_dir}/streaming_results.csv"
out.to_csv(csv_path, index=False)
csv_path
