# Step 1: Importing Libraries and Setup

Installed **HistorianQuery** for historian data access, **PyCaret (full)** for AutoML, and **PyTorch with vision/audio** for deep learning on CPU.


In [0]:
!pip install historian_query pycaret[full]
!pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
%pip install lifelines


In [0]:
dbutils.library.restartPython()

In [0]:
import json
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
import logging
from itertools import chain
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import window, avg, create_map, lit, col, expr, from_unixtime, to_timestamp
from datetime import datetime, timedelta
from functools import singledispatchmethod
import pyspark.sql.functions as sfn
from tempo import TSDF
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error

## Step 1.1: Historian Class for Influx Objects

Hhelper to pull, clean, and regularize historian time-series data on Spark into tidy, evenly sampled series.
* **Three ways to init:**

  1. from an existing DataFrame; 2) from a table + list of tags; 3) from a table + aligned triplets (tag, reference, controller).
* **Schema expectations:** Table mode requires standard historian columns (tag/reference/controller, ts, value\_double, site, historian\_name, \_event\_date, optional line).
* **Resampling core:** `resample(start, end, ...)` does ceil-based resampling with forward-fill, pads edges, enforces a **forward-fill timeout** (stale values → null), and returns a slim TS (optionally joined to aliases).
* **Alias support:** Provide an alias map to enrich outputs with `alias`, while keeping reference/controller—useful for human-readable plotting and joins.
* **Null handling:** `ignore_nulls=False` turns nulls into a sentinel to preserve “no value” vs “stale after timeout”; then it restores nulls post-resample.
* **Safety checks:** Verifies `ff_timeout ≥ sample_freq`; filters by site/historian\_name/date (and line when present).
* **Utilities:** `get_raw_data(...)` gets unregularized slices; `get_latest_ts()` finds most recent timestamp; `with_alias(...)` returns **all** historian columns + alias for richer downstream use.


In [0]:
from functools import singledispatchmethod
from typing import List, Optional, Tuple, Union
from pyspark.sql import DataFrame, Row
from pyspark.sql import functions as sfn
from pyspark.sql.functions import col
from tempo import TSDF

class HistorianQuery:
    """
    Query & regularize time series from a historian table/DataFrame on Spark.

    Modes:
      1) DataFrame:
         HistorianQuery(df, sample_freq, ff_timeout, site, historian_name, start_date, end_date,
                        line: Optional[str]=None, ignore_nulls=False)

      2) Table + tag list:
         HistorianQuery(table_name: str, tag_list: list[str], sample_freq, ff_timeout,
                        site, historian_name, start_date, end_date,
                        line: Optional[str]=None, ignore_nulls=False)

      3) Table + three arrays (tag, reference, controller):
         HistorianQuery(table_name: str,
                        tag_names: list[str], ref_names: list[str], ctrl_names: list[str],
                        sample_freq, ff_timeout, site, historian_name, start_date, end_date,
                        line: Optional[str]=None, ignore_nulls=False)

    Notes:
      - In table modes, the table must contain:
        ["tag_name","reference_name","controller_name","ts","value_double",
         "_event_date","site","historian_name"] and optionally "line".
      - .resample(...) returns a slim TS (tag_name, ts, value_double, orig_ts)
      - .with_alias(alias) returns ALL historian columns + an 'alias' column.
    """

    # ---------------------------
    # DF mode
    # ---------------------------
    @singledispatchmethod
    def __init__(
        self,
        df: DataFrame,
        sample_freq: str,
        ff_timeout: str,
        site: str,
        historian_name: str,
        start_date: str,
        end_date: str,
        line: Optional[str] = None,
        ignore_nulls: bool = False,
    ):
        self.sample_freq = sample_freq
        self.ff_timeout = ff_timeout
        self.ignore_nulls = ignore_nulls
        self.column_types = {"value_double": "double"}

        # ✅ keep ALL columns here for alias joins later
        self.df_full = df

        # ✅ slim view used for resampling (NO 'alias' here!)
        self.required_cols = ["tag_name", "ts", "value_double"]
        self.df = df.select(*self.required_cols)

        if not spark.sql(f"select interval {ff_timeout} >= interval {sample_freq}").first()[0]:
            raise ValueError(
                "Timeout interval must be at least as long as sampling frequency, "
                f"but {ff_timeout} < {sample_freq}."
            )

        self.table_name = None
        self.tag_list = None

    # ---------------------------
    # Table mode (tag list OR triplets)
    # ---------------------------
    @__init__.register
    def _(
        self,
        table_name: str,
        list1: List[str],
        *rest,
        sample_freq: str,
        ff_timeout: str,
        site: str,
        historian_name: str,
        start_date: str,
        end_date: str,
        line: Optional[str] = None,
        ignore_nulls: bool = False,
    ):
        self.table_name = table_name
        self.sample_freq = sample_freq
        self.ff_timeout = ff_timeout
        self.ignore_nulls = ignore_nulls
        self.column_types = {"value_double": "double"}
        #  slim schema for Tempo only 
        self.required_cols = ["tag_name", "ts", "value_double"]

        if not spark.sql(f"select interval {ff_timeout} >= interval {sample_freq}").first()[0]:
            raise ValueError(
                "Timeout interval must be at least as long as sampling frequency, "
                f"but {ff_timeout} < {sample_freq}."
            )

        base = spark.table(table_name).where(
            (col("_event_date").between(start_date, end_date))
            & (col("site").ilike(f"%{site}%"))
            & (col("historian_name").ilike(f"%{historian_name}%"))
        )
        if line is not None and "line" in base.columns:
            base = base.where(col("line") == line)

        # Decide variant
        use_triples = len(rest) >= 2 and isinstance(rest[0], list) and isinstance(rest[1], list)
        if use_triples:
            tag_names = list1
            ref_names = rest[0]
            ctrl_names = rest[1]
            if not (len(tag_names) == len(ref_names) == len(ctrl_names)):
                raise ValueError(
                    "tag_names, ref_names, and ctrl_names must be the same length "
                    f"(got {len(tag_names)}, {len(ref_names)}, {len(ctrl_names)})"
                )

            triples_rows = [
                Row(tag_name=t, reference_name=r, controller_name=c)
                for t, r, c in zip(tag_names, ref_names, ctrl_names)
            ]
            filter_df = spark.createDataFrame(triples_rows).dropDuplicates()

            # keep ALL columns, filtered to the triplets
            cond = (
                base["tag_name"] == filter_df["tag_name"]
            ) & (
                base["controller_name"] == filter_df["controller_name"]
            ) & (
                base["reference_name"].eqNullSafe(filter_df["reference_name"])
            )
            df_full = base.join(filter_df, cond, "semi")
            self.tag_list = tag_names
        else:
            tag_list = list1
            df_full = base.where(col("tag_name").isin(tag_list))
            self.tag_list = tag_list

        # store both views
        self.df_full = df_full
        self.df = df_full.select(*self.required_cols)

    # ---------------------------
    # Public API
    # ---------------------------
    
    def resample(
        self,
        start_ts_str: str,
        end_ts_str: str,
        alias_map: Optional[Union[DataFrame, List[Tuple[str, Optional[str], str, str]]]] = None
    ) -> DataFrame:
        """
        Regularized time series using ceil-resample + forward fill.
        If alias_map is provided, output also includes reference_name and alias.
        Output columns (when alias_map is given):
        tag_name, reference_name, controller_name, alias, ts, value_double, orig_ts
        Otherwise:
        tag_name, ts, value_double, orig_ts
        """
        # --- work off the full DF so we keep reference/controller columns
        base = self.df_full.select("tag_name", "reference_name", "controller_name", "ts", "value_double")

        start_ts = self.str2ts(start_ts_str)
        end_ts = self.str2ts(end_ts_str)
        ff_timeout = self.ff_timeout
        sample_freq = self.sample_freq
        ignore_nulls = self.ignore_nulls
        column_types = self.column_types

        # raw window with lead for ffill
        df = base.where(sfn.col("ts") > start_ts - sfn.expr(f"interval {ff_timeout}")) \
                .where(sfn.col("ts") < end_ts)

        if ignore_nulls:
            df = df.na.drop(subset=["value_double"])

        # pad early per (tag,ref,ctrl)
        early_pad = (df
            .select("tag_name", "reference_name", "controller_name")
            .distinct()
            .withColumn("ts", start_ts - sfn.expr(f"interval {ff_timeout}"))
            .withColumn("value_double", sfn.lit(None).cast(column_types["value_double"]))
        )
        df = early_pad.unionByName(df)

        # pad final per (tag,ref,ctrl)
        late_pad = (df
            .select("tag_name", "reference_name", "controller_name")
            .distinct()
            .withColumn("ts", end_ts)
            .withColumn("value_double", sfn.lit(None).cast(column_types["value_double"]))
        )
        df = df.unionByName(late_pad)

        # preserve write times
        df = df.withColumn("orig_ts_double", sfn.col("ts").cast("double"))

        # shift timestamps so ceil-resample picks last value
        df = df.withColumn("ts", sfn.col("ts") + sfn.expr(f"interval {sample_freq} - interval 1 millisecond"))

        if not ignore_nulls:
            neg_inf = sfn.lit("-Inf").cast(column_types["value_double"])
            df = df.withColumn(
                "value_double",
                sfn.when(sfn.col("value_double").isNull(), neg_inf).otherwise(sfn.col("value_double")),
            )

        # resample by (tag_name, reference_name, controller_name)
        df_out = (
            TSDF(df, partition_cols=["tag_name", "reference_name", "controller_name"], ts_col="ts")
            .resample(freq=sample_freq, func="ceil")
            .interpolate(method="ffill", show_interpolated=False)
            .df
        )

        if not ignore_nulls:
            neg_inf = sfn.lit("-Inf").cast(column_types["value_double"])
            df_out = df_out.withColumn(
                "value_double",
                sfn.when(sfn.col("value_double") == neg_inf, sfn.lit(None).cast("double"))
                .otherwise(sfn.col("value_double")),
            )

        # recover orig_ts
        df_out = df_out.withColumn("orig_ts", sfn.to_timestamp(sfn.col("orig_ts_double"))).drop("orig_ts_double")

        # null out stale values beyond timeout
        df_out = df_out.withColumn(
            "value_double",
            sfn.when(sfn.col("ts") - sfn.col("orig_ts") >= sfn.expr("interval " + ff_timeout), None)
            .otherwise(sfn.col("value_double")),
        )

        # clip final window
        df_out = df_out.filter(sfn.col("ts") >= start_ts).filter(sfn.col("ts") < end_ts)

        # Optionally join alias to add alias column and keep reference_name
        if alias_map is not None:
            if isinstance(alias_map, list):
                alias_df = spark.createDataFrame(
                    alias_map, ["tag_name", "reference_name", "controller_name", "alias"]
                ).dropDuplicates()
            else:
                alias_df = alias_map.select("tag_name", "reference_name", "controller_name", "alias").dropDuplicates()

            df_out = (df_out.alias("t")
                .join(
                    alias_df.alias("m"),
                    (col("t.tag_name") == col("m.tag_name")) &
                    (col("t.reference_name").eqNullSafe(col("m.reference_name"))) &
                    (col("t.controller_name") == col("m.controller_name")),
                    "inner"
                )
                .select(
                    col("t.tag_name").alias("tag_name"),
                    col("t.reference_name").alias("reference_name"),
                    col("t.controller_name").alias("controller_name"),
                    col("m.alias").alias("alias"),
                    col("t.ts").alias("ts"),
                    col("t.value_double").alias("value_double"),
                    col("t.orig_ts").alias("orig_ts"),
                )
            )
            return df_out

        # default (no alias)
        return df_out.select("tag_name", "ts", "value_double", "orig_ts")


    def get_raw_data(self, start_ts_str: str, end_ts_str: str) -> DataFrame:
        start_ts = self.str2ts(start_ts_str)
        end_ts = self.str2ts(end_ts_str)
        ff_timeout = self.ff_timeout
        df_raw = self.df.where(sfn.col("ts") > start_ts - sfn.expr(f"interval {ff_timeout}"))
        df_raw = df_raw.where(sfn.col("ts") < end_ts)
        return df_raw

    def get_latest_ts(self) -> str:
        recent = self.df.filter(self.df.ts >= sfn.date_sub(sfn.current_date(), 1))
        max_ts = self._get_latest_ts(recent) or self._get_latest_ts(self.df)
        if max_ts is None:
            raise EOFError("No records found")
        return max_ts.strftime("%Y-%m-%d %H:%M:%S.%f")

    @staticmethod
    def _get_latest_ts(df: DataFrame):
        row = df.select(sfn.max(sfn.col("ts")).alias("max_ts")).first()
        return None if row is None else row[0]

    def pad_constant_timestamp(self, df: DataFrame, ts) -> DataFrame:
        pad_df = df.select("tag_name").distinct().withColumn("ts", ts)
        pad_df = pad_df.withColumn("value_double", sfn.lit(None).cast(self.column_types["value_double"]))
        return pad_df

    @staticmethod
    def str2ts(ts_str: str):
        return sfn.to_timestamp(sfn.lit(ts_str))

    # ---------------------------
    # ALL columns + alias
    # ---------------------------
    def with_alias(self, alias: Union[DataFrame, List[Tuple[str, Optional[str], str, str]]]) -> DataFrame:
        """
        Join filtered historian (df_full) with alias mapping and return:
        tag_name, reference_name, controller_name, alias, <all other historian columns>
        """
        if isinstance(alias, list):
            alias_df = spark.createDataFrame(
                alias, ["tag_name", "reference_name", "controller_name", "alias"]
            ).dropDuplicates()
        else:
            alias_df = alias.select("tag_name", "reference_name", "controller_name", "alias").dropDuplicates()

        t = self.df_full.alias("t")
        m = alias_df.alias("m")

        joined = t.join(
            m,
            (col("t.tag_name") == col("m.tag_name")) &
            (col("t.controller_name") == col("m.controller_name")) &
            (col("t.reference_name").eqNullSafe(col("m.reference_name"))),
            "inner"
        )

        table_cols = [c for c in t.columns if c not in ("tag_name", "reference_name", "controller_name")]
        select_exprs = [
            col("t.tag_name").alias("tag_name"),
            col("t.reference_name").alias("reference_name"),
            col("t.controller_name").alias("controller_name"),
            col("m.alias").alias("alias"),
        ] + [col(f"t.{c}") for c in table_cols]

        return joined.select(*select_exprs)


