Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions cluster/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package cluster

import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/stablekernel/crucible/state"
"github.com/stablekernel/crucible/state/evolution"
)

// ErrIncompatibleMigration is the sentinel Restore returns when the target node's
// machine definition differs from the source's in a backward-incompatible
// (breaking) way, so resuming the instance there would misinterpret its persisted
// configuration. Restore wraps it with %w alongside the diff report, so callers
// should match it with errors.Is rather than by direct comparison.
var ErrIncompatibleMigration = errors.New("cluster: target machine is a breaking change from the source")

// Checkpoint is a migratable capture of a running instance: its kernel snapshot,
// its actor tree, and the source machine's IR so the target can gate the move on
// schema compatibility. Capture produces one and Restore consumes it. Every field
// is JSON, so a Checkpoint ships over any transport as bytes.
type Checkpoint struct {
// Snapshot is the marshaled kernel Snapshot of the migrating instance.
Snapshot json.RawMessage `json:"snapshot"`
// Actors is the instance's actor tree as SnapshotActors produced it, keyed by
// actor id; empty when the instance runs no actors, in which case the field is
// omitted from the JSON encoding.
Actors map[string]json.RawMessage `json:"actors,omitempty"`
// MachineIR is the source machine's serialized definition, diffed against the
// target machine to gate the migration on backward compatibility.
MachineIR json.RawMessage `json:"machineIR"`
}

// Capture packages a running instance for shipment to another node. The returned
// Checkpoint holds the instance's marshaled snapshot, the serialized definition of
// machine, and — when sys is non-nil — the actor tree reported by
// sys.SnapshotActors. It is a pure read: it neither fires the instance nor mutates
// any actor. Call it at a quiescent point.
//
// On any failure the zero Checkpoint is returned together with an error wrapping
// the underlying cause.
func Capture[S comparable, E comparable, C any](inst *state.Instance[S, E, C], sys *state.ActorSystem[S, E, C], machine *state.Machine[S, E, C]) (Checkpoint, error) {
	rawSnap, err := state.MarshalSnapshot(inst.Snapshot())
	if err != nil {
		return Checkpoint{}, fmt.Errorf("cluster: marshal snapshot: %w", err)
	}

	rawIR, err := machine.ToJSON()
	if err != nil {
		return Checkpoint{}, fmt.Errorf("cluster: serialize machine: %w", err)
	}

	cp := Checkpoint{Snapshot: rawSnap, MachineIR: rawIR}
	if sys == nil {
		// No actor system was supplied: ship the checkpoint without an actor tree.
		return cp, nil
	}

	var tree map[string]json.RawMessage
	if tree, err = sys.SnapshotActors(); err != nil {
		return Checkpoint{}, fmt.Errorf("cluster: snapshot actors: %w", err)
	}
	cp.Actors = tree
	return cp, nil
}

// restoreConfig collects the settings RestoreOption values apply before Restore
// does its work. Restore starts from the zero value, which registers no actor
// behaviors.
type restoreConfig struct {
// behaviors maps a src ref name to the behavior Restore registers on the
// rebuilt actor system; nil unless WithActorBehaviors was supplied.
behaviors map[string]state.ActorBehavior
}

// RestoreOption configures a Restore call by mutating its restoreConfig. New
// capabilities arrive as additional options, so Restore's signature never breaks.
type RestoreOption func(*restoreConfig)

// WithActorBehaviors supplies the child-machine behaviors the target node
// registers on its actor system before the actor tree is rebuilt. The map is keyed
// by the src ref name and should mirror the palette the source registered; an
// actor whose src is absent here is skipped.
//
// The map is stored as given, not copied, and replaces any behaviors set by an
// earlier option.
func WithActorBehaviors(behaviors map[string]state.ActorBehavior) RestoreOption {
	apply := func(cfg *restoreConfig) {
		cfg.behaviors = behaviors
	}
	return apply
}

