In [2]:
from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Sequence

import joblib
import numpy as np
import numpy.typing as npt
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

from config import (
    TRAIN_FD001_PATH,
    ARTIFACTS_DIR,
    SCALER_PATH,
    ARTIFACTS_INFO_PATH,
    SENSORS,
    COLS,
)

logger = logging.getLogger(__name__)



In [3]:
def create_sequences(
    data_df: pd.DataFrame,
    sensor_cols: Sequence[str],
    sequence_length: int,
) -> npt.NDArray[np.float32]:
    """
    Convert time series data into sliding windows for sequence models.

    Rows are sorted by ``unit_number`` then ``time_in_cycles``; for every
    unit, each run of ``sequence_length`` consecutive rows (stride 1) over
    ``sensor_cols`` becomes one window. Units with fewer than
    ``sequence_length`` rows contribute no windows.

    Parameters
    ----------
    data_df:
        Frame containing ``unit_number``, ``time_in_cycles`` and every
        column named in ``sensor_cols``.
    sensor_cols:
        Columns to include in each window, in output feature order.
    sequence_length:
        Number of consecutive rows per window; must be positive.

    Returns
    -------
    A new float32 array of shape
    ``(num_windows, sequence_length, len(sensor_cols))``. ``num_windows`` is
    0 when no unit has enough rows.

    Raises
    ------
    ValueError
        If ``sequence_length`` is not positive.
    KeyError
        If ``data_df`` lacks ``unit_number``, ``time_in_cycles`` or any of
        ``sensor_cols``.
    """
    if sequence_length <= 0:
        raise ValueError("sequence_length must be positive")

    feature_cols = list(sensor_cols)

    # Validate the sensor columns together with the key columns so that a
    # wrong column name fails here even when the frame has no rows (in which
    # case the per-unit loop below would never touch the columns).
    required_cols = {"unit_number", "time_in_cycles", *feature_cols}
    missing = required_cols.difference(data_df.columns)
    if missing:
        raise KeyError(f"data_df is missing required columns: {missing}")

    # Ensure chronological order per unit (mergesort is a stable sort).
    data_sorted = data_df.sort_values(
        ["unit_number", "time_in_cycles"],
        kind="mergesort",
    )

    windows_per_unit: list[npt.NDArray[np.float32]] = []

    for _, group in data_sorted.groupby("unit_number", sort=False):
        values = group.loc[:, feature_cols].to_numpy(dtype=np.float32)
        if values.shape[0] < sequence_length:
            continue

        # sliding_window_view appends the window axis last, giving
        # (num_windows, n_features, sequence_length); swap the last two axes
        # to get the (num_windows, sequence_length, n_features) layout.
        unit_windows = np.lib.stride_tricks.sliding_window_view(
            values, sequence_length, axis=0
        )
        windows_per_unit.append(unit_windows.transpose(0, 2, 1))

    if not windows_per_unit:
        return np.empty((0, sequence_length, len(feature_cols)), dtype=np.float32)

    # concatenate copies the read-only views into one fresh, writable array.
    return np.concatenate(windows_per_unit, axis=0)