## Step 1.2: Time Format and Widgets for Notebook


* **Define dates:** Sets `start_date` to XX (self-define) days before today and `end_date` to today, formatted as `YYYY-MM-DD`.
* **Create Databricks widgets:** Exposes parameters you can adjust when running a notebook:

  * `start_date`, `end_date` → time window for queries
  * `site_name`, `historian_name` → which plant/historian to use
  * `site_id`, `site_server` → identifiers for the site and its Proficy server
  * `pl_ids`, `pu_ids_base` → IDs for production line and unit
  * `debug_mode` → dropdown to toggle debug logging (`yes`/`no`)


In [0]:
# Inputs hidden for protecting industrial info
time_format = '%Y-%m-%d'
time_delta_days = 50
end_date = datetime.today()
start_date = end_date - timedelta(days=time_delta_days)

dbutils.widgets.text("start_date", start_date.strftime(time_format))
dbutils.widgets.text("end_date", end_date.strftime(time_format))
dbutils.widgets.text("site_name", "")
dbutils.widgets.text("historian_name", "")
dbutils.widgets.text("site_id", "", "Site iODS ID")
dbutils.widgets.text("site_server", "", "Site iODS Server") 
dbutils.widgets.text("pl_ids", "", "PLID") 
dbutils.widgets.text("pu_ids_base", "", "PUID Base") 
dbutils.widgets.dropdown("debug_mode", "yes",  ["yes", "no"], "Debug Mode")

# Step 2: Reading Influx Data and Pre-Processing

Define tags with actual PLC tag names, references, and PLC controller. Define the alias for each tag for downstream use

In [0]:
# ------------------------------------------------------------
# Anonymized process signal definitions (industrial details hidden)
# Aliases are kept unchanged for thesis consistency
# ------------------------------------------------------------

tag_names = [
    "TAG_SPEED_SETPOINT",
    "TAG_SPEED_ACTUAL",

    "TAG_METERING_PITCH_SP",

    "TAG_UNW_ROLL1_DIAMETER",
    "TAG_UNW_ROLL2_DIAMETER",
    "TAG_UNW_ROLL1_ACTIVE",
    "TAG_UNW_ROLL2_ACTIVE",
    "TAG_LOADCELL_BLC1",
    "TAG_LOADCELL_BLC2",
    "TAG_LOADCELL_UNW",
    "TAG_ENL_UNW_WIDTH",
    "TAG_ENL_UNW_MIDDLE",

    "TAG_ENL_BLC1_WIDTH",
    "TAG_ENL_BLC1_MIDDLE",

    "TAG_BLC_OMEGA1_PITCH",
]

ref_names = [
    "ref_line_speed_sp",
    "ref_line_speed_actual",

    "ref_metering_pitch_sp",

    "ref_unw1_diameter",
    "ref_unw2_diameter",
    "ref_unw1_active",
    "ref_unw2_active",
    "ref_blc1_loadcell",
    "ref_blc2_loadcell",
    "ref_unw_loadcell",
    "ref_enl_unw_width",
    "ref_enl_unw_middle",

    "ref_enl_blc1_width",
    "ref_enl_blc1_middle",

    "ref_blc_omega1_pitch",
]

ctrl_names = [
    "CTRL_MAIN",
    "CTRL_MAIN",

    "CTRL_METERING",

    "CTRL_UNW",
    "CTRL_UNW",
    "CTRL_UNW",
    "CTRL_UNW",
    "CTRL_GEO_A",
    "CTRL_GEO_A",
    "CTRL_UNW",
    "CTRL_UNW",
    "CTRL_UNW",

    "CTRL_GEO_B",
    "CTRL_GEO_B",

    "CTRL_GEO_B",
]

aliases = [
    "Set Speed",
    "Current Speed",

    "Metering Omega Pitch",

    "UNW1 Diameter",
    "UNW2 Diameter",
    "UNW1 Active Status",
    "UNW2 Active Status",
    "BLC1 Loadcell (Before Omega1)",
    "BLC2 Loadcell (Before ChillRoll)",
    "UNW Loadcell (Before Metering Omega)",
    "EnL UNW BLC Width",
    "EnL UNW Middle",

    "EnL BLC1 Width",
    "EnL BLC1 Middle",

    "BLC Omega 1 Pitch",
]

# Unified mapping used throughout the pipeline
alias_map = list(zip(tag_names, ref_names, ctrl_names, aliases))


## Step 2.1: Reading Influx Data for Defined Setpoints (and Resampling)

* **Purpose:** Read widget inputs, align the time window to **06:00 → 06:00**, and fetch + regularize historian data.
* **Timestamps:** `convert_to_timestamp()` turns `YYYY-MM-DD` into `YYYY-MM-DD 06:00:00` for both start/end.
* **Query setup:** Builds `HistorianQuery` in **triplet mode** (parallel `tag_names`, `ref_names`, `ctrl_names`) with `sample_freq="1 minute"` and a **forward-fill timeout** of 15 minutes.
* **Rich view:** `with_alias(alias_map)` returns **all historian columns + a human-readable `alias`** for joins/plots.
* **Regularized series:** `resample(start_ts, end_ts, alias_map=...)` outputs a 1-min grid with `tag_name, reference_name, controller_name, alias, ts, value_double, orig_ts`.
* **Remember to define:** `tag_names`, `ref_names`, `ctrl_names`, and `alias_map` beforehand; ensure the table has the required historian columns.


- **Database:** cdl_stc_prod
- **Schema:** silver_mfg_ot
- **Table:** influxhistorian_fem_timeseries
Source: https://datacatalog.pg.com/table/1937067/

In [0]:
from datetime import datetime

# --- Inputs from widgets ---
table_name     = "cdl_stc_prod.silver_mfg_ot.influxhistorian_fem_timeseries"
site           = dbutils.widgets.get("site_name")
historian_name = dbutils.widgets.get("historian_name")
start_date     = dbutils.widgets.get("start_date")   # "YYYY-MM-DD"
end_date       = dbutils.widgets.get("end_date")     # "YYYY-MM-DD"
start_hour     = 6  # 06:00 window alignment

# --- Helper to build "YYYY-MM-DD HH:MM:SS" at a fixed hour (e.g., 06:00) ---
def convert_to_timestamp(date_str: str, hour: int, minute: int = 0) -> str:
    dt = datetime.strptime(date_str, "%Y-%m-%d")
    dt = dt.replace(hour=hour, minute=minute, second=0, microsecond=0)
    return dt.strftime("%Y-%m-%d %H:%M:%S")

start_ts = convert_to_timestamp(start_date, start_hour)  
end_ts   = convert_to_timestamp(end_date,   start_hour)  

# --- Build HistorianQuery using triplets (tag_names, ref_names, ctrl_names) ---
hq = HistorianQuery(
    table_name,
    tag_names, ref_names, ctrl_names,
    sample_freq="10 seconds",
    ff_timeout="2 minutes",
    site=site,
    historian_name=historian_name,
    start_date=start_date,
    end_date=end_date,
    ignore_nulls=False
)

# --- Rich table: ALL historian columns + alias (first 4 ordered) ---
df_all_plus_alias = hq.with_alias(alias_map)
display(df_all_plus_alias)

# --- Resampled TS on a 1-min grid, aligned 06:00→06:00, with reference_name + controller_name + alias ---
df_resampled = hq.resample(
    start_ts,
    end_ts, 
    alias_map=alias_map    # ensures columns: tag_name, reference_name, controller_name, alias, ts, value_double, orig_ts
).persist()

## Step 2.2: Adding Splice Points and Splice Windows

* **Aligns series:** Extracts a unique time index from `df_resampled`, then left-joins two roll status signals (`UNW1/UNW2 Active Status`) onto it.
* **Flags active states:** Converts each status to binary (`> 0.5 → 1`), giving `st1_active` and `st2_active`.
* **Detects splices:** Finds a **handoff** moment where the previously active roll goes 1→0 while the other is 1 (sets `splice_point = 1`).
* **Builds a window:** Marks a **±WINDOW\_MIN minutes** window around each splice using a seconds-based range window on `ts_long` → `splice_window = 1`.
* **Merges back:** Joins `splice_point`/`splice_window` back to the full long table; missing flags default to 0.
* **Unit fix for sensors:** For specific EnL width/edge aliases, divides `value_double` by **10** to put them on the correct scale; leaves others unchanged.

*Notes:* `WINDOW_MIN = 1` (so ±1 minute). The splice condition assumes exactly one roll active at a time and transitions are clean; adjust if overlaps or gaps occur.


In [0]:
# --- Splice features only (no exclusion) ---

from pyspark.sql import functions as F, Window

# ---------- parameters ----------
THRESH = 0.5             # >0.5 means "active"
PRE_MIN  = 3            # minutes BEFORE splice to mark as "in splice"
POST_MIN = 5             # minutes AFTER splice to mark as "in splice"
ALIAS_ST1 = "UNW1 Active Status"   # Roll_St1_Active
ALIAS_ST2 = "UNW2 Active Status"   # Roll_St2_Active

# Optional: small debounce in ticks to avoid chatter (3-tick mean > 0.5)
DEBOUNCE_TICKS = 3       # if your cadence is ~10s, this ≈30s smoothing; if 1min, it's 3min

# ---------- build aligned status frame ----------
time_df = df_resampled.select("ts").distinct()

st1_df = (df_resampled
          .filter(F.col("alias") == ALIAS_ST1)
          .select("ts", F.col("value_double").alias("st1_val")))
st2_df = (df_resampled
          .filter(F.col("alias") == ALIAS_ST2)
          .select("ts", F.col("value_double").alias("st2_val")))

aligned = (time_df.join(st1_df, "ts", "left")
                  .join(st2_df, "ts", "left")
                  .orderBy("ts"))

# ---------- binary active flags with debounce ----------
w_debounce = Window.orderBy("ts").rowsBetween(-(DEBOUNCE_TICKS-1), 0)

aligned = (aligned
    .withColumn("st1_raw", (F.col("st1_val") > THRESH).cast("int"))
    .withColumn("st2_raw", (F.col("st2_val") > THRESH).cast("int"))
    .withColumn("st1_active", (F.avg("st1_raw").over(w_debounce) > 0.5).cast("int"))
    .withColumn("st2_active", (F.avg("st2_raw").over(w_debounce) > 0.5).cast("int"))
)

# ---------- detect splice point (hand-over moment) ----------
w_ts = Window.orderBy("ts")
aligned = (aligned
    .withColumn("st1_prev", F.lag("st1_active").over(w_ts))
    .withColumn("st2_prev", F.lag("st2_active").over(w_ts))
    .withColumn(
        "splice_point",
        F.when(
            ((F.col("st1_prev") == 1) & (F.col("st1_active") == 0) & (F.col("st2_active") == 1)) |
            ((F.col("st2_prev") == 1) & (F.col("st2_active") == 0) & (F.col("st1_active") == 1)),
            1
        ).otherwise(0)
    )
)

