In [None]:
##### ANOMALY DETECTION WITH DIFFUSION #####
#
#

In [None]:
# -----------------------------
# Config - Path / Parameters
# -----------------------------
import os, json, time, gc, tracemalloc, warnings, logging
import numpy as np, pandas as pd, torch
from diffusers import DDPMScheduler, UNet1DModel

# Keep the console readable: hide diffusers' FutureWarnings and sub-ERROR logs.
warnings.filterwarnings("ignore", category=FutureWarning, module="diffusers")
logging.getLogger("diffusers").setLevel(logging.ERROR)

# Input / output locations (output folders are created on import).
BASE = "/content/drive/MyDrive/Paper02_14Datasets"
MERGED_DIR = f"{BASE}/MERGED"
OUT_DIR = f"{BASE}/ANOMALY_DIFFUSION_RESIDUALSPECTRAL"
SUMMARY_DIR = f"{OUT_DIR}/Percentiles_Summary"
os.makedirs(OUT_DIR, exist_ok=True)
os.makedirs(SUMMARY_DIR, exist_ok=True)

# Column names of the merged CSV files and the resampling grid.
TIME_COL = "timestamp"
VAL_COL = "active_power"
FREQ = "15min"

# One window is one day on the 15-minute grid: 4 samples/hour * 24 hours = 96.
WIN = 24 * 4

# Optimisation / diffusion hyper-parameters.
STEPS = 600      # number of optimisation steps per residence
LR = 1e-4        # AdamW learning rate
TSTEPS = 1000    # number of diffusion training timesteps
BATCH = 32       # maximum mini-batch size
T_FIX = 500      # fixed timestep at which windows are noised for scoring

# Percentile of the training scores used as the anomaly threshold.
THR_PCT = 70

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Fixed seeds for torch and NumPy.
torch.manual_seed(42)
np.random.seed(42)

RESIDENCES = [
    "AMPds2_House01",
    "GREEND_House00",
    "GREEND_House01",
    "GREEND_House03",
    "UKDALE_House01",
    "UKDALE_House02",
    "UKDALE_House05",
    "REFIT_House01",
    "REFIT_House02",
    "REFIT_House03",
    "REFIT_House05",
    "REFIT_House07",
    "REFIT_House09",
    "REFIT_House15",
]

APPLIANCES = [
    "Fridge",
    "WashingMachine",
    "Dishwasher",
]

ANOMALIES = [
    "StepChange",
    "MultiStepChange",
    "Repeating",
    "Mirror",
    "StuckMAX",
    "StuckMIN",
    "PowerCycling",
]

# -----------------------------
# HELPERS
# -----------------------------
# Loads CSV, cleans timestamps, resamples to 15-minutes
def load_series_15T(csv_path):
    """Load one CSV and return its value column on a gap-free FREQ grid.

    Steps:
      1. read the file, parsing ``TIME_COL`` from stripped strings
         (unparseable timestamps become NaT and their rows are dropped);
      2. coerce ``VAL_COL`` to numbers (unparseable values become NaN);
      3. drop repeated timestamps, keeping the first occurrence in file order;
      4. put the series on the ``FREQ`` grid, interpolate in time, then
         forward/backward fill whatever is still missing at the edges.

    Args:
        csv_path: path of a CSV file containing ``TIME_COL`` and ``VAL_COL``.

    Returns:
        pandas Series named ``"v"`` indexed by timestamp at ``FREQ`` spacing.
    """
    df = pd.read_csv(csv_path, dtype={TIME_COL: str})
    df[TIME_COL] = pd.to_datetime(df[TIME_COL].str.strip(), errors="coerce")
    df = df.dropna(subset=[TIME_COL])

    values = pd.to_numeric(df[VAL_COL], errors="coerce").rename("v")
    s = values.to_frame().set_index(df[TIME_COL])["v"]

    # Duplicate labels make the reindex performed by resample(...).interpolate()
    # fail, so drop them first. De-duplicating *before* sorting keeps the first
    # row in file order (the default sort is not stable for equal keys).
    s = s[~s.index.duplicated(keep="first")].sort_index()

    return s.resample(FREQ).interpolate("time").ffill().bfill()