// Restore rebuilds a captured instance on machine and reconstructs its actor tree,
// gating the move on schema compatibility: if machine is a breaking change from
// the source definition the Checkpoint carries, it refuses with
// ErrIncompatibleMigration rather than resume an instance against a definition
// that would misread its state. An additive (or identical) target is allowed.
//
// The steps, in order, are:
//  1. apply opts (nil options are ignored);
//  2. serialize machine and diff it against cp.MachineIR, refusing a breaking diff;
//  3. unmarshal cp.Snapshot and restore the instance on machine;
//  4. create an actor system for the instance, register the behaviors supplied via
//     WithActorBehaviors, and, when cp.Actors is non-empty, restore the actors.
//
// It returns the resumed instance and its actor system; the instance is resumed in
// place (no entry actions re-run), and a host re-arms timers/services by absorbing
// the instance's ResumeEffects through its drivers. On failure both results are
// nil and the error wraps the underlying cause; a refused migration can be
// detected with errors.Is(err, ErrIncompatibleMigration).
func Restore[S comparable, E comparable, C any](ctx context.Context, cp Checkpoint, machine *state.Machine[S, E, C], opts ...RestoreOption) (*state.Instance[S, E, C], *state.ActorSystem[S, E, C], error) {
	var cfg restoreConfig
	for _, opt := range opts {
		// Calling a nil function value panics; a caller that passes a
		// conditionally-unset option should get "no option", not a crash.
		if opt == nil {
			continue
		}
		opt(&cfg)
	}

	// Gate on compatibility before touching the snapshot, so an incompatible
	// target never gets as far as decoding state it might misread.
	targetIR, err := machine.ToJSON()
	if err != nil {
		return nil, nil, fmt.Errorf("cluster: serialize target machine: %w", err)
	}
	report, err := evolution.DiffJSON[S, E, C](cp.MachineIR, targetIR)
	if err != nil {
		return nil, nil, fmt.Errorf("cluster: diff machines for migration: %w", err)
	}
	if report.Breaking() {
		return nil, nil, fmt.Errorf("%w: %s", ErrIncompatibleMigration, report)
	}

	snap, err := state.UnmarshalSnapshot[S, E, C](cp.Snapshot)
	if err != nil {
		return nil, nil, fmt.Errorf("cluster: unmarshal snapshot: %w", err)
	}
	inst, err := machine.Restore(snap)
	if err != nil {
		return nil, nil, fmt.Errorf("cluster: restore instance: %w", err)
	}

	// Behaviors are registered before RestoreActors is called, so they are in
	// place when the actor tree is rebuilt.
	sys := state.NewActorSystem(inst)
	for src, behavior := range cfg.behaviors {
		sys.Register(src, behavior)
	}
	if len(cp.Actors) > 0 {
		if err := sys.RestoreActors(ctx, cp.Actors); err != nil {
			return nil, nil, fmt.Errorf("cluster: restore actors: %w", err)
		}
	}
	return inst, sys, nil
}
148 changes: 148 additions & 0 deletions cluster/migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package cluster_test

import (
"context"
"encoding/json"
"errors"
"testing"

"github.com/stablekernel/crucible/cluster"
"github.com/stablekernel/crucible/state"
)

// migEnt is the migrated instance's context. The tests set Step when casting an
// instance and read it back after Restore to check the context survived the move.
type migEnt struct {
// Step is an arbitrary marker value; it is serialized under the JSON key "step".
Step int `json:"step"`
}

// migSource builds the machine an instance runs on the source node: states a, b
// and c, initial state a, and a single transition a --go--> b.
func migSource() *state.Machine[string, string, *migEnt] {
	states := state.Forge[string, string, *migEnt]("mig").
		State("a").
		State("b").
		State("c").
		Initial("a")
	return states.Transition("a").On("go").GoTo("b").Quench()
}

// migAdditive builds the source machine plus one extra state d — a
// backward-compatible (additive) evolution of migSource.
func migAdditive() *state.Machine[string, string, *migEnt] {
	states := state.Forge[string, string, *migEnt]("mig").
		State("a").
		State("b").
		State("c").
		State("d").
		Initial("a")
	return states.Transition("a").On("go").GoTo("b").Quench()
}

// migBreaking builds the source machine without state c — a breaking evolution
// of migSource.
func migBreaking() *state.Machine[string, string, *migEnt] {
	states := state.Forge[string, string, *migEnt]("mig").
		State("a").
		State("b").
		Initial("a")
	return states.Transition("a").On("go").GoTo("b").Quench()
}