# ---------- splice window flag (asymmetric: [-PRE, +POST] minutes) ----------
aligned = aligned.withColumn("ts_long", F.col("ts").cast("long"))  # epoch seconds
w_range = Window.orderBy("ts_long").rangeBetween(-PRE_MIN*60, POST_MIN*60)
aligned = aligned.withColumn("splice_flag", (F.max("splice_point").over(w_range) > 0).cast("int"))

# ---------- current active roll (1 or 2) ----------
aligned = aligned.withColumn(
    "current_roll",
    F.when((F.col("st1_active")==1) & (F.col("st2_active")==0), F.lit(1))
     .when((F.col("st2_active")==1) & (F.col("st1_active")==0), F.lit(2))
     .otherwise(F.lit(None).cast("int"))  # ambiguous/transition
)

# ---------- time since last splice (minutes, continuous feature) ----------
w_cum = Window.orderBy("ts")
aligned = aligned.withColumn("splice_id", F.sum("splice_point").over(w_cum))

# last splice timestamp up to t
w_seg = Window.partitionBy("splice_id").orderBy("ts").rowsBetween(Window.unboundedPreceding, 0)
aligned = aligned.withColumn(
    "last_splice_ts",
    F.max(F.when(F.col("splice_point")==1, F.col("ts"))).over(w_seg)
)

aligned = aligned.withColumn(
    "time_since_splice_min",
    F.when(F.col("last_splice_ts").isNotNull(),
           (F.col("ts").cast("long") - F.col("last_splice_ts").cast("long"))/60.0)
     .otherwise(None)
)

# ---------- select splice features ----------
splice_feats = aligned.select(
    "ts",
    "splice_point",          # 1 only at the hand-over tick
    "splice_flag",           # 0/1 feature: within splice window
    "current_roll",          # 1 or 2 (or null at transition)
    "time_since_splice_min"  # continuous stabilization feature
)

# ---------- join back to the long dataframe (no exclusion) ----------
df_with_splice_unscaled = (df_resampled
    .join(splice_feats, "ts", "left")
    .fillna({"splice_point": 0, "splice_flag": 0})
)

# ---------- EnL scaling (keep your behavior) ----------
enl_aliases = [
    "EnL UNW BLC Width",
    "EnL UNW Right",
    "EnL UNW Left",
    "EnL BLC1 Right",
    "EnL BLC1 Left"
]

df_with_splice = df_with_splice_unscaled.withColumn(
    "value_double",
    F.when(F.col("alias").isin(enl_aliases), F.col("value_double") / 10.0)
     .otherwise(F.col("value_double"))
)

display(df_with_splice.orderBy(F.desc("ts")).limit(100))
# df_with_splice now has splice features ready to be pivoted/widened and used alongside tensions/modulus/widths/etc.


## Step 2.3: Computing E-Modulus


* **Bring flags:** Pulls unique `splice_point`/`splice_window` per timestamp once.
* **Pivot wide:** Turns the long table into one row per `ts` with columns for each `alias` so formulas can reference signals directly.
* **Prepare inputs:** Maps needed signals (A, B, E, I, G) and converts width from mm→m.
* **Safe formula:** Computes `e_modulus` with guards—if any input is null or a denominator becomes 0, returns `NULL` to avoid bad values.
* **Reattach labels:** Joins splice flags back onto the calculated result by `ts`.
* **Shape to long:** Builds new rows matching the **exact schema** of the original long dataframe (`df_with_splice`), filling non-relevant columns with typed nulls and setting `alias="e_modulus"`.
* **Append:** Unions the new `e_modulus` rows with the original data → final `df_with_emodulus` includes both raw tags and the computed metric, ready for downstream analysis/plots.

$$
E \;=\; \frac{B}{E_m \cdot \left(\frac{0.001 \cdot I \cdot (A - B)}{\big(0.001 \cdot I \cdot A - 0.001 \cdot G \cdot B\big)} - 1\right)}
$$

Where:

* A = **UNW Loadcell (Before Metering Omega)** \[N]
* B = **BLC1 Loadcell (Before Omega1)** \[N]
* E_m = **EnL UNW BLC Width** \[m] (converted from mm ÷ 1000)
* I = **BLC Omega1 Pitch** \[mm/pad]
* G = **Metering Omega Pitch** \[mm/pad]



In [0]:
from pyspark.sql import functions as F

# --- 0) Bring splice flags (once) ---
flags = (
    df_with_splice.select("ts", "splice_point", "splice_flag").distinct()
)

# --- 1) Pivot wide so we can reference aliases in one row per ts ---
# We pivot only the two columns we need to compute: ts, alias/value
wide = (
    df_with_splice
    .select("ts", "alias", "value_double")
    .groupBy("ts")
    .pivot("alias")
    .agg(F.first("value_double"))
)



# Required inputs (using your symbols):
# A = UNW Loadcell (N)
# B = BLC1 Loadcell (Before Omega1) (N)
# E = EnL UNW BLC Width (mm)   -> meters in the formula
# I = BLC1/BLC Omega1 Pitch (mm/pads)
# G = Metering Omega Pitch (mm/pads)

A = F.col("UNW Loadcell (Before Metering Omega)")
B = F.col("BLC1 Loadcell (Before Omega1)")
E_m = (F.col("EnL UNW BLC Width") / F.lit(1000.0))  # mm -> m
I = F.col("BLC Omega 1 Pitch")
G = F.col("Metering Omega Pitch")

# Inner pieces of the formula
denom_inner = (A * 0.001 * I) - (B * 0.001 * G)
ratio_term = (0.001 * I * (A - B)) / denom_inner

# Guard conditions: if any needed term is null OR denominators == 0 => NULL
safe_emod = F.when(
    (A.isNull()) | (B.isNull()) | (E_m.isNull()) | (I.isNull()) | (G.isNull()) |
    (denom_inner == 0) | ((E_m * (ratio_term - 1)) == 0),
    F.lit(None).cast("double")
).otherwise(
    B / (E_m * (ratio_term - 1))
)

# Compute e_modulus per timestamp
e_calc = (
    wide
    .select(
        F.col("ts"),
        safe_emod.alias("e_modulus")
    )
)

# Attach splice flags so the new rows carry the same labels at that ts
e_calc = e_calc.join(flags, "ts", "left")

# --- 3) Build rows with the FULL schema of df_with_splice ---
# We’ll create the new “e_modulus” rows with the same columns (in order)
schema = df_with_splice.schema
cols = [f.name for f in schema]
types = {f.name: f.dataType for f in schema}

def typed_null(colname):
    return F.lit(None).cast(types[colname]).alias(colname)

select_exprs = []
for c in cols:
    if c == "tag_name":
        select_exprs.append(F.lit("e_modulus_tag").alias(c))
    elif c == "reference_name":
        select_exprs.append(F.lit("e_modulus_cal").alias(c))
    elif c == "alias":
        select_exprs.append(F.lit("e_modulus").alias(c))
    elif c == "ts":
        select_exprs.append(F.col("ts").alias(c))
    elif c == "value_double":
        select_exprs.append(F.col("e_modulus").alias(c))
    elif c == "splice_point":
        select_exprs.append(F.col("splice_point").cast(types[c]).alias(c))
    elif c == "splice_flag":
        select_exprs.append(F.col("splice_flag").cast(types[c]).alias(c))
    else:
        select_exprs.append(typed_null(c))

e_rows_long = e_calc.select(*select_exprs)

# --- 4) Append (union) these rows to the original long-format DF ---
df_with_emodulus = df_with_splice.unionByName(e_rows_long)


display(df_with_emodulus.orderBy(F.desc("ts")).limit(100))


## Step 2.4: Rolling Features

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W  # safer import

# --- carry splice/context features (ONE ROW PER ts) ---
flags = (
    df_with_emodulus
      .groupBy("ts")
      .agg(
          F.max("splice_flag").alias("splice_flag"),
          F.first("current_roll", ignorenulls=True).alias("current_roll"),
          F.first("time_since_splice_min", ignorenulls=True).alias("time_since_splice_min"),
      )
)

# --- pivot to wide (one row per ts, one column per alias) ---
wide = (
    df_with_emodulus
      .select("ts", "alias", "value_double")
      .groupBy("ts")
      .pivot("alias")
      .agg(F.first("value_double"))
      .orderBy("ts")
)

# join splice flags (no dupes now)
wide = wide.join(flags, "ts", "left").fillna({"splice_flag": 0})

# optional one-hots for roll
wide = (
    wide
    .withColumn("roll1_flag", F.when(F.col("current_roll") == 1, 1).otherwise(0))
    .withColumn("roll2_flag", F.when(F.col("current_roll") == 2, 1).otherwise(0))
)

# =====================================================================
# 1) Define which signals get which type of rolling features
# =====================================================================

full_rolling_signals = [
    "BLC1 Loadcell (Before Omega1)",
    "BLC2 Loadcell (Before ChillRoll)",
    "e_modulus",
]

mean_std_only_signals = [
    "UNW Loadcell (Before Metering Omega)",
    "EnL UNW BLC Width",
    "EnL UNW Middle",
]

base_signals_only = [
    "UNW1 Diameter",
    "UNW2 Diameter",
]

speed_signals = ["Current Speed", "Set Speed", "Linear Line Speed"]

# Time window definitions (range in *seconds* based on ts_long)
wide = wide.withColumn("ts_long", F.col("ts").cast("long"))

def time_win(mins: int):
    return W.orderBy("ts_long").rangeBetween(-int(mins * 60), 0)

WINS_FULL = [3, 5, 10]   # minutes
WINS_LIGHT = [5, 10]     # minutes

# =====================================================================
# 2) Helpers for adding rolling features
# =====================================================================

def add_full_window_feats(df, colname, wmin):
    """pctchg + slope + mean + std for core signals."""
    if colname not in df.columns:
        return df

    w = time_win(wmin)
    x = F.col(colname)

    med   = F.percentile_approx(x, 0.5, 10000).over(w)
    mean  = F.avg(x).over(w)
    std   = F.stddev_samp(x).over(w)
    first = F.first(x, ignorenulls=True).over(w)

    pctchg = (x - med) / F.when(F.abs(med) > 0, F.abs(med))
    slope  = (x - first) / F.lit(float(wmin))

    return (
        df
        .withColumn(f"{colname}__pctchg_med_{wmin}m", pctchg)
        .withColumn(f"{colname}__slope_{wmin}m",      slope)
        .withColumn(f"{colname}__mean_{wmin}m",       mean)
        .withColumn(f"{colname}__std_{wmin}m",        std)
    )

def add_mean_std_feats(df, colname, wmin):
    """Only mean + std (no pctchg, no slope)."""
    if colname not in df.columns:
        return df

    w = time_win(wmin)
    x = F.col(colname)

    mean = F.avg(x).over(w)
    std  = F.stddev_samp(x).over(w)

    return (
        df
        .withColumn(f"{colname}__mean_{wmin}m", mean)
        .withColumn(f"{colname}__std_{wmin}m",  std)
    )

# =====================================================================
# 3) Apply rolling features, but only where we actually need them
# =====================================================================

dfF = wide

for s in full_rolling_signals:
    for wmin in WINS_FULL:
        dfF = add_full_window_feats(dfF, s, wmin)

for s in mean_std_only_signals:
    for wmin in WINS_LIGHT:
        dfF = add_mean_std_feats(dfF, s, wmin)

# joint/group effects (still optional)
t1 = "BLC1 Loadcell (Before Omega1)__pctchg_med_5m"
t2 = "BLC2 Loadcell (Before ChillRoll)__pctchg_med_5m"
em = "e_modulus__pctchg_med_5m"

if all(c in dfF.columns for c in (t1, t2, em)):
    dfF = (
        dfF
        .withColumn(
            "joint_energy_t1_t2_em_5m",
            F.sqrt(F.col(t1) * F.col(t1) + F.col(t2) * F.col(t2) + F.col(em) * F.col(em)),
        )
        .withColumn("t1_t2_interact_5m", F.col(t1) * F.col(t2))
        .withColumn("t1_em_interact_5m", F.col(t1) * F.col(em))
        .withColumn("t2_em_interact_5m", F.col(t2) * F.col(em))
    )

# =====================================================================
# 4) Final feature selection into fv_blc
# =====================================================================

present_speed       = [c for c in speed_signals         if c in dfF.columns]
present_full_base   = [c for c in full_rolling_signals  if c in dfF.columns]
present_light_base  = [c for c in mean_std_only_signals if c in dfF.columns]
present_base_levels = [c for c in base_signals_only     if c in dfF.columns]