# Splits series into complete 96-point days (96x15minutes = 24hours)
def daily_windows(series, win=None):
    """Cut a regularly sampled series into one window per calendar day.

    The first window starts at the first midnight found at or after the
    beginning of the series, so a leading partial day is skipped. Each day is
    re-indexed on an evenly spaced grid of ``win`` points; missing points are
    interpolated in time and then forward/backward filled.

    NOTE: the last calendar day touched by the series is always emitted, even
    when the series stops before the end of that day (the tail is then
    forward-filled).

    Args:
        series: pandas Series with a sorted DatetimeIndex.
        win: number of points per day. Defaults to the module constant ``WIN``
            (96, i.e. a 15-minute grid).

    Returns:
        Tuple ``(X, starts, ends)``: ``X`` is a float32 array of shape
        ``(n_days, win)``; ``starts`` / ``ends`` are indexes holding the first
        and last timestamp of every window. All three are empty when no
        window can be built.
    """
    win = WIN if win is None else int(win)

    if series.empty:
        return np.empty((0, win), "float32"), pd.Index([]), pd.Index([])

    first = series.index[0]
    midnight = first.normalize()
    # Compare against the timestamp's own midnight. The previous check used
    # pd.Timestamp.min.time(), which is 00:12:43.145224 rather than 00:00:00,
    # so a series starting exactly at midnight always lost its first full day.
    start = midnight if first == midnight else midnight + pd.Timedelta(days=1)

    days = pd.date_range(start, series.index[-1].normalize(), freq="D")
    step = pd.Timedelta(days=1) / win  # 15 minutes for win == 96

    X, S, E = [], [], []
    for day in days:
        idx = pd.date_range(day, periods=win, freq=step)
        # reindex() always yields exactly `win` points, so no length check.
        seg = series.reindex(idx).interpolate("time").ffill().bfill()
        X.append(seg.values.astype("float32"))
        S.append(idx[0])
        E.append(idx[-1])

    if not X:
        return np.empty((0, win), "float32"), pd.Index([]), pd.Index([])
    return np.stack(X), pd.Index(S), pd.Index(E)

# Do z-standardization
def standardize(Xtr, Xte):
    """Z-score both arrays with statistics taken from ``Xtr`` only.

    Mean and standard deviation are computed along axis 0 (kept as a leading
    axis of length 1); a tiny constant is added to the deviation so that the
    division never hits an exact zero.

    Returns:
        ``(Xtr_scaled, Xte_scaled, mean, std)``.
    """
    center = Xtr.mean(0, keepdims=True)
    spread = Xtr.std(0, keepdims=True) + 1e-12
    train_scaled = (Xtr - center) / spread
    test_scaled = (Xte - center) / spread
    return train_scaled, test_scaled, center, spread

# Build a simple architecture using the libraries: UNet1DModel with a DDPMScheduler
def build_model():
    """Create the denoiser, its noise scheduler and its optimiser.

    Returns:
        ``(net, scheduler, optimizer)``: a single-block ``UNet1DModel`` moved
        to ``DEVICE``, a ``DDPMScheduler`` with ``TSTEPS`` training timesteps,
        and an ``AdamW`` optimiser over the network parameters with learning
        rate ``LR``.
    """
    unet_config = dict(
        sample_size=WIN,
        in_channels=1,
        out_channels=1,
        layers_per_block=2,
        block_out_channels=(64,),
        down_block_types=("DownBlock1D",),
        up_block_types=("UpBlock1D",),
    )
    model = UNet1DModel(**unet_config).to(DEVICE)
    scheduler = DDPMScheduler(num_train_timesteps=TSTEPS)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LR)
    return model, scheduler, optimizer

