Skip to content

Commit

Permalink
Improves leader election so that we don't lose events (#153)
Browse files Browse the repository at this point in the history
Currently, when a replica loses its leadership, a new leader isn't elected until leaseDuration seconds.
Here, that is 15s. The max time till we get a new leader is leaseDuration (15s) + retryPeriod (2s) = 17s.

This commit updates the shutdown process such that if the leader replica is sent a shutdown signal,
it sleeps for leaseDuration seconds. This allows the leader replica to continue to export events until
a new leader is elected. And a new leader is elected only if lease hasn't been renewed and leaseDuration expires.

In addition to this, leader election now uses the leases object instead of configMaps and leases. The clusterRole
is also updated to allow writing to the leases object.

For use cases where no event loss is tolerable, users should use maxEventAgeSeconds to > 1.
  • Loading branch information
ronaknnathani committed Feb 23, 2024
1 parent 0376ea2 commit d7ba03a
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 42 deletions.
3 changes: 3 additions & 0 deletions deploy/00-roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ rules:
- apiGroups: ["*"]
resources: ["*"]
verbs: ["get", "watch", "list"]
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["*"]
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emicklei/go-restful/v3 v3.10.1 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
Expand Down
72 changes: 45 additions & 27 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {

configBytes = []byte(os.ExpandEnv(string(configBytes)))

cfg, err := setup.ParseConfigFromBites(configBytes)
cfg, err := setup.ParseConfigFromBytes(configBytes)
if err != nil {
log.Fatal().Msg(err.Error())
}
Expand Down Expand Up @@ -64,6 +64,8 @@ func main() {

cfg.SetDefaults()

log.Info().Msgf("Starting with config: %#v", cfg)

if err := cfg.Validate(); err != nil {
log.Fatal().Err(err).Msg("config validation failed")
}
Expand Down Expand Up @@ -91,45 +93,61 @@ func main() {

w := kube.NewEventWatcher(kubecfg, cfg.Namespace, cfg.MaxEventAgeSeconds, metricsStore, onEvent, cfg.OmitLookup, cfg.CacheSize)

ctx, cancel := context.WithCancel(context.Background())
leaderLost := make(chan bool)
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

if cfg.LeaderElection.Enabled {
var wasLeader bool
log.Info().Msg("leader election enabled")

onStoppedLeading := func(ctx context.Context) {
select {
case <-ctx.Done():
log.Info().Msg("Context was cancelled, stopping leader election loop")
default:
log.Info().Msg("Lost the leader lease, stopping leader election loop")
}
}

l, err := kube.NewLeaderElector(cfg.LeaderElection.LeaderElectionID, kubecfg,
// this method gets called when this instance becomes the leader
func(_ context.Context) {
log.Info().Msg("leader election got")
wasLeader = true
log.Info().Msg("leader election won")
w.Start()
},
// this method gets called when the leader election loop is closed
// either due to context cancellation or due to losing the leader lease
func() {
log.Error().Msg("leader election lost")
leaderLost <- true
onStoppedLeading(ctx)
},
func(identity string) {
log.Info().Msg("new leader observed: " + identity)
},
)
if err != nil {
log.Fatal().Err(err).Msg("create leaderelector failed")
}
go l.Run(ctx)

// Run returns if either the context is canceled or client stopped holding the leader lease
l.Run(ctx)

// We get here either because we lost the leader lease or the context was canceled.
// In either case we want to stop the event watcher and exit.
// However, if we were the leader, we wait leaseDuration seconds before stopping
// so that we don't lose events until the next leader is elected. The new leader
// will only be elected after leaseDuration seconds.
if wasLeader {
log.Info().Msgf("waiting leaseDuration seconds before stopping: %s", kube.GetLeaseDuration())
time.Sleep(kube.GetLeaseDuration())
}
} else {
log.Info().Msg("leader election disabled")
w.Start()
<-ctx.Done()
}

c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)

gracefulExit := func() {
defer close(c)
defer close(leaderLost)
cancel()
w.Stop()
engine.Stop()
log.Info().Msg("Exiting")
}

select {
case sig := <-c:
log.Info().Str("signal", sig.String()).Msg("Received signal to exit")
gracefulExit()
case <-leaderLost:
log.Warn().Msg("Leader election lost")
gracefulExit()
}
log.Info().Msg("Received signal to exit. Stopping.")
w.Stop()
engine.Stop()
}
9 changes: 7 additions & 2 deletions pkg/kube/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ const (
defaultRetryPeriod = 2 * time.Second
)

func GetLeaseDuration() time.Duration {
return defaultLeaseDuration
}

// NewResourceLock creates a new config map resource lock for use in a leader
// election loop
func newResourceLock(config *rest.Config, leaderElectionID string) (resourcelock.Interface, error) {
Expand All @@ -53,7 +57,7 @@ func newResourceLock(config *rest.Config, leaderElectionID string) (resourcelock
return nil, err
}

return resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock,
return resourcelock.New(resourcelock.LeasesResourceLock,
leaderElectionNamespace,
leaderElectionID,
client.CoreV1(),
Expand Down Expand Up @@ -82,7 +86,7 @@ func getInClusterNamespace() (string, error) {
}

// NewLeaderElector return a leader elector object using client-go
func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc func(context.Context), stopFunc func()) (*leaderelection.LeaderElector, error) {
func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc func(context.Context), stopFunc func(), newLeaderFunc func(string)) (*leaderelection.LeaderElector, error) {
resourceLock, err := newResourceLock(config, leaderElectionID)
if err != nil {
return &leaderelection.LeaderElector{}, err
Expand All @@ -96,6 +100,7 @@ func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc fu
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: startFunc,
OnStoppedLeading: stopFunc,
OnNewLeader: newLeaderFunc,
},
})
return l, err
Expand Down
10 changes: 8 additions & 2 deletions pkg/kube/watcher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kube

import (
"sync"
"time"

"github.com/resmoio/kubernetes-event-exporter/pkg/metrics"
Expand All @@ -19,6 +20,7 @@ var startUpTime = time.Now()
type EventHandler func(event *EnhancedEvent)

type EventWatcher struct {
wg sync.WaitGroup
informer cache.SharedInformer
stopper chan struct{}
objectMetadataCache ObjectMetadataProvider
Expand Down Expand Up @@ -135,12 +137,16 @@ func (e *EventWatcher) OnDelete(obj interface{}) {
}

func (e *EventWatcher) Start() {
go e.informer.Run(e.stopper)
e.wg.Add(1)
go func() {
defer e.wg.Done()
e.informer.Run(e.stopper)
}()
}

func (e *EventWatcher) Stop() {
e.stopper <- struct{}{}
close(e.stopper)
e.wg.Wait()
}

func (e *EventWatcher) setStartUpTime(time time.Time) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/resmoio/kubernetes-event-exporter/pkg/exporter"
)

func ParseConfigFromBites(configBytes []byte) (exporter.Config, error) {
func ParseConfigFromBytes(configBytes []byte) (exporter.Config, error) {
var config exporter.Config
err := yaml.Unmarshal(configBytes, &config)
if err != nil {
Expand Down
20 changes: 10 additions & 10 deletions pkg/setup/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"github.com/stretchr/testify/assert"
)

func Test_ParseConfigFromBites_ExampleConfigIsCorrect(t *testing.T) {
func Test_ParseConfigFromBytes_ExampleConfigIsCorrect(t *testing.T) {
configBytes, err := os.ReadFile("../../config.example.yaml")
if err != nil {
assert.NoError(t, err, "cannot read config file: "+err.Error())
return
}

config, err := ParseConfigFromBites(configBytes)
config, err := ParseConfigFromBytes(configBytes)

assert.NoError(t, err)
assert.NotEmpty(t, config.LogLevel)
Expand All @@ -26,26 +26,26 @@ func Test_ParseConfigFromBites_ExampleConfigIsCorrect(t *testing.T) {
assert.Equal(t, 10, len(config.Receivers))
}

func Test_ParseConfigFromBites_NoErrors(t *testing.T) {
func Test_ParseConfigFromBytes_NoErrors(t *testing.T) {
configBytes := []byte(`
logLevel: info
logFormat: json
`)

config, err := ParseConfigFromBites(configBytes)
config, err := ParseConfigFromBytes(configBytes)

assert.NoError(t, err)
assert.Equal(t, "info", config.LogLevel)
assert.Equal(t, "json", config.LogFormat)
}

func Test_ParseConfigFromBites_ErrorWhenCurlyBracesNotEscaped(t *testing.T) {
func Test_ParseConfigFromBytes_ErrorWhenCurlyBracesNotEscaped(t *testing.T) {
configBytes := []byte(`
logLevel: {{info}}
logFormat: json
`)

config, err := ParseConfigFromBites(configBytes)
config, err := ParseConfigFromBytes(configBytes)

expectedErrorLine := "> 2 | logLevel: {{info}}"
expectedErrorSuggestion := "Need to wrap values with special characters in quotes"
Expand All @@ -56,26 +56,26 @@ logFormat: json
assert.Equal(t, "", config.LogFormat)
}

func Test_ParseConfigFromBites_OkWhenCurlyBracesEscaped(t *testing.T) {
func Test_ParseConfigFromBytes_OkWhenCurlyBracesEscaped(t *testing.T) {
configBytes := []byte(`
logLevel: "{{info}}"
logFormat: json
`)

config, err := ParseConfigFromBites(configBytes)
config, err := ParseConfigFromBytes(configBytes)

assert.Nil(t, err)
assert.Equal(t, "{{info}}", config.LogLevel)
assert.Equal(t, "json", config.LogFormat)
}

func Test_ParseConfigFromBites_ErrorErrorNotWithCurlyBraces(t *testing.T) {
func Test_ParseConfigFromBytes_ErrorErrorNotWithCurlyBraces(t *testing.T) {
configBytes := []byte(`
logLevelNotYAMLErrorError
logFormat: json
`)

config, err := ParseConfigFromBites(configBytes)
config, err := ParseConfigFromBytes(configBytes)

expectedErrorLine := "> 2 | logLevelNotYAMLErrorError"
expectedErrorSuggestion := "Need to wrap values with special characters in quotes"
Expand Down

0 comments on commit d7ba03a

Please sign in to comment.