core_cols = [
    "ts",
    "splice_flag",
    "current_roll",
    "time_since_splice_min",
    "roll1_flag",
    "roll2_flag",
]
core_cols = [c for c in core_cols if c in dfF.columns]

generated_cols = [c for c in dfF.columns if "__" in c]

keep_cols = (
    core_cols
    + present_speed
    + present_full_base
    + present_light_base
    + present_base_levels
    + generated_cols
)

keep_cols = [c for c in keep_cols if c in dfF.columns]

fv_blc = dfF.select(*keep_cols)

# --- sanity check: should be 1 row per ts ---
n_rows = fv_blc.count()
n_ts   = fv_blc.select("ts").distinct().count()
print("rows in fv_blc:        ", n_rows)
print("distinct ts in fv_blc: ", n_ts)

display(fv_blc.orderBy(F.desc("ts")).limit(100))


## Step 2.5: Computing Linear Speed


* Pulls **Current Speed** and **Metering Omega Pitch** per timestamp, outer-joins them.
* Computes **Linear Line Speed = speed × pitch** (else **0** if missing or ≤0).
* Stamps fixed metadata (alias/tag/reference/controller, flags, timers, `orig_ts`).
* Reorders columns to match `df_with_emodulus` and **unions** the new rows in.

**Notes to sanity-check:**

* Ensure units: if `speed` is **pads/min** and `pitch` is **mm/pad**, result is **mm/min** (✓).
  If `speed` is **m/min**, multiply by **pads/m** or convert `pitch` to meters first.


In [0]:
# --- Add Linear Line Speed directly in WIDE ---
SPEED_COL = "Current Speed"          # update if your alias differs
PITCH_COL = "Metering Omega Pitch"   # update if your alias differs
LINEAR_COL = "Linear Line Speed"

if (SPEED_COL in wide.columns) and (PITCH_COL in wide.columns):
    wide = wide.withColumn(
        LINEAR_COL,
        F.when((F.col(SPEED_COL) > 0) & (F.col(PITCH_COL) > 0),
               (F.col(SPEED_COL) * F.col(PITCH_COL)).cast("double"))
         .otherwise(F.lit(0.0))
    )
else:
    # If one of the inputs is missing, we just skip creating the feature (no breakage)
    pass

signals = [
    "BLC1 Loadcell (Before Omega1)",
    "BLC2 Loadcell (Before ChillRoll)",
    "UNW Loadcell (Before Metering Omega)",
    "EnL UNW BLC Width",
    "EnL UNW Middle",
    "UNW1 Diameter",
    "UNW2 Diameter",
    "e_modulus",
    "Linear Line Speed"  # <-- newly added
]


In [0]:
# after you've added Linear Line Speed into `wide` as you showed
speedish = ["Current Speed", "Set Speed", "Metering Omega Pitch", "Linear Line Speed"]
present_speedish = [c for c in speedish if c in dfF.columns]

keep_cols = (
    ["ts", "splice_flag", "current_roll", "time_since_splice_min", "roll1_flag", "roll2_flag"]
    + present_speedish
    + [c for c in dfF.columns if c in signals or "__" in c]  # all engineered features + base signals
)

fv_blc = dfF.select(*keep_cols)


# Step 3: Proficy Historian / IODS Downtime Data (PySpark SQL)

### **Data Source: Proficy Historian**
- Database in CDL (table name and column names anonymized)

In [0]:
# ------------------------------------------------------------
# Anonymized downtime / uptime extraction query
# (industrial identifiers and reason texts removed)
# ------------------------------------------------------------

query = f"""
SELECT
    t.line_description,
    t.start_time,
    t.end_time,
    t.start_time_utc,
    t.end_time_utc,
    t.reason_lvl1,
    t.reason_lvl2,
    t.reason_lvl3,
    t.reason_lvl4,
    t.operator_comment,
    t.team_description,
    t.product_code,
    t.product_description,
    t.event_duration,
    t.uptime,
    t.fault_flag,
    t.fault_code,
    t.planned_flag,
    t.location_code,
    t.process_unit,
    t.manual_stop_flag,
    t.minor_stop_flag,
    t.major_stop_flag,
    t.is_blocked,
    t.is_constraint,
    t.is_starved
FROM analytics_layer.production_events.downtime_events AS t
WHERE event_date BETWEEN '{dbutils.widgets.get("start_date")}'
                      AND '{dbutils.widgets.get("end_date")}'
  AND business_unit = 'business_unit_X'
  AND site_identifier = '{dbutils.widgets.get("site_id")}'
  AND delete_flag = FALSE
  AND production_line_id = '{dbutils.widgets.get("pl_ids")}'
  AND production_line_desc = 'LINE_X'
  AND (
        reason_lvl1 IN (
          'PROCESS_MODULE_A',
          'PROCESS_MODULE_B',
          'PROCESS_MODULE_C',
          'PROCESS_MODULE_D'
        )
        OR reason_lvl3 IN ('PROCESS_RESOURCE_EXHAUSTED')
      )
  AND reason_lvl2 NOT IN (
        'UPSTREAM_FEED_ISSUE_A',
        'UPSTREAM_FEED_ISSUE_B',
        'UPSTREAM_FEED_ISSUE_C',
        'UPSTREAM_FEED_ISSUE_D'
      )
"""

df_stops_blc = (
    spark.sql(query)
         .toPandas()
         .sort_values(by="start_time_utc")
)

display(df_stops_blc)


# Step 4: Data Merge (fv_blc_labeled)

In [0]:
from pyspark.sql import functions as F, types as T

# ---------- 0) Coerce inputs to Spark DFs ----------
def to_spark(df):
    # already Spark?
    if hasattr(df, "withColumn"): 
        return df
    # pandas / Koalas -> Spark
    import pandas as pd
    if isinstance(df, pd.DataFrame) or hasattr(df, "to_pandas"):
        pdf = df if isinstance(df, pd.DataFrame) else df.to_pandas()
        return spark.createDataFrame(pdf)
    raise TypeError("Provide pandas/Spark/pandas-on-Spark DataFrame.")

fv_blc_sp      = to_spark(fv_blc)
df_stops_blc_sp = to_spark(df_stops_blc)

# ---------- 1) Ensure expected stop column names exist ----------
cols = {c.lower(): c for c in df_stops_blc_sp.columns}
start_col = cols.get("start_time_utc", cols.get("start_ts"))
end_col   = cols.get("end_time_utc",   cols.get("end_ts"))
if start_col is None or end_col is None:
    raise KeyError("df_stops_blc must have start_time_utc/end_time_utc (or start_ts/end_ts)")

st = (
    df_stops_blc_sp
      .withColumn("start_ts", F.to_timestamp(F.col(start_col)))
      .withColumn("end_ts",   F.to_timestamp(F.col(end_col)))
      .select("start_ts","end_ts")
      .dropna(subset=["start_ts"])
)

x = fv_blc_sp.withColumn("ts", F.to_timestamp("ts"))

# ---------- 2) Feature 1: is_ts_in_blc_stop ----------
inside = (
    x.select("ts")
     .join(st, (F.col("ts") >= F.col("start_ts")) & (F.col("ts") <= F.col("end_ts")), "left")
     .withColumn("is_ts_in_blc_stop", F.when(F.col("start_ts").isNotNull(), 1).otherwise(0))
     .groupBy("ts").agg(F.max("is_ts_in_blc_stop").alias("is_ts_in_blc_stop"))
)

# ---------- 3) Feature 2: time_since_failure ----------
last_end = (
    x.select("ts")
     .join(st, F.col("end_ts") <= F.col("ts"), "left")
     .groupBy("ts").agg(F.max("end_ts").alias("last_end_ts"))
)
time_since_failure = last_end.withColumn(
    "time_since_failure",
    F.when(F.col("last_end_ts").isNull(), F.lit(float("inf")))
     .otherwise((F.col("ts").cast("long") - F.col("last_end_ts").cast("long"))/60.0)
).drop("last_end_ts")

# ---------- 4) Feature 4: time_to_failure ----------
next_start = (
    x.select("ts")
     .join(st, F.col("start_ts") >= F.col("ts"), "left")
     .groupBy("ts").agg(F.min("start_ts").alias("next_start_ts"))
)
time_to_failure = next_start.withColumn(
    "time_to_failure",
    F.when(F.col("next_start_ts").isNull(), F.lit(float("inf")))
     .otherwise((F.col("next_start_ts").cast("long") - F.col("ts").cast("long"))/60.0)
).drop("next_start_ts")

# ---------- 5) Join features back ----------
fv_blc_labeled = (
    x
    .join(inside,           "ts", "left")
    .join(time_since_failure,"ts", "left")
    .join(time_to_failure,  "ts", "left")
    .fillna({"is_ts_in_blc_stop": 0})
)

# ---------- 6) Display ----------
display(fv_blc_labeled.orderBy(F.desc("ts")).limit(20))


## Step 4.1 fv_blc_labeled analysis

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def summarize_fv_blc(df, ts_col="ts"):
    # Ensure timestamp column is proper timestamp type
    df = df.withColumn(ts_col, F.to_timestamp(ts_col))

    print("=== 1) Columns and Data Types ===")
    for f in df.schema.fields:
        print(f" - {f.name}: {f.dataType.simpleString()}")

    # --- 2) One-line definition of each feature (auto + custom for labels) ---
    descriptions = {}
    for c in df.columns:
        if c == ts_col:
            descriptions[c] = "Timestamp of the aggregated sample for this row."
        elif c == "is_ts_in_blc_stop":
            descriptions[c] = "Binary label: 1 if ts lies inside a BLC stop interval, else 0."
        elif c == "time_since_failure":
            descriptions[c] = "Minutes elapsed since the end of the most recent BLC stop (∞ if none so far)."
        elif c == "time_to_failure":
            descriptions[c] = "Minutes until the next BLC stop starts (∞ if no later stop)."
        else:
            # Generic placeholder – you can overwrite these if you want more precise text
            descriptions[c] = f"Feature '{c}' (process variable or engineered feature)."

    print("\n=== 2) Column Descriptions ===")
    for c in df.columns:
        print(f" - {c}: {descriptions[c]}")

    # --- 3) What are labels / what are not ---
    label_cols = ["is_ts_in_blc_stop", "time_since_failure", "time_to_failure"]
    label_cols_present = [c for c in label_cols if c in df.columns]
    feature_cols = [c for c in df.columns if c not in label_cols_present]

    print("\n=== 3) Label vs Non-label Columns ===")
    print("Label columns (targets):")
    print(" ", label_cols_present)
    print("Non-label feature columns:")
    print(" ", feature_cols)

    # --- 4) Total rows ---
    n_rows = df.count()
    print("\n=== 4) Row Count ===")
    print(f"Total rows: {n_rows}")

    # --- 5) Time start and time end ---
    ts_bounds = df.agg(
        F.min(ts_col).alias("t_min"),
        F.max(ts_col).alias("t_max")
    ).first()

    print("\n=== 5) Time Range ===")
    print(f"Time start: {ts_bounds.t_min}")
    print(f"Time end  : {ts_bounds.t_max}")

        # --- 6) Frequency of data (timestamp spacing) ---
    w = Window.orderBy(ts_col)
    df_delta = (
        df.select(
            F.col(ts_col),
            (F.col(ts_col).cast("long") - F.lag(ts_col).over(w).cast("long")).alias("delta_s")
        )
        .where(F.col("delta_s").isNotNull())
    )

    # >>> FIX: avoid .rdd.isEmpty() on shared clusters <<<
    if df_delta.limit(1).count() == 0:
        print("\n=== 6) Frequency ===")
        print("Not enough rows to compute timestamp frequency.")
        return

    delta_stats = df_delta.agg(
        F.min("delta_s").alias("min_s"),
        F.expr("percentile_approx(delta_s, 0.5)").alias("median_s"),
        F.max("delta_s").alias("max_s"),
    ).first()

    print("\n=== 6) Timestamp Step / Frequency ===")
    print(f"Min step   : {delta_stats.min_s} seconds")
    print(f"Median step: {delta_stats.median_s} seconds")
    print(f"Max step   : {delta_stats.max_s} seconds")
    if delta_stats.median_s and delta_stats.median_s != 0:
        print(f"Approx. sampling frequency: {1.0 / delta_stats.median_s:.4f} Hz "
              f"({delta_stats.median_s} s between samples)")


# ---- Call it on your table ----
summarize_fv_blc(fv_blc_labeled)


## Step 4.2: All rows in is_in_blc_stop = 1

In [0]:
df_stops_only = (
    fv_blc_labeled
        .filter(F.col("is_ts_in_blc_stop") == 1)
        .orderBy("ts")
)

display(df_stops_only)


## Step 4.3: Quality Check

