Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions cluster/restart_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package cluster_test

import (
"context"
"errors"
"testing"

"github.com/stablekernel/crucible/cluster"
"github.com/stablekernel/crucible/state"
)

// superviseSystem assembles the fixture shared by the restart tests: a parent
// machine cast in state "idle", a local ActorSystem with "child" registered and
// sup.Handle installed as its escalation handler, and a System for node "node-a"
// wrapped around it. It fires "go" on the parent, absorbs the resulting effects,
// and fails the test unless exactly one actor is running afterwards. No respawner
// is bound to sup here.
func superviseSystem(t *testing.T, sup *cluster.Supervisor) *cluster.System[string, string, *parentEnt] {
	t.Helper()

	ctx := context.Background()
	machine := parentMachine().Cast(&parentEnt{}, state.WithInitialState("idle"))
	local := state.NewActorSystem(machine).
		Register("child", childBehavior()).
		WithEscalationHandler(sup.Handle)

	// Fire "go" and feed its effects to the actor system; the check below expects
	// this to leave exactly one actor running.
	fired := machine.Fire(ctx, "go")
	local.Absorb(ctx, fired.Effects)

	node := cluster.NewSystem("node-a", local)
	if running := node.Running(); running != 1 {
		t.Fatalf("setup Running() = %d, want 1", running)
	}
	return node
}

// TestSupervisor_RestartReinstatesActor fails the single worker once under a
// Restart policy with budget 3 and checks three things: the error was not routed
// to an onError, one actor is running again afterwards, and the supervisor logged
// exactly one handled failure whose decision is Restart.
func TestSupervisor_RestartReinstatesActor(t *testing.T) {
	sup := cluster.NewSupervisor(cluster.WithRestart("child", 3))
	// The helper binds the System as respawner once the System has been built.
	sys := superviseSystemWithRespawner(t, sup)

	ctx := context.Background()
	_, routed := sys.Local().SettleError(ctx, "worker-1", errors.New("boom"))
	if routed {
		t.Fatal("SettleError routed an onError unexpectedly")
	}

	if running := sys.Running(); running != 1 {
		t.Fatalf("Running() after restart = %d, want 1 (actor reinstated)", running)
	}

	records := sup.Handled()
	if !(len(records) == 1 && records[0].Decision == cluster.Restart) {
		t.Fatalf("Handled() = %+v, want one Restart", records)
	}
}

// superviseSystemWithRespawner is superviseSystem plus respawner wiring: it builds
// the same single-worker System and then binds that System to sup with
// SetRespawner, so a Restart decision has something to re-spawn through. The
// binding is in place before the helper returns, i.e. before the calling test
// injects any failure.
func superviseSystemWithRespawner(t *testing.T, sup *cluster.Supervisor) *cluster.System[string, string, *parentEnt] {
	t.Helper()
	// Reuse the shared fixture (including its Running() == 1 precondition) rather
	// than repeating its setup line for line.
	sys := superviseSystem(t, sup)
	sup.SetRespawner(sys)
	return sys
}

// TestSupervisor_RestartBudgetExhausts confirms restarts stop after the per-src
// budget is spent and the failure then escalates to the sink.
func TestSupervisor_RestartBudgetExhausts(t *testing.T) {
ctx := context.Background()
var escalated int
sup := cluster.NewSupervisor(
cluster.WithRestart("child", 2),
cluster.WithEscalationSink(func(context.Context, *state.ActorEscalation) { escalated++ }),
)
sys := superviseSystemWithRespawner(t, sup)

// Two failures are absorbed by restarts; the actor is running each time.
for i := range 2 {
sys.Local().SettleError(ctx, "worker-1", errors.New("boom"))
if sys.Running() != 1 {
t.Fatalf("after failure %d Running() = %d, want 1 (restarted)", i+1, sys.Running())
}
}
if escalated != 0 {
t.Fatalf("escalated %d times before budget exhausted, want 0", escalated)
}

// The third failure exhausts the budget: no restart, escalates instead.
sys.Local().SettleError(ctx, "worker-1", errors.New("boom"))
if sys.Running() != 0 {
t.Fatalf("after budget exhausted Running() = %d, want 0 (not restarted)", sys.Running())
}
if escalated != 1 {
t.Fatalf("escalations after exhaustion = %d, want 1", escalated)
}
}

