diff --git a/app/features/featuresets/service.py b/app/features/featuresets/service.py
index 417babaa..669772a6 100644
--- a/app/features/featuresets/service.py
+++ b/app/features/featuresets/service.py
@@ -133,6 +133,11 @@ def compute_features(
             result, cols = self._compute_exogenous_features(result)
             feature_columns.extend(cols)
 
+        # 6. Lifecycle features (PRP-3.1B — Phase 2)
+        if self.config.lifecycle_config:
+            result, cols = self._compute_lifecycle_features(result)
+            feature_columns.extend(cols)
+
         # Compute stats
         null_counts: dict[str, int] = {}
         if feature_columns:
@@ -403,6 +408,89 @@ def _compute_exogenous_features(self, df: pd.DataFrame) -> tuple[pd.DataFrame, l
 
         return result, columns
 
+    def _compute_lifecycle_features(self, df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]:
+        """Compute product-lifecycle features from launch/discontinue dates.
+
+        CRITICAL: This method assumes ``df`` has already been sorted by
+        [*entity_cols, date_col] and cutoff-filtered upstream in
+        :meth:`compute_features`. It does NOT re-sort or re-filter.
+
+        The compute is two-step:
+          1. Per-row date deltas: ``date - launch_date`` (int days, NaN-safe).
+          2. Lagged by ``config.lag_days`` per ``(store_id, product_id)`` to
+             ensure the value at row ``i`` reflects only data at row
+             ``i - lag_days``.
+
+        Source columns (must be joined upstream — typically by an extended
+        :class:`FeatureDataLoader`; see PRP-3.1E):
+          * ``launch_date`` — ``datetime.date | NaT`` per product
+          * ``discontinue_date`` — ``datetime.date | NaT`` per product
+
+        Defensive behavior: if BOTH source columns are absent (the legacy
+        ``/featuresets/compute`` endpoint does not join product attrs), emit
+        zero columns and a single info-level log line. This preserves the
+        additive-contract invariant: callers who set ``lifecycle_config`` but
+        don't join attrs see ``"lifecycle"`` in ``enabled_features`` but no
+        new columns in ``feature_columns``. The end-to-end wiring lands in
+        PRP-3.1E.
+
+        Note on signed deltas: ``days_since_discontinue`` is signed (negative
+        pre-retire, positive post-retire). LightGBM learns the sign — do NOT
+        clip to non-negative.
+
+        Args:
+            df: Input dataframe (already sorted + cutoff-filtered).
+
+        Returns:
+            Tuple of (dataframe with lifecycle features, list of new column
+            names).
+        """
+        config = self.config.lifecycle_config
+        if config is None:
+            raise RuntimeError("_compute_lifecycle_features called without lifecycle_config")
+
+        result = df.copy()
+        columns: list[str] = []
+        lag = config.lag_days
+
+        # Defensive: skip silently if product attrs were not joined upstream.
+        # PRP-3.1E will extend FeatureDataLoader to join product.launch_date /
+        # product.discontinue_date; until then, callers without an extended
+        # loader see the "lifecycle" family token but zero new columns.
+        if "launch_date" not in df.columns and "discontinue_date" not in df.columns:
+            logger.info(
+                "featureops.lifecycle_skipped_no_product_attrs",
+                reason="launch_date / discontinue_date columns absent from input df",
+                hint="loader must join product.launch_date / product.discontinue_date "
+                "before calling compute_features (see PRP-3.1E)",
+            )
+            return result, columns
+
+        date_series = pd.to_datetime(result[self.date_col])
+
+        if config.include_days_since_launch and "launch_date" in df.columns:
+            launch = pd.to_datetime(result["launch_date"])
+            # Pre-shift delta: int days where both dates set, NaN otherwise.
+            delta_launch: pd.Series[Any] = (date_series - launch).dt.days
+            # Lag per (store_id, product_id) so row i reflects row i-lag's delta.
+            col_name = f"days_since_launch_lag{lag}"
+            result[col_name] = delta_launch.groupby(
+                [result[c] for c in self.entity_cols], observed=True
+            ).shift(lag)
+            columns.append(col_name)
+
+        if config.include_days_since_discontinue and "discontinue_date" in df.columns:
+            discontinue = pd.to_datetime(result["discontinue_date"])
+            # Signed delta: negative pre-retire, positive post-retire, NaN if NULL.
+            delta_discontinue: pd.Series[Any] = (date_series - discontinue).dt.days
+            col_name = f"days_since_discontinue_lag{lag}"
+            result[col_name] = delta_discontinue.groupby(
+                [result[c] for c in self.entity_cols], observed=True
+            ).shift(lag)
+            columns.append(col_name)
+
+        return result, columns
+
 
 class FeatureDataLoader:
     """Async data loader for feature computation.
diff --git a/app/features/featuresets/tests/test_leakage.py b/app/features/featuresets/tests/test_leakage.py
index 7f32b112..ad9c7d71 100644
--- a/app/features/featuresets/tests/test_leakage.py
+++ b/app/features/featuresets/tests/test_leakage.py
@@ -12,6 +12,7 @@
 from app.features.featuresets.schemas import (
     FeatureSetConfig,
     LagConfig,
+    LifecycleConfig,
     RollingConfig,
 )
 from app.features.featuresets.service import FeatureEngineeringService
@@ -326,3 +327,94 @@ def test_insufficient_history_has_nan(self, sample_time_series: pd.DataFrame) ->
         assert not pd.isna(result.df.iloc[14]["rolling_mean_14"]), (
             "Row 14 should have valid rolling_mean_14"
         )
+
+
+class TestLifecycleLeakage:
+    """CRITICAL: Lifecycle features must never use future data (PRP-3.1B)."""
+
+    def test_days_since_launch_lag1_no_future_leakage(
+        self, sample_time_series: pd.DataFrame
+    ) -> None:
+        """CRITICAL: With a known launch_date and sequential dates, the lagged
+        column at row i must equal (date[i-1] - launch_date).days exactly.
+
+        sample_time_series has 30 sequential days starting 2024-01-01 for
+        (store=1, product=1). With launch_date=2023-12-25, the per-row
+        days-since-launch is 7, 8, 9, ..., 36; after shift(1), the lagged
+        column at row i is the value at row i-1: NaN at row 0, 7 at row 1,
+        8 at row 2, ... Any other integer is leakage.
+        """
+        df = sample_time_series.copy()
+        df["launch_date"] = date(2023, 12, 25)
+        df["discontinue_date"] = pd.NaT
+
+        config = FeatureSetConfig(
+            name="test_lifecycle_leakage",
+            lifecycle_config=LifecycleConfig(
+                include_days_since_launch=True,
+                include_days_since_discontinue=False,
+                lag_days=1,
+            ),
+        )
+        service = FeatureEngineeringService(config)
+        result = service.compute_features(df)
+
+        col = "days_since_launch_lag1"
+        assert col in result.feature_columns, f"missing column {col} -- wiring regression"
+
+        # Row 0: NaN (no prior row to lag from)
+        assert pd.isna(result.df.iloc[0][col]), (
+            f"row 0 must be NaN (no history), got {result.df.iloc[0][col]}"
+        )
+
+        # Rows 1..29: exactly (i - 1) + 7 days since launch
+        # (date[0] is 2024-01-01 -> 7 days since 2023-12-25)
+        for i in range(1, len(result.df)):
+            expected = (i - 1) + 7
+            actual = result.df.iloc[i][col]
+            assert actual == expected, (
+                f"LEAKAGE DETECTED at row {i}: {col}={actual} != expected={expected}. "
+                "Lifecycle feature must reflect data at row i - lag_days only."
+            )
+
+    def test_lifecycle_group_isolation_no_cross_product_leakage(
+        self, multi_series_time_series: pd.DataFrame
+    ) -> None:
+        """CRITICAL: Two products with different launch_dates must produce
+        independently correct columns -- no cross-series contamination via
+        groupby boundary."""
+        df = multi_series_time_series.copy()
+        # Product 1 launched 2023-12-01 (31 days before 2024-01-01)
+        # Product 2 launched 2023-12-25 (7 days before 2024-01-01)
+        launch_map = {1: date(2023, 12, 1), 2: date(2023, 12, 25)}
+        df["launch_date"] = df["product_id"].map(launch_map)
+        df["discontinue_date"] = pd.NaT
+
+        config = FeatureSetConfig(
+            name="test_lifecycle_isolation",
+            entity_columns=("store_id", "product_id"),
+            lifecycle_config=LifecycleConfig(
+                include_days_since_launch=True,
+                include_days_since_discontinue=False,
+                lag_days=1,
+            ),
+        )
+        service = FeatureEngineeringService(config)
+        result = service.compute_features(df)
+
+        for store_id in (1, 2):
+            for product_id, base_lag in ((1, 31), (2, 7)):
+                series = result.df[
+                    (result.df["store_id"] == store_id) & (result.df["product_id"] == product_id)
+                ].reset_index(drop=True)
+                # Row 0: NaN
+                assert pd.isna(series.iloc[0]["days_since_launch_lag1"]), (
+                    f"({store_id},{product_id}) row 0 must be NaN"
+                )
+                # Row 1: base_lag = (date[0] - launch_date).days
+                actual = series.iloc[1]["days_since_launch_lag1"]
+                assert actual == base_lag, (
+                    f"CROSS-PRODUCT LEAKAGE: ({store_id},{product_id}) row 1: "
+                    f"days_since_launch_lag1={actual}, expected={base_lag}. "
+                    "Lifecycle lag is mixing across products."
+                )
diff --git a/app/features/featuresets/tests/test_service.py b/app/features/featuresets/tests/test_service.py
index 9faddb7e..22f4f96c 100644
--- a/app/features/featuresets/tests/test_service.py
+++ b/app/features/featuresets/tests/test_service.py
@@ -10,6 +10,7 @@
     FeatureSetConfig,
     ImputationConfig,
     LagConfig,
+    LifecycleConfig,
     RollingConfig,
 )
 from app.features.featuresets.service import FeatureEngineeringService
@@ -317,3 +318,126 @@ def test_empty_dataframe_handling(self):
 
         assert len(result.df) == 0
         assert result.feature_columns == ["lag_1"]
+
+
+class TestLifecycleFeatures:
+    """Tests for _compute_lifecycle_features (PRP-3.1B)."""
+
+    def test_compute_lifecycle_happy_path(self, sample_time_series: pd.DataFrame) -> None:
+        """Happy path: launch_date and discontinue_date both set; produces
+        both lagged columns with expected integer values."""
+        df = sample_time_series.copy()
+        df["launch_date"] = date(2024, 1, 1)  # delta starts at 0
+        df["discontinue_date"] = date(2024, 1, 15)  # signed crossover
+
+        config = FeatureSetConfig(
+            name="lc_happy",
+            lifecycle_config=LifecycleConfig(
+                include_days_since_launch=True,
+                include_days_since_discontinue=True,
+                lag_days=1,
+            ),
+        )
+        service = FeatureEngineeringService(config)
+        result = service.compute_features(df)
+
+        assert "days_since_launch_lag1" in result.feature_columns
+        assert "days_since_discontinue_lag1" in result.feature_columns
+
+        # Row 1 (date=2024-01-02): lag1 reflects row 0 (date=2024-01-01)
+        # days_since_launch at row 0 = 0; days_since_discontinue at row 0 = -14
+        assert result.df.iloc[1]["days_since_launch_lag1"] == 0
+        assert result.df.iloc[1]["days_since_discontinue_lag1"] == -14
+
+        # Row 16 (date=2024-01-17): lag1 reflects row 15 (date=2024-01-16)
+        # days_since_launch at row 15 = 15; days_since_discontinue at row 15 = +1
+        assert result.df.iloc[16]["days_since_launch_lag1"] == 15
+        assert result.df.iloc[16]["days_since_discontinue_lag1"] == 1
+
+    def test_compute_lifecycle_null_launch_date(self, sample_time_series: pd.DataFrame) -> None:
+        """NULL launch_date -> all-NaN lifecycle column, no exception."""
+        df = sample_time_series.copy()
+        df["launch_date"] = pd.NaT
+        df["discontinue_date"] = pd.NaT
+
+        config = FeatureSetConfig(
+            name="lc_null",
+            lifecycle_config=LifecycleConfig(
+                include_days_since_launch=True,
+                include_days_since_discontinue=False,
+                lag_days=1,
+            ),
+        )
+        service = FeatureEngineeringService(config)
+        result = service.compute_features(df)
+
+        col = "days_since_launch_lag1"
+        assert col in result.feature_columns
+        assert result.df[col].isna().all(), "NULL launch_date must produce all-NaN column"
+
+    def test_compute_lifecycle_discontinue_before_cutoff(
+        self, sample_time_series: pd.DataFrame
+    ) -> None:
+        """discontinue_date before all rows -> positive integer for every row."""
+        df = sample_time_series.copy()
+        df["launch_date"] = date(2023, 1, 1)
+        df["discontinue_date"] = date(2023, 12, 25)  # 7 days before row 0
+
+        config = FeatureSetConfig(
+            name="lc_post_discontinue",
+            lifecycle_config=LifecycleConfig(
+                include_days_since_launch=False,
+                include_days_since_discontinue=True,
+                lag_days=1,
+            ),
+        )
+        service = FeatureEngineeringService(config)
+        result = service.compute_features(df)
+
+        # Row 1: lag1 reflects row 0 -> date=2024-01-01 - discontinue=2023-12-25 = +7
+        assert result.df.iloc[1]["days_since_discontinue_lag1"] == 7
+        # Row 8: lag1 reflects row 7 -> 2024-01-08 - 2023-12-25 = +14
+        assert result.df.iloc[8]["days_since_discontinue_lag1"] == 14
+        # All non-NaN values must be >= 0 (discontinue is in the past)
+        non_na = result.df["days_since_discontinue_lag1"].dropna()
+        assert (non_na >= 0).all(), "with discontinue in the past, all lagged values must be >= 0"
+
+    def test_compute_lifecycle_skipped_when_attrs_absent(
+        self, sample_time_series: pd.DataFrame
+    ) -> None:
+        """Defensive: missing product-attrs columns -> zero new columns, no crash.
+
+        This is the contract for the legacy /featuresets/compute path; PRP-3.1E
+        adds the loader extension that joins product attrs.
+        """
+        # sample_time_series has NO launch_date / discontinue_date columns.
+        config = FeatureSetConfig(
+            name="lc_no_attrs",
+            lifecycle_config=LifecycleConfig(),
+        )
+        service = FeatureEngineeringService(config)
+        result = service.compute_features(sample_time_series)
+
+        assert "days_since_launch_lag1" not in result.feature_columns
+        assert "days_since_discontinue_lag1" not in result.feature_columns
+        # The family token still appears via get_enabled_features (set in PRP-3.1A).
+        assert "lifecycle" in config.get_enabled_features()
+
+    def test_compute_lifecycle_uses_phase2_fixture(
+        self,
+        sample_time_series: pd.DataFrame,
+        phase2_product_attrs_df: pd.DataFrame,
+    ) -> None:
+        """End-to-end merge with the PRP-3.1A fixture: P1 launched 2023-06-01."""
+        df = sample_time_series.merge(phase2_product_attrs_df, on="product_id", how="left")
+        config = FeatureSetConfig(
+            name="lc_phase2_fixture",
+            lifecycle_config=LifecycleConfig(lag_days=1),
+        )
+        service = FeatureEngineeringService(config)
+        result = service.compute_features(df)
+
+        # P1 launched 2023-06-01; 2024-01-01 is 214 days after
+        # -> row 1 (date=2024-01-02, lag1 reflects row 0) = 214
+        expected = (date(2024, 1, 1) - date(2023, 6, 1)).days
+        assert result.df.iloc[1]["days_since_launch_lag1"] == expected