# Robust z-standardization is z-scaling that ignores outliers.
def robust_z(x):
    """Scale ``x`` with its median and median absolute deviation (MAD).

    The input is converted to a float32 array, centred on its median and
    divided by ``1.4826 * MAD``; a tiny constant is added to the MAD so the
    division never hits an exact zero.
    """
    arr = np.asarray(x, "float32")
    center = np.median(arr)
    abs_dev = np.abs(arr - center)
    scale = np.median(abs_dev) + 1e-12
    return (arr - center) / (1.4826 * scale)


# Determines the Reconstruction Error
# Adds fixed diffusion noise and denoises once
# Reconstructs clean signal estimate
# Computes time-domain MAE and frequency-domain MAE
# Returns the time-domain MAE, the frequency-domain MAE, and the variance of the input signal
@torch.no_grad()
def recon_err(net, sch, Xt):
    """Score windows by how well one denoising pass recovers them.

    Every window is noised at the fixed timestep ``T_FIX``, the network
    predicts the noise once, and the clean-signal estimate is taken from the
    scheduler step output (or, if the output lacks it, computed from the
    scheduler's cumulative alphas).

    Args:
        net: noise-prediction network, called as ``net(noisy, t).sample``.
        sch: scheduler exposing ``add_noise``, ``step`` and ``alphas_cumprod``.
        Xt: tensor of windows; errors are reduced over dims 1 and 2, leaving
            one value per entry of dim 0.

    Returns:
        Three NumPy arrays with one entry per window: time-domain MAE, MAE
        between log-magnitude spectra, and (biased) variance of the input
        window. Three empty float32 arrays when ``Xt`` has no windows.
    """
    n_windows = Xt.shape[0]
    if n_windows == 0:
        nothing = np.array([], "float32")
        return nothing, nothing, nothing

    # The same timestep is needed per sample (noising / network) and as a
    # scalar (scheduler step).
    t_batch = torch.full((n_windows,), T_FIX, dtype=torch.int64, device=DEVICE)
    t_scalar = torch.tensor(T_FIX, dtype=torch.int64, device=DEVICE)

    noise = torch.randn_like(Xt)
    noisy = sch.add_noise(Xt, noise, t_batch)
    eps_hat = net(noisy, t_batch).sample
    step_out = sch.step(eps_hat, t_scalar, noisy)

    x0_hat = getattr(step_out, "pred_original_sample", None)
    if x0_hat is None:
        # Fallback: invert the forward-noising equation by hand.
        alpha_bar = sch.alphas_cumprod.to(Xt.device)[t_scalar]
        numerator = noisy - torch.sqrt(1 - alpha_bar).view(1, 1, 1) * eps_hat
        denominator = torch.sqrt(alpha_bar).view(1, 1, 1) + 1e-12
        x0_hat = numerator / denominator

    def _log_spectrum(u):
        return torch.log1p(torch.abs(torch.fft.rfft(u, dim=-1)))

    time_mae = torch.mean(torch.abs(Xt - x0_hat), dim=(1, 2))
    freq_mae = torch.mean(
        torch.abs(_log_spectrum(Xt) - _log_spectrum(x0_hat)), dim=(1, 2)
    )
    input_var = torch.var(Xt, dim=(1, 2), unbiased=False)

    return time_mae.cpu().numpy(), freq_mae.cpu().numpy(), input_var.cpu().numpy()