The code evaluates several logical conditions in the BLC-labeled dataset to verify alignment between process behavior and engineered stop labels. It computes counts for:

1. **Rows where the line is fully stopped**
   - `Current Speed == 0`
   - Validates true downtime moments.

2. **Rows labeled as inside a BLC stop**
   - `is_ts_in_blc_stop == 1`
   - Measures the size of the stop window after joining with the stop table.

3. **Rows where actual speed deviates from the set speed**
   - `Current Speed != Set Speed`
   - Captures acceleration/deceleration and control mismatches.

4. **Rows before the first recorded failure**
   - `time_since_failure == ∞`
   - Indicates initial normal-operation region.

5. **Rows after the last recorded failure**
   - `time_to_failure == ∞`
   - Represents the tail section of the dataset.

6. **Rows inside splice intervals**
   - `splice_flag == 1`
   - Helps distinguish splice-induced disturbances from true BLC failures.

7. **Potential label misalignment rows**
   - `is_ts_in_blc_stop == 1 AND Current Speed == Set Speed`
   - Ideally, during a true failure, current speed should drop quickly.
   - These rows highlight timestamps where stop labels may be "smeared" due to minute-level stop windows merging with 10-second historian data.

These checks validate whether stop labels, splice indicators, and speed dynamics behave consistently, helping ensure high-quality ML training data.


In [0]:
from pyspark.sql import functions as F

df = fv_blc_labeled
total_rows = 328320  # fixed total count

result = (
    df.agg(
        # 1) Current Speed == 0
        F.sum(F.when(F.col("Current Speed") == 0, 1).otherwise(0)).alias("Current Speed = 0"),

        # 2) In BLC stop
        F.sum(F.when(F.col("is_ts_in_blc_stop") == 1, 1).otherwise(0)).alias("Rows in BLC Stop"),

        # 3) Current Speed != Set Speed AND Current Speed != 0
        F.sum(
            F.when(
                (F.col("Current Speed") != F.col("Set Speed")) &
                (F.col("Current Speed") != 0),
                1
            ).otherwise(0)
        ).alias("Ramp Up / Ramp Down"),

        # 4) time_since_failure == inf/null
        F.sum(
            F.when(
                F.col("time_since_failure").isNull() |
                (F.col("time_since_failure") == float("inf")),
                1
            ).otherwise(0)
        ).alias("Infinite Time since Failure"),

        # 5) time_to_failure == inf/null
        F.sum(
            F.when(
                F.col("time_to_failure").isNull() |
                (F.col("time_to_failure") == float("inf")),
                1
            ).otherwise(0)
        ).alias("Infinite Time to Failure"),

        # 6) splice_flag == 1
        F.sum(F.when(F.col("splice_flag") == 1, 1).otherwise(0)).alias("Data in Splice Window -3/+5 Mins"),

        # 7) is_ts_in_blc_stop AND Current Speed == Set Speed
        F.sum(
            F.when(
                (F.col("is_ts_in_blc_stop") == 1) &
                (F.col("Current Speed") == F.col("Set Speed")),
                1
            ).otherwise(0)
        ).alias("Data in Stop (Smerge with Running Line)")
    )
)

# Convert to Pandas to compute percentage columns
pdf = result.toPandas()

# Add % columns for each metric
for col in pdf.columns:
    perc_col = col + "_perc"
    pdf[perc_col] = (pdf[col] / total_rows) * 100

display(pdf)


# Step 5: PCA Setup (All Dataset)



In [0]:
from pyspark.sql.types import NumericType
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

# -----------------------------
# 1) Choose numeric columns & exclude labels / IDs
# -----------------------------
numeric_cols = [
    f.name for f in fv_blc_labeled.schema.fields
    if isinstance(f.dataType, NumericType)
]

cols_to_exclude = [
    "current_roll",
    "roll1_flag",
    "roll2_flag",
    "Metering Omega Pitch",
    "Set Speed",
    "is_ts_in_blc_stop",
    "time_since_failure",
    "time_to_failure",
]

numeric_cols = [c for c in numeric_cols if c not in cols_to_exclude]

print(f"Total numeric columns used for PCA: {len(numeric_cols)}")
print("Sample of numeric columns:", numeric_cols[:15])

# -----------------------------
# 2) Move data to pandas & scale
# -----------------------------
# drop rows with any NaN in PCA columns
df_pd = (
    fv_blc_labeled
    .select(numeric_cols)
    .dropna(subset=numeric_cols)
    .toPandas()
)

print("Rows going into PCA (Spark -> pandas):", df_pd.shape[0])
print("Shape of pandas matrix:", df_pd.shape)

# Standardize features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df_pd.values)

# -----------------------------
# 3) Run PCA
# -----------------------------
k = min(20, X_scaled.shape[1])  # up to 20 PCs or #features
pca = PCA(n_components=k, random_state=42)
X_pca = pca.fit_transform(X_scaled)

eigenvalues = pca.explained_variance_
expl_var_ratio = pca.explained_variance_ratio_
cum_ratio = np.cumsum(expl_var_ratio)

# Table of eigenvalues & explained variance
eig_df = pd.DataFrame({
    "PC": [f"PC{i+1}" for i in range(k)],
    "eigenvalue": eigenvalues,
    "expl_var_ratio": expl_var_ratio,
    "cum_expl_var_ratio": cum_ratio,
})

print("\n=== Eigenvalues & explained variance ===")
display(eig_df)

# How many PCs for >=70% variance?
num_for_70 = int(np.argmax(cum_ratio >= 0.70) + 1)
print(f"\nNumber of PCs needed to explain ≥70% variance: {num_for_70}")

# -----------------------------
# 4) Scree plot
# -----------------------------
plt.figure(figsize=(8, 5))
plt.plot(range(1, k+1), expl_var_ratio, marker="o")
plt.xlabel("Principal Component")
plt.ylabel("Explained Variance Ratio")
plt.title("Scree Plot")
plt.grid(True)

# Optional: vertical line at 70% cutoff
plt.axvline(num_for_70, linestyle="--")
plt.show()

# -----------------------------
# 5) Feature loadings (which features dominate each PC)
# -----------------------------
components = pca.components_  # shape (k, n_features)
loadings = pd.DataFrame(
    components.T,
    index=numeric_cols,
    columns=[f"PC{i+1}" for i in range(k)]
)

print("\n=== Loadings matrix (features x PCs) – head ===")
display(loadings.head())

# Top features per PC (by absolute loading)
N = min(8, k)  # show for PC1..PC8
for i in range(N):
    pc_name = f"PC{i+1}"
    print(f"\n=== Top 10 features for {pc_name} (by |loading|) ===")
    top_idx = (
        loadings[pc_name]
        .abs()
        .sort_values(ascending=False)
        .head(10)
        .index
    )
    top_df = pd.DataFrame({
        "feature": top_idx,
        "loading": loadings.loc[top_idx, pc_name],
        "abs_loading": loadings.loc[top_idx, pc_name].abs()
    })
    display(top_df)


## Step 5.1: Feature Domination in each PCA

In [0]:
from pyspark.sql.types import NumericType
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

# -----------------------------
# 1) Choose numeric columns & exclude labels / IDs
# -----------------------------
numeric_cols = [
    f.name for f in fv_blc_labeled.schema.fields
    if isinstance(f.dataType, NumericType)
]

cols_to_exclude = [
    "current_roll",
    "roll1_flag",
    "roll2_flag",
    "Metering Omega Pitch",
    "Set Speed",
    "is_ts_in_blc_stop",
    "time_since_failure",
    "time_to_failure",
]

numeric_cols = [c for c in numeric_cols if c not in cols_to_exclude]

print(f"Total numeric columns used for PCA: {len(numeric_cols)}")
print("Sample of numeric columns:", numeric_cols[:15])

# -----------------------------
# 2) Move data to pandas & scale
# -----------------------------
# drop rows with any NaN in PCA columns
df_pd = (
    fv_blc_labeled
    .select(numeric_cols)
    .dropna(subset=numeric_cols)
    .toPandas()
)

print("Rows going into PCA (Spark -> pandas):", df_pd.shape[0])
print("Shape of pandas matrix:", df_pd.shape)

# Standardize features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df_pd.values)

# -----------------------------
# 3) Run PCA
# -----------------------------
k = min(20, X_scaled.shape[1])  # up to 20 PCs or #features
pca = PCA(n_components=k, random_state=42)
X_pca = pca.fit_transform(X_scaled)

eigenvalues = pca.explained_variance_
expl_var_ratio = pca.explained_variance_ratio_
cum_ratio = np.cumsum(expl_var_ratio)

# Table of eigenvalues & explained variance
eig_df = pd.DataFrame({
    "PC": [f"PC{i+1}" for i in range(k)],
    "eigenvalue": eigenvalues,
    "expl_var_ratio": expl_var_ratio,
    "cum_expl_var_ratio": cum_ratio,
})

print("\n=== Eigenvalues & explained variance ===")
display(eig_df)

# How many PCs for >=70% variance?
num_for_70 = int(np.argmax(cum_ratio >= 0.70) + 1)
print(f"\nNumber of PCs needed to explain ≥70% variance: {num_for_70}")

# -----------------------------
# 4) Scree plot
# -----------------------------
plt.figure(figsize=(8, 5))
plt.plot(range(1, k+1), expl_var_ratio, marker="o")
plt.xlabel("Principal Component")
plt.ylabel("Explained Variance Ratio")
plt.title("Scree Plot")
plt.grid(True)

# Optional: vertical line at 70% cutoff
plt.axvline(num_for_70, linestyle="--")
plt.show()

# -----------------------------
# 5) Feature loadings (which features dominate each PC)
# -----------------------------
components = pca.components_  # shape (k, n_features)
loadings = pd.DataFrame(
    components.T,
    index=numeric_cols,
    columns=[f"PC{i+1}" for i in range(k)]
)

print("\n=== Loadings matrix (features x PCs) – head ===")
display(loadings.head())

# Top features per PC (by absolute loading)
N = min(8, k)  # show for PC1..PC8
for i in range(N):
    pc_name = f"PC{i+1}"
    print(f"\n=== Top 10 features for {pc_name} (by |loading|) ===")
    top_idx = (
        loadings[pc_name]
        .abs()
        .sort_values(ascending=False)
        .head(10)
        .index
    )
    top_df = pd.DataFrame({
        "feature": top_idx,
        "loading": loadings.loc[top_idx, pc_name],
        "abs_loading": loadings.loc[top_idx, pc_name].abs()
    })
    display(top_df)


## Step 5.2 PCA Plots

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# =========================================================
# Put PCA results into dataframe
# =========================================================
pca_df = pd.DataFrame({
    "PC1": X_pca[:,0],
    "PC2": X_pca[:,1],
    "PC3": X_pca[:,2],
    "label": df_pd_plot["label"]
})

# =========================================================
# Adjustable typography controls (EDIT FREELY)
# =========================================================
AXIS_LABEL_SIZE   = 16
AXIS_LABEL_WEIGHT = "bold"

TICK_LABEL_SIZE   = 13

TITLE_SIZE        = 18
TITLE_WEIGHT      = "bold"

LEGEND_SIZE       = 12

MARKER_SIZE       = 8
ALPHA             = 0.75

# =========================================================
# Color palette (UNCHANGED)
# =========================================================
palette = {
    "Normal": "lightgrey",
    "Speed = 0": "#1f77b4",
    "BLC Stop": "#d62728",
    "Ramp Up/Down": "#ff7f0e",
    "Inf time_since_failure": "#9467bd",
    "Inf time_to_failure": "#8c564b",
    "Splice Window": "#2ca02c",
    "Stop-Merged": "#e377c2"
}

# =========================================================
# Scatter plot function (UNCHANGED logic, typography added)
# =========================================================
def plot_pca(x, y, title, filename):
    fig, ax = plt.subplots(figsize=(11.5, 7.5))  # BIG, thesis-scale

    sns.scatterplot(
        ax=ax,
        data=pca_df,
        x=x, y=y,
        hue="label",
        palette=palette,
        s=MARKER_SIZE,
        alpha=ALPHA,
        edgecolor=None
    )

    # Axis labels
    ax.set_xlabel(
        x,
        fontsize=AXIS_LABEL_SIZE,
        fontweight=AXIS_LABEL_WEIGHT,
        labelpad=14
    )
    ax.set_ylabel(
        y,
        fontsize=AXIS_LABEL_SIZE,
        fontweight=AXIS_LABEL_WEIGHT,
        labelpad=14
    )

    # Tick values
    ax.tick_params(axis="both", labelsize=TICK_LABEL_SIZE)

    # Title
    ax.set_title(
        title,
        fontsize=TITLE_SIZE,
        fontweight=TITLE_WEIGHT,
        pad=16
    )

    # Legend
    legend = ax.legend(
        loc="best",
        fontsize=LEGEND_SIZE,
        frameon=True,
        framealpha=0.95
    )

    # ---- FIX: legend color visibility ----
    for handle in legend.legendHandles:
        handle.set_alpha(1.0)
        handle.set_markersize(8)

    # Layout + export
    fig.tight_layout()
    fig.savefig(f"{filename}.pdf", bbox_inches="tight")
    fig.savefig(f"{filename}.png", dpi=400, bbox_inches="tight")

    plt.show()