// capturedInB casts a fresh instance of the source machine in state a, fires "go"
// to move it to b, and returns the Checkpoint captured from it alongside a newly
// created actor system. The test fails immediately if the instance does not reach
// b or Capture reports an error.
func capturedInB(t *testing.T) cluster.Checkpoint {
	t.Helper()

	src := migSource().Cast(&migEnt{Step: 1}, state.WithInitialState("a"))
	src.Fire(context.Background(), "go")
	if got := src.Current(); got != "b" {
		t.Fatalf("source instance state = %q, want b", got)
	}

	checkpoint, err := cluster.Capture(src, state.NewActorSystem(src), migSource())
	if err != nil {
		t.Fatalf("Capture: %v", err)
	}
	return checkpoint
}

// TestMigration_RoundTrip captures an instance on the source machine, ships the
// checkpoint through JSON, and restores it on an identical target machine. The
// restored instance must be in state b with its context intact, without the
// transition being fired again.
func TestMigration_RoundTrip(t *testing.T) {
	// The checkpoint is wire-shippable: encode and decode it as a transport would.
	wire, err := json.Marshal(capturedInB(t))
	if err != nil {
		t.Fatalf("marshal checkpoint: %v", err)
	}
	var shipped cluster.Checkpoint
	err = json.Unmarshal(wire, &shipped)
	if err != nil {
		t.Fatalf("unmarshal checkpoint: %v", err)
	}

	restored, _, err := cluster.Restore(context.Background(), shipped, migSource())
	if err != nil {
		t.Fatalf("Restore: %v", err)
	}
	if got := restored.Current(); got != "b" {
		t.Fatalf("restored state = %q, want b", got)
	}
	if got := restored.Entity().Step; got != 1 {
		t.Fatalf("restored context Step = %d, want 1", got)
	}
}

// TestMigration_AdditiveTargetAllowed lets an instance migrate onto an additively
// evolved machine.
func TestMigration_AdditiveTargetAllowed(t *testing.T) {
ctx := context.Background()
cp := capturedInB(t)
inst, _, err := cluster.Restore(ctx, cp, migAdditive())
if err != nil {
t.Fatalf("Restore onto additive target: %v", err)
}
if inst.Current() != "b" {
t.Fatalf("restored state = %q, want b", inst.Current())
}
}

// TestMigration_BreakingTargetRefused refuses to migrate onto a machine whose
// definition changed in a backward-incompatible way.
func TestMigration_BreakingTargetRefused(t *testing.T) {
ctx := context.Background()
cp := capturedInB(t)
_, _, err := cluster.Restore(ctx, cp, migBreaking())
if !errors.Is(err, cluster.ErrIncompatibleMigration) {
t.Fatalf("Restore onto breaking target err = %v, want ErrIncompatibleMigration", err)
}
}

// TestMigration_ActorsMove checks that an actor running on the source system is
// present, under the same id, on the actor system Restore returns.
func TestMigration_ActorsMove(t *testing.T) {
	ctx := context.Background()
	src := migSource().Cast(&migEnt{Step: 2}, state.WithInitialState("a"))
	srcSys := state.NewActorSystem(src).Register("child", childBehavior())

	// Spawn an actor directly so the snapshot carries it.
	spawn := state.SpawnActor{ID: "w-mig", Src: state.Ref{Name: "child"}}
	srcSys.Absorb(ctx, []state.Effect{spawn})
	if n := srcSys.Running(); n != 1 {
		t.Fatalf("source Running() = %d, want 1", n)
	}

	cp, err := cluster.Capture(src, srcSys, migSource())
	if err != nil {
		t.Fatalf("Capture: %v", err)
	}

	target := migSource()
	palette := map[string]state.ActorBehavior{"child": childBehavior()}
	_, dstSys, err := cluster.Restore(ctx, cp, target, cluster.WithActorBehaviors(palette))
	if err != nil {
		t.Fatalf("Restore: %v", err)
	}

	_, found := dstSys.Ref("w-mig")
	if !found {
		t.Fatal("migrated actor w-mig not present on the target system")
	}
}
Loading