# It writes window-level anomaly predictions back to 15-minute timestamps and saves them.
# Recall that the series was split into daily windows of 96 points (15 minutes each).
def write_preds_15m(out_path, series_15m, starts, ends, pred_labels, original_df):
    """Expand per-window labels to 15-minute rows, attach ground truth, save.

    Args:
        out_path: destination CSV path (its folder is created if needed).
        series_15m: the 15-minute series the windows were cut from.
        starts, ends: first / last timestamp of every window.
        pred_labels: one label per window, copied to every row of the window.
        original_df: raw input frame; its ``ground_truth_anomaly`` /
            ``ground_truth_appliance`` columns (matched case-insensitively)
            are joined on the timestamp.

    Returns:
        The written DataFrame with columns ``TIME_COL``, ``VAL_COL``,
        ``ground_truth_anomaly``, ``ground_truth_appliance`` and
        ``prediction_anomaly``. Missing ground-truth columns are filled
        with NaN.
    """
    gt_names = ["ground_truth_anomaly", "ground_truth_appliance"]
    out_cols = [TIME_COL, VAL_COL, *gt_names, "prediction_anomaly"]

    original_df = original_df.copy()
    original_df[TIME_COL] = pd.to_datetime(original_df[TIME_COL], errors="coerce")
    original_df = original_df.dropna(subset=[TIME_COL]).sort_values(TIME_COL)

    gt_cols = [c for c in original_df.columns if str(c).lower() in gt_names]
    # Always keep one row per timestamp so the left join cannot multiply rows
    # (previously this was skipped when no ground-truth column was present).
    original_gt = original_df[[TIME_COL] + gt_cols].drop_duplicates(subset=[TIME_COL])
    # Columns are detected case-insensitively, so normalise them to the
    # canonical lower-case names used below; otherwise e.g.
    # "Ground_Truth_Anomaly" would be replaced by an all-NaN column.
    original_gt = original_gt.rename(columns={c: str(c).lower() for c in gt_cols})
    original_gt = original_gt.loc[:, ~original_gt.columns.duplicated()]

    frames = []
    for start, end, label in zip(starts, ends, pred_labels):
        idx = pd.date_range(start, end, freq="15min")
        seg = series_15m.reindex(idx).interpolate("time").ffill().bfill()
        seg_df = pd.DataFrame({TIME_COL: seg.index, VAL_COL: seg.values})
        seg_df["prediction_anomaly"] = label
        frames.append(seg_df)

    if frames:
        # One join for all windows instead of one join per window.
        out_df = pd.concat(frames, ignore_index=True).merge(
            original_gt, how="left", on=TIME_COL
        )
    else:
        out_df = pd.DataFrame(columns=out_cols)

    for col in gt_names:
        if col not in out_df.columns:
            out_df[col] = np.nan
    out_df = out_df[out_cols]

    out_dir = os.path.dirname(out_path)
    if out_dir:  # os.makedirs("") raises for a bare file name
        os.makedirs(out_dir, exist_ok=True)
    out_df.to_csv(out_path, index=False)
    return out_df

# Determine the following metrics: Accuracy/Precision/Recall/F1-Score/Normal_pct/Anomaly_pct/Total/TP/TN/FP/FN/ActualNormal/ActualAnomaly
def metrics_from_rows(df):
    """Compute row-level detection metrics from a prediction frame.

    Args:
        df: DataFrame with a ``ground_truth_anomaly`` column (text such as
            "Anomaly"/"Normal", booleans, or 0/1 as int, float or string) and
            a ``prediction_anomaly`` column ("Anomaly" marks a positive).
            Rows whose ground truth cannot be interpreted are ignored.

    Returns:
        dict with Accuracy, Precision, Recall, F1_Score, Normal_pct (TN as a
        percentage of actual normals), Anomaly_pct (TP as a percentage of
        actual anomalies), Total, TP, TN, FP, FN, ActualNormal and
        ActualAnomaly. Ratios are NaN when their denominator is zero; every
        count is 0 when no usable row exists.
    """
    def _no_data():
        return dict(Accuracy=np.nan, Precision=np.nan, Recall=np.nan, F1_Score=np.nan,
                    Normal_pct=np.nan, Anomaly_pct=np.nan, Total=0,
                    TP=0, TN=0, FP=0, FN=0, ActualNormal=0, ActualAnomaly=0)

    def gt_flag(x):
        """Map one ground-truth cell to 1 (anomaly), 0 (normal) or NaN."""
        if pd.isna(x):
            return np.nan
        if isinstance(x, (bool, np.bool_)):
            return int(x)
        s = str(x).strip().lower()
        if "anom" in s or s in ("true", "t", "yes", "y"):
            return 1
        if s in ("false", "f", "no", "n", "normal", "norm"):
            return 0
        # Numeric labels: an integer column turns into floats as soon as a
        # left merge introduces NaNs, so "1.0"/"0.0" must be accepted as well
        # as "1"/"0".
        try:
            v = float(s)
        except ValueError:
            return np.nan
        if v == 1:
            return 1
        if v == 0:
            return 0
        return np.nan

    cols = df.columns
    if ("ground_truth_anomaly" not in cols) or ("prediction_anomaly" not in cols):
        return _no_data()

    gt_all = df["ground_truth_anomaly"].map(gt_flag).astype("float32")
    pd_all = df["prediction_anomaly"].map(
        lambda x: 1 if str(x).lower() == "anomaly" else 0
    )
    usable = gt_all.notna().to_numpy()
    gt = gt_all.to_numpy()[usable].astype(int)
    pdv = pd_all.to_numpy()[usable].astype(int)
    if gt.size == 0:
        return _no_data()

    TP = int(((gt == 1) & (pdv == 1)).sum())
    TN = int(((gt == 0) & (pdv == 0)).sum())
    FP = int(((gt == 0) & (pdv == 1)).sum())
    FN = int(((gt == 1) & (pdv == 0)).sum())
    tot = len(gt)
    actN, actA = int((gt == 0).sum()), int((gt == 1).sum())

    acc = (TP + TN) / tot if tot > 0 else np.nan
    prec = TP / (TP + FP) if (TP + FP) > 0 else np.nan
    rec = TP / (TP + FN) if (TP + FN) > 0 else np.nan
    if np.isnan(prec) or np.isnan(rec):
        f1 = np.nan
    elif prec + rec == 0:
        # Both ratios are defined but zero: F1 is 0, not undefined.
        f1 = 0.0
    else:
        f1 = 2 * prec * rec / (prec + rec)
    norm_pct = (TN / actN) * 100 if actN > 0 else np.nan
    anom_pct = (TP / actA) * 100 if actA > 0 else np.nan

    return dict(Accuracy=acc, Precision=prec, Recall=rec, F1_Score=f1,
                Normal_pct=norm_pct, Anomaly_pct=anom_pct, Total=tot,
                TP=TP, TN=TN, FP=FP, FN=FN,
                ActualNormal=actN, ActualAnomaly=actA)

# Save into summary file
def append_summary(res, row):
    """Add one result row to the per-residence summary CSV.

    Rows do not all share the same keys (error rows carry an ``Error`` field
    and lack the metric fields), so the row is aligned to the file's header
    before being appended. When the row brings keys the header does not have
    yet, the file is rewritten with the widened header.

    Args:
        res: residence name, used to build the file name inside SUMMARY_DIR.
        row: dict mapping column name to value.
    """
    path = f"{SUMMARY_DIR}/{res}_ANOMALY_DIFFUSION_OUTLINE.csv"
    frame = pd.DataFrame([row])

    if os.path.exists(path) and os.path.getsize(path) > 0:
        header = list(pd.read_csv(path, nrows=0).columns)
        extra = [c for c in frame.columns if c not in header]
        if not extra:
            # Common case: plain append, with values placed under the
            # matching header columns rather than by position.
            frame.reindex(columns=header).to_csv(
                path, mode="a", header=False, index=False
            )
            return
        try:
            old = pd.read_csv(path)
        except pd.errors.ParserError:
            # File left misaligned by an earlier run: keep the historical
            # best-effort behaviour instead of failing the whole pipeline.
            frame.to_csv(path, mode="a", header=False, index=False)
            return
        cols = header + extra
        frame = pd.concat(
            [old.reindex(columns=cols), frame.reindex(columns=cols)],
            ignore_index=True,
        )

    frame.to_csv(path, index=False)

