Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions libs/kt-config/src/kt_config/migrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""Migration planner for :class:`GraphTypePlugin`.

Walks the declared migrations of a registered graph-type plugin and
returns the ordered list that advances a graph from
``from_version`` to ``plugin.current_version``. Pure function — no
Hatchet, no DB, no side effects. The Phase 7 ``graph_migration_wf``
consumes this output to dispatch one migration at a time, persisting
audit rows between hops.

The planner is deliberately *under*-specified: it does **not** check
that every migration is runnable, that the target graph still exists,
or that an earlier migration run already covered the hop — those
belong to the execution workflow. This module only answers "what
migrations, in what order, does the plugin declare for this version
walk?" so the workflow can skip already-applied hops via the
``graph_migration_runs`` audit table.

Contract invariants on the plugin (already enforced at
:meth:`PluginRegistry.register_graph_type`):

* ``current_version >= 1``
* every declared migration has ``to_version == from_version + 1``
* every hop from v1 to ``current_version - 1`` has at least one
migration

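The planner therefore trusts the shape of its input. Feeding it an
unvalidated plugin (e.g. in a test bypassing the registry) is a
programming error: the plan is still emitted in ascending hop order,
but it may be incomplete (a hop with no declared migration simply
contributes nothing, and no error is raised).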
"""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from kt_config.plugin import GraphMigration, GraphTypePlugin


def plan_migrations(
    plugin: "GraphTypePlugin",
    from_version: int,
) -> list["GraphMigration"]:
    """Build the ordered list of migrations that walks a graph forward.

    The walk covers the hops ``from_version -> from_version + 1`` up to
    ``current_version - 1 -> current_version``, where ``current_version``
    is read from ``plugin``. A migration is selected purely by its
    ``from_version`` attribute:

    * Hops are emitted in ascending version order, regardless of the
      order in which :meth:`GraphTypePlugin.migrations` lists them.
    * Several migrations sharing one hop keep the relative order in
      which the plugin declared them.
    * Migrations whose ``from_version`` lies outside
      ``[from_version, current_version)`` are dropped without error.
    * A hop with no declared migration contributes nothing; this
      function does not verify hop coverage.

    Args:
        plugin: Object exposing ``current_version`` and ``migrations()``.
        from_version: Version the graph is currently at (``>= 1``).

    Returns:
        The migrations to run, in execution order. An empty list when
        ``from_version`` already equals ``plugin.current_version``.

    Raises:
        ValueError: If ``from_version < 1`` (v1 is the baseline), or if
            ``from_version`` is greater than ``plugin.current_version``
            (that would be a downgrade, which is not supported).
    """
    # Validate the lower bound before touching the plugin at all.
    if from_version < 1:
        raise ValueError(f"from_version must be >= 1 (got {from_version})")

    target = plugin.current_version
    if from_version > target:
        raise ValueError(
            f"from_version {from_version} is ahead of plugin current_version {target} — "
            f"downgrades are not supported. Re-deploy the newer plugin to advance the graph."
        )
    if from_version == target:
        return []

    # One bucket per hop source version, created in ascending order so
    # that the dict's insertion order *is* the execution order.
    buckets: dict[int, list["GraphMigration"]] = {
        source: [] for source in range(from_version, target)
    }
    for candidate in plugin.migrations():
        bucket = buckets.get(candidate.from_version)
        if bucket is not None:  # out-of-window migrations have no bucket
            bucket.append(candidate)

    # Flatten hop by hop; order inside a bucket is declaration order.
    return [step for bucket in buckets.values() for step in bucket]
226 changes: 226 additions & 0 deletions libs/kt-config/tests/test_migration_planner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
"""Tests for :func:`kt_config.migrations.plan_migrations`."""

from __future__ import annotations

from typing import Any

import pytest

from kt_config.migrations import plan_migrations
from kt_config.plugin import GraphTypeComposition, GraphTypePlugin


class _MigrationStub:
    """Bare-bones stand-in for a GraphMigration.

    Carries the four data attributes (``migration_id``, ``from_version``,
    ``to_version``, ``description``) and a no-op async ``run``. The
    target version is always derived as ``from_version + 1``.
    """

    def __init__(self, migration_id: str, from_version: int) -> None:
        self.description = f"stub {migration_id}"
        self.to_version = from_version + 1  # single-step hop
        self.from_version = from_version
        self.migration_id = migration_id

    async def run(self, ctx: Any) -> None:  # noqa: ARG002
        """Do nothing; the planner tests never execute migrations."""


class _StubPlugin(GraphTypePlugin):
    """Hand-built plugin that never goes through PluginRegistry.

    Constructing the plugin directly lets the tests feed the planner
    arbitrary declaration shapes (out-of-order lists, out-of-window
    migrations, uncovered hops) without any registration-time
    validation getting in the way.
    """

    def __init__(
        self,
        *,
        current_version: int,
        migrations_: list[_MigrationStub],
    ) -> None:
        self._declared = migrations_
        self._version = current_version

    @property
    def graph_type_id(self) -> str:
        return "stub"

    @property
    def display_name(self) -> str:
        return "Stub"

    @property
    def current_version(self) -> int:
        return self._version

    def composition(self) -> GraphTypeComposition:  # pragma: no cover — planner never reads
        # Every slot is left empty; only the shape matters here.
        return GraphTypeComposition(
            fetch_chain=[],
            search_providers=[],
            fact_decomposition="",
            concept_extractor="",
            disambiguation="",
            seed_multiplex="",
            seed_promotion="",
            dimensions="",
            definition="",
            relations="",
            sync="",
            source_cache="",
            source_contribution="",
            agentic_tasks={},
        )

    def migrations(self):  # type: ignore[override]
        # Hand out a fresh list so callers cannot mutate the stub's own.
        return [*self._declared]


# ── Trivial cases ────────────────────────────────────────────────────────


def test_plan_is_empty_when_already_at_current() -> None:
    """``from_version == current_version`` yields an empty plan rather
    than an exception — the "nothing to migrate" answer."""
    up_to_date = _StubPlugin(current_version=3, migrations_=[])
    plan = plan_migrations(up_to_date, from_version=3)
    assert plan == []


def test_plan_for_plugin_at_v1_and_graph_at_v1_is_empty() -> None:
    """Baseline case: plugin at v1 with no migrations, graph at v1 —
    there is nothing to walk, so the plan is empty."""
    baseline = _StubPlugin(current_version=1, migrations_=[])
    plan = plan_migrations(baseline, from_version=1)
    assert plan == []


# ── Happy path ────────────────────────────────────────────────────────────


def test_plan_single_hop() -> None:
    """One hop, one declared migration: the plan is exactly that object."""
    only_step = _MigrationStub("m1", from_version=1)
    stub = _StubPlugin(current_version=2, migrations_=[only_step])
    assert plan_migrations(stub, from_version=1) == [only_step]


def test_plan_multi_hop_preserves_hop_order() -> None:
    """A v1 → v4 walk comes back sorted by hop even though the plugin
    declares its migrations in a scrambled order."""
    scrambled = [
        _MigrationStub("m3", from_version=3),  # v3 → v4
        _MigrationStub("m1", from_version=1),  # v1 → v2
        _MigrationStub("m2", from_version=2),  # v2 → v3
    ]
    stub = _StubPlugin(current_version=4, migrations_=scrambled)
    planned_ids = [step.migration_id for step in plan_migrations(stub, from_version=1)]
    assert planned_ids == ["m1", "m2", "m3"]


def test_plan_preserves_declaration_order_within_hop() -> None:
    """Several migrations on one hop (say a data migration followed by
    fix-ups) must come back in the order the plugin listed them."""
    same_hop = [
        _MigrationStub("m1a", from_version=1),
        _MigrationStub("m1b", from_version=1),
        _MigrationStub("m1c", from_version=1),
    ]
    stub = _StubPlugin(current_version=2, migrations_=same_hop)
    planned_ids = [step.migration_id for step in plan_migrations(stub, from_version=1)]
    assert planned_ids == ["m1a", "m1b", "m1c"]


def test_plan_partial_walk_starting_mid_range() -> None:
    """With the graph at v2 and the plugin at v4, only the v2 → v3 and
    v3 → v4 hops are planned."""
    declared = [
        _MigrationStub("m1", from_version=1),
        _MigrationStub("m2", from_version=2),
        _MigrationStub("m3", from_version=3),
    ]
    stub = _StubPlugin(current_version=4, migrations_=declared)
    planned_ids = [step.migration_id for step in plan_migrations(stub, from_version=2)]
    assert planned_ids == ["m2", "m3"]


# ── Error cases ───────────────────────────────────────────────────────────


def test_plan_rejects_downgrade() -> None:
    """A graph version ahead of the plugin's current version would be a
    downgrade; the planner raises instead of returning a silent no-op."""
    older_plugin = _StubPlugin(current_version=2, migrations_=[])
    with pytest.raises(ValueError, match="downgrades are not supported"):
        plan_migrations(older_plugin, from_version=3)


def test_plan_rejects_zero_from_version() -> None:
    """``from_version=0`` is rejected: v1 is the baseline, so there is
    no earlier state to start the walk from."""
    stub = _StubPlugin(current_version=3, migrations_=[])
    with pytest.raises(ValueError, match="from_version must be >= 1"):
        plan_migrations(stub, from_version=0)


def test_plan_rejects_negative_from_version() -> None:
    """Negative versions hit the same ``>= 1`` guard as zero."""
    stub = _StubPlugin(current_version=3, migrations_=[])
    with pytest.raises(ValueError, match="from_version must be >= 1"):
        plan_migrations(stub, from_version=-1)


# ── Out-of-window filtering ──────────────────────────────────────────────


def test_plan_ignores_migrations_before_window() -> None:
    """Graph at v3, plugin at v4: the v1 → v2 migration lies behind the
    start of the walk and must not appear in the plan. The planner
    selects by version window only; it does not ask whether a migration
    has already been applied."""
    behind = _MigrationStub("m1", from_version=1)  # before the window
    inside = _MigrationStub("m3", from_version=3)  # the only hop needed
    stub = _StubPlugin(current_version=4, migrations_=[behind, inside])
    planned_ids = [step.migration_id for step in plan_migrations(stub, from_version=3)]
    assert planned_ids == ["m3"]


def test_plan_ignores_migrations_beyond_current() -> None:
    """A v3 → v4 migration may already be declared while
    ``current_version`` is still 3. That hop is past the end of the
    window, so the planner must leave it out."""
    declared = [
        _MigrationStub("m1", from_version=1),
        _MigrationStub("m2", 2),
        _MigrationStub("m3", from_version=3),  # starts at current — beyond the walk
    ]
    stub = _StubPlugin(current_version=3, migrations_=declared)
    planned_ids = [step.migration_id for step in plan_migrations(stub, from_version=1)]
    assert planned_ids == ["m1", "m2"]


def test_plan_ignores_migration_with_out_of_range_from_version() -> None:
    """Migrations declaring ``from_version=0`` or
    ``from_version == current_version`` are dropped quietly instead of
    triggering an error — the planner only walks the window and does
    not validate the declarations it is handed."""
    too_low = _MigrationStub("low", from_version=0)
    in_window = _MigrationStub("valid", from_version=1)
    too_high = _MigrationStub("high", from_version=2)  # would target v3; current is 2
    stub = _StubPlugin(current_version=2, migrations_=[too_low, in_window, too_high])
    planned_ids = [step.migration_id for step in plan_migrations(stub, from_version=1)]
    assert planned_ids == ["valid"]


# ── Interaction with validator ───────────────────────────────────────────


def test_planner_output_shape_matches_registry_hop_contract() -> None:
    """Sanity check for a well-formed plugin (one single-step migration
    per hop, v1 through v5): the plan's ``from_version`` values step
    through the window contiguously and in order."""
    one_per_hop = [
        _MigrationStub("m1", 1),
        _MigrationStub("m2", 2),
        _MigrationStub("m3", 3),
        _MigrationStub("m4", 4),
    ]
    stub = _StubPlugin(current_version=5, migrations_=one_per_hop)
    walked = [step.from_version for step in plan_migrations(stub, from_version=1)]
    assert walked == [1, 2, 3, 4]
Loading