In [None]:
def fit_and_save_artifacts(
    data_path: Path = TRAIN_FD001_PATH,
    cols: Sequence[str] = COLS,
    artifacts_dir: Path = ARTIFACTS_DIR,
    scaler_path: Path = SCALER_PATH,
    info_path: Path = ARTIFACTS_INFO_PATH,
    sensors: Sequence[str] = SENSORS,
) -> None:
    """
    Fit a MinMaxScaler on training data and persist preprocessing artifacts.

    The data file is read as whitespace-separated text without a header row,
    using ``cols`` as column names. Sensor columns holding at most one
    distinct value are treated as constant and excluded; the scaler is fit
    only on the remaining sensor columns.

    Parameters
    ----------
    data_path:
        Location of the training data file.
    cols:
        Column names assigned to the file's columns, in order.
    artifacts_dir:
        Directory that is created (with parents) before saving.
    scaler_path:
        Destination of the fitted scaler, written with ``joblib.dump``.
    info_path:
        Destination of a JSON file with the keys ``cols_to_drop`` and
        ``cols_to_scale``.
    sensors:
        Names of the sensor columns to inspect; must all appear in ``cols``.

    Raises
    ------
    FileNotFoundError
        If ``data_path`` is not an existing file.
    ValueError
        If every sensor column is constant, leaving nothing to scale.
    """
    # Accept plain strings as well as Path objects.
    data_path = Path(data_path)
    artifacts_dir = Path(artifacts_dir)
    scaler_path = Path(scaler_path)
    info_path = Path(info_path)

    logger.info("Starting preprocessing using data at %s", data_path)

    if not data_path.is_file():
        logger.error("Data file not found at %s", data_path)
        raise FileNotFoundError(
            f"Data file not found at {data_path}. "
            "Expected 'train_FD001.txt' under 'data/CMAPSSData/'."
        )

    df = pd.read_csv(
        data_path,
        sep=r"\s+",
        header=None,
        names=list(cols),
    )

    sensor_names = list(sensors)
    sensor_df = df.loc[:, sensor_names]

    # Identify constant columns by counting distinct values rather than by
    # testing ``var() == 0.0``: rounding in the computed mean can leave a
    # tiny non-zero variance for a column whose values are all identical,
    # so an exact zero comparison can miss constant sensors.
    distinct_counts = sensor_df.nunique()
    constant_cols = set(distinct_counts[distinct_counts <= 1].index)

    # Iterate over sensor_names so both lists keep the caller's column order.
    cols_to_drop = [col for col in sensor_names if col in constant_cols]
    cols_to_scale = [col for col in sensor_names if col not in constant_cols]

    logger.info("Found %d constant sensor columns to drop", len(cols_to_drop))
    logger.debug("Columns to drop: %s", cols_to_drop)
    logger.info("Found %d sensor columns to scale", len(cols_to_scale))

    if not cols_to_scale:
        raise ValueError("No non-constant sensor columns found to scale")

    # Fit scaler on non constant sensors (training data only)
    scaler = MinMaxScaler()
    scaler.fit(df.loc[:, cols_to_scale])

    # Save artifacts; make sure every destination directory exists, since
    # scaler_path and info_path are not required to live in artifacts_dir.
    artifacts_dir.mkdir(parents=True, exist_ok=True)
    scaler_path.parent.mkdir(parents=True, exist_ok=True)
    info_path.parent.mkdir(parents=True, exist_ok=True)

    joblib.dump(scaler, scaler_path)
    logger.info("Scaler saved to %s", scaler_path)

    artifacts_info = {
        "cols_to_drop": cols_to_drop,
        "cols_to_scale": cols_to_scale,
    }

    with info_path.open("w", encoding="utf-8") as f:
        json.dump(artifacts_info, f, indent=4)

    logger.info("Artifacts metadata saved to %s", info_path)
    logger.info("Preprocessing complete")


# Run the preprocessing step using the function's default arguments.
fit_and_save_artifacts()

sensor_1     0.0
sensor_10    0.0
sensor_18    0.0
sensor_19    0.0
dtype: float64
++++++++++++++++++++++++++++++
Index(['sensor_1', 'sensor_10', 'sensor_18', 'sensor_19'], dtype='object')
++++++++++++++++++++++++++++++
['sensor_1', 'sensor_10', 'sensor_18', 'sensor_19']


In [26]:
# Scratch example: keep only the even entries of a small integer list.
mylist = list(range(1, 11))
print([value for value in mylist if not value % 2])

[2, 4, 6, 8, 10]


In [None]:
# Configure logging at INFO level; each record is rendered as
# "<timestamp> <level> <logger name> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)

In [10]:
# Run preprocessing again with default arguments; INFO log records are shown
# if logging was configured beforehand (see the logging.basicConfig call).
fit_and_save_artifacts()

2025-11-26 09:51:44,024 INFO __main__ - Starting preprocessing using data at C:\Users\bayou\OneDrive\Dokumente\projects\turbofan-mlops-pipeline\data\CMAPSSData\train_FD001.txt


   unit_number  time_in_cycles  op_setting_1  op_setting_2  op_setting_3  \
0            1               1       -0.0007       -0.0004         100.0   
1            1               2        0.0019       -0.0003         100.0   
2            1               3       -0.0043        0.0003         100.0   
3            1               4        0.0007        0.0000         100.0   
4            1               5       -0.0019       -0.0002         100.0   

   sensor_1  sensor_2  sensor_3  sensor_4  sensor_5  ...  sensor_12  \
0    518.67    641.82   1589.70   1400.60     14.62  ...     521.66   
1    518.67    642.15   1591.82   1403.14     14.62  ...     522.28   
2    518.67    642.35   1587.99   1404.20     14.62  ...     522.42   
3    518.67    642.35   1582.79   1401.87     14.62  ...     522.86   
4    518.67    642.37   1582.85   1406.22     14.62  ...     522.19   

   sensor_13  sensor_14  sensor_15  sensor_16  sensor_17  sensor_18  \
0    2388.02    8138.62     8.4195       0.03