# =========================================================
# 3 SEPARATE HIGH-QUALITY PLOTS
# =========================================================
plot_pca("PC1", "PC2", "PCA Scatter for  Reduced Feature Dataset: PC1 vs PC2", "pca_pc1_pc2")
plot_pca("PC2", "PC3", "PCA Scatter for Reduced Feature Dataset: PC2 vs PC3", "pca_pc2_pc3")
plot_pca("PC1", "PC3", "PCA Scatter for Reduced Feature Dataset: PC1 vs PC3", "pca_pc1_pc3")


# Step 6: PCA Setup (Small Dataset)

In [0]:
from pyspark.sql.types import NumericType
from pyspark.sql import functions as F

import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

# 1) Select numeric columns from your Spark DF
numeric_cols = [
    f.name for f in fv_blc_labeled.schema.fields
    if isinstance(f.dataType, NumericType)
]

# 2) Explicitly drop columns you *don’t* want in PCA
cols_to_exclude = [
    "current_roll", "roll1_flag", "roll2_flag",
    "Metering Omega Pitch",
    "is_ts_in_blc_stop", "time_since_failure", "time_to_failure",
    "Set Speed",
]

numeric_cols = [c for c in numeric_cols if c not in cols_to_exclude]

# 3) Drop all 5m and 10m rolling features, keep only 3m + base signals
numeric_cols = [
    c for c in numeric_cols
    if ("5m" not in c and "10m" not in c)
]

print(f"Total numeric columns used for PCA: {len(numeric_cols)}")
print("Sample of numeric columns:", numeric_cols[:15])

# 4) Drop rows with NULLs in PCA columns
df_for_pca = fv_blc_labeled.dropna(subset=numeric_cols)
print("Rows going into PCA (Spark):", df_for_pca.count())

# --- OPTIONAL: sample if you’re worried about memory / speed ---
# df_for_pca = df_for_pca.sample(fraction=0.3, seed=42)

# 5) Bring only the PCA columns to pandas
pdf = df_for_pca.select(numeric_cols).toPandas()
print("Shape of pandas matrix:", pdf.shape)  # (rows, features)

# 6) Standardize: zero mean, unit variance
scaler = StandardScaler()
X_scaled = scaler.fit_transform(pdf.values)

# 7) Run PCA – up to 20 components or #features, whichever is smaller
k = min(20, X_scaled.shape[1])
pca = PCA(n_components=k)
X_pca = pca.fit_transform(X_scaled)

explained = pca.explained_variance_ratio_
cum_explained = np.cumsum(explained)

print("\n=== Per-component and cumulative explained variance ===")
for i, (var, cum) in enumerate(zip(explained, cum_explained), start=1):
    print(f"PC{i:2d}: var = {var:.4f},  cum = {cum:.4f}")

# 8) How many PCs to reach ≥70% variance?
num_for_70 = int(np.argmax(cum_explained >= 0.70) + 1)
print(f"\nNumber of PCs needed to explain ≥70% variance: {num_for_70}")

# 9) (Optional) small result DataFrame for the first few PCs
pca_summary = pd.DataFrame({
    "PC": [f"PC{i}" for i in range(1, k+1)],
    "explained_variance_ratio": explained,
    "cumulative_variance_ratio": cum_explained
})
pca_summary.head(10)


## Step 6.1: Feature Domination in each PCA

In [0]:
from pyspark.sql.types import NumericType
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

# -----------------------------
# 1) Choose numeric columns & exclude labels / IDs
# -----------------------------
numeric_cols = [
    f.name for f in fv_blc_labeled.schema.fields
    if isinstance(f.dataType, NumericType)
]

cols_to_exclude = [
    "current_roll",
    "roll1_flag",
    "roll2_flag",
    "Metering Omega Pitch",
    "Set Speed",
    "is_ts_in_blc_stop",
    "time_since_failure",
    "time_to_failure",
]

numeric_cols = [c for c in numeric_cols if c not in cols_to_exclude]

# Keep only base + 3m features (drop 5m and 10m rollings)
numeric_cols = [
    c for c in numeric_cols
    if ("5m" not in c and "10m" not in c)
]

print(f"Total numeric columns used for PCA: {len(numeric_cols)}")
print("Sample of numeric columns:", numeric_cols[:15])

# -----------------------------
# 2) Move data to pandas & scale
# -----------------------------
df_pd = (
    fv_blc_labeled
    .select(numeric_cols)
    .dropna(subset=numeric_cols)
    .toPandas()
)

print("Rows going into PCA (Spark -> pandas):", df_pd.shape[0])
print("Shape of pandas matrix:", df_pd.shape)

# Standardize features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df_pd.values)

# -----------------------------
# 3) Run PCA
# -----------------------------
k = min(20, X_scaled.shape[1])  # up to 20 PCs or #features
pca = PCA(n_components=k, random_state=42)
X_pca = pca.fit_transform(X_scaled)

eigenvalues = pca.explained_variance_
expl_var_ratio = pca.explained_variance_ratio_
cum_ratio = np.cumsum(expl_var_ratio)

eig_df = pd.DataFrame({
    "PC": [f"PC{i+1}" for i in range(k)],
    "eigenvalue": eigenvalues,
    "expl_var_ratio": expl_var_ratio,
    "cum_expl_var_ratio": cum_ratio,
})

print("\n=== Eigenvalues & explained variance ===")
display(eig_df)

num_for_70 = int(np.argmax(cum_ratio >= 0.70) + 1)
print(f"\nNumber of PCs needed to explain ≥70% variance: {num_for_70}")

# -----------------------------
# 4) Scree plot
# -----------------------------
plt.figure(figsize=(8, 5))
plt.plot(range(1, k+1), expl_var_ratio, marker="o")
plt.xlabel("Principal Component")
plt.ylabel("Explained Variance Ratio")
plt.title("Scree Plot")
plt.grid(True)
plt.axvline(num_for_70, linestyle="--")  # 70% cutoff
plt.show()

# -----------------------------
# 5) Feature loadings (which features dominate each PC)
# -----------------------------
components = pca.components_  # shape (k, n_features)
loadings = pd.DataFrame(
    components.T,
    index=numeric_cols,
    columns=[f"PC{i+1}" for i in range(k)]
)

print("\n=== Loadings matrix (features x PCs) – head ===")
display(loadings.head())

# Top features per PC (by absolute loading)
N = min(8, k)  # show for PC1..PC8
for i in range(N):
    pc_name = f"PC{i+1}"
    print(f"\n=== Top 10 features for {pc_name} (by |loading|) ===")
    top_idx = (
        loadings[pc_name]
        .abs()
        .sort_values(ascending=False)
        .head(10)
        .index
    )
    top_df = pd.DataFrame({
        "feature": top_idx,
        "loading": loadings.loc[top_idx, pc_name],
        "abs_loading": loadings.loc[top_idx, pc_name].abs()
    })
    display(top_df)


## Step 6.2: PCA Plots

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# =========================================================
# Put PCA results into dataframe
# =========================================================
pca_df = pd.DataFrame({
    "PC1": X_pca[:,0],
    "PC2": X_pca[:,1],
    "PC3": X_pca[:,2],
    "label": df_pd_plot["label"]
})

# =========================================================
# Adjustable typography controls (EDIT FREELY)
# =========================================================
AXIS_LABEL_SIZE   = 16
AXIS_LABEL_WEIGHT = "bold"

TICK_LABEL_SIZE   = 13

TITLE_SIZE        = 18
TITLE_WEIGHT      = "bold"

LEGEND_SIZE       = 12

MARKER_SIZE       = 8
ALPHA             = 0.75

# =========================================================
# Color palette (UNCHANGED)
# =========================================================
palette = {
    "Normal": "lightgrey",
    "Speed = 0": "#1f77b4",
    "BLC Stop": "#d62728",
    "Ramp Up/Down": "#ff7f0e",
    "Inf time_since_failure": "#9467bd",
    "Inf time_to_failure": "#8c564b",
    "Splice Window": "#2ca02c",
    "Stop-Merged": "#e377c2"
}

# =========================================================
# Scatter plot function (UNCHANGED logic, typography added)
# =========================================================
def plot_pca(x, y, title, filename):
    fig, ax = plt.subplots(figsize=(11.5, 7.5))  # BIG, thesis-scale

    sns.scatterplot(
        ax=ax,
        data=pca_df,
        x=x, y=y,
        hue="label",
        palette=palette,
        s=MARKER_SIZE,
        alpha=ALPHA,
        edgecolor=None
    )

    # Axis labels
    ax.set_xlabel(
        x,
        fontsize=AXIS_LABEL_SIZE,
        fontweight=AXIS_LABEL_WEIGHT,
        labelpad=14
    )
    ax.set_ylabel(
        y,
        fontsize=AXIS_LABEL_SIZE,
        fontweight=AXIS_LABEL_WEIGHT,
        labelpad=14
    )

    # Tick values
    ax.tick_params(axis="both", labelsize=TICK_LABEL_SIZE)

    # Title
    ax.set_title(
        title,
        fontsize=TITLE_SIZE,
        fontweight=TITLE_WEIGHT,
        pad=16
    )

    # Legend
    legend = ax.legend(
        loc="best",
        fontsize=LEGEND_SIZE,
        frameon=True,
        framealpha=0.95
    )

    # ---- FIX: legend color visibility ----
    for handle in legend.legendHandles:
        handle.set_alpha(1.0)
        handle.set_markersize(8)

    # Layout + export
    fig.tight_layout()
    fig.savefig(f"{filename}.pdf", bbox_inches="tight")
    fig.savefig(f"{filename}.png", dpi=400, bbox_inches="tight")

    plt.show()


# =========================================================
# 3 SEPARATE HIGH-QUALITY PLOTS
# =========================================================
plot_pca("PC1", "PC2", "PCA Scatter for  Reduced Feature Dataset: PC1 vs PC2", "pca_pc1_pc2")
plot_pca("PC2", "PC3", "PCA Scatter for Reduced Feature Dataset: PC2 vs PC3", "pca_pc2_pc3")
plot_pca("PC1", "PC3", "PCA Scatter for Reduced Feature Dataset: PC1 vs PC3", "pca_pc1_pc3")


## Step 6.3: PCA Plosts with Smaller Regime

In [0]:
# =========================================================
# 0) Imports
# =========================================================
from pyspark.sql import functions as F
import pandas as pd
import matplotlib.pyplot as plt


# =========================================================
# 1) FILTER TO ONLY LAST 120 MIN BEFORE STOP
# =========================================================
df_filtered = fv_blc_labeled.filter(
    (F.col("time_to_failure") <= 120) | (F.col("is_ts_in_blc_stop") == 1)
)


# =========================================================
# 2) APPLY LABEL LOGIC (UNCHANGED)
# =========================================================
df_filtered = df_filtered.withColumn(
    "label",
    F.when(F.col("is_ts_in_blc_stop") == 1, "BLC Stop")
     .when(F.col("Current Speed") == 0, "Speed = 0")
     .when(
         (F.col("Current Speed") != F.col("Set Speed")) &
         (F.col("Current Speed") != 0),
         "Ramp Up/Down"
     )
     .when(F.col("splice_flag") == 1, "Splice Window")
     .otherwise("Normal")
)


# =========================================================
# 3) MOVE TO PANDAS (ALIGN WITH PCA BASE)
# =========================================================
df_pd_filtered = (
    df_filtered
    .select(numeric_cols + ["label"])
    .dropna()
    .toPandas()
)


# =========================================================
# 4) APPLY EXISTING SCALER + PCA (NO REFIT)
# =========================================================
X_scaled_filtered = scaler.transform(
    df_pd_filtered[numeric_cols].values
)

X_pca_filtered = pca.transform(X_scaled_filtered)

df_pd_filtered["PC1"] = X_pca_filtered[:, 0]
df_pd_filtered["PC2"] = X_pca_filtered[:, 1]
df_pd_filtered["PC3"] = X_pca_filtered[:, 2]


# =========================================================
# 5) PLOTTING PARAMETERS (ADJUST FREELY)
# =========================================================
AXIS_LABEL_SIZE   = 16
AXIS_LABEL_WEIGHT = "bold"

