# FD001 RUL Prediction — Experiment v2

**Change from v1:** `compute_derivatives()` in `packages/geometry/src/geometry/dynamics.py` was non-causal.
Central differences + symmetric smoothing meant `effective_dim_velocity[t]` used windows `t+1` and `t+2`.

**v2 fix:**
- Backward-only smoothing (cumsum-based causal moving average)
- Backward finite differences (velocity at t uses only t and t-1)
- Also fixed `smooth_window` passthrough bug: orchestration pipeline now reads `derivative_depth` from manifest

**Parquets regenerated:** FD001 Train (220s) and Test (153s) re-run with causal derivatives.

**Hypothesis:** Geodyn features (effective_dim_velocity, spectral_gap_velocity, etc.) should transfer better
across cohorts now that they don't encode future information. Centroid features (cv_*) should be unchanged.

In [None]:
import numpy as np
import polars as pl
import json
from pathlib import Path
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge, LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GroupKFold
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.impute import SimpleImputer
import warnings
warnings.filterwarnings('ignore')

In [None]:
TRAIN_DIR = Path('/Users/jasonrudder/domains/cmapss/FD_001/Train/output_time')
TEST_DIR  = Path('/Users/jasonrudder/domains/cmapss/FD_001/Test/output_time')
RUL_PATH  = Path('/Users/jasonrudder/domains/cmapss/FD_001/RUL_FD001.txt')
OBS_TRAIN = TRAIN_DIR / 'observations.parquet'
OBS_TEST  = TEST_DIR / 'observations.parquet'

EXPERIMENT = 'v2_causal_derivatives'
RUL_CAP = 125
N_FOLDS = 5
SEED = 42

## 1. Build Feature Matrix

