diff --git a/coreapi/recover.go b/coreapi/recover.go
index 7c771d4..1132b0f 100644
--- a/coreapi/recover.go
+++ b/coreapi/recover.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"log/slog"
 	"runtime/debug"
+	"sync"
 	"sync/atomic"
 )
 
@@ -26,6 +27,48 @@ func ResetPluginRecoveredPanicCountForTest() {
 	pluginRecoveredPanicCount.Store(0)
 }
 
+// maxPanicsBeforeUnhealthy is the per-plugin panic threshold.
+// Once a plugin has panicked this many times, IsPluginHealthy returns
+// false and a "plugin.<name>.unhealthy" event is published (once).
+const maxPanicsBeforeUnhealthy = 3
+
+// pluginHealth is the per-plugin record stored in pluginHealthStates.
+type pluginHealth struct {
+	panics            atomic.Uint64 // panics RecoverPlugin has caught for the plugin
+	unhealthyNotified atomic.Bool   // true once the one-shot unhealthy event is claimed
+}
+
+// pluginHealthStates maps plugin name → per-plugin health record.
+// Used by PluginPanicCount, IsPluginHealthy and RecoverPlugin to detect
+// plugins that have reached the max-panics threshold and should be
+// restarted or unloaded by the daemon supervisor.
+var pluginHealthStates sync.Map // map[string]*pluginHealth
+
+// PluginPanicCount returns how many panics RecoverPlugin has caught
+// for the named plugin since process start (or last reset).
+func PluginPanicCount(name string) uint64 {
+	if v, ok := pluginHealthStates.Load(name); ok {
+		return v.(*pluginHealth).panics.Load()
+	}
+	return 0
+}
+
+// IsPluginHealthy returns false once RecoverPlugin has caught at least
+// maxPanicsBeforeUnhealthy panics for the named plugin.
+func IsPluginHealthy(name string) bool {
+	return PluginPanicCount(name) < maxPanicsBeforeUnhealthy
+}
+
+// ResetPluginHealthForTest clears all per-plugin health records and
+// the global recovered-panic counter.
+func ResetPluginHealthForTest() {
+	pluginHealthStates.Range(func(key, _ any) bool {
+		pluginHealthStates.Delete(key)
+		return true
+	})
+	ResetPluginRecoveredPanicCountForTest()
+}
+
 // RecoverPlugin is the L11 panic-recovery shim used at the top of
 // every plugin entrypoint goroutine: Service.Start helper goroutines,
 // acceptLoop, and per-connection handlers. Usage:
@@ -36,16 +79,16 @@ func ResetPluginRecoveredPanicCountForTest() {
 // 1. Recovers (caller goroutine continues / loop iteration is dropped)
 // 2. Logs at ERROR with structured plugin/op fields, panic value, and
 // full goroutine stack trace
-// 3. Increments PluginRecoveredPanicCount
+// 3. Increments PluginRecoveredPanicCount and per-plugin panic count
 // 4. Publishes a "plugin..panic" event on the bus (if
 // events != nil) so observability subscribers see the recovery
-// 5. Calls onPanic(r) if non-nil — typical use is per-conn close,
+// 5. Once the per-plugin panic count has reached
+// maxPanicsBeforeUnhealthy, publishes one "plugin.<name>.unhealthy"
+// event on the first recovery that has a non-nil bus — the daemon
+// supervisor should react by restarting or unloading the plugin
+// 6. Calls onPanic(r) if non-nil — typical use is per-conn close,
 // or signaling a future per-plugin supervisor for restart
 //
-// TODO(03-INVARIANTS.md §8): per-plugin supervisor not yet implemented.
-// Today the boundary just survives + logs. A future tier will signal a
-// restart of the panicked plugin via the onPanic callback.
-//
 // This must be the OUTERMOST defer in the goroutine: defers run LIFO,
 // so other defers (conn.Close, mu.Unlock, removeSub) run first.
 func RecoverPlugin(plugin, op string, events EventBus, onPanic func(any)) {
@@ -53,27 +96,55 @@ func RecoverPlugin(plugin, op string, events EventBus, onPanic func(any)) {
 	if r == nil {
 		return
 	}
-	count := pluginRecoveredPanicCount.Add(1)
+
+	// Global counter.
+	globalCount := pluginRecoveredPanicCount.Add(1)
+
+	// Per-plugin record (stored on the plugin's first recovered panic).
+	healthVal, _ := pluginHealthStates.LoadOrStore(plugin, new(pluginHealth))
+	health := healthVal.(*pluginHealth)
+	perPluginCount := health.panics.Add(1)
+
 	slog.Error("plugin panic recovered",
 		"layer", "L11",
 		"plugin", plugin,
 		"op", op,
 		"panic", r,
-		"recovered_total", count,
+		"recovered_total", globalCount,
+		"plugin_panics", perPluginCount,
 		"stack", string(debug.Stack()),
 	)
+
 	if events != nil {
 		// Defensive: a publisher that itself panics must not propagate.
 		func() {
 			defer func() { _ = recover() }()
 			events.Publish("plugin."+plugin+".panic", map[string]any{
-				"plugin": plugin,
-				"op": op,
-				"panic": fmt.Sprintf("%v", r),
-				"recovered_total": count,
+				"plugin":          plugin,
+				"op":              op,
+				"panic":           fmt.Sprintf("%v", r),
+				"recovered_total": globalCount,
+				"plugin_panics":   perPluginCount,
 			})
 		}()
+
+		// One-shot unhealthy event. Checking >= together with a
+		// compare-and-swap (rather than == threshold) still delivers the
+		// event, exactly once, when the threshold-crossing panic was
+		// recovered by a call site that passed a nil bus.
+		if perPluginCount >= maxPanicsBeforeUnhealthy &&
+			health.unhealthyNotified.CompareAndSwap(false, true) {
+			func() {
+				defer func() { _ = recover() }()
+				events.Publish("plugin."+plugin+".unhealthy", map[string]any{
+					"plugin":          plugin,
+					"plugin_panics":   perPluginCount,
+					"recovered_total": globalCount,
+				})
+			}()
+		}
 	}
+
 	if onPanic != nil {
 		func() {
 			defer func() { _ = recover() }()
diff --git a/coreapi/zz_recover_test.go b/coreapi/zz_recover_test.go
index b8a40ad..c6c3a45 100644
--- a/coreapi/zz_recover_test.go
+++ b/coreapi/zz_recover_test.go
@@ -128,3 +128,112 @@ func TestL11PluginPanicCallbackPanicSwallowed(t *testing.T) {
 		t.Fatal("L11 boundary did not record the primary panic")
 	}
 }
+
+// TestL11PerPluginUnhealthy verifies per-plugin panic tracking
+// and the unhealthy-signal fires once when the threshold is crossed.
+func TestL11PerPluginUnhealthy(t *testing.T) {
+	ResetPluginHealthForTest()
+	// Registered up front so the package-level counters are cleared even
+	// when a t.Fatal below aborts the test.
+	t.Cleanup(ResetPluginHealthForTest)
+
+	bus := &fakeBus{}
+	const name = "unstable-plugin"
+	const unhealthyTopic = "plugin." + name + ".unhealthy"
+
+	if !IsPluginHealthy(name) || PluginPanicCount(name) != 0 {
+		t.Fatal("fresh plugin unhealthy or non-zero count")
+	}
+
+	// boom panics once inside a RecoverPlugin-guarded goroutine and
+	// returns after the recovery has finished.
+	boom := func() {
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			// Defers run LIFO: RecoverPlugin finishes before wg.Done
+			// releases the waiter.
+			defer wg.Done()
+			defer RecoverPlugin(name, "loop", bus, nil)
+			panic("x")
+		}()
+		wg.Wait()
+	}
+	unhealthyEvents := func() int {
+		n := 0
+		for _, tp := range bus.topics {
+			if tp == unhealthyTopic {
+				n++
+			}
+		}
+		return n
+	}
+
+	// Induce panics up to threshold-1; plugin stays healthy.
+	for i := 1; i < maxPanicsBeforeUnhealthy; i++ {
+		boom()
+		if got := PluginPanicCount(name); got != uint64(i) || !IsPluginHealthy(name) {
+			t.Fatalf("panic %d: count=%d healthy=%v", i, got, IsPluginHealthy(name))
+		}
+	}
+
+	// Threshold-crossing panic makes plugin unhealthy + fires event.
+	boom()
+	if IsPluginHealthy(name) {
+		t.Fatal("plugin should be unhealthy after threshold")
+	}
+	before := unhealthyEvents()
+	if before == 0 {
+		t.Fatalf("unhealthy event missing, topics: %v", bus.topics)
+	}
+
+	// One-shot: extra panic does not re-fire unhealthy.
+	boom()
+	if after := unhealthyEvents(); after != before {
+		t.Fatalf("unhealthy event re-fired (%d→%d)", before, after)
+	}
+}
+
+// TestL11UnhealthyAfterNilBusCrossing verifies the one-shot unhealthy
+// event is still published, exactly once, when the threshold-crossing
+// panic was recovered without an event bus.
+func TestL11UnhealthyAfterNilBusCrossing(t *testing.T) {
+	ResetPluginHealthForTest()
+	t.Cleanup(ResetPluginHealthForTest)
+
+	bus := &fakeBus{}
+	const name = "busless-plugin"
+	const unhealthyTopic = "plugin." + name + ".unhealthy"
+
+	boom := func(events EventBus) {
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			defer RecoverPlugin(name, "loop", events, nil)
+			panic("x")
+		}()
+		wg.Wait()
+	}
+
+	// Cross the threshold with no bus attached: nothing can be published.
+	for i := 0; i < maxPanicsBeforeUnhealthy; i++ {
+		boom(nil)
+	}
+	if IsPluginHealthy(name) {
+		t.Fatal("plugin should be unhealthy after threshold")
+	}
+
+	// The next two recoveries have a bus; only the first may publish.
+	boom(bus)
+	boom(bus)
+	got := 0
+	for _, tp := range bus.topics {
+		if tp == unhealthyTopic {
+			got++
+		}
+	}
+	if got != 1 {
+		t.Fatalf("unhealthy events = %d, want 1; topics: %v", got, bus.topics)
+	}
+}