# 02 — Feature Engineering (RUL, Streaming)

This notebook engineers features one machine at a time, writing one parquet shard per machine, to keep memory usage low.
- Input: `data/processed/rul_labeled.parquet`
- Shards: `data/processed/shards_RUL`
- Target: `RUL`

In [None]:
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
import joblib

NB_PATH = Path.cwd()
# When the working directory's parent folder is named 'rul' or 'cof'
# (case-insensitive), the project root is taken to be two levels up;
# otherwise the working directory itself is treated as the root.
# NOTE(review): confirm this matches the repo layout -- a notebook whose own
# folder (rather than its parent) is 'rul'/'cof' is not matched by this check.
ROOT = NB_PATH
if NB_PATH.parts[-2].lower() in ('rul', 'cof'):
    ROOT = NB_PATH.parents[1]
DATA_DIR = ROOT.joinpath('data')
PROCESSED_DIR = DATA_DIR.joinpath('processed')
print("ROOT:", ROOT)

In [None]:
# --- Config
# Column names used throughout the notebook, plus input/output locations.
id_col, time_col, target = 'machine_id', 'timestamp', 'RUL'
LABELED = PROCESSED_DIR.joinpath('rul_labeled.parquet')
# Directory that receives one feature shard per machine.
SHARDS_DIR = PROCESSED_DIR.joinpath('shards_RUL')
SHARDS_DIR.mkdir(parents=True, exist_ok=True)
print('Using labeled file:', LABELED)


In [None]:
# --- Load labeled dataset
# Order rows by machine, then time, so the per-machine lag/rolling features
# computed later see each machine's history in chronological order.
df = (
    pd.read_parquet(LABELED)
    .sort_values([id_col, time_col], ignore_index=True)
)
print("Loaded:", df.shape)

In [None]:
# --- Column selection & downcast
# Model inputs are the numeric columns, minus identifiers, the timestamp, the
# target, the '__line' column, and any column whose lower-cased name is one of
# the failure-flag names below.
_failure_flag_names = {'breakdown', 'failure', 'fail', 'is_failure'}
exclude = {id_col, time_col, '__line', target}.union(
    c for c in df.columns if c.lower() in _failure_flag_names
)
num_cols = [c for c in df.select_dtypes(include='number').columns if c not in exclude]
print("Numeric sensors:", len(num_cols))

def downcast_numeric(g: pd.DataFrame) -> pd.DataFrame:
    """Shrink the numeric columns of *g* to smaller dtypes where pandas can.

    Float columns are passed through ``pd.to_numeric(..., downcast='float')``
    and afterwards integer columns through ``downcast='integer'``. Non-numeric
    columns are left alone. *g* is modified in place and also returned.
    """
    # The select_dtypes selector and the downcast target share the same name.
    for kind in ('float', 'integer'):
        for col in g.select_dtypes(include=kind).columns:
            g[col] = pd.to_numeric(g[col], downcast=kind)
    return g

In [None]:
# --- Lightweight feature engineering
def add_lags(g, cols, lags=(1,)):
    """Append lagged copies of *cols* to *g*.

    For every lag ``L`` in *lags* (outer) and column ``c`` in *cols* (inner),
    a column ``{c}_lag{L}`` holding ``g[c].shift(L)`` is added. For a positive
    ``L``, the first ``L`` rows of that column are NaN. *g* is modified in
    place and returned.
    """
    lag_col_pairs = [(lag, col) for lag in lags for col in cols]
    for lag, col in lag_col_pairs:
        g[f'{col}_lag{lag}'] = g[col].shift(lag)
    return g

def add_roll_stats(g, cols, windows=(3,5)):
    """Append rolling mean and standard deviation of *cols* to *g*.

    For each window size ``w`` in *windows*, the ``{c}_roll{w}_mean`` columns
    are added first, followed by the ``{c}_roll{w}_std`` columns. The std is
    pandas' default sample std. Because ``min_periods=w``, at least the first
    ``w - 1`` rows of each new column are NaN. *g* is modified in place and
    returned.
    """
    for w in windows:
        # Built from the original columns only, so the new stat columns are
        # never fed back into the window.
        window = g[cols].rolling(w, min_periods=w)
        for stat in ('mean', 'std'):
            new_names = [f'{c}_roll{w}_{stat}' for c in cols]
            g[new_names] = getattr(window, stat)().values
    return g