// TestSupervisor_WithRespawnerOption wires the respawner at construction (the
// build-system-first ordering) instead of via SetRespawner, and confirms restart
// still reinstates the actor. It asserts the setup precondition (one actor
// running before the failure) and that the supervisor recorded exactly one
// Restart, so a pass cannot be explained by the actor never going down or by the
// supervisor never being invoked.
func TestSupervisor_WithRespawnerOption(t *testing.T) {
	ctx := context.Background()
	parent := parentMachine().Cast(&parentEnt{}, state.WithInitialState("idle"))
	actorSys := state.NewActorSystem(parent).Register("child", childBehavior())
	sys := cluster.NewSystem("node-a", actorSys)
	sup := cluster.NewSupervisor(cluster.WithRestart("child", 1), cluster.WithRespawner(sys))
	// The handler is installed last because the supervisor needed the System first.
	actorSys.WithEscalationHandler(sup.Handle)

	res := parent.Fire(ctx, "go")
	actorSys.Absorb(ctx, res.Effects)
	if sys.Running() != 1 {
		t.Fatalf("setup Running() = %d, want 1", sys.Running())
	}

	sys.Local().SettleError(ctx, "worker-1", errors.New("boom"))
	if sys.Running() != 1 {
		t.Fatalf("Running() after restart = %d, want 1", sys.Running())
	}
	handled := sup.Handled()
	if len(handled) != 1 || handled[0].Decision != cluster.Restart {
		t.Fatalf("Handled() = %+v, want one Restart", handled)
	}
}

// TestSupervisor_RestartNoRespawner covers a Restart policy with nothing to
// restart through: with no respawner bound, a single failure must reach the
// escalation sink once and leave no actor running.
func TestSupervisor_RestartNoRespawner(t *testing.T) {
	escalations := 0
	sup := cluster.NewSupervisor(
		cluster.WithRestart("child", 3),
		cluster.WithEscalationSink(func(context.Context, *state.ActorEscalation) {
			escalations++
		}),
	)
	// superviseSystem never calls SetRespawner, so sup has no respawner.
	sys := superviseSystem(t, sup)

	sys.Local().SettleError(context.Background(), "worker-1", errors.New("boom"))

	if escalations != 1 {
		t.Fatalf("escalations without respawner = %d, want 1 (fell through)", escalations)
	}
	if running := sys.Running(); running != 0 {
		t.Fatalf("Running() = %d, want 0 (no restart possible)", running)
	}
}
92 changes: 82 additions & 10 deletions cluster/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ const (
// Stop contains the failure at this level: the failed actor stays down and the
// failure is not forwarded.
Stop
// Restart re-spawns the failed actor through the configured Respawner, bounded
// by the per-src restart budget set with WithRestart. When the budget is spent
// (or no Respawner is wired) the failure escalates instead. Configure it with
// WithRestart, not WithDecision.
Restart
)

// String renders a Decision for diagnostics.
Expand All @@ -27,11 +32,20 @@ func (d Decision) String() string {
return "stop"
case Escalate:
return "escalate"
case Restart:
return "restart"
default:
return "unknown"
}
}

// Respawner re-creates a failed actor in its local system, replacing the dead
// instance registered under id. *System satisfies it (via Respawn), so wiring
// restart is just handing the supervisor the System.
//
// The Supervisor calls Respawn with the failed actor's Src and ActorID and a nil
// input, and does so without holding its own mutex.
type Respawner interface {
// Respawn replaces the actor registered under id with a fresh one spawned from
// src, returning the new actor's ref or the error that prevented the spawn.
Respawn(ctx context.Context, src, id string, input map[string]any) (state.ActorRef, error)
}

