From 3dec309d7a43346bcaaf9d53d096283a502aa87e Mon Sep 17 00:00:00 2001 From: charlie83Gs Date: Mon, 20 Apr 2026 11:56:47 -0600 Subject: [PATCH] feat(kt-config): GraphMigration planner (Phase 7 #56) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pure function walking a GraphTypePlugin's declared migrations and returning the ordered list that advances from_version to plugin.current_version. No Hatchet, no DB, no side effects — consumed by the Phase 7 graph_migration_wf, which dispatches one migration at a time and persists audit rows between hops. Deliberately under-specified on execution concerns: - does NOT check that migrations are runnable - does NOT check if a migration was already applied - does NOT enforce downgrade policy beyond rejecting them Those belong to the workflow; the planner only answers "what migrations, in what order, does the plugin declare for this walk?" Error cases: - from_version < 1 raises (v1 is the baseline) - from_version > plugin.current_version raises (downgrades are unsupported; framework is forward-only by the to_version == from_version + 1 invariant) - from_version == current returns [] (startup auto-dispatch's "no migration needed" signal) Migrations within a hop preserve declared order (plugin author encodes dependency via list position). Out-of-window migrations are silently filtered — a plugin may ship a staged v3 → v4 behind a still-at-v3 current_version. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.7 (1M context) --- libs/kt-config/src/kt_config/migrations.py | 96 ++++++++ .../kt-config/tests/test_migration_planner.py | 226 ++++++++++++++++++ 2 files changed, 322 insertions(+) create mode 100644 libs/kt-config/src/kt_config/migrations.py create mode 100644 libs/kt-config/tests/test_migration_planner.py diff --git a/libs/kt-config/src/kt_config/migrations.py b/libs/kt-config/src/kt_config/migrations.py new file mode 100644 index 00000000..fb9da701 --- /dev/null +++ b/libs/kt-config/src/kt_config/migrations.py @@ -0,0 +1,96 @@ +"""Migration planner for :class:`GraphTypePlugin`. + +Walks the declared migrations of a registered graph-type plugin and +returns the ordered list that advances a graph from +``from_version`` to ``plugin.current_version``. Pure function — no +Hatchet, no DB, no side effects. The Phase 7 ``graph_migration_wf`` +consumes this output to dispatch one migration at a time, persisting +audit rows between hops. + +The planner is deliberately *under*-specified: it does **not** check +that every migration is runnable, that the target graph still exists, +or that an earlier migration run already covered the hop — those +belong to the execution workflow. This module only answers "what +migrations, in what order, does the plugin declare for this version +walk?" so the workflow can skip already-applied hops via the +``graph_migration_runs`` audit table. + +Contract invariants on the plugin (already enforced at +:meth:`PluginRegistry.register_graph_type`): + +* ``current_version >= 1`` +* every declared migration has ``to_version == from_version + 1`` +* every hop from v1 to ``current_version - 1`` has at least one + migration + +The planner therefore trusts the shape of its input. Feeding it an +unvalidated plugin (e.g. in a test bypassing the registry) is a +programming error that may produce an unsorted or incomplete plan. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from kt_config.plugin import GraphMigration, GraphTypePlugin + + +def plan_migrations( + plugin: "GraphTypePlugin", + from_version: int, +) -> list["GraphMigration"]: + """Return the ordered migration plan advancing ``from_version`` to + ``plugin.current_version``. + + Migrations within the same hop run in their declared order (the + order returned by :meth:`GraphTypePlugin.migrations`). Plugins + that ship multiple migrations per hop — e.g. a main data migration + + a follow-up fix-up — are expected to list them in execution + order; the planner preserves it. + + Returns ``[]`` when the graph is already at current. Raises + ``ValueError`` on: + + * ``from_version < 1`` — there is no pre-v1 plugin state to walk + from; v1 is the baseline. + * ``from_version > plugin.current_version`` — this would be a + downgrade. The framework intentionally does not support + downgrades; every migration is forward-only by the invariant + ``to_version == from_version + 1``. A graph ahead of the + plugin's current version almost always means the plugin was + rolled back — operators resolve by re-deploying the newer + plugin, not by downgrading the graph. + + A plan is also returned when the plugin's :meth:`migrations` + declares more rows than the walk needs (e.g. multiple migrations + for a single hop, or migrations for hops outside the + ``[from_version, current_version)`` range): only migrations whose + ``from_version`` falls in the walk window are included. Out-of- + window migrations are silently filtered — they may belong to + older hops kept for re-migrate, or to forward hops still under + development. + """ + if from_version < 1: + raise ValueError(f"from_version must be >= 1 (got {from_version})") + current = plugin.current_version + if from_version > current: + raise ValueError( + f"from_version {from_version} is ahead of plugin current_version {current} — " + f"downgrades are not supported. Re-deploy the newer plugin to advance the graph." + ) + if from_version == current: + return [] + + window = range(from_version, current) # hops: from → from+1, …, current-1 → current + declared = plugin.migrations() + # Bucket by source version, preserving declared order within each hop. + by_hop: dict[int, list["GraphMigration"]] = {v: [] for v in window} + for migration in declared: + if migration.from_version in by_hop: + by_hop[migration.from_version].append(migration) + + plan: list["GraphMigration"] = [] + for hop in window: + plan.extend(by_hop[hop]) + return plan diff --git a/libs/kt-config/tests/test_migration_planner.py b/libs/kt-config/tests/test_migration_planner.py new file mode 100644 index 00000000..5aafb0df --- /dev/null +++ b/libs/kt-config/tests/test_migration_planner.py @@ -0,0 +1,226 @@ +"""Tests for :func:`kt_config.migrations.plan_migrations`.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from kt_config.migrations import plan_migrations +from kt_config.plugin import GraphTypeComposition, GraphTypePlugin + + +class _MigrationStub: + """Minimal GraphMigration satisfying the Protocol shape.""" + + def __init__(self, migration_id: str, from_version: int) -> None: + self.migration_id = migration_id + self.from_version = from_version + self.to_version = from_version + 1 + self.description = f"stub {migration_id}" + + async def run(self, ctx: Any) -> None: # noqa: ARG002 + return None + + +class _StubPlugin(GraphTypePlugin): + """Bypasses PluginRegistry to let tests construct arbitrary shapes. + + The registry enforces hop-coverage at registration; these tests exercise + the planner directly and must exercise some shapes (multi-hop, + out-of-window migrations) that the registry would itself reject + indirectly via the declaration. + """ + + def __init__( + self, + *, + current_version: int, + migrations_: list[_MigrationStub], + ) -> None: + self._current = current_version + self._migrations = migrations_ + + @property + def graph_type_id(self) -> str: + return "stub" + + @property + def display_name(self) -> str: + return "Stub" + + @property + def current_version(self) -> int: + return self._current + + def composition(self) -> GraphTypeComposition: # pragma: no cover — planner never reads + return GraphTypeComposition( + fetch_chain=[], + search_providers=[], + fact_decomposition="", + concept_extractor="", + disambiguation="", + seed_multiplex="", + seed_promotion="", + dimensions="", + definition="", + relations="", + sync="", + source_cache="", + source_contribution="", + agentic_tasks={}, + ) + + def migrations(self): # type: ignore[override] + return list(self._migrations) + + +# ── Trivial cases ──────────────────────────────────────────────────────── + + +def test_plan_is_empty_when_already_at_current() -> None: + """A graph at the plugin's current_version needs no work — empty plan, + not a raise. This is the canonical "nothing to migrate" path the + startup auto-dispatch relies on to decide whether to enqueue a run.""" + plugin = _StubPlugin(current_version=3, migrations_=[]) + assert plan_migrations(plugin, from_version=3) == [] + + +def test_plan_for_plugin_at_v1_and_graph_at_v1_is_empty() -> None: + """Baseline: freshly-created graph, unversioned plugin. No migrations + declared, no walk needed.""" + plugin = _StubPlugin(current_version=1, migrations_=[]) + assert plan_migrations(plugin, from_version=1) == [] + + +# ── Happy path ──────────────────────────────────────────────────────────── + + +def test_plan_single_hop() -> None: + m1 = _MigrationStub("m1", from_version=1) + plugin = _StubPlugin(current_version=2, migrations_=[m1]) + plan = plan_migrations(plugin, from_version=1) + assert plan == [m1] + + +def test_plan_multi_hop_preserves_hop_order() -> None: + """v1 → v2 → v3 → v4 walk returns migrations in hop order regardless + of how the plugin ordered its declaration list.""" + m3 = _MigrationStub("m3", from_version=3) # v3 → v4 + m1 = _MigrationStub("m1", from_version=1) # v1 → v2 + m2 = _MigrationStub("m2", from_version=2) # v2 → v3 + plugin = _StubPlugin(current_version=4, migrations_=[m3, m1, m2]) # declared out of order + plan = plan_migrations(plugin, from_version=1) + assert [m.migration_id for m in plan] == ["m1", "m2", "m3"] + + +def test_plan_preserves_declaration_order_within_hop() -> None: + """Multiple migrations on the same hop (e.g. data migration + fix-up) + MUST run in declared order — the plugin author encoded the + dependency in the list. Planner preserves it.""" + m1a = _MigrationStub("m1a", from_version=1) + m1b = _MigrationStub("m1b", from_version=1) + m1c = _MigrationStub("m1c", from_version=1) + plugin = _StubPlugin(current_version=2, migrations_=[m1a, m1b, m1c]) + plan = plan_migrations(plugin, from_version=1) + assert [m.migration_id for m in plan] == ["m1a", "m1b", "m1c"] + + +def test_plan_partial_walk_starting_mid_range() -> None: + """Graph already at v2, plugin at v4 — plan covers v2 → v3 → v4 only.""" + m1 = _MigrationStub("m1", from_version=1) + m2 = _MigrationStub("m2", from_version=2) + m3 = _MigrationStub("m3", from_version=3) + plugin = _StubPlugin(current_version=4, migrations_=[m1, m2, m3]) + plan = plan_migrations(plugin, from_version=2) + assert [m.migration_id for m in plan] == ["m2", "m3"] + + +# ── Error cases ─────────────────────────────────────────────────────────── + + +def test_plan_rejects_downgrade() -> None: + """from_version ahead of current = downgrade. Framework is forward-only + (every migration is ``to = from + 1``). Catch here so operators see a + clear error instead of a silent no-op.""" + plugin = _StubPlugin(current_version=2, migrations_=[]) + with pytest.raises(ValueError, match="downgrades are not supported"): + plan_migrations(plugin, from_version=3) + + +def test_plan_rejects_zero_from_version() -> None: + """v1 is the baseline — there is no pre-v1 state to walk from.""" + plugin = _StubPlugin(current_version=3, migrations_=[]) + with pytest.raises(ValueError, match="from_version must be >= 1"): + plan_migrations(plugin, from_version=0) + + +def test_plan_rejects_negative_from_version() -> None: + plugin = _StubPlugin(current_version=3, migrations_=[]) + with pytest.raises(ValueError, match="from_version must be >= 1"): + plan_migrations(plugin, from_version=-1) + + +# ── Out-of-window filtering ────────────────────────────────────────────── + + +def test_plan_ignores_migrations_before_window() -> None: + """Graph at v3, plugin at v4: the v1 → v2 migration is behind us and + must not be re-run. Planner filters by window, not by 'has this + migration been applied?' (that's the workflow's job via the audit + table).""" + m1 = _MigrationStub("m1", from_version=1) # behind the window + m3 = _MigrationStub("m3", from_version=3) # in window + plugin = _StubPlugin(current_version=4, migrations_=[m1, m3]) + plan = plan_migrations(plugin, from_version=3) + assert [m.migration_id for m in plan] == ["m3"] + + +def test_plan_ignores_migrations_beyond_current() -> None: + """A plugin may keep a v3 → v4 migration checked in while + ``current_version`` still reads 3 (pre-ship review). The planner + MUST NOT run unpublished hops — the plugin explicitly shipped at v3. + Filter by window.""" + m1 = _MigrationStub("m1", from_version=1) + m_future = _MigrationStub("m3", from_version=3) # beyond current + plugin = _StubPlugin(current_version=3, migrations_=[m1, _MigrationStub("m2", 2), m_future]) + plan = plan_migrations(plugin, from_version=1) + assert [m.migration_id for m in plan] == ["m1", "m2"] + + +def test_plan_ignores_migration_with_out_of_range_from_version() -> None: + """A migration declaring from_version=0 or from_version=current is + pathological — the planner silently drops them rather than raising, + matching the documented 'only walk the window' behaviour. The + registry's contract validator is the right place to reject these + *at registration*; the planner trusts its input.""" + valid = _MigrationStub("valid", from_version=1) + out_of_range_low = _MigrationStub("low", from_version=0) + out_of_range_high = _MigrationStub("high", from_version=2) # target = 3, beyond current=2 + plugin = _StubPlugin(current_version=2, migrations_=[out_of_range_low, valid, out_of_range_high]) + plan = plan_migrations(plugin, from_version=1) + assert [m.migration_id for m in plan] == ["valid"] + + +# ── Interaction with validator ─────────────────────────────────────────── + + +def test_planner_output_shape_matches_registry_hop_contract() -> None: + """Sanity: for a plugin that would pass + :meth:`PluginRegistry.register_graph_type` validation (every hop + covered, single-step hops, current >= 1), the planner produces a + contiguous list of migrations whose from_version values walk the + window in order. Guards against a future refactor where one side + drifts from the other.""" + plugin = _StubPlugin( + current_version=5, + migrations_=[ + _MigrationStub("m1", 1), + _MigrationStub("m2", 2), + _MigrationStub("m3", 3), + _MigrationStub("m4", 4), + ], + ) + plan = plan_migrations(plugin, from_version=1) + from_versions = [m.from_version for m in plan] + assert from_versions == [1, 2, 3, 4]