TICK_LABEL_SIZE   = 13

TITLE_SIZE        = 18
TITLE_WEIGHT      = "bold"

LEGEND_SIZE       = 12

MARKER_SIZE       = 12
ALPHA             = 0.85


labels = ["Normal", "Splice Window", "Speed = 0", "Ramp Up/Down", "BLC Stop"]

colors = {
    "Normal": "#d3d3d3",
    "Splice Window": "#1f77b4",
    "Speed = 0": "#2ca02c",
    "Ramp Up/Down": "#ff7f0e",
    "BLC Stop": "#d62728",
}


# =========================================================
# 6) HELPER: SINGLE PCA SCATTER PLOT
# =========================================================
def plot_pca_filtered(xcol, ycol, filename):
    fig, ax = plt.subplots(figsize=(11.5, 7.5))

    for lbl in labels:
        subset = df_pd_filtered[df_pd_filtered["label"] == lbl]
        ax.scatter(
            subset[xcol],
            subset[ycol],
            s=MARKER_SIZE,
            alpha=ALPHA,
            color=colors[lbl],
            label=lbl
        )

    # Axis labels
    ax.set_xlabel(
        xcol,
        fontsize=AXIS_LABEL_SIZE,
        fontweight=AXIS_LABEL_WEIGHT,
        labelpad=14
    )
    ax.set_ylabel(
        ycol,
        fontsize=AXIS_LABEL_SIZE,
        fontweight=AXIS_LABEL_WEIGHT,
        labelpad=14
    )

    # Tick labels
    ax.tick_params(axis="both", labelsize=TICK_LABEL_SIZE)

    # Title
    ax.set_title(
        f"PCA Scatter for Reduced Feature Dataset: {xcol} vs {ycol}",
        fontsize=TITLE_SIZE,
        fontweight=TITLE_WEIGHT,
        pad=16
    )

    ax.grid(True)

    # Legend
    legend = ax.legend(
        loc="best",
        fontsize=LEGEND_SIZE,
        frameon=True,
        framealpha=0.95
    )

    # Ensure legend visibility
    for handle in legend.legendHandles:
        handle.set_alpha(1.0)


    # Export
    fig.tight_layout()
    fig.savefig(f"{filename}.pdf", bbox_inches="tight")
    fig.savefig(f"{filename}.png", dpi=400, bbox_inches="tight")

    plt.show()


# =========================================================
# 7) GENERATE 3 SEPARATE HIGH-QUALITY FIGURES
# =========================================================
plot_pca_filtered("PC1", "PC2", "pca_filtered_pc1_pc2")
plot_pca_filtered("PC2", "PC3", "pca_filtered_pc2_pc3")
plot_pca_filtered("PC1", "PC3", "pca_filtered_pc1_pc3")


# Step 7: PCA Setup (Mini Dataset)

In [0]:
from pyspark.sql.types import NumericType
from pyspark.sql import functions as F

import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

# 0) Restrict to smaller regime:
#    - points within 120 min to failure, OR
#    - points that are actually in BLC stop
df_filtered = fv_blc_labeled.filter(
    (F.col("time_to_failure") <= 120) | (F.col("is_ts_in_blc_stop") == 1)
)

print("Rows in filtered window (<=120 min or BLC stop):", df_filtered.count())

# 1) Select numeric columns from the FILTERED DF
numeric_cols = [
    f.name for f in df_filtered.schema.fields
    if isinstance(f.dataType, NumericType)
]

# 2) Explicitly drop columns you don’t want in PCA
cols_to_exclude = [
    "current_roll", "roll1_flag", "roll2_flag",
    "Metering Omega Pitch",
    "is_ts_in_blc_stop", "time_since_failure", "time_to_failure",
    "Set Speed",
]

numeric_cols = [c for c in numeric_cols if c not in cols_to_exclude]

# 3) Drop all 5m and 10m rolling features, keep only 3m + base signals
numeric_cols = [
    c for c in numeric_cols
    if ("5m" not in c and "10m" not in c)
]

print(f"Total numeric columns used for PCA: {len(numeric_cols)}")
print("Sample of numeric columns:", numeric_cols[:15])

# 4) Drop rows with NULLs in PCA columns
df_for_pca = df_filtered.dropna(subset=numeric_cols)
print("Rows going into PCA (Spark):", df_for_pca.count())

# 5) Bring only the PCA columns to pandas
pdf = df_for_pca.select(numeric_cols).toPandas()
print("Shape of pandas matrix:", pdf.shape)  # (rows, features)

# 6) Standardize: zero mean, unit variance
scaler = StandardScaler()
X_scaled = scaler.fit_transform(pdf.values)

# 7) Run PCA – up to 20 components or #features, whichever is smaller
k = min(20, X_scaled.shape[1])
pca = PCA(n_components=k, random_state=42)
X_pca = pca.fit_transform(X_scaled)

explained = pca.explained_variance_ratio_
cum_explained = np.cumsum(explained)

print("\n=== Per-component and cumulative explained variance ===")
for i, (var, cum) in enumerate(zip(explained, cum_explained), start=1):
    print(f"PC{i:2d}: var = {var:.4f},  cum = {cum:.4f}")

# 8) How many PCs to reach ≥70% variance?
num_for_70 = int(np.argmax(cum_explained >= 0.70) + 1)
print(f"\nNumber of PCs needed to explain ≥70% variance: {num_for_70}")

# 9) (Optional) summary DF
pca_summary = pd.DataFrame({
    "PC": [f"PC{i}" for i in range(1, k+1)],
    "explained_variance_ratio": explained,
    "cumulative_variance_ratio": cum_explained
})
pca_summary.head(10)


## Step 7.1: Feature Domination in each PCA

In [0]:
from pyspark.sql.types import NumericType
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

# -----------------------------
# 0) Filter to smaller regime
# -----------------------------
df_filtered = fv_blc_labeled.filter(
    (F.col("time_to_failure") <= 120) | (F.col("is_ts_in_blc_stop") == 1)
)

print("Rows in filtered window (<=120 min or BLC stop):", df_filtered.count())

# -----------------------------
# 1) Choose numeric columns & exclude labels / IDs
# -----------------------------
numeric_cols = [
    f.name for f in df_filtered.schema.fields
    if isinstance(f.dataType, NumericType)
]

cols_to_exclude = [
    "current_roll",
    "roll1_flag",
    "roll2_flag",
    "Metering Omega Pitch",
    "Set Speed",
    "is_ts_in_blc_stop",
    "time_since_failure",
    "time_to_failure",
]

numeric_cols = [c for c in numeric_cols if c not in cols_to_exclude]

# Keep only base + 3m features (drop 5m and 10m rolling features)
numeric_cols = [
    c for c in numeric_cols
    if ("5m" not in c and "10m" not in c)
]

print(f"Total numeric columns used for PCA: {len(numeric_cols)}")
print("Sample of numeric columns:", numeric_cols[:15])

# -----------------------------
# 2) Move data to pandas & scale
# -----------------------------
df_pd = (
    df_filtered
    .select(numeric_cols)
    .dropna(subset=numeric_cols)
    .toPandas()
)

print("Rows going into PCA (Spark -> pandas):", df_pd.shape[0])
print("Shape of pandas matrix:", df_pd.shape)

# Standardize features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df_pd.values)

# -----------------------------
# 3) Run PCA
# -----------------------------
k = min(20, X_scaled.shape[1])  # up to 20 PCs or #features
pca = PCA(n_components=k, random_state=42)
X_pca = pca.fit_transform(X_scaled)

eigenvalues = pca.explained_variance_
expl_var_ratio = pca.explained_variance_ratio_
cum_ratio = np.cumsum(expl_var_ratio)

eig_df = pd.DataFrame({
    "PC": [f"PC{i+1}" for i in range(k)],
    "eigenvalue": eigenvalues,
    "expl_var_ratio": expl_var_ratio,
    "cum_expl_var_ratio": cum_ratio,
})

print("\n=== Eigenvalues & explained variance ===")
display(eig_df)

num_for_70 = int(np.argmax(cum_ratio >= 0.70) + 1)
print(f"\nNumber of PCs needed to explain ≥70% variance: {num_for_70}")

# -----------------------------
# 4) Scree plot
# -----------------------------
plt.figure(figsize=(8, 5))
plt.plot(range(1, k+1), expl_var_ratio, marker="o")
plt.xlabel("Principal Component")
plt.ylabel("Explained Variance Ratio")
plt.title("Scree Plot (Filtered ≤120 min / BLC stop, 3m features)")
plt.grid(True)
plt.axvline(num_for_70, linestyle="--")  # 70% cutoff
plt.show()

# -----------------------------
# 5) Feature loadings (which features dominate each PC)
# -----------------------------
components = pca.components_  # shape (k, n_features)
loadings = pd.DataFrame(
    components.T,
    index=numeric_cols,
    columns=[f"PC{i+1}" for i in range(k)]
)

print("\n=== Loadings matrix (features x PCs) – head ===")
display(loadings.head())

# Keep X_pca + df_pd for later plotting
# (PC1, PC2, PC3 are X_pca[:,0], X_pca[:,1], X_pca[:,2])

# Top features per PC (by absolute loading)
N = min(8, k)  # show for PC1..PC8
for i in range(N):
    pc_name = f"PC{i+1}"
    print(f"\n=== Top 10 features for {pc_name} (by |loading|) ===")
    top_idx = (
        loadings[pc_name]
        .abs()
        .sort_values(ascending=False)
        .head(10)
        .index
    )
    top_df = pd.DataFrame({
        "feature": top_idx,
        "loading": loadings.loc[top_idx, pc_name],
        "abs_loading": loadings.loc[top_idx, pc_name].abs()
    })
    display(top_df)


## Step 7.2: PCA Plots

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import NumericType
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import seaborn as sns

# ----------------------------------------------------------
# 1) FILTER REGIME (<=120 min OR in BLC stop)
# ----------------------------------------------------------
df_filtered = fv_blc_labeled.filter(
    (F.col("time_to_failure") <= 120) | (F.col("is_ts_in_blc_stop") == 1)
)

print("Rows in filtered DF:", df_filtered.count())

# ----------------------------------------------------------
# 2) ADD LABEL COLUMN HERE (ON THE SAME FILTERED SET)
# ----------------------------------------------------------
df_labeled = df_filtered.withColumn(
    "label",
    F.when(F.col("is_ts_in_blc_stop") == 1, "BLC Stop")
     #.when(F.col("Current Speed") == 0, "Speed = 0")
     .when(
         (F.col("Current Speed") != F.col("Set Speed")) &
         (F.col("Current Speed") != 0),
         "Ramp Up/Down"
     )
     .when(F.col("splice_flag") == 1, "Splice Window")
     .otherwise("Normal")
)

# ----------------------------------------------------------
# 3) SELECT NUMERIC FEATURES (base + 3m only)
# ----------------------------------------------------------
numeric_cols = [
    f.name for f in df_labeled.schema.fields
    if isinstance(f.dataType, NumericType)
]

cols_to_exclude = [
    "current_roll", "roll1_flag", "roll2_flag",
    "Metering Omega Pitch",
    "Set Speed",
    "is_ts_in_blc_stop", "time_since_failure", "time_to_failure",
]

numeric_cols = [c for c in numeric_cols if c not in cols_to_exclude]
numeric_cols = [c for c in numeric_cols if ("5m" not in c and "10m" not in c)]

print("Numeric columns used:", len(numeric_cols))

# ----------------------------------------------------------
# 4) BRING PCA FEATURES + LABELS INTO ONE PANDAS DF
# ----------------------------------------------------------
df_pd = (
    df_labeled
    .select(numeric_cols + ["label"])
    .dropna(subset=numeric_cols)
    .toPandas()
)

print("Rows in df_pd:", df_pd.shape[0])

# ----------------------------------------------------------
# 5) PCA
# ----------------------------------------------------------
X = df_pd[numeric_cols].values
y = df_pd["label"].values  # ALIGNED!

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

k = min(20, len(numeric_cols))
pca = PCA(n_components=k, random_state=42)
X_pca = pca.fit_transform(X_scaled)

# ----------------------------------------------------------
# 6) Build PCA DF for plotting
# ----------------------------------------------------------
pca_df = pd.DataFrame({
    "PC1": X_pca[:, 0],
    "PC2": X_pca[:, 1],
    "PC3": X_pca[:, 2],
    "label": y
})

# ----------------------------------------------------------
# 7) PLOT
# ----------------------------------------------------------
palette = {
    "Normal": "#B0B0B0",        # Darker grey for visibility
    "Speed = 0": "#0047FF",     # Deep electric blue
    "BLC Stop": "#FF0000",      # Pure red (high alert)
    "Ramp Up/Down": "#FF8C00",  # Dark orange (vivid)
    "Splice Window": "#00CC00"  # Strong green
}