// HandledEscalation records one failure a Supervisor processed, for observability.
type HandledEscalation struct {
// ActorID is the registry id of the actor that failed.
Expand All @@ -53,10 +67,13 @@ type HandledEscalation struct {
type Supervisor struct {
// def is the decision DecisionFor falls back to when src has no perSrc entry.
def Decision
// perSrc maps an actor src to the decision configured for it.
perSrc map[string]Decision
limits map[string]int // per-src restart budget for the Restart decision
// sink receives failures whose applied decision is Escalate; Handle skips the
// call when it is nil.
sink state.EscalationHandler

// NOTE(review): mu and handled are declared twice in this struct. The two lines
// directly below look like the pre-change declarations kept by the diff
// rendering — confirm and keep only the group that follows them.
mu sync.Mutex
handled []HandledEscalation
// mu is held by SetRespawner, tryRestart and Handle while they touch respawner,
// restarts and handled.
mu sync.Mutex
// respawner is what a Restart decision re-spawns through; while it is nil a
// Restart falls through to escalation.
respawner Respawner
restarts map[string]int // per-actor-id restarts already spent
// handled is the log of processed failures that Handle appends to.
handled []HandledEscalation
}

// SupervisorOption configures a Supervisor. New strategies arrive as additional
Expand All @@ -81,16 +98,47 @@ func WithEscalationSink(h state.EscalationHandler) SupervisorOption {
return func(s *Supervisor) { s.sink = h }
}

// WithRestart makes Restart the decision for failures of actors spawned from src
// and caps the number of re-spawns at maxRestarts. The cap is stored per src but
// spent per actor id; once an actor has used it up, its failures escalate
// instead, which keeps a crash-looping actor from restart-storming. A Restart
// only takes effect when a Respawner has been wired with WithRespawner or
// SetRespawner.
func WithRestart(src string, maxRestarts int) SupervisorOption {
	configure := func(sup *Supervisor) {
		sup.limits[src] = maxRestarts
		sup.perSrc[src] = Restart
	}
	return configure
}

// WithRespawner is the construction-time way to give the Supervisor the Respawner
// its Restart decisions re-spawn through; the node's *System is the usual choice.
// Use SetRespawner instead when the System is only built after the Supervisor.
func WithRespawner(r Respawner) SupervisorOption {
	return func(sup *Supervisor) {
		sup.respawner = r
	}
}

// NewSupervisor builds a Supervisor with its perSrc, limits and restarts maps
// allocated, then applies opts in the order given. Wire it into a system with
// ActorSystem.WithEscalationHandler(sup.Handle).
func NewSupervisor(opts ...SupervisorOption) *Supervisor {
// NOTE(review): s is declared twice below. The one-line initializer looks like
// the pre-change line kept by the diff rendering — confirm and drop it. Only the
// multi-line literal allocates the limits and restarts maps that WithRestart and
// tryRestart write to.
s := &Supervisor{perSrc: make(map[string]Decision)}
s := &Supervisor{
perSrc: make(map[string]Decision),
limits: make(map[string]int),
restarts: make(map[string]int),
}
// Options run after the maps exist, so an option may write into them directly.
for _, opt := range opts {
opt(s)
}
return s
}

// SetRespawner installs, after construction, the Respawner that Restart decisions
// re-spawn through. It exists for the ordering where the System (the usual
// respawner) can only be built once its ActorSystem has been wired with
// sup.Handle, i.e. after the Supervisor already exists. The write happens under
// the supervisor mutex.
func (s *Supervisor) SetRespawner(r Respawner) {
	s.mu.Lock()
	s.respawner = r
	s.mu.Unlock()
}

// DecisionFor returns the decision configured for src, or the default.
func (s *Supervisor) DecisionFor(src string) Decision {
if d, ok := s.perSrc[src]; ok {
Expand All @@ -99,24 +147,48 @@ func (s *Supervisor) DecisionFor(src string) Decision {
return s.def
}

// Handle is the state.EscalationHandler the Supervisor exposes: it applies the
// src's decision — re-spawning for Restart (within budget), forwarding to the sink
// for Escalate, or containing it for Stop — and records the decision it actually
// applied. A Restart whose budget is spent or whose Respawner is missing escalates
// instead, and that fallthrough is what gets recorded. Wire it with
// ActorSystem.WithEscalationHandler(sup.Handle).
func (s *Supervisor) Handle(ctx context.Context, esc *state.ActorEscalation) {
// NOTE(review): the "d := ..." line below and the "Decision: d," line further
// down look like pre-change lines kept by the diff rendering — confirm; the
// post-change code works with applied only.
d := s.DecisionFor(esc.Src)
applied := s.DecisionFor(esc.Src)
if applied == Restart && !s.tryRestart(ctx, esc) {
applied = Escalate // budget spent or no respawner: give up and propagate
}
// The sink is invoked before mu is taken, so it does not run under the
// supervisor mutex.
if applied == Escalate && s.sink != nil {
s.sink(ctx, esc)
}

// Record what was applied (after any Restart-to-Escalate fallthrough), not what
// was configured.
s.mu.Lock()
s.handled = append(s.handled, HandledEscalation{
ActorID: esc.ActorID,
Src: esc.Src,
Decision: d,
Decision: applied,
Err: esc.Err,
})
s.mu.Unlock()
}

if d == Escalate && s.sink != nil {
s.sink(ctx, esc)
// tryRestart re-spawns the failed actor when a Respawner is wired and the actor's
// restart count is still below its src's budget, and reports whether the actor
// was actually restarted. It returns false — so Handle escalates — when no
// Respawner is set, when the budget is spent, or when the respawn itself fails.
//
// The budget is charged before the respawn is attempted, so a respawn that fails
// still counts against it. The respawn runs outside the supervisor mutex so it
// may re-enter the system safely.
func (s *Supervisor) tryRestart(ctx context.Context, esc *state.ActorEscalation) bool {
	s.mu.Lock()
	if s.respawner == nil || s.restarts[esc.ActorID] >= s.limits[esc.Src] {
		s.mu.Unlock()
		return false
	}
	s.restarts[esc.ActorID]++
	// Copy the respawner while locked; SetRespawner may replace the field later.
	respawner := s.respawner
	s.mu.Unlock()

	// A failed respawn means no restart was performed. Report it as such so the
	// caller escalates instead of recording a Restart that never happened and
	// leaving the failure unreported.
	if _, err := respawner.Respawn(ctx, esc.Src, esc.ActorID, nil); err != nil {
		return false
	}
	return true
}

// Handled returns a snapshot of the failures this supervisor has processed, in
Expand Down
10 changes: 10 additions & 0 deletions cluster/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,16 @@ func (s *System[S, E, C]) SpawnLocal(ctx context.Context, src, id string, input
return ref, nil
}

// Respawn swaps the actor registered under id for a fresh instance spawned from
// src. Because a failed actor stays registered as done until it is removed, the
// existing entry is stopped first and only then is the new actor spawned through
// SpawnLocal. Stopping an id that is not registered is a no-op, so Respawn also
// serves as a plain spawn. This method is what makes *System a Respawner, and it
// is the primitive a supervisor's Restart decision drives.
func (s *System[S, E, C]) Respawn(ctx context.Context, src, id string, input map[string]any) (state.ActorRef, error) {
	stale := state.ActorRef{ID: id}
	s.local.Stop(stale)
	return s.SpawnLocal(ctx, src, id, input)
}

// Ref resolves a local actor id to its ref, reporting whether this node runs it.
func (s *System[S, E, C]) Ref(id string) (state.ActorRef, bool) {
return s.local.Ref(id)
Expand Down
Loading