In [None]:
"""
Generic anomaly detection for minute-level Oxygen readings.

Covers:
- Point anomalies (outliers)
- Collective anomalies (patterns / sequences)
- Contextual anomalies (conditional on hour-of-day)
- Sensor fault anomalies (stuck sensor, spikes/glitches, high noise)

Produces:
- Per-minute anomaly severity score in [0, 1]
"""

import numpy as np
import pandas as pd
from dataclasses import dataclass

import os
from pathlib import Path

In [None]:
# =========================
# 0. Configuration
# =========================

@dataclass
class AnomalyConfig:
    # Rolling windows (in minutes)
    roll_window_baseline: int = 60           # 1-hour baseline
    roll_window_collective: int = 120        # 2-hour window for collective anomalies
    roll_window_stuck: int = 60              # 1-hour rolling std for 'stuck'
    roll_window_noise: int = 60              # 1-hour rolling std for 'noise'

    # Point / contextual anomaly z-score thresholds for score scaling
    z_point_low: float = 2.0                 # start increasing score
    z_point_high: float = 5.0                # max severity for point anomalies
    z_ctx_low: float = 2.0
    z_ctx_high: float = 4.0

    # Collective anomaly thresholds (mean |z| over a window)
    collective_low: float = 1.0
    collective_high: float = 3.0

    # Sensor-fault parameters
    stuck_rel_std_factor: float = 0.1        # local std < 0.1 * typical std ⇒ stuck
    noise_factor: float = 2.0                # local std > 2 * typical std ⇒ noisy
    spike_z_threshold: float = 3.0           # how many std-devs for spikes

    # Small epsilon for numerical stability
    eps: float = 1e-6

In [None]:
# =========================
# 1. Loading & preparation
# =========================

def load_oxygen_data(csv_path: str) -> pd.DataFrame:
    """
    Load the raw oxygen CSV.
    Assumes columns: time, Oxygen[%sat], EquipmentUnit, SubUnit, System, Unit.
    """
    df = pd.read_csv(csv_path)
    # Parse time
    df["time"] = pd.to_datetime(df["time"], errors="coerce")
    return df


def prepare_sensor_frame(df_raw: pd.DataFrame) -> pd.DataFrame:
    """
    Prepare a generic sensor frame:

    - Drop rows with missing Oxygen.
    - Create a generic sensor_id that does NOT rely on tag semantics.
    - Keep only (time, sensor_id, value, basic time features).
    """
    df = df_raw.copy()

    # Keep only rows where Oxygen is present
    df = df[df['time'].notna() & df["Oxygen[%sat]"].notna()].copy()

    # Generic sensor id: we just use tags as opaque identifiers.
    # This will still generalize to new customers & tag values.
    df["sensor_id"] = (
        df["System"].astype(str)
        + "|"
        + df["EquipmentUnit"].astype(str)
        + "|"
        + df["SubUnit"].astype(str)
    )

    df.rename(columns={"Oxygen[%sat]": "oxygen"}, inplace=True)

    # Basic time features for contextual anomalies
    df["hour"] = df["time"].dt.hour
    df["dayofweek"] = df["time"].dt.dayofweek

    # Sort by sensor and time to make rolling operations valid
    df = df.sort_values(["sensor_id", "time"]).reset_index(drop=True)

    return df[["time", "sensor_id", "oxygen", "hour", "dayofweek"]]

In [None]:
# =========================
# 2. Helper: score scaling
# =========================

def squash_z(z: pd.Series, low: float, high: float) -> pd.Series:
    """
    Map |z| to [0, 1] linearly between `low` and `high`.

    - |z| <= low  => 0
    - |z| >= high => 1
    - between     => linear ramp
    """
    az = z.abs()
    return ((az - low) / (high - low)).clip(lower=0.0, upper=1.0)

In [None]:
# =========================
# 3. Baseline & residuals
# =========================

