In [2]:
import logging
import pickle
from typing import List, Optional

import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import sliding_window_view
from keras import models, utils

from predictive_maintenance.config import Config
from predictive_maintenance.data import IMSDataETL

# Root logger setup: INFO and above, each record prefixed with a timestamp
# and its severity level.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-scoped logger used by everything defined below.
logger = logging.getLogger(__name__)


class IMSModelEvaluator:
    """
    Handles the loading of trained models and evaluation of test data
    for IMS Anomaly Detection.

    The evaluator loads a Keras model and a pickled scaler from the paths
    given by ``Config``, scores every (test_id, bearing_id) group of the
    test set by the model's per-window reconstruction error, and writes the
    scored rows to ``Config.RESULTS_SAVE_PATH`` as CSV.
    """

    def __init__(self) -> None:
        """Initializes the evaluator and ensures output directories exist."""
        Config.OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)
        Config.RESULTS_SAVE_PATH.parent.mkdir(parents=True, exist_ok=True)

        # Both artifacts are loaded lazily by load_artifacts().
        self.model: Optional[models.Model] = None
        self.scaler = None
        self.etl = IMSDataETL()

    def load_artifacts(self) -> None:
        """
        Loads the trained Keras model and the Pickle scaler.

        Raises:
            FileNotFoundError: If the model file or the scaler file is missing.
        """
        model_path = Config.MODEL_SAVE_PATH
        scaler_path = Config.SCALER_SAVE_PATH

        if not model_path.exists() or not scaler_path.exists():
            raise FileNotFoundError(
                f"Artifacts not found in {Config.MODELS_DIR}. Please run training first."
            )

        logger.info(">>> Loading model from: %s", model_path)
        self.model = models.load_model(model_path)

        logger.info(">>> Loading scaler from: %s", scaler_path)
        # NOTE(security): unpickling executes arbitrary code embedded in the
        # file. Only load scaler files written by a trusted training run.
        with open(scaler_path, "rb") as f:
            self.scaler = pickle.load(f)

    @staticmethod
    def _window_mae(
        predictions: np.ndarray, scaled_vals: np.ndarray, window: int
    ) -> np.ndarray:
        """
        Computes the mean absolute reconstruction error of each window.

        Args:
            predictions: Model output, expected as
                (n_windows, window, n_features).
            scaled_vals: Scaled series of shape (n_samples, n_features) from
                which the stride-1 ground-truth windows are taken.
            window: Number of consecutive samples per window.

        Returns:
            1-D array with one MAE value per window.

        Raises:
            ValueError: If ``predictions`` does not have the same shape as the
                ground-truth windows. Without this check a mismatched model
                output could broadcast silently and yield wrong scores.
        """
        predictions = np.asarray(predictions)

        # sliding_window_view appends the window axis last, giving
        # (n_windows, n_features, window); move it to position 1 so the
        # layout becomes (n_windows, window, n_features).
        actual_windows = np.moveaxis(
            sliding_window_view(scaled_vals, window_shape=window, axis=0), -1, 1
        )

        if predictions.shape != actual_windows.shape:
            raise ValueError(
                f"Prediction shape {predictions.shape} does not match "
                f"ground-truth window shape {actual_windows.shape}."
            )

        return np.mean(np.abs(predictions - actual_windows), axis=(1, 2))

    def evaluate(self) -> None:
        """
        Main evaluation loop: Loads data, predicts reconstruction,
        calculates MAE, and saves results.

        For every (test_id, bearing_id) group with at least ``Config.WINDOW``
        rows, the features are scaled, windowed and reconstructed by the
        model. The MAE of window ``i`` is stored as ``anomaly_score`` on the
        ``i``-th row of the group (the row at which that window starts), so
        the last ``Config.WINDOW - 1`` rows of each group are not part of the
        output. The combined rows are written to ``Config.RESULTS_SAVE_PATH``;
        if no group produced results only a warning is logged.

        Raises:
            FileNotFoundError: If artifacts have to be loaded and are missing.
            ValueError: If the model output shape does not match the windows.
        """
        logger.info(">>> Starting Test Evaluation...")

        # The scaler is needed as much as the model, so load when either
        # one is missing.
        if self.model is None or self.scaler is None:
            self.load_artifacts()

        # Load Test Data
        df_test, feats_test = self.etl.load_test_data()

        # Positional row indices for each (test_id, bearing_id) group
        groups = df_test.groupby(["test_id", "bearing_id"], observed=True).indices
        results: List[pd.DataFrame] = []

        logger.info(">>> Processing %s groups...", len(groups))

        for (tid, bid), idxs in groups.items():
            vals = feats_test[idxs]

            # Validation: a group shorter than one window cannot be scored
            if len(vals) < Config.WINDOW:
                logger.warning(
                    "Skipping group %s-%s: Insufficient data length.", tid, bid
                )
                continue

            scaled_vals = self.scaler.transform(vals)

            # Batched, unshuffled windows so prediction order follows time order
            ds_test = utils.timeseries_dataset_from_array(
                scaled_vals,
                None,
                sequence_length=Config.WINDOW,
                batch_size=Config.BATCH,
                shuffle=False,
            )

            # Reconstruction, expected shape: (N_windows, Window_Size, Features)
            predictions = self.model.predict(ds_test, verbose=0)

            # Ground truth is rebuilt as a vectorized sliding-window view
            # instead of iterating the dataset a second time.
            mae = self._window_mae(predictions, scaled_vals, Config.WINDOW)

            # Keep only as many rows as there are windows; trimming the index
            # array first avoids materializing the whole group.
            sub_df = df_test.iloc[idxs[: len(mae)]].copy()
            sub_df["anomaly_score"] = mae.astype(np.float32)

            # Drop the raw signal column (if present) to keep the output small
            sub_df = sub_df.drop(columns=["vibration"], errors="ignore")

            results.append(sub_df)
            logger.info("Processed: %s-%s | Windows: %s", tid, bid, len(mae))

        # Save Final Results
        if not results:
            logger.warning("No results were generated. Check your data configuration.")
            return

        final_df = pd.concat(results, ignore_index=True)
        output_path = Config.RESULTS_SAVE_PATH
        final_df.to_csv(output_path, index=False)
        logger.info("\n✅ Evaluation complete! Results saved to: %s", output_path)