In [None]:
# --- Stream per machine → write shard to disk
# Each machine's rows are featurized on their own, so lag/rolling windows never
# cross machine boundaries, and the result is written to one parquet shard.
shard_paths = []
n_skipped = 0
for gid, g in df.groupby(id_col, sort=False):
    g = g[[id_col, time_col, target] + num_cols].copy()
    g = downcast_numeric(g)
    g = add_lags(g, num_cols, lags=(1,))
    g = add_roll_stats(g, num_cols, windows=(3,5))
    # Drop rows with any missing value, including the warm-up rows whose
    # lag/rolling features are still NaN.
    g = g.dropna()

    # NOTE(review): gid is used verbatim in the file name; ids containing path
    # separators or other unsafe characters would break this.
    out_path = SHARDS_DIR / f"part_{id_col}_{gid}.parquet"
    if g.empty:
        # Nothing survived dropna() (e.g. fewer rows than the largest window).
        # An empty shard would later reach StandardScaler.transform, which
        # rejects 0-row input, so skip it and drop any stale shard for this
        # machine left over from a previous run.
        out_path.unlink(missing_ok=True)
        n_skipped += 1
        continue
    g.to_parquet(out_path, index=False)
    shard_paths.append(out_path)

print(f"Shards written: {len(shard_paths)} → {SHARDS_DIR}")
if n_skipped:
    print(f"Skipped {n_skipped} machine(s) with no rows left after dropna()")

In [None]:
# --- Fit scaler on a sample (shards are transformed to splits in a later cell)
# Fit a StandardScaler on roughly the first `sample_rows` shard rows, taken
# shard by shard in shard_paths order, and persist it with joblib.
# NOTE(review): this sample (a) comes only from the first few machines and
# (b) covers each of those machines' full history, so it includes rows that
# are later routed to validation/test. Consider fitting only on rows at or
# before the train cutoff (e.g. StandardScaler.partial_fit per shard) to avoid
# leakage and machine-order bias.
scaler = StandardScaler()
sample_rows = 100_000
seen = 0
chunks = []
for p in shard_paths:
    g = pd.read_parquet(p)
    feats = [c for c in g.columns if c not in [id_col, time_col, target]]
    chunks.append(g[feats].values)
    seen += len(g)
    if seen >= sample_rows:
        break

if not chunks:
    raise RuntimeError('No feature shards available to fit the scaler; '
                       'check the shard-writing step.')
# Stack once: calling np.vstack inside the loop re-copies the growing buffer
# on every shard, which is quadratic in the number of shards read.
X_buf = np.vstack(chunks)
scaler.fit(X_buf)
joblib.dump(scaler, PROCESSED_DIR / 'rul_scaler.joblib')
print('Scaler fitted and saved:', PROCESSED_DIR / 'rul_scaler.joblib')


In [None]:
# --- Build global time ordering to compute cutoffs for splits
# Gather the timestamp of every shard row, sort globally by time, and derive
# two cutoff timestamps. Rows with t <= cut_train_t go to train, rows with
# cut_train_t < t <= cut_val_t go to validation, and later rows go to test.
meta_parts = []
for p in shard_paths:
    g = pd.read_parquet(p, columns=[time_col])
    meta_parts.append(g.assign(path=str(p)))
meta = pd.concat(meta_parts, ignore_index=True).sort_values(time_col).reset_index(drop=True)

# The last TimeSeriesSplit fold trains on the earliest rows and holds out the
# final block of rows; that hold-out block is split in half into validation
# (first half) and test (second half).
tscv = TimeSeriesSplit(n_splits=3)
splits = list(tscv.split(meta))
train_idx, test_idx = splits[-1]
mid = (test_idx[0] + test_idx[-1]) // 2
val_idx = np.arange(test_idx[0], mid+1)
test_idx2 = np.arange(mid+1, test_idx[-1]+1)

