In [32]:
import os, sys, json, time, uuid, glob, math, warnings
from pathlib import Path

warnings.filterwarnings("ignore")


REPO_ROOT         = Path(r"C:\engine_module_pipeline")
INFER_STAGE_ROOT  = REPO_ROOT / "infer_stage"

DELTA_BASE        = INFER_STAGE_ROOT / "delta"
ARTIFACTS_DIR     = INFER_STAGE_ROOT / "artifacts"
CHECKPOINTS_DIR   = INFER_STAGE_ROOT / "checkpoints"
DLQ_DIR           = INFER_STAGE_ROOT / "dlq"
LOGS_DIR          = INFER_STAGE_ROOT / "logs"

for p in (DELTA_BASE, ARTIFACTS_DIR, CHECKPOINTS_DIR, DLQ_DIR, LOGS_DIR):
    p.mkdir(parents=True, exist_ok=True)


INFER_READY             = DELTA_BASE / "engine_module_infer_ready"        # reader
INFER_RESULTS_DELTA     = DELTA_BASE / "engine_module_inference_results"  # writer 
ALERTS_DELTA            = DELTA_BASE / "engine_module_alerts"
LSTM_WIN_DELTA          = DELTA_BASE / "engine_module_lstm_windows"
MODEL_META_DELTA        = DELTA_BASE / "engine_module_model_metadata"
VEH_HEALTH_DELTA        = DELTA_BASE / "vehicle_health_summary"

# ---- Artifacts / contract files ----
CONTRACT_JSON           = ARTIFACTS_DIR / "model_input_contract.json"
SCALER_FNAME            = "scaler_robust.joblib"
DENSE_TS_FNAME          = "model_dense_best_torchscript.pt"
LSTM_SD_FNAME           = "model_lstm_long_best.pt"             
LSTM_TS_FNAME           = "model_lstm_long_best_torchscript.pt" 
ISOF_FNAME              = "isolation_forest_combiner_final.joblib"
KDE_PREFIX              = "kde_"
GMM_PREFIX              = "gmm_"


USE_MLFLOW = False  
print("Paths set. Using MLflow?", USE_MLFLOW)


Paths set. Using MLflow? False


In [33]:
import subprocess
from pathlib import Path

print("JAVA_HOME:", os.environ.get("JAVA_HOME"))
java_path = Path(os.environ.get("JAVA_HOME",""))/"bin/java.exe"
print("java exe exists:", java_path, java_path.exists())
print("PATH[0..2]:", os.environ.get("PATH","").split(os.pathsep)[:3])

try:
    out = subprocess.run([str(java_path), "-version"], capture_output=True, text=True)
    print("java -version OK")
    print(out.stderr or out.stdout)
except Exception as e:
    print("java spawn FAILED:", repr(e))

delta_jar = REPO_ROOT / r"jars\delta-spark_2.12-3.2.0.jar"
storage_jar = REPO_ROOT / r"jars\delta-storage-3.2.0.jar"
print("JARs:", [str(delta_jar), str(storage_jar)])
print(delta_jar, "exists:", delta_jar.exists())
print(storage_jar, "exists:", storage_jar.exists())


JAVA_HOME: C:\jdk-11.0.28+6
java exe exists: C:\jdk-11.0.28+6\bin\java.exe True
PATH[0..2]: ['C:\\jdk-11.0.28+6\\bin', 'C:\\jdk-11.0.28+6\\bin', 'C:\\engine_module_pipeline\\.venv-spark\\Scripts']
java -version OK
openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment Temurin-11.0.28+6 (build 11.0.28+6)
OpenJDK 64-Bit Server VM Temurin-11.0.28+6 (build 11.0.28+6, mixed mode)

JARs: ['C:\\engine_module_pipeline\\jars\\delta-spark_2.12-3.2.0.jar', 'C:\\engine_module_pipeline\\jars\\delta-storage-3.2.0.jar']
C:\engine_module_pipeline\jars\delta-spark_2.12-3.2.0.jar exists: True
C:\engine_module_pipeline\jars\delta-storage-3.2.0.jar exists: True


In [34]:
import pyspark as _ps
from pathlib import Path
import os, shutil

PYSPARK_DIR = Path(_ps.__file__).resolve().parent
SPARK_HOME  = PYSPARK_DIR
SPARK_SUBMIT = SPARK_HOME / "bin" / "spark-submit.cmd"
assert SPARK_SUBMIT.exists(), f"{SPARK_SUBMIT} missing. Reinstall pyspark==3.5.1"

JAVA_HOME = os.environ.get("JAVA_HOME", r"C:\jdk-11.0.28+6")
java_exe  = Path(JAVA_HOME) / "bin" / "java.exe"
assert java_exe.exists(), f"java.exe not found under JAVA_HOME={JAVA_HOME}"
os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["PATH"] = str(java_exe.parent) + os.pathsep + os.environ.get("PATH","")

os.environ["SPARK_HOME"] = str(SPARK_HOME)
DELTA_JAR   = REPO_ROOT / r"jars\delta-spark_2.12-3.2.0.jar"
STORAGE_JAR = REPO_ROOT / r"jars\delta-storage-3.2.0.jar"
for p in (DELTA_JAR, STORAGE_JAR):
    assert p.exists(), f"Missing JAR: {p}"
classpath = f"{DELTA_JAR};{STORAGE_JAR}"
os.environ["CLASSPATH"] = classpath

from pyspark.sql import SparkSession
builder = (
    SparkSession.builder
    .appName("engine_infer_stream_local_delta")
    .master("local[2]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars", f"{DELTA_JAR},{STORAGE_JAR}")
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.executor.extraClassPath", classpath)
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.driver.memory", "4g")
)
spark = builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
print("Spark version:", spark.version)

# Probe Delta IO
probe_path = (DELTA_BASE / "__probe_delta").as_posix()
shutil.rmtree(Path(probe_path), ignore_errors=True)
spark.range(1).write.format("delta").mode("overwrite").save(probe_path)
spark.read.format("delta").load(probe_path).show()
print("[OK] Delta IO verified.")


Spark version: 3.5.1
+---+
| id|
+---+
|  0|
+---+

[OK] Delta IO verified.


In [35]:
import json

assert CONTRACT_JSON.exists(), f"Missing model input contract: {CONTRACT_JSON}"
contract = json.loads(CONTRACT_JSON.read_text(encoding="utf-8"))

features = contract["features_canonical_order"]
assert isinstance(features, list) and len(features) == 25, f"Expected 25 canonical features in contract, got {len(features)}"
timestamp_column = contract.get("timestamp_column", "timestamp")

SAN_COLON = contract.get("per_feature_filename_rule", {}).get("colon", "__COLON__")
def sanitize_for_filename(feat: str) -> str:
    return feat.replace(":", SAN_COLON)

print("Loaded contract.")
print("First 5 features in-order:", features[:5])
print("SAN_COLON token:", SAN_COLON)


Loaded contract.
First 5 features in-order: ['Air_Fuel_Ratio_Commanded_:1', 'Air_Fuel_Ratio_Measured_:1', 'Catalyst_Temperature__Bank_1_Sensor_1', 'Catalyst_Temperature__Bank_1_Sensor_2', 'Engine_kW__At_the_wheels_kW']
SAN_COLON token: __COLON__


In [50]:
from pathlib import Path
import os, json

ARTIFACTS_DIR = REPO_ROOT / r"infer_stage\artifacts"   
assert ARTIFACTS_DIR.exists(), f"Artifacts dir missing: {ARTIFACTS_DIR}"

FEATURES_FNAME = "features.json"
SCALER_FNAME   = "scaler_robust.joblib"
DENSE_TS       = "model_dense_best_torchscript.pt"
DENSE_STATE    = "model_dense_best_state_dict.pt"
LSTM_TS        = "model_lstm_long_best_torchscript.pt"  
LSTM_STATE     = "model_lstm_long_best.pt"
ISO_JOBLIB     = "isolation_forest_combiner_final.joblib"
KDE_PREFIX     = "kde_"
GMM_PREFIX     = "gmm_"

def _safe_local(name: str|None):
    if not name: return None
    p = ARTIFACTS_DIR / name
    return str(p) if p.exists() else None

ARTS = {
    "features": _safe_local(FEATURES_FNAME),
    "scaler":   _safe_local(SCALER_FNAME),
    "dense_ts": _safe_local(DENSE_TS),
    "dense_sd": _safe_local(DENSE_STATE),
    "lstm_ts":  _safe_local(LSTM_TS),
    "lstm_sd":  _safe_local(LSTM_STATE),
    "isof":     _safe_local(ISO_JOBLIB),
    "kde_dir":  str(ARTIFACTS_DIR),
    "gmm_dir":  str(ARTIFACTS_DIR),
    "kde_prefix": KDE_PREFIX,
    "gmm_prefix": GMM_PREFIX,
    "sanitizer": {"colon": "__COLON__"},
}

missing_required = [k for k in ("features","scaler","dense_ts","lstm_sd","isof") if not ARTS[k]]
if missing_required:
    raise FileNotFoundError(f"Required artifacts not found: {missing_required}\nARTS={ARTS}")

SCALER_PATH   = ARTS["scaler"]
DENSE_TS_PATH = ARTS["dense_ts"]
DENSE_SD_PATH = ARTS["dense_sd"]
LSTM_TS_PATH  = ARTS["lstm_ts"]
LSTM_SD_PATH  = ARTS["lstm_sd"]
ISOF_PATH     = ARTS["isof"]
KDE_DIR       = Path(ARTS["kde_dir"])
GMM_DIR       = Path(ARTS["gmm_dir"])
KDE_PREFIX    = ARTS["kde_prefix"]
GMM_PREFIX    = ARTS["gmm_prefix"]
SAN_RULE      = ARTS["sanitizer"]  


for k in ("features","scaler","dense_ts","dense_sd","lstm_ts","lstm_sd","isof"):
    p = ARTS.get(k)
    if p and os.path.exists(p):
        try:
            spark.sparkContext.addFile(p)
        except Exception as e:
            print(f"[WARN] addFile failed for {p}: {e}")

print("ARTS manifest:", ARTS)


ARTS manifest: {'features': 'C:\\engine_module_pipeline\\infer_stage\\artifacts\\features.json', 'scaler': 'C:\\engine_module_pipeline\\infer_stage\\artifacts\\scaler_robust.joblib', 'dense_ts': 'C:\\engine_module_pipeline\\infer_stage\\artifacts\\model_dense_best_torchscript.pt', 'dense_sd': 'C:\\engine_module_pipeline\\infer_stage\\artifacts\\model_dense_best_state_dict.pt', 'lstm_ts': 'C:\\engine_module_pipeline\\infer_stage\\artifacts\\model_lstm_long_best_torchscript.pt', 'lstm_sd': 'C:\\engine_module_pipeline\\infer_stage\\artifacts\\model_lstm_long_best.pt', 'isof': 'C:\\engine_module_pipeline\\infer_stage\\artifacts\\isolation_forest_combiner_final.joblib', 'kde_dir': 'C:\\engine_module_pipeline\\infer_stage\\artifacts', 'gmm_dir': 'C:\\engine_module_pipeline\\infer_stage\\artifacts', 'kde_prefix': 'kde_', 'gmm_prefix': 'gmm_', 'sanitizer': {'colon': '__COLON__'}}


In [88]:
from pyspark.sql.types import *

feature_fields = [StructField(f, DoubleType(), True) for f in features]

INFER_RESULTS_SCHEMA = StructType([
    StructField("row_hash", StringType(), False),
    StructField("timestamp", TimestampType(), True),
    StructField("date", StringType(), False),  # string in results (kept as your design)
    StructField("source_id", StringType(), True),
    StructField("kafka_key", StringType(), True),
    StructField("offset", LongType(), True),
    StructField("source_file", StringType(), True),
    *feature_fields,
    StructField("recon_error_dense", DoubleType(), True),
    StructField("dense_per_feature_error", MapType(StringType(), DoubleType()), True),
    StructField("recon_error_lstm", DoubleType(), True),
    StructField("lstm_window_id", StringType(), True),
    StructField("isolation_score", DoubleType(), True),
    StructField("kde_logp", DoubleType(), True),
    StructField("gmm_logp", DoubleType(), True),
    StructField("composite_score", DoubleType(), True),
    StructField("anomaly_label", StringType(), True),
    StructField("anomaly_severity", IntegerType(), True),
    StructField("model_versions", MapType(StringType(), StringType()), True),
    StructField("inference_run_id", StringType(), True),
    StructField("inference_ts", TimestampType(), True),
    StructField("processing_latency_ms", LongType(), True),
    StructField("explain_top_k", ArrayType(StructType([
        StructField("feature", StringType(), False),
        StructField("contribution", DoubleType(), False),
    ])), True),
    StructField("raw_model_outputs", MapType(StringType(), DoubleType()), True),
])

def ensure_delta_table(path: Path, schema: StructType, partition_cols=None):
    partition_cols = partition_cols or []
    if (path / "_delta_log").exists():
        return
    empty = spark.createDataFrame([], schema)
    w = empty.write.format("delta").mode("overwrite")
    if partition_cols:
        w = w.partitionBy(*partition_cols)
    w.save(str(path))
    print("Initialized Delta table:", path)

ALERTS_SCHEMA = StructType([
    StructField("alert_id", StringType(), False),
    StructField("alert_ts", TimestampType(), False),
    StructField("row_hash", StringType(), True),
    StructField("vehicle_id", StringType(), True),
    StructField("alert_type", StringType(), True),
    StructField("severity", IntegerType(), True),
    StructField("composite_score", DoubleType(), True),
    StructField("triggering_models", ArrayType(StringType()), True),
    StructField("reason", StringType(), True),
    StructField("top_features", ArrayType(StringType()), True),
    StructField("model_versions", MapType(StringType(), StringType()), True),
    StructField("inference_run_id", StringType(), True),
    StructField("acked", BooleanType(), True),
    StructField("acked_by", StringType(), True),
    StructField("acked_ts", TimestampType(), True),
    StructField("notified_channels", ArrayType(StringType()), True),
    StructField("linked_rows", ArrayType(StringType()), True),
    StructField("extra", MapType(StringType(), StringType()), True),
    StructField("date", StringType(), True),
])

LSTM_WIN_SCHEMA = StructType([
    StructField("lstm_window_id", StringType(), False),
    StructField("window_start_ts", TimestampType(), False),
    StructField("window_end_ts", TimestampType(), False),
    StructField("row_hashes", ArrayType(StringType()), False),
    StructField("reconstruction_error", DoubleType(), True),
    StructField("per_step_errors", ArrayType(DoubleType()), True),
    StructField("model_version", StringType(), True),
    StructField("inference_run_id", StringType(), True),
    StructField("date", StringType(), False),
])

