Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 71 additions & 12 deletions coreapi/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"runtime/debug"
"sync"
"sync/atomic"
)

Expand All @@ -26,6 +27,41 @@ func ResetPluginRecoveredPanicCountForTest() {
pluginRecoveredPanicCount.Store(0)
}

// maxPanicsBeforeUnhealthy is the per-plugin panic threshold.
// After this many panics, IsPluginHealthy returns false and a
// "plugin.<name>.unhealthy" event is published (once).
const maxPanicsBeforeUnhealthy = 3

// pluginPanicCounts maps plugin name → per-plugin panic counter.
// Used by IsPluginHealthy and RecoverPlugin to detect plugins that
// have exceeded the max-panics threshold and should be restarted
// or unloaded by the daemon supervisor.
var pluginPanicCounts sync.Map // map[string]*atomic.Uint64

// PluginPanicCount returns how many panics RecoverPlugin has caught
// for the named plugin since process start (or last reset).
func PluginPanicCount(name string) uint64 {
if v, ok := pluginPanicCounts.Load(name); ok {
return v.(*atomic.Uint64).Load()
}
return 0
}

// IsPluginHealthy returns false when a plugin has exceeded
// maxPanicsBeforeUnhealthy panics caught by RecoverPlugin.
func IsPluginHealthy(name string) bool {
return PluginPanicCount(name) < maxPanicsBeforeUnhealthy
}

// ResetPluginHealthForTest clears all per-plugin panic counters.
func ResetPluginHealthForTest() {
pluginPanicCounts.Range(func(key, _ any) bool {
pluginPanicCounts.Delete(key)
return true
})
ResetPluginRecoveredPanicCountForTest()
}

// RecoverPlugin is the L11 panic-recovery shim used at the top of
// every plugin entrypoint goroutine: Service.Start helper goroutines,
// acceptLoop, and per-connection handlers. Usage:
Expand All @@ -36,44 +72,67 @@ func ResetPluginRecoveredPanicCountForTest() {
// 1. Recovers (caller goroutine continues / loop iteration is dropped)
// 2. Logs at ERROR with structured plugin/op fields, panic value, and
// full goroutine stack trace
// 3. Increments PluginRecoveredPanicCount
// 3. Increments PluginRecoveredPanicCount and per-plugin panic count
// 4. Publishes a "plugin.<plugin>.panic" event on the bus (if
// events != nil) so observability subscribers see the recovery
// 5. Calls onPanic(r) if non-nil — typical use is per-conn close,
// 5. When the per-plugin panic count reaches maxPanicsBeforeUnhealthy
// (3), publishes one "plugin.<plugin>.unhealthy" event — the daemon
// supervisor should react by restarting or unloading the plugin
// 6. Calls onPanic(r) if non-nil — typical use is per-conn close,
// or signaling a future per-plugin supervisor for restart
//
// TODO(03-INVARIANTS.md §8): per-plugin supervisor not yet implemented.
// Today the boundary just survives + logs. A future tier will signal a
// restart of the panicked plugin via the onPanic callback.
//
// This must be the OUTERMOST defer in the goroutine: defers run LIFO,
// so other defers (conn.Close, mu.Unlock, removeSub) run first.
func RecoverPlugin(plugin, op string, events EventBus, onPanic func(any)) {
r := recover()
if r == nil {
return
}
count := pluginRecoveredPanicCount.Add(1)

// Global counter.
globalCount := pluginRecoveredPanicCount.Add(1)

// Per-plugin counter (lazily allocate).
perPluginVal, _ := pluginPanicCounts.LoadOrStore(plugin, new(atomic.Uint64))
perPlugin := perPluginVal.(*atomic.Uint64)
perPluginCount := perPlugin.Add(1)

slog.Error("plugin panic recovered",
"layer", "L11",
"plugin", plugin,
"op", op,
"panic", r,
"recovered_total", count,
"recovered_total", globalCount,
"plugin_panics", perPluginCount,
"stack", string(debug.Stack()),
)

if events != nil {
// Defensive: a publisher that itself panics must not propagate.
func() {
defer func() { _ = recover() }()
events.Publish("plugin."+plugin+".panic", map[string]any{
"plugin": plugin,
"op": op,
"panic": fmt.Sprintf("%v", r),
"recovered_total": count,
"plugin": plugin,
"op": op,
"panic": fmt.Sprintf("%v", r),
"recovered_total": globalCount,
"plugin_panics": perPluginCount,
})
}()

// One-shot unhealthy event when the threshold is crossed.
if perPluginCount == maxPanicsBeforeUnhealthy {
func() {
defer func() { _ = recover() }()
events.Publish("plugin."+plugin+".unhealthy", map[string]any{
"plugin": plugin,
"plugin_panics": perPluginCount,
"recovered_total": globalCount,
})
}()
}
}

if onPanic != nil {
func() {
defer func() { _ = recover() }()
Expand Down
61 changes: 61 additions & 0 deletions coreapi/zz_recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,64 @@ func TestL11PluginPanicCallbackPanicSwallowed(t *testing.T) {
t.Fatal("L11 boundary did not record the primary panic")
}
}

// TestL11PerPluginUnhealthy verifies per-plugin panic tracking
// and the unhealthy-signal fires once when the threshold is crossed.
func TestL11PerPluginUnhealthy(t *testing.T) {
ResetPluginHealthForTest()
bus := &fakeBus{}
const name = "unstable-plugin"

if !IsPluginHealthy(name) || PluginPanicCount(name) != 0 {
t.Fatal("fresh plugin unhealthy or non-zero count")
}

// Induce panics up to threshold-1; plugin stays healthy.
boom := func() {
wg := sync.WaitGroup{}
wg.Add(1)
go func() { defer wg.Done(); defer RecoverPlugin(name, "loop", bus, nil); panic("x") }()
wg.Wait()
}
for i := 1; i < maxPanicsBeforeUnhealthy; i++ {
boom()
if got := PluginPanicCount(name); got != uint64(i) || !IsPluginHealthy(name) {
t.Fatalf("panic %d: count=%d healthy=%v", i, got, IsPluginHealthy(name))
}
}

// Threshold-crossing panic makes plugin unhealthy + fires event.
boom()
if IsPluginHealthy(name) {
t.Fatal("plugin should be unhealthy after threshold")
}
found := false
for _, tp := range bus.topics {
if tp == "plugin."+name+".unhealthy" {
found = true
break
}
}
if !found {
t.Fatalf("unhealthy event missing, topics: %v", bus.topics)
}

// One-shot: extra panic does not re-fire unhealthy.
before := 0
for _, tp := range bus.topics {
if tp == "plugin."+name+".unhealthy" {
before++
}
}
boom()
after := 0
for _, tp := range bus.topics {
if tp == "plugin."+name+".unhealthy" {
after++
}
}
if after != before {
t.Fatalf("unhealthy event re-fired (%d→%d)", before, after)
}
ResetPluginHealthForTest()
}
Loading