def add_baseline_and_point_scores(df: pd.DataFrame, cfg: AnomalyConfig) -> pd.DataFrame:
    """
    For each sensor_id:
    - Compute rolling baseline (mean + std) over `roll_window_baseline`.
    - Compute standardized residual z-score.
    - Compute point anomaly score from z-score.
    """
    df = df.copy()
    g = df.groupby("sensor_id", group_keys=False)

    # Rolling mean & std as local baseline
    df["roll_mean"] = g["oxygen"].transform(
        lambda s: s.rolling(
            window=cfg.roll_window_baseline,
            min_periods=cfg.roll_window_baseline // 2
        ).mean()
    )
    df["roll_std"] = g["oxygen"].transform(
        lambda s: s.rolling(
            window=cfg.roll_window_baseline,
            min_periods=cfg.roll_window_baseline // 2
        ).std()
    )

    # Standardized residual (z-score vs rolling baseline)
    df["z_global"] = (df["oxygen"] - df["roll_mean"]) / (df["roll_std"] + cfg.eps)

    # Point anomaly score
    df["score_point"] = squash_z(df["z_global"], cfg.z_point_low, cfg.z_point_high)

    return df

In [None]:
# =========================
# 4. Collective anomalies
# =========================

def add_collective_scores(df: pd.DataFrame, cfg: AnomalyConfig) -> pd.DataFrame:
    """
    Collective anomalies: sequences where residuals are consistently large.

    We use rolling mean(|z_global|) over a larger window, then scale.
    """
    df = df.copy()
    g = df.groupby("sensor_id", group_keys=False)

    df["roll_mean_abs_z"] = g["z_global"].transform(
        lambda s: s.abs().rolling(
            window=cfg.roll_window_collective,
            min_periods=cfg.roll_window_collective // 2
        ).mean()
    )

    df["score_collective"] = (
        (df["roll_mean_abs_z"] - cfg.collective_low)
        / (cfg.collective_high - cfg.collective_low)
    ).clip(lower=0.0, upper=1.0)

    return df

In [None]:
# =========================
# 5. Contextual anomalies
# =========================

def add_contextual_scores(df: pd.DataFrame, cfg: AnomalyConfig) -> pd.DataFrame:
    """
    Contextual anomalies: values unusual for their context (e.g. hour-of-day).

    Here we build a simple global "expected" oxygen per (hour) across all sensors
    and compute z-score vs that contextual expectation.
    """
    df = df.copy()

    # Contextual baseline: global per-hour stats
    ctx_stats = (
        df.groupby("hour")["oxygen"]
        .agg(["mean", "std"])
        .rename(columns={"mean": "ctx_mean_hour", "std": "ctx_std_hour"})
    )

    df = df.join(ctx_stats, on="hour")

    # Contextual z-score
    df["z_context"] = (df["oxygen"] - df["ctx_mean_hour"]) / (df["ctx_std_hour"] + cfg.eps)

    # Contextual anomaly score
    df["score_context"] = squash_z(df["z_context"], cfg.z_ctx_low, cfg.z_ctx_high)

    return df

In [None]:
# =========================
# 6. Sensor fault anomalies
# =========================

