diff --git a/.agent-plan.md b/.agent-plan.md index ae236dc..447c76f 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -6,41 +6,50 @@ ## Current System State -**v0.2.0 in progress — Milestone 2 complete (PR open).** Typed `NarrativeSpec` hierarchy, `WorldSpec` -with narrative field, `Generator.from_recipe()` populates `world_spec`, dataset card renderer, and -full test coverage. 110 tests passing. +**v0.2.0 in progress — Milestone 3 complete (PR open).** All 9 relational table schemas defined as +typed row dataclasses with Parquet round-trip support. FK constraints, ID generation, feature +dictionary, and task manifest implemented. 192 tests passing. --- -## Active Task Breakdown — Milestone 3: Schema Layer (v0.2.0 cont.) +## Active Task Breakdown — Milestone 4: World Structure (v0.3.0) -Goal: Define the relational entity schema (accounts, contacts, leads, etc.) and feature dictionary. +Goal: Implement the hidden world graph — DAG of latent nodes, motif families, and stochastic rewiring. -- [ ] **1. Entity schema** - - Implement `schema/entities.py`: typed dataclasses for `Account`, `Contact`, `Lead` - - Implement `schema/events.py`: `Touch`, `SalesActivity`, `Opportunity` etc. - -- [ ] **2. Feature dictionary** - - Implement `schema/features.py` + `schema/dictionaries.py` - - Generate `feature_dictionary.csv` stub - -- [ ] **3. Task schema** - - Implement `schema/tasks.py`: `converted_within_90_days` task manifest structure +- [ ] **1. Node type system** (`structure/node_types.py`) +- [ ] **2. World graph** (`structure/graph.py`) — `networkx.DiGraph`, DAG validation +- [ ] **3. Motif families** (`structure/motifs.py`, `structure/templates.py`) — 5 v1 families +- [ ] **4. Stochastic rewiring** (`structure/rewiring.py`) — seeded perturbation +- [ ] **5. 
Sampler** (`structure/sampler.py`) — draw a world graph from a motif + config --- ## Context Pointers -- Milestone 3 scope: `docs/leadforge_implementation_plan.md` §6 "Milestone 3" +- Milestone 4 scope: `docs/leadforge_implementation_plan.md` §7 "Milestone 4" - Full milestone dependency graph: `docs/leadforge_implementation_plan.md` §6 -- Schema spec: `docs/leadforge_architecture_spec.md` §8 -- Recipe assets: `leadforge/recipes/b2b_saas_procurement_v1/` +- Structure spec: `docs/leadforge_architecture_spec.md` §11 +- Motif families: `docs/leadforge_architecture_spec.md` §11.2 --- ## Completed Phases -### Milestone 2 — Narrative Layer ✓ (v0.2.0 in PR) +### Milestone 3 — Schema Layer ✓ (v0.2.0 in PR) +- `leadforge/core/ids.py`: `make_id(prefix, n)` + `ID_PREFIXES` registry +- `leadforge/schema/entities.py`: typed row dataclasses for all 9 tables (accounts, contacts, + leads, touches, sessions, sales_activities, opportunities, customers, subscriptions) with + `DTYPE_MAP`, `to_dict()`, `empty_dataframe()`, Parquet round-trip via `schema/tables.py` +- `leadforge/schema/relationships.py`: `FKConstraint`, `ALL_CONSTRAINTS` (10 FK edges), + `validate_fk()` helper raising `FKViolationError` +- `leadforge/schema/features.py`: `FeatureSpec` frozen dataclass + `LEAD_SNAPSHOT_FEATURES` + (29 features, one target) +- `leadforge/schema/dictionaries.py`: `feature_dictionary_df()` + `write_feature_dictionary()` +- `leadforge/schema/tasks.py`: `SplitSpec`, `TaskManifest`, `CONVERTED_WITHIN_90_DAYS` constant +- `pyproject.toml`: added pandas≥2.0 + pyarrow≥14.0 as core deps; mypy overrides for both +- 82 new tests; total 192 passing + +### Milestone 2 — Narrative Layer ✓ (v0.2.0 merged) - `leadforge/narrative/spec.py`: frozen dataclasses `NarrativeSpec`, `CompanySpec`, `ProductSpec`, `MarketSpec`, `GtmMotionSpec`, `PersonaSpec`, `FunnelStageSpec` — all with validated `from_dict()` - `leadforge/narrative/dataset_card.py`: `render_dataset_card(world_spec)` — Markdown card diff --git 
a/.github/workflows/pr-agent-context-refresh-dispatcher.yml b/.github/workflows/pr-agent-context-refresh-dispatcher.yml new file mode 100644 index 0000000..b04d6d7 --- /dev/null +++ b/.github/workflows/pr-agent-context-refresh-dispatcher.yml @@ -0,0 +1,176 @@ +name: PR agent context refresh dispatcher + +# Runs on a schedule and dispatches pr-agent-context-refresh for any open +# same-repo PR that had recent review activity but no corresponding in-flight +# or recently-succeeded refresh run. +# +# WHY THIS EXISTS +# --------------- +# When a bot (e.g. Copilot / copilot-pull-request-reviewer[bot]) submits a +# review, the pull_request_review / pull_request_review_comment events fire +# and trigger pr-agent-context-refresh — but the triggered run is immediately +# blocked by GitHub's approval gate for bot/external actors: +# +# • conclusion=startup_failure → workflow could not start (counts as blocked) +# • conclusion=action_required → was approval-gated, later auto-cancelled +# +# None of those outcomes produce a refresh comment. This dispatcher fires +# from the default branch (where it is active), bypasses the approval gate +# by using the schedule's GITHUB_TOKEN, and dispatches a workflow_dispatch +# run that executes with full repo permissions. +# +# DEDUPE CONTRACT +# --------------- +# A dispatch is suppressed only when there is already meaningful coverage for +# the PR's current head SHA: +# • run is in_progress, queued, waiting, or requested → suppress +# • run completed with conclusion=success or =neutral recently → suppress +# +# Blocked / non-executed conclusions (startup_failure, action_required, +# failure, cancelled, timed_out, skipped) do NOT count as coverage and do +# NOT suppress a fallback dispatch. + +on: + schedule: + # Every 15 minutes, all day every day. + # Bot reviews can arrive at any hour; restrict to business hours only if + # cost is a concern (e.g. '*/15 7-23 * * 1-5' for Mon-Fri 07-23 UTC). 
+ - cron: '*/15 * * * *' + +permissions: + actions: write + pull-requests: read + +jobs: + dispatch: + name: Dispatch stalled PR agent context refreshes + runs-on: ubuntu-latest + steps: + - name: Find and dispatch pending refreshes + uses: actions/github-script@v7 + with: + script: | + const REFRESH_WORKFLOW = 'pr-agent-context-refresh.yml'; + + // Only look at review activity in the last N minutes. + const LOOKBACK_MINUTES = 20; + // A successfully-completed run within this window suppresses redispatch. + const RECENT_SUCCESS_WINDOW_MINUTES = 10; + + // Conclusions that mean the run was BLOCKED and never produced a + // refresh comment. These must NOT suppress a fallback dispatch. + const BLOCKED_CONCLUSIONS = new Set([ + 'startup_failure', + 'action_required', + 'failure', + 'cancelled', + 'timed_out', + 'skipped', + ]); + + const now = Date.now(); + const since = new Date(now - LOOKBACK_MINUTES * 60 * 1000).toISOString(); + const recentSuccessSince = new Date( + now - RECENT_SUCCESS_WINDOW_MINUTES * 60 * 1000 + ).toISOString(); + + const defaultBranch = context.payload.repository.default_branch; + + // List ALL open PRs in this repository via pagination (same-repo only). + const pulls = await github.paginate(github.rest.pulls.list, { + ...context.repo, + state: 'open', + per_page: 100, + }); + + for (const pr of pulls) { + // Same-repo guard: skip forks. 
+ if (pr.head.repo.full_name !== context.payload.repository.full_name) { + continue; + } + + try { + // --- Bounded recent activity check --- + const [{ data: reviews }, { data: reviewComments }] = await Promise.all([ + github.rest.pulls.listReviews({ + ...context.repo, + pull_number: pr.number, + per_page: 10, + }), + github.rest.pulls.listReviewComments({ + ...context.repo, + pull_number: pr.number, + per_page: 10, + }), + ]); + + const hasRecentActivity = + reviews.some((r) => r.submitted_at >= since) || + reviewComments.some( + (c) => c.created_at >= since || c.updated_at >= since + ); + + if (!hasRecentActivity) continue; + + // --- In-flight / recent-success dedupe --- + // Fetch runs for this exact head SHA so stale runs from + // earlier commits don't suppress dispatch for the new SHA. + const { data: { workflow_runs: runs } } = + await github.rest.actions.listWorkflowRunsForWorkflow({ + ...context.repo, + workflow_id: REFRESH_WORKFLOW, + head_sha: pr.head.sha, + per_page: 10, + }); + + const hasValidCoverage = runs.some((r) => { + // Actively working toward a refresh — don't interrupt. + if ( + r.status === 'in_progress' || + r.status === 'queued' || + r.status === 'waiting' || + r.status === 'requested' + ) { + return true; + } + + // Completed: only suppress if the run actually succeeded + // recently. Blocked / failed conclusions are transparent. 
+ if (r.status === 'completed') { + if (BLOCKED_CONCLUSIONS.has(r.conclusion)) return false; + return ( + (r.conclusion === 'success' || r.conclusion === 'neutral') && + r.updated_at >= recentSuccessSince + ); + } + + return false; + }); + + if (hasValidCoverage) { + console.log( + `PR #${pr.number}: valid refresh already running or recently succeeded — skipping.` + ); + continue; + } + + // --- Dispatch --- + await github.rest.actions.createWorkflowDispatch({ + ...context.repo, + workflow_id: REFRESH_WORKFLOW, + ref: defaultBranch, + inputs: { + pull_request_number: String(pr.number), + pull_request_head_sha: pr.head.sha, + pull_request_base_sha: pr.base.sha, + }, + }); + + console.log( + `Dispatched refresh for PR #${pr.number} (head: ${pr.head.sha}).` + ); + } catch (err) { + // Per-PR error isolation: log and continue to the next PR. + console.error(`PR #${pr.number}: dispatch failed — ${err.message}`); + } + } diff --git a/.github/workflows/pr-agent-context-refresh.yml b/.github/workflows/pr-agent-context-refresh.yml index e2e3e88..bc36541 100644 --- a/.github/workflows/pr-agent-context-refresh.yml +++ b/.github/workflows/pr-agent-context-refresh.yml @@ -7,10 +7,28 @@ on: types: [created, edited, deleted] check_run: types: [completed] + workflow_dispatch: + inputs: + pull_request_number: + description: PR number to refresh + required: true + type: string + pull_request_head_sha: + description: Head SHA of the PR + required: true + type: string + pull_request_base_sha: + description: Base SHA of the PR + required: true + type: string +# SHA-aware concurrency: workflow_dispatch runs key on PR+SHA so same-PR/different-SHA +# dispatches are not cancelled, but duplicate dispatches for the same PR+SHA are. 
concurrency: group: >- pr-agent-context-refresh-${{ + (github.event_name == 'workflow_dispatch' && + format('{0}-{1}', github.event.inputs.pull_request_number, github.event.inputs.pull_request_head_sha)) || github.event.pull_request.number || github.event.check_run.pull_requests[0].number || github.event.check_run.head_sha || @@ -27,6 +45,7 @@ jobs: pr-agent-context-refresh: name: PR agent context refresh if: >- + github.event_name == 'workflow_dispatch' || (github.event_name == 'pull_request_review' && github.event.pull_request.head.repo.full_name == github.repository) || (github.event_name == 'pull_request_review_comment' && @@ -52,3 +71,6 @@ jobs: wait_for_reviews_to_settle: true publish_all_clear_comments_in_refresh: false debug_artifacts: true + pull_request_number: ${{ inputs.pull_request_number || '' }} + pull_request_head_sha: ${{ inputs.pull_request_head_sha || '' }} + pull_request_base_sha: ${{ inputs.pull_request_base_sha || '' }} diff --git a/leadforge/core/hashing.py b/leadforge/core/hashing.py index 513d70e..680a782 100644 --- a/leadforge/core/hashing.py +++ b/leadforge/core/hashing.py @@ -18,7 +18,7 @@ def _canonical(obj: Any) -> Any: """Recursively convert to a JSON-stable form (sorted keys, enums → str).""" if isinstance(obj, dict): return {k: _canonical(v) for k, v in sorted(obj.items())} - if isinstance(obj, (list, tuple)): + if isinstance(obj, (list, tuple)): # noqa: UP038 return [_canonical(v) for v in obj] # StrEnum values are already strings; this handles plain Enum too if hasattr(obj, "value"): diff --git a/leadforge/core/ids.py b/leadforge/core/ids.py index ba1cad7..708358b 100644 --- a/leadforge/core/ids.py +++ b/leadforge/core/ids.py @@ -1,16 +1,63 @@ """Entity ID generation. -Implemented in Milestone 3. All IDs must be stable, opaque, namespace-unique, -and deterministic for a given run. 
- -Canonical prefixes: - acct_ — Account - cnt_ — Contact - lead_ — Lead - touch_ — Touch - sess_ — Session - act_ — SalesActivity - opp_ — Opportunity - cust_ — Customer - sub_ — Subscription +All IDs are stable, opaque, namespace-unique, and deterministic for a given +(recipe, config, seed) triple. Callers derive a dedicated RNG substream via +``RNGRoot.child()`` and pass a monotonically increasing counter to +:func:`make_id`. + +Canonical prefixes +------------------ +The following nine prefixes correspond directly to the nine relational tables +defined in ``schema/entities.py``: + +acct_ — Account +cnt_ — Contact +lead_ — Lead +touch_ — Touch +sess_ — Session +act_ — SalesActivity +opp_ — Opportunity +cust_ — Customer +sub_ — Subscription + +The ``rep_`` prefix is an internal-only namespace used for sales-rep entities +that participate in simulation mechanics but do **not** have a corresponding +standalone relational table in the v1 output bundle. """ + +from __future__ import annotations + +# Canonical prefix registry — single source of truth used by tests and +# simulation code alike. +ID_PREFIXES: dict[str, str] = { + "account": "acct", + "contact": "cnt", + "lead": "lead", + "touch": "touch", + "session": "sess", + "sales_activity": "act", + "opportunity": "opp", + "customer": "cust", + "subscription": "sub", + "rep": "rep", +} + +_PAD_WIDTH = 6 # e.g. acct_000001 + + +def make_id(prefix: str, n: int) -> str: + """Return a zero-padded entity ID string. + + Args: + prefix: The namespace prefix (e.g. ``"acct"``). + n: A 1-based counter for this entity type within one generation run. + + Returns: + A string of the form ``"<prefix>_<n>"`` with *n* zero-padded to six + digits; e.g. ``"acct_000001"``. + + Raises: + ValueError: if *n* is not a positive integer.
+ """ + if not isinstance(n, int) or isinstance(n, bool) or n < 1: + raise ValueError(f"n must be a positive int, got {n!r}") + return f"{prefix}_{n:0{_PAD_WIDTH}d}" diff --git a/leadforge/narrative/spec.py b/leadforge/narrative/spec.py index d873b03..9425959 100644 --- a/leadforge/narrative/spec.py +++ b/leadforge/narrative/spec.py @@ -37,7 +37,7 @@ def from_dict(cls, data: dict[str, Any]) -> CompanySpec: ) er = data["employee_range"] if not ( - isinstance(er, (list, tuple)) + isinstance(er, (list, tuple)) # noqa: UP038 and len(er) == 2 and all(isinstance(v, int) and not isinstance(v, bool) for v in er) ): @@ -85,7 +85,7 @@ def from_dict(cls, data: dict[str, Any]) -> ProductSpec: ) acv = data["acv_range_usd"] if not ( - isinstance(acv, (list, tuple)) + isinstance(acv, (list, tuple)) # noqa: UP038 and len(acv) == 2 and all(isinstance(v, int) and not isinstance(v, bool) for v in acv) ): @@ -93,7 +93,7 @@ def from_dict(cls, data: dict[str, Any]) -> ProductSpec: f"product.acv_range_usd must be a [min, max] int pair, got {acv!r}" ) terms = data["contract_terms_months"] - if not isinstance(terms, (list, tuple)) or not all( + if not isinstance(terms, (list, tuple)) or not all( # noqa: UP038 isinstance(v, int) and not isinstance(v, bool) for v in terms ): raise InvalidRecipeError( @@ -141,7 +141,7 @@ def from_dict(cls, data: dict[str, Any]) -> MarketSpec: ) er = data["icp_employee_range"] if not ( - isinstance(er, (list, tuple)) + isinstance(er, (list, tuple)) # noqa: UP038 and len(er) == 2 and all(isinstance(v, int) and not isinstance(v, bool) for v in er) ): @@ -149,7 +149,7 @@ def from_dict(cls, data: dict[str, Any]) -> MarketSpec: f"market.icp_employee_range must be a [min, max] int pair, got {er!r}" ) industries = data["icp_industries"] - if not isinstance(industries, (list, tuple)): + if not isinstance(industries, (list, tuple)): # noqa: UP038 raise InvalidRecipeError( f"market.icp_industries must be a list of strings, got {industries!r}" ) @@ -158,7 +158,7 @@ def 
from_dict(cls, data: dict[str, Any]) -> MarketSpec: f"market.icp_industries must contain only strings, got {industries!r}" ) geographies = data["geographies"] - if not isinstance(geographies, (list, tuple)): + if not isinstance(geographies, (list, tuple)): # noqa: UP038 raise InvalidRecipeError( f"market.geographies must be a list of strings, got {geographies!r}" ) @@ -194,13 +194,13 @@ def from_dict(cls, data: dict[str, Any]) -> GtmMotionSpec: "gtm_motion", ) channels = data["channels"] - if not isinstance(channels, (list, tuple)) or not all(isinstance(c, str) for c in channels): + if not isinstance(channels, (list, tuple)) or not all(isinstance(c, str) for c in channels): # noqa: UP038 raise InvalidRecipeError( f"gtm_motion.channels must be a list of strings, got {channels!r}" ) for share_name in ("inbound_share", "outbound_share", "partner_share"): v = data[share_name] - if isinstance(v, bool) or not isinstance(v, (int, float)): + if isinstance(v, bool) or not isinstance(v, (int, float)): # noqa: UP038 raise InvalidRecipeError( f"gtm_motion.{share_name} must be a float in [0, 1], got {type(v).__name__!r}" ) @@ -232,7 +232,7 @@ def from_dict(cls, data: dict[str, Any]) -> PersonaSpec: ) title_variants = data["title_variants"] if not ( - isinstance(title_variants, (list, tuple)) + isinstance(title_variants, (list, tuple)) # noqa: UP038 and all(isinstance(t, str) for t in title_variants) ): raise InvalidRecipeError( diff --git a/leadforge/schema/dictionaries.py b/leadforge/schema/dictionaries.py new file mode 100644 index 0000000..f97cb90 --- /dev/null +++ b/leadforge/schema/dictionaries.py @@ -0,0 +1,66 @@ +"""Feature dictionary builder. + +Converts :data:`~leadforge.schema.features.LEAD_SNAPSHOT_FEATURES` into a +``pd.DataFrame`` and optionally writes it as ``feature_dictionary.csv`` — one +of the three files required in every bundle output mode (§14.1). 
+""" + +from __future__ import annotations + +from pathlib import Path + +import pandas as pd + +from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES, FeatureSpec + +_COLUMNS = ("name", "dtype", "description", "category", "is_target", "leakage_risk") + + +def feature_dictionary_df( + features: tuple[FeatureSpec, ...] = LEAD_SNAPSHOT_FEATURES, +) -> pd.DataFrame: + """Return the feature dictionary as a ``pd.DataFrame``. + + Columns: name, dtype, description, category, is_target, leakage_risk. + + Args: + features: Ordered tuple of :class:`~leadforge.schema.features.FeatureSpec` + objects. Defaults to the canonical lead snapshot feature list. + + Returns: + A ``pd.DataFrame`` with one row per feature. String columns + (``name``, ``dtype``, ``description``, ``category``) use + ``pd.StringDtype``; flag columns (``is_target``, ``leakage_risk``) + use ``pd.BooleanDtype``. + """ + rows = [ + { + "name": f.name, + "dtype": f.dtype, + "description": f.description, + "category": f.category, + "is_target": f.is_target, + "leakage_risk": f.leakage_risk, + } + for f in features + ] + df = pd.DataFrame(rows, columns=list(_COLUMNS)) + for col in ("name", "dtype", "description", "category"): + df[col] = df[col].astype("string") + df["is_target"] = df["is_target"].astype("boolean") + df["leakage_risk"] = df["leakage_risk"].astype("boolean") + return df + + +def write_feature_dictionary( + path: Path, + features: tuple[FeatureSpec, ...] = LEAD_SNAPSHOT_FEATURES, +) -> None: + """Write the feature dictionary CSV to *path*. + + Args: + path: Destination file path (created with ``parents=True``). + features: Feature list to serialize. Defaults to the canonical list. 
+ """ + path.parent.mkdir(parents=True, exist_ok=True) + feature_dictionary_df(features).to_csv(path, index=False) diff --git a/leadforge/schema/entities.py b/leadforge/schema/entities.py new file mode 100644 index 0000000..2a60a7c --- /dev/null +++ b/leadforge/schema/entities.py @@ -0,0 +1,375 @@ +"""Typed row contracts for all v1 relational tables. + +Each class represents one row in a Parquet table. Fields map directly to +the column specifications in §16 of the architecture spec. Optional columns +(nullable in the output) use ``... | None`` typing. + +All row classes expose: + +- ``TABLE_NAME`` — the canonical Parquet table name (no extension). +- ``DTYPE_MAP`` — ``{column: pandas-dtype-string}`` used to build empty + DataFrames with the right schema. +- ``to_dict()`` — returns a plain ``dict`` suitable for ``pd.DataFrame([...])`` + or JSON serialization. +- ``empty_dataframe()`` — class method returning a zero-row ``pd.DataFrame`` + with the correct columns and nullable dtypes. +""" + +from __future__ import annotations + +from dataclasses import dataclass, fields +from typing import Any, ClassVar + +import pandas as pd + + +def _empty_df(dtype_map: dict[str, str]) -> pd.DataFrame: + """Return a zero-row DataFrame with columns ordered as *dtype_map*.""" + return pd.DataFrame({col: pd.array([], dtype=dtype) for col, dtype in dtype_map.items()}) + + +# --------------------------------------------------------------------------- +# accounts +# --------------------------------------------------------------------------- + + +@dataclass +class AccountRow: + """One row in the ``accounts`` table.""" + + TABLE_NAME: ClassVar[str] = "accounts" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "account_id": "string", + "company_name": "string", + "industry": "string", + "region": "string", + "employee_band": "string", + "estimated_revenue_band": "string", + "process_maturity_band": "string", + "created_at": "string", + } + + account_id: str + company_name: str + industry: str + 
region: str + employee_band: str + estimated_revenue_band: str + process_maturity_band: str + created_at: str + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# contacts +# --------------------------------------------------------------------------- + + +@dataclass +class ContactRow: + """One row in the ``contacts`` table.""" + + TABLE_NAME: ClassVar[str] = "contacts" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "contact_id": "string", + "account_id": "string", + "job_title": "string", + "role_function": "string", + "seniority": "string", + "buyer_role": "string", + "email_domain_type": "string", + "created_at": "string", + } + + contact_id: str + account_id: str + job_title: str + role_function: str + seniority: str + buyer_role: str + email_domain_type: str + created_at: str + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# leads +# --------------------------------------------------------------------------- + + +@dataclass +class LeadRow: + """One row in the ``leads`` table.""" + + TABLE_NAME: ClassVar[str] = "leads" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "lead_id": "string", + "contact_id": "string", + "account_id": "string", + "lead_created_at": "string", + "lead_source": "string", + "first_touch_channel": "string", + "current_stage": "string", + "owner_rep_id": "string", + "is_mql": "boolean", + "is_sql": "boolean", + "converted_within_90_days": "boolean", + "conversion_timestamp": "string", + } + + lead_id: str + contact_id: str + account_id: str + lead_created_at: str + lead_source: str + 
first_touch_channel: str + current_stage: str + owner_rep_id: str + is_mql: bool + is_sql: bool + converted_within_90_days: bool + conversion_timestamp: str | None = None + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# touches +# --------------------------------------------------------------------------- + + +@dataclass +class TouchRow: + """One row in the ``touches`` table.""" + + TABLE_NAME: ClassVar[str] = "touches" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "touch_id": "string", + "lead_id": "string", + "touch_timestamp": "string", + "touch_type": "string", + "touch_channel": "string", + "touch_direction": "string", + "campaign_id": "string", + } + + touch_id: str + lead_id: str + touch_timestamp: str + touch_type: str + touch_channel: str + touch_direction: str + campaign_id: str | None = None + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# sessions +# --------------------------------------------------------------------------- + + +@dataclass +class SessionRow: + """One row in the ``sessions`` table.""" + + TABLE_NAME: ClassVar[str] = "sessions" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "session_id": "string", + "lead_id": "string", + "session_timestamp": "string", + "session_type": "string", + "page_views": "Int64", + "pricing_page_views": "Int64", + "demo_page_views": "Int64", + "session_duration_seconds": "Int64", + } + + session_id: str + lead_id: str + session_timestamp: str + session_type: str + page_views: int + pricing_page_views: int + demo_page_views: int + session_duration_seconds: 
int + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# sales_activities +# --------------------------------------------------------------------------- + + +@dataclass +class SalesActivityRow: + """One row in the ``sales_activities`` table.""" + + TABLE_NAME: ClassVar[str] = "sales_activities" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "activity_id": "string", + "lead_id": "string", + "rep_id": "string", + "activity_timestamp": "string", + "activity_type": "string", + "activity_outcome": "string", + } + + activity_id: str + lead_id: str + rep_id: str + activity_timestamp: str + activity_type: str + activity_outcome: str + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# opportunities +# --------------------------------------------------------------------------- + + +@dataclass +class OpportunityRow: + """One row in the ``opportunities`` table.""" + + TABLE_NAME: ClassVar[str] = "opportunities" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "opportunity_id": "string", + "lead_id": "string", + "created_at": "string", + "stage": "string", + "estimated_acv": "Int64", + "close_outcome": "string", + "closed_at": "string", + } + + opportunity_id: str + lead_id: str + created_at: str + stage: str + estimated_acv: int + close_outcome: str | None = None + closed_at: str | None = None + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# 
--------------------------------------------------------------------------- +# customers +# --------------------------------------------------------------------------- + + +@dataclass +class CustomerRow: + """One row in the ``customers`` table.""" + + TABLE_NAME: ClassVar[str] = "customers" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "customer_id": "string", + "opportunity_id": "string", + "account_id": "string", + "customer_start_at": "string", + } + + customer_id: str + opportunity_id: str + account_id: str + customer_start_at: str + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# subscriptions +# --------------------------------------------------------------------------- + + +@dataclass +class SubscriptionRow: + """One row in the ``subscriptions`` table.""" + + TABLE_NAME: ClassVar[str] = "subscriptions" + DTYPE_MAP: ClassVar[dict[str, str]] = { + "subscription_id": "string", + "customer_id": "string", + "plan_name": "string", + "subscription_start_at": "string", + "subscription_status": "string", + } + + subscription_id: str + customer_id: str + plan_name: str + subscription_start_at: str + subscription_status: str + + def to_dict(self) -> dict[str, Any]: + return {f.name: getattr(self, f.name) for f in fields(self)} + + @classmethod + def empty_dataframe(cls) -> pd.DataFrame: + return _empty_df(cls.DTYPE_MAP) + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + +ALL_ROW_TYPES: tuple[type, ...] = ( + AccountRow, + ContactRow, + LeadRow, + TouchRow, + SessionRow, + SalesActivityRow, + OpportunityRow, + CustomerRow, + SubscriptionRow, +) + +TABLE_NAMES: tuple[str, ...] 
"""Feature specification for the lead snapshot task table.

:data:`LEAD_SNAPSHOT_FEATURES` enumerates, in canonical order, the columns of
the primary task export (``tasks/converted_within_90_days/``).  Every feature
is anchored at or before the snapshot date — no post-anchor data is included
(leakage rule, §4 of the architecture spec).
"""

from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class FeatureSpec:
    """Metadata for one column in the lead snapshot table.

    Attributes:
        name: Column name as written to the Parquet file.
        dtype: Pandas-compatible dtype string (``"string"``, ``"Int64"``,
            ``"Float64"``, ``"boolean"``).
        description: Human-readable explanation of what the column captures.
        category: Logical grouping (``"account"``, ``"contact"``,
            ``"lead_meta"``, ``"engagement"``, ``"sales"``, ``"target"``).
        is_target: True only for the label column.
        leakage_risk: True when the column could carry post-snapshot-anchor
            information and must be excluded from student_public exports.
    """

    name: str
    dtype: str
    description: str
    category: str
    is_target: bool = False
    leakage_risk: bool = False


# ---------------------------------------------------------------------------
# Canonical feature list — lead snapshot.
# Built per category, then concatenated; the concatenation order below IS the
# canonical column order of the export.
# ---------------------------------------------------------------------------

# Account-level firmographics.
_ACCOUNT_FEATURES: tuple[FeatureSpec, ...] = (
    FeatureSpec("account_id", "string", "Opaque account identifier.", "account"),
    FeatureSpec(
        "industry",
        "string",
        "Industry vertical of the buying organization.",
        "account",
    ),
    FeatureSpec(
        "region",
        "string",
        "Geographic region of the account's headquarters.",
        "account",
    ),
    FeatureSpec(
        "employee_band",
        "string",
        "Banded employee headcount of the account.",
        "account",
    ),
    FeatureSpec(
        "estimated_revenue_band",
        "string",
        "Banded estimated annual revenue of the account.",
        "account",
    ),
    FeatureSpec(
        "process_maturity_band",
        "string",
        "Banded internal process maturity score (latent).",
        "account",
    ),
)

# Primary-contact attributes.
_CONTACT_FEATURES: tuple[FeatureSpec, ...] = (
    FeatureSpec("contact_id", "string", "Opaque contact identifier.", "contact"),
    FeatureSpec(
        "role_function",
        "string",
        "Functional area of the primary contact (e.g. finance, ops).",
        "contact",
    ),
    FeatureSpec(
        "seniority",
        "string",
        "Seniority band of the primary contact.",
        "contact",
    ),
    FeatureSpec(
        "buyer_role",
        "string",
        "Buyer role classification (economic_buyer, champion, etc.).",
        "contact",
    ),
)

# Lead bookkeeping fields.
_LEAD_META_FEATURES: tuple[FeatureSpec, ...] = (
    FeatureSpec("lead_id", "string", "Opaque lead identifier.", "lead_meta"),
    FeatureSpec(
        "lead_created_at",
        "string",
        "ISO-8601 timestamp when the lead was created.",
        "lead_meta",
    ),
    FeatureSpec(
        "lead_source",
        "string",
        "Origination source of the lead (e.g. inbound_form, sdr_outbound).",
        "lead_meta",
    ),
    FeatureSpec(
        "first_touch_channel",
        "string",
        "Marketing channel responsible for the first recorded touch.",
        "lead_meta",
    ),
    FeatureSpec(
        "current_stage",
        "string",
        "Funnel stage at snapshot anchor date.",
        "lead_meta",
    ),
    FeatureSpec(
        "is_mql",
        "boolean",
        "Whether the lead had achieved MQL status at snapshot date.",
        "lead_meta",
    ),
    FeatureSpec(
        "is_sql",
        "boolean",
        "Whether the lead had achieved SQL status at snapshot date.",
        "lead_meta",
    ),
)

# Pre-snapshot engagement counters.
_ENGAGEMENT_FEATURES: tuple[FeatureSpec, ...] = (
    FeatureSpec(
        "touch_count",
        "Int64",
        "Total number of marketing/sales touches recorded before snapshot.",
        "engagement",
    ),
    FeatureSpec(
        "inbound_touch_count",
        "Int64",
        "Number of inbound touches before snapshot.",
        "engagement",
    ),
    FeatureSpec(
        "outbound_touch_count",
        "Int64",
        "Number of outbound touches before snapshot.",
        "engagement",
    ),
    FeatureSpec(
        "session_count",
        "Int64",
        "Number of web/trial sessions recorded before snapshot.",
        "engagement",
    ),
    FeatureSpec(
        "pricing_page_views",
        "Int64",
        "Cumulative pricing page views across all sessions before snapshot.",
        "engagement",
    ),
    FeatureSpec(
        "demo_page_views",
        "Int64",
        "Cumulative demo page views across all sessions before snapshot.",
        "engagement",
    ),
    FeatureSpec(
        "total_session_duration_seconds",
        "Int64",
        "Sum of session durations (seconds) before snapshot.",
        "engagement",
    ),
)

# Sales-activity aggregates.
_SALES_FEATURES: tuple[FeatureSpec, ...] = (
    FeatureSpec(
        "activity_count",
        "Int64",
        "Number of sales activities logged before snapshot.",
        "sales",
    ),
    FeatureSpec(
        "days_since_last_touch",
        "Float64",
        "Days elapsed between most recent touch and snapshot anchor date.",
        "sales",
    ),
    FeatureSpec(
        "has_open_opportunity",
        "boolean",
        "Whether an open opportunity existed at snapshot date.",
        "sales",
    ),
    FeatureSpec(
        "opportunity_estimated_acv",
        "Float64",
        "Estimated ACV of the most recent open opportunity (NaN if none).",
        "sales",
    ),
)

# The event-derived label — always last.
_TARGET_FEATURES: tuple[FeatureSpec, ...] = (
    FeatureSpec(
        "converted_within_90_days",
        "boolean",
        "Label: True if a closed_won event occurred within 90 days of "
        "the snapshot anchor date. Derived from simulated events.",
        "target",
        is_target=True,
    ),
)

LEAD_SNAPSHOT_FEATURES: tuple[FeatureSpec, ...] = (
    _ACCOUNT_FEATURES
    + _CONTACT_FEATURES
    + _LEAD_META_FEATURES
    + _ENGAGEMENT_FEATURES
    + _SALES_FEATURES
    + _TARGET_FEATURES
)
"""Foreign-key relationship definitions and validation helpers.

Describes the canonical FK graph for the v1 relational model and provides
:func:`validate_fk` to assert referential integrity on a collection of rows
before they are written to Parquet.
"""

from __future__ import annotations

from dataclasses import dataclass

from leadforge.core.exceptions import LeadforgeError


class FKViolationError(LeadforgeError):
    """Raised when a foreign-key constraint is violated in synthetic data."""


@dataclass(frozen=True)
class FKConstraint:
    """One foreign-key edge: ``child_table.child_column → parent_table.parent_column``."""

    child_table: str
    child_column: str
    parent_table: str
    parent_column: str


# All v1 FK constraints, derived from §9.2 of the architecture spec.
ALL_CONSTRAINTS: tuple[FKConstraint, ...] = (
    FKConstraint("contacts", "account_id", "accounts", "account_id"),
    FKConstraint("leads", "account_id", "accounts", "account_id"),
    FKConstraint("leads", "contact_id", "contacts", "contact_id"),
    FKConstraint("touches", "lead_id", "leads", "lead_id"),
    FKConstraint("sessions", "lead_id", "leads", "lead_id"),
    FKConstraint("sales_activities", "lead_id", "leads", "lead_id"),
    FKConstraint("opportunities", "lead_id", "leads", "lead_id"),
    FKConstraint("customers", "opportunity_id", "opportunities", "opportunity_id"),
    FKConstraint("customers", "account_id", "accounts", "account_id"),
    FKConstraint("subscriptions", "customer_id", "customers", "customer_id"),
)


def validate_fk(
    child_values: list[str],
    parent_values: set[str],
    constraint: FKConstraint,
) -> None:
    """Assert every value in *child_values* exists in *parent_values*.

    Args:
        child_values: All FK column values from the child table.
        parent_values: All PK values from the parent table.
        constraint: The :class:`FKConstraint` being checked (used in the
            error message).

    Raises:
        FKViolationError: if any child value is absent from *parent_values*.
    """
    # Collect every orphan in order; the message shows at most the first 5.
    orphans = [value for value in child_values if value not in parent_values]
    if orphans:
        raise FKViolationError(
            f"FK violation: {constraint.child_table}.{constraint.child_column} "
            f"→ {constraint.parent_table}.{constraint.parent_column}: "
            f"{len(orphans)} orphan(s), e.g. {orphans[:5]}"
        )
"""Parquet serialization helpers for schema-conformant DataFrames.

These utilities are used by the rendering layer (M7+) and by tests to verify
that empty tables can be round-tripped through Parquet without schema loss.
"""

from __future__ import annotations

from pathlib import Path

import pandas as pd


def write_parquet(df: pd.DataFrame, path: Path) -> None:
    """Serialize *df* to Parquet at *path*, creating parent directories.

    Args:
        df: DataFrame to serialize. Should be created via an entity class's
            ``empty_dataframe()`` or populated with real simulation rows.
        path: Destination ``.parquet`` file path.
    """
    # Ensure the target directory exists before pyarrow tries to open the file.
    path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(path, engine="pyarrow", index=False)


def read_parquet(path: Path) -> pd.DataFrame:
    """Load a Parquet file back into a DataFrame.

    Args:
        path: Path to the ``.parquet`` file.

    Returns:
        A ``pd.DataFrame`` with columns as stored in the file.
    """
    return pd.read_parquet(path, engine="pyarrow")
+ """ + + train: float + valid: float + test: float + + def __post_init__(self) -> None: + import math + + for name, value in (("train", self.train), ("valid", self.valid), ("test", self.test)): + if ( + isinstance(value, bool) + or not isinstance(value, (int, float)) # noqa: UP038 + or math.isnan(value) + or math.isinf(value) + ): + raise ValueError(f"SplitSpec.{name} must be a finite number, got {value!r}") + if not (0.0 <= value <= 1.0): + raise ValueError(f"SplitSpec.{name} must be in [0, 1], got {value}") + total = self.train + self.valid + self.test + if abs(total - 1.0) > 1e-6: + raise ValueError(f"SplitSpec fractions must sum to 1.0, got {total:.6f}") + + +@dataclass(frozen=True) +class TaskManifest: + """Immutable descriptor for one ML task exported from a bundle. + + Attributes: + task_id: Machine-readable task identifier. + label_column: Column name in the task Parquet files that holds the + binary label. + label_window_days: Number of days after the snapshot anchor date + within which a conversion event counts as positive. + primary_table: The relational table the snapshot rows are derived + from (usually ``"leads"``). + split: Train/valid/test proportions. + task_type: ML task type string (``"binary_classification"`` for v1). + description: Human-readable description of the task. 
+ """ + + task_id: str + label_column: str + label_window_days: int + primary_table: str + split: SplitSpec + task_type: str = "binary_classification" + description: str = "" + + def to_dict(self) -> dict[str, object]: + """Return a JSON-serializable representation.""" + return { + "task_id": self.task_id, + "task_type": self.task_type, + "label_column": self.label_column, + "label_window_days": self.label_window_days, + "primary_table": self.primary_table, + "split": { + "train": self.split.train, + "valid": self.split.valid, + "test": self.split.test, + }, + "description": self.description, + } + + +# --------------------------------------------------------------------------- +# v1 task definition +# --------------------------------------------------------------------------- + +CONVERTED_WITHIN_90_DAYS: TaskManifest = TaskManifest( + task_id="converted_within_90_days", + label_column="converted_within_90_days", + label_window_days=90, + primary_table="leads", + split=SplitSpec(train=0.7, valid=0.15, test=0.15), + task_type="binary_classification", + description=( + "Predict whether a lead converts (closed_won event) within 90 days " + "of the snapshot anchor date. Label is event-derived — never sampled " + "directly. All features are pre-anchor (leakage-free by construction)." 
+ ), +) diff --git a/pyproject.toml b/pyproject.toml index 1a4d6b2..c8f29a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,8 @@ classifiers = [ dependencies = [ "typer[all]>=0.12", "pyyaml>=6.0", + "pandas>=2.0", + "pyarrow>=14.0", ] [project.optional-dependencies] @@ -65,5 +67,9 @@ disallow_incomplete_defs = true check_untyped_defs = true no_implicit_optional = true +[[tool.mypy.overrides]] +module = ["pandas", "pandas.*", "pyarrow", "pyarrow.*"] +ignore_missing_imports = true + [tool.pytest.ini_options] testpaths = ["tests"] diff --git a/tests/schema/__init__.py b/tests/schema/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/schema/test_entities.py b/tests/schema/test_entities.py new file mode 100644 index 0000000..e3d6f2a --- /dev/null +++ b/tests/schema/test_entities.py @@ -0,0 +1,176 @@ +"""Tests for leadforge.schema.entities — row contracts and empty DataFrames.""" + +from pathlib import Path + +import pandas as pd +import pytest + +from leadforge.schema.entities import ( + ALL_ROW_TYPES, + TABLE_NAMES, + AccountRow, + ContactRow, + LeadRow, + SessionRow, + TouchRow, +) +from leadforge.schema.tables import read_parquet, write_parquet + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_account() -> AccountRow: + return AccountRow( + account_id="acct_000001", + company_name="Acme Corp", + industry="manufacturing", + region="US-East", + employee_band="100-250", + estimated_revenue_band="10M-50M", + process_maturity_band="medium", + created_at="2024-01-15", + ) + + +def _make_contact() -> ContactRow: + return ContactRow( + contact_id="cnt_000001", + account_id="acct_000001", + job_title="VP Finance", + role_function="finance", + seniority="vp", + buyer_role="economic_buyer", + email_domain_type="corporate", + created_at="2024-01-20", + ) + + +def _make_lead() -> LeadRow: + return LeadRow( + 
lead_id="lead_000001", + contact_id="cnt_000001", + account_id="acct_000001", + lead_created_at="2024-02-01", + lead_source="inbound_form", + first_touch_channel="organic_search", + current_stage="mql", + owner_rep_id="rep_000001", + is_mql=True, + is_sql=False, + converted_within_90_days=False, + conversion_timestamp=None, + ) + + +# --------------------------------------------------------------------------- +# to_dict +# --------------------------------------------------------------------------- + + +def test_account_to_dict_contains_all_columns() -> None: + row = _make_account() + d = row.to_dict() + assert set(d.keys()) == set(AccountRow.DTYPE_MAP.keys()) + + +def test_lead_to_dict_nullable_is_none() -> None: + row = _make_lead() + assert row.to_dict()["conversion_timestamp"] is None + + +def test_touch_to_dict_nullable_campaign_id() -> None: + row = TouchRow( + touch_id="touch_000001", + lead_id="lead_000001", + touch_timestamp="2024-02-05T10:00:00", + touch_type="email", + touch_channel="outbound", + touch_direction="outbound", + campaign_id=None, + ) + assert row.to_dict()["campaign_id"] is None + + +# --------------------------------------------------------------------------- +# empty_dataframe — columns and dtypes +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("cls", ALL_ROW_TYPES) +def test_empty_dataframe_has_correct_columns(cls: type) -> None: + df = cls.empty_dataframe() # type: ignore[attr-defined] + assert list(df.columns) == list(cls.DTYPE_MAP.keys()) # type: ignore[attr-defined] + + +@pytest.mark.parametrize("cls", ALL_ROW_TYPES) +def test_empty_dataframe_has_zero_rows(cls: type) -> None: + assert len(cls.empty_dataframe()) == 0 # type: ignore[attr-defined] + + +def test_lead_empty_dataframe_boolean_columns() -> None: + df = LeadRow.empty_dataframe() + assert str(df["is_mql"].dtype) == "boolean" + assert str(df["converted_within_90_days"].dtype) == "boolean" + + +def 
test_session_empty_dataframe_int_columns() -> None: + df = SessionRow.empty_dataframe() + assert str(df["page_views"].dtype) == "Int64" + + +# --------------------------------------------------------------------------- +# Parquet round-trip +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("cls", ALL_ROW_TYPES) +def test_empty_dataframe_parquet_roundtrip(cls: type, tmp_path: Path) -> None: + df = cls.empty_dataframe() # type: ignore[attr-defined] + path = tmp_path / f"{cls.TABLE_NAME}.parquet" # type: ignore[attr-defined] + write_parquet(df, path) + restored = read_parquet(path) + assert list(restored.columns) == list(df.columns) + assert len(restored) == 0 + + +def test_lead_rows_parquet_roundtrip(tmp_path: Path) -> None: + lead = _make_lead() + df = pd.DataFrame([lead.to_dict()]) + # cast to declared dtypes + for col, dtype in LeadRow.DTYPE_MAP.items(): + df[col] = df[col].astype(dtype) + path = tmp_path / "leads.parquet" + write_parquet(df, path) + restored = read_parquet(path) + assert restored["lead_id"].iloc[0] == "lead_000001" + assert bool(restored["is_mql"].iloc[0]) is True + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + + +def test_all_row_types_covers_nine_tables() -> None: + assert len(ALL_ROW_TYPES) == 9 + + +def test_table_names_unique() -> None: + assert len(set(TABLE_NAMES)) == len(TABLE_NAMES) + + +def test_table_names_expected_values() -> None: + expected = { + "accounts", + "contacts", + "leads", + "touches", + "sessions", + "sales_activities", + "opportunities", + "customers", + "subscriptions", + } + assert set(TABLE_NAMES) == expected diff --git a/tests/schema/test_features.py b/tests/schema/test_features.py new file mode 100644 index 0000000..e51d992 --- /dev/null +++ b/tests/schema/test_features.py @@ -0,0 +1,116 @@ +"""Tests for leadforge.schema.features 
"""Tests for leadforge.schema.features and leadforge.schema.dictionaries."""

import dataclasses
from pathlib import Path

import pandas as pd
import pytest

from leadforge.schema.dictionaries import feature_dictionary_df, write_feature_dictionary
from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES, FeatureSpec

# ---------------------------------------------------------------------------
# FeatureSpec
# ---------------------------------------------------------------------------


def test_feature_spec_is_frozen() -> None:
    spec = FeatureSpec("x", "string", "desc", "account")
    with pytest.raises(dataclasses.FrozenInstanceError):
        spec.name = "y"  # type: ignore[misc]


def test_lead_snapshot_features_non_empty() -> None:
    assert len(LEAD_SNAPSHOT_FEATURES) > 0


def test_exactly_one_target_feature() -> None:
    target_names = [f.name for f in LEAD_SNAPSHOT_FEATURES if f.is_target]
    assert target_names == ["converted_within_90_days"]


def test_target_is_last_feature() -> None:
    assert LEAD_SNAPSHOT_FEATURES[-1].is_target


def test_all_feature_names_unique() -> None:
    names = [f.name for f in LEAD_SNAPSHOT_FEATURES]
    assert len(names) == len(set(names))


def test_all_dtypes_are_valid_strings() -> None:
    allowed = {"string", "Int64", "Float64", "boolean"}
    for spec in LEAD_SNAPSHOT_FEATURES:
        assert spec.dtype in allowed, f"{spec.name} has unknown dtype {spec.dtype!r}"


def test_all_categories_are_known() -> None:
    allowed = {"account", "contact", "lead_meta", "engagement", "sales", "target"}
    for spec in LEAD_SNAPSHOT_FEATURES:
        assert spec.category in allowed, f"{spec.name} has unknown category {spec.category!r}"


def test_target_feature_category_is_target() -> None:
    for spec in LEAD_SNAPSHOT_FEATURES:
        if spec.is_target:
            assert spec.category == "target"


def test_no_leakage_risk_on_target() -> None:
    for spec in LEAD_SNAPSHOT_FEATURES:
        if spec.is_target:
            assert not spec.leakage_risk


# ---------------------------------------------------------------------------
# feature_dictionary_df
# ---------------------------------------------------------------------------


def test_feature_dictionary_df_returns_dataframe() -> None:
    assert isinstance(feature_dictionary_df(), pd.DataFrame)


def test_feature_dictionary_df_row_count_matches_features() -> None:
    assert len(feature_dictionary_df()) == len(LEAD_SNAPSHOT_FEATURES)


def test_feature_dictionary_df_columns() -> None:
    expected = {"name", "dtype", "description", "category", "is_target", "leakage_risk"}
    assert set(feature_dictionary_df().columns) == expected


def test_feature_dictionary_df_target_row() -> None:
    frame = feature_dictionary_df()
    target_rows = frame[frame["is_target"]]
    assert len(target_rows) == 1
    assert target_rows.iloc[0]["name"] == "converted_within_90_days"


# ---------------------------------------------------------------------------
# write_feature_dictionary
# ---------------------------------------------------------------------------


def test_write_feature_dictionary_creates_file(tmp_path: Path) -> None:
    destination = tmp_path / "feature_dictionary.csv"
    write_feature_dictionary(destination)
    assert destination.exists()


def test_write_feature_dictionary_csv_readable(tmp_path: Path) -> None:
    destination = tmp_path / "feature_dictionary.csv"
    write_feature_dictionary(destination)
    frame = pd.read_csv(destination)
    assert len(frame) == len(LEAD_SNAPSHOT_FEATURES)
    assert "name" in frame.columns


def test_write_feature_dictionary_creates_parent_dirs(tmp_path: Path) -> None:
    destination = tmp_path / "deep" / "nested" / "feature_dictionary.csv"
    write_feature_dictionary(destination)
    assert destination.exists()
"acct_000001" + assert make_id("lead", 999) == "lead_000999" + assert make_id("acct", 1_000_000) == "acct_1000000" + + +def test_make_id_zero_padded_to_six_digits() -> None: + result = make_id("cnt", 42) + assert result == "cnt_000042" + assert len(result.split("_")[1]) >= 6 + + +def test_make_id_deterministic() -> None: + assert make_id("opp", 7) == make_id("opp", 7) + + +def test_make_id_rejects_zero() -> None: + with pytest.raises(ValueError, match="positive int"): + make_id("acct", 0) + + +def test_make_id_rejects_negative() -> None: + with pytest.raises(ValueError, match="positive int"): + make_id("acct", -1) + + +def test_make_id_rejects_bool() -> None: + with pytest.raises(ValueError, match="positive int"): + make_id("acct", True) # type: ignore[arg-type] + + +def test_make_id_rejects_float() -> None: + with pytest.raises((ValueError, TypeError)): + make_id("acct", 1.0) # type: ignore[arg-type] + + +def test_id_prefixes_covers_all_entities() -> None: + expected = { + "account", + "contact", + "lead", + "touch", + "session", + "sales_activity", + "opportunity", + "customer", + "subscription", + "rep", + } + assert set(ID_PREFIXES.keys()) == expected + + +def test_id_prefixes_values_are_strings() -> None: + for key, prefix in ID_PREFIXES.items(): + assert isinstance(prefix, str), f"{key} prefix is not a string" + assert len(prefix) > 0 diff --git a/tests/schema/test_relationships.py b/tests/schema/test_relationships.py new file mode 100644 index 0000000..8e8118f --- /dev/null +++ b/tests/schema/test_relationships.py @@ -0,0 +1,59 @@ +"""Tests for leadforge.schema.relationships — FK constraints and validation.""" + +import dataclasses + +import pytest + +from leadforge.schema.relationships import ( + ALL_CONSTRAINTS, + FKConstraint, + FKViolationError, + validate_fk, +) + + +def test_all_constraints_count() -> None: + assert len(ALL_CONSTRAINTS) == 10 + + +def test_all_constraints_are_fk_constraint_instances() -> None: + for c in ALL_CONSTRAINTS: + assert 
"""Tests for leadforge.schema.relationships — FK constraints and validation."""

import dataclasses

import pytest

from leadforge.schema.relationships import (
    ALL_CONSTRAINTS,
    FKConstraint,
    FKViolationError,
    validate_fk,
)


def test_all_constraints_count() -> None:
    assert len(ALL_CONSTRAINTS) == 10


def test_all_constraints_are_fk_constraint_instances() -> None:
    assert all(isinstance(edge, FKConstraint) for edge in ALL_CONSTRAINTS)


def test_contacts_has_account_fk() -> None:
    edge = next(c for c in ALL_CONSTRAINTS if c.child_table == "contacts")
    assert edge.parent_table == "accounts"
    assert edge.parent_column == "account_id"


def test_validate_fk_passes_when_all_present() -> None:
    edge = FKConstraint("contacts", "account_id", "accounts", "account_id")
    # Repeated child references to the same parent are legal.
    validate_fk(
        ["acct_000001", "acct_000002", "acct_000001"],
        {"acct_000001", "acct_000002"},
        edge,
    )  # should not raise


def test_validate_fk_raises_on_orphan() -> None:
    edge = FKConstraint("leads", "account_id", "accounts", "account_id")
    with pytest.raises(FKViolationError, match="orphan"):
        validate_fk(["acct_000001", "acct_MISSING"], {"acct_000001"}, edge)


def test_validate_fk_error_message_contains_table_names() -> None:
    edge = FKConstraint("touches", "lead_id", "leads", "lead_id")
    with pytest.raises(FKViolationError, match="touches"):
        validate_fk(["lead_MISSING"], set(), edge)


def test_validate_fk_empty_child_passes() -> None:
    edge = FKConstraint("sessions", "lead_id", "leads", "lead_id")
    validate_fk([], {"lead_000001"}, edge)


def test_fk_constraint_is_frozen() -> None:
    edge = FKConstraint("a", "b", "c", "d")
    with pytest.raises(dataclasses.FrozenInstanceError):
        edge.child_table = "x"  # type: ignore[misc]
"""Tests for leadforge.schema.tasks — TaskManifest and SplitSpec."""

import dataclasses

import pytest

from leadforge.schema.tasks import CONVERTED_WITHIN_90_DAYS, SplitSpec

# ---------------------------------------------------------------------------
# SplitSpec
# ---------------------------------------------------------------------------


def test_split_spec_valid() -> None:
    split = SplitSpec(train=0.7, valid=0.15, test=0.15)
    assert split.train == pytest.approx(0.7)


def test_split_spec_rejects_bool_component() -> None:
    with pytest.raises(ValueError, match="finite number"):
        SplitSpec(train=True, valid=0.15, test=0.15)  # type: ignore[arg-type]


def test_split_spec_rejects_bad_sum() -> None:
    with pytest.raises(ValueError, match="sum"):
        SplitSpec(train=0.6, valid=0.2, test=0.1)


def test_split_spec_rejects_negative_component() -> None:
    with pytest.raises(ValueError, match=r"\[0, 1\]"):
        SplitSpec(train=1.2, valid=-0.1, test=-0.1)


def test_split_spec_rejects_component_above_one() -> None:
    with pytest.raises(ValueError, match=r"\[0, 1\]"):
        SplitSpec(train=1.1, valid=0.0, test=0.0)


def test_split_spec_frozen() -> None:
    split = SplitSpec(0.7, 0.15, 0.15)
    with pytest.raises(dataclasses.FrozenInstanceError):
        split.train = 0.5  # type: ignore[misc]


# ---------------------------------------------------------------------------
# TaskManifest
# ---------------------------------------------------------------------------


def test_task_manifest_frozen() -> None:
    with pytest.raises(dataclasses.FrozenInstanceError):
        CONVERTED_WITHIN_90_DAYS.task_id = "other"  # type: ignore[misc]


def test_converted_within_90_days_id() -> None:
    assert CONVERTED_WITHIN_90_DAYS.task_id == "converted_within_90_days"


def test_converted_within_90_days_label_column() -> None:
    assert CONVERTED_WITHIN_90_DAYS.label_column == "converted_within_90_days"


def test_converted_within_90_days_window() -> None:
    assert CONVERTED_WITHIN_90_DAYS.label_window_days == 90


def test_converted_within_90_days_task_type() -> None:
    assert CONVERTED_WITHIN_90_DAYS.task_type == "binary_classification"


def test_converted_within_90_days_primary_table() -> None:
    assert CONVERTED_WITHIN_90_DAYS.primary_table == "leads"


def test_converted_within_90_days_split_sums_to_one() -> None:
    split = CONVERTED_WITHIN_90_DAYS.split
    assert split.train + split.valid + split.test == pytest.approx(1.0)


def test_task_manifest_to_dict_keys() -> None:
    serialized = CONVERTED_WITHIN_90_DAYS.to_dict()
    expected = {
        "task_id",
        "task_type",
        "label_column",
        "label_window_days",
        "primary_table",
        "split",
        "description",
    }
    assert set(serialized.keys()) == expected


def test_task_manifest_to_dict_split_is_dict() -> None:
    serialized = CONVERTED_WITHIN_90_DAYS.to_dict()
    assert isinstance(serialized["split"], dict)
    assert set(serialized["split"].keys()) == {"train", "valid", "test"}


def test_task_manifest_to_dict_is_json_serializable() -> None:
    import json

    json.dumps(CONVERTED_WITHIN_90_DAYS.to_dict())  # should not raise