# The fold indices are positions in the time-sorted frame, so look up the
# timestamp at the last train / last validation position with .iloc.
times = meta[time_col]
cut_train_t = times.iloc[train_idx[-1]]
cut_val_t   = times.iloc[val_idx[-1]]
# NOTE(review): rows whose timestamp ties a cutoff are all routed to the
# earlier split (the routing step compares with <=), so actual split sizes
# can differ from the fold sizes when timestamps are heavily duplicated.
print('Cutoffs:', cut_train_t, '|', cut_val_t)

In [None]:
# --- Helper to append parquet
from pathlib import Path
def append_parquet(df, path: Path):
    """Append the rows of *df* to the parquet file at *path*.

    If *path* does not exist yet, *df* is written as a new file, even when it
    is empty, so the file is always created. Otherwise the existing file is
    read completely, concatenated with *df*, and rewritten. Each call
    therefore costs I/O proportional to the whole file, not just to *df*.
    Appending an empty *df* to an existing file is a no-op, which skips that
    full read/rewrite.
    """
    if not path.exists():
        df.to_parquet(path, index=False)
        return
    if df.empty:
        # Nothing to add; avoid re-reading and rewriting the whole file.
        return
    old = pd.read_parquet(path)
    pd.concat([old, df], ignore_index=True).to_parquet(path, index=False)

In [None]:
# --- Stream shards → scale & route to split
# For each shard: scale the feature columns with the fitted scaler, then route
# every row to a split by timestamp. Train takes t <= cut_train_t, val takes
# cut_train_t < t <= cut_val_t, and test takes t > cut_val_t.
# X files keep id_col and time_col next to the scaled features. y files hold
# only the target, row-aligned with the matching X file.
Xtr_path = PROCESSED_DIR / 'RUL_X_train.parquet'
Xva_path = PROCESSED_DIR / 'RUL_X_val.parquet'
Xte_path = PROCESSED_DIR / 'RUL_X_test.parquet'
ytr_path = PROCESSED_DIR / 'RUL_y_train.parquet'
yva_path = PROCESSED_DIR / 'RUL_y_val.parquet'
yte_path = PROCESSED_DIR / 'RUL_y_test.parquet'

# Outputs are built by appending, so remove any files from a previous run.
for p in [Xtr_path, Xva_path, Xte_path, ytr_path, yva_path, yte_path]:
    if p.exists():
        p.unlink()

# NOTE(review): append_parquet re-reads and rewrites the whole output file on
# every call, so total I/O grows roughly quadratically with the shard count.
for p in shard_paths:
    # A fresh 0..n-1 index keeps the masks below aligned with X_scaled.
    g = pd.read_parquet(p).reset_index(drop=True)
    if g.empty:
        # StandardScaler.transform rejects 0-row input, and an empty shard
        # has nothing to route anyway.
        continue
    feats = [c for c in g.columns if c not in [id_col, time_col, target]]
    keep  = [id_col, time_col]
    X_scaled = pd.DataFrame(scaler.transform(g[feats].values), columns=feats)
    X_scaled = pd.concat([g[keep].reset_index(drop=True), X_scaled.reset_index(drop=True)], axis=1)

    mask_tr = g[time_col] <= cut_train_t
    mask_va = (g[time_col] > cut_train_t) & (g[time_col] <= cut_val_t)
    mask_te = g[time_col] > cut_val_t

    append_parquet(X_scaled.loc[mask_tr], Xtr_path)
    append_parquet(X_scaled.loc[mask_va], Xva_path)
    append_parquet(X_scaled.loc[mask_te], Xte_path)

    append_parquet(pd.DataFrame({target: g.loc[mask_tr, target].values}), ytr_path)
    append_parquet(pd.DataFrame({target: g.loc[mask_va, target].values}), yva_path)
    append_parquet(pd.DataFrame({target: g.loc[mask_te, target].values}), yte_path)

print('Saved datasets:')
print('  ', Xtr_path.name, Xva_path.name, Xte_path.name)
print('  ', ytr_path.name, yva_path.name, yte_path.name)
