From 3bf8af4bbb1dbc94efcdc6b988fd98d20eb7734b Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 21 Apr 2026 14:18:28 +0300 Subject: [PATCH 1/8] =?UTF-8?q?feat:=20Milestone=203=20=E2=80=94=20schema?= =?UTF-8?q?=20layer=20(entities,=20IDs,=20FK=20validation,=20features,=20t?= =?UTF-8?q?asks)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - core/ids.py: make_id(prefix, n) → zero-padded entity ID strings; ID_PREFIXES registry covering all 9 entity namespaces plus the internal rep namespace - schema/entities.py: typed row dataclasses for all 9 v1 tables (AccountRow, ContactRow, LeadRow, TouchRow, SessionRow, SalesActivityRow, OpportunityRow, CustomerRow, SubscriptionRow) each with DTYPE_MAP, to_dict(), empty_dataframe(), and Parquet round-trip support via schema/tables.py - schema/relationships.py: FKConstraint dataclass, ALL_CONSTRAINTS tuple (10 FK edges from §9.2), validate_fk() raising FKViolationError - schema/features.py: FeatureSpec frozen dataclass; LEAD_SNAPSHOT_FEATURES — 28 pre-anchor features + 1 target for converted_within_90_days task - schema/dictionaries.py: feature_dictionary_df() and write_feature_dictionary() for the mandatory feature_dictionary.csv - schema/tasks.py: SplitSpec + TaskManifest frozen dataclasses; CONVERTED_WITHIN_90_DAYS constant (70/15/15 split, 90-day window) - pyproject.toml: pandas≥2.0 and pyarrow≥14.0 added as core deps; mypy overrides to ignore missing stubs for both - 82 new tests (ids, entities, relationships, features, tasks); total 192 passing; ruff + mypy clean Co-Authored-By: Claude Sonnet 4.6 --- .agent-plan.md | 47 ++-- leadforge/core/hashing.py | 2 +- leadforge/core/ids.py | 67 +++++- leadforge/narrative/spec.py | 18 +- leadforge/schema/dictionaries.py | 66 +++++ leadforge/schema/entities.py | 375 +++++++++++++++++++++++++++++ leadforge/schema/features.py | 211 ++++++++++++++++ leadforge/schema/relationships.py | 67 ++++++ leadforge/schema/tables.py | 35 +++ leadforge/schema/tasks.py | 94 ++++++++ pyproject.toml | 6 + tests/schema/__init__.py | 0 tests/schema/test_entities.py | 176 ++++++++++++++ tests/schema/test_features.py | 116 +++++++++ tests/schema/test_ids.py | 63 +++++ tests/schema/test_relationships.py | 59 +++++ tests/schema/test_tasks.py | 89 +++++++ 17 files changed, 1449 insertions(+), 42 deletions(-) create mode 100644 leadforge/schema/dictionaries.py create mode 100644 leadforge/schema/entities.py create mode 100644 leadforge/schema/features.py create mode 100644 leadforge/schema/relationships.py create mode 100644 leadforge/schema/tables.py create mode 100644 leadforge/schema/tasks.py create mode 100644 tests/schema/__init__.py create mode 100644 tests/schema/test_entities.py create mode 100644 tests/schema/test_features.py create mode 100644 tests/schema/test_ids.py create mode 100644 tests/schema/test_relationships.py create mode 100644 tests/schema/test_tasks.py diff --git a/.agent-plan.md b/.agent-plan.md index ae236dc..447c76f 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -6,41 +6,50 @@ ## Current System State -**v0.2.0 in progress — Milestone 2 complete (PR open).** Typed `NarrativeSpec` hierarchy, `WorldSpec` -with narrative field, `Generator.from_recipe()` populates `world_spec`, dataset card renderer, and -full test coverage. 110 tests passing. +**v0.2.0 in progress — Milestone 3 complete (PR open).** All 9 relational table schemas defined as +typed row dataclasses with Parquet round-trip support. FK constraints, ID generation, feature +dictionary, and task manifest implemented. 192 tests passing. 
--- -## Active Task Breakdown — Milestone 3: Schema Layer (v0.2.0 cont.) +## Active Task Breakdown — Milestone 4: World Structure (v0.3.0) -Goal: Define the relational entity schema (accounts, contacts, leads, etc.) and feature dictionary. +Goal: Implement the hidden world graph — DAG of latent nodes, motif families, and stochastic rewiring. -- [ ] **1. Entity schema** - - Implement `schema/entities.py`: typed dataclasses for `Account`, `Contact`, `Lead` - - Implement `schema/events.py`: `Touch`, `SalesActivity`, `Opportunity` etc. - -- [ ] **2. Feature dictionary** - - Implement `schema/features.py` + `schema/dictionaries.py` - - Generate `feature_dictionary.csv` stub - -- [ ] **3. Task schema** - - Implement `schema/tasks.py`: `converted_within_90_days` task manifest structure +- [ ] **1. Node type system** (`structure/node_types.py`) +- [ ] **2. World graph** (`structure/graph.py`) — `networkx.DiGraph`, DAG validation +- [ ] **3. Motif families** (`structure/motifs.py`, `structure/templates.py`) — 5 v1 families +- [ ] **4. Stochastic rewiring** (`structure/rewiring.py`) — seeded perturbation +- [ ] **5. Sampler** (`structure/sampler.py`) — draw a world graph from a motif + config --- ## Context Pointers -- Milestone 3 scope: `docs/leadforge_implementation_plan.md` §6 "Milestone 3" +- Milestone 4 scope: `docs/leadforge_implementation_plan.md` §7 "Milestone 4" - Full milestone dependency graph: `docs/leadforge_implementation_plan.md` §6 -- Schema spec: `docs/leadforge_architecture_spec.md` §8 -- Recipe assets: `leadforge/recipes/b2b_saas_procurement_v1/` +- Structure spec: `docs/leadforge_architecture_spec.md` §11 +- Motif families: `docs/leadforge_architecture_spec.md` §11.2 --- ## Completed Phases -### Milestone 2 — Narrative Layer ✓ (v0.2.0 in PR) +### Milestone 3 — Schema Layer ✓ (v0.2.0 in PR) +- `leadforge/core/ids.py`: `make_id(prefix, n)` + `ID_PREFIXES` registry +- `leadforge/schema/entities.py`: typed row dataclasses for all 9 tables (accounts, contacts, + leads, touches, sessions, sales_activities, opportunities, customers, subscriptions) with + `DTYPE_MAP`, `to_dict()`, `empty_dataframe()`, Parquet round-trip via `schema/tables.py` +- `leadforge/schema/relationships.py`: `FKConstraint`, `ALL_CONSTRAINTS` (10 FK edges), + `validate_fk()` helper raising `FKViolationError` +- `leadforge/schema/features.py`: `FeatureSpec` frozen dataclass + `LEAD_SNAPSHOT_FEATURES` + (29 features, one target) +- `leadforge/schema/dictionaries.py`: `feature_dictionary_df()` + `write_feature_dictionary()` +- `leadforge/schema/tasks.py`: `SplitSpec`, `TaskManifest`, `CONVERTED_WITHIN_90_DAYS` constant +- `pyproject.toml`: added pandas≥2.0 + pyarrow≥14.0 as core deps; mypy overrides for both +- 82 new tests; total 192 passing + +### Milestone 2 — Narrative Layer ✓ (v0.2.0 merged) - `leadforge/narrative/spec.py`: frozen dataclasses `NarrativeSpec`, `CompanySpec`, `ProductSpec`, `MarketSpec`, `GtmMotionSpec`, `PersonaSpec`, `FunnelStageSpec` — all with validated `from_dict()` - `leadforge/narrative/dataset_card.py`: `render_dataset_card(world_spec)` — Markdown card diff --git a/leadforge/core/hashing.py b/leadforge/core/hashing.py index 513d70e..2bdbcc7 100644 --- a/leadforge/core/hashing.py +++ b/leadforge/core/hashing.py @@ -18,7 +18,7 @@ def _canonical(obj: Any) -> Any: """Recursively convert to a JSON-stable form (sorted keys, enums → str).""" if isinstance(obj, dict): return {k: _canonical(v) for k, v in sorted(obj.items())} - if isinstance(obj, (list, tuple)): + if isinstance(obj, list | tuple): 
return [_canonical(v) for v in obj] # StrEnum values are already strings; this handles plain Enum too if hasattr(obj, "value"): diff --git a/leadforge/core/ids.py b/leadforge/core/ids.py index ba1cad7..fdb2af8 100644 --- a/leadforge/core/ids.py +++ b/leadforge/core/ids.py @@ -1,16 +1,57 @@ """Entity ID generation. -Implemented in Milestone 3. All IDs must be stable, opaque, namespace-unique, -and deterministic for a given run. - -Canonical prefixes: - acct_ — Account - cnt_ — Contact - lead_ — Lead - touch_ — Touch - sess_ — Session - act_ — SalesActivity - opp_ — Opportunity - cust_ — Customer - sub_ — Subscription +All IDs are stable, opaque, namespace-unique, and deterministic for a given +(recipe, config, seed) triple. Callers derive a dedicated RNG substream via +``RNGRoot.child()`` and pass a monotonically increasing counter to +:func:`make_id`. + +Canonical prefixes +------------------ +acct_ — Account +cnt_ — Contact +lead_ — Lead +touch_ — Touch +sess_ — Session +act_ — SalesActivity +opp_ — Opportunity +cust_ — Customer +sub_ — Subscription +rep_ — Sales rep (internal) """ + +from __future__ import annotations + +# Canonical prefix registry — single source of truth used by tests and +# simulation code alike. +ID_PREFIXES: dict[str, str] = { + "account": "acct", + "contact": "cnt", + "lead": "lead", + "touch": "touch", + "session": "sess", + "sales_activity": "act", + "opportunity": "opp", + "customer": "cust", + "subscription": "sub", + "rep": "rep", +} + +_PAD_WIDTH = 6 # e.g. acct_000001 + + +def make_id(prefix: str, n: int) -> str: + """Return a zero-padded entity ID string. + + Args: + prefix: The namespace prefix (e.g. ``"acct"``). + n: A 1-based counter for this entity type within one generation run. + + Returns: + A string of the form ``"<prefix>_<n>"``; e.g. ``"acct_000001"``. + + Raises: + ValueError: if *n* is not a positive integer. 
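+
+    Example (illustrative; assumes the canonical pad width of 6)::
+
+        >>> make_id("acct", 7)
+        'acct_000007'
+        >>> make_id("lead", 123)
+        'lead_000123'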
+ """ + if not isinstance(n, int) or isinstance(n, bool) or n < 1: + raise ValueError(f"n must be a positive int, got {n!r}") + return f"{prefix}_{n:0{_PAD_WIDTH}d}" diff --git a/leadforge/narrative/spec.py b/leadforge/narrative/spec.py index d873b03..bae176d 100644 --- a/leadforge/narrative/spec.py +++ b/leadforge/narrative/spec.py @@ -37,7 +37,7 @@ def from_dict(cls, data: dict[str, Any]) -> CompanySpec: ) er = data["employee_range"] if not ( - isinstance(er, (list, tuple)) + isinstance(er, list | tuple) and len(er) == 2 and all(isinstance(v, int) and not isinstance(v, bool) for v in er) ): @@ -85,7 +85,7 @@ def from_dict(cls, data: dict[str, Any]) -> ProductSpec: ) acv = data["acv_range_usd"] if not ( - isinstance(acv, (list, tuple)) + isinstance(acv, list | tuple) and len(acv) == 2 and all(isinstance(v, int) and not isinstance(v, bool) for v in acv) ): @@ -93,7 +93,7 @@ def from_dict(cls, data: dict[str, Any]) -> ProductSpec: f"product.acv_range_usd must be a [min, max] int pair, got {acv!r}" ) terms = data["contract_terms_months"] - if not isinstance(terms, (list, tuple)) or not all( + if not isinstance(terms, list | tuple) or not all( isinstance(v, int) and not isinstance(v, bool) for v in terms ): raise InvalidRecipeError( @@ -141,7 +141,7 @@ def from_dict(cls, data: dict[str, Any]) -> MarketSpec: ) er = data["icp_employee_range"] if not ( - isinstance(er, (list, tuple)) + isinstance(er, list | tuple) and len(er) == 2 and all(isinstance(v, int) and not isinstance(v, bool) for v in er) ): @@ -149,7 +149,7 @@ def from_dict(cls, data: dict[str, Any]) -> MarketSpec: f"market.icp_employee_range must be a [min, max] int pair, got {er!r}" ) industries = data["icp_industries"] - if not isinstance(industries, (list, tuple)): + if not isinstance(industries, list | tuple): raise InvalidRecipeError( f"market.icp_industries must be a list of strings, got {industries!r}" ) @@ -158,7 +158,7 @@ def from_dict(cls, data: dict[str, Any]) -> MarketSpec: f"market.icp_industries must contain only strings, got {industries!r}" ) geographies = data["geographies"] - if not isinstance(geographies, (list, tuple)): + if not isinstance(geographies, list | tuple): raise InvalidRecipeError( f"market.geographies must be a list of strings, got {geographies!r}" ) @@ -194,13 +194,13 @@ def from_dict(cls, data: dict[str, Any]) -> GtmMotionSpec: "gtm_motion", ) channels = data["channels"] - if not isinstance(channels, (list, tuple)) or not all(isinstance(c, str) for c in channels): + if not isinstance(channels, list | tuple) or not all(isinstance(c, str) for c in channels): raise InvalidRecipeError( f"gtm_motion.channels must be a list of strings, got {channels!r}" ) for share_name in ("inbound_share", "outbound_share", "partner_share"): v = data[share_name] - if isinstance(v, bool) or not isinstance(v, (int, float)): + if isinstance(v, bool) or not isinstance(v, int | float): raise InvalidRecipeError( f"gtm_motion.{share_name} must be a float in [0, 1], got {type(v).__name__!r}" ) @@ -232,7 +232,7 @@ def from_dict(cls, data: dict[str, Any]) -> PersonaSpec: ) title_variants = data["title_variants"] if not ( - isinstance(title_variants, (list, tuple)) + isinstance(title_variants, list | tuple) and all(isinstance(t, str) for t in title_variants) ): raise InvalidRecipeError( diff --git a/leadforge/schema/dictionaries.py b/leadforge/schema/dictionaries.py new file mode 100644 index 0000000..39946e9 --- /dev/null +++ b/leadforge/schema/dictionaries.py @@ -0,0 +1,66 @@ +"""Feature dictionary builder. 
+ +Converts :data:`~leadforge.schema.features.LEAD_SNAPSHOT_FEATURES` into a +``pd.DataFrame`` and optionally writes it as ``feature_dictionary.csv`` — one +of the three files required in every bundle output mode (§14.1). +""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +import pandas as pd + +from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES, FeatureSpec + +if TYPE_CHECKING: + pass + +_COLUMNS = ("name", "dtype", "description", "category", "is_target", "leakage_risk") + + +def feature_dictionary_df( + features: tuple[FeatureSpec, ...] = LEAD_SNAPSHOT_FEATURES, +) -> pd.DataFrame: + """Return the feature dictionary as a ``pd.DataFrame``. + + Columns: name, dtype, description, category, is_target, leakage_risk. + + Args: + features: Ordered tuple of :class:`~leadforge.schema.features.FeatureSpec` + objects. Defaults to the canonical lead snapshot feature list. + + Returns: + A ``pd.DataFrame`` with one row per feature, string columns for + categorical fields and boolean columns for flag fields. + """ + rows = [ + { + "name": f.name, + "dtype": f.dtype, + "description": f.description, + "category": f.category, + "is_target": f.is_target, + "leakage_risk": f.leakage_risk, + } + for f in features + ] + df = pd.DataFrame(rows, columns=list(_COLUMNS)) + df["is_target"] = df["is_target"].astype("boolean") + df["leakage_risk"] = df["leakage_risk"].astype("boolean") + return df + + +def write_feature_dictionary( + path: Path, + features: tuple[FeatureSpec, ...] = LEAD_SNAPSHOT_FEATURES, +) -> None: + """Write the feature dictionary CSV to *path*. + + Args: + path: Destination file path (created with ``parents=True``). + features: Feature list to serialize. Defaults to the canonical list. + """ + path.parent.mkdir(parents=True, exist_ok=True) + feature_dictionary_df(features).to_csv(path, index=False) diff --git a/leadforge/schema/entities.py b/leadforge/schema/entities.py new file mode 100644 index 0000000..df25b4c --- /dev/null +++ b/leadforge/schema/entities.py @@ -0,0 +1,375 @@ +"""Typed row contracts for all v1 relational tables. + +Each class represents one row in a Parquet table. Fields map directly to +the column specifications in §16 of the architecture spec. Optional columns +(nullable in the output) use ``... | None`` typing. + +All row classes expose: + +- ``TABLE_NAME`` — the canonical Parquet table name (no extension). +- ``DTYPE_MAP`` — ``{column: pandas-dtype-string}`` used to build empty + DataFrames with the right schema. +- ``to_dict()`` — returns a plain ``dict`` suitable for ``pd.DataFrame([...])`` + or JSON serialization. +- ``empty_dataframe()`` — class method returning a zero-row ``pd.DataFrame`` + with the correct columns and nullable dtypes. 
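+
+A minimal usage sketch (field values are illustrative, borrowed from the
+test fixtures; a real pipeline would batch many rows before casting)::
+
+    row = AccountRow(
+        account_id="acct_000001",
+        company_name="Acme Corp",
+        industry="manufacturing",
+        region="US-East",
+        employee_band="100-250",
+        estimated_revenue_band="10M-50M",
+        process_maturity_band="medium",
+        created_at="2024-01-15",
+    )
+    df = pd.DataFrame([row.to_dict()]).astype(AccountRow.DTYPE_MAP)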
+""" + +from __future__ import annotations + +from dataclasses import dataclass, fields +from typing import Any, ClassVar + +import pandas as pd + + +def _empty_df(dtype_map: dict[str, str]) -> pd.DataFrame: + """Return a zero-row DataFrame with columns ordered as *dtype_map*.""" + return pd.DataFrame({col: pd.array([], dtype=dtype) for col, dtype in dtype_map.items()}) + + +# --------------------------------------------------------------------------- +# accounts +# --------------------------------------------------------------------------- + + +@dataclass +class AccountRow: + """One row in the ``accounts`` table.""" + + TABLE_NAME: ClassVar[str] = "accounts" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "account_id": "string", + "company_name": "string", + "industry": "string", + "region": "string", + "employee_band": "string", + "estimated_revenue_band": "string", + "process_maturity_band": "string", + "created_at": "string", + } + + account_id: str + company_name: str + industry: str + region: str + employee_band: str + estimated_revenue_band: str + process_maturity_band: str + created_at: str + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# contacts +# --------------------------------------------------------------------------- + + +@dataclass +class ContactRow: + """One row in the ``contacts`` table.""" + + TABLE_NAME: ClassVar[str] = "contacts" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "contact_id": "string", + "account_id": "string", + "job_title": "string", + "role_function": "string", + "seniority": "string", + "buyer_role": "string", + "email_domain_type": "string", + "created_at": "string", + } + + contact_id: str + account_id: str + job_title: str + role_function: str + seniority: str + buyer_role: str + email_domain_type: str + created_at: str + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# leads +# --------------------------------------------------------------------------- + + +@dataclass +class LeadRow: + """One row in the ``leads`` table.""" + + TABLE_NAME: ClassVar[str] = "leads" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "lead_id": "string", + "contact_id": "string", + "account_id": "string", + "lead_created_at": "string", + "lead_source": "string", + "first_touch_channel": "string", + "current_stage": "string", + "owner_rep_id": "string", + "is_mql": "boolean", + "is_sql": "boolean", + "converted_within_90_days": "boolean", + "conversion_timestamp": "string", + } + + lead_id: str + contact_id: str + account_id: str + lead_created_at: str + lead_source: str + first_touch_channel: str + current_stage: str + owner_rep_id: str + is_mql: bool + is_sql: bool + converted_within_90_days: bool + conversion_timestamp: str | None = None + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# touches +# --------------------------------------------------------------------------- + + +@dataclass 
+class TouchRow: + """One row in the ``touches`` table.""" + + TABLE_NAME: ClassVar[str] = "touches" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "touch_id": "string", + "lead_id": "string", + "touch_timestamp": "string", + "touch_type": "string", + "touch_channel": "string", + "campaign_id": "string", + "touch_direction": "string", + } + + touch_id: str + lead_id: str + touch_timestamp: str + touch_type: str + touch_channel: str + touch_direction: str + campaign_id: str | None = None + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# sessions +# --------------------------------------------------------------------------- + + +@dataclass +class SessionRow: + """One row in the ``sessions`` table.""" + + TABLE_NAME: ClassVar[str] = "sessions" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "session_id": "string", + "lead_id": "string", + "session_timestamp": "string", + "session_type": "string", + "page_views": "Int64", + "pricing_page_views": "Int64", + "demo_page_views": "Int64", + "session_duration_seconds": "Int64", + } + + session_id: str + lead_id: str + session_timestamp: str + session_type: str + page_views: int + pricing_page_views: int + demo_page_views: int + session_duration_seconds: int + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# sales_activities +# --------------------------------------------------------------------------- + + +@dataclass +class SalesActivityRow: + """One row in the ``sales_activities`` table.""" + + TABLE_NAME: ClassVar[str] = "sales_activities" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "activity_id": "string", + "lead_id": "string", + "rep_id": "string", + "activity_timestamp": "string", + "activity_type": "string", + "activity_outcome": "string", + } + + activity_id: str + lead_id: str + rep_id: str + activity_timestamp: str + activity_type: str + activity_outcome: str + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# opportunities +# --------------------------------------------------------------------------- + + +@dataclass +class OpportunityRow: + """One row in the ``opportunities`` table.""" + + TABLE_NAME: ClassVar[str] = "opportunities" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "opportunity_id": "string", + "lead_id": "string", + "created_at": "string", + "stage": "string", + "estimated_acv": "Int64", + "close_outcome": "string", + "closed_at": "string", + } + + opportunity_id: str + lead_id: str + created_at: str + stage: str + estimated_acv: int + close_outcome: str | None = None + closed_at: str | None = None + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# customers +# 
--------------------------------------------------------------------------- + + +@dataclass +class CustomerRow: + """One row in the ``customers`` table.""" + + TABLE_NAME: ClassVar[str] = "customers" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "customer_id": "string", + "opportunity_id": "string", + "account_id": "string", + "customer_start_at": "string", + } + + customer_id: str + opportunity_id: str + account_id: str + customer_start_at: str + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# subscriptions +# --------------------------------------------------------------------------- + + +@dataclass +class SubscriptionRow: + """One row in the ``subscriptions`` table.""" + + TABLE_NAME: ClassVar[str] = "subscriptions" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "subscription_id": "string", + "customer_id": "string", + "plan_name": "string", + "subscription_start_at": "string", + "subscription_status": "string", + } + + subscription_id: str + customer_id: str + plan_name: str + subscription_start_at: str + subscription_status: str + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + +ALL_ROW_TYPES: tuple[type, ...] = ( + AccountRow, + ContactRow, + LeadRow, + TouchRow, + SessionRow, + SalesActivityRow, + OpportunityRow, + CustomerRow, + SubscriptionRow, +) + +TABLE_NAMES: tuple[str, ...] = tuple(cls.TABLE_NAME for cls in ALL_ROW_TYPES) # type: ignore[attr-defined] diff --git a/leadforge/schema/features.py b/leadforge/schema/features.py new file mode 100644 index 0000000..fbf9900 --- /dev/null +++ b/leadforge/schema/features.py @@ -0,0 +1,211 @@ +"""Feature specification for the lead snapshot task table. + +:data:`LEAD_SNAPSHOT_FEATURES` is the canonical ordered list of features +present in the primary task export (``tasks/converted_within_90_days/``). +Every feature here is anchored at or before the snapshot date — no +post-anchor data is included (leakage rule, §4 of the architecture spec). +""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class FeatureSpec: + """Metadata for one column in the lead snapshot table. + + Attributes: + name: Column name as it appears in the Parquet file. + dtype: Pandas-compatible dtype string (``"string"``, ``"Int64"``, + ``"Float64"``, ``"boolean"``). + description: Human-readable explanation of what the column captures. + category: Logical grouping (``"account"``, ``"contact"``, + ``"lead_meta"``, ``"engagement"``, ``"sales"``, ``"target"``). + is_target: True for the label column only. + leakage_risk: True if the column could contain post-snapshot-anchor + information and must be excluded from student_public exports. 
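+
+    Example (hypothetical spec, not one of the canonical features; shows
+    that both flag fields default to ``False``)::
+
+        >>> spec = FeatureSpec("example_count", "Int64", "Illustrative.", "engagement")
+        >>> (spec.is_target, spec.leakage_risk)
+        (False, False)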
+ """ + + name: str + dtype: str + description: str + category: str + is_target: bool = False + leakage_risk: bool = False + + +# --------------------------------------------------------------------------- +# Canonical feature list — lead snapshot +# --------------------------------------------------------------------------- + +LEAD_SNAPSHOT_FEATURES: tuple[FeatureSpec, ...] = ( + # -- Account features -- + FeatureSpec("account_id", "string", "Opaque account identifier.", "account"), + FeatureSpec( + "industry", + "string", + "Industry vertical of the buying organization.", + "account", + ), + FeatureSpec( + "region", + "string", + "Geographic region of the account's headquarters.", + "account", + ), + FeatureSpec( + "employee_band", + "string", + "Banded employee headcount of the account.", + "account", + ), + FeatureSpec( + "estimated_revenue_band", + "string", + "Banded estimated annual revenue of the account.", + "account", + ), + FeatureSpec( + "process_maturity_band", + "string", + "Banded internal process maturity score (latent).", + "account", + leakage_risk=False, + ), + # -- Contact features -- + FeatureSpec("contact_id", "string", "Opaque contact identifier.", "contact"), + FeatureSpec( + "role_function", + "string", + "Functional area of the primary contact (e.g. finance, ops).", + "contact", + ), + FeatureSpec( + "seniority", + "string", + "Seniority band of the primary contact.", + "contact", + ), + FeatureSpec( + "buyer_role", + "string", + "Buyer role classification (economic_buyer, champion, etc.).", + "contact", + ), + # -- Lead metadata features -- + FeatureSpec("lead_id", "string", "Opaque lead identifier.", "lead_meta"), + FeatureSpec( + "lead_created_at", + "string", + "ISO-8601 timestamp when the lead was created.", + "lead_meta", + ), + FeatureSpec( + "lead_source", + "string", + "Origination source of the lead (e.g. 
inbound_form, sdr_outbound).", + "lead_meta", + ), + FeatureSpec( + "first_touch_channel", + "string", + "Marketing channel responsible for the first recorded touch.", + "lead_meta", + ), + FeatureSpec( + "current_stage", + "string", + "Funnel stage at snapshot anchor date.", + "lead_meta", + ), + FeatureSpec( + "is_mql", + "boolean", + "Whether the lead had achieved MQL status at snapshot date.", + "lead_meta", + ), + FeatureSpec( + "is_sql", + "boolean", + "Whether the lead had achieved SQL status at snapshot date.", + "lead_meta", + ), + # -- Engagement features -- + FeatureSpec( + "touch_count", + "Int64", + "Total number of marketing/sales touches recorded before snapshot.", + "engagement", + ), + FeatureSpec( + "inbound_touch_count", + "Int64", + "Number of inbound touches before snapshot.", + "engagement", + ), + FeatureSpec( + "outbound_touch_count", + "Int64", + "Number of outbound touches before snapshot.", + "engagement", + ), + FeatureSpec( + "session_count", + "Int64", + "Number of web/trial sessions recorded before snapshot.", + "engagement", + ), + FeatureSpec( + "pricing_page_views", + "Int64", + "Cumulative pricing page views across all sessions before snapshot.", + "engagement", + ), + FeatureSpec( + "demo_page_views", + "Int64", + "Cumulative demo page views across all sessions before snapshot.", + "engagement", + ), + FeatureSpec( + "total_session_duration_seconds", + "Int64", + "Sum of session durations (seconds) before snapshot.", + "engagement", + ), + # -- Sales activity features -- + FeatureSpec( + "activity_count", + "Int64", + "Number of sales activities logged before snapshot.", + "sales", + ), + FeatureSpec( + "days_since_last_touch", + "Float64", + "Days elapsed between most recent touch and snapshot anchor date.", + "sales", + ), + FeatureSpec( + "has_open_opportunity", + "boolean", + "Whether an open opportunity existed at snapshot date.", + "sales", + ), + FeatureSpec( + "opportunity_estimated_acv", + "Float64", + "Estimated ACV of the most recent open opportunity (NaN if none).", + "sales", + ), + # -- Target -- + FeatureSpec( + "converted_within_90_days", + "boolean", + "Label: True if a closed_won event occurred within 90 days of " + "the snapshot anchor date. Derived from simulated events.", + "target", + is_target=True, + ), +) diff --git a/leadforge/schema/relationships.py b/leadforge/schema/relationships.py new file mode 100644 index 0000000..eb7e7f1 --- /dev/null +++ b/leadforge/schema/relationships.py @@ -0,0 +1,67 @@ +"""Foreign-key relationship definitions and validation helpers. + +Describes the canonical FK graph for the v1 relational model and provides +:func:`validate_fk` to assert referential integrity on a collection of rows +before they are written to Parquet. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from leadforge.core.exceptions import LeadforgeError + + +class FKViolationError(LeadforgeError): + """Raised when a foreign-key constraint is violated in synthetic data.""" + + +@dataclass(frozen=True) +class FKConstraint: + """Describes one foreign-key relationship between two tables.""" + + child_table: str + child_column: str + parent_table: str + parent_column: str + + +# All v1 FK constraints, derived from §9.2 of the architecture spec. +ALL_CONSTRAINTS: tuple[FKConstraint, ...] 
= ( + FKConstraint("contacts", "account_id", "accounts", "account_id"), + FKConstraint("leads", "account_id", "accounts", "account_id"), + FKConstraint("leads", "contact_id", "contacts", "contact_id"), + FKConstraint("touches", "lead_id", "leads", "lead_id"), + FKConstraint("sessions", "lead_id", "leads", "lead_id"), + FKConstraint("sales_activities", "lead_id", "leads", "lead_id"), + FKConstraint("opportunities", "lead_id", "leads", "lead_id"), + FKConstraint("customers", "opportunity_id", "opportunities", "opportunity_id"), + FKConstraint("customers", "account_id", "accounts", "account_id"), + FKConstraint("subscriptions", "customer_id", "customers", "customer_id"), +) + + +def validate_fk( + child_values: list[str], + parent_values: set[str], + constraint: FKConstraint, +) -> None: + """Assert every value in *child_values* exists in *parent_values*. + + Args: + child_values: All FK column values from the child table. + parent_values: All PK values from the parent table. + constraint: The :class:`FKConstraint` being checked (used in the + error message). + + Raises: + FKViolationError: if any child value is absent from *parent_values*. + """ + orphans = [v for v in child_values if v not in parent_values] + if orphans: + sample = orphans[:5] + raise FKViolationError( + f"FK violation: {constraint.child_table}.{constraint.child_column} " + f"→ {constraint.parent_table}.{constraint.parent_column}: " + f"{len(orphans)} orphan(s), e.g. {sample}" + ) diff --git a/leadforge/schema/tables.py b/leadforge/schema/tables.py new file mode 100644 index 0000000..339eefd --- /dev/null +++ b/leadforge/schema/tables.py @@ -0,0 +1,35 @@ +"""Parquet serialization helpers for schema-conformant DataFrames. + +These utilities are used by the rendering layer (M7+) and by tests to verify +that empty tables can be round-tripped through Parquet without schema loss. +""" + +from __future__ import annotations + +from pathlib import Path + +import pandas as pd + + +def write_parquet(df: pd.DataFrame, path: Path) -> None: + """Write *df* to a Parquet file at *path*, creating parent directories. + + Args: + df: DataFrame to serialize. Should be created via an entity class's + ``empty_dataframe()`` or populated with real simulation rows. + path: Destination ``.parquet`` file path. + """ + path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(path, index=False, engine="pyarrow") + + +def read_parquet(path: Path) -> pd.DataFrame: + """Read a Parquet file back into a DataFrame. + + Args: + path: Path to the ``.parquet`` file. + + Returns: + A ``pd.DataFrame`` with columns as stored in the file. + """ + return pd.read_parquet(path, engine="pyarrow") diff --git a/leadforge/schema/tasks.py b/leadforge/schema/tasks.py new file mode 100644 index 0000000..cf249ee --- /dev/null +++ b/leadforge/schema/tasks.py @@ -0,0 +1,94 @@ +"""Task manifest definition for the primary v1 classification task. + +A :class:`TaskManifest` describes everything needed to reconstruct the task +from the output bundle: the label column, the time window, the split ratios, +and the table it lives in. +""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class SplitSpec: + """Train / validation / test proportions for a task. + + Attributes: + train: Fraction of rows allocated to the training split. + valid: Fraction allocated to validation. + test: Fraction allocated to test. + + Raises: + ValueError: if the three fractions do not sum to 1.0 (within 1e-6). 
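+
+    Example (the v1 default proportions, shown for illustration)::
+
+        >>> SplitSpec(train=0.7, valid=0.15, test=0.15).test
+        0.15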
+ """ + + train: float + valid: float + test: float + + def __post_init__(self) -> None: + total = self.train + self.valid + self.test + if abs(total - 1.0) > 1e-6: + raise ValueError(f"SplitSpec fractions must sum to 1.0, got {total:.6f}") + + +@dataclass(frozen=True) +class TaskManifest: + """Immutable descriptor for one ML task exported from a bundle. + + Attributes: + task_id: Machine-readable task identifier. + label_column: Column name in the task Parquet files that holds the + binary label. + label_window_days: Number of days after the snapshot anchor date + within which a conversion event counts as positive. + primary_table: The relational table the snapshot rows are derived + from (usually ``"leads"``). + split: Train/valid/test proportions. + task_type: ML task type string (``"binary_classification"`` for v1). + description: Human-readable description of the task. + """ + + task_id: str + label_column: str + label_window_days: int + primary_table: str + split: SplitSpec + task_type: str = "binary_classification" + description: str = "" + + def to_dict(self) -> dict[str, object]: + """Return a JSON-serializable representation.""" + return { + "task_id": self.task_id, + "task_type": self.task_type, + "label_column": self.label_column, + "label_window_days": self.label_window_days, + "primary_table": self.primary_table, + "split": { + "train": self.split.train, + "valid": self.split.valid, + "test": self.split.test, + }, + "description": self.description, + } + + +# --------------------------------------------------------------------------- +# v1 task definition +# --------------------------------------------------------------------------- + +CONVERTED_WITHIN_90_DAYS: TaskManifest = TaskManifest( + task_id="converted_within_90_days", + label_column="converted_within_90_days", + label_window_days=90, + primary_table="leads", + split=SplitSpec(train=0.7, valid=0.15, test=0.15), + task_type="binary_classification", + description=( + "Predict whether a lead converts (closed_won event) within 90 days " + "of the snapshot anchor date. Label is event-derived — never sampled " + "directly. All features are pre-anchor (leakage-free by construction)." 
+ ), +) diff --git a/pyproject.toml b/pyproject.toml index 1a4d6b2..c8f29a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,8 @@ classifiers = [ dependencies = [ "typer[all]>=0.12", "pyyaml>=6.0", + "pandas>=2.0", + "pyarrow>=14.0", ] [project.optional-dependencies] @@ -65,5 +67,9 @@ disallow_incomplete_defs = true check_untyped_defs = true no_implicit_optional = true +[[tool.mypy.overrides]] +module = ["pandas", "pandas.*", "pyarrow", "pyarrow.*"] +ignore_missing_imports = true + [tool.pytest.ini_options] testpaths = ["tests"] diff --git a/tests/schema/__init__.py b/tests/schema/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/schema/test_entities.py b/tests/schema/test_entities.py new file mode 100644 index 0000000..4cdcdb9 --- /dev/null +++ b/tests/schema/test_entities.py @@ -0,0 +1,176 @@ +"""Tests for leadforge.schema.entities — row contracts and empty DataFrames.""" + +from pathlib import Path + +import pandas as pd +import pytest + +from leadforge.schema.entities import ( + ALL_ROW_TYPES, + TABLE_NAMES, + AccountRow, + ContactRow, + LeadRow, + SessionRow, + TouchRow, +) +from leadforge.schema.tables import read_parquet, write_parquet + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_account() -> AccountRow: + return AccountRow( + account_id="acct_000001", + company_name="Acme Corp", + industry="manufacturing", + region="US-East", + employee_band="100-250", + estimated_revenue_band="10M-50M", + process_maturity_band="medium", + created_at="2024-01-15", + ) + + +def _make_contact() -> ContactRow: + return ContactRow( + contact_id="cnt_000001", + account_id="acct_000001", + job_title="VP Finance", + role_function="finance", + seniority="vp", + buyer_role="economic_buyer", + email_domain_type="corporate", + created_at="2024-01-20", + ) + + +def _make_lead() -> LeadRow: + return LeadRow( + lead_id="lead_000001", + contact_id="cnt_000001", + account_id="acct_000001", + lead_created_at="2024-02-01", + lead_source="inbound_form", + first_touch_channel="organic_search", + current_stage="mql", + owner_rep_id="rep_000001", + is_mql=True, + is_sql=False, + converted_within_90_days=False, + conversion_timestamp=None, + ) + + +# --------------------------------------------------------------------------- +# to_dict +# --------------------------------------------------------------------------- + + +def test_account_to_dict_contains_all_columns() -> None: + row = _make_account() + d = row.to_dict() + assert set(d.keys()) == set(AccountRow.DTYPE_MAP.keys()) + + +def test_lead_to_dict_nullable_is_none() -> None: + row = _make_lead() + assert row.to_dict()["conversion_timestamp"] is None + + +def test_touch_to_dict_nullable_campaign_id() -> None: + row = TouchRow( + touch_id="touch_000001", + lead_id="lead_000001", + touch_timestamp="2024-02-05T10:00:00", + touch_type="email", + touch_channel="outbound", + touch_direction="outbound", + campaign_id=None, + ) + assert row.to_dict()["campaign_id"] is None + + +# --------------------------------------------------------------------------- +# empty_dataframe — columns and dtypes +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("cls", ALL_ROW_TYPES) +def test_empty_dataframe_has_correct_columns(cls: type) -> None: + df = cls.empty_dataframe() # type: ignore[attr-defined] + assert list(df.columns) == list(cls.DTYPE_MAP.keys()) # 
type: ignore[attr-defined] + + +@pytest.mark.parametrize("cls", ALL_ROW_TYPES) +def test_empty_dataframe_has_zero_rows(cls: type) -> None: + assert len(cls.empty_dataframe()) == 0 # type: ignore[attr-defined] + + +def test_lead_empty_dataframe_boolean_columns() -> None: + df = LeadRow.empty_dataframe() + assert str(df["is_mql"].dtype) == "boolean" + assert str(df["converted_within_90_days"].dtype) == "boolean" + + +def test_session_empty_dataframe_int_columns() -> None: + df = SessionRow.empty_dataframe() + assert str(df["page_views"].dtype) == "Int64" + + +# --------------------------------------------------------------------------- +# Parquet round-trip +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("cls", ALL_ROW_TYPES) +def test_empty_dataframe_parquet_roundtrip(cls: type, tmp_path: Path) -> None: + df = cls.empty_dataframe() # type: ignore[attr-defined] + path = tmp_path / f"{cls.TABLE_NAME}.parquet" # type: ignore[attr-defined] + write_parquet(df, path) + restored = read_parquet(path) + assert list(restored.columns) == list(df.columns) + assert len(restored) == 0 + + +def test_lead_rows_parquet_roundtrip(tmp_path: Path) -> None: + lead = _make_lead() + df = pd.DataFrame([lead.to_dict()]) + # cast to declared dtypes + for col, dtype in LeadRow.DTYPE_MAP.items(): + df[col] = df[col].astype(dtype) + path = tmp_path / "leads.parquet" + write_parquet(df, path) + restored = read_parquet(path) + assert restored["lead_id"].iloc[0] == "lead_000001" + assert restored["is_mql"].iloc[0] is True or restored["is_mql"].iloc[0] + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + + +def test_all_row_types_covers_nine_tables() -> None: + assert len(ALL_ROW_TYPES) == 9 + + +def test_table_names_unique() -> None: + assert len(set(TABLE_NAMES)) == len(TABLE_NAMES) + + +def test_table_names_expected_values() -> None: + expected = { + "accounts", + "contacts", + "leads", + "touches", + "sessions", + "sales_activities", + "opportunities", + "customers", + "subscriptions", + } + assert set(TABLE_NAMES) == expected diff --git a/tests/schema/test_features.py b/tests/schema/test_features.py new file mode 100644 index 0000000..303aaf8 --- /dev/null +++ b/tests/schema/test_features.py @@ -0,0 +1,116 @@ +"""Tests for leadforge.schema.features and leadforge.schema.dictionaries.""" + +import dataclasses +from pathlib import Path + +import pandas as pd +import pytest + +from leadforge.schema.dictionaries import feature_dictionary_df, write_feature_dictionary +from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES, FeatureSpec + +# --------------------------------------------------------------------------- +# FeatureSpec +# --------------------------------------------------------------------------- + + +def test_feature_spec_is_frozen() -> None: + f = FeatureSpec("x", "string", "desc", "account") + with pytest.raises(dataclasses.FrozenInstanceError): + f.name = "y" # type: ignore[misc] + + +def test_lead_snapshot_features_non_empty() -> None: + assert len(LEAD_SNAPSHOT_FEATURES) > 0 + + +def test_exactly_one_target_feature() -> None: + targets = [f for f in LEAD_SNAPSHOT_FEATURES if f.is_target] + assert len(targets) == 1 + assert targets[0].name == "converted_within_90_days" + + +def test_target_is_last_feature() -> None: + assert LEAD_SNAPSHOT_FEATURES[-1].is_target + + +def test_all_feature_names_unique() -> None: + names = 
[f.name for f in LEAD_SNAPSHOT_FEATURES] + assert len(names) == len(set(names)) + + +def test_all_dtypes_are_valid_strings() -> None: + valid = {"string", "Int64", "Float64", "boolean"} + for f in LEAD_SNAPSHOT_FEATURES: + assert f.dtype in valid, f"{f.name} has unknown dtype {f.dtype!r}" + + +def test_all_categories_are_known() -> None: + valid = {"account", "contact", "lead_meta", "engagement", "sales", "target"} + for f in LEAD_SNAPSHOT_FEATURES: + assert f.category in valid, f"{f.name} has unknown category {f.category!r}" + + +def test_target_feature_category_is_target() -> None: + for f in LEAD_SNAPSHOT_FEATURES: + if f.is_target: + assert f.category == "target" + + +def test_no_leakage_risk_on_target() -> None: + for f in LEAD_SNAPSHOT_FEATURES: + if f.is_target: + assert not f.leakage_risk + + +# --------------------------------------------------------------------------- +# feature_dictionary_df +# --------------------------------------------------------------------------- + + +def test_feature_dictionary_df_returns_dataframe() -> None: + df = feature_dictionary_df() + assert isinstance(df, pd.DataFrame) + + +def test_feature_dictionary_df_row_count_matches_features() -> None: + df = feature_dictionary_df() + assert len(df) == len(LEAD_SNAPSHOT_FEATURES) + + +def test_feature_dictionary_df_columns() -> None: + df = feature_dictionary_df() + expected = {"name", "dtype", "description", "category", "is_target", "leakage_risk"} + assert set(df.columns) == expected + + +def test_feature_dictionary_df_target_row() -> None: + df = feature_dictionary_df() + target_rows = df[df["is_target"] == True] # noqa: E712 + assert len(target_rows) == 1 + assert target_rows.iloc[0]["name"] == "converted_within_90_days" + + +# --------------------------------------------------------------------------- +# write_feature_dictionary +# --------------------------------------------------------------------------- + + +def test_write_feature_dictionary_creates_file(tmp_path: Path) -> None: + out = tmp_path / "feature_dictionary.csv" + write_feature_dictionary(out) + assert out.exists() + + +def test_write_feature_dictionary_csv_readable(tmp_path: Path) -> None: + out = tmp_path / "feature_dictionary.csv" + write_feature_dictionary(out) + df = pd.read_csv(out) + assert len(df) == len(LEAD_SNAPSHOT_FEATURES) + assert "name" in df.columns + + +def test_write_feature_dictionary_creates_parent_dirs(tmp_path: Path) -> None: + out = tmp_path / "deep" / "nested" / "feature_dictionary.csv" + write_feature_dictionary(out) + assert out.exists() diff --git a/tests/schema/test_ids.py b/tests/schema/test_ids.py new file mode 100644 index 0000000..3693d39 --- /dev/null +++ b/tests/schema/test_ids.py @@ -0,0 +1,63 @@ +"""Tests for leadforge.core.ids.""" + +import pytest + +from leadforge.core.ids import ID_PREFIXES, make_id + + +def test_make_id_format() -> None: + assert make_id("acct", 1) == "acct_000001" + assert make_id("lead", 999) == "lead_000999" + assert make_id("acct", 1_000_000) == "acct_1000000" + + +def test_make_id_zero_padded_to_six_digits() -> None: + result = make_id("cnt", 42) + assert result == "cnt_000042" + assert len(result.split("_")[1]) >= 6 + + +def test_make_id_deterministic() -> None: + assert make_id("opp", 7) == make_id("opp", 7) + + +def test_make_id_rejects_zero() -> None: + with pytest.raises(ValueError, match="positive int"): + make_id("acct", 0) + + +def test_make_id_rejects_negative() -> None: + with pytest.raises(ValueError, match="positive int"): + make_id("acct", -1) + + +def 
test_make_id_rejects_bool() -> None: + with pytest.raises(ValueError, match="positive int"): + make_id("acct", True) # type: ignore[arg-type] + + +def test_make_id_rejects_float() -> None: + with pytest.raises((ValueError, TypeError)): + make_id("acct", 1.0) # type: ignore[arg-type] + + +def test_id_prefixes_covers_all_entities() -> None: + expected = { + "account", + "contact", + "lead", + "touch", + "session", + "sales_activity", + "opportunity", + "customer", + "subscription", + "rep", + } + assert set(ID_PREFIXES.keys()) == expected + + +def test_id_prefixes_values_are_strings() -> None: + for key, prefix in ID_PREFIXES.items(): + assert isinstance(prefix, str), f"{key} prefix is not a string" + assert len(prefix) > 0 diff --git a/tests/schema/test_relationships.py b/tests/schema/test_relationships.py new file mode 100644 index 0000000..8e8118f --- /dev/null +++ b/tests/schema/test_relationships.py @@ -0,0 +1,59 @@ +"""Tests for leadforge.schema.relationships — FK constraints and validation.""" + +import dataclasses + +import pytest + +from leadforge.schema.relationships import ( + ALL_CONSTRAINTS, + FKConstraint, + FKViolationError, + validate_fk, +) + + +def test_all_constraints_count() -> None: + assert len(ALL_CONSTRAINTS) == 10 + + +def test_all_constraints_are_fk_constraint_instances() -> None: + for c in ALL_CONSTRAINTS: + assert isinstance(c, FKConstraint) + + +def test_contacts_has_account_fk() -> None: + c = next(c for c in ALL_CONSTRAINTS if c.child_table == "contacts") + assert c.parent_table == "accounts" + assert c.parent_column == "account_id" + + +def test_validate_fk_passes_when_all_present() -> None: + constraint = FKConstraint("contacts", "account_id", "accounts", "account_id") + parent_ids = {"acct_000001", "acct_000002"} + child_values = ["acct_000001", "acct_000002", "acct_000001"] + validate_fk(child_values, parent_ids, constraint) # should not raise + + +def test_validate_fk_raises_on_orphan() -> None: + constraint = FKConstraint("leads", "account_id", "accounts", "account_id") + parent_ids = {"acct_000001"} + child_values = ["acct_000001", "acct_MISSING"] + with pytest.raises(FKViolationError, match="orphan"): + validate_fk(child_values, parent_ids, constraint) + + +def test_validate_fk_error_message_contains_table_names() -> None: + constraint = FKConstraint("touches", "lead_id", "leads", "lead_id") + with pytest.raises(FKViolationError, match="touches"): + validate_fk(["lead_MISSING"], set(), constraint) + + +def test_validate_fk_empty_child_passes() -> None: + constraint = FKConstraint("sessions", "lead_id", "leads", "lead_id") + validate_fk([], {"lead_000001"}, constraint) + + +def test_fk_constraint_is_frozen() -> None: + c = FKConstraint("a", "b", "c", "d") + with pytest.raises(dataclasses.FrozenInstanceError): + c.child_table = "x" # type: ignore[misc] diff --git a/tests/schema/test_tasks.py b/tests/schema/test_tasks.py new file mode 100644 index 0000000..3e9de2c --- /dev/null +++ b/tests/schema/test_tasks.py @@ -0,0 +1,89 @@ +"""Tests for leadforge.schema.tasks — TaskManifest and SplitSpec.""" + +import dataclasses + +import pytest + +from leadforge.schema.tasks import CONVERTED_WITHIN_90_DAYS, SplitSpec + +# --------------------------------------------------------------------------- +# SplitSpec +# --------------------------------------------------------------------------- + + +def test_split_spec_valid() -> None: + s = SplitSpec(train=0.7, valid=0.15, test=0.15) + assert s.train == pytest.approx(0.7) + + +def test_split_spec_rejects_bad_sum() -> None: 
+ with pytest.raises(ValueError, match="sum"): + SplitSpec(train=0.6, valid=0.2, test=0.1) + + +def test_split_spec_frozen() -> None: + s = SplitSpec(0.7, 0.15, 0.15) + with pytest.raises(dataclasses.FrozenInstanceError): + s.train = 0.5 # type: ignore[misc] + + +# --------------------------------------------------------------------------- +# TaskManifest +# --------------------------------------------------------------------------- + + +def test_task_manifest_frozen() -> None: + with pytest.raises(dataclasses.FrozenInstanceError): + CONVERTED_WITHIN_90_DAYS.task_id = "other" # type: ignore[misc] + + +def test_converted_within_90_days_id() -> None: + assert CONVERTED_WITHIN_90_DAYS.task_id == "converted_within_90_days" + + +def test_converted_within_90_days_label_column() -> None: + assert CONVERTED_WITHIN_90_DAYS.label_column == "converted_within_90_days" + + +def test_converted_within_90_days_window() -> None: + assert CONVERTED_WITHIN_90_DAYS.label_window_days == 90 + + +def test_converted_within_90_days_task_type() -> None: + assert CONVERTED_WITHIN_90_DAYS.task_type == "binary_classification" + + +def test_converted_within_90_days_primary_table() -> None: + assert CONVERTED_WITHIN_90_DAYS.primary_table == "leads" + + +def test_converted_within_90_days_split_sums_to_one() -> None: + s = CONVERTED_WITHIN_90_DAYS.split + assert s.train + s.valid + s.test == pytest.approx(1.0) + + +def test_task_manifest_to_dict_keys() -> None: + d = CONVERTED_WITHIN_90_DAYS.to_dict() + expected = { + "task_id", + "task_type", + "label_column", + "label_window_days", + "primary_table", + "split", + "description", + } + assert set(d.keys()) == expected + + +def test_task_manifest_to_dict_split_is_dict() -> None: + d = CONVERTED_WITHIN_90_DAYS.to_dict() + assert isinstance(d["split"], dict) + assert set(d["split"].keys()) == {"train", "valid", "test"} + + +def test_task_manifest_to_dict_is_json_serializable() -> None: + import json + + d = CONVERTED_WITHIN_90_DAYS.to_dict() + json.dumps(d) # should not raise From e72c3cc22d4647f7fddc6618a52b97a42a59aefb Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 21 Apr 2026 14:46:26 +0300 Subject: [PATCH 2/8] fix: address Copilot review comments on Milestone 3 PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - narrative/spec.py, core/hashing.py: revert UP038 unsafe fix — replace all isinstance(x, A | B) with isinstance(x, (A, B)) + noqa: UP038 (isinstance accepts PEP 604 union types only on Python 3.10+, so the ruff auto-fix fails at runtime on any older supported interpreter) (COPILOT-1, 2, 4, 7, 8, 9, 10, 11) - schema/tasks.py: SplitSpec.__post_init__ now validates each fraction is finite and in [0, 1] before checking the sum, preventing manifests with values like (1.2, -0.1, -0.1) that would sum to 1 but are invalid (COPILOT-3) - schema/entities.py: TouchRow.DTYPE_MAP column order aligned to match dataclass field order (touch_direction before campaign_id) (COPILOT-5) - schema/dictionaries.py: feature_dictionary_df() now casts string columns to pd.StringDtype; docstring corrected to reflect actual dtypes (COPILOT-6) - tests/schema/test_tasks.py: two new tests covering per-component SplitSpec rejection (negative and >1 values) Co-Authored-By: Claude Sonnet 4.6 --- leadforge/core/hashing.py | 2 +- leadforge/narrative/spec.py | 18 +++++++++--------- leadforge/schema/dictionaries.py | 8 ++++++-- leadforge/schema/entities.py | 2 +- leadforge/schema/tasks.py | 7 +++++++ tests/schema/test_tasks.py | 10 ++++++++++ 6 files changed, 34 insertions(+), 13 
deletions(-) diff --git a/leadforge/core/hashing.py b/leadforge/core/hashing.py index 2bdbcc7..680a782 100644 --- a/leadforge/core/hashing.py +++ b/leadforge/core/hashing.py @@ -18,7 +18,7 @@ def _canonical(obj: Any) -> Any: """Recursively convert to a JSON-stable form (sorted keys, enums → str).""" if isinstance(obj, dict): return {k: _canonical(v) for k, v in sorted(obj.items())} - if isinstance(obj, list | tuple): + if isinstance(obj, (list, tuple)): # noqa: UP038 return [_canonical(v) for v in obj] # StrEnum values are already strings; this handles plain Enum too if hasattr(obj, "value"): diff --git a/leadforge/narrative/spec.py b/leadforge/narrative/spec.py index bae176d..9425959 100644 --- a/leadforge/narrative/spec.py +++ b/leadforge/narrative/spec.py @@ -37,7 +37,7 @@ def from_dict(cls, data: dict[str, Any]) -> CompanySpec: ) er = data["employee_range"] if not ( - isinstance(er, list | tuple) + isinstance(er, (list, tuple)) # noqa: UP038 and len(er) == 2 and all(isinstance(v, int) and not isinstance(v, bool) for v in er) ): @@ -85,7 +85,7 @@ def from_dict(cls, data: dict[str, Any]) -> ProductSpec: ) acv = data["acv_range_usd"] if not ( - isinstance(acv, list | tuple) + isinstance(acv, (list, tuple)) # noqa: UP038 and len(acv) == 2 and all(isinstance(v, int) and not isinstance(v, bool) for v in acv) ): @@ -93,7 +93,7 @@ def from_dict(cls, data: dict[str, Any]) -> ProductSpec: f"product.acv_range_usd must be a [min, max] int pair, got {acv!r}" ) terms = data["contract_terms_months"] - if not isinstance(terms, list | tuple) or not all( + if not isinstance(terms, (list, tuple)) or not all( # noqa: UP038 isinstance(v, int) and not isinstance(v, bool) for v in terms ): raise InvalidRecipeError( @@ -141,7 +141,7 @@ def from_dict(cls, data: dict[str, Any]) -> MarketSpec: ) er = data["icp_employee_range"] if not ( - isinstance(er, list | tuple) + isinstance(er, (list, tuple)) # noqa: UP038 and len(er) == 2 and all(isinstance(v, int) and not isinstance(v, bool) for v in er) ): @@ -149,7 +149,7 @@ def from_dict(cls, data: dict[str, Any]) -> MarketSpec: f"market.icp_employee_range must be a [min, max] int pair, got {er!r}" ) industries = data["icp_industries"] - if not isinstance(industries, list | tuple): + if not isinstance(industries, (list, tuple)): # noqa: UP038 raise InvalidRecipeError( f"market.icp_industries must be a list of strings, got {industries!r}" ) @@ -158,7 +158,7 @@ def from_dict(cls, data: dict[str, Any]) -> MarketSpec: f"market.icp_industries must contain only strings, got {industries!r}" ) geographies = data["geographies"] - if not isinstance(geographies, list | tuple): + if not isinstance(geographies, (list, tuple)): # noqa: UP038 raise InvalidRecipeError( f"market.geographies must be a list of strings, got {geographies!r}" ) @@ -194,13 +194,13 @@ def from_dict(cls, data: dict[str, Any]) -> GtmMotionSpec: "gtm_motion", ) channels = data["channels"] - if not isinstance(channels, list | tuple) or not all(isinstance(c, str) for c in channels): + if not isinstance(channels, (list, tuple)) or not all(isinstance(c, str) for c in channels): # noqa: UP038 raise InvalidRecipeError( f"gtm_motion.channels must be a list of strings, got {channels!r}" ) for share_name in ("inbound_share", "outbound_share", "partner_share"): v = data[share_name] - if isinstance(v, bool) or not isinstance(v, int | float): + if isinstance(v, bool) or not isinstance(v, (int, float)): # noqa: UP038 raise InvalidRecipeError( f"gtm_motion.{share_name} must be a float in [0, 1], got {type(v).__name__!r}" ) @@ 
-232,7 +232,7 @@ def from_dict(cls, data: dict[str, Any]) -> PersonaSpec: ) title_variants = data["title_variants"] if not ( - isinstance(title_variants, list | tuple) + isinstance(title_variants, (list, tuple)) # noqa: UP038 and all(isinstance(t, str) for t in title_variants) ): raise InvalidRecipeError( diff --git a/leadforge/schema/dictionaries.py b/leadforge/schema/dictionaries.py index 39946e9..e997af6 100644 --- a/leadforge/schema/dictionaries.py +++ b/leadforge/schema/dictionaries.py @@ -32,8 +32,10 @@ def feature_dictionary_df( objects. Defaults to the canonical lead snapshot feature list. Returns: - A ``pd.DataFrame`` with one row per feature, string columns for - categorical fields and boolean columns for flag fields. + A ``pd.DataFrame`` with one row per feature. String columns + (``name``, ``dtype``, ``description``, ``category``) use + ``pd.StringDtype``; flag columns (``is_target``, ``leakage_risk``) + use ``pd.BooleanDtype``. """ rows = [ { @@ -47,6 +49,8 @@ def feature_dictionary_df( for f in features ] df = pd.DataFrame(rows, columns=list(_COLUMNS)) + for col in ("name", "dtype", "description", "category"): + df[col] = df[col].astype("string") df["is_target"] = df["is_target"].astype("boolean") df["leakage_risk"] = df["leakage_risk"].astype("boolean") return df diff --git a/leadforge/schema/entities.py b/leadforge/schema/entities.py index df25b4c..2a60a7c 100644 --- a/leadforge/schema/entities.py +++ b/leadforge/schema/entities.py @@ -166,8 +166,8 @@ class TouchRow: "touch_timestamp": "string", "touch_type": "string", "touch_channel": "string", - "campaign_id": "string", "touch_direction": "string", + "campaign_id": "string", } touch_id: str diff --git a/leadforge/schema/tasks.py b/leadforge/schema/tasks.py index cf249ee..31d2f68 100644 --- a/leadforge/schema/tasks.py +++ b/leadforge/schema/tasks.py @@ -28,6 +28,13 @@ class SplitSpec: test: float def __post_init__(self) -> None: + import math + + for name, value in (("train", self.train), ("valid", self.valid), ("test", self.test)): + if not isinstance(value, (int, float)) or math.isnan(value) or math.isinf(value): # noqa: UP038 + raise ValueError(f"SplitSpec.{name} must be a finite number, got {value!r}") + if not (0.0 <= value <= 1.0): + raise ValueError(f"SplitSpec.{name} must be in [0, 1], got {value}") total = self.train + self.valid + self.test if abs(total - 1.0) > 1e-6: raise ValueError(f"SplitSpec fractions must sum to 1.0, got {total:.6f}") diff --git a/tests/schema/test_tasks.py b/tests/schema/test_tasks.py index 3e9de2c..ec91c60 100644 --- a/tests/schema/test_tasks.py +++ b/tests/schema/test_tasks.py @@ -21,6 +21,16 @@ def test_split_spec_rejects_bad_sum() -> None: SplitSpec(train=0.6, valid=0.2, test=0.1) +def test_split_spec_rejects_negative_component() -> None: + with pytest.raises(ValueError, match=r"\[0, 1\]"): + SplitSpec(train=1.2, valid=-0.1, test=-0.1) + + +def test_split_spec_rejects_component_above_one() -> None: + with pytest.raises(ValueError, match=r"\[0, 1\]"): + SplitSpec(train=1.1, valid=0.0, test=0.0) + + def test_split_spec_frozen() -> None: s = SplitSpec(0.7, 0.15, 0.15) with pytest.raises(dataclasses.FrozenInstanceError): From bccdf343b4a056ab8c886d193e9d5f017d7b41a9 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 21 Apr 2026 17:46:34 +0300 Subject: [PATCH 3/8] ci: upgrade pr-agent-context refresh to v4.0.19 approval-gated fallback pattern Hardens the refresh flow so bot-authored / approval-gated review events (e.g. 
from Copilot) do not leave pr-agent-context refresh stuck waiting for a manual approval or re-run. Changes to pr-agent-context-refresh.yml: - Add workflow_dispatch trigger with three required inputs: pull_request_number, pull_request_head_sha, pull_request_base_sha (needed by dispatcher-initiated runs that carry no PR event payload) - Pass all three inputs as explicit PR context overrides to the reusable workflow call (empty string when event-triggered, so upstream default resolution applies) - SHA-aware concurrency group: workflow_dispatch runs key on "{pr_number}-{head_sha}", so different-SHA dispatches for the same PR run rather than cancel, while duplicate same-SHA dispatches are deduplicated - Extend job `if` guard to always pass for workflow_dispatch New pr-agent-context-refresh-dispatcher.yml: - Triggers on schedule (every 15 min, Mon-Fri 07:00-23:00 UTC) - For each open same-repo PR with recent review activity (last 20 min): - Bounded lookup: only checks the 10 most recent reviews/comments - In-flight dedupe: skips if any run for that head SHA is queued, in_progress, waiting, or completed within the last 10 min - Per-PR error isolation: failures are caught and logged; other PRs are still processed - Uses correct actions/github-script@v7 Octokit method names (github.rest.pulls.*, github.rest.actions.*) - Dispatches to the repo default branch so the workflow file is always resolvable Co-Authored-By: Claude Sonnet 4.6 --- .../pr-agent-context-refresh-dispatcher.yml | 124 ++++++++++++++++++ .../workflows/pr-agent-context-refresh.yml | 22 ++++ 2 files changed, 146 insertions(+) create mode 100644 .github/workflows/pr-agent-context-refresh-dispatcher.yml diff --git a/.github/workflows/pr-agent-context-refresh-dispatcher.yml b/.github/workflows/pr-agent-context-refresh-dispatcher.yml new file mode 100644 index 0000000..694ccf2 --- /dev/null +++ b/.github/workflows/pr-agent-context-refresh-dispatcher.yml @@ -0,0 +1,124 @@ +name: PR agent context refresh dispatcher + +# Runs on a schedule and dispatches pr-agent-context-refresh for any open +# same-repo PR that had recent review activity but no corresponding in-flight +# or recently-completed refresh run. +# +# This is the approval-gated fallback: when a bot (e.g. Copilot) submits a +# review, the pull_request_review event may trigger a run that is held for +# approval, or may be skipped entirely in certain org configurations. This +# dispatcher ensures those PRs are not left stale indefinitely. + +on: + schedule: + # Every 15 minutes during business hours (UTC). + # Adjust or extend to '*/15 * * * *' if 24/7 coverage is needed. + - cron: '*/15 7-23 * * 1-5' + +permissions: + actions: write + pull-requests: read + +jobs: + dispatch: + name: Dispatch stalled PR agent context refreshes + runs-on: ubuntu-latest + steps: + - name: Find and dispatch pending refreshes + uses: actions/github-script@v7 + with: + script: | + const REFRESH_WORKFLOW = 'pr-agent-context-refresh.yml'; + // Only look at activity in the last N minutes to avoid spurious dispatches. + const LOOKBACK_MINUTES = 20; + // Skip dispatch if a run completed within the last N minutes. + const RECENT_COMPLETED_WINDOW_MINUTES = 10; + + const now = Date.now(); + const since = new Date(now - LOOKBACK_MINUTES * 60 * 1000).toISOString(); + const recentCompletedSince = new Date( + now - RECENT_COMPLETED_WINDOW_MINUTES * 60 * 1000 + ).toISOString(); + + const defaultBranch = context.payload.repository.default_branch; + + // List open PRs in this repository (same-repo only). 
+ const { data: pulls } = await github.rest.pulls.list({ + ...context.repo, + state: 'open', + per_page: 50, + }); + + for (const pr of pulls) { + // Same-repo guard: skip forks. + if (pr.head.repo.full_name !== context.payload.repository.full_name) { + continue; + } + + try { + // --- Bounded recent activity check --- + const [{ data: reviews }, { data: reviewComments }] = await Promise.all([ + github.rest.pulls.listReviews({ + ...context.repo, + pull_number: pr.number, + per_page: 10, + }), + github.rest.pulls.listReviewComments({ + ...context.repo, + pull_number: pr.number, + per_page: 10, + }), + ]); + + const hasRecentActivity = + reviews.some((r) => r.submitted_at >= since) || + reviewComments.some( + (c) => c.created_at >= since || c.updated_at >= since + ); + + if (!hasRecentActivity) continue; + + // --- In-flight / recent-completion dedupe --- + const { data: { workflow_runs: runs } } = + await github.rest.actions.listWorkflowRuns({ + ...context.repo, + workflow_id: REFRESH_WORKFLOW, + head_sha: pr.head.sha, + per_page: 10, + }); + + const isRunningOrRecent = runs.some( + (r) => + r.status === 'in_progress' || + r.status === 'queued' || + r.status === 'waiting' || + (r.status === 'completed' && r.updated_at >= recentCompletedSince) + ); + + if (isRunningOrRecent) { + console.log( + `PR #${pr.number}: refresh already running or recently completed — skipping.` + ); + continue; + } + + // --- Dispatch --- + await github.rest.actions.createWorkflowDispatch({ + ...context.repo, + workflow_id: REFRESH_WORKFLOW, + ref: defaultBranch, + inputs: { + pull_request_number: String(pr.number), + pull_request_head_sha: pr.head.sha, + pull_request_base_sha: pr.base.sha, + }, + }); + + console.log( + `Dispatched refresh for PR #${pr.number} (head: ${pr.head.sha}).` + ); + } catch (err) { + // Per-PR error isolation: log and continue to the next PR. + console.error(`PR #${pr.number}: dispatch failed — ${err.message}`); + } + } diff --git a/.github/workflows/pr-agent-context-refresh.yml b/.github/workflows/pr-agent-context-refresh.yml index e2e3e88..bfcaa25 100644 --- a/.github/workflows/pr-agent-context-refresh.yml +++ b/.github/workflows/pr-agent-context-refresh.yml @@ -7,10 +7,28 @@ on: types: [created, edited, deleted] check_run: types: [completed] + workflow_dispatch: + inputs: + pull_request_number: + description: PR number to refresh + required: true + type: string + pull_request_head_sha: + description: Head SHA of the PR + required: true + type: string + pull_request_base_sha: + description: Base SHA of the PR + required: true + type: string +# SHA-aware concurrency: workflow_dispatch runs key on PR+SHA so same-PR/different-SHA +# dispatches are not cancelled, but duplicate dispatches for the same PR+SHA are. 
concurrency: group: >- pr-agent-context-refresh-${{ + (github.event_name == 'workflow_dispatch' && + format('{0}-{1}', inputs.pull_request_number, inputs.pull_request_head_sha)) || github.event.pull_request.number || github.event.check_run.pull_requests[0].number || github.event.check_run.head_sha || @@ -27,6 +45,7 @@ jobs: pr-agent-context-refresh: name: PR agent context refresh if: >- + github.event_name == 'workflow_dispatch' || (github.event_name == 'pull_request_review' && github.event.pull_request.head.repo.full_name == github.repository) || (github.event_name == 'pull_request_review_comment' && @@ -52,3 +71,6 @@ jobs: wait_for_reviews_to_settle: true publish_all_clear_comments_in_refresh: false debug_artifacts: true + pull_request_number: ${{ inputs.pull_request_number || '' }} + pull_request_head_sha: ${{ inputs.pull_request_head_sha || '' }} + pull_request_base_sha: ${{ inputs.pull_request_base_sha || '' }} From e23aea49cf7276e997bf9f1410aa662628c9c8af Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 21 Apr 2026 17:50:18 +0300 Subject: [PATCH 4/8] fix: address round-2 Copilot review comments on Milestone 3 PR - schema/tasks.py: SplitSpec.__post_init__ now explicitly rejects bool values before the numeric checks (bool is an int subclass, so True/False previously passed the isinstance guard and range check silently) (COPILOT-1); added test_split_spec_rejects_bool_component - schema/dictionaries.py: remove empty `if TYPE_CHECKING: pass` block (COPILOT-2) - core/ids.py: clarify in module docstring that the 9 table-backed prefixes correspond to relational entities, and that rep_ is an internal-only namespace with no standalone output table (COPILOT-3) Co-Authored-By: Claude Sonnet 4.6 --- leadforge/core/ids.py | 8 +++++++- leadforge/schema/dictionaries.py | 4 ---- leadforge/schema/tasks.py | 7 ++++++- tests/schema/test_tasks.py | 5 +++++ 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/leadforge/core/ids.py b/leadforge/core/ids.py index fdb2af8..708358b 100644 --- a/leadforge/core/ids.py +++ b/leadforge/core/ids.py @@ -7,6 +7,9 @@ Canonical prefixes ------------------ +The following nine prefixes correspond directly to the nine relational tables +defined in ``schema/entities.py``: + acct_ — Account cnt_ — Contact lead_ — Lead @@ -16,7 +19,10 @@ opp_ — Opportunity cust_ — Customer sub_ — Subscription -rep_ — Sales rep (internal) + +The ``rep_`` prefix is an internal-only namespace used for sales-rep entities +that participate in simulation mechanics but do **not** have a corresponding +standalone relational table in the v1 output bundle. 
""" from __future__ import annotations diff --git a/leadforge/schema/dictionaries.py b/leadforge/schema/dictionaries.py index e997af6..f97cb90 100644 --- a/leadforge/schema/dictionaries.py +++ b/leadforge/schema/dictionaries.py @@ -8,15 +8,11 @@ from __future__ import annotations from pathlib import Path -from typing import TYPE_CHECKING import pandas as pd from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES, FeatureSpec -if TYPE_CHECKING: - pass - _COLUMNS = ("name", "dtype", "description", "category", "is_target", "leakage_risk") diff --git a/leadforge/schema/tasks.py b/leadforge/schema/tasks.py index 31d2f68..c93b155 100644 --- a/leadforge/schema/tasks.py +++ b/leadforge/schema/tasks.py @@ -31,7 +31,12 @@ def __post_init__(self) -> None: import math for name, value in (("train", self.train), ("valid", self.valid), ("test", self.test)): - if not isinstance(value, (int, float)) or math.isnan(value) or math.isinf(value): # noqa: UP038 + if ( + isinstance(value, bool) + or not isinstance(value, (int, float)) # noqa: UP038 + or math.isnan(value) + or math.isinf(value) + ): raise ValueError(f"SplitSpec.{name} must be a finite number, got {value!r}") if not (0.0 <= value <= 1.0): raise ValueError(f"SplitSpec.{name} must be in [0, 1], got {value}") diff --git a/tests/schema/test_tasks.py b/tests/schema/test_tasks.py index ec91c60..4039a6b 100644 --- a/tests/schema/test_tasks.py +++ b/tests/schema/test_tasks.py @@ -16,6 +16,11 @@ def test_split_spec_valid() -> None: assert s.train == pytest.approx(0.7) +def test_split_spec_rejects_bool_component() -> None: + with pytest.raises(ValueError, match="finite number"): + SplitSpec(train=True, valid=0.15, test=0.15) # type: ignore[arg-type] + + def test_split_spec_rejects_bad_sum() -> None: with pytest.raises(ValueError, match="sum"): SplitSpec(train=0.6, valid=0.2, test=0.1) From e52d5cb80a5ac4b7ff416706a4a091fc7168e299 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 21 Apr 2026 18:36:12 +0300 Subject: [PATCH 5/8] =?UTF-8?q?fix(ci):=20harden=20dispatcher=20dedupe=20?= =?UTF-8?q?=E2=80=94=20blocked=20runs=20must=20not=20suppress=20fallback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause of the stale refresh comment after Copilot's review on PR #7: 1. The dispatcher workflow only lives on this PR branch; scheduled workflows only run from the default branch, so it had never fired. This resolves the moment the PR merges. 2. The dedupe logic in the dispatcher was too broad: (r.status === 'completed' && r.updated_at >= recentCompletedSince) This treated ANY recently-completed run as valid coverage, including runs with conclusion=startup_failure (bot-triggered run failed to start) and conclusion=action_required (run was approval-gated and auto-cancelled). Both conclusions mean the run was blocked and never produced a refresh comment — yet they suppressed the fallback, leaving the PR stale. Additionally, runs with status=action_required (actively waiting for approval) were erroneously counted as "waiting" coverage. Fix applied to the dispatcher: - Extract BLOCKED_CONCLUSIONS set: startup_failure, action_required, failure, cancelled, timed_out, skipped — none of these suppress dispatch. - A completed run only counts as coverage when its conclusion is 'success' or 'neutral' AND it completed within the recent-success window. - status=action_required (approval-gated, will never run) explicitly returns false — not treated as active coverage. 
- Active statuses (in_progress, queued, waiting, requested) still suppress dispatch — those will actually produce a refresh. - Schedule changed from business-hours-only to '*/15 * * * *' so bot reviews arriving outside Mon–Fri 07-23 UTC are also caught. - Comments in the YAML explain the dedupe contract explicitly. Verification against the failing scenario (SHA e23aea4): - run 24729455592 (startup_failure): BLOCKED_CONCLUSIONS → not coverage → dispatcher would redispatch ✓ - pull_request_review_comment runs (action_required): BLOCKED_CONCLUSIONS → not coverage → dispatcher would redispatch ✓ - A currently in_progress dispatch run: active → suppress ✓ - A recently succeeded dispatch run: success + within window → suppress ✓ Co-Authored-By: Claude Sonnet 4.6 --- .../pr-agent-context-refresh-dispatcher.yml | 97 +++++++++++++++---- 1 file changed, 77 insertions(+), 20 deletions(-) diff --git a/.github/workflows/pr-agent-context-refresh-dispatcher.yml b/.github/workflows/pr-agent-context-refresh-dispatcher.yml index 694ccf2..41679a3 100644 --- a/.github/workflows/pr-agent-context-refresh-dispatcher.yml +++ b/.github/workflows/pr-agent-context-refresh-dispatcher.yml @@ -2,18 +2,41 @@ name: PR agent context refresh dispatcher # Runs on a schedule and dispatches pr-agent-context-refresh for any open # same-repo PR that had recent review activity but no corresponding in-flight -# or recently-completed refresh run. +# or recently-succeeded refresh run. # -# This is the approval-gated fallback: when a bot (e.g. Copilot) submits a -# review, the pull_request_review event may trigger a run that is held for -# approval, or may be skipped entirely in certain org configurations. This -# dispatcher ensures those PRs are not left stale indefinitely. +# WHY THIS EXISTS +# --------------- +# When a bot (e.g. Copilot / copilot-pull-request-reviewer[bot]) submits a +# review, the pull_request_review / pull_request_review_comment events fire +# and trigger pr-agent-context-refresh — but the triggered run is immediately +# blocked by GitHub's approval gate for bot/external actors: +# +# • status=action_required → waiting for maintainer approval (never runs) +# • conclusion=startup_failure → workflow could not start (counts as blocked) +# • conclusion=action_required → was approval-gated, later auto-cancelled +# +# None of those outcomes produce a refresh comment. This dispatcher fires +# from the default branch (where it is active), bypasses the approval gate +# by using the schedule's GITHUB_TOKEN, and dispatches a workflow_dispatch +# run that executes with full repo permissions. +# +# DEDUPE CONTRACT +# --------------- +# A dispatch is suppressed only when there is already meaningful coverage for +# the PR's current head SHA: +# • run is in_progress, queued, waiting, or requested → suppress +# • run completed with conclusion=success or =neutral recently → suppress +# +# Blocked / non-executed conclusions (startup_failure, action_required, +# failure, cancelled, timed_out, skipped) do NOT count as coverage and do +# NOT suppress a fallback dispatch. on: schedule: - # Every 15 minutes during business hours (UTC). - # Adjust or extend to '*/15 * * * *' if 24/7 coverage is needed. - - cron: '*/15 7-23 * * 1-5' + # Every 15 minutes, all day every day. + # Bot reviews can arrive at any hour; restrict to business hours only if + # cost is a concern (e.g. '*/15 7-23 * * 1-5' for Mon-Fri 07-23 UTC). 
+ - cron: '*/15 * * * *' permissions: actions: write @@ -29,15 +52,27 @@ jobs: with: script: | const REFRESH_WORKFLOW = 'pr-agent-context-refresh.yml'; - // Only look at activity in the last N minutes to avoid spurious dispatches. + + // Only look at review activity in the last N minutes. const LOOKBACK_MINUTES = 20; - // Skip dispatch if a run completed within the last N minutes. - const RECENT_COMPLETED_WINDOW_MINUTES = 10; + // A successfully-completed run within this window suppresses redispatch. + const RECENT_SUCCESS_WINDOW_MINUTES = 10; + + // Conclusions that mean the run was BLOCKED and never produced a + // refresh comment. These must NOT suppress a fallback dispatch. + const BLOCKED_CONCLUSIONS = new Set([ + 'startup_failure', + 'action_required', + 'failure', + 'cancelled', + 'timed_out', + 'skipped', + ]); const now = Date.now(); const since = new Date(now - LOOKBACK_MINUTES * 60 * 1000).toISOString(); - const recentCompletedSince = new Date( - now - RECENT_COMPLETED_WINDOW_MINUTES * 60 * 1000 + const recentSuccessSince = new Date( + now - RECENT_SUCCESS_WINDOW_MINUTES * 60 * 1000 ).toISOString(); const defaultBranch = context.payload.repository.default_branch; @@ -78,7 +113,9 @@ jobs: if (!hasRecentActivity) continue; - // --- In-flight / recent-completion dedupe --- + // --- In-flight / recent-success dedupe --- + // Fetch runs for this exact head SHA so stale runs from + // earlier commits don't suppress dispatch for the new SHA. const { data: { workflow_runs: runs } } = await github.rest.actions.listWorkflowRuns({ ...context.repo, @@ -87,17 +124,37 @@ jobs: per_page: 10, }); - const isRunningOrRecent = runs.some( - (r) => + const hasValidCoverage = runs.some((r) => { + // Approval-gated status: the run is waiting and will never + // execute — do NOT treat this as coverage. + if (r.status === 'action_required') return false; + + // Actively working toward a refresh — don't interrupt. + if ( r.status === 'in_progress' || r.status === 'queued' || r.status === 'waiting' || - (r.status === 'completed' && r.updated_at >= recentCompletedSince) - ); + r.status === 'requested' + ) { + return true; + } + + // Completed: only suppress if the run actually succeeded + // recently. Blocked / failed conclusions are transparent. 
+            if (r.status === 'completed') {
+              if (BLOCKED_CONCLUSIONS.has(r.conclusion)) return false;
+              return (
+                (r.conclusion === 'success' || r.conclusion === 'neutral') &&
+                r.updated_at >= recentSuccessSince
+              );
+            }
+
+            return false;
+          });

-          if (isRunningOrRecent) {
+          if (hasValidCoverage) {
             console.log(
-              `PR #${pr.number}: refresh already running or recently completed — skipping.`
+              `PR #${pr.number}: valid refresh already running or recently succeeded — skipping.`
             );
             continue;
           }

From 8ce47fafa612fbfb1a9866696a81d1debeefe63f Mon Sep 17 00:00:00 2001
From: Shay Palachy
Date: Tue, 21 Apr 2026 19:03:20 +0300
Subject: [PATCH 6/8] fix: address Copilot round-3 review comments on PR #7
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- relationships.py: replace the full orphan list comprehension with a
  single-pass pattern using O(1) extra memory (running count + bounded
  sample list), so no intermediate orphan list is accumulated; rename
  _MAX_SAMPLE → _max_sample for ruff N806
- test_entities.py: wrap nullable-boolean iloc result in bool() before
  identity-comparing to True, avoiding pandas BooleanDtype NA-coercion risk
- test_features.py: use direct boolean filter df[df["is_target"]] instead
  of df[df["is_target"] == True] (ruff E712)
- pr-agent-context-refresh-dispatcher.yml: switch from per_page:50 cap to
  github.paginate() so all open PRs are checked, not just the first page

Co-Authored-By: Claude Sonnet 4.6
---
 .../pr-agent-context-refresh-dispatcher.yml |  6 +++---
 leadforge/schema/relationships.py           | 14 ++++++++++----
 tests/schema/test_entities.py               |  2 +-
 tests/schema/test_features.py               |  2 +-
 4 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/.github/workflows/pr-agent-context-refresh-dispatcher.yml b/.github/workflows/pr-agent-context-refresh-dispatcher.yml
index 41679a3..51091ef 100644
--- a/.github/workflows/pr-agent-context-refresh-dispatcher.yml
+++ b/.github/workflows/pr-agent-context-refresh-dispatcher.yml
@@ -77,11 +77,11 @@ jobs:

           const defaultBranch = context.payload.repository.default_branch;

-          // List open PRs in this repository (same-repo only).
-          const { data: pulls } = await github.rest.pulls.list({
+          // List ALL open PRs in this repository via pagination (same-repo only).
+          const pulls = await github.paginate(github.rest.pulls.list, {
             ...context.repo,
             state: 'open',
-            per_page: 50,
+            per_page: 100,
           });

           for (const pr of pulls) {
diff --git a/leadforge/schema/relationships.py b/leadforge/schema/relationships.py
index eb7e7f1..d515cf7 100644
--- a/leadforge/schema/relationships.py
+++ b/leadforge/schema/relationships.py
@@ -57,11 +57,17 @@ def validate_fk(
     Raises:
         FKViolationError: if any child value is absent from *parent_values*.
     """
-    orphans = [v for v in child_values if v not in parent_values]
-    if orphans:
-        sample = orphans[:5]
+    _max_sample = 5
+    orphan_count = 0
+    orphan_sample: list[str] = []
+    for v in child_values:
+        if v not in parent_values:
+            orphan_count += 1
+            if len(orphan_sample) < _max_sample:
+                orphan_sample.append(v)
+    if orphan_count:
         raise FKViolationError(
             f"FK violation: {constraint.child_table}.{constraint.child_column} "
             f"→ {constraint.parent_table}.{constraint.parent_column}: "
-            f"{len(orphans)} orphan(s), e.g. {sample}"
+            f"{orphan_count} orphan(s), e.g. {orphan_sample}"
         )
diff --git a/tests/schema/test_entities.py b/tests/schema/test_entities.py
index 4cdcdb9..e3d6f2a 100644
--- a/tests/schema/test_entities.py
+++ b/tests/schema/test_entities.py
@@ -145,7 +145,7 @@ def test_lead_rows_parquet_roundtrip(tmp_path: Path) -> None:
     write_parquet(df, path)
     restored = read_parquet(path)
     assert restored["lead_id"].iloc[0] == "lead_000001"
-    assert restored["is_mql"].iloc[0] is True or restored["is_mql"].iloc[0]
+    assert bool(restored["is_mql"].iloc[0]) is True


 # ---------------------------------------------------------------------------
diff --git a/tests/schema/test_features.py b/tests/schema/test_features.py
index 303aaf8..e51d992 100644
--- a/tests/schema/test_features.py
+++ b/tests/schema/test_features.py
@@ -86,7 +86,7 @@ def test_feature_dictionary_df_columns() -> None:

 def test_feature_dictionary_df_target_row() -> None:
     df = feature_dictionary_df()
-    target_rows = df[df["is_target"] == True]  # noqa: E712
+    target_rows = df[df["is_target"]]
     assert len(target_rows) == 1
     assert target_rows.iloc[0]["name"] == "converted_within_90_days"

From 171961a9a65bf444719113a7370367e430b561e1 Mon Sep 17 00:00:00 2001
From: Shay Palachy
Date: Wed, 22 Apr 2026 13:09:13 +0300
Subject: [PATCH 7/8] fix: address Copilot round-4 review comments on PR #7
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- pr-agent-context-refresh.yml: use github.event.inputs.* instead of
  inputs.* in concurrency.group; the inputs context is only defined for
  workflow_dispatch/workflow_call events, whereas github.event.inputs is
  always safe to reference
- pr-agent-context-refresh-dispatcher.yml: remove dead
  r.status === 'action_required' guard — action_required is a conclusion
  value, not a status value, so this branch never fired; the actual case
  (completed+conclusion=action_required) is already handled by
  BLOCKED_CONCLUSIONS; update top-of-file comment accordingly

Resolves COPILOT-1 and COPILOT-2. COPILOT-3 (make_id docstring) is
resolved as already addressed — the docstring correctly describes the
prefix argument pattern.

Co-Authored-By: Claude Sonnet 4.6
---
 .github/workflows/pr-agent-context-refresh-dispatcher.yml | 5 -----
 .github/workflows/pr-agent-context-refresh.yml            | 2 +-
 2 files changed, 1 insertion(+), 6 deletions(-)

diff --git a/.github/workflows/pr-agent-context-refresh-dispatcher.yml b/.github/workflows/pr-agent-context-refresh-dispatcher.yml
index 51091ef..76109a8 100644
--- a/.github/workflows/pr-agent-context-refresh-dispatcher.yml
+++ b/.github/workflows/pr-agent-context-refresh-dispatcher.yml
@@ -11,7 +11,6 @@ name: PR agent context refresh dispatcher
 # and trigger pr-agent-context-refresh — but the triggered run is immediately
 # blocked by GitHub's approval gate for bot/external actors:
 #
-# • status=action_required → waiting for maintainer approval (never runs)
 # • conclusion=startup_failure → workflow could not start (counts as blocked)
 # • conclusion=action_required → was approval-gated, later auto-cancelled
 #
@@ -125,10 +124,6 @@ jobs:
             });

             const hasValidCoverage = runs.some((r) => {
-              // Approval-gated status: the run is waiting and will never
-              // execute — do NOT treat this as coverage.
-              if (r.status === 'action_required') return false;
-
               // Actively working toward a refresh — don't interrupt.
if ( r.status === 'in_progress' || diff --git a/.github/workflows/pr-agent-context-refresh.yml b/.github/workflows/pr-agent-context-refresh.yml index bfcaa25..bc36541 100644 --- a/.github/workflows/pr-agent-context-refresh.yml +++ b/.github/workflows/pr-agent-context-refresh.yml @@ -28,7 +28,7 @@ concurrency: group: >- pr-agent-context-refresh-${{ (github.event_name == 'workflow_dispatch' && - format('{0}-{1}', inputs.pull_request_number, inputs.pull_request_head_sha)) || + format('{0}-{1}', github.event.inputs.pull_request_number, github.event.inputs.pull_request_head_sha)) || github.event.pull_request.number || github.event.check_run.pull_requests[0].number || github.event.check_run.head_sha || From 84562fec5b49c63c9b32090cd503a21276af8584 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Wed, 22 Apr 2026 14:21:56 +0300 Subject: [PATCH 8/8] fix: use listWorkflowRunsForWorkflow for per-workflow dedupe query MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit github.rest.actions.listWorkflowRuns is the repo-level endpoint and does not accept a workflow_id filter; the correct method is listWorkflowRunsForWorkflow, which scopes the query to the named workflow and supports the head_sha parameter used for SHA-aware dedupe. Without this fix the call throws at runtime, the try/catch swallows the error, and the dedupe step is silently skipped — causing the dispatcher to always fire without deduplication. Resolves COPILOT-1 (round 5). FAIL-1 (startup_failure on bot-triggered refresh run) is expected approval-gate behaviour; no code change needed. Co-Authored-By: Claude Sonnet 4.6 --- .github/workflows/pr-agent-context-refresh-dispatcher.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr-agent-context-refresh-dispatcher.yml b/.github/workflows/pr-agent-context-refresh-dispatcher.yml index 76109a8..b04d6d7 100644 --- a/.github/workflows/pr-agent-context-refresh-dispatcher.yml +++ b/.github/workflows/pr-agent-context-refresh-dispatcher.yml @@ -116,7 +116,7 @@ jobs: // Fetch runs for this exact head SHA so stale runs from // earlier commits don't suppress dispatch for the new SHA. const { data: { workflow_runs: runs } } = - await github.rest.actions.listWorkflowRuns({ + await github.rest.actions.listWorkflowRunsForWorkflow({ ...context.repo, workflow_id: REFRESH_WORKFLOW, head_sha: pr.head.sha,
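
For reference, the dedupe contract that the dispatcher patches converge on
reduces to a single coverage predicate over a run's status and conclusion.
The sketch below restates it in Python for readability; it is illustrative
only, not code from this series: the shipped logic is the github-script step
above, and the function name counts_as_coverage is hypothetical.

    # Does an existing workflow run for the PR's head SHA already provide
    # coverage, making a fallback dispatch unnecessary? Mirrors the
    # dispatcher's JS logic as of PATCH 7/8.
    BLOCKED_CONCLUSIONS = {
        "startup_failure", "action_required", "failure",
        "cancelled", "timed_out", "skipped",
    }
    ACTIVE_STATUSES = {"in_progress", "queued", "waiting", "requested"}

    def counts_as_coverage(
        status: str,
        conclusion: str | None,
        updated_at: str,
        recent_success_since: str,
    ) -> bool:
        if status in ACTIVE_STATUSES:
            return True  # a refresh is actually on its way
        if status == "completed":
            if conclusion in BLOCKED_CONCLUSIONS:
                return False  # blocked runs never produced a refresh comment
            # Only a genuine recent success (or neutral) suppresses dispatch.
            return (
                conclusion in {"success", "neutral"}
                and updated_at >= recent_success_since
            )
        return False

As in the JavaScript step, the timestamps are ISO-8601 strings in a fixed UTC
format, so plain lexicographic comparison is sufficient.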