From 491ed6dcbc2724e6b459444c243ecd03f5e06cde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Rodr=C3=ADguez?= Date: Tue, 14 Oct 2025 17:06:24 +0200 Subject: [PATCH 1/3] feat: sink panic and send timeout --- pkg/exporter/channel_registry.go | 43 ++++++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/pkg/exporter/channel_registry.go b/pkg/exporter/channel_registry.go index f3796239..8ddbe128 100644 --- a/pkg/exporter/channel_registry.go +++ b/pkg/exporter/channel_registry.go @@ -3,6 +3,7 @@ package exporter import ( "context" "sync" + "time" "github.com/tinybirdco/kubernetes-event-exporter/pkg/kube" "github.com/tinybirdco/kubernetes-event-exporter/pkg/metrics" @@ -10,6 +11,11 @@ import ( "github.com/rs/zerolog/log" ) +const ( + // sinkSendTimeout is the maximum time allowed for a sink to process an event + sinkSendTimeout = 30 * time.Second +) + // ChannelBasedReceiverRegistry creates two channels for each receiver. One is for receiving events and other one is // for breaking out of the infinite loop. Each message is passed to receivers // This might not be the best way to implement such feature. A ring buffer can be better @@ -51,15 +57,48 @@ func (r *ChannelBasedReceiverRegistry) Register(name string, receiver sinks.Sink r.wg.Add(1) go func() { + // Ensure panic recovery to prevent sink crashes from stopping event processing + defer func() { + if r := recover(); r != nil { + log.Error(). + Str("sink", name). + Interface("panic", r). + Msg("Sink panic recovered, closing sink") + receiver.Close() + log.Info().Str("sink", name).Msg("Closed after panic") + r.wg.Done() + } + }() + Loop: for { select { case ev := <-ch: log.Debug().Str("sink", name).Str("event", ev.Message).Msg("sending event to sink") - err := receiver.Send(context.Background(), &ev) + + // Create context with timeout to prevent indefinite blocking + ctx, cancel := context.WithTimeout(context.Background(), sinkSendTimeout) + + // Send event with timeout protection + err := receiver.Send(ctx, &ev) + cancel() // Always cancel context to release resources + if err != nil { r.MetricsStore.SendErrors.Inc() - log.Debug().Err(err).Str("sink", name).Str("event", ev.Message).Msg("Cannot send event") + if err == context.DeadlineExceeded { + log.Error(). + Err(err). + Str("sink", name). + Str("event", ev.Message). + Dur("timeout", sinkSendTimeout). + Msg("Sink send timeout exceeded") + } else { + log.Debug(). + Err(err). + Str("sink", name). + Str("event", ev.Message). + Msg("Cannot send event") + } } case <-exitCh: log.Info().Str("sink", name).Msg("Closing the sink") From c5555366fc16f8292e3051b432581c7fc2d56abb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Rodr=C3=ADguez?= Date: Tue, 14 Oct 2025 17:18:54 +0200 Subject: [PATCH 2/3] Fix variable shadowing --- pkg/exporter/channel_registry.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/exporter/channel_registry.go b/pkg/exporter/channel_registry.go index 8ddbe128..40bd3813 100644 --- a/pkg/exporter/channel_registry.go +++ b/pkg/exporter/channel_registry.go @@ -59,10 +59,10 @@ func (r *ChannelBasedReceiverRegistry) Register(name string, receiver sinks.Sink go func() { // Ensure panic recovery to prevent sink crashes from stopping event processing defer func() { - if r := recover(); r != nil { + if panicErr := recover(); panicErr != nil { log.Error(). Str("sink", name). - Interface("panic", r). + Interface("panic", panicErr). Msg("Sink panic recovered, closing sink") receiver.Close() log.Info().Str("sink", name).Msg("Closed after panic") From 17c7ee1ccf40a14d31d56ec1ed522d1e26e1cf0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Rodr=C3=ADguez?= Date: Tue, 14 Oct 2025 17:24:56 +0200 Subject: [PATCH 3/3] Close panicking protection --- pkg/exporter/channel_registry.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pkg/exporter/channel_registry.go b/pkg/exporter/channel_registry.go index 40bd3813..acbf26e9 100644 --- a/pkg/exporter/channel_registry.go +++ b/pkg/exporter/channel_registry.go @@ -64,7 +64,20 @@ func (r *ChannelBasedReceiverRegistry) Register(name string, receiver sinks.Sink Str("sink", name). Interface("panic", panicErr). Msg("Sink panic recovered, closing sink") - receiver.Close() + + // Protect against Close() panicking again + func() { + defer func() { + if closeErr := recover(); closeErr != nil { + log.Error(). + Str("sink", name). + Interface("panic", closeErr). + Msg("Panic during Close() in recovery handler") + } + }() + receiver.Close() + }() + log.Info().Str("sink", name).Msg("Closed after panic") r.wg.Done() }