MODEL_METADATA_SCHEMA = StructType([
    StructField("inference_run_id", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("model_versions", MapType(StringType(), StringType()), False),
    StructField("params", MapType(StringType(), StringType()), True),
    StructField("baseline_stats", MapType(StringType(), StringType()), True),
    StructField("notes", StringType(), True),
    StructField("source_commit", StringType(), True),
    StructField("date", StringType(), False),
])

VEH_HEALTH_SCHEMA = StructType([
    StructField("vehicle_id", StringType(), False),
    StructField("date", StringType(), False),
    StructField("rows_count", LongType(), True),
    StructField("anomaly_count", LongType(), True),
    StructField("anomaly_rate", DoubleType(), True),
    StructField("median_composite_score", DoubleType(), True),
    StructField("p95_composite_score", DoubleType(), True),
    StructField("health_score", DoubleType(), True),
    StructField("days_since_last_alert", IntegerType(), True),
    StructField("top_failure_modes", ArrayType(StringType()), True),
    StructField("trend_flag", StringType(), True),
    StructField("estimated_rul", DoubleType(), True),
    StructField("model_versions", MapType(StringType(), StringType()), True),
    StructField("last_inference_ts", TimestampType(), True),
])

# Initialize once (so downstream merges work later)
ensure_delta_table(INFER_RESULTS_DELTA, INFER_RESULTS_SCHEMA, partition_cols=["date"])
ensure_delta_table(ALERTS_DELTA,        ALERTS_SCHEMA,        partition_cols=["date"])
ensure_delta_table(LSTM_WIN_DELTA,      LSTM_WIN_SCHEMA,      partition_cols=["date"])
ensure_delta_table(MODEL_META_DELTA,    MODEL_METADATA_SCHEMA,partition_cols=["date"])
ensure_delta_table(VEH_HEALTH_DELTA,    VEH_HEALTH_SCHEMA,    partition_cols=["date"])
print("All 5 tables initialized; this notebook writes only inference_results.")


Initialized Delta table: C:\engine_module_pipeline\infer_stage\delta\engine_module_alerts
Initialized Delta table: C:\engine_module_pipeline\infer_stage\delta\engine_module_lstm_windows
Initialized Delta table: C:\engine_module_pipeline\infer_stage\delta\engine_module_model_metadata
Initialized Delta table: C:\engine_module_pipeline\infer_stage\delta\vehicle_health_summary
All 5 tables initialized; this notebook writes only inference_results.


In [52]:
import threading, joblib, torch, numpy as np, pandas as pd, os, glob
from pyspark import SparkFiles
from pathlib import Path as _P

req = ["SCALER_PATH","DENSE_TS_PATH","DENSE_SD_PATH","LSTM_TS_PATH","LSTM_SD_PATH","ISOF_PATH",
       "KDE_DIR","GMM_DIR","KDE_PREFIX","GMM_PREFIX","SAN_RULE"]
missing = [r for r in req if r not in globals()]
if missing:
    raise RuntimeError(f"Run Cell 5 first; missing globals: {missing}")

_executor_cache = {"models":{}, "scaler":None, "kde":{}, "gmm":{}, "versions":{}}
_cache_lock = threading.Lock()

def _first_existing(*candidates):
    for p in candidates:
        if p and os.path.exists(p): return p
    return None

def _sparkfile_by_basename(local_path: str|None):
    if not local_path: return None
    try:
        base = os.path.basename(local_path)
        p = SparkFiles.get(base)
        return p if p and os.path.exists(p) else None
    except Exception:
        return None

COLON_TOKEN = SAN_RULE.get("colon", "__COLON__")
def _sanitize_feature_name(s: str) -> str:
    return (s or "").replace(":", COLON_TOKEN)
def _desanitize_feature_name(s: str) -> str:
    return (s or "").replace(COLON_TOKEN, ":")

def load_scaler():
    with _cache_lock:
        if _executor_cache["scaler"] is not None:
            return _executor_cache["scaler"]
        p = _first_existing(SCALER_PATH, _sparkfile_by_basename(SCALER_PATH))
        if not p: raise RuntimeError("Scaler artifact not found.")
        sc = joblib.load(p); _executor_cache["scaler"] = sc; return sc

def load_dense():
    with _cache_lock:
        if "dense" in _executor_cache["models"]:
            return _executor_cache["models"]["dense"]
        p_ts = _first_existing(DENSE_TS_PATH, _sparkfile_by_basename(DENSE_TS_PATH))
        p_sd = _first_existing(DENSE_SD_PATH, _sparkfile_by_basename(DENSE_SD_PATH))
        if p_ts:
            m = torch.jit.load(p_ts, map_location="cpu").eval()
            _executor_cache["models"]["dense"] = ("ts", m)
            _executor_cache["versions"]["dense"] = "ts"; return _executor_cache["models"]["dense"]
        if p_sd:
            m = torch.load(p_sd, map_location="cpu"); 
            try: m.eval()
            except: pass
            _executor_cache["models"]["dense"] = ("sd", m)
            _executor_cache["versions"]["dense"] = "sd"; return _executor_cache["models"]["dense"]
        raise RuntimeError("Dense AE artifact not found (TS/SD).")

def load_lstm():
    with _cache_lock:
        if "lstm" in _executor_cache["models"]:
            return _executor_cache["models"]["lstm"]
        p_ts = _first_existing(LSTM_TS_PATH, _sparkfile_by_basename(LSTM_TS_PATH))
        p_sd = _first_existing(LSTM_SD_PATH, _sparkfile_by_basename(LSTM_SD_PATH))
        if p_ts:
            m = torch.jit.load(p_ts, map_location="cpu").eval()
            _executor_cache["models"]["lstm"] = ("ts", m)
            _executor_cache["versions"]["lstm"] = "ts"; return _executor_cache["models"]["lstm"]
        if p_sd:
            import torch.nn as nn
            class ForgivingLSTMAE(nn.Module):
                def __init__(self, input_dim=25, enc_hidden=64, dec_hidden=25, enc_layers=1, dec_layers=1):
                    super().__init__()
                    self.encoder = nn.LSTM(input_size=input_dim, hidden_size=enc_hidden, num_layers=enc_layers, batch_first=True)
                    self.decoder = nn.LSTM(input_size=enc_hidden,  hidden_size=dec_hidden, num_layers=dec_layers, batch_first=True)
                    self.out     = nn.Linear(dec_hidden, input_dim)
                def forward(self, x):
                    enc_out,_ = self.encoder(x); dec_out,_ = self.decoder(enc_out)
                    return self.out(dec_out[:, -1:, :])
            state = torch.load(p_sd, map_location="cpu")
            m = ForgivingLSTMAE(); m.load_state_dict(state, strict=False); m.eval()
            _executor_cache["models"]["lstm"] = ("sd", m)
            _executor_cache["versions"]["lstm"] = "sd"; return _executor_cache["models"]["lstm"]
        _executor_cache["versions"]["lstm"] = "none"
        return None

def load_isof():
    with _cache_lock:
        if "isof" in _executor_cache["models"]:
            return _executor_cache["models"]["isof"]
        p = _first_existing(ISOF_PATH, _sparkfile_by_basename(ISOF_PATH))
        if not p: raise RuntimeError("IsolationForest artifact not found.")
        m = joblib.load(p)
        _executor_cache["models"]["isof"] = m
        _executor_cache["versions"]["isof"] = "joblib"; return m

def _scan_models(prefix: str, base_dir: _P):
    """Return mapping {canonical_feature_name: full_path} (de-sanitized)."""
    pat = str(base_dir / f"{prefix}*.joblib")
    out = {}
    for p in glob.glob(pat):
        base = os.path.basename(p)
        key = base[len(prefix):-7]  # filename core
        canon = _desanitize_feature_name(key)  
        if canon:
            out[canon] = p
    return out

_KDE_INDEX = None
_GMM_INDEX = None

def load_kde_for(feature):
    global _KDE_INDEX
    with _cache_lock:
        if _KDE_INDEX is None:
            _KDE_INDEX = _scan_models(KDE_PREFIX, _P(KDE_DIR))
        p = _KDE_INDEX.get(feature) or _KDE_INDEX.get(_desanitize_feature_name(_sanitize_feature_name(feature)))
        if not p or not os.path.exists(p): return None
        if feature not in _executor_cache["kde"]:
            _executor_cache["kde"][feature] = joblib.load(p)
        return _executor_cache["kde"][feature]

def load_gmm_for(feature):
    global _GMM_INDEX
    with _cache_lock:
        if _GMM_INDEX is None:
            _GMM_INDEX = _scan_models(GMM_PREFIX, _P(GMM_DIR))
        p = _GMM_INDEX.get(feature) or _GMM_INDEX.get(_desanitize_feature_name(_sanitize_feature_name(feature)))
        if not p or not os.path.exists(p): return None
        if feature not in _executor_cache["gmm"]:
            _executor_cache["gmm"][feature] = joblib.load(p)
        return _executor_cache["gmm"][feature]

def model_versions_map():
    with _cache_lock:
        return dict(_executor_cache["versions"])


In [53]:
import math

BASELINES = {
    "dense_med": 0.05, "dense_mad": 0.05,
    "lstm_med":  0.05, "lstm_mad":  0.05,
    "isof_med":  0.0,  "isof_mad":  0.5,
    "kde_med":  -10.0, "kde_mad":   3.0,
    "gmm_med":  -10.0, "gmm_mad":   3.0
}

def robust_norm(x, med, mad, invert=False, k=1.4826):
    if x is None: return 0.0
    denom = (mad*k) if mad and mad>0 else 1.0
    z = (x - med)/denom
    if invert: z = -z
    return 1/(1+math.exp(-z))

WEIGHTS = {"dense":0.35, "lstm":0.25, "isof":0.20, "kde":0.10, "gmm":0.10}

def composite_from_raw(dense_e, lstm_e, isof_s, kde_lp, gmm_lp):
    s_dense = robust_norm(dense_e, BASELINES["dense_med"], BASELINES["dense_mad"])
    s_lstm  = robust_norm(lstm_e,  BASELINES["lstm_med"],  BASELINES["lstm_mad"])
    s_isof  = robust_norm(isof_s,  BASELINES["isof_med"],  BASELINES["isof_mad"], invert=True)
    s_kde   = robust_norm(kde_lp,  BASELINES["kde_med"],   BASELINES["kde_mad"],  invert=True)
    s_gmm   = robust_norm(gmm_lp,  BASELINES["gmm_med"],   BASELINES["gmm_mad"],  invert=True)
    return (WEIGHTS["dense"]*s_dense +
            WEIGHTS["lstm"] *s_lstm  +
            WEIGHTS["isof"] *s_isof  +
            WEIGHTS["kde"]  *s_kde   +
            WEIGHTS["gmm"]  *s_gmm)

def label_from_composite(c):
    if c is None: return "unknown", 0
    if c < 0.20: return "normal", 0
    if c < 0.50: return "suspicious", 1
    if c < 0.75: return "anomaly", 2
    return "critical", 3


In [66]:

from typing import Iterator
import uuid, numpy as np, pandas as pd, torch

LSTM_WINDOW = 10  


def _feat_variants(feat: str):
    if ":" in feat:
        return (feat, feat.replace(":", "__COLON__"))
    return (feat,)

def _get_kde(feat: str):
    for key in _feat_variants(feat):
        m = load_kde_for(key)
        if m is not None:
            return m
    return None

def _get_gmm(feat: str):
    for key in _feat_variants(feat):
        m = load_gmm_for(key)
        if m is not None:
            return m
    return None

def infer_partition(pdf_iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    scaler = load_scaler()
    dense_mode, dense = load_dense()
    lstm_entry = load_lstm()
    isof = load_isof()

    for pdf in pdf_iter:
        if pdf.empty:
            yield pd.DataFrame([], columns=[f.name for f in INFER_RESULTS_SCHEMA])
            continue

        for f in features:
            if f not in pdf.columns:
                pdf[f] = np.nan

        feat_df = pdf[features].astype(float)
        X = scaler.transform(np.nan_to_num(feat_df.values, copy=False))

        with torch.no_grad():
            xt = torch.from_numpy(X.astype("float32"))
            recon = dense(xt).numpy() if dense_mode == "ts" else dense(xt).detach().numpy()
        dense_err = ((X - recon) ** 2).mean(axis=1)
        dens_resid = np.abs(X - recon)  # [n, F]

        lstm_err = [None] * len(X)
        lstm_win_ids = [None] * len(X)
        lstm_windows_rows = []
        if lstm_entry is not None:
            lstm_mode, lstm = lstm_entry
            with torch.no_grad():
                for i in range(len(X)):
                    start = max(0, i - (LSTM_WINDOW - 1))
                    window = X[start : i + 1]
                    if window.shape[0] < 2:
                        continue
                    w_id = str(uuid.uuid4())
                    wt = torch.from_numpy(window.astype("float32")).unsqueeze(0)  
                    out = lstm(wt)  
                    out_np = out.squeeze().detach().cpu().numpy()
                    last_rec = out_np[-1] if out_np.ndim == 2 else out_np  
                    err = ((window[-1] - last_rec) ** 2).mean()
                    lstm_err[i] = float(err)
                    lstm_win_ids[i] = w_id

                    rows_slice = pdf.iloc[start : i + 1]
                    lstm_windows_rows.append({
                        "lstm_window_id": w_id,
                        "window_start_ts": pd.to_datetime(rows_slice["timestamp"].iloc[0], utc=True, errors="coerce"),
                        "window_end_ts":   pd.to_datetime(rows_slice["timestamp"].iloc[-1], utc=True, errors="coerce"),
                        "row_hashes": rows_slice["row_hash"].astype(str).tolist(),
                        "reconstruction_error": float(err),
                        "per_step_errors": [],
                        "model_version": "ts" if lstm_mode == "ts" else "sd",
                        "inference_run_id": None,
                        "date": str(pd.to_datetime(rows_slice["timestamp"].iloc[-1]).date())
                    })

        try:
            iso_scores = isof.decision_function(X).tolist()
        except Exception:
            iso_scores = [None] * len(X)

        kde_lp, gmm_lp = [], []
        Fw = len(features)
        for r in range(len(X)):
            s_kde = 0.0; cnt_k = 0
            s_gmm = 0.0; cnt_g = 0
            for j in range(Fw):
                feat = features[j]
                v = X[r, j]

                mk = _get_kde(feat)
                if mk is not None:
                    try:
                        s_kde += float(mk.score_samples(np.array([[v]]))[0]); cnt_k += 1
                    except Exception:
                        pass

                mg = _get_gmm(feat)
                if mg is not None:
                    try:
                        s_gmm += float(mg.score(np.array([[v]]))[0]); cnt_g += 1
                    except Exception:
                        pass

            kde_lp.append(s_kde if cnt_k > 0 else None)
            gmm_lp.append(s_gmm if cnt_g > 0 else None)

        n = len(pdf)

        def _fix_len(seq, n, fill=None):
            if seq is None:
                return [fill] * n
            if isinstance(seq, np.ndarray):
                seq = seq.tolist()
            if len(seq) < n:
                return list(seq) + [fill] * (n - len(seq))
            elif len(seq) > n:
                return list(seq)[:n]
            return list(seq)

        dense_err     = _fix_len(dense_err, n, fill=None)
        lstm_err      = _fix_len(lstm_err, n, fill=None)
        lstm_win_ids  = _fix_len(lstm_win_ids, n, fill=None)
        iso_scores    = _fix_len(iso_scores, n, fill=None)
        kde_lp        = _fix_len(kde_lp, n, fill=None)
        gmm_lp        = _fix_len(gmm_lp, n, fill=None)

        # dens_resid -> [n, F] strictly
        if not isinstance(dens_resid, np.ndarray) or dens_resid.ndim != 2:
            dens_resid = np.zeros((n, Fw), dtype=np.float32)
        else:
            if dens_resid.shape[0] < n:
                pad_rows = np.zeros((n - dens_resid.shape[0], dens_resid.shape[1]), dtype=dens_resid.dtype)
                dens_resid = np.vstack([dens_resid, pad_rows])
            elif dens_resid.shape[0] > n:
                dens_resid = dens_resid[:n, :]
            if dens_resid.shape[1] < Fw:
                pad_cols = np.zeros((dens_resid.shape[0], Fw - dens_resid.shape[1]), dtype=dens_resid.dtype)
                dens_resid = np.hstack([dens_resid, pad_cols])
            elif dens_resid.shape[1] > Fw:
                dens_resid = dens_resid[:, :Fw]

        composites, labels, severities, explains = [], [], [], []
        for i in range(n):
            comp = composite_from_raw(
                dense_err[i],
                lstm_err[i],
                iso_scores[i],
                kde_lp[i],
                gmm_lp[i]
            )
            composites.append(comp)
            lab, sev = label_from_composite(comp)
            labels.append(lab); severities.append(sev)

            # top-3 by dense residuals (width-safe)
            feats_abs = [(features[j], float(dens_resid[i, j])) for j in range(Fw)]
            feats_abs.sort(key=lambda x: x[1], reverse=True)
            explains.append([{"feature": a, "contribution": b} for a, b in feats_abs[:3]])

        now = pd.Timestamp.utcnow()  
        out_rows = []
        for i, row in pdf.reset_index(drop=True).iterrows():
            dct = dict(row)
            dct["date"] = str(pd.to_datetime(row["timestamp"]).date())
            dct["recon_error_dense"] = None if dense_err[i] is None else float(dense_err[i])
            dct["dense_per_feature_error"] = {features[j]: float(dens_resid[i, j]) for j in range(Fw)}
            dct["recon_error_lstm"] = None if lstm_err[i] is None else float(lstm_err[i])
            dct["lstm_window_id"] = lstm_win_ids[i]
            dct["isolation_score"] = None if iso_scores[i] is None else float(iso_scores[i])
            dct["kde_logp"] = kde_lp[i]
            dct["gmm_logp"] = gmm_lp[i]   
            dct["combiner_score"] = None  
            dct["composite_score"] = composites[i]
            dct["anomaly_label"] = labels[i]
            dct["anomaly_severity"] = severities[i]
            dct["model_versions"] = model_versions_map()
            dct["inference_run_id"] = None
            dct["inference_ts"] = now
            dct["processing_latency_ms"] = None 
            dct["explain_top_k"] = explains[i]
            dct["raw_model_outputs"] = {}
            dct["notes"] = None
            out_rows.append(dct)

        df_out = pd.DataFrame(out_rows)
        df_lstm = pd.DataFrame(lstm_windows_rows) if lstm_windows_rows else pd.DataFrame(
            [], columns=[f.name for f in LSTM_WIN_SCHEMA]
        )
        yield df_out.assign(__lstm_windows__=pd.Series([df_lstm] * len(df_out)))


In [67]:

from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import (
    StringType, TimestampType, LongType, IntegerType, DoubleType, DateType,
    MapType, ArrayType, StructType, StructField
)
import pandas as pd, numpy as np, time

def _to_int_or_none_series(s: pd.Series) -> pd.Series:
    tmp = pd.to_numeric(s, errors="coerce")
    return tmp.apply(lambda x: int(x) if pd.notna(x) else None)

def _norm_map_float(m):
    if not isinstance(m, dict): return {}
    out = {}
    for k, v in m.items():
        try:
            out[str(k)] = None if v is None else float(v)
        except Exception:
            out[str(k)] = None
    return out

def _norm_explain_top_k(v):
    if v is None or (isinstance(v, float) and np.isnan(v)): return []
    if isinstance(v, list):
        out = []
        for d in v:
            if isinstance(d, dict):
                f = "" if d.get("feature") is None else str(d.get("feature"))
                try:
                    c = 0.0 if d.get("contribution") is None else float(d.get("contribution"))
                except Exception:
                    c = 0.0
                out.append({"feature": f, "contribution": c})
        return out
    return []

def _norm_versions(m):
    if not isinstance(m, dict): return {}
    return {str(k): ("" if v is None else str(v)) for k, v in m.items()}

def _select_cast_by_name(df, target_schema: StructType):
    def _is_simple(dt) -> bool:
        return isinstance(dt, (StringType, TimestampType, LongType, IntegerType, DoubleType, DateType))
    sel = []
    for field in target_schema:
        cname = field.name
        col = F.col(f"`{cname}`")
        if _is_simple(field.dataType):
            col = col.cast(field.dataType)
        sel.append(col.alias(cname))
    return df.select(*sel)

def foreach_batch(batch_df: DataFrame, batch_id: int):
    t0 = time.time()

    # Strict input projection by name (system + canonical features)
    expected_in_cols = ["row_hash","timestamp","source_id","kafka_key","offset","source_file","date"] + features
    src_cols = [c for c in expected_in_cols if c in batch_df.columns]
    pdf = batch_df.select(*src_cols).toPandas()
    if pdf.empty:
        print(f"[fb] ▶ batch_id={batch_id} empty; skip.")
        return

    # Run inference
    outs = list(infer_partition([pdf]))
    df_out = outs[0]

    # Fill lineage/timing
    run_id = f"run-{uuid.uuid4().hex}"
    now_ts = pd.Timestamp.utcnow()
    df_out["inference_run_id"] = run_id
    if "inference_ts" not in df_out.columns or df_out["inference_ts"].isna().all():
        df_out["inference_ts"] = now_ts
    df_out["date"] = pd.to_datetime(df_out["timestamp"], errors="coerce").dt.date.astype(str)

    # Normalize fields
    if "offset" in df_out.columns:
        df_out["offset"] = _to_int_or_none_series(df_out["offset"])
    if "processing_latency_ms" in df_out.columns:
        # compute elapsed end-to-end per batch and stamp same number (simple, not per-row)
        took_ms = int((time.time() - t0) * 1000)
        df_out["processing_latency_ms"] = took_ms

    if "dense_per_feature_error" in df_out.columns:
        df_out["dense_per_feature_error"] = df_out["dense_per_feature_error"].apply(_norm_map_float)
    if "raw_model_outputs" in df_out.columns:
        df_out["raw_model_outputs"] = df_out["raw_model_outputs"].apply(_norm_map_float)
    if "model_versions" in df_out.columns:
        df_out["model_versions"] = df_out["model_versions"].apply(_norm_versions)
    if "explain_top_k" in df_out.columns:
        df_out["explain_top_k"] = df_out["explain_top_k"].apply(_norm_explain_top_k)
    if "anomaly_severity" in df_out.columns:
        df_out["anomaly_severity"] = pd.to_numeric(df_out["anomaly_severity"], errors="coerce").fillna(0).astype(int)

    # Ensure column set & order
    tgt_cols = [f.name for f in INFER_RESULTS_SCHEMA]
    for c in tgt_cols:
        if c not in df_out.columns:
            df_out[c] = None
    df_out = df_out[tgt_cols]

    # String-stage everything, then parse/cast by schema
    import json as _json
    def _to_json_or_empty(v, empty):
        try: return _json.dumps(v if v is not None else empty)
        except Exception: return _json.dumps(empty)

    def _to_string(v):
        if v is None or (isinstance(v, float) and np.isnan(v)): return None
        if isinstance(v, pd.Timestamp):
            try: return v.tz_localize("UTC").isoformat()
            except Exception: return v.isoformat()
        return str(v)

    complex_schemas = {
        "dense_per_feature_error": MapType(StringType(), DoubleType()),
        "raw_model_outputs":       MapType(StringType(), DoubleType()),
        "model_versions":          MapType(StringType(), StringType()),
        "explain_top_k":           ArrayType(StructType([
                                    StructField("feature", StringType(), False),
                                    StructField("contribution", DoubleType(), False)
                                  ])),
    }

    df_str = pd.DataFrame()
    for c in tgt_cols:
        if c in complex_schemas:
            empty = {} if c != "explain_top_k" else []
            df_str[c] = df_out[c].apply(lambda v: _to_json_or_empty(v, empty))
        else:
            df_str[c] = df_out[c].apply(_to_string)

    string_schema = StructType([StructField(c, StringType(), True) for c in tgt_cols])
    tmp_df = spark.createDataFrame(df_str, schema=string_schema)

    # Parse/cast by name
    select_exprs = []
    def _is_simple(dt):
        return isinstance(dt, (StringType, TimestampType, LongType, IntegerType, DoubleType, DateType))
    for f in INFER_RESULTS_SCHEMA:
        cname = f.name
        col = F.col(f"`{cname}`")
        if cname in complex_schemas:
            col = F.from_json(col, complex_schemas[cname])
        elif isinstance(f.dataType, (TimestampType, DateType)):
            col = col.cast(f.dataType)
        elif _is_simple(f.dataType):
            col = col.cast(f.dataType)
        select_exprs.append(col.alias(cname))
    results_df = tmp_df.select(*select_exprs)

    # MERGE by row_hash; newer inference_ts wins
    tgt = DeltaTable.forPath(spark, str(INFER_RESULTS_DELTA))
    cols = [f.name for f in INFER_RESULTS_SCHEMA]
    (tgt.alias("t")
        .merge(results_df.alias("s"), "t.row_hash = s.row_hash")
        .whenMatchedUpdate(
            condition="s.inference_ts > t.inference_ts",
            set={c: f"s.`{c}`" for c in cols}
        )
        .whenNotMatchedInsert(values={c: f"s.`{c}`" for c in cols})
        .execute())

    wrote = results_df.count()
    took_ms = int((time.time() - t0) * 1000)
    print(f"[fb] ✓ batch_id={batch_id} rows_in={len(pdf)} rows_out={wrote} ms={took_ms}")


In [68]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

system_cols = [
    StructField("row_hash",    StringType(),    False),
    StructField("timestamp",   TimestampType(), True),
    StructField("source_id",   StringType(),    True),
    StructField("kafka_key",   StringType(),    True),
    StructField("offset",      LongType(),      True),
    StructField("source_file", StringType(),    True),
    StructField("date",        DateType(),      True),
]
feature_fields = [StructField(f, DoubleType(), True) for f in features]
INFER_READY_SCHEMA = StructType(system_cols + feature_fields)

infer_ready_glob = (INFER_READY / "date=*").as_posix()
src = (
    spark.readStream
         .schema(INFER_READY_SCHEMA)
         .option("maxFilesPerTrigger", "50")
         .parquet(infer_ready_glob)
)
print("Streaming source prepared. (We won’t start it in this notebook.)")


Streaming source prepared. (We won’t start it in this notebook.)


In [69]:
import threading, http.server
METRICS_PORT = 9109
_metrics = {"batches":0, "rows":0}

class Handler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path != "/metrics":
            self.send_response(404); self.end_headers(); return
        self.send_response(200); self.end_headers()
        out = "\n".join([
            f'engine_inference_batches_total {_metrics["batches"]}',
            f'engine_inference_rows_total {_metrics["rows"]}',
        ])
        self.wfile.write(out.encode("utf-8"))
    def log_message(self, *args, **kwargs): pass

def run_metrics():
    with http.server.ThreadingHTTPServer(("", METRICS_PORT), Handler) as httpd:
        print(f"Prometheus metrics on :{METRICS_PORT}/metrics")
        httpd.serve_forever()

t = threading.Thread(target=run_metrics, daemon=True); t.start()


Prometheus metrics on :9109/metrics


In [70]:

try:
    _ = spark.sparkContext.parallelize([0], 1).map(lambda x: x).collect()
    print("Executors reachable. (Prewarm done)")
except Exception as e:
    print("Prewarm skip:", e)


Executors reachable. (Prewarm done)


In [71]:

from pyspark.sql.types import *
from pyspark.sql import functions as F
import pandas as pd

_system_cols = [
    StructField("row_hash",    StringType(),    False),
    StructField("timestamp",   TimestampType(), True),
    StructField("source_id",   StringType(),    True),
    StructField("kafka_key",   StringType(),    True),
    StructField("offset",      LongType(),      True),
    StructField("source_file", StringType(),    True),
    StructField("date",        DateType(),      True),
]
_feature_fields = [StructField(f, DoubleType(), True) for f in features]
INFER_READY_SCHEMA = StructType(_system_cols + _feature_fields)

infer_ready_glob = (INFER_READY / "date=*").as_posix()
_sel_cols = ["row_hash","timestamp","source_id","kafka_key","offset","source_file","date"] + features

df_batch = (
    spark.read
         .schema(INFER_READY_SCHEMA)
         .parquet(infer_ready_glob)
         .select(*_sel_cols)
)
total_rows = df_batch.count()
print(f"[14A] Batch rows visible: {total_rows}")

if total_rows == 0:
    df_out_pdf = pd.DataFrame(columns=[f.name for f in INFER_RESULTS_SCHEMA])
    print("[14A] Nothing to process.")
else:
    pdf = df_batch.toPandas()
    outs = list(infer_partition([pdf]))
    df_out_pdf = outs[0].copy()

    # Fill lineage/timing
    run_id = f"run-{uuid.uuid4().hex}"
    now_ts = pd.Timestamp.utcnow()
    df_out_pdf["inference_run_id"] = run_id
    if "inference_ts" not in df_out_pdf.columns or df_out_pdf["inference_ts"].isna().all():
        df_out_pdf["inference_ts"] = now_ts
    df_out_pdf["date"] = pd.to_datetime(df_out_pdf["timestamp"], errors="coerce").dt.date.astype(str)

    # Normalize types that often bite
    def _to_int_or_none_series(s: pd.Series) -> pd.Series:
        tmp = pd.to_numeric(s, errors="coerce")
        return tmp.apply(lambda x: int(x) if pd.notna(x) else None)
    if "offset" in df_out_pdf.columns:
        df_out_pdf["offset"] = _to_int_or_none_series(df_out_pdf["offset"])
    df_out_pdf["processing_latency_ms"] = 0  # static run — set 0; IS-10 sets real ms per batch

    print("[14A] Prepared df_out_pdf. Sample cols:", df_out_pdf.columns[:8].tolist())


[14A] Batch rows visible: 2000
[14A] Prepared df_out_pdf. Sample cols: ['row_hash', 'timestamp', 'source_id', 'kafka_key', 'offset', 'source_file', 'date', 'Air_Fuel_Ratio_Commanded_:1']


In [72]:
import numpy as np, pandas as pd

print("gmm null frac:", float(pd.Series(df_out_pdf["gmm_logp"]).isna().mean()))
print("kde null frac:", float(pd.Series(df_out_pdf["kde_logp"]).isna().mean()))
print("latency stats (ms):", {
    "min": int(pd.Series(df_out_pdf["processing_latency_ms"]).min()),
    "p50": int(pd.Series(df_out_pdf["processing_latency_ms"]).median()),
    "max": int(pd.Series(df_out_pdf["processing_latency_ms"]).max())
})
print("example row:", df_out_pdf[["gmm_logp","kde_logp","processing_latency_ms"]].head(3).to_dict(orient="records"))


gmm null frac: 1.0
kde null frac: 0.0
latency stats (ms): {'min': 0, 'p50': 0, 'max': 0}
example row: [{'gmm_logp': None, 'kde_logp': -47.018585608970696, 'processing_latency_ms': 0}, {'gmm_logp': None, 'kde_logp': -47.018585608970696, 'processing_latency_ms': 0}, {'gmm_logp': None, 'kde_logp': -47.018585608970696, 'processing_latency_ms': 0}]


In [73]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.types import *
import pandas as pd, numpy as np, json as _json

if len(df_out_pdf) == 0:
    print("[14B] No results to write.")
else:
    tgt_cols = [f.name for f in INFER_RESULTS_SCHEMA]
    for c in tgt_cols:
        if c not in df_out_pdf.columns:
            df_out_pdf[c] = None
    df_out_pdf = df_out_pdf[tgt_cols]

    complex_schemas = {
        "dense_per_feature_error": MapType(StringType(), DoubleType()),
        "raw_model_outputs":       MapType(StringType(), DoubleType()),
        "model_versions":          MapType(StringType(), StringType()),
        "explain_top_k":           ArrayType(StructType([
                                    StructField("feature", StringType(), False),
                                    StructField("contribution", DoubleType(), False)
                                  ])),
    }

    def _to_json_or_empty(v, empty):
        try: return _json.dumps(v if v is not None else empty)
        except Exception: return _json.dumps(empty)

    def _to_string(v):
        if v is None or (isinstance(v, float) and np.isnan(v)): return None
        if isinstance(v, pd.Timestamp):
            try: return v.tz_localize("UTC").isoformat()
            except Exception: return v.isoformat()
        return str(v)

    df_str = pd.DataFrame()
    for c in tgt_cols:
        if c in complex_schemas:
            empty = {} if c != "explain_top_k" else []
            df_str[c] = df_out_pdf[c].apply(lambda v: _to_json_or_empty(v, empty))
        else:
            df_str[c] = df_out_pdf[c].apply(_to_string)

    string_schema = StructType([StructField(c, StringType(), True) for c in tgt_cols])
    tmp_df = spark.createDataFrame(df_str, schema=string_schema)

    select_exprs = []
    def _is_simple(dt):
        return isinstance(dt, (StringType, TimestampType, LongType, IntegerType, DoubleType, DateType))
    for f in INFER_RESULTS_SCHEMA:
        cname = f.name
        col = F.col(f"`{cname}`")
        if cname in complex_schemas:
            col = F.from_json(col, complex_schemas[cname])
        elif isinstance(f.dataType, (TimestampType, DateType)):
            col = col.cast(f.dataType)
        elif _is_simple(f.dataType):
            col = col.cast(f.dataType)
        select_exprs.append(col.alias(cname))
    results_df = tmp_df.select(*select_exprs)

    tgt = DeltaTable.forPath(spark, str(INFER_RESULTS_DELTA))
    cols = [f.name for f in INFER_RESULTS_SCHEMA]
    (tgt.alias("t")
        .merge(results_df.alias("s"), "t.row_hash = s.row_hash")
        .whenMatchedUpdate(
            condition="s.inference_ts > t.inference_ts",
            set={c: f"s.`{c}`" for c in cols}
        )
        .whenNotMatchedInsert(values={c: f"s.`{c}`" for c in cols})
        .execute())

    wrote = results_df.count()
    print(f"[14B] inference_results MERGE complete. rows={wrote}")


[14B] inference_results MERGE complete. rows=2000


In [74]:
from delta.tables import DeltaTable
import pandas as pd

tgt = DeltaTable.forPath(spark, str(INFER_RESULTS_DELTA))
df = tgt.toDF()

print("inference_results count:", df.count())
print("schema:")
df.printSchema()

# sanity: null counts for key cols
key_cols = ["recon_error_dense","isolation_score","kde_logp","gmm_logp","composite_score","raw_model_outputs"]
agg = df.select([F.sum(F.col(c).isNull().cast("int")).alias(c+"_nulls") for c in key_cols])
agg.show(truncate=False)

print("sample rows:")
df.select("row_hash","timestamp","composite_score","anomaly_label","kde_logp","gmm_logp","raw_model_outputs").show(5, truncate=False)


inference_results count: 2000
schema:
root
 |-- row_hash: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- date: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- kafka_key: string (nullable = true)
 |-- offset: long (nullable = true)
 |-- source_file: string (nullable = true)
 |-- Air_Fuel_Ratio_Commanded_:1: double (nullable = true)
 |-- Air_Fuel_Ratio_Measured_:1: double (nullable = true)
 |-- Catalyst_Temperature__Bank_1_Sensor_1: double (nullable = true)
 |-- Catalyst_Temperature__Bank_1_Sensor_2: double (nullable = true)
 |-- Engine_kW__At_the_wheels_kW: double (nullable = true)
 |-- Engine_Load_Absolute_pct: double (nullable = true)
 |-- Engine_Oil_Temperature: double (nullable = true)
 |-- Engine_RPM_rpm: double (nullable = true)
 |-- Fuel_flow_rate_hour_l_hr: double (nullable = true)
 |-- Fuel_Trim_Bank_1_Long_Term_pct: double (nullable = true)
 |-- Fuel_Trim_Bank_1_Short_Term_pct: double (nullable = true)
 |-- Mass_Air_Flow_Rate_g

In [75]:

import os, shutil, glob, time
from pathlib import Path
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, ArrayType, StructType

assert 'spark' in globals(), "SparkSession missing."
assert 'INFER_RESULTS_DELTA' in globals(), "INFER_RESULTS_DELTA path not set."
assert 'INFER_RESULTS_SCHEMA' in globals(), "INFER_RESULTS_SCHEMA not loaded."

TARGET_DIR = Path(r"C:\engine_module_pipeline\infer_stage\csv")
TMP_DIR    = TARGET_DIR / "_export_tmp"
TARGET_DIR.mkdir(parents=True, exist_ok=True)
shutil.rmtree(TMP_DIR, ignore_errors=True)

# ---- Read Delta table ----
tgt = DeltaTable.forPath(spark, str(INFER_RESULTS_DELTA))
df  = tgt.toDF()

# ---- Build select list: JSON-encode complex cols, keep others as-is ----
complex_cols = set()
sel_exprs = []
schema_by_name = {f.name: f.dataType for f in INFER_RESULTS_SCHEMA}

for f in INFER_RESULTS_SCHEMA:
    c = f.name
    dt = schema_by_name.get(c)
    col = F.col(f"`{c}`")
    if isinstance(dt, (MapType, ArrayType, StructType)):
        complex_cols.add(c)
        # Convert complex types to a compact JSON string column for CSV
        col = F.to_json(col)
    # else: leave simple numeric/string/timestamp/date columns as they are
    sel_exprs.append(col.alias(c))

df_flat = df.select(*sel_exprs)

if df_flat.rdd.isEmpty():
    raise FileNotFoundError("inference_results has no rows to export yet.")

# ---- Write single CSV part with header ----
(df_flat
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .option("quote", '"')
    .option("escape", '"')
    .csv(str(TMP_DIR))
)

# ---- Move part file to final timestamped name ----
part_files = glob.glob(str(TMP_DIR / "part-*.csv"))
if not part_files:
    raise FileNotFoundError("No CSV part file produced. (Check permissions/space.)")
part_path = Path(part_files[0])

ts = time.strftime("%Y%m%d_%H%M%S")
final_path = TARGET_DIR / f"inference_results_{ts}.csv"

if final_path.exists():
    final_path.unlink()
shutil.move(str(part_path), str(final_path))

# cleanup Spark markers
for p in (TMP_DIR / "_SUCCESS",):
    try: p.unlink()
    except Exception: pass
shutil.rmtree(TMP_DIR, ignore_errors=True)

print(f"[export] Wrote CSV: {final_path}")
print(f"[export] JSON-encoded columns: {sorted(complex_cols)}")


[export] Wrote CSV: C:\engine_module_pipeline\infer_stage\csv\inference_results_20251003_044019.csv
[export] JSON-encoded columns: ['dense_per_feature_error', 'explain_top_k', 'model_versions', 'raw_model_outputs']


In [79]:
import os, json, glob
from pathlib import Path
import joblib

# --- where artifacts live (adjust only if yours differ) ---
ARTIFACTS_DIR = Path(r"C:\engine_module_pipeline\infer_stage\artifacts")
assert ARTIFACTS_DIR.exists(), f"Artifacts not found: {ARTIFACTS_DIR}"

# --- load feature contract (single source of truth) ---
contract_path = ARTIFACTS_DIR / "model_input_contract.json"
with open(contract_path, "r", encoding="utf-8") as fh:
    contract = json.load(fh)

features = contract["features_canonical_order"]
colon_token = contract.get("per_feature_filename_rule", {}).get("colon", "__COLON__")

def _desanitize_token(s: str) -> str:
    # turn filename token back into canonical colon
    return s.replace(colon_token, ":")

# Build index by scanning *.joblib and mapping back to canonical names
_GMM_INDEX = {}            # canonical feature -> full path
_GMM_CACHE = {}            # loaded models cache

for p in glob.glob(str(ARTIFACTS_DIR / "gmm_*.joblib")):
    base = os.path.basename(p)          # e.g., gmm_Air_Fuel_Ratio_Commanded__COLON__1.joblib
    stem = base[len("gmm_"):-len(".joblib")]
    canonical = _desanitize_token(stem)
    if canonical in features:
        _GMM_INDEX[canonical] = p

print(f"[GMM-1r] Indexed GMM files: {len(_GMM_INDEX)}/{len(features)}")
missing = [f for f in features if f not in _GMM_INDEX]
if missing:
    print("[GMM-1r] Missing models for:", missing[:10], "..." if len(missing)>10 else "")

def get_gmm_for(feature: str):
    """Return a loaded sklearn.mixture.GaussianMixture for the canonical feature name, or None."""
    if feature in _GMM_CACHE:
        return _GMM_CACHE[feature]
    p = _GMM_INDEX.get(feature)
    if not p or not os.path.exists(p):
        return None
    try:
        m = joblib.load(p)
        _GMM_CACHE[feature] = m
        return m
    except Exception as e:
        print(f"[GMM-1r] load failed for {feature}: {e}")
        return None

# quick smoke test: how many GMMs can actually be loaded?
loaded_ok = sum(1 for f in features if get_gmm_for(f) is not None)
print(f"[GMM-1r] Loadable GMMs: {loaded_ok}/{len(features)}")


[GMM-1r] Indexed GMM files: 25/25
[GMM-1r] Loadable GMMs: 25/25


In [81]:
# GMM-2rr — compute and MERGE only gmm_logp (fixed: use score_samples) + richer diagnostics

from typing import Iterator
import numpy as np, pandas as pd
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

# Reuse: ARTIFACTS_DIR, features, scaler, get_gmm_for must already exist from GMM-1r & your notebook.
# Quick asserts to avoid silent fallbacks:
assert 'features' in globals() and len(features) == 25, "features (25) not bound"
assert 'scaler' in globals(), "scaler not loaded"
assert 'get_gmm_for' in globals(), "get_gmm_for() missing (run GMM-1r first)"
assert 'INFER_RESULTS_DELTA' in globals(), "target delta path missing"

# Read rows that need gmm_logp
need_cols = ["row_hash", "inference_ts"] + features + ["gmm_logp"]
df_in = (spark.read.format("delta")
         .load(str(INFER_RESULTS_DELTA))
         .select(*need_cols)
         .where(F.col("gmm_logp").isNull()))

todo = df_in.count()
print(f"[GMM-2rr] Rows needing gmm_logp backfill: {todo}")
if todo == 0:
    print("[GMM-2rr] Nothing to do.")
else:
    # Preflight: how many features have usable GMMs?
    avail = sum(1 for f in features if get_gmm_for(f) is not None)
    print(f"[GMM-2rr] GMMs available for features: {avail}/{len(features)}")

    out_schema = StructType([
        StructField("row_hash",     StringType(),    False),
        StructField("inference_ts", TimestampType(), True),
        StructField("gmm_logp",     DoubleType(),    True),
    ])

    # Preload models once per task to avoid repeated disk IO
    _GMM_LOCAL = {f: get_gmm_for(f) for f in features}

    def _score_gmm_chunks(pdf_iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        for pdf in pdf_iter:
            if pdf is None or pdf.empty:
                yield pd.DataFrame(columns=["row_hash","inference_ts","gmm_logp"])
                continue

            # Ensure all features present
            for f in features:
                if f not in pdf.columns:
                    pdf[f] = np.nan

            # Scale
            X = pdf[features].astype(float).values
            Xs = scaler.transform(np.nan_to_num(X, copy=False))

            # Row-wise aggregate log-prob across available feature models
            gmm_vals = np.full((len(pdf),), np.nan, dtype=float)
            # simple diagnostics for this partition
            used_feat_counts = np.zeros((len(pdf),), dtype=int)

            for j, feat in enumerate(features):
                m = _GMM_LOCAL.get(feat)
                if m is None:
                    continue
                try:
                    # score_samples expects 2D; pass the column j as (n,1)
                    col = Xs[:, [j]]
                    lp = m.score_samples(col)  # shape: (n,)
                    # accumulate; if any are nan/inf, treat safely
                    mask = np.isfinite(lp)
                    gmm_vals[mask] = np.where(np.isfinite(gmm_vals[mask]), gmm_vals[mask] + lp[mask], lp[mask])
                    used_feat_counts[mask] += 1
                except Exception as e:
                    # continue; one bad feature shouldn't block others
                    # optional: print(f"[GMM-2rr] feature {feat} score_samples error: {e}")
                    pass

            # Where no features contributed, leave as NaN
            out = pd.DataFrame({
                "row_hash": pdf["row_hash"].astype(str).values,
                "inference_ts": pd.to_datetime(pdf["inference_ts"], utc=True),
                "gmm_logp": gmm_vals,
            })

            # tiny debug for first few rows in this chunk
            if len(out) > 0:
                c0 = int(used_feat_counts[:10].mean())
                print(f"[GMM-2rr][chunk] first10 mean contributing feats: {c0}")
            yield out

    df_updates = df_in.mapInPandas(_score_gmm_chunks, schema=out_schema)

    # Pre-merge diagnostics
    diag = df_updates.select(
        F.count("*").alias("rows"),
        F.count(F.when(F.col("gmm_logp").isNull(), True)).alias("nulls"),
        F.mean("gmm_logp").alias("mean_gmm_logp"),
        F.min("gmm_logp").alias("min_gmm_logp"),
        F.max("gmm_logp").alias("max_gmm_logp")
    ).collect()[0]
    print(f"[GMM-2rr] Pre-merge: rows={diag['rows']} nulls={diag['nulls']} "
          f"mean={diag['mean_gmm_logp']} min={diag['min_gmm_logp']} max={diag['max_gmm_logp']}")

    # MERGE into inference_results by row_hash (keep newest inference_ts)
    tgt = DeltaTable.forPath(spark, str(INFER_RESULTS_DELTA))
    (tgt.alias("t")
        .merge(df_updates.alias("s"), "t.row_hash = s.row_hash")
        .whenMatchedUpdate(
            condition="s.inference_ts >= t.inference_ts",
            set={"gmm_logp": "s.gmm_logp"}
        )
        .execute())

    # Post-merge verification
    post = (spark.read.format("delta").load(str(INFER_RESULTS_DELTA))
            .select(
                F.count("*").alias("rows"),
                F.count(F.when(F.col("gmm_logp").isNull(), True)).alias("gmm_nulls_total"),
                F.mean("gmm_logp").alias("gmm_mean"),
                F.min("gmm_logp").alias("gmm_min"),
                F.max("gmm_logp").alias("gmm_max")
            ))
    post.show(truncate=False)
    print("[GMM-2rr] Backfill complete.")


[GMM-2rr] Rows needing gmm_logp backfill: 2000
[GMM-2rr] GMMs available for features: 25/25
[GMM-2rr] Pre-merge: rows=2000 nulls=0 mean=-4.731006930828524 min=-74.09407025208272 max=54.07666622201737
+----+---------------+------------------+------------------+-----------------+
|rows|gmm_nulls_total|gmm_mean          |gmm_min           |gmm_max          |
+----+---------------+------------------+------------------+-----------------+
|2000|0              |-4.731006930828524|-74.09407025208272|54.07666622201737|
+----+---------------+------------------+------------------+-----------------+

[GMM-2rr] Backfill complete.


In [82]:
# GMM-3 — recompute composite_score, anomaly_label, anomaly_severity now that gmm_logp is filled

from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, TimestampType, DoubleType, IntegerType
)
import pandas as pd
import numpy as np
import math

# Reuse BASELINES/WEIGHTS & label function from your notebook; just mirror them here to keep cell self-contained
BASELINES = {
    "dense_med": 0.05, "dense_mad": 0.05,
    "lstm_med":  0.05, "lstm_mad":  0.05,
    "isof_med":  0.0,  "isof_mad":  0.5,
    "kde_med":  -10.0, "kde_mad":   3.0,
    "gmm_med":  -10.0, "gmm_mad":   3.0,
}
WEIGHTS = {"dense":0.35, "lstm":0.25, "isof":0.20, "kde":0.10, "gmm":0.10}

def robust_norm(x, med, mad, invert=False, k=1.4826):
    if x is None or (isinstance(x, float) and (np.isnan(x) or np.isinf(x))):
        return 0.0
    denom = (mad*k) if mad and mad>0 else 1.0
    z = (x - med)/denom
    if invert: z = -z
    return 1/(1+math.exp(-z))

def composite_from_raw(dense_e, lstm_e, isof_s, kde_lp, gmm_lp):
    s_dense = robust_norm(dense_e, BASELINES["dense_med"], BASELINES["dense_mad"])
    s_lstm  = robust_norm(lstm_e,  BASELINES["lstm_med"],  BASELINES["lstm_mad"])
    s_isof  = robust_norm(isof_s,  BASELINES["isof_med"],  BASELINES["isof_mad"], invert=True)
    s_kde   = robust_norm(kde_lp,  BASELINES["kde_med"],   BASELINES["kde_mad"],  invert=True)
    s_gmm   = robust_norm(gmm_lp,  BASELINES["gmm_med"],   BASELINES["gmm_mad"],  invert=True)
    return (WEIGHTS["dense"]*s_dense +
            WEIGHTS["lstm"] *s_lstm  +
            WEIGHTS["isof"] *s_isof  +
            WEIGHTS["kde"]  *s_kde   +
            WEIGHTS["gmm"]  *s_gmm)

def label_from_composite(c):
    if c is None or (isinstance(c,float) and np.isnan(c)):
        return "unknown", 0
    if c < 0.20: return "normal", 0
    if c < 0.50: return "suspicious", 1
    if c < 0.75: return "anomaly", 2
    return "critical", 3

need_cols = ["row_hash","inference_ts",
             "recon_error_dense","recon_error_lstm",
             "isolation_score","kde_logp","gmm_logp",
             "composite_score","anomaly_label","anomaly_severity"]

df_base = (spark.read.format("delta")
           .load(str(INFER_RESULTS_DELTA))
           .select(*need_cols))

# Recompute for all rows, or restrict to rows where composite is null or changed
df_needs = df_base  # keep simple & deterministic

out_schema = StructType([
    StructField("row_hash",        StringType(),    False),
    StructField("inference_ts",    TimestampType(), True),
    StructField("composite_score", DoubleType(),    True),
    StructField("anomaly_label",   StringType(),    True),
    StructField("anomaly_severity",IntegerType(),   True),
])

def _recompute(pdf_iter):
    for pdf in pdf_iter:
        if pdf is None or pdf.empty:
            yield pd.DataFrame(columns=[f.name for f in out_schema])
            continue

        # Compute composite row-wise
        comp = []
        lab  = []
        sev  = []
        for _, r in pdf.iterrows():
            c = composite_from_raw(
                r.get("recon_error_dense"),
                r.get("recon_error_lstm"),
                r.get("isolation_score"),
                r.get("kde_logp"),
                r.get("gmm_logp")
            )
            l, s = label_from_composite(c)
            comp.append(float(c))
            lab.append(l)
            sev.append(int(s))

        out = pd.DataFrame({
            "row_hash": pdf["row_hash"].astype(str).values,
            "inference_ts": pd.to_datetime(pdf["inference_ts"], utc=True),
            "composite_score": comp,
            "anomaly_label": lab,
            "anomaly_severity": sev,
        })
        yield out

df_updates = df_needs.mapInPandas(_recompute, schema=out_schema)

# quick diagnostics
df_updates.select(
    F.count("*").alias("rows"),
    F.mean("composite_score").alias("comp_mean"),
    F.min("composite_score").alias("comp_min"),
    F.max("composite_score").alias("comp_max")
).show(truncate=False)

# MERGE by row_hash; keep newest inference_ts
tgt = DeltaTable.forPath(spark, str(INFER_RESULTS_DELTA))
(tgt.alias("t")
    .merge(df_updates.alias("s"), "t.row_hash = s.row_hash")
    .whenMatchedUpdate(
        condition="s.inference_ts >= t.inference_ts",
        set={
            "composite_score": "s.composite_score",
            "anomaly_label":   "s.anomaly_label",
            "anomaly_severity":"s.anomaly_severity",
        }
    )
    .execute())

print("[GMM-3] composite/labels refreshed from gmm-inclusive signals.")


+----+------------------+------------------+------------------+
|rows|comp_mean         |comp_min          |comp_max          |
+----+------------------+------------------+------------------+
|2000|0.5391502616027245|0.3604706184902124|0.8094386903476654|
+----+------------------+------------------+------------------+

[GMM-3] composite/labels refreshed from gmm-inclusive signals.


In [83]:
# CSV-2 — export inference_results to CSV with complex columns flattened
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from pathlib import Path
import os

CSV_OUT_DIR = Path(r"C:\engine_module_pipeline\infer_stage\csv")
CSV_OUT_DIR.mkdir(parents=True, exist_ok=True)

df_all = spark.read.format("delta").load(str(INFER_RESULTS_DELTA))

# Flatten complex columns to strings (JSON)
to_json = F.to_json
df_flat = (df_all
    .withColumn("dense_per_feature_error_json", to_json("dense_per_feature_error"))
    .withColumn("explain_top_k_json",           to_json("explain_top_k"))
    .withColumn("model_versions_json",          to_json("model_versions"))
    .withColumn("raw_model_outputs_json",       to_json("raw_model_outputs"))
)

# Select order: keep the important scalar cols + JSON strings at the end
scalar_cols = [
    "row_hash","timestamp","date","source_id","kafka_key","offset","source_file"
] + features + [
    "recon_error_dense","recon_error_lstm","lstm_window_id",
    "isolation_score","kde_logp","gmm_logp",
    "composite_score","anomaly_label","anomaly_severity",
    "inference_run_id","inference_ts","processing_latency_ms"
]

export_cols = [c for c in scalar_cols if c in df_flat.columns] + [
    "dense_per_feature_error_json","explain_top_k_json","model_versions_json","raw_model_outputs_json"
]

out_path = (CSV_OUT_DIR / "inference_results.csv").as_posix()
(df_flat.select(*export_cols)
        .coalesce(1)
        .write.mode("overwrite")
        .option("header","true")
        .csv(out_path))

print(f"[CSV-2] Wrote CSV to folder: {out_path}")


[CSV-2] Wrote CSV to folder: C:/engine_module_pipeline/infer_stage/csv/inference_results.csv


In [84]:
# DER-ALERTS — derive alerts Delta from inference_results and export CSV
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, BooleanType
from delta.tables import DeltaTable
from pathlib import Path

ALERT_THRESHOLD = 0.75
CSV_OUT_DIR = Path(r"C:\engine_module_pipeline\infer_stage\csv")
CSV_OUT_DIR.mkdir(parents=True, exist_ok=True)

df_ir = spark.read.format("delta").load(str(INFER_RESULTS_DELTA))

# derive alerts
def _top_feats(col):
    # col is array<struct<feature:string, contribution:double>>
    return F.transform(
        F.slice(
            F.array_sort(
                F.transform(col, lambda s: F.struct(F.col("s.contribution").alias("c"), F.col("s.feature").alias("f")))
            )[::-1], 1, 3
        ),
        lambda x: F.col("x.f")
    )

alerts_df = (
    df_ir
    .filter(F.col("composite_score").isNotNull() & (F.col("composite_score") >= F.lit(ALERT_THRESHOLD)))
    .select(
        F.expr("uuid()").alias("alert_id"),
        F.col("inference_ts").alias("alert_ts"),
        F.col("row_hash"),
        F.col("source_id").cast("string").alias("vehicle_id"),
        F.lit("composite_high").alias("alert_type"),
        F.col("anomaly_severity").cast("int").alias("severity"),
        F.col("composite_score"),
        F.array(F.lit("dense"),F.lit("lstm"),F.lit("isof"),F.lit("kde"),F.lit("gmm")).alias("triggering_models"),
        F.lit(None).cast("string").alias("reason"),
        _top_feats(F.col("explain_top_k")).alias("top_features"),
        F.col("model_versions"),
        F.col("inference_run_id"),
        F.lit(False).cast(BooleanType()).alias("acked"),
        F.lit(None).cast("string").alias("acked_by"),
        F.lit(None).cast("timestamp").alias("acked_ts"),
        F.array().cast(ArrayType(StringType())).alias("notified_channels"),
        F.array(F.col("row_hash").cast("string")).alias("linked_rows"),
        F.map().alias("extra"),
        F.col("date").cast("string")
    )
)

# append
alerts_df.write.format("delta").mode("append").save(str(ALERTS_DELTA))
print("[DER-ALERTS] wrote:", alerts_df.count())

# CSV (flatten maps/arrays to JSON)
alerts_csv = (
    alerts_df
    .withColumn("triggering_models_json", F.to_json("triggering_models"))
    .withColumn("top_features_json",      F.to_json("top_features"))
    .withColumn("model_versions_json",    F.to_json("model_versions"))
    .withColumn("notified_channels_json", F.to_json("notified_channels"))
    .withColumn("linked_rows_json",       F.to_json("linked_rows"))
    .withColumn("extra_json",             F.to_json("extra"))
)

(alerts_csv
 .drop("triggering_models","top_features","model_versions","notified_channels","linked_rows","extra")
 .coalesce(1)
 .write.mode("overwrite").option("header","true")
 .csv((CSV_OUT_DIR / "alerts").as_posix()))

print("[DER-ALERTS] CSV ready:", (CSV_OUT_DIR / "alerts").as_posix())


PySparkValueError: [SLICE_WITH_STEP] Slice with step is not supported.

In [85]:
# DER-LSTM — reconstruct sliding windows (size=10) from inference_results and export CSV
from pyspark.sql import Window, functions as F, types as T
from pathlib import Path

CSV_OUT_DIR = Path(r"C:\engine_module_pipeline\infer_stage\csv")
CSV_OUT_DIR.mkdir(parents=True, exist_ok=True)

WINDOW = 10

df_ir = (spark.read.format("delta").load(str(INFER_RESULTS_DELTA))
         .select("source_id","timestamp","row_hash","date","recon_error_lstm","inference_run_id")
         .withColumn("timestamp", F.col("timestamp").cast("timestamp"))
         .withColumn("source_id", F.col("source_id").cast("string"))
        )

w = Window.partitionBy("source_id").orderBy(F.col("timestamp").cast("long")).rowsBetween(-(WINDOW-1), 0)

# Collect last N row_hashes and first/last timestamps in the frame
lstm_df = (
    df_ir
    .withColumn("row_hashes", F.collect_list(F.col("row_hash").cast("string")).over(w))
    .withColumn("window_start_ts", F.element_at(F.sort_array(F.collect_list(F.col("timestamp")).over(w)), 1))
    .withColumn("window_end_ts",   F.col("timestamp"))
    .filter(F.size("row_hashes") >= 2)  # need at least 2 points to be meaningful
    .withColumn("lstm_window_id", F.expr("uuid()"))
    .withColumn("reconstruction_error", F.col("recon_error_lstm").cast("double"))
    .withColumn("per_step_errors", F.array().cast(T.ArrayType(T.DoubleType())))
    .withColumn("model_version", F.lit("posthoc"))
    .withColumn("date", F.col("date").cast("string"))
    .select("lstm_window_id","window_start_ts","window_end_ts","row_hashes",
            "reconstruction_error","per_step_errors","model_version","inference_run_id","date")
)

# append
lstm_df.write.format("delta").mode("append").save(str(LSTM_WIN_DELTA))
print("[DER-LSTM] wrote:", lstm_df.count())

# CSV (flatten arrays)
lstm_csv = (lstm_df
    .withColumn("row_hashes_json", F.to_json("row_hashes"))
    .withColumn("per_step_errors_json", F.to_json("per_step_errors"))
    .drop("row_hashes","per_step_errors")
)
(lstm_csv.coalesce(1)
    .write.mode("overwrite").option("header","true")
    .csv((CSV_OUT_DIR / "lstm_windows").as_posix()))
print("[DER-LSTM] CSV ready:", (CSV_OUT_DIR / "lstm_windows").as_posix())


[DER-LSTM] wrote: 1999
[DER-LSTM] CSV ready: C:/engine_module_pipeline/infer_stage/csv/lstm_windows


In [86]:
# DER-VEH — derive vehicle health daily aggregates and export CSV
from pyspark.sql import functions as F
from delta.tables import DeltaTable
from pathlib import Path

CSV_OUT_DIR = Path(r"C:\engine_module_pipeline\infer_stage\csv")
CSV_OUT_DIR.mkdir(parents=True, exist_ok=True)

df_ir = spark.read.format("delta").load(str(INFER_RESULTS_DELTA)).select(
    F.col("source_id").cast("string").alias("vehicle_id"),
    F.col("date").cast("string").alias("date"),
    "row_hash","anomaly_severity","composite_score","inference_ts","model_versions"
)

veh_df = (
    df_ir.groupBy("vehicle_id","date")
         .agg(
             F.count("row_hash").alias("rows_count"),
             F.sum(F.when(F.col("anomaly_severity")>=2, 1).otherwise(0)).cast("long").alias("anomaly_count"),
             F.expr("percentile_approx(composite_score, 0.5)").alias("median_composite_score"),
             F.expr("percentile_approx(composite_score, 0.95)").alias("p95_composite_score"),
             F.max("inference_ts").alias("last_inference_ts"),
             F.first("model_versions", ignorenulls=True).alias("model_versions")
         )
         .withColumn("anomaly_rate",
             F.when(F.col("rows_count")>0, F.col("anomaly_count")/F.col("rows_count")).otherwise(F.lit(0.0))
         )
         .withColumn("health_score",
             F.expr("greatest(0.0, least(100.0, (1 - (0.6*median_composite_score + 0.3*anomaly_rate + 0.1*(1-1.0)))*100))")
         )
         .withColumn("days_since_last_alert", F.lit(None).cast("int"))
         .withColumn("top_failure_modes", F.array().cast("array<string>"))
         .withColumn("trend_flag", F.lit("steady"))
         .withColumn("estimated_rul", F.lit(None).cast("double"))
)

# MERGE on (vehicle_id, date) with recency
tgt_h = DeltaTable.forPath(spark, str(VEH_HEALTH_DELTA))
(tgt_h.alias("t")
     .merge(veh_df.alias("s"),
            "t.vehicle_id = s.vehicle_id AND t.date = s.date")
     .whenMatchedUpdate(
         condition="s.last_inference_ts > t.last_inference_ts",
         set={c: f"s.`{c}`" for c in veh_df.columns}
     )
     .whenNotMatchedInsert(values={c: f"s.`{c}`" for c in veh_df.columns})
     .execute())

print("[DER-VEH] upserted.")

# CSV (flatten maps/arrays)
veh_csv = (veh_df
    .withColumn("model_versions_json", F.to_json("model_versions"))
    .withColumn("top_failure_modes_json", F.to_json("top_failure_modes"))
    .drop("model_versions","top_failure_modes")
)
(veh_csv.coalesce(1)
        .write.mode("overwrite").option("header","true")
        .csv((CSV_OUT_DIR / "vehicle_health").as_posix()))
print("[DER-VEH] CSV ready:", (CSV_OUT_DIR / "vehicle_health").as_posix())


[DER-VEH] upserted.
[DER-VEH] CSV ready: C:/engine_module_pipeline/infer_stage/csv/vehicle_health


In [87]:
# DER-META — derive one metadata row per inference_run_id and export CSV
from pyspark.sql import functions as F
from pathlib import Path

CSV_OUT_DIR = Path(r"C:\engine_module_pipeline\infer_stage\csv")
CSV_OUT_DIR.mkdir(parents=True, exist_ok=True)

df_ir = spark.read.format("delta").load(str(INFER_RESULTS_DELTA)).select(
    "inference_run_id","inference_ts","model_versions","date"
).where(F.col("inference_run_id").isNotNull())

# pick the latest row per run_id
w = Window.partitionBy("inference_run_id").orderBy(F.col("inference_ts").desc())
meta_df = (
    df_ir
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn")==1)
    .select(
        "inference_run_id",
        F.col("inference_ts").alias("timestamp"),
        "model_versions",
        F.map().alias("params"),
        F.map().alias("baseline_stats"),
        F.lit(None).cast("string").alias("notes"),
        F.lit(None).cast("string").alias("source_commit"),
        F.col("date").cast("string").alias("date"),
    )
)

meta_df.write.format("delta").mode("append").save(str(MODEL_META_DELTA))
print("[DER-META] wrote:", meta_df.count())

# CSV (flatten maps)
meta_csv = meta_df.withColumn("model_versions_json", F.to_json("model_versions")) \
                  .withColumn("params_json", F.to_json("params")) \
                  .withColumn("baseline_stats_json", F.to_json("baseline_stats")) \
                  .drop("model_versions","params","baseline_stats")
(meta_csv.coalesce(1)
        .write.mode("overwrite").option("header","true")
        .csv((CSV_OUT_DIR / "model_metadata").as_posix()))
print("[DER-META] CSV ready:", (CSV_OUT_DIR / "model_metadata").as_posix())


AttributeError: module 'pyspark.sql.functions' has no attribute 'map'

In [89]:
# DER-LSTM-FULL — recompute complete LSTM windows with per-step errors and export CSV
import hashlib, uuid, numpy as np, pandas as pd
from pathlib import Path
from pyspark.sql import functions as F, types as T
from pyspark.sql import Row

from pathlib import Path

# Set this at the top of the notebook (or before the DER-LSTM-FULL cell)
LSTM_WIN_DELTA = Path(r"C:\engine_module_pipeline\infer_stage\delta\engine_module_lstm_windows")


# --------------------
# Config
# --------------------
WINDOW = 10  # fixed window length for LSTM windows
CSV_OUT_DIR = Path(r"C:\engine_module_pipeline\infer_stage\csv")
CSV_OUT_DIR.mkdir(parents=True, exist_ok=True)

# --------------------
# Preconditions
# --------------------
assert 'INFER_RESULTS_DELTA' in globals(), "Missing INFER_RESULTS_DELTA"
assert 'LSTM_WIN_DELTA' in globals(), "Missing LSTM_WIN_DELTA"
assert 'features' in globals() and isinstance(features, list) and len(features) == 25, "Features (25) not loaded"
# Scaler is needed only for model path
try:
    _scaler = load_scaler()
except Exception:
    _scaler = None
_lstm_entry = load_lstm()  # may be None (fallback path uses recon_error_lstm)

# --------------------
# Read inference_results minimal columns
# --------------------
cols_needed = ["source_id","timestamp","row_hash","date","recon_error_lstm","inference_run_id"] + features
ir_spark = spark.read.format("delta").load(str(INFER_RESULTS_DELTA)).select(*[c for c in cols_needed if c in spark.read.format("delta").load(str(INFER_RESULTS_DELTA)).columns])

# guard empties
if ir_spark.rdd.isEmpty():
    print("[DER-LSTM-FULL] inference_results is empty; nothing to do.")
else:
    # maintain ordering per source_id
    ir_pdf = (ir_spark
              .withColumn("timestamp", F.col("timestamp").cast("timestamp"))
              .orderBy(F.col("source_id").asc_nulls_last(), F.col("timestamp").asc_nulls_last())
              .toPandas())

    # quick clean
    ir_pdf = ir_pdf.dropna(subset=["timestamp", "row_hash"])
    if ir_pdf.empty:
        print("[DER-LSTM-FULL] no valid rows after cleaning; nothing to write.")
    else:
        # --------------------
        # Helper: stable window id from row_hashes
        # --------------------
        def stable_id(row_hashes):
            h = hashlib.sha1("|".join(map(str, row_hashes)).encode("utf-8")).hexdigest()
            # uuid5-like string for readability
            return str(uuid.UUID(h[:32]))

        # --------------------
        # Helper: model-based per-step error computation
        # For each step t in the window, score prefix [0..t] and compute MSE on last step
        # --------------------
        def per_step_errors_model(Xw, lstm):
            # Xw: np.array shape [W, F] (already scaled)
            errs = []
            with torch.no_grad():
                for t in range(WINDOW):
                    # Need at least 2 steps to compute a meaningful seq error
                    if t < 1:
                        errs.append(0.0)
                        continue
                    seq = Xw[:t+1]  # shape [t+1, F]
                    wt = torch.from_numpy(seq.astype("float32")).unsqueeze(0)  # [1, T, F]
                    yhat = lstm(wt)  # [1,1,F] (per your loader)
                    rec = yhat.squeeze(0).squeeze(0).cpu().numpy()
                    errs.append(float(((seq[-1] - rec)**2).mean()))
            return errs

        # --------------------
        # Helper: fallback per-step from existing recon_error_lstm (no model)
        # --------------------
        def per_step_errors_fallback(lstm_errors_slice):
            # Use the known last-step recon errors for each row in the window.
            arr = [float(x) if pd.notna(x) else 0.0 for x in lstm_errors_slice]
            # if all zeros, keep zeros; last element becomes reconstruction_error
            return arr

        # --------------------
        # Process per source_id in pandas
        # --------------------
        out_rows = []
        grouped = ir_pdf.groupby(ir_pdf["source_id"].astype(str), dropna=False)

        # Prepare model/scaler if present
        lstm_model = None
        model_version = "fallback"
        if _lstm_entry is not None and _scaler is not None:
            lstm_mode, lstm_model = _lstm_entry
            model_version = "ts" if lstm_mode == "ts" else "sd"

        for sid, gdf in grouped:
            gdf = gdf.sort_values("timestamp").reset_index(drop=True)

            # build full windows only (exact WINDOW size)
            if len(gdf) < WINDOW:
                continue

            # feature matrix (unscaled values -> scaled if we have a model)
            feat_mat = gdf[features].astype(float).values
            if lstm_model is not None and _scaler is not None:
                Xs = _scaler.transform(np.nan_to_num(feat_mat, copy=False))
            else:
                Xs = np.nan_to_num(feat_mat, copy=False)  # not used for model path anyway

            for end in range(WINDOW-1, len(gdf)):
                start = end - (WINDOW-1)
                win_slice = gdf.iloc[start:end+1]

                row_hashes = win_slice["row_hash"].astype(str).tolist()
                win_id = stable_id(row_hashes)

                # choose inference_run_id from the last row (most recent)
                run_id = str(win_slice["inference_run_id"].iloc[-1]) if "inference_run_id" in win_slice.columns else None

                # per-step errors
                if lstm_model is not None and _scaler is not None:
                    Xw = Xs[start:end+1]  # [WINDOW, F]
                    per_step = per_step_errors_model(Xw, lstm_model)
                else:
                    # fallback from recon_error_lstm column
                    per_step = per_step_errors_fallback(win_slice["recon_error_lstm"].tolist())

                reconstruction_error = float(per_step[-1]) if len(per_step) > 0 else None

                out_rows.append({
                    "lstm_window_id": win_id,
                    "window_start_ts": pd.to_datetime(win_slice["timestamp"].iloc[0], utc=True).tz_convert(None),
                    "window_end_ts":   pd.to_datetime(win_slice["timestamp"].iloc[-1], utc=True).tz_convert(None),
                    "row_hashes": row_hashes,
                    "reconstruction_error": reconstruction_error,
                    "per_step_errors": per_step,
                    "model_version": model_version,
                    "inference_run_id": run_id,
                    "date": str(pd.to_datetime(win_slice["timestamp"].iloc[-1]).date())
                })

        # --------------------
        # Write to Delta (append; stable ids prevent dupes across re-runs)
        # --------------------
        if not out_rows:
            print("[DER-LSTM-FULL] No windows created (not enough rows per vehicle).")
        else:
            out_pdf = pd.DataFrame(out_rows)
            # Create Spark DF and cast exactly to LSTM_WIN_SCHEMA
            tmp = spark.createDataFrame(out_pdf)
            lstm_df = tmp.select(
                F.col("lstm_window_id").cast("string").alias("lstm_window_id"),
                F.col("window_start_ts").cast("timestamp").alias("window_start_ts"),
                F.col("window_end_ts").cast("timestamp").alias("window_end_ts"),
                F.col("row_hashes").cast(T.ArrayType(T.StringType())).alias("row_hashes"),
                F.col("reconstruction_error").cast("double").alias("reconstruction_error"),
                F.col("per_step_errors").cast(T.ArrayType(T.DoubleType())).alias("per_step_errors"),
                F.col("model_version").cast("string").alias("model_version"),
                F.col("inference_run_id").cast("string").alias("inference_run_id"),
                F.col("date").cast("string").alias("date"),
            )

            lstm_df.write.format("delta").mode("append").save(str(LSTM_WIN_DELTA))
            wrote = lstm_df.count()
            print(f"[DER-LSTM-FULL] LSTM windows appended. rows={wrote}")

            # --------------------
            # CSV export (arrays as JSON strings)
            # --------------------
            lstm_csv = (lstm_df
                .withColumn("row_hashes_json", F.to_json("row_hashes"))
                .withColumn("per_step_errors_json", F.to_json("per_step_errors"))
                .drop("row_hashes","per_step_errors")
            )
            (lstm_csv.coalesce(1)
                     .write.mode("overwrite").option("header","true")
                     .csv((CSV_OUT_DIR / "lstm_windows_full").as_posix()))
            print("[DER-LSTM-FULL] CSV ready:", (CSV_OUT_DIR / "lstm_windows_full").as_posix())


[DER-LSTM-FULL] LSTM windows appended. rows=1991
[DER-LSTM-FULL] CSV ready: C:/engine_module_pipeline/infer_stage/csv/lstm_windows_full


In [90]:
# CELL — Inspect & Re-export LSTM Windows Table
from pyspark.sql import functions as F
import pandas as pd
import json
from pathlib import Path

# ------------------------------------------------------------------
# 1) Read Delta table
lstm_delta_path = r"C:\engine_module_pipeline\infer_stage\delta\engine_module_lstm_windows"
df_lstm = spark.read.format("delta").load(lstm_delta_path)

print("=== First 2 rows from Delta table ===")
df_lstm.show(2, truncate=False)     # prints to console
# Optionally collect a pandas slice for debugging
pdf_preview = df_lstm.limit(2).toPandas()
print("\nPandas preview of first two rows:\n", pdf_preview)

# ------------------------------------------------------------------
# 2) Re-export to CSV
#    Serialize array/map columns as JSON strings for clean CSV output
export_path = Path(r"C:\engine_module_pipeline\infer_stage\csv\lstm_windows_full")
export_path.mkdir(parents=True, exist_ok=True)

# Convert to pandas to handle JSON serialization
pdf = df_lstm.toPandas()

# Columns that are not plain scalars
array_like_cols = ["row_hashes", "per_step_errors"]

def to_json_if_needed(x):
    if isinstance(x, (list, dict)):
        return json.dumps(x)
    return x

for c in array_like_cols:
    if c in pdf.columns:
        pdf[c] = pdf[c].apply(to_json_if_needed)

out_csv = export_path / "lstm_windows_export.csv"
pdf.to_csv(out_csv, index=False, quoting=1)  # quoting=1 → quote all fields with commas/brackets

print(f"\n✅ LSTM windows table re-exported to:\n  {out_csv}")
print("Columns saved:", pdf.columns.tolist())
print("Row count    :", len(pdf))


=== First 2 rows from Delta table ===
+------------------------------------+-----------------------+-----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------

In [95]:
# === ALERTS — FULL & REFINED (trip-aware), fixed `extra` handling ===
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import (
    StringType, IntegerType, DoubleType, BooleanType, TimestampType, DateType,
    ArrayType, MapType, StructType, StructField
)
from delta.tables import DeltaTable
from pathlib import Path

# ---------- Paths ----------
REPO_ROOT = Path(r"C:\engine_module_pipeline")
DELTA_DIR = REPO_ROOT / r"infer_stage\delta"
CSV_DIR   = REPO_ROOT / r"infer_stage\csv"
CSV_DIR.mkdir(parents=True, exist_ok=True)

INFER_RESULTS_DELTA  = DELTA_DIR / "engine_module_inference_results"
ALERTS_DELTA         = DELTA_DIR / "engine_module_alerts"
ALERTS_REFINED_DELTA = DELTA_DIR / "engine_module_alerts_refined"

# ---------- Schemas ----------
ALERTS_SCHEMA = StructType([
    StructField("alert_id", StringType(), False),
    StructField("alert_ts", TimestampType(), False),
    StructField("row_hash", StringType(), True),
    StructField("vehicle_id", StringType(), True),
    StructField("alert_type", StringType(), True),
    StructField("severity", IntegerType(), True),
    StructField("composite_score", DoubleType(), True),
    StructField("triggering_models", ArrayType(StringType()), True),
    StructField("reason", StringType(), True),
    StructField("top_features", ArrayType(StringType()), True),
    StructField("model_versions", MapType(StringType(), StringType()), True),
    StructField("inference_run_id", StringType(), True),
    StructField("acked", BooleanType(), True),
    StructField("acked_by", StringType(), True),
    StructField("acked_ts", TimestampType(), True),
    StructField("notified_channels", ArrayType(StringType()), True),
    StructField("linked_rows", ArrayType(StringType()), True),
    StructField("extra", MapType(StringType(), StringType()), True),
    StructField("date", StringType(), True),
])

ALERTS_REFINED_SCHEMA = StructType([
    StructField("alert_group_id", StringType(), False),
    StructField("vehicle_id", StringType(), False),
    StructField("trip_id", StringType(), False),
    StructField("alert_type", StringType(), False),
    StructField("start_ts", TimestampType(), False),
    StructField("end_ts", TimestampType(), False),
    StructField("alerts_count", IntegerType(), False),
    StructField("max_severity", IntegerType(), False),
    StructField("max_composite_score", DoubleType(), False),
    StructField("representative_alert_id", StringType(), True),
    StructField("merged_alert_ids", ArrayType(StringType()), True),
    StructField("top_features_union", ArrayType(StringType()), True),
    StructField("model_versions", MapType(StringType(), StringType()), True),
    StructField("date", StringType(), False),
])

def ensure_delta_table(path: Path, schema: StructType, partition_cols=None):
    partition_cols = partition_cols or []
    if (path / "_delta_log").exists():
        return
    empty = spark.createDataFrame([], schema)
    w = empty.write.format("delta").mode("overwrite")
    if partition_cols:
        w = w.partitionBy(*partition_cols)
    w.save(str(path))
    print("Initialized Delta table:", path)

ensure_delta_table(ALERTS_DELTA, ALERTS_SCHEMA, partition_cols=["date"])
ensure_delta_table(ALERTS_REFINED_DELTA, ALERTS_REFINED_SCHEMA, partition_cols=["date"])

# ---------- Typed empty literals for arrays / maps ----------
EMPTY_STR_ARRAY = F.from_json(F.lit("[]"), ArrayType(StringType()))
EMPTY_STR_MAP   = F.from_json(F.lit("{}"), MapType(StringType(), StringType()))

# ---------- Parameters ----------
ALERT_THRESHOLD        = 0.75
TRIP_GAP_MINUTES       = 20   # gap > 20m => new trip
MERGE_SAME_TYPE_GAP_MIN= 5    # within-trip same-type alerts within 5m collapse

# ---------- 1) Load inference results above threshold ----------
res_df = (
    spark.read.format("delta").load(str(INFER_RESULTS_DELTA))
         .select(
             "row_hash","timestamp","date","source_id",
             "composite_score","anomaly_severity","explain_top_k",
             "model_versions","inference_run_id"
         )
)

cand = res_df.where(F.col("composite_score") >= F.lit(ALERT_THRESHOLD))
cand = cand.withColumn("alert_type", F.lit("composite_high"))

# extract top_features = array<string>
def top3_features(col):
    return F.transform(F.slice(col, 1, 3), lambda s: F.coalesce(s["feature"].cast("string"), F.lit("")))

cand = cand.withColumn(
    "top_features",
    F.when(F.col("explain_top_k").isNotNull(), top3_features(F.col("explain_top_k")))
     .otherwise(EMPTY_STR_ARRAY)
)

# ---------- 2) Build FULL alerts ----------
full_alerts = (
    cand
    .withColumn("alert_id", F.expr("uuid()"))
    .withColumn("alert_ts", F.col("timestamp").cast(TimestampType()))
    .withColumn("vehicle_id", F.col("source_id").cast(StringType()))
    .withColumn("severity", F.col("anomaly_severity").cast(IntegerType()))
    .withColumn("triggering_models", F.array(F.lit("dense"), F.lit("lstm"), F.lit("isof"), F.lit("kde"), F.lit("gmm")))
    .withColumn("reason", F.lit("Composite score exceeded threshold"))
    .withColumn("notified_channels", EMPTY_STR_ARRAY)
    .withColumn("linked_rows", F.array(F.col("row_hash").cast(StringType())))
    .withColumn("acked", F.lit(False).cast(BooleanType()))
    .withColumn("acked_by", F.lit(None).cast(StringType()))
    .withColumn("acked_ts", F.lit(None).cast(TimestampType()))
    .withColumn("extra", EMPTY_STR_MAP)  # always a typed map; fixes NullType issues
    .select(
        "alert_id","alert_ts","row_hash","vehicle_id","alert_type","severity",
        "composite_score","triggering_models","reason","top_features",
        "model_versions","inference_run_id",
        "acked","acked_by","acked_ts","notified_channels","linked_rows",
        "extra",
        F.col("date").cast(StringType()).alias("date")
    )
)

# name-cast by schema (no stray literals)
full_alerts_s = full_alerts.select(
    *[
        (F.col(f.name).cast(f.dataType)
         if isinstance(f.dataType, (StringType, IntegerType, DoubleType, BooleanType, TimestampType, DateType))
         else F.col(f.name)
        ).alias(f.name)
        for f in ALERTS_SCHEMA
    ]
)

full_alerts_s.write.format("delta").mode("append").save(str(ALERTS_DELTA))
print("[ALERTS] FULL alerts written →", ALERTS_DELTA)

# ---------- 3) Trip detection ----------
a = full_alerts_s.select("vehicle_id","alert_id","alert_ts","alert_type","severity","composite_score","top_features","model_versions","date")
wVehTs = Window.partitionBy("vehicle_id").orderBy("alert_ts")

a_seg = (
    a
    .withColumn("prev_ts", F.lag("alert_ts").over(wVehTs))
    .withColumn("gap_min",
        F.when(F.col("prev_ts").isNull(), F.lit(999999.0))
         .otherwise((F.col("alert_ts").cast("long") - F.col("prev_ts").cast("long")) / 60.0)
    )
    .withColumn("trip_break", F.when(F.col("gap_min") > F.lit(TRIP_GAP_MINUTES), F.lit(1)).otherwise(F.lit(0)))
    .withColumn("trip_seq", F.sum("trip_break").over(wVehTs))
    .withColumn("trip_id", F.concat_ws("-", F.col("vehicle_id"), F.col("trip_seq").cast("string")))
    .drop("prev_ts","gap_min","trip_break","trip_seq")
)

# ---------- 4) Within-trip merge of same-type alerts ----------
wTripType = Window.partitionBy("vehicle_id","trip_id","alert_type").orderBy("alert_ts")
with_gaps = (
    a_seg
    .withColumn("prev_ts2", F.lag("alert_ts").over(wTripType))
    .withColumn("gap_min2",
        F.when(F.col("prev_ts2").isNull(), F.lit(999999.0))
         .otherwise((F.col("alert_ts").cast("long") - F.col("prev_ts2").cast("long")) / 60.0)
    )
    .withColumn("grp_bump", F.when(F.col("gap_min2") > F.lit(MERGE_SAME_TYPE_GAP_MIN), F.lit(1)).otherwise(F.lit(0)))
    .withColumn("local_group", F.sum("grp_bump").over(wTripType))
    .drop("prev_ts2","gap_min2","grp_bump")
)

ref_groups = with_gaps.withColumn(
    "alert_group_id",
    F.concat_ws(":", F.lit("grp"), F.col("vehicle_id"), F.col("trip_id"), F.col("alert_type"), F.col("local_group").cast("string"))
)

agg = (
    ref_groups
    .groupBy("alert_group_id","vehicle_id","trip_id","alert_type")
    .agg(
        F.min("alert_ts").alias("start_ts"),
        F.max("alert_ts").alias("end_ts"),
        F.count(F.lit(1)).alias("alerts_count"),
        F.max("severity").alias("max_severity"),
        F.max("composite_score").alias("max_composite_score"),
        F.first(
            F.sort_array(
                F.transform(F.array(F.struct("composite_score","alert_id")),
                            lambda x: x),
                asc=False
            )[0]["alert_id"]
        ).alias("representative_alert_id"),
        F.collect_list("alert_id").alias("merged_alert_ids"),
        F.flatten(F.collect_list("top_features")).alias("top_features_union_flat"),
        F.last("model_versions", ignorenulls=True).alias("model_versions"),
        F.max("date").alias("date")
    )
    .withColumn("top_features_union", F.array_distinct(F.col("top_features_union_flat")))
    .drop("top_features_union_flat")
)

refined_s = agg.select(
    "alert_group_id","vehicle_id","trip_id","alert_type",
    "start_ts","end_ts","alerts_count","max_severity","max_composite_score",
    "representative_alert_id","merged_alert_ids","top_features_union","model_versions",
    F.col("date").cast(StringType()).alias("date")
)

# coerce null arrays/maps to typed empties
refined_s = (
    refined_s
    .withColumn("merged_alert_ids",
        F.when(F.col("merged_alert_ids").isNull(), EMPTY_STR_ARRAY).otherwise(F.col("merged_alert_ids")))
    .withColumn("top_features_union",
        F.when(F.col("top_features_union").isNull(), EMPTY_STR_ARRAY).otherwise(F.col("top_features_union")))
    .withColumn("model_versions",
        F.when(F.col("model_versions").isNull(), EMPTY_STR_MAP).otherwise(F.col("model_versions")))
)

refined_s = refined_s.select(
    *[
        (F.col(f.name).cast(f.dataType)
         if isinstance(f.dataType, (StringType, IntegerType, DoubleType, BooleanType, TimestampType, DateType))
         else F.col(f.name)
        ).alias(f.name)
        for f in ALERTS_REFINED_SCHEMA
    ]
)

refined_s.write.format("delta").mode("append").save(str(ALERTS_REFINED_DELTA))
print("[ALERTS] REFINED alerts written →", ALERTS_REFINED_DELTA)

# ---------- 5) CSV exports (complex columns → JSON strings) ----------
def export_csv(df, out_dir: Path):
    out = df
    # Add JSON shadow columns for complex types if present
    for c in ["triggering_models","top_features","linked_rows","notified_channels","extra","model_versions",
              "merged_alert_ids","top_features_union"]:
        if c in out.columns:
            out = out.withColumn(c + "_json", F.to_json(F.col(c)))
            out = out.drop(c)
    (out.coalesce(1)
        .write.mode("overwrite")
        .option("header", True)
        .csv(str(out_dir)))

alerts_full_df    = spark.read.format("delta").load(str(ALERTS_DELTA))
alerts_refined_df = spark.read.format("delta").load(str(ALERTS_REFINED_DELTA))

export_csv(alerts_full_df,    CSV_DIR / "alerts_full")
export_csv(alerts_refined_df, CSV_DIR / "alerts_refined")

print("CSV full dir    :", CSV_DIR / "alerts_full")
print("CSV refined dir :", CSV_DIR / "alerts_refined")


Initialized Delta table: C:\engine_module_pipeline\infer_stage\delta\engine_module_alerts
Initialized Delta table: C:\engine_module_pipeline\infer_stage\delta\engine_module_alerts_refined
[ALERTS] FULL alerts written → C:\engine_module_pipeline\infer_stage\delta\engine_module_alerts
[ALERTS] REFINED alerts written → C:\engine_module_pipeline\infer_stage\delta\engine_module_alerts_refined
CSV full dir    : C:\engine_module_pipeline\infer_stage\csv\alerts_full
CSV refined dir : C:\engine_module_pipeline\infer_stage\csv\alerts_refined


In [96]:
# === ALERTS CSV — robust re-export + preview first 2 rows from each ===
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pathlib import Path

REPO_ROOT = Path(r"C:\engine_module_pipeline")
DELTA_DIR = REPO_ROOT / r"infer_stage\delta"
CSV_DIR   = REPO_ROOT / r"infer_stage\csv"

ALERTS_DELTA         = DELTA_DIR / "engine_module_alerts"
ALERTS_REFINED_DELTA = DELTA_DIR / "engine_module_alerts_refined"

CSV_FULL_DIR    = CSV_DIR / "alerts_full_clean"
CSV_REFINED_DIR = CSV_DIR / "alerts_refined_clean"

# 1) Load delta tables
alerts_full_df    = spark.read.format("delta").load(str(ALERTS_DELTA))
alerts_refined_df = spark.read.format("delta").load(str(ALERTS_REFINED_DELTA))

# 2) JSON-stringify complex fields to avoid CSV column shifts
def make_csv_safe(df):
    out = df
    complex_cols = []
    for c, dt in df.dtypes:
        # detect complex types by Spark dtype string
        if any(dt.startswith(prefix) for prefix in ("array", "map", "struct")):
            complex_cols.append(c)
    # add shadow columns with _json suffix and drop original complex
    for c in complex_cols:
        out = out.withColumn(c + "_json", F.to_json(F.col(c)))
        out = out.drop(c)
    # ensure all remaining are cast to string for consistent quoting
    for c in out.columns:
        out = out.withColumn(c, F.col(c).cast(T.StringType()))
    return out

alerts_full_csv    = make_csv_safe(alerts_full_df)
alerts_refined_csv = make_csv_safe(alerts_refined_df)

# 3) Write CSV with strict quoting/escaping
def write_csv(df, target_dir: Path):
    (df.coalesce(1)
       .write.mode("overwrite")
       .option("header", True)
       .option("quote", '"')
       .option("escape", '"')
       .option("quoteAll", True)
       .option("multiLine", False)
       .csv(str(target_dir)))

write_csv(alerts_full_csv,    CSV_FULL_DIR)
write_csv(alerts_refined_csv, CSV_REFINED_DIR)

print("Wrote CSV:")
print("  Full   :", CSV_FULL_DIR)
print("  Refined:", CSV_REFINED_DIR)

# 4) Read back and show first 2 rows from each (for sanity)
def preview_csv(path: Path, name: str):
    df = (spark.read
                .option("header", True)
                .option("quote", '"')
                .option("escape", '"')
                .csv(str(path)))
    print(f"\n=== Preview: {name} (first 2 rows) ===")
    df.show(2, truncate=False)

preview_csv(CSV_FULL_DIR,    "alerts_full_clean")
preview_csv(CSV_REFINED_DIR, "alerts_refined_clean")


Wrote CSV:
  Full   : C:\engine_module_pipeline\infer_stage\csv\alerts_full_clean
  Refined: C:\engine_module_pipeline\infer_stage\csv\alerts_refined_clean

=== Preview: alerts_full_clean (first 2 rows) ===
+------------------------------------+-----------------------+----------------------------------------------------------------+----------+--------------+--------+------------------+----------------------------------+------------------------------------+-----+--------+--------+----------+-----------------------------------+----------------------------------------------------------------------------------------------------+------------------------------------------+----------------------+--------------------------------------------------------------------+----------+
|alert_id                            |alert_ts               |row_hash                                                        |vehicle_id|alert_type    |severity|composite_score   |reason                            |infer

In [97]:
# === VEHICLE HEALTH SUMMARY — build from results+alerts, write Delta+CSV, preview ===
from pathlib import Path
from pyspark.sql import functions as F, Window as W
from pyspark.sql import types as T

# ----- Paths -----
REPO_ROOT = Path(r"C:\engine_module_pipeline")
DELTA_DIR = REPO_ROOT / r"infer_stage\delta"
CSV_DIR   = REPO_ROOT / r"infer_stage\csv"

INFER_RESULTS_DELTA = DELTA_DIR / "engine_module_inference_results"
ALERTS_DELTA        = DELTA_DIR / "engine_module_alerts"
VEH_HEALTH_DELTA    = DELTA_DIR / "vehicle_health_summary"

CSV_VEH_DIR = CSV_DIR / "vehicle_health_summary_clean"

# ----- Load sources -----
res = spark.read.format("delta").load(str(INFER_RESULTS_DELTA)) \
        .select(
            F.col("source_id").alias("vehicle_id"),
            F.col("date").cast("string").alias("date"),
            F.col("row_hash"),
            F.col("anomaly_severity").cast("int").alias("anomaly_severity"),
            F.col("composite_score").cast("double").alias("composite_score"),
            F.col("inference_ts").cast("timestamp").alias("inference_ts"),
            F.col("model_versions").cast("map<string,string>").alias("model_versions")
        )

alerts = spark.read.format("delta").load(str(ALERTS_DELTA)) \
        .select(
            F.col("vehicle_id").cast("string").alias("vehicle_id"),
            F.col("date").cast("string").alias("date"),
            F.col("alert_ts").cast("timestamp").alias("alert_ts"),
            F.col("top_features").cast("array<string>").alias("top_features")
        )

# ----- Aggregate per vehicle_id + date from inference_results -----
# rows_count, anomaly_count (severity >=2), median & p95 composite, last_inference_ts
agg_base = (
    res.groupBy("vehicle_id", "date")
       .agg(
           F.count("*").alias("rows_count"),
           F.sum(F.when(F.col("anomaly_severity") >= 2, 1).otherwise(0)).alias("anomaly_count"),
           F.expr("percentile_approx(composite_score, 0.5)").alias("median_composite_score"),
           F.expr("percentile_approx(composite_score, 0.95)").alias("p95_composite_score"),
           F.max("inference_ts").alias("last_inference_ts")
       )
)

# Bring forward a representative model_versions per vehicle/day (from the latest row that day)
w_day = W.partitionBy("vehicle_id", "date").orderBy(F.col("inference_ts").desc())
versions_latest = (
    res.withColumn("rn", F.row_number().over(w_day))
       .where(F.col("rn") == 1)
       .select("vehicle_id", "date", F.col("model_versions").alias("model_versions_latest"))
)

veh_day = (
    agg_base.join(versions_latest, on=["vehicle_id","date"], how="left")
            .withColumn("model_versions",
                        F.when(F.col("model_versions_latest").isNotNull(), F.col("model_versions_latest"))
                         .otherwise(F.create_map()))  # empty map if missing
            .drop("model_versions_latest")
)

# anomaly_rate
veh_day = veh_day.withColumn(
    "anomaly_rate",
    F.when(F.col("rows_count") > 0, F.col("anomaly_count") / F.col("rows_count")).otherwise(F.lit(0.0))
)

# ----- Last alert per vehicle (across all dates) & days_since_last_alert per day -----
last_alert = (
    alerts.groupBy("vehicle_id")
          .agg(F.max(F.to_date("alert_ts")).alias("last_alert_date"))
)

veh_day = (
    veh_day.join(last_alert, on="vehicle_id", how="left")
           .withColumn("day_date", F.to_date("date"))
           .withColumn(
               "days_since_last_alert",
               F.when(F.col("last_alert_date").isNotNull(),
                      F.datediff(F.col("day_date"), F.col("last_alert_date")))
                .otherwise(F.lit(None).cast("int"))
           )
           .drop("day_date")
)

# ----- Top failure modes (from alerts top_features) per vehicle_id+date -----
# Explode top_features for that exact date and vehicle, count frequency, take top 3
af = (alerts
      .withColumn("feat", F.explode(F.coalesce("top_features", F.array().cast("array<string>"))))
      .groupBy("vehicle_id", "date", "feat")
      .agg(F.count("*").alias("cnt")))

w_feat = W.partitionBy("vehicle_id", "date").orderBy(F.col("cnt").desc(), F.col("feat").asc())
top_feats_ranked = af.withColumn("rk", F.row_number().over(w_feat)).where(F.col("rk") <= 3)

top_feats = (top_feats_ranked
             .groupBy("vehicle_id", "date")
             .agg(F.collect_list("feat").alias("top_failure_modes")))

veh_day = veh_day.join(top_feats, on=["vehicle_id","date"], how="left") \
                 .withColumn("top_failure_modes",
                             F.coalesce(F.col("top_failure_modes"), F.array().cast("array<string>")))

# ----- Trend flag (compare median against previous day for that vehicle) -----
w_trend = W.partitionBy("vehicle_id").orderBy(F.to_date("date"))
veh_day = veh_day.withColumn(
    "prev_median", F.lag(F.col("median_composite_score")).over(w_trend)
)
veh_day = veh_day.withColumn(
    "trend_flag",
    F.when(F.col("prev_median").isNull(), F.lit("steady"))
     .when(F.col("median_composite_score") - F.col("prev_median") > F.lit(0.10), F.lit("worsening"))
     .when(F.col("median_composite_score") - F.col("prev_median") < F.lit(-0.10), F.lit("improving"))
     .otherwise(F.lit("steady"))
).drop("prev_median")

# ----- Health score (same logic you used earlier) -----
# health_score = clip( (1 - (0.6*median + 0.3*anomaly_rate + 0.1*(1 - recency))) * 100 )
veh_day = veh_day.withColumn("recency", F.lit(1.0))
veh_day = veh_day.withColumn(
    "health_score_raw",
    1.0 - (0.6*F.col("median_composite_score") + 0.3*F.col("anomaly_rate") + 0.1*(1.0 - F.col("recency")))
)
veh_day = veh_day.withColumn("health_score",
                             F.when(F.col("health_score_raw") < 0, 0.0)
                              .when(F.col("health_score_raw") > 1, 1.0)
                              .otherwise(F.col("health_score_raw")) * 100.0
                            ).drop("health_score_raw","recency")

# ----- Estimated RUL (simple, monotone with health) -----
veh_day = veh_day.withColumn(
    "estimated_rul",
    F.round((F.col("health_score")/100.0) * F.lit(60.0), 1).cast("double")
)

# ----- Normalize columns to match VEH_HEALTH_SCHEMA (types + names) -----
veh_day_out = (veh_day
    .select(
        F.col("vehicle_id").cast("string").alias("vehicle_id"),
        F.col("date").cast("string").alias("date"),
        F.col("rows_count").cast("long").alias("rows_count"),
        F.col("anomaly_count").cast("long").alias("anomaly_count"),
        F.col("anomaly_rate").cast("double").alias("anomaly_rate"),
        F.col("median_composite_score").cast("double").alias("median_composite_score"),
        F.col("p95_composite_score").cast("double").alias("p95_composite_score"),
        F.col("health_score").cast("double").alias("health_score"),
        F.col("days_since_last_alert").cast("int").alias("days_since_last_alert"),
        F.col("top_failure_modes").cast("array<string>").alias("top_failure_modes"),
        F.col("trend_flag").cast("string").alias("trend_flag"),
        F.col("estimated_rul").cast("double").alias("estimated_rul"),
        F.col("model_versions").cast("map<string,string>").alias("model_versions"),
        F.col("last_inference_ts").cast("timestamp").alias("last_inference_ts"),
    )
)

# ----- Ensure Delta table exists with your preset schema (call your existing helper if available) -----
# (If you already initialized the table earlier, this will just write.)
from delta.tables import DeltaTable
try:
    DeltaTable.forPath(spark, str(VEH_HEALTH_DELTA))
    table_exists = True
except Exception:
    table_exists = False

if not table_exists:
    # Minimal schema (matches your earlier VEH_HEALTH_SCHEMA)
    from pyspark.sql.types import *
    VEH_HEALTH_SCHEMA = StructType([
        StructField("vehicle_id", StringType(), False),
        StructField("date", StringType(), False),
        StructField("rows_count", LongType(), True),
        StructField("anomaly_count", LongType(), True),
        StructField("anomaly_rate", DoubleType(), True),
        StructField("median_composite_score", DoubleType(), True),
        StructField("p95_composite_score", DoubleType(), True),
        StructField("health_score", DoubleType(), True),
        StructField("days_since_last_alert", IntegerType(), True),
        StructField("top_failure_modes", ArrayType(StringType()), True),
        StructField("trend_flag", StringType(), True),
        StructField("estimated_rul", DoubleType(), True),
        StructField("model_versions", MapType(StringType(), StringType()), True),
        StructField("last_inference_ts", TimestampType(), True),
    ])
    spark.createDataFrame([], VEH_HEALTH_SCHEMA) \
         .write.format("delta").mode("overwrite").save(str(VEH_HEALTH_DELTA))

# ----- MERGE into Delta (vehicle_id + date) -----
tgt = DeltaTable.forPath(spark, str(VEH_HEALTH_DELTA))
cols = veh_day_out.columns
(
  tgt.alias("t")
     .merge(
        veh_day_out.alias("s"),
        "t.vehicle_id = s.vehicle_id AND t.date = s.date"
     )
     .whenMatchedUpdate(
        condition="s.last_inference_ts > t.last_inference_ts",
        set={c: f"s.`{c}`" for c in cols}
     )
     .whenNotMatchedInsert(values={c: f"s.`{c}`" for c in cols})
     .execute()
)

# ----- Preview first row from the Delta table -----
print("=== vehicle_health_summary (first row) ===")
spark.read.format("delta").load(str(VEH_HEALTH_DELTA)).show(1, truncate=False)

# ----- CSV export (JSON-safe for complex columns) -----
def make_csv_safe(df):
    out = df
    # JSONify complex columns
    for c, dt in df.dtypes:
        if dt.startswith("array") or dt.startswith("map") or dt.startswith("struct"):
            out = out.withColumn(c + "_json", F.to_json(F.col(c))).drop(c)
    # Cast remaining to string & quote all
    for c in out.columns:
        out = out.withColumn(c, F.col(c).cast("string"))
    return out

veh_csv = make_csv_safe(spark.read.format("delta").load(str(VEH_HEALTH_DELTA)))

(veh_csv.coalesce(1)
        .write.mode("overwrite")
        .option("header", True)
        .option("quote", '"')
        .option("escape", '"')
        .option("quoteAll", True)
        .option("multiLine", False)
        .csv(str(CSV_VEH_DIR)))

print("CSV written to:", CSV_VEH_DIR)

# Read back CSV and print first row to confirm header alignment
csv_back = (spark.read
                 .option("header", True)
                 .option("quote", '"')
                 .option("escape", '"')
                 .csv(str(CSV_VEH_DIR)))
print("=== vehicle_health_summary CSV (first row) ===")
csv_back.show(1, truncate=False)


=== vehicle_health_summary (first row) ===
+----------+----------+----------+-------------+-------------------+----------------------+-------------------+------------------+---------------------+------------------------------------------------------------------------------------------------+----------+-------------+-----------------------------------------+--------------------------+
|vehicle_id|date      |rows_count|anomaly_count|anomaly_rate       |median_composite_score|p95_composite_score|health_score      |days_since_last_alert|top_failure_modes                                                                               |trend_flag|estimated_rul|model_versions                           |last_inference_ts         |
+----------+----------+----------+-------------+-------------------+----------------------+-------------------+------------------+---------------------+------------------------------------------------------------------------------------------------+----------+---------

In [99]:
# === MODEL METADATA — build per inference_run_id, write Delta+CSV, preview ===
from pathlib import Path
from pyspark.sql import functions as F
from pyspark.sql import types as T
from delta.tables import DeltaTable

# ----- Paths -----
REPO_ROOT = Path(r"C:\engine_module_pipeline")
RES_DELTA = REPO_ROOT / r"infer_stage\delta\engine_module_inference_results"   # source
META_DELTA = REPO_ROOT / r"delta\engine_module_model_metadata"                 # target (as requested)
CSV_DIR   = REPO_ROOT / r"infer_stage\csv\engine_module_model_metadata_clean"

# ----- Load inference results (source) -----
res = (spark.read.format("delta").load(str(RES_DELTA))
         .select(
            F.col("inference_run_id").cast("string").alias("inference_run_id"),
            F.col("inference_ts").cast("timestamp").alias("inference_ts"),
            F.col("date").cast("string").alias("date"),
            F.col("model_versions").cast("map<string,string>").alias("model_versions"),
            # Stats inputs (optional but nice to carry into baseline_stats)
            F.col("recon_error_dense").cast("double").alias("recon_error_dense"),
            F.col("recon_error_lstm").cast("double").alias("recon_error_lstm"),
            F.col("isolation_score").cast("double").alias("isolation_score"),
            F.col("kde_logp").cast("double").alias("kde_logp"),
            F.col("gmm_logp").cast("double").alias("gmm_logp"),
            F.col("composite_score").cast("double").alias("composite_score")
         )
      )

# Guard for empty source
if res.head(1):
    # ----- Choose a representative row per run (latest inference_ts) -----
    w = F.window("inference_ts", "36500 days")  # big window not used; we’ll window by run_id instead
    from pyspark.sql import Window
    wrun = Window.partitionBy("inference_run_id").orderBy(F.col("inference_ts").desc())

    res_ranked = res.withColumn("rn", F.row_number().over(wrun))
    reps = (res_ranked.where(F.col("rn") == 1)
                      .select("inference_run_id", "inference_ts", "date", "model_versions"))

    # ----- Aggregate baseline stats per run (string map) -----
    # We compute robust-ish summaries and stringify them for baseline_stats.
    stats = (res.groupBy("inference_run_id")
                .agg(
                    F.expr("percentile_approx(recon_error_dense, 0.5)").alias("dense_p50"),
                    F.expr("percentile_approx(recon_error_dense, 0.95)").alias("dense_p95"),
                    F.expr("percentile_approx(recon_error_lstm, 0.5)").alias("lstm_p50"),
                    F.expr("percentile_approx(recon_error_lstm, 0.95)").alias("lstm_p95"),
                    F.expr("percentile_approx(isolation_score, 0.5)").alias("isof_p50"),
                    F.expr("percentile_approx(isolation_score, 0.05)").alias("isof_p05"),
                    F.expr("percentile_approx(kde_logp, 0.5)").alias("kde_p50"),
                    F.expr("percentile_approx(kde_logp, 0.05)").alias("kde_p05"),
                    F.expr("percentile_approx(gmm_logp, 0.5)").alias("gmm_p50"),
                    F.expr("percentile_approx(gmm_logp, 0.05)").alias("gmm_p05"),
                    F.expr("percentile_approx(composite_score, 0.5)").alias("comp_p50"),
                    F.expr("percentile_approx(composite_score, 0.95)").alias("comp_p95")
                ))

    # stringify numerics into a map<string,string>
    def str_map(cols):
        pairs = []
        for c in cols:
            pairs.append(F.lit(c))
            pairs.append(F.coalesce(F.format_number(F.col(c), 6), F.lit("null")))
        return F.create_map(*pairs)

    baseline_cols = ["dense_p50","dense_p95","lstm_p50","lstm_p95","isof_p50","isof_p05",
                     "kde_p50","kde_p05","gmm_p50","gmm_p05","comp_p50","comp_p95"]
    stats_str = stats.select(
        F.col("inference_run_id"),
        str_map(baseline_cols).alias("baseline_stats")
    )

    # ----- Params map (minimal but complete; keep in sync with your pipeline knobs) -----
    PARAMS = {
        "LSTM_WINDOW": "10",
        "ALERT_THRESHOLD": "0.75",
        "WEIGHTS": '{"dense":0.35,"lstm":0.25,"isof":0.20,"kde":0.10,"gmm":0.10}',
        "ROBUST_SCALE": "RobustScaler(25-75) train-only"
    }
    params_df = (reps
                 .select("inference_run_id")
                 .withColumn("params",
                             F.create_map(
                                 *[x for kv in PARAMS.items() for x in (F.lit(kv[0]), F.lit(kv[1]))]
                             ).cast("map<string,string>")
                            )
                )

    # ----- Assemble final rows as per schema -----
    meta = (reps.join(stats_str, "inference_run_id", "left")
                 .join(params_df, "inference_run_id", "left")
                 .withColumnRenamed("inference_ts", "timestamp")
                 .withColumn("notes", F.lit(None).cast("string"))
                 .withColumn("source_commit", F.lit(None).cast("string"))
            )

    # Normalize types and order to preset schema
    meta_out = (meta
        .select(
            F.col("inference_run_id").cast("string").alias("inference_run_id"),
            F.col("timestamp").cast("timestamp").alias("timestamp"),
            F.col("model_versions").cast("map<string,string>").alias("model_versions"),
            F.coalesce(F.col("params").cast("map<string,string>"), F.create_map()).alias("params"),
            F.coalesce(F.col("baseline_stats").cast("map<string,string>"), F.create_map()).alias("baseline_stats"),
            F.col("notes").cast("string").alias("notes"),
            F.col("source_commit").cast("string").alias("source_commit"),
            F.col("date").cast("string").alias("date")
        )
    )

    # ----- Ensure target Delta table exists with preset schema -----
    MODEL_METADATA_SCHEMA = T.StructType([
        T.StructField("inference_run_id", T.StringType(), False),
        T.StructField("timestamp", T.TimestampType(), False),
        T.StructField("model_versions", T.MapType(T.StringType(), T.StringType()), False),
        T.StructField("params", T.MapType(T.StringType(), T.StringType()), True),
        T.StructField("baseline_stats", T.MapType(T.StringType(), T.StringType()), True),
        T.StructField("notes", T.StringType(), True),
        T.StructField("source_commit", T.StringType(), True),
        T.StructField("date", T.StringType(), False),
    ])

    try:
        DeltaTable.forPath(spark, str(META_DELTA))
        table_exists = True
    except Exception:
        table_exists = False

    if not table_exists:
        spark.createDataFrame([], MODEL_METADATA_SCHEMA) \
             .write.format("delta").mode("overwrite").save(str(META_DELTA))

    # ----- MERGE by inference_run_id (keep the freshest timestamp) -----
    tgt = DeltaTable.forPath(spark, str(META_DELTA))
    cols = [f.name for f in MODEL_METADATA_SCHEMA]
    (tgt.alias("t")
        .merge(meta_out.alias("s"), "t.inference_run_id = s.inference_run_id")
        .whenMatchedUpdate(
            condition="s.timestamp > t.timestamp",
            set={c: f"s.`{c}`" for c in cols}
        )
        .whenNotMatchedInsert(values={c: f"s.`{c}`" for c in cols})
        .execute())

    # ----- Preview first row from Delta table -----
    print("=== engine_module_model_metadata (first row) ===")
    spark.read.format("delta").load(str(META_DELTA)).show(1, truncate=False)

    # ----- CSV export mirroring the table (JSON-encode complex fields) -----
    def csv_safe(df):
        out = df
        for c, dt in df.dtypes:
            if dt.startswith("map") or dt.startswith("array") or dt.startswith("struct"):
                out = out.withColumn(c + "_json", F.to_json(F.col(c))).drop(c)
        for c in out.columns:
            out = out.withColumn(c, F.col(c).cast("string"))
        return out

    csv_df = csv_safe(spark.read.format("delta").load(str(META_DELTA)))
    (csv_df.coalesce(1)
          .write.mode("overwrite")
          .option("header", True)
          .option("quote", '"')
          .option("escape", '"')
          .option("quoteAll", True)
          .csv(str(CSV_DIR)))

    print("CSV written to:", CSV_DIR)

    # Read back CSV to confirm header alignment
    csv_back = (spark.read
                    .option("header", True)
                    .option("quote", '"')
                    .option("escape", '"')
                    .csv(str(CSV_DIR)))
    print("=== model_metadata CSV (first row) ===")
    csv_back.show(1, truncate=False)

else:
    print("[model_metadata] No inference_results found; nothing to build.")


=== engine_module_model_metadata (first row) ===
+------------------------------------+--------------------------+-----------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+-------------+----------+
|inference_run_id                    |timestamp                 |model_versions                           |params                                                                                                                                                               |baseline_stats                                                                                        