# -----------------------------
# TRAINING PROCESS
# -----------------------------
def train_residence(res):
    """Train the diffusion denoiser for one residence and derive thresholds.

    The model is fitted on the first 80% of the daily windows of
    ``{res}_Fridge_15minutes_StepChange_MERGED.csv``; the score threshold is
    the ``THR_PCT`` percentile of the combined robust z-scores of the training
    reconstruction errors, and the variance threshold is the 1st percentile of
    the training window variances. A small JSON metadata file is written to
    ``OUT_DIR``.

    Args:
        res: residence identifier (e.g. ``"REFIT_House01"``).

    Returns:
        ``(net, sch, mu, sd, thr, var_thr, train_time, train_peak_mb)``.

    Raises:
        FileNotFoundError: the training CSV does not exist.
        ValueError: too few full-day windows for an 80/20 split.
    """
    train_csv = f"{MERGED_DIR}/{res}_Fridge_15minutes_StepChange_MERGED.csv"
    if not os.path.exists(train_csv):
        raise FileNotFoundError(train_csv)

    tracemalloc.start()
    t0 = time.time()
    try:
        # Data: 15-minute series -> daily windows -> 80% split -> z-scores.
        s_tr = load_series_15T(train_csv)
        X, _, _ = daily_windows(s_tr)
        n_win = len(X)
        ntr = int(0.8 * n_win)
        if not (0 < ntr < n_win):
            raise ValueError(f"{res}: need >=2 full-day windows (got {n_win}).")
        Tr, _, mu, sd = standardize(X[:ntr], X[ntr:])

        net, sch, opt = build_model()
        Tr_t = torch.tensor(Tr, dtype=torch.float32, device=DEVICE).unsqueeze(1)

        # Noise-prediction training: random batch, random timestep, MSE loss.
        batch = min(BATCH, Tr_t.shape[0])
        for step in range(STEPS):
            idx = torch.randint(0, Tr_t.shape[0], (batch,), device=DEVICE)
            x = Tr_t.index_select(0, idx)
            t = torch.randint(0, TSTEPS, (x.shape[0],), dtype=torch.int64, device=DEVICE)
            noise = torch.randn_like(x)
            noisy = sch.add_noise(x, noise, t)
            loss = torch.mean((net(noisy, t).sample - noise) ** 2)
            opt.zero_grad()
            loss.backward()
            opt.step()
            if (step + 1) % 100 == 0:
                print(f"[{res}] Step {step+1}/{STEPS}, Loss = {loss.item():.6f}")

        # recon_err already runs without gradient tracking.
        tr_t, tr_f, tr_v = recon_err(net, sch, Tr_t)

        # Thresholds from the training windows.
        tr_score = 0.5 * (robust_z(tr_t) + robust_z(tr_f))
        thr = float(np.percentile(tr_score, THR_PCT))
        var_thr = float(np.percentile(tr_v, 1.0))

        train_time = time.time() - t0
        _, peak = tracemalloc.get_traced_memory()
    finally:
        # Always stop tracing; otherwise a failure here would leave
        # tracemalloc running and distort later peak measurements.
        tracemalloc.stop()

    train_peak_mb = peak / (1024 * 1024)
    with open(f"{OUT_DIR}/training_metadata_{res}.json", "w") as f:
        json.dump(dict(residence=res, threshold=thr, var_threshold=var_thr,
                       train_peak_mb=train_peak_mb, train_time=train_time), f, indent=2)
    return net, sch, mu, sd, thr, var_thr, train_time, train_peak_mb

