41 changes: 41 additions & 0 deletions cluster/CHANGELOG.md
@@ -0,0 +1,41 @@
# Changelog

All notable changes to `crucible/cluster` are documented here.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this module adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.1.0]

The first release of the host-side distribution runtime for the `crucible/state`
kernel. It is additive over the kernel: it consumes the opaque `ActorRef` (with its
`Node` locator), the injectable `ActorSystem`, the `Snapshot`/`Restore` pair, and the
typed `ActorEscalation`/`EscalationHandler`, and the kernel stays pure and
stdlib-only. The cluster core is itself stdlib-only; transport dependencies live
behind the `Transport` interface.

### Added

- **Remote actors.** `System` wraps a node's local `state.ActorSystem` with a node
identity and an optional `Transport`. `Spawn` starts an actor on this node or, when
the target is another node, over the transport; `Deliver` routes to the actor's
owning node; both treat the `ActorRef` as opaque. `SpawnLocal` and `Respawn` are the
local primitives a transport and a supervisor drive. A `System` with no transport
serves local actors and reports `ErrNoTransport` for a remote ref.
- **In-memory transport.** `InMemoryTransport` connects node-scoped systems in one
process — the reference `Transport` for tests and single-process development — with
`ErrNodeUnreachable` for an unregistered node. A real network transport implements
the same interface out of tree.
- **Supervision.** `Supervisor` routes each escalated actor failure to a per-source
`Decision`: `Escalate` (forward to a sink), `Stop` (contain), `Restart` (re-spawn
through a `Respawner` within a per-source budget), and `Backoff` (deferred,
exponentially paced restart applied by the host via `Tick`, timed through an
injectable `state.Clock`). It plugs into `ActorSystem.WithEscalationHandler` and
records the decisions it applied.
- **Live migration.** `Capture` snapshots a running instance, its actor tree, and its
machine definition into a wire-shippable `Checkpoint`; `Restore` rebuilds it on
another node, resuming in place and reconstructing actors from the target palette
(`WithActorBehaviors`). The move is gated on schema compatibility via
`state/evolution`, refusing a breaking target with `ErrIncompatibleMigration`.

[0.1.0]: https://github.com/stablekernel/crucible/releases/tag/cluster/v0.1.0
117 changes: 117 additions & 0 deletions cluster/README.md
@@ -0,0 +1,117 @@
# crucible/cluster

The host-side **distribution runtime** for the
[Crucible](../README.md) [`state`](../state) kernel: remote actors, supervision,
and live instance migration.

> **Status:** experimental, pre-1.0. The runtime is feature-complete on the
> in-memory transport and extensively tested; the API may still change before v1.

Import path: `github.com/stablekernel/crucible/cluster`

## What it is

`state` runs a machine and its child-machine actors in one process. `cluster`
spreads that across nodes: a parent on one node addresses and drives an actor
running on another, failures are supervised with restart/backoff strategies, and
a running instance can be migrated to a different node — all over a pluggable
`Transport`, with the kernel left **pure and stdlib-only**.

The runtime is **additive** over the kernel. It consumes seams the kernel already
reserves — the opaque `ActorRef` (whose `Node` locator names the owning host), the
injectable `ActorSystem`, the `Snapshot`/`Restore` pair, and the typed
`ActorEscalation`/`EscalationHandler` — and needs no kernel change beyond the
additive `ActorRef.Node` locator. The core is itself stdlib-only; transport
dependencies live behind the `Transport` interface, out of the core.

## Remote actors

A `System` wraps a node's local `state.ActorSystem` with a node identity and an
optional `Transport`. Operations on a ref this node owns are delegated locally;
operations on a ref another node owns are routed over the transport.

```go
tr := cluster.NewInMemoryTransport()

nodeA := cluster.NewSystem("node-a", actorSysA, cluster.WithTransport(tr))
nodeB := cluster.NewSystem("node-b", actorSysB, cluster.WithTransport(tr))
tr.Register("node-a", nodeA)
tr.Register("node-b", nodeB)

// Spawn a worker on node-b from node-a, then drive it through the returned ref.
ref, err := nodeA.Spawn(ctx, "node-b", "worker", "w-1", nil)
if err != nil {
	return err // do not deliver through a ref that was never created
}
_, err = nodeA.Deliver(ctx, ref, "start") // routed to node-b
```

The ref is opaque: the holder drives the actor through it without needing to know
where the actor runs. Routing follows the ref's `Node` locator: a ref this node
owns has an empty `Node`, and a remote ref carries the owning node. A `System`
with no `Transport` serves its local actors transparently and reports
`ErrNoTransport` for a remote ref. The in-tree `InMemoryTransport` connects
node-scoped systems in one process and reports `ErrNodeUnreachable` for an
unregistered node; a real network transport implements the same `Transport`
interface.

## Supervision

A `Supervisor` turns the kernel's typed `ActorEscalation` into a per-source policy.
Wire it with `ActorSystem.WithEscalationHandler(sup.Handle)`; each failed actor is
routed to a `Decision` by the src it was spawned from:

| Decision | Behavior |
| --- | --- |
| `Escalate` | forward the failure to a sink up the hierarchy (the default) |
| `Stop` | contain the failure at this level |
| `Restart` | re-spawn through a `Respawner` (the `System`), bounded by a per-src budget; on exhaustion, escalate |
| `Backoff` | defer the re-spawn behind an exponentially growing delay; the host applies due restarts via `Tick` |

```go
sup := cluster.NewSupervisor(
	cluster.WithRestart("worker", 3), // up to 3 immediate restarts
	cluster.WithBackoff("flaky", 5, 100*time.Millisecond, time.Minute, 2.0),
	cluster.WithEscalationSink(parentHandler),
)
sup.SetRespawner(node)
actorSys.WithEscalationHandler(sup.Handle)
// ... drive backoff restarts from a timer loop:
for range ticker.C { sup.Tick(ctx) }
```
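
The `Restart` row of the table (restart within a per-src budget, escalate on exhaustion) can be sketched as a small counter. This is an illustration under assumptions, not the `Supervisor` itself: the `budget` type, `newBudget`, and `decide` are hypothetical names, and the sketch never resets a budget, which the real runtime may or may not do.

```go
package main

import "fmt"

// decision is a hypothetical stand-in for the supervisor's Decision values.
type decision string

const (
	restart  decision = "restart"
	escalate decision = "escalate"
)

// budget tracks how many restarts each source has consumed.
type budget struct {
	limit map[string]int // configured restarts per source
	used  map[string]int // restarts already applied per source
}

func newBudget(limit map[string]int) *budget {
	return &budget{limit: limit, used: map[string]int{}}
}

// decide returns restart while the source has budget left and escalate after.
// A source with no configured budget escalates on its first failure.
func (b *budget) decide(src string) decision {
	if b.used[src] < b.limit[src] {
		b.used[src]++
		return restart
	}
	return escalate
}

func main() {
	b := newBudget(map[string]int{"worker": 3})
	for failure := 1; failure <= 4; failure++ {
		fmt.Printf("failure %d from worker: %s\n", failure, b.decide("worker"))
	}
}
```

With a budget of 3, the first three failures restart and the fourth escalates.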

Backoff reads time through an injected `state.Clock` (`WithClock`, default the
system clock), so it is deterministic under a `state.FakeClock` in tests.

## Live migration

`Capture` snapshots a running instance, its actor tree, and its machine definition
into a wire-shippable `Checkpoint`; `Restore` rebuilds it on another node, resuming
in place. The move is **gated on schema compatibility**: `Restore` diffs the
source and target machine definitions with [`state/evolution`](../state/evolution)
and refuses a breaking target with `ErrIncompatibleMigration`, so an instance never
resumes against a definition that would misread its state.

```go
cp, err := cluster.Capture(inst, actorSys, machine) // on the source node
// ... ship cp (it is all JSON) to the target node ...
inst, sys, err := cluster.Restore(ctx, cp, targetMachine, // on the target node
	cluster.WithActorBehaviors(palette))
// err is ErrIncompatibleMigration if targetMachine is a breaking change.
```

## Performance

Benchmark results are indicative and depend on the machine (the reference runs
used an Apple Silicon dev machine); reproduce them with
`go test -bench=. -benchmem -run=^$ ./cluster/`. The local `System.Deliver` is a
thin pass-through over the kernel `ActorSystem`; remote delivery over the in-memory
transport adds a map lookup and the delegating call.

## Stability

Stability label: **experimental** (pre-1.0; the API may change). Each module is
versioned independently and follows SemVer.

## Design & discussions

Design rationale lives on the GitHub
[Discussions board](https://github.com/stablekernel/crucible/discussions) under
the **State Machine** category.

## License

Apache-2.0. See the repository [LICENSE](../LICENSE) and [NOTICE](../NOTICE).
77 changes: 77 additions & 0 deletions cluster/bench_test.go
@@ -0,0 +1,77 @@
package cluster_test

import (
	"context"
	"testing"

	"github.com/stablekernel/crucible/cluster"
	"github.com/stablekernel/crucible/state"
)

// pingEnt is a long-lived bench actor's context.
type pingEnt struct{}

// pingMachine stays running: a self-transition on "ping" keeps the actor alive so
// a benchmark can deliver to it repeatedly without re-spawning.
func pingMachine() *state.Machine[string, string, *pingEnt] {
	return state.Forge[string, string, *pingEnt]("ping").
		State("up").
		Initial("up").
		Transition("up").On("ping").GoTo("up").
		Quench()
}

func pingBehavior() state.ActorBehavior {
	m := pingMachine()
	return func(map[string]any) (state.ActorInstance, error) {
		return state.NewActor(m.Cast(&pingEnt{}, state.WithInitialState("up")), nil), nil
	}
}

// pingSystem builds a node-scoped System running one long-lived ping actor, and
// returns the System and the actor's ref.
func pingSystem(node string, opts ...cluster.Option) (*cluster.System[string, string, *parentEnt], state.ActorRef) {
	parent := parentMachine().Cast(&parentEnt{}, state.WithInitialState("idle"))
	actorSys := state.NewActorSystem(parent).Register("child", pingBehavior())
	ctx := context.Background()
	actorSys.Absorb(ctx, []state.Effect{state.SpawnActor{ID: "ping-1", Src: state.Ref{Name: "child"}}})
	sys := cluster.NewSystem(node, actorSys, opts...)
	ref, _ := sys.Ref("ping-1")
	return sys, ref
}

// BenchmarkDeliver measures the per-delivery overhead the System adds: the local
// path is a thin pass-through over the kernel ActorSystem, and the remote path
// adds the transport's node lookup and the delegating call.
func BenchmarkDeliver(b *testing.B) {
	ctx := context.Background()

	b.Run("local", func(b *testing.B) {
		sys, ref := pingSystem("node-a")
		b.ReportAllocs()
		b.ResetTimer()
		for range b.N {
			if _, err := sys.Deliver(ctx, ref, "ping"); err != nil {
				b.Fatalf("deliver: %v", err)
			}
		}
	})

	b.Run("remote/inmemory", func(b *testing.B) {
		tr := cluster.NewInMemoryTransport()
		sysB, localRef := pingSystem("node-b")
		parentA := parentMachine().Cast(&parentEnt{}, state.WithInitialState("idle"))
		sysA := cluster.NewSystem("node-a", state.NewActorSystem(parentA), cluster.WithTransport(tr))
		tr.Register("node-a", sysA)
		tr.Register("node-b", sysB)
		remote := state.ActorRef{ID: localRef.ID, Src: localRef.Src, Node: "node-b"}

		b.ReportAllocs()
		b.ResetTimer()
		for range b.N {
			if _, err := sysA.Deliver(ctx, remote, "ping"); err != nil {
				b.Fatalf("remote deliver: %v", err)
			}
		}
	})
}
27 changes: 23 additions & 4 deletions cluster/doc.go
@@ -22,8 +22,27 @@
 //
 // # Transport
 //
-// Transport is the seam that moves a delivery to the node that owns the target
-// actor. It is host-supplied, so the kernel and this package's core carry no
-// network dependency: an in-memory transport drives multi-node tests, and a real
-// network transport (gRPC) lives behind the same interface.
+// Transport is the seam that moves an actor operation (deliver, spawn) to the node
+// that owns the target actor. It is host-supplied, so the kernel and this package's
+// core carry no network dependency: the in-tree InMemoryTransport drives multi-node
+// tests and single-process development, and a real network transport (gRPC) lives
+// behind the same interface.
+//
+// # Supervision
+//
+// Supervisor turns the kernel's typed ActorEscalation into a per-source supervision
+// policy. Each failed actor is routed to a Decision by the src it was spawned from:
+// Escalate forwards the failure to a sink up the hierarchy, Stop contains it,
+// Restart re-spawns the actor through a Respawner within a per-source budget, and
+// Backoff defers the re-spawn behind an exponentially growing delay applied by the
+// host through Tick. It plugs into the seam with ActorSystem.WithEscalationHandler.
+//
+// # Migration
+//
+// Capture snapshots a running instance, its actor tree, and its machine definition
+// into a wire-shippable Checkpoint; Restore rebuilds it on another node, resuming in
+// place. Restore gates the move on schema compatibility: it diffs the source and
+// target machine definitions with state/evolution and refuses a breaking target with
+// ErrIncompatibleMigration, so an instance never resumes against a definition that
+// would misread its persisted configuration.
 package cluster
93 changes: 93 additions & 0 deletions cluster/integration_test.go
@@ -0,0 +1,93 @@
package cluster_test

import (
	"context"
	"errors"
	"testing"

	"github.com/stablekernel/crucible/cluster"
	"github.com/stablekernel/crucible/state"
)

// TestIntegration_RemoteSpawnDeliverSupervise exercises the distribution runtime
// end to end across two nodes: node-a spawns a supervised worker on node-b, drives
// it remotely, the worker fails, node-b's supervisor restarts it within budget, and
// node-a keeps driving the same ref — all without node-a knowing where the worker
// runs.
func TestIntegration_RemoteSpawnDeliverSupervise(t *testing.T) {
	ctx := context.Background()
	tr := cluster.NewInMemoryTransport()

	// node-b hosts workers and supervises them with a restart budget.
	parentB := parentMachine().Cast(&parentEnt{}, state.WithInitialState("idle"))
	actorSysB := state.NewActorSystem(parentB).Register("worker", pingBehavior())
	nodeB := cluster.NewSystem("node-b", actorSysB, cluster.WithTransport(tr))
	sup := cluster.NewSupervisor(cluster.WithRestart("worker", 2))
	sup.SetRespawner(nodeB)
	actorSysB.WithEscalationHandler(sup.Handle)

	// node-a only routes.
	parentA := parentMachine().Cast(&parentEnt{}, state.WithInitialState("idle"))
	nodeA := cluster.NewSystem("node-a", state.NewActorSystem(parentA), cluster.WithTransport(tr))

	tr.Register("node-a", nodeA)
	tr.Register("node-b", nodeB)

	// node-a spawns a worker on node-b and drives it through the returned ref.
	ref, err := nodeA.Spawn(ctx, "node-b", "worker", "w-1", nil)
	if err != nil {
		t.Fatalf("remote spawn: %v", err)
	}
	if ref.Node != "node-b" {
		t.Fatalf("ref Node = %q, want node-b", ref.Node)
	}
	if delivered, err := nodeA.Deliver(ctx, ref, "ping"); err != nil || !delivered {
		t.Fatalf("remote deliver = (%v, %v)", delivered, err)
	}

	// The worker fails on node-b; the supervisor restarts it within budget.
	actorSysB.SettleError(ctx, "w-1", errors.New("boom"))
	if nodeB.Running() != 1 {
		t.Fatalf("node-b Running() after supervised restart = %d, want 1", nodeB.Running())
	}
	if got := sup.Handled(); len(got) != 1 || got[0].Decision != cluster.Restart {
		t.Fatalf("supervisor Handled = %+v, want one Restart", got)
	}

	// node-a drives the restarted worker through the same opaque ref.
	if delivered, err := nodeA.Deliver(ctx, ref, "ping"); err != nil || !delivered {
		t.Fatalf("deliver after restart = (%v, %v)", delivered, err)
	}
}

// TestIntegration_MigrateAcrossNodes captures an instance running on one node and
// restores it onto an additively-evolved machine on another, preserving its
// configuration and actors.
func TestIntegration_MigrateAcrossNodes(t *testing.T) {
	ctx := context.Background()

	// Source: an instance in state b with a running actor.
	inst := migSource().Cast(&migEnt{Step: 7}, state.WithInitialState("a"))
	inst.Fire(ctx, "go")
	srcSys := state.NewActorSystem(inst).Register("child", childBehavior())
	srcSys.Absorb(ctx, []state.Effect{state.SpawnActor{ID: "w", Src: state.Ref{Name: "child"}}})

	cp, err := cluster.Capture(inst, srcSys, migSource())
	if err != nil {
		t.Fatalf("capture: %v", err)
	}

	// Target node runs an additively-evolved machine; the migration is allowed and
	// the instance resumes in state b with its actor.
	got, sys, err := cluster.Restore(ctx, cp, migAdditive(),
		cluster.WithActorBehaviors(map[string]state.ActorBehavior{"child": childBehavior()}))
	if err != nil {
		t.Fatalf("restore: %v", err)
	}
	if got.Current() != "b" || got.Entity().Step != 7 {
		t.Fatalf("migrated instance = (%q, step %d), want (b, 7)", got.Current(), got.Entity().Step)
	}
	if _, ok := sys.Ref("w"); !ok {
		t.Fatal("migrated actor w not present on target")
	}
}