diff --git a/cluster/restart_test.go b/cluster/restart_test.go new file mode 100644 index 0000000..84a72f8 --- /dev/null +++ b/cluster/restart_test.go @@ -0,0 +1,139 @@ +package cluster_test + +import ( + "context" + "errors" + "testing" + + "github.com/stablekernel/crucible/cluster" + "github.com/stablekernel/crucible/state" +) + +// superviseSystem builds a node-scoped System whose local ActorSystem routes +// failures to sup, with one worker spawned and running. It returns the System so +// a test can fail and observe restarts. +func superviseSystem(t *testing.T, sup *cluster.Supervisor) *cluster.System[string, string, *parentEnt] { + t.Helper() + parent := parentMachine().Cast(&parentEnt{}, state.WithInitialState("idle")) + actorSys := state.NewActorSystem(parent). + Register("child", childBehavior()). + WithEscalationHandler(sup.Handle) + ctx := context.Background() + res := parent.Fire(ctx, "go") + actorSys.Absorb(ctx, res.Effects) + sys := cluster.NewSystem("node-a", actorSys) + if sys.Running() != 1 { + t.Fatalf("setup Running() = %d, want 1", sys.Running()) + } + return sys +} + +// TestSupervisor_RestartReinstatesActor confirms a Restart decision re-spawns the +// failed actor, so the actor is running again after the failure. +func TestSupervisor_RestartReinstatesActor(t *testing.T) { + ctx := context.Background() + sup := cluster.NewSupervisor(cluster.WithRestart("child", 3)) + // Wire the respawner after the system exists. + sys := superviseSystemWithRespawner(t, sup) + + if _, routed := sys.Local().SettleError(ctx, "worker-1", errors.New("boom")); routed { + t.Fatal("SettleError routed an onError unexpectedly") + } + if sys.Running() != 1 { + t.Fatalf("Running() after restart = %d, want 1 (actor reinstated)", sys.Running()) + } + handled := sup.Handled() + if len(handled) != 1 || handled[0].Decision != cluster.Restart { + t.Fatalf("Handled() = %+v, want one Restart", handled) + } +} + +// superviseSystemWithRespawner is like superviseSystem but binds the System as the +// supervisor's respawner via WithRespawner, set before failures occur. +func superviseSystemWithRespawner(t *testing.T, sup *cluster.Supervisor) *cluster.System[string, string, *parentEnt] { + t.Helper() + parent := parentMachine().Cast(&parentEnt{}, state.WithInitialState("idle")) + actorSys := state.NewActorSystem(parent). + Register("child", childBehavior()). + WithEscalationHandler(sup.Handle) + ctx := context.Background() + res := parent.Fire(ctx, "go") + actorSys.Absorb(ctx, res.Effects) + sys := cluster.NewSystem("node-a", actorSys) + sup.SetRespawner(sys) + if sys.Running() != 1 { + t.Fatalf("setup Running() = %d, want 1", sys.Running()) + } + return sys +} + +// TestSupervisor_RestartBudgetExhausts confirms restarts stop after the per-src +// budget is spent and the failure then escalates to the sink. +func TestSupervisor_RestartBudgetExhausts(t *testing.T) { + ctx := context.Background() + var escalated int + sup := cluster.NewSupervisor( + cluster.WithRestart("child", 2), + cluster.WithEscalationSink(func(context.Context, *state.ActorEscalation) { escalated++ }), + ) + sys := superviseSystemWithRespawner(t, sup) + + // Two failures are absorbed by restarts; the actor is running each time. + for i := range 2 { + sys.Local().SettleError(ctx, "worker-1", errors.New("boom")) + if sys.Running() != 1 { + t.Fatalf("after failure %d Running() = %d, want 1 (restarted)", i+1, sys.Running()) + } + } + if escalated != 0 { + t.Fatalf("escalated %d times before budget exhausted, want 0", escalated) + } + + // The third failure exhausts the budget: no restart, escalates instead. + sys.Local().SettleError(ctx, "worker-1", errors.New("boom")) + if sys.Running() != 0 { + t.Fatalf("after budget exhausted Running() = %d, want 0 (not restarted)", sys.Running()) + } + if escalated != 1 { + t.Fatalf("escalations after exhaustion = %d, want 1", escalated) + } +} + +// TestSupervisor_WithRespawnerOption wires the respawner at construction (the +// build-system-first ordering) instead of via SetRespawner, and confirms restart +// still reinstates the actor. +func TestSupervisor_WithRespawnerOption(t *testing.T) { + ctx := context.Background() + parent := parentMachine().Cast(&parentEnt{}, state.WithInitialState("idle")) + actorSys := state.NewActorSystem(parent).Register("child", childBehavior()) + sys := cluster.NewSystem("node-a", actorSys) + sup := cluster.NewSupervisor(cluster.WithRestart("child", 1), cluster.WithRespawner(sys)) + actorSys.WithEscalationHandler(sup.Handle) + + res := parent.Fire(ctx, "go") + actorSys.Absorb(ctx, res.Effects) + sys.Local().SettleError(ctx, "worker-1", errors.New("boom")) + if sys.Running() != 1 { + t.Fatalf("Running() after restart = %d, want 1", sys.Running()) + } +} + +// TestSupervisor_RestartNoRespawner falls through to escalation when a Restart +// decision is configured but no respawner is wired. +func TestSupervisor_RestartNoRespawner(t *testing.T) { + ctx := context.Background() + var escalated int + sup := cluster.NewSupervisor( + cluster.WithRestart("child", 3), + cluster.WithEscalationSink(func(context.Context, *state.ActorEscalation) { escalated++ }), + ) + sys := superviseSystem(t, sup) // no respawner bound + + sys.Local().SettleError(ctx, "worker-1", errors.New("boom")) + if escalated != 1 { + t.Fatalf("escalations without respawner = %d, want 1 (fell through)", escalated) + } + if sys.Running() != 0 { + t.Fatalf("Running() = %d, want 0 (no restart possible)", sys.Running()) + } +} diff --git a/cluster/supervisor.go b/cluster/supervisor.go index 3bbecc7..9db0b8d 100644 --- a/cluster/supervisor.go +++ b/cluster/supervisor.go @@ -18,6 +18,11 @@ const ( // Stop contains the failure at this level: the failed actor stays down and the // failure is not forwarded. Stop + // Restart re-spawns the failed actor through the configured Respawner, bounded + // by the per-src restart budget set with WithRestart. When the budget is spent + // (or no Respawner is wired) the failure escalates instead. Configure it with + // WithRestart, not WithDecision. + Restart ) // String renders a Decision for diagnostics. @@ -27,11 +32,20 @@ func (d Decision) String() string { return "stop" case Escalate: return "escalate" + case Restart: + return "restart" default: return "unknown" } } +// Respawner re-creates a failed actor in its local system, replacing the dead +// instance registered under id. *System satisfies it (via Respawn), so wiring +// restart is just handing the supervisor the System. +type Respawner interface { + Respawn(ctx context.Context, src, id string, input map[string]any) (state.ActorRef, error) +} + // HandledEscalation records one failure a Supervisor processed, for observability. type HandledEscalation struct { // ActorID is the registry id of the actor that failed. @@ -53,10 +67,13 @@ type HandledEscalation struct { type Supervisor struct { def Decision perSrc map[string]Decision + limits map[string]int // per-src restart budget for the Restart decision sink state.EscalationHandler - mu sync.Mutex - handled []HandledEscalation + mu sync.Mutex + respawner Respawner + restarts map[string]int // per-actor-id restarts already spent + handled []HandledEscalation } // SupervisorOption configures a Supervisor. New strategies arrive as additional @@ -81,16 +98,47 @@ func WithEscalationSink(h state.EscalationHandler) SupervisorOption { return func(s *Supervisor) { s.sink = h } } +// WithRestart sets the Restart decision for failures of actors spawned from src, +// re-spawning the actor up to maxRestarts times (counted per actor id). When the +// budget is spent the failure escalates instead, so a crash-looping actor cannot +// restart-storm. Restart needs a Respawner wired with WithRespawner or SetRespawner. +func WithRestart(src string, maxRestarts int) SupervisorOption { + return func(s *Supervisor) { + s.perSrc[src] = Restart + s.limits[src] = maxRestarts + } +} + +// WithRespawner wires the Respawner a Restart decision re-spawns through. The +// node's *System is the usual respawner. SetRespawner does the same after +// construction, for the common case where the System is built after the Supervisor. +func WithRespawner(r Respawner) SupervisorOption { + return func(s *Supervisor) { s.respawner = r } +} + // NewSupervisor builds a Supervisor. Wire it into a system with // ActorSystem.WithEscalationHandler(sup.Handle). func NewSupervisor(opts ...SupervisorOption) *Supervisor { - s := &Supervisor{perSrc: make(map[string]Decision)} + s := &Supervisor{ + perSrc: make(map[string]Decision), + limits: make(map[string]int), + restarts: make(map[string]int), + } for _, opt := range opts { opt(s) } return s } +// SetRespawner binds the Respawner a Restart decision re-spawns through, after +// construction. It is the ergonomic path when the System (the respawner) is built +// after the Supervisor, since the System's ActorSystem is wired with sup.Handle. +func (s *Supervisor) SetRespawner(r Respawner) { + s.mu.Lock() + defer s.mu.Unlock() + s.respawner = r +} + // DecisionFor returns the decision configured for src, or the default. func (s *Supervisor) DecisionFor(src string) Decision { if d, ok := s.perSrc[src]; ok { @@ -99,24 +147,48 @@ func (s *Supervisor) DecisionFor(src string) Decision { return s.def } -// Handle is the state.EscalationHandler the Supervisor exposes: it records the -// failure, then applies the src's decision — forwarding to the escalation sink for -// Escalate, or containing it for Stop. Wire it with +// Handle is the state.EscalationHandler the Supervisor exposes: it applies the +// src's decision — re-spawning for Restart (within budget), forwarding to the sink +// for Escalate, or containing it for Stop — and records the decision it actually +// applied. A Restart whose budget is spent or whose Respawner is missing escalates +// instead, and that fallthrough is what gets recorded. Wire it with // ActorSystem.WithEscalationHandler(sup.Handle). func (s *Supervisor) Handle(ctx context.Context, esc *state.ActorEscalation) { - d := s.DecisionFor(esc.Src) + applied := s.DecisionFor(esc.Src) + if applied == Restart && !s.tryRestart(ctx, esc) { + applied = Escalate // budget spent or no respawner: give up and propagate + } + if applied == Escalate && s.sink != nil { + s.sink(ctx, esc) + } + s.mu.Lock() s.handled = append(s.handled, HandledEscalation{ ActorID: esc.ActorID, Src: esc.Src, - Decision: d, + Decision: applied, Err: esc.Err, }) s.mu.Unlock() +} - if d == Escalate && s.sink != nil { - s.sink(ctx, esc) +// tryRestart re-spawns the failed actor if its src's restart budget is not yet +// spent and a Respawner is wired, reporting whether a restart was performed. The +// respawn runs outside the supervisor mutex so it may re-enter the system safely. +func (s *Supervisor) tryRestart(ctx context.Context, esc *state.ActorEscalation) bool { + s.mu.Lock() + if s.respawner == nil || s.restarts[esc.ActorID] >= s.limits[esc.Src] { + s.mu.Unlock() + return false } + s.restarts[esc.ActorID]++ + respawner := s.respawner + s.mu.Unlock() + + // A respawn that fails to start still counts against the budget; the next + // failure (or its absence) is what the caller observes. + _, _ = respawner.Respawn(ctx, esc.Src, esc.ActorID, nil) + return true } // Handled returns a snapshot of the failures this supervisor has processed, in diff --git a/cluster/system.go b/cluster/system.go index 8d23843..8b6a651 100644 --- a/cluster/system.go +++ b/cluster/system.go @@ -137,6 +137,16 @@ func (s *System[S, E, C]) SpawnLocal(ctx context.Context, src, id string, input return ref, nil } +// Respawn replaces the actor registered under id with a fresh instance from src: +// it first tears down any existing actor with that id (a failed actor stays +// registered as done until removed), then spawns anew. It is the primitive a +// supervisor's Restart decision drives, satisfying Respawner. Stopping a missing +// id is a no-op, so Respawn also works as a plain spawn. +func (s *System[S, E, C]) Respawn(ctx context.Context, src, id string, input map[string]any) (state.ActorRef, error) { + s.local.Stop(state.ActorRef{ID: id}) + return s.SpawnLocal(ctx, src, id, input) +} + // Ref resolves a local actor id to its ref, reporting whether this node runs it. func (s *System[S, E, C]) Ref(id string) (state.ActorRef, bool) { return s.local.Ref(id)