From f92bb91fb69bbb842da0fbef7b671a72724e1722 Mon Sep 17 00:00:00 2001 From: Joshua Temple Date: Sun, 31 May 2026 12:53:01 -0400 Subject: [PATCH] docs: document the cluster runtime with a benchmark and integration capstone Signed-off-by: Joshua Temple --- cluster/CHANGELOG.md | 41 +++++++++++++ cluster/README.md | 117 ++++++++++++++++++++++++++++++++++++ cluster/bench_test.go | 77 ++++++++++++++++++++++++ cluster/doc.go | 27 +++++++-- cluster/integration_test.go | 93 ++++++++++++++++++++++++++++ 5 files changed, 351 insertions(+), 4 deletions(-) create mode 100644 cluster/CHANGELOG.md create mode 100644 cluster/README.md create mode 100644 cluster/bench_test.go create mode 100644 cluster/integration_test.go diff --git a/cluster/CHANGELOG.md b/cluster/CHANGELOG.md new file mode 100644 index 0000000..23da615 --- /dev/null +++ b/cluster/CHANGELOG.md @@ -0,0 +1,41 @@ +# Changelog + +All notable changes to `crucible/cluster` are documented here. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), +and this module adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.1.0] + +The first release of the host-side distribution runtime for the `crucible/state` +kernel. It is additive over the kernel: it consumes the opaque `ActorRef` (with its +`Node` locator), the injectable `ActorSystem`, the `Snapshot`/`Restore` pair, and the +typed `ActorEscalation`/`EscalationHandler`, and the kernel stays pure and +stdlib-only. The cluster core is itself stdlib-only; transport dependencies live +behind the `Transport` interface. + +### Added + +- **Remote actors.** `System` wraps a node's local `state.ActorSystem` with a node + identity and an optional `Transport`. `Spawn` starts an actor on this node or, when + the target is another node, over the transport; `Deliver` routes to the actor's + owning node; both treat the `ActorRef` as opaque. `SpawnLocal` and `Respawn` are the + local primitives a transport and a supervisor drive. A `System` with no transport + serves local actors and reports `ErrNoTransport` for a remote ref. +- **In-memory transport.** `InMemoryTransport` connects node-scoped systems in one + process — the reference `Transport` for tests and single-process development — with + `ErrNodeUnreachable` for an unregistered node. A real network transport implements + the same interface out of tree. +- **Supervision.** `Supervisor` routes each escalated actor failure to a per-source + `Decision`: `Escalate` (forward to a sink), `Stop` (contain), `Restart` (re-spawn + through a `Respawner` within a per-source budget), and `Backoff` (deferred, + exponentially paced restart applied by the host via `Tick`, timed through an + injectable `state.Clock`). It plugs into `ActorSystem.WithEscalationHandler` and + records the decisions it applied. +- **Live migration.** `Capture` snapshots a running instance, its actor tree, and its + machine definition into a wire-shippable `Checkpoint`; `Restore` rebuilds it on + another node, resuming in place and reconstructing actors from the target palette + (`WithActorBehaviors`). The move is gated on schema compatibility via + `state/evolution`, refusing a breaking target with `ErrIncompatibleMigration`. + +[0.1.0]: https://github.com/stablekernel/crucible/releases/tag/cluster/v0.1.0 diff --git a/cluster/README.md b/cluster/README.md new file mode 100644 index 0000000..4f02023 --- /dev/null +++ b/cluster/README.md @@ -0,0 +1,117 @@ +# crucible/cluster + +The host-side **distribution runtime** for the +[Crucible](../README.md) [`state`](../state) kernel: remote actors, supervision, +and live instance migration. + +> **Status:** experimental, pre-1.0. The runtime is feature-complete on the +> in-memory transport and extensively tested; the API may still change before v1. + +Import path: `github.com/stablekernel/crucible/cluster` + +## What it is + +`state` runs a machine and its child-machine actors in one process. `cluster` +spreads that across nodes: a parent on one node addresses and drives an actor +running on another, failures are supervised with restart/backoff strategies, and +a running instance can be migrated to a different node — all over a pluggable +`Transport`, with the kernel left **pure and stdlib-only**. + +The runtime is **additive** over the kernel. It consumes seams the kernel already +reserves — the opaque `ActorRef` (whose `Node` locator names the owning host), the +injectable `ActorSystem`, the `Snapshot`/`Restore` pair, and the typed +`ActorEscalation`/`EscalationHandler` — and needs no kernel change beyond the +additive `ActorRef.Node` locator. The core is itself stdlib-only; transport +dependencies live behind the `Transport` interface, out of the core. + +## Remote actors + +A `System` wraps a node's local `state.ActorSystem` with a node identity and an +optional `Transport`. Operations on a ref this node owns are delegated locally; +operations on a ref another node owns are routed over the transport. + +```go +tr := cluster.NewInMemoryTransport() + +nodeA := cluster.NewSystem("node-a", actorSysA, cluster.WithTransport(tr)) +nodeB := cluster.NewSystem("node-b", actorSysB, cluster.WithTransport(tr)) +tr.Register("node-a", nodeA) +tr.Register("node-b", nodeB) + +// Spawn a worker on node-b from node-a, then drive it through the returned ref. +ref, err := nodeA.Spawn(ctx, "node-b", "worker", "w-1", nil) +_, err = nodeA.Deliver(ctx, ref, "start") // routed to node-b +``` + +The ref is opaque: the holder never learns where the actor runs. A ref this node +owns has an empty `Node`; a remote ref carries the owning node. A `System` with no +`Transport` serves its local actors transparently and reports `ErrNoTransport` for +a remote ref. The in-tree `InMemoryTransport` connects node-scoped systems in one +process; a real network transport implements the same `Transport` interface. + +## Supervision + +A `Supervisor` turns the kernel's typed `ActorEscalation` into a per-source policy. +Wire it with `ActorSystem.WithEscalationHandler(sup.Handle)`; each failed actor is +routed to a `Decision` by the src it was spawned from: + +| Decision | Behavior | +| --- | --- | +| `Escalate` | forward the failure to a sink up the hierarchy (the default) | +| `Stop` | contain the failure at this level | +| `Restart` | re-spawn through a `Respawner` (the `System`), bounded by a per-src budget; on exhaustion, escalate | +| `Backoff` | defer the re-spawn behind an exponentially growing delay; the host applies due restarts via `Tick` | + +```go +sup := cluster.NewSupervisor( + cluster.WithRestart("worker", 3), // up to 3 immediate restarts + cluster.WithBackoff("flaky", 5, 100*time.Millisecond, time.Minute, 2.0), + cluster.WithEscalationSink(parentHandler), +) +sup.SetRespawner(node) +actorSys.WithEscalationHandler(sup.Handle) +// ... drive backoff restarts from a timer loop: +for range ticker.C { sup.Tick(ctx) } +``` + +Backoff reads time through an injected `state.Clock` (`WithClock`, default the +system clock), so it is deterministic under a `state.FakeClock` in tests. + +## Live migration + +`Capture` snapshots a running instance, its actor tree, and its machine definition +into a wire-shippable `Checkpoint`; `Restore` rebuilds it on another node, resuming +in place. The move is **gated on schema compatibility**: `Restore` diffs the +source and target machine definitions with [`state/evolution`](../state/evolution) +and refuses a breaking target with `ErrIncompatibleMigration`, so an instance never +resumes against a definition that would misread its state. + +```go +cp, err := cluster.Capture(inst, actorSys, machine) // on the source node +// ... ship cp (it is all JSON) to the target node ... +inst, sys, err := cluster.Restore(ctx, cp, targetMachine, // on the target node + cluster.WithActorBehaviors(palette)) +// err is ErrIncompatibleMigration if targetMachine is a breaking change. +``` + +## Performance + +Indicative numbers (Apple Silicon dev machine, `go test -bench`); reproduce with +`go test -bench=. -benchmem -run=^$ ./cluster/`. The local `System.Deliver` is a +thin pass-through over the kernel `ActorSystem`; remote delivery over the in-memory +transport adds a map lookup and the delegating call. + +## Stability + +Stability label: **experimental** (pre-1.0; the API may change). Each module is +independently versioned per-module SemVer. + +## Design & discussions + +Design rationale lives on the GitHub +[Discussions board](https://github.com/stablekernel/crucible/discussions) under +the **State Machine** category. + +## License + +Apache-2.0. See the repository [LICENSE](../LICENSE) and [NOTICE](../NOTICE). diff --git a/cluster/bench_test.go b/cluster/bench_test.go new file mode 100644 index 0000000..09764ad --- /dev/null +++ b/cluster/bench_test.go @@ -0,0 +1,77 @@ +package cluster_test + +import ( + "context" + "testing" + + "github.com/stablekernel/crucible/cluster" + "github.com/stablekernel/crucible/state" +) + +// pingEnt is a long-lived bench actor's context. +type pingEnt struct{} + +// pingMachine stays running: a self-transition on "ping" keeps the actor alive so +// a benchmark can deliver to it repeatedly without re-spawning. +func pingMachine() *state.Machine[string, string, *pingEnt] { + return state.Forge[string, string, *pingEnt]("ping"). + State("up"). + Initial("up"). + Transition("up").On("ping").GoTo("up"). + Quench() +} + +func pingBehavior() state.ActorBehavior { + m := pingMachine() + return func(map[string]any) (state.ActorInstance, error) { + return state.NewActor(m.Cast(&pingEnt{}, state.WithInitialState("up")), nil), nil + } +} + +// pingSystem builds a node-scoped System running one long-lived ping actor, and +// returns the System and the actor's ref. +func pingSystem(node string, opts ...cluster.Option) (*cluster.System[string, string, *parentEnt], state.ActorRef) { + parent := parentMachine().Cast(&parentEnt{}, state.WithInitialState("idle")) + actorSys := state.NewActorSystem(parent).Register("child", pingBehavior()) + ctx := context.Background() + actorSys.Absorb(ctx, []state.Effect{state.SpawnActor{ID: "ping-1", Src: state.Ref{Name: "child"}}}) + sys := cluster.NewSystem(node, actorSys, opts...) + ref, _ := sys.Ref("ping-1") + return sys, ref +} + +// BenchmarkDeliver measures the per-delivery overhead the System adds: the local +// path is a thin pass-through over the kernel ActorSystem, and the remote path +// adds the transport's node lookup and the delegating call. +func BenchmarkDeliver(b *testing.B) { + ctx := context.Background() + + b.Run("local", func(b *testing.B) { + sys, ref := pingSystem("node-a") + b.ReportAllocs() + b.ResetTimer() + for range b.N { + if _, err := sys.Deliver(ctx, ref, "ping"); err != nil { + b.Fatalf("deliver: %v", err) + } + } + }) + + b.Run("remote/inmemory", func(b *testing.B) { + tr := cluster.NewInMemoryTransport() + sysB, localRef := pingSystem("node-b") + parentA := parentMachine().Cast(&parentEnt{}, state.WithInitialState("idle")) + sysA := cluster.NewSystem("node-a", state.NewActorSystem(parentA), cluster.WithTransport(tr)) + tr.Register("node-a", sysA) + tr.Register("node-b", sysB) + remote := state.ActorRef{ID: localRef.ID, Src: localRef.Src, Node: "node-b"} + + b.ReportAllocs() + b.ResetTimer() + for range b.N { + if _, err := sysA.Deliver(ctx, remote, "ping"); err != nil { + b.Fatalf("remote deliver: %v", err) + } + } + }) +} diff --git a/cluster/doc.go b/cluster/doc.go index 4ec91c9..c690e68 100644 --- a/cluster/doc.go +++ b/cluster/doc.go @@ -22,8 +22,27 @@ // // # Transport // -// Transport is the seam that moves a delivery to the node that owns the target -// actor. It is host-supplied, so the kernel and this package's core carry no -// network dependency: an in-memory transport drives multi-node tests, and a real -// network transport (gRPC) lives behind the same interface. +// Transport is the seam that moves an actor operation (deliver, spawn) to the node +// that owns the target actor. It is host-supplied, so the kernel and this package's +// core carry no network dependency: the in-tree InMemoryTransport drives multi-node +// tests and single-process development, and a real network transport (gRPC) lives +// behind the same interface. +// +// # Supervision +// +// Supervisor turns the kernel's typed ActorEscalation into a per-source supervision +// policy. Each failed actor is routed to a Decision by the src it was spawned from: +// Escalate forwards the failure to a sink up the hierarchy, Stop contains it, +// Restart re-spawns the actor through a Respawner within a per-source budget, and +// Backoff defers the re-spawn behind an exponentially growing delay applied by the +// host through Tick. It plugs into the seam with ActorSystem.WithEscalationHandler. +// +// # Migration +// +// Capture snapshots a running instance, its actor tree, and its machine definition +// into a wire-shippable Checkpoint; Restore rebuilds it on another node, resuming in +// place. Restore gates the move on schema compatibility — it diffs the source and +// target machine definitions with state/evolution and refuses a breaking target with +// ErrIncompatibleMigration — so an instance never resumes against a definition that +// would misread its persisted configuration. package cluster diff --git a/cluster/integration_test.go b/cluster/integration_test.go new file mode 100644 index 0000000..68dd553 --- /dev/null +++ b/cluster/integration_test.go @@ -0,0 +1,93 @@ +package cluster_test + +import ( + "context" + "errors" + "testing" + + "github.com/stablekernel/crucible/cluster" + "github.com/stablekernel/crucible/state" +) + +// TestIntegration_RemoteSpawnDeliverSupervise exercises the distribution runtime +// end to end across two nodes: node-a spawns a supervised worker on node-b, drives +// it remotely, the worker fails, node-b's supervisor restarts it within budget, and +// node-a keeps driving the same ref — all without node-a knowing where the worker +// runs. +func TestIntegration_RemoteSpawnDeliverSupervise(t *testing.T) { + ctx := context.Background() + tr := cluster.NewInMemoryTransport() + + // node-b hosts workers and supervises them with a restart budget. + parentB := parentMachine().Cast(&parentEnt{}, state.WithInitialState("idle")) + actorSysB := state.NewActorSystem(parentB).Register("worker", pingBehavior()) + nodeB := cluster.NewSystem("node-b", actorSysB, cluster.WithTransport(tr)) + sup := cluster.NewSupervisor(cluster.WithRestart("worker", 2)) + sup.SetRespawner(nodeB) + actorSysB.WithEscalationHandler(sup.Handle) + + // node-a only routes. + parentA := parentMachine().Cast(&parentEnt{}, state.WithInitialState("idle")) + nodeA := cluster.NewSystem("node-a", state.NewActorSystem(parentA), cluster.WithTransport(tr)) + + tr.Register("node-a", nodeA) + tr.Register("node-b", nodeB) + + // node-a spawns a worker on node-b and drives it through the returned ref. + ref, err := nodeA.Spawn(ctx, "node-b", "worker", "w-1", nil) + if err != nil { + t.Fatalf("remote spawn: %v", err) + } + if ref.Node != "node-b" { + t.Fatalf("ref Node = %q, want node-b", ref.Node) + } + if delivered, err := nodeA.Deliver(ctx, ref, "ping"); err != nil || !delivered { + t.Fatalf("remote deliver = (%v, %v)", delivered, err) + } + + // The worker fails on node-b; the supervisor restarts it within budget. + actorSysB.SettleError(ctx, "w-1", errors.New("boom")) + if nodeB.Running() != 1 { + t.Fatalf("node-b Running() after supervised restart = %d, want 1", nodeB.Running()) + } + if got := sup.Handled(); len(got) != 1 || got[0].Decision != cluster.Restart { + t.Fatalf("supervisor Handled = %+v, want one Restart", got) + } + + // node-a drives the restarted worker through the same opaque ref. + if delivered, err := nodeA.Deliver(ctx, ref, "ping"); err != nil || !delivered { + t.Fatalf("deliver after restart = (%v, %v)", delivered, err) + } +} + +// TestIntegration_MigrateAcrossNodes captures an instance running on one node and +// restores it onto an additively-evolved machine on another, preserving its +// configuration and actors. +func TestIntegration_MigrateAcrossNodes(t *testing.T) { + ctx := context.Background() + + // Source: an instance in state b with a running actor. + inst := migSource().Cast(&migEnt{Step: 7}, state.WithInitialState("a")) + inst.Fire(ctx, "go") + srcSys := state.NewActorSystem(inst).Register("child", childBehavior()) + srcSys.Absorb(ctx, []state.Effect{state.SpawnActor{ID: "w", Src: state.Ref{Name: "child"}}}) + + cp, err := cluster.Capture(inst, srcSys, migSource()) + if err != nil { + t.Fatalf("capture: %v", err) + } + + // Target node runs an additively-evolved machine; the migration is allowed and + // the instance resumes in state b with its actor. + got, sys, err := cluster.Restore(ctx, cp, migAdditive(), + cluster.WithActorBehaviors(map[string]state.ActorBehavior{"child": childBehavior()})) + if err != nil { + t.Fatalf("restore: %v", err) + } + if got.Current() != "b" || got.Entity().Step != 7 { + t.Fatalf("migrated instance = (%q, step %d), want (b, 7)", got.Current(), got.Entity().Step) + } + if _, ok := sys.Ref("w"); !ok { + t.Fatal("migrated actor w not present on target") + } +}