def plot(ax, x, y, title):
    sns.scatterplot(
        data=pca_df,
        x=x, y=y,
        hue="label",
        palette=palette,
        s=10, alpha=0.35,
        ax=ax
    )
    ax.set_title(title)

fig, axes = plt.subplots(1, 3, figsize=(22, 6))
plot(axes[0], "PC1", "PC2", "PC1 vs PC2")
plot(axes[1], "PC2", "PC3", "PC2 vs PC3")
plot(axes[2], "PC1", "PC3", "PC1 vs PC3")
plt.tight_layout()
plt.show()


# Step 8: Early Alarm Detection ML Model

%md
### Simple Supervised Early-Warning Alarm (1ß-Minute Horizon, 0–180 min Regime)

**Goal.**  
We build a lightweight, supervised alarm model that predicts whether the machine will enter a BLC stop within the next **10 minutes**, using only data from the **0–180 minute regime before a stop**. The focus is on:

- Raising alarms shortly before a stop (10-minute horizon)
- Keeping the **false positive rate low** (operator trust)
- Keeping the model **simple and interpretable**

---

### 1. Label Definition

From the existing `time_to_failure` signal, we define a **binary label** for each timestamp:

- `y = 1` if `time_to_failure <= 10` minutes  
- `y = 0` otherwise  

We restrict to rows where `0 <= time_to_failure <= 180` minutes. This creates a focused "normal regime + pre-stop" dataset that matches our PCA Phase 3 diagnostics.

---

### 2. Features

From all numeric columns, we **exclude**:

- Direct labels or leak variables:
  - `y`, `time_to_failure`, `time_since_failure`, `is_ts_in_blc_stop`, `splice_flag`
- Low-level technical/control fields:
  - `current_roll`, `roll1_flag`, `roll2_flag`, `Metering Omega Pitch`, `Set Speed`

Everything else (base signals + 3/5/10-minute rolling statistics) is used as input.  
This is consistent with the PCA findings, where:

- BLC1 loadcell, BLC2 loadcell, and `e_modulus` (and their rolling means/std/slopes) dominated the first components.
- Width and middle sensors plus unwinder diameters showed up in later PCs as structural “setup” features.

---

### 3. Model

We use a **tree-based classifier** (`HistGradientBoostingClassifier`) because:

- It handles non-linear interactions between features (tension, modulus, diameters, etc.).
- It does **not** require feature scaling.
- It supports **class weights** to handle the strong imbalance between “normal” and “imminent stop” samples.

Class weights are set such that positive samples (imminent stop, `y = 1`) are up-weighted relative to the majority normal class.

The data is split into:

- **Training set:** 70% of the regime data  
- **Validation set:** 30% of the regime data (stratified by label)

---

### 4. Threshold Selection (Conservative Alarm)

The model outputs a probability `p(stop within 10 min)` for each timestamp.  
Instead of using a naive threshold of 0.5, we:

1. Compute the **ROC curve** on the validation set.
2. Search for a threshold where:
   - The **false positive rate (FPR)** is below a small target (e.g. 2%).
   - Among these low-FPR points, we pick the one with the **highest true positive rate (recall)**.

This gives a **conservative alarm**: it may miss some stops, but when it raises an alarm, it is less likely to be noise.

---

### 5. Evaluation Outputs

The code prints:

- **ROC AUC**: how well the model separates positive/negative cases in terms of probability.
- **Chosen threshold**
- **Confusion matrix components**:
  - TP: true alarms (correctly predicted imminent stops)
  - FP: false alarms (alarm but no stop in 10 min)
  - FN: missed alarms (stop but no alarm)
  - TN: correct normals (no alarm when there is no stop)

It also generates:

1. A **ROC curve plot**  
   - Model curve vs. random baseline  
   - Marked point for the chosen threshold

2. A **bar chart** of `[TP, FP, FN, TN]`  
   - Clear visual split between correct alarms, false alarms, missed stops, and normal states.



In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import NumericType

import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.metrics import roc_auc_score, roc_curve, confusion_matrix
import matplotlib.pyplot as plt

# ---------------------------------------------------------
# 1) Filter to 0–120 min regime and build 3-min label
# ---------------------------------------------------------
# Keep only rows where we have a defined time_to_failure in [0, 120] minutes
df_regime = (
    fv_blc_labeled
    .filter((F.col("time_to_failure") >= 0) & (F.col("time_to_failure") <= 180))
)

# Binary label:
#   y = 1  if stop will occur within 3 minutes
#   y = 0  otherwise
df_regime = df_regime.withColumn(
    "y",
    F.when(F.col("time_to_failure") <= 10, 1).otherwise(0)
)

# ---------------------------------------------------------
# 2) Select numeric feature columns
# ---------------------------------------------------------
numeric_cols = [
    f.name for f in df_regime.schema.fields
    if isinstance(f.dataType, NumericType)
]

# Exclude obvious non-features / targets / leak columns
cols_to_exclude = [
    "y",
    "time_to_failure",
    "time_since_failure",
    "is_ts_in_blc_stop",
    "splice_flag",
    "current_roll",
    "roll1_flag",
    "roll2_flag",
    "Metering Omega Pitch",
    "Set Speed"
]

feature_cols = [c for c in numeric_cols if c not in cols_to_exclude]

print(f"Number of feature columns: {len(feature_cols)}")
print("Sample features:", feature_cols[:15])

# ---------------------------------------------------------
# 3) Move regime data to pandas and drop NaNs
# ---------------------------------------------------------
pdf = (
    df_regime
    .select(feature_cols + ["y"])
    .dropna(subset=feature_cols + ["y"])
    .toPandas()
)

print("Pandas shape (rows, features+label):", pdf.shape)

X = pdf[feature_cols].values
y = pdf["y"].values.astype(int)

# Basic label stats
n_pos = int(y.sum())
n_total = len(y)
print(f"Total samples: {n_total}")
print(f"Positives (<=10 min to stop): {n_pos} ({n_pos/n_total:.4%})")

# ---------------------------------------------------------
# 4) Train / validation split
# ---------------------------------------------------------
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=0.3,
    random_state=42,
    stratify=y
)

print("Train size:", X_train.shape[0], "Val size:", X_val.shape[0])

# ---------------------------------------------------------
# 5) Simple, interpretable classifier (HistGradientBoosting)
#    with class weighting to handle imbalance
# ---------------------------------------------------------
pos_weight = (len(y_train) - y_train.sum()) / max(y_train.sum(), 1)
class_weight = {0: 1.0, 1: float(pos_weight)}
print("Class weights used:", class_weight)

clf = HistGradientBoostingClassifier(
    max_depth=4,
    learning_rate=0.05,
    max_iter=200,
    class_weight=class_weight,
    random_state=42
)

clf.fit(X_train, y_train)

# ---------------------------------------------------------
# 6) Predict probabilities and choose conservative threshold
#    – target: low false positive rate (e.g. <= 2%)
# ---------------------------------------------------------
probs_val = clf.predict_proba(X_val)[:, 1]

auc = roc_auc_score(y_val, probs_val)
print(f"\nValidation ROC AUC: {auc:.3f}")

fpr, tpr, thresholds = roc_curve(y_val, probs_val)

# Choose threshold with FPR <= 2% and highest TPR among them
target_fpr = 0.02
mask = fpr <= target_fpr

if mask.any():
    idx = np.argmax(tpr[mask])
    best_threshold = thresholds[mask][idx]
    best_fpr = fpr[mask][idx]
    best_tpr = tpr[mask][idx]
else:
    # If nothing reaches FPR <= 2%, pick the lowest FPR point
    idx = np.argmin(fpr)
    best_threshold = thresholds[idx]
    best_fpr = fpr[idx]
    best_tpr = tpr[idx]

y_pred = (probs_val >= best_threshold).astype(int)

tn, fp, fn, tp = confusion_matrix(y_val, y_pred).ravel()

precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
fpr_final = fp / (fp + tn) if (fp + tn) > 0 else 0.0

print("\n==== 10-MIN EARLY WARNING RESULTS (Supervised model, 3 Hour Window) ====")
print(f"Chosen threshold: {best_threshold:.3f}")
print(f"TP (true alarms):   {tp}")
print(f"FP (false alarms):  {fp}")
print(f"FN (missed alarms): {fn}")
print(f"TN (correct normals): {tn}")
print(f"Recall (TPR):   {recall:.3f}")
print(f"FPR:           {fpr_final:.3f}")
print(f"Precision:     {precision:.3f}")

# ---------------------------------------------------------
# 7) Visuals: ROC + Confusion bar chart
# ---------------------------------------------------------

# --- ROC curve plot ---
plt.figure(figsize=(7, 6))
plt.plot(fpr, tpr, label=f"ROC (AUC = {auc:.3f})")
plt.plot([0, 1], [0, 1], linestyle="--", color="grey", label="Random")
plt.scatter([best_fpr], [best_tpr], color="red", label="Chosen threshold")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve – 10-Min Early Warning (T2 regime)")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

# --- Bar chart: TP vs FP vs FN vs TN ---
counts = [tp, fp, fn, tn]
labels_bar = ["TP (True alarms)", "FP (False alarms)", "FN (Missed)", "TN (Correct normals)"]
colors_bar = ["#2ca02c", "#d62728", "#ff7f0e", "#1f77b4"]  # green, red, orange, blue

plt.figure(figsize=(7, 6))
plt.bar(labels_bar, counts, color=colors_bar)
plt.ylabel("Count")
plt.title("Alarm Outcomes – 10-Min Early Warning (Validation set)")
for i, v in enumerate(counts):
    plt.text(i, v, str(v), ha="center", va="bottom", fontsize=9)
plt.tight_layout()
plt.show()


# --- Confusion Matrix Plot ---
import matplotlib.pyplot as plt
import numpy as np

# Build confusion matrix in sklearn format:
# [[TN, FP],
#  [FN, TP]]
cm = np.array([[tn, fp],
               [fn, tp]])

fig, ax = plt.subplots(figsize=(5, 4))
im = ax.imshow(cm, cmap="Blues")

# Annotate each cell with its numeric value
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        ax.text(j, i, cm[i, j], ha="center", va="center", fontsize=12, color="black")

# Labels + ticks
ax.set_xticks([0, 1])
ax.set_yticks([0, 1])
ax.set_xticklabels(["Predicted No Alarm", "Predicted Alarm"])
ax.set_yticklabels(["Actual No Alarm", "Actual Alarm"])

ax.set_title("Confusion Matrix", fontsize=14)
plt.colorbar(im, fraction=0.046, pad=0.04)
plt.tight_layout()
plt.show()


# Step 9: Label Leakage Validation Test

In [0]:
import numpy as np
from sklearn.metrics import roc_auc_score
from xgboost import XGBClassifier

# ----- 1. Prepare data (already engineered) -----
X = pdf[feature_cols].values
y = pdf["y"].values.astype(int)

n = len(pdf)
split_idx = int(0.7 * n)   # time-based split: first 70% train, last 30% val

X_train, X_val = X[:split_idx], X[split_idx:]
y_train, y_val = y[:split_idx], y[split_idx:]

# class weight like before
pos = y_train.sum()
neg = len(y_train) - pos
scale_pos_weight = neg / max(pos, 1)

common_params = dict(
    n_estimators=200,
    max_depth=4,
    learning_rate=0.05,
    subsample=0.8,
    colsample_bytree=0.8,
    objective="binary:logistic",
    eval_metric="logloss",
    scale_pos_weight=scale_pos_weight,
    n_jobs=-1,
    tree_method="hist",
)

# ----- 2. Baseline model (real labels) -----
model_real = XGBClassifier(**common_params)
model_real.fit(X_train, y_train)

probs_real = model_real.predict_proba(X_val)[:, 1]
auc_real = roc_auc_score(y_val, probs_real)
print(f"Baseline ROC AUC (real labels): {auc_real:.3f}")

# ----- 3. Leakage test: train on SHUFFLED labels -----
y_train_shuffled = y_train.copy()
np.random.shuffle(y_train_shuffled)

model_shuf = XGBClassifier(**common_params)
model_shuf.fit(X_train, y_train_shuffled)

probs_shuf = model_shuf.predict_proba(X_val)[:, 1]
auc_shuf = roc_auc_score(y_val, probs_shuf)
print(f"Leakage test ROC AUC (train on shuffled labels): {auc_shuf:.3f}")

# Interpretation:
# - auc_real high (e.g. 0.98–0.99) AND
# - auc_shuf ~ 0.50
# → strong evidence that the model is learning real structure, not leakage.