In [None]:
def build_features(output_dir: Path, obs_path: Path) -> pl.DataFrame:
    """Build ML feature matrix from pipeline parquets."""
    # Cohort vector (base)
    cv = pl.read_parquet(str(output_dir / 'cohort/cohort_vector.parquet'))
    join_cols = ['cohort', 'signal_0_center']
    feat_cols = [c for c in cv.columns if c not in ['signal_0_start', 'signal_0_end', 'signal_0_center', 'cohort', 'n_signals']]
    ml = cv.select(['cohort', 'signal_0_center'] + feat_cols).rename({c: f'cv_{c}' for c in feat_cols})
    print(f'  cohort_vector: {len(feat_cols)} features, {len(ml)} rows')

    # Geometry dynamics
    gd_path = output_dir / 'cohort/cohort_dynamics/geometry_dynamics.parquet'
    if gd_path.exists():
        gd = pl.read_parquet(str(gd_path))
        gd_feat = [c for c in gd.columns if c not in ['cohort', 'signal_0_start', 'signal_0_end', 'signal_0_center', 'I']]
        gd_sel = gd.select(['cohort', 'signal_0_center'] + gd_feat).rename({c: f'gd_{c}' for c in gd_feat})
        before = ml.shape[1]
        ml = ml.join(gd_sel, on=join_cols, how='left', coalesce=True)
        print(f'  + geometry_dynamics: {ml.shape[1] - before} features')

    # Velocity field (aggregate per cohort+window since it's per-signal)
    vf_path = output_dir / 'cohort/cohort_dynamics/velocity_field.parquet'
    if vf_path.exists():
        vf = pl.read_parquet(str(vf_path))
        num_cols = [c for c in vf.columns if c not in ['cohort', 'signal_0_start', 'signal_0_end', 'signal_0_center',
                    'signal_id', 'dominant_motion_signal'] and vf[c].dtype in [pl.Float64, pl.Float32, pl.Int64]]
        if 'signal_0_center' in vf.columns and num_cols:
            vf_agg = vf.group_by(['cohort', 'signal_0_center']).agg(
                [pl.col(c).mean().alias(f'vf_{c}_mean') for c in num_cols] +
                [pl.col(c).std().alias(f'vf_{c}_std') for c in num_cols]
            )
            before = ml.shape[1]
            ml = ml.join(vf_agg, on=join_cols, how='left', coalesce=True)
            print(f'  + velocity_field: {ml.shape[1] - before} features')

    # Cohort geometry
    cg_path = output_dir / 'cohort/cohort_geometry.parquet'
    if cg_path.exists():
        cg = pl.read_parquet(str(cg_path))
        cg_feat = [c for c in cg.columns if c not in ['cohort', 'signal_0_start', 'signal_0_end', 'signal_0_center',
                    'window_index', 'signal_id'] and cg[c].dtype in [pl.Float64, pl.Float32]]
        if 'signal_0_center' in cg.columns and cg_feat:
            cg_agg = cg.group_by(['cohort', 'signal_0_center']).agg(
                [pl.col(c).mean().alias(f'cg_{c}_mean') for c in cg_feat]
            )
            before = ml.shape[1]
            ml = ml.join(cg_agg, on=join_cols, how='left', coalesce=True)
            print(f'  + cohort_geometry: {ml.shape[1] - before} features')

    # Add RUL
    obs = pl.read_parquet(str(obs_path))
    first_sig = obs['signal_id'].unique().sort()[0]
    lifecycles = dict(
        obs.filter(pl.col('signal_id') == first_sig)
        .group_by('cohort').agg(pl.col('signal_0').max().alias('max_s0'))
        .iter_rows()
    )

    rul_rows = []
    for row in ml.select(['cohort', 'signal_0_center']).iter_rows(named=True):
        max_s0 = lifecycles.get(row['cohort'])
        if max_s0 is not None:
            rul_rows.append({
                'cohort': row['cohort'],
                'signal_0_center': row['signal_0_center'],
                'RUL': max_s0 - row['signal_0_center'],
                'lifecycle': max_s0,
                'lifecycle_pct': row['signal_0_center'] / max_s0 if max_s0 > 0 else 0,
            })

    rul_df = pl.DataFrame(rul_rows)
    ml = ml.join(rul_df, on=['cohort', 'signal_0_center'], how='left', coalesce=True)

    # Drop constant columns
    feat_cols = [c for c in ml.columns if c not in ['cohort', 'signal_0_center', 'RUL', 'lifecycle', 'lifecycle_pct']]
    drop_const = [c for c in feat_cols if ml[c].dtype in [pl.Float64, pl.Float32] and
                  (ml[c].drop_nulls().std() or 0) < 1e-10]
    if drop_const:
        ml = ml.drop(drop_const)
        print(f'  Dropped {len(drop_const)} constant columns')

    feat_final = [c for c in ml.columns if c not in ['cohort', 'signal_0_center', 'RUL', 'lifecycle', 'lifecycle_pct']]
    print(f'  Total: {len(feat_final)} features, {len(ml)} rows, {ml["cohort"].n_unique()} cohorts')
    return ml

In [None]:
print('=== TRAIN ===')
ml_train = build_features(TRAIN_DIR, OBS_TRAIN)
print(f'\n=== TEST ===')
ml_test = build_features(TEST_DIR, OBS_TEST)

## 2. Cross-Validation on Train (GroupKFold by cohort)

In [None]:
META = ['cohort', 'signal_0_center', 'RUL', 'lifecycle', 'lifecycle_pct']

def prepare(ml, cap=RUL_CAP):
    feat_cols = [c for c in ml.columns if c not in META]
    X = ml.select(feat_cols).to_numpy().astype(np.float64)
    y = np.minimum(ml['RUL'].to_numpy().astype(np.float64), cap)
    groups = ml['cohort'].to_numpy()
    # Replace inf BEFORE imputation (SimpleImputer chokes on inf)
    X = np.where(np.isinf(X), np.nan, X)
    imp = SimpleImputer(strategy='median')
    X = imp.fit_transform(X)
    # Belt-and-suspenders: catch any remaining non-finite
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    return X, y, groups, feat_cols, imp

X_train, y_train, groups_train, feat_names, imputer = prepare(ml_train)
print(f'Train: {X_train.shape[0]} rows x {X_train.shape[1]} features')
print(f'RUL range (capped): {y_train.min():.0f} – {y_train.max():.0f}')
print(f'Any inf remaining: {np.any(np.isinf(X_train))}')
print(f'Any NaN remaining: {np.any(np.isnan(X_train))}')

