# Experiment B — IMS Bearings: STFT + Autoencoder (Run-to-Failure)

## Goal
Build a time–frequency anomaly score using **STFT + Autoencoder reconstruction loss** on IMS bearings.
This experiment extends the "it depends" message: time-domain and frequency-domain can each be better
depending on failure mode and signal characteristics. STFT is used as a practical compromise because it
preserves both *when* and *where (in frequency)* changes occur.

## Dataset
IMS bearings (run-to-failure). Each file is a ~1-second snapshot. Folder layout:
`data/external/ims_bearings/{1st_test,2nd_test,3rd_test}/<timestamp_filename>`

## Output
- Per-snapshot anomaly score over time for each run
- Healthy-only training on early-life snapshots
- Operational thresholds (p99 / p99.5) + smoothing (rolling median/quantiles)


# 1. Import

In [1]:
from __future__ import annotations

import os
import re
import glob
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple, Dict

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy import signal

# Deep learning (Experiment B core)
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)


In [3]:
@dataclass(frozen=True)
class IMSConfig:
    data_root: Path
    fs_hz: int = 20480              # typical IMS sampling rate
    n_channels: int = 8             # your files show 8 columns
    use_channel: int = 0            # start with ch1 (0-based)
    window_size: int = 2048
    overlap: float = 0.5

    # STFT per window
    stft_nperseg: int = 256
    stft_noverlap: int = 192
    stft_nfft: int = 512

    # Training: first fraction of snapshots per run are treated as "healthy baseline"
    healthy_fraction_per_run: float = 0.2

    # Score aggregation (from window-level -> snapshot-level)
    snapshot_agg: str = "p95"       # mean | p95 | max

    # Smoothing for operational view
    smooth_window: int = 15         # rolling window in snapshots (tune)
    

cfg = IMSConfig(data_root=Path("../../../data/external/ims_bearings").resolve())
print(cfg)
print("Data root exists?", cfg.data_root.exists(), cfg.data_root)


IMSConfig(data_root=WindowsPath('C:/Users/M4005001/OneDrive - Saint-Gobain/Pessoal/Alura/Anomaly_Detection/unsupervised_industrial_anomaly_detection/data/external/ims_bearings'), fs_hz=20480, n_channels=8, use_channel=0, window_size=2048, overlap=0.5, stft_nperseg=256, stft_noverlap=192, stft_nfft=512, healthy_fraction_per_run=0.2, snapshot_agg='p95', smooth_window=15)
Data root exists? True C:\Users\M4005001\OneDrive - Saint-Gobain\Pessoal\Alura\Anomaly_Detection\unsupervised_industrial_anomaly_detection\data\external\ims_bearings


## Data Index (Run-to-Failure Timeline)

We build a tidy index with:
- run_id: {1st_test, 2nd_test, 3rd_test}
- filepath
- timestamp parsed from filename
- per-run file ordering
- life_fraction: normalized position in the run [0,1]


In [10]:
def discover_ims_index(data_root: Path) -> pd.DataFrame:
    if not data_root.exists():
        raise FileNotFoundError(f"IMS root not found: {data_root}")

    paths = [Path(p) for p in glob.glob(str(data_root / "**/*"), recursive=True)]
    paths = [p for p in paths if p.is_file() and p.stat().st_size > 1000]

    if not paths:
        raise FileNotFoundError(f"No data files found under: {data_root}")

    rows = []
    for p in sorted(paths):
        rel = p.relative_to(data_root)
        run_id = rel.parts[0] if len(rel.parts) > 0 else "unknown_run"

        name = p.name
        m = re.search(r"(\d{4})\.(\d{2})\.(\d{2})\.(\d{2})\.(\d{2})\.(\d{2})", name)
        ts = None
        if m:
            ts = f"{m.group(1)}-{m.group(2)}-{m.group(3)} {m.group(4)}:{m.group(5)}:{m.group(6)}"

        rows.append(
            dict(run_id=run_id, filepath=str(p), filename=p.name, timestamp=ts)
        )

    df = pd.DataFrame(rows)
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")

    # stable per-run ordering
    df = df.sort_values(["run_id", "timestamp", "filename"]).reset_index(drop=True)
    df["file_order"] = df.groupby("run_id").cumcount()

    # numeric sort_key
    ts_ns = pd.to_datetime(df["timestamp"], errors="coerce").astype("int64")
    df["sort_key"] = np.where(df["timestamp"].notna(), ts_ns, df["file_order"].astype("int64"))

    # final ordering
    df = df.sort_values(["run_id", "sort_key", "filename"]).reset_index(drop=True)

    # normalized life fraction
    df["life_fraction"] = (
        df.groupby("run_id").cumcount() / df.groupby("run_id")["filepath"].transform("count")
    )

    return df


In [11]:
index_df = discover_ims_index(cfg.data_root)
print(index_df.dtypes)
display(index_df.groupby("run_id").agg(n_files=("filepath","count")).sort_values("n_files", ascending=False))
index_df.head()



run_id                      str
filepath                    str
filename                    str
timestamp        datetime64[us]
file_order                int64
sort_key                  int64
life_fraction           float64
dtype: object


Unnamed: 0_level_0,n_files
run_id,Unnamed: 1_level_1
3rd_test,6324
1st_test,2156
2nd_test,984
Readme Document for IMS Bearing Data.pdf,1


Unnamed: 0,run_id,filepath,filename,timestamp,file_order,sort_key,life_fraction
0,1st_test,C:\Users\M4005001\OneDrive - Saint-Gobain\Pess...,2003.10.22.12.06.24,2003-10-22 12:06:24,0,1066824384000000,0.0
1,1st_test,C:\Users\M4005001\OneDrive - Saint-Gobain\Pess...,2003.10.22.12.09.13,2003-10-22 12:09:13,1,1066824553000000,0.000464
2,1st_test,C:\Users\M4005001\OneDrive - Saint-Gobain\Pess...,2003.10.22.12.14.13,2003-10-22 12:14:13,2,1066824853000000,0.000928
3,1st_test,C:\Users\M4005001\OneDrive - Saint-Gobain\Pess...,2003.10.22.12.19.13,2003-10-22 12:19:13,3,1066825153000000,0.001391
4,1st_test,C:\Users\M4005001\OneDrive - Saint-Gobain\Pess...,2003.10.22.12.24.13,2003-10-22 12:24:13,4,1066825453000000,0.001855
