Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions app/features/featuresets/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ def compute_features(
result, cols = self._compute_exogenous_features(result)
feature_columns.extend(cols)

# 6. Lifecycle features (PRP-3.1B — Phase 2)
if self.config.lifecycle_config:
result, cols = self._compute_lifecycle_features(result)
feature_columns.extend(cols)

# Compute stats
null_counts: dict[str, int] = {}
if feature_columns:
Expand Down Expand Up @@ -403,6 +408,89 @@ def _compute_exogenous_features(self, df: pd.DataFrame) -> tuple[pd.DataFrame, l

return result, columns

def _compute_lifecycle_features(self, df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]:
    """Compute lagged product-lifecycle features from launch/discontinue dates.

    CRITICAL: This method assumes ``df`` has already been sorted by
    [*entity_cols, date_col] and cutoff-filtered upstream in
    :meth:`compute_features`. It does NOT re-sort or re-filter.

    For every enabled feature the compute is two-step:
        1. Per-row signed delta ``date - <source date>`` in whole days
           (NaN where the source date is missing).
        2. ``shift(config.lag_days)`` within each ``self.entity_cols`` group,
           so the value at row ``i`` is the delta of row ``i - lag_days`` of
           the same entity. The shift is row-based: it corresponds to
           calendar days only when each entity has one row per day.

    Source columns (must be joined upstream -- typically by an extended
    :class:`FeatureDataLoader`; see PRP-3.1E):
        * ``launch_date`` -> ``days_since_launch_lag{lag}``
        * ``discontinue_date`` -> ``days_since_discontinue_lag{lag}``

    Defensive behavior:
        * If BOTH source columns are absent (the legacy
          ``/featuresets/compute`` endpoint does not join product attrs),
          emit zero columns and a single info-level log line.
        * If only ONE source column is absent, the feature that needs it is
          skipped and an info-level log line is emitted for it, so a
          requested-but-missing feature is never dropped without a trace.

    Note on signed deltas: ``days_since_discontinue`` is signed (negative
    pre-retire, positive post-retire). Do NOT clip to non-negative.

    Args:
        df: Input dataframe (already sorted + cutoff-filtered). It is not
            mutated; features are added to a copy.

    Returns:
        Tuple of (copy of ``df`` with lifecycle features added, list of the
        new column names in the order they were added).

    Raises:
        RuntimeError: If ``self.config.lifecycle_config`` is ``None``.
    """
    config = self.config.lifecycle_config
    if config is None:
        raise RuntimeError("_compute_lifecycle_features called without lifecycle_config")

    result = df.copy()
    columns: list[str] = []
    lag = config.lag_days

    # Defensive: skip if product attrs were not joined upstream at all.
    # PRP-3.1E will extend FeatureDataLoader to join product.launch_date /
    # product.discontinue_date; until then, callers without an extended
    # loader see the "lifecycle" family token but zero new columns.
    if "launch_date" not in df.columns and "discontinue_date" not in df.columns:
        logger.info(
            "featureops.lifecycle_skipped_no_product_attrs",
            reason="launch_date / discontinue_date columns absent from input df",
            hint="loader must join product.launch_date / product.discontinue_date "
            "before calling compute_features (see PRP-3.1E)",
        )
        return result, columns

    date_series = pd.to_datetime(result[self.date_col])
    # Group keys are the same for every feature, so build them once.
    group_keys = [result[c] for c in self.entity_cols]

    # (enabled flag, source column, output column prefix) -- order fixes the
    # order of the returned column names.
    feature_specs = (
        (config.include_days_since_launch, "launch_date", "days_since_launch"),
        (
            config.include_days_since_discontinue,
            "discontinue_date",
            "days_since_discontinue",
        ),
    )

    for enabled, source_col, prefix in feature_specs:
        if not enabled:
            continue
        if source_col not in df.columns:
            # Only one of the two attrs was joined: make the skip visible
            # instead of silently omitting a requested feature.
            logger.info(
                "featureops.lifecycle_feature_skipped_missing_column",
                feature=prefix,
                missing_column=source_col,
            )
            continue

        # Pre-shift signed delta: whole days where both dates are set, NaN otherwise.
        delta = (date_series - pd.to_datetime(result[source_col])).dt.days
        col_name = f"{prefix}_lag{lag}"
        # Lag per entity so row i reflects row i-lag's delta and never
        # borrows a value from a neighbouring series.
        result[col_name] = delta.groupby(group_keys, observed=True).shift(lag)
        columns.append(col_name)

    return result, columns


class FeatureDataLoader:
"""Async data loader for feature computation.
Expand Down
92 changes: 92 additions & 0 deletions app/features/featuresets/tests/test_leakage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from app.features.featuresets.schemas import (
FeatureSetConfig,
LagConfig,
LifecycleConfig,
RollingConfig,
)
from app.features.featuresets.service import FeatureEngineeringService
Expand Down Expand Up @@ -326,3 +327,94 @@ def test_insufficient_history_has_nan(self, sample_time_series: pd.DataFrame) ->
assert not pd.isna(result.df.iloc[14]["rolling_mean_14"]), (
"Row 14 should have valid rolling_mean_14"
)


class TestLifecycleLeakage:
    """CRITICAL (PRP-3.1B): lifecycle features may only look backwards in time."""

    def test_days_since_launch_lag1_no_future_leakage(
        self, sample_time_series: pd.DataFrame
    ) -> None:
        """CRITICAL: the lag-1 column at row i equals (date[i-1] - launch_date).days.

        The fixture is used as 30 consecutive days from 2024-01-01 for a
        single (store, product) series. With launch_date=2023-12-25 the
        unlagged days-since-launch runs 7, 8, ..., 36, so after shift(1) the
        column must read NaN, 7, 8, ... Any other integer means the feature
        looked at the wrong row.
        """
        frame = sample_time_series.copy()
        frame["launch_date"] = date(2023, 12, 25)
        frame["discontinue_date"] = pd.NaT

        lifecycle = LifecycleConfig(
            include_days_since_launch=True,
            include_days_since_discontinue=False,
            lag_days=1,
        )
        feature_config = FeatureSetConfig(
            name="test_lifecycle_leakage",
            lifecycle_config=lifecycle,
        )
        result = FeatureEngineeringService(feature_config).compute_features(frame)

        col = "days_since_launch_lag1"
        assert col in result.feature_columns, f"missing column {col} -- wiring regression"

        lagged = result.df[col]

        # First row has nothing before it, so the lag must be empty.
        first_value = lagged.iloc[0]
        assert pd.isna(first_value), f"row 0 must be NaN (no history), got {first_value}"

        # Every later row carries the previous row's delta; row 0's own
        # delta is 7 (2024-01-01 minus 2023-12-25).
        days_at_first_row = 7
        for i in range(1, len(result.df)):
            expected = (i - 1) + days_at_first_row
            actual = lagged.iloc[i]
            assert actual == expected, (
                f"LEAKAGE DETECTED at row {i}: {col}={actual} != expected={expected}. "
                "Lifecycle feature must reflect data at row i - lag_days only."
            )

    def test_lifecycle_group_isolation_no_cross_product_leakage(
        self, multi_series_time_series: pd.DataFrame
    ) -> None:
        """CRITICAL: products with different launch dates stay independent.

        Each (store, product) series must start with NaN and then carry its
        own product's delta -- the lag must never reach across a groupby
        boundary into a neighbouring series.
        """
        frame = multi_series_time_series.copy()
        # Product 1 launched 2023-12-01 (31 days before 2024-01-01);
        # product 2 launched 2023-12-25 (7 days before 2024-01-01).
        launch_by_product = {1: date(2023, 12, 1), 2: date(2023, 12, 25)}
        frame["launch_date"] = frame["product_id"].map(launch_by_product)
        frame["discontinue_date"] = pd.NaT

        lifecycle = LifecycleConfig(
            include_days_since_launch=True,
            include_days_since_discontinue=False,
            lag_days=1,
        )
        feature_config = FeatureSetConfig(
            name="test_lifecycle_isolation",
            entity_columns=("store_id", "product_id"),
            lifecycle_config=lifecycle,
        )
        result = FeatureEngineeringService(feature_config).compute_features(frame)

        # product_id -> delta expected at the series' first row
        first_row_delta = {1: 31, 2: 7}
        computed = result.df
        for store_id in (1, 2):
            for product_id, base_lag in first_row_delta.items():
                in_series = (computed["store_id"] == store_id) & (
                    computed["product_id"] == product_id
                )
                lagged = computed.loc[in_series, "days_since_launch_lag1"].reset_index(drop=True)

                # Row 0 of every series: nothing to lag from.
                assert pd.isna(lagged.iloc[0]), f"({store_id},{product_id}) row 0 must be NaN"

                # Row 1 of every series: (date[0] - launch_date).days of that product.
                actual = lagged.iloc[1]
                assert actual == base_lag, (
                    f"CROSS-PRODUCT LEAKAGE: ({store_id},{product_id}) row 1: "
                    f"days_since_launch_lag1={actual}, expected={base_lag}. "
                    "Lifecycle lag is mixing across products."
                )
124 changes: 124 additions & 0 deletions app/features/featuresets/tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
FeatureSetConfig,
ImputationConfig,
LagConfig,
LifecycleConfig,
RollingConfig,
)
from app.features.featuresets.service import FeatureEngineeringService
Expand Down Expand Up @@ -317,3 +318,126 @@ def test_empty_dataframe_handling(self):

assert len(result.df) == 0
assert result.feature_columns == ["lag_1"]


class TestLifecycleFeatures:
    """Unit tests covering _compute_lifecycle_features (PRP-3.1B)."""

    @staticmethod
    def _run(frame: pd.DataFrame, name: str, **lifecycle_kwargs):
        """Compute features for ``frame`` with the given LifecycleConfig options."""
        feature_config = FeatureSetConfig(
            name=name,
            lifecycle_config=LifecycleConfig(**lifecycle_kwargs),
        )
        return FeatureEngineeringService(feature_config).compute_features(frame)

    def test_compute_lifecycle_happy_path(self, sample_time_series: pd.DataFrame) -> None:
        """Both dates set: both lagged columns exist with the expected day counts."""
        frame = sample_time_series.copy()
        frame["launch_date"] = date(2024, 1, 1)  # delta is 0 on the first row
        frame["discontinue_date"] = date(2024, 1, 15)  # sign flips mid-series

        result = self._run(
            frame,
            "lc_happy",
            include_days_since_launch=True,
            include_days_since_discontinue=True,
            lag_days=1,
        )

        for expected_col in ("days_since_launch_lag1", "days_since_discontinue_lag1"):
            assert expected_col in result.feature_columns

        since_launch = result.df["days_since_launch_lag1"]
        since_discontinue = result.df["days_since_discontinue_lag1"]

        # Row 1 (2024-01-02) carries row 0 (2024-01-01):
        # 0 days since launch, 14 days before discontinue.
        assert since_launch.iloc[1] == 0
        assert since_discontinue.iloc[1] == -14

        # Row 16 (2024-01-17) carries row 15 (2024-01-16):
        # 15 days since launch, 1 day after discontinue.
        assert since_launch.iloc[16] == 15
        assert since_discontinue.iloc[16] == 1

    def test_compute_lifecycle_null_launch_date(self, sample_time_series: pd.DataFrame) -> None:
        """A NULL launch_date yields an all-NaN column rather than an exception."""
        frame = sample_time_series.copy()
        frame["launch_date"] = pd.NaT
        frame["discontinue_date"] = pd.NaT

        result = self._run(
            frame,
            "lc_null",
            include_days_since_launch=True,
            include_days_since_discontinue=False,
            lag_days=1,
        )

        col = "days_since_launch_lag1"
        assert col in result.feature_columns
        all_missing = result.df[col].isna().all()
        assert all_missing, "NULL launch_date must produce all-NaN column"

    def test_compute_lifecycle_discontinue_before_cutoff(
        self, sample_time_series: pd.DataFrame
    ) -> None:
        """A discontinue_date earlier than every row gives only positive values."""
        frame = sample_time_series.copy()
        frame["launch_date"] = date(2023, 1, 1)
        frame["discontinue_date"] = date(2023, 12, 25)  # one week before row 0

        result = self._run(
            frame,
            "lc_post_discontinue",
            include_days_since_launch=False,
            include_days_since_discontinue=True,
            lag_days=1,
        )

        lagged = result.df["days_since_discontinue_lag1"]
        # Row 1 carries row 0: 2024-01-01 - 2023-12-25 = +7
        assert lagged.iloc[1] == 7
        # Row 8 carries row 7: 2024-01-08 - 2023-12-25 = +14
        assert lagged.iloc[8] == 14
        # Discontinue lies in the past, so no populated value may be negative.
        populated = lagged.dropna()
        assert populated.ge(0).all(), (
            "with discontinue in the past, all lagged values must be >= 0"
        )

    def test_compute_lifecycle_skipped_when_attrs_absent(
        self, sample_time_series: pd.DataFrame
    ) -> None:
        """Missing product-attr columns: no new columns and no crash.

        This is the contract for the legacy /featuresets/compute path; PRP-3.1E
        adds the loader extension that joins product attrs.
        """
        # The fixture is passed through untouched: it carries neither
        # launch_date nor discontinue_date.
        config = FeatureSetConfig(
            name="lc_no_attrs",
            lifecycle_config=LifecycleConfig(),
        )
        result = FeatureEngineeringService(config).compute_features(sample_time_series)

        for absent_col in ("days_since_launch_lag1", "days_since_discontinue_lag1"):
            assert absent_col not in result.feature_columns
        # The family token is still reported by get_enabled_features (set in PRP-3.1A).
        assert "lifecycle" in config.get_enabled_features()

    def test_compute_lifecycle_uses_phase2_fixture(
        self,
        sample_time_series: pd.DataFrame,
        phase2_product_attrs_df: pd.DataFrame,
    ) -> None:
        """End-to-end merge with the PRP-3.1A fixture: P1 launched 2023-06-01."""
        frame = sample_time_series.merge(phase2_product_attrs_df, on="product_id", how="left")

        result = self._run(frame, "lc_phase2_fixture", lag_days=1)

        # Row 1 (2024-01-02) carries row 0's delta: 2024-01-01 is 214 days
        # after the 2023-06-01 launch.
        expected = (date(2024, 1, 1) - date(2023, 6, 1)).days
        assert result.df["days_since_launch_lag1"].iloc[1] == expected