def add_sensor_fault_scores(df: pd.DataFrame, cfg: AnomalyConfig) -> pd.DataFrame:
    """
    Sensor fault anomalies include:
    - Stuck sensor: very low local variance compared to typical variance.
    - Spikes/glitches: sharp jumps and immediate reversals.
    - High noise: local variance much higher than typical.

    We compute three sub-scores and take their max as sensor_fault score.
    """
    df = df.copy()
    g = df.groupby("sensor_id", group_keys=False)

    # --- Overall typical stats per sensor for comparison ---
    sensor_std = g["oxygen"].transform("std")           # typical volatility
    sensor_diff_std = g["oxygen"].transform(lambda s: s.diff().std())

    # --- Rolling std over larger window: used for stuck & noise ---
    df["roll_std_long"] = g["oxygen"].transform(
        lambda s: s.rolling(
            window=cfg.roll_window_stuck,
            min_periods=cfg.roll_window_stuck // 2
        ).std()
    )

    # 6.1 Stuck sensor score
    # If local std is much smaller than typical std, we suspect stuck behaviour.
    # Score grows as roll_std_long / sensor_std goes towards 0.
    ratio_std = df["roll_std_long"] / (sensor_std + cfg.eps)
    # We want large score when ratio_std is very small (< stuck_rel_std_factor)
    df["score_stuck"] = (cfg.stuck_rel_std_factor - ratio_std) / cfg.stuck_rel_std_factor
    df["score_stuck"] = df["score_stuck"].clip(lower=0.0, upper=1.0)

    # 6.2 Spikes / glitches score
    # Look for a large jump followed by a reversal.
    diff_prev = g["oxygen"].diff()
    diff_next = -g["oxygen"].diff(-1)  # difference to next point, negated to compare sign
    spike_mag = np.minimum(diff_prev.abs(), diff_next.abs())

    # normalized spike magnitude
    spike_norm = spike_mag / (sensor_diff_std + cfg.eps)
    # candidate spikes: sign reversal + big magnitude
    candidate_spike = (diff_prev * diff_next < 0) & (spike_norm > cfg.spike_z_threshold)

    df["score_spike"] = 0.0
    df.loc[candidate_spike, "score_spike"] = (
        (spike_norm[candidate_spike] - cfg.spike_z_threshold) / cfg.spike_z_threshold
    ).clip(upper=1.0)

    # 6.3 High noise score
    # If local std is much higher than typical std, we suspect high noise.
    df["roll_std_noise"] = g["oxygen"].transform(
        lambda s: s.rolling(
            window=cfg.roll_window_noise,
            min_periods=cfg.roll_window_noise // 2
        ).std()
    )
    noise_ratio = df["roll_std_noise"] / (sensor_std + cfg.eps)
    df["score_noise"] = (noise_ratio - 1.0) / (cfg.noise_factor - 1.0)
    df["score_noise"] = df["score_noise"].clip(lower=0.0, upper=1.0)

    # Combine into a single sensor_fault score
    df["score_sensor_fault"] = df[["score_stuck", "score_spike", "score_noise"]].max(axis=1)

    return df

In [None]:
# =========================
# 7. Synthetic anomaly injection (optional, for illustration)
# =========================

def inject_synthetic_anomalies(
    df: pd.DataFrame,
    n_point_spikes: int = 20,
    n_collective_dips: int = 5,
    stuck_length: int = 60,
    noise_length: int = 60,
    random_state: int = 42,
) -> pd.DataFrame:
    """
    Injects synthetic anomalies into a copy of df for illustration and testing.

    - Random large spikes (point anomalies)
    - Random dips over short windows (collective)
    - Stuck segments (constant value)
    - High-noise segments

    Adds a column `synthetic_label` with one of:
    - 'normal'
    - 'syn_point_spike'
    - 'syn_collective_dip'
    - 'syn_stuck'
    - 'syn_high_noise'
    """
    rng = np.random.default_rng(random_state)
    df = df.copy()
    df["synthetic_label"] = "normal"

    # Work sensor-by-sensor to keep indexing simple
    for sensor_id, df_s in df.groupby("sensor_id"):
        idx = df_s.index.to_numpy()
        n = len(idx)
        if n < 1000:
            continue  # skip very short series

        # 7.1 Inject point spikes
        for _ in range(n_point_spikes):
            i = int(rng.integers(0, n))
            row_idx = idx[i]
            # Add a large positive or negative spike
            sign = rng.choice([-1, 1])
            magnitude = rng.uniform(20, 40)  # adjust as desired
            df.loc[row_idx, "oxygen"] += sign * magnitude
            df.loc[row_idx, "synthetic_label"] = "syn_point_spike"

        # 7.2 Inject collective dips (sequence anomalies)
        for _ in range(n_collective_dips):
            start = int(rng.integers(0, n - 120))
            length = rng.integers(30, 120)  # 30–120 minutes
            segment_idx = idx[start : start + length]
            df.loc[segment_idx, "oxygen"] *= rng.uniform(0.6, 0.8)  # dip
            df.loc[segment_idx, "synthetic_label"] = "syn_collective_dip"

        # 7.3 Inject stuck sensor segments
        # pick a start where we have enough space
        start = int(rng.integers(0, n - stuck_length))
        segment_idx = idx[start : start + stuck_length]
        stuck_value = float(df.loc[segment_idx[0], "oxygen"])
        df.loc[segment_idx, "oxygen"] = stuck_value
        df.loc[segment_idx, "synthetic_label"] = "syn_stuck"

        # 7.4 Inject high-noise segment
        start = int(rng.integers(0, n - noise_length))
        segment_idx = idx[start : start + noise_length]
        base = df.loc[segment_idx, "oxygen"]
        df.loc[segment_idx, "oxygen"] = base + rng.normal(
            0, base.std() * 2.0, size=len(segment_idx)
        )
        df.loc[segment_idx, "synthetic_label"] = "syn_high_noise"

    return df