# -----------------------------
# INFERENCE PROCESS
# -----------------------------
def test_file(csv_path, res, app, anom, thr, var_thr, mu, sd, net, sch, t_train, p_train):
    """Score one CSV with a trained model and return its summary row.

    A window is labelled "Anomaly" when its combined robust z-score exceeds
    ``thr`` or when its variance is below ``var_thr``. Per-timestamp
    predictions are written to ``OUT_DIR``.

    NOTE(review): scores are normalised with this file's own median/MAD,
    whereas ``thr`` comes from scores normalised on the training windows.
    Confirm this is intended.

    Args:
        csv_path: file to evaluate.
        res, app, anom: residence, appliance and anomaly type (used for the
            output file name and the summary row).
        thr, var_thr: score and variance thresholds from training.
        mu, sd: training mean / standard deviation used to scale the windows.
        net, sch: trained network and its scheduler.
        t_train, p_train: training time (s) and peak memory (MB), copied into
            the row.

    Returns:
        dict with run information, timings, memory peaks and the metrics
        computed by ``metrics_from_rows``.
    """
    tracemalloc.start()
    t1 = time.time()
    try:
        # Data: 15-minute series -> daily windows -> training z-scores.
        s = load_series_15T(csv_path)
        X, starts, ends = daily_windows(s)
        if len(X) > 0:
            Xn = ((X - mu) / sd).astype("float32")
            Xt = torch.tensor(Xn, dtype=torch.float32, device=DEVICE).unsqueeze(1)
        else:
            Xt = torch.zeros((0, 1, WIN), device=DEVICE)

        # recon_err already runs without gradient tracking.
        te_t, te_f, te_v = recon_err(net, sch, Xt)

        if len(te_t) > 0:
            score = 0.5 * (robust_z(te_t) + robust_z(te_f))
        else:
            # Skip robust_z: the median of an empty array only yields
            # NumPy warnings.
            score = np.array([], "float32")
        is_anom = (score > thr) | (te_v < var_thr)
        preds = np.where(is_anom, "Anomaly", "Normal")

        out_path = f"{OUT_DIR}/{res}_{app}_15minutes_{anom}_MERGED_DIFFUSION_RESIDUALSPECTRAL.csv"
        per15 = write_preds_15m(out_path, s, starts, ends, preds, pd.read_csv(csv_path))
        m = metrics_from_rows(per15)

        inf_time = time.time() - t1
        _, peak = tracemalloc.get_traced_memory()
    finally:
        # Always stop tracing, even when scoring fails part-way.
        tracemalloc.stop()

    inf_peak_mb = peak / (1024 * 1024)
    row = dict(Residence=res, Appliance=app, AnomalyType=anom,
               ThresholdPct=THR_PCT, ThresholdValue=thr, VarThreshold=var_thr,
               Windows=len(preds), TrainingTimeSec=round(t_train, 6),
               InferenceTimeSec=round(inf_time, 6), TrainPeakMB=round(p_train, 3),
               InferencePeakMB=round(inf_peak_mb, 3), **m)
    return row

# Do testing per file (appliance/anomaly)
def test_residence(res, net, sch, mu, sd, thr, var_thr, t_train, p_train):
    """Evaluate every existing appliance/anomaly file of one residence.

    For each combination of ``APPLIANCES`` and ``ANOMALIES`` whose merged CSV
    exists, the file is scored with ``test_file`` and the resulting row is
    appended to the residence summary. A failure on one file is recorded as
    an ``Error`` row and does not stop the remaining files.
    """
    combos = [(app, anom) for app in APPLIANCES for anom in ANOMALIES]
    for app, anom in combos:
        path = f"{MERGED_DIR}/{res}_{app}_15minutes_{anom}_MERGED.csv"
        if os.path.exists(path):
            try:
                row = test_file(
                    path, res, app, anom, thr, var_thr, mu, sd, net, sch, t_train, p_train
                )
            except Exception as exc:
                row = dict(Residence=res, Appliance=app, AnomalyType=anom, Error=str(exc))
            append_summary(res, row)


# ---------------- MAIN ----------------
if __name__ == "__main__":
    # Train one model per residence, evaluate all of its files, then release
    # the model before moving on to the next residence.
    for res in RESIDENCES:
        print(f"\n===== {res}: train & test =====")
        # Pre-bind so the cleanup below works even when training fails before
        # the names are assigned (replaces a bare `except:` around `del`).
        net = sch = None
        try:
            net, sch, mu, sd, thr, var_thr, t_train, p_train = train_residence(res)
            test_residence(res, net, sch, mu, sd, thr, var_thr, t_train, p_train)
        except Exception as e:
            # Record the failure and continue with the next residence.
            append_summary(res, dict(Residence=res, Error=f"TRAIN ERROR: {e}"))
        finally:
            # Drop the references so the model can be garbage-collected and
            # cached GPU memory returned.
            net = sch = None
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    print("\nDone. 15-minute predictions saved, summaries in Percentiles_Summary/")
