diff --git a/pkg/exporter/channel_registry.go b/pkg/exporter/channel_registry.go
index f3796239..acbf26e9 100644
--- a/pkg/exporter/channel_registry.go
+++ b/pkg/exporter/channel_registry.go
@@ -3,6 +3,8 @@ package exporter
 import (
 	"context"
+	"errors"
 	"sync"
+	"time"
 
 	"github.com/tinybirdco/kubernetes-event-exporter/pkg/kube"
 	"github.com/tinybirdco/kubernetes-event-exporter/pkg/metrics"
@@ -10,6 +12,11 @@ import (
 	"github.com/rs/zerolog/log"
 )
 
+const (
+	// sinkSendTimeout is the deadline placed on the context passed to each sink Send call
+	sinkSendTimeout = 30 * time.Second
+)
+
 // ChannelBasedReceiverRegistry creates two channels for each receiver. One is for receiving events and other one is
 // for breaking out of the infinite loop. Each message is passed to receivers
 // This might not be the best way to implement such feature. A ring buffer can be better
@@ -51,15 +58,61 @@ func (r *ChannelBasedReceiverRegistry) Register(name string, receiver sinks.Sink
 	r.wg.Add(1)
 
 	go func() {
+		// Recover from a panic in this goroutine so one misbehaving sink cannot crash the process
+		defer func() {
+			if panicErr := recover(); panicErr != nil {
+				log.Error().
+					Str("sink", name).
+					Interface("panic", panicErr).
+					Msg("Sink panic recovered, closing sink")
+
+				// Protect against Close() panicking again
+				func() {
+					defer func() {
+						if closeErr := recover(); closeErr != nil {
+							log.Error().
+								Str("sink", name).
+								Interface("panic", closeErr).
+								Msg("Panic during Close() in recovery handler")
+						}
+					}()
+					receiver.Close()
+				}()
+
+				log.Info().Str("sink", name).Msg("Closed after panic")
+				r.wg.Done()
+			}
+		}()
+
 	Loop:
 		for {
 			select {
 			case ev := <-ch:
 				log.Debug().Str("sink", name).Str("event", ev.Message).Msg("sending event to sink")
-				err := receiver.Send(context.Background(), &ev)
+
+				// Create context with timeout to prevent indefinite blocking
+				ctx, cancel := context.WithTimeout(context.Background(), sinkSendTimeout)
+
+				// Send runs synchronously; the deadline only bounds it if the sink honors ctx
+				err := receiver.Send(ctx, &ev)
+				cancel() // Always cancel context to release resources
+
 				if err != nil {
 					r.MetricsStore.SendErrors.Inc()
-					log.Debug().Err(err).Str("sink", name).Str("event", ev.Message).Msg("Cannot send event")
+					if errors.Is(err, context.DeadlineExceeded) { // errors.Is also matches wrapped deadline errors
+						log.Error().
+							Err(err).
+							Str("sink", name).
+							Str("event", ev.Message).
+							Dur("timeout", sinkSendTimeout).
+							Msg("Sink send timeout exceeded")
+					} else {
+						log.Debug().
+							Err(err).
+							Str("sink", name).
+							Str("event", ev.Message).
+							Msg("Cannot send event")
+					}
 				}
 			case <-exitCh:
 				log.Info().Str("sink", name).Msg("Closing the sink")