In [None]:
models = {
    'ridge': Ridge(alpha=1.0),
    'random_forest': RandomForestRegressor(
        n_estimators=200, max_depth=10, min_samples_leaf=5,
        random_state=SEED, n_jobs=-1),
    'gradient_boosting': GradientBoostingRegressor(
        n_estimators=200, max_depth=5, learning_rate=0.1,
        min_samples_leaf=5, subsample=0.8, random_state=SEED),
}

cv_results = {}
gkf = GroupKFold(n_splits=N_FOLDS)

for name, model in models.items():
    rmses, maes, r2s = [], [], []
    for fold, (tr_idx, te_idx) in enumerate(gkf.split(X_train, y_train, groups_train)):
        scaler = StandardScaler()
        Xtr = scaler.fit_transform(X_train[tr_idx])
        Xte = scaler.transform(X_train[te_idx])
        m = model.__class__(**model.get_params())
        m.fit(Xtr, y_train[tr_idx])
        pred = np.clip(m.predict(Xte), 0, RUL_CAP)
        rmses.append(np.sqrt(mean_squared_error(y_train[te_idx], pred)))
        maes.append(mean_absolute_error(y_train[te_idx], pred))
        r2s.append(r2_score(y_train[te_idx], pred))
    cv_results[name] = {
        'rmse': np.mean(rmses), 'rmse_std': np.std(rmses),
        'mae': np.mean(maes), 'mae_std': np.std(maes),
        'r2': np.mean(r2s),
    }
    print(f'{name:>20s}  RMSE={np.mean(rmses):.2f} ± {np.std(rmses):.2f}  '
          f'MAE={np.mean(maes):.2f}  R²={np.mean(r2s):.4f}')

## 3. Official Test Evaluation (Train→Test transfer)

In [None]:
# Load ground truth RUL for test engines
rul_gt = np.loadtxt(str(RUL_PATH))
rul_gt_capped = np.clip(rul_gt, 0, RUL_CAP)
print(f'Ground truth RUL: {len(rul_gt)} engines')

# Align test features: use common columns with train
test_feat_cols = [c for c in ml_test.columns if c not in META]
common_feats = sorted(set(feat_names) & set(test_feat_cols))
missing_in_test = sorted(set(feat_names) - set(test_feat_cols))
print(f'Common features: {len(common_feats)} / {len(feat_names)}')
if missing_in_test:
    print(f'Missing in test: {len(missing_in_test)} — will fill with 0')

In [None]:
# Build test matrix aligned with train columns
X_test_full = np.zeros((len(ml_test), len(feat_names)))
for i, col in enumerate(feat_names):
    if col in test_feat_cols:
        X_test_full[:, i] = ml_test[col].to_numpy().astype(np.float64)

# Replace inf before imputer, then impute, then catch stragglers
X_test_full = np.where(np.isinf(X_test_full), np.nan, X_test_full)
X_test_full = imputer.transform(X_test_full)
X_test_full = np.nan_to_num(X_test_full, nan=0.0, posinf=0.0, neginf=0.0)

# Get last window per test cohort (closest to failure)
test_cohorts = ml_test['cohort'].unique().sort().to_list()
last_idx = []
for coh in test_cohorts:
    mask = ml_test['cohort'] == coh
    coh_rows = ml_test.filter(mask)
    max_s0 = coh_rows['signal_0_center'].max()
    idx = ml_test.with_row_index('_idx').filter(
        (pl.col('cohort') == coh) & (pl.col('signal_0_center') == max_s0)
    )['_idx'][0]
    last_idx.append(idx)

X_test_last = X_test_full[last_idx]
print(f'Test engines: {len(X_test_last)}')

# Map test cohorts to RUL ground truth (engine_1 → index 0, etc.)
cohort_to_idx = {}
for coh in test_cohorts:
    num = int(coh.replace('engine_', ''))
    cohort_to_idx[coh] = num - 1  # 1-indexed to 0-indexed

y_test_gt = np.array([rul_gt_capped[cohort_to_idx[c]] for c in test_cohorts])
print(f'Ground truth aligned: {len(y_test_gt)} engines')

In [None]:
# Train on full train set, predict on test
scaler_full = StandardScaler()
X_train_scaled = scaler_full.fit_transform(X_train)
X_test_scaled = scaler_full.transform(X_test_last)