# Script entry point: build an evaluator and run the full test evaluation.
if __name__ == "__main__":
    IMSModelEvaluator().evaluate()

2025-12-11 20:16:22,398 - INFO - >>> Starting Test Evaluation...
2025-12-11 20:16:22,400 - INFO - >>> Loading model from: /home/seyhankokcu/Development/TSA/predictive-maintenance-lstm/models/ims_lstm_model.keras
2025-12-11 20:16:22,655 - INFO - >>> Loading scaler from: /home/seyhankokcu/Development/TSA/predictive-maintenance-lstm/models/scaler.pkl
2025-12-11 20:17:10,898 - INFO - >>> Processing 12 groups...
2025-12-11 20:17:15,735 - INFO - Processed: 1-1 | Windows: 2029
2025-12-11 20:17:18,211 - INFO - Processed: 1-2 | Windows: 2029
2025-12-11 20:17:20,301 - INFO - Processed: 1-3 | Windows: 2029
2025-12-11 20:17:22,331 - INFO - Processed: 1-4 | Windows: 2029
2025-12-11 20:17:23,313 - INFO - Processed: 2-1 | Windows: 857
2025-12-11 20:17:24,276 - INFO - Processed: 2-2 | Windows: 857
2025-12-11 20:17:25,245 - INFO - Processed: 2-3 | Windows: 857
2025-12-11 20:17:26,199 - INFO - Processed: 2-4 | Windows: 857
2025-12-11 20:17:31,955 - INFO - Processed: 3-1 | Windows: 6197
2025-12-11 20:17: