Skip to content

Commit

Permalink
engine: Implement clean closing of sinks
Browse files Browse the repository at this point in the history
Before the change, the engine didn't call the `Close()` method of the
sinks. This is needed in some cases, i.e when a sink implementation is
buffered.

This change adds a `Close()` method to the registry that
will signal sinks to exit and wait for all sinks to exit before
returning. This is then used in the engine stop logic.

In the channel-based registry, the closing of all sinks is done in
parallel (using a `sync.WaitGroup`). In the sync registry, sinks are
closed sequentially.

Fixes issue opsgenie#10
  • Loading branch information
mdemierre committed Dec 15, 2019
1 parent 14d0120 commit 106e78a
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 2 deletions.
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ func main() {
log.Info().Str("signal", sig.String()).Msg("Received signal to exit")
defer close(c)
w.Stop()
engine.Stop()
log.Info().Msg("Exiting")
}
27 changes: 25 additions & 2 deletions pkg/exporter/channel_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package exporter

import (
"context"
"sync"

"github.com/opsgenie/kubernetes-event-exporter/pkg/kube"
"github.com/opsgenie/kubernetes-event-exporter/pkg/sinks"
"github.com/rs/zerolog/log"
Expand All @@ -11,9 +13,11 @@ import (
// for breaking out of the infinite loop. Each message is passed to receivers
// This might not be the best way to implement such feature. A ring buffer can be better
// and we might need a mechanism to drop the vents
// On closing, the registry sends a signal on all exit channels, and then waits for all to complete.
type ChannelBasedReceiverRegistry struct {
ch map[string]chan kube.EnhancedEvent
exitCh map[string]chan interface{}
wg *sync.WaitGroup
}

func (r *ChannelBasedReceiverRegistry) SendEvent(name string, event *kube.EnhancedEvent) {
Expand All @@ -39,7 +43,13 @@ func (r *ChannelBasedReceiverRegistry) Register(name string, receiver sinks.Sink
r.ch[name] = ch
r.exitCh[name] = exitCh

if r.wg == nil {
r.wg = &sync.WaitGroup{}
}
r.wg.Add(1)

go func() {
Loop:
for {
select {
case ev := <-ch:
Expand All @@ -49,9 +59,22 @@ func (r *ChannelBasedReceiverRegistry) Register(name string, receiver sinks.Sink
log.Debug().Err(err).Str("sink", name).Str("event", ev.Message).Msg("Cannot send event")
}
case <-exitCh:
log.Info().Str("receiver", name).Msg("Killing the receiver")
break
log.Info().Str("sink", name).Msg("Closing the sink")
break Loop
}
}
receiver.Close()
log.Info().Str("sink", name).Msg("Closed")
r.wg.Done()
}()
}

// Close signals closing to all sinks and waits for them to complete.
// The wait could block indefinitely depending on the sink implementations.
func (r *ChannelBasedReceiverRegistry) Close() {
// Send exit command and wait for exit of all sinks
for _, ec := range r.exitCh {
ec <- 1
}
r.wg.Wait()
}
7 changes: 7 additions & 0 deletions pkg/exporter/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,10 @@ func NewEngine(config *Config, registry ReceiverRegistry) *Engine {
func (e *Engine) OnEvent(event *kube.EnhancedEvent) {
e.Route.ProcessEvent(event, e.Registry)
}

// Stop stops all registered sinks
func (e *Engine) Stop() {
log.Info().Msg("Closing sinks")
e.Registry.Close()
log.Info().Msg("All sinks closed")
}
1 change: 1 addition & 0 deletions pkg/exporter/recievers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ import (
type ReceiverRegistry interface {
SendEvent(string, *kube.EnhancedEvent)
Register(string, sinks.Sink)
Close()
}
4 changes: 4 additions & 0 deletions pkg/exporter/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func (t *testReceiverRegistry) SendEvent(name string, event *kube.EnhancedEvent)
t.rcvd[name] = append(t.rcvd[name], event)
}

func (t *testReceiverRegistry) Close() {
// No-op
}

func (t *testReceiverRegistry) isEventRcvd(name string, event *kube.EnhancedEvent) bool {
if val, ok := t.rcvd[name]; !ok {
return false
Expand Down
7 changes: 7 additions & 0 deletions pkg/exporter/sync_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,10 @@ func (s *SyncRegistry) Register(name string, sink sinks.Sink) {

s.reg[name] = sink
}

func (s *SyncRegistry) Close() {
for name, sink := range s.reg {
log.Info().Str("sink", name).Msg("Closing sink")
sink.Close()
}
}

0 comments on commit 106e78a

Please sign in to comment.