test_results = {}
for name, model in models.items():
    m = model.__class__(**model.get_params())
    m.fit(X_train_scaled, y_train)
    pred = np.clip(m.predict(X_test_scaled), 0, RUL_CAP)
    rmse = np.sqrt(mean_squared_error(y_test_gt, pred))
    mae = mean_absolute_error(y_test_gt, pred)
    r2 = r2_score(y_test_gt, pred)
    
    # PHM08 scoring function
    d = pred - y_test_gt
    phm = np.sum(np.where(d < 0, np.exp(-d/13) - 1, np.exp(d/10) - 1))
    bias = np.mean(d)
    n_early = np.sum(d < 0)
    n_late = np.sum(d > 0)
    
    test_results[name] = {
        'rmse': rmse, 'mae': mae, 'r2': r2,
        'phm08_score': phm, 'bias': bias,
        'n_early': int(n_early), 'n_late': int(n_late),
    }
    print(f'{name:>20s}  RMSE={rmse:.2f}  MAE={mae:.2f}  R²={r2:.4f}  '
          f'PHM08={phm:.0f}  bias={bias:.1f} ({n_early}E/{n_late}L)')

## 4. Feature Importance

In [None]:
# Train GB on full data for importance
gb = GradientBoostingRegressor(
    n_estimators=200, max_depth=5, learning_rate=0.1,
    min_samples_leaf=5, subsample=0.8, random_state=SEED)
gb.fit(X_train_scaled, y_train)

imp_idx = np.argsort(gb.feature_importances_)[::-1]
print(f'{"Rank":>4s}  {"Feature":>50s}  {"Importance":>10s}  {"Group":>5s}')
print('-' * 75)
for rank, i in enumerate(imp_idx[:25]):
    feat = feat_names[i]
    group = feat.split('_')[0]
    print(f'{rank+1:>4d}  {feat:>50s}  {gb.feature_importances_[i]:>10.4f}  {group:>5s}')

## 5. Comparison with v1 (pre-causal fix)

In [None]:
# v1 results (from ~/domains/testing/FD_001/test/output_sequential/ml_results/test_summary.json)
v1 = {
    'ridge':             {'rmse': 72.73, 'mae': 61.29, 'phm08_score': 786263, 'bias': 42.5},
    'gradient_boosting': {'rmse': 52.69, 'mae': 45.00, 'phm08_score': 54255,  'bias': 37.1},
    'random_forest':     {'rmse': 51.49, 'mae': 43.88, 'phm08_score': 45151,  'bias': 35.1},
}

print(f'{"Model":>20s}  {"v1 RMSE":>8s}  {"v2 RMSE":>8s}  {"delta":>8s}  {"v1 PHM08":>10s}  {"v2 PHM08":>10s}')
print('-' * 70)
for name in ['ridge', 'random_forest', 'gradient_boosting']:
    v1r = v1[name]['rmse']
    v2r = test_results[name]['rmse']
    v1p = v1[name]['phm08_score']
    v2p = test_results[name]['phm08_score']
    delta = v2r - v1r
    arrow = 'better' if delta < 0 else 'worse'
    print(f'{name:>20s}  {v1r:>8.2f}  {v2r:>8.2f}  {delta:>+8.2f}  {v1p:>10.0f}  {v2p:>10.0f}  {arrow}')

## 6. Save Results

In [None]:
out_dir = TRAIN_DIR / 'ml_results_v2'
out_dir.mkdir(exist_ok=True)

summary = {
    'experiment': EXPERIMENT,
    'dataset': 'FD001',
    'evaluation': 'official_test_split',
    'change': 'causal backward derivatives (no future lookahead)',
    'n_train_rows': int(X_train.shape[0]),
    'n_train_features': int(X_train.shape[1]),
    'n_test_engines': len(test_cohorts),
    'rul_cap': RUL_CAP,
    'best_model': min(test_results, key=lambda k: test_results[k]['rmse']),
    'cv_results': cv_results,
    'test_results': test_results,
}

with open(out_dir / 'model_summary.json', 'w') as f:
    json.dump(summary, f, indent=2, default=str)

print(f'Saved to {out_dir}')
print(json.dumps(summary, indent=2, default=str))