In [None]:
# =========================
# 8. Combine all scores into severity
# =========================

def add_severity_score(df: pd.DataFrame) -> pd.DataFrame:
    """
    Combine all sub-scores into a single anomaly severity score in [0, 1].

    We take the max of:
    - point
    - collective
    - contextual
    - sensor_fault
    """
    df = df.copy()
    score_cols = [
        "score_point",
        "score_collective",
        "score_context",
        "score_sensor_fault",
    ]
    df["severity"] = df[score_cols].max(axis=1)
    return df

In [None]:
# =========================
# 9. End-to-end pipeline
# =========================


def run_anomaly_pipeline(csv_path: str,
                         cfg: AnomalyConfig = AnomalyConfig(),
                         inject_synthetic: bool = False) -> pd.DataFrame:
    """
    Full pipeline:
    - Load raw data
    - Prepare generic sensor frame
    - (Optionally) inject synthetic anomalies
    - Compute all anomaly scores
    - Return scored DataFrame with 1 row per minute per sensor
    """
    # 1) Load
    df_raw = load_oxygen_data(csv_path)

    # 2) Prepare
    df = prepare_sensor_frame(df_raw)

    # 3) Inject synthetic anomalies (for illustration / testing)
    if inject_synthetic:
        df = inject_synthetic_anomalies(df)

    # 4) Add anomaly scores
    df = add_baseline_and_point_scores(df, cfg)
    df = add_collective_scores(df, cfg)
    df = add_contextual_scores(df, cfg)
    df = add_sensor_fault_scores(df, cfg)
    df = add_severity_score(df)

    return df

In [None]:
DATA_DIR = os.path.abspath(os.path.join(os.getcwd(), "../" , "data"))
DATA_RAW_DIR = Path(os.path.join(DATA_DIR, "raw"))
# Path to the dataset (adjust this if needed)
input_csv = Path(os.path.join(DATA_RAW_DIR, "oxygen.csv"))

In [None]:
config = AnomalyConfig(
    roll_window_baseline=60,
    roll_window_collective=120,
    roll_window_stuck=60,
    roll_window_noise=60,
)

scored = run_anomaly_pipeline(input_csv, cfg=config, inject_synthetic=True)

In [None]:
scored

In [None]:
# scored.to_csv("scored_v2.csv")

In [None]:
# # =========================
# # 10. Example usage
# # =========================

# if __name__ == "__main__":
#     # Example: run on the provided oxygen.csv
#     # (adjust path as needed)
#     input_csv = "oxygen.csv"   # e.g. "/mnt/data/oxygen.csv"

#     config = AnomalyConfig(
#         roll_window_baseline=60,
#         roll_window_collective=120,
#         roll_window_stuck=60,
#         roll_window_noise=60,
#     )

#     scored = run_anomaly_pipeline(input_csv, cfg=config, inject_synthetic=True)

#     # Peek at the result
#     print(scored[[
#         "time", "sensor_id", "oxygen",
#         "score_point", "score_collective",
#         "score_context", "score_sensor_fault",
#         "severity"
#     ]].head())