Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/kubeapply-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/segmentio/stats/v4/datadog"
log "github.com/sirupsen/logrus"
prefixed "github.com/x-cray/logrus-prefixed-formatter"
"k8s.io/klog"
"k8s.io/klog/v2"
)

func init() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubeapply/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/segmentio/kubeapply/cmd/kubeapply/subcmd"
log "github.com/sirupsen/logrus"
prefixed "github.com/x-cray/logrus-prefixed-formatter"
"k8s.io/klog"
"k8s.io/klog/v2"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubestar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/segmentio/kubeapply/cmd/kubestar/subcmd"
log "github.com/sirupsen/logrus"
prefixed "github.com/x-cray/logrus-prefixed-formatter"
"k8s.io/klog"
"k8s.io/klog/v2"
)

var (
Expand Down
4 changes: 2 additions & 2 deletions data/data.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 16 additions & 20 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
module github.com/segmentio/kubeapply

go 1.14
go 1.15

require (
github.com/Masterminds/semver/v3 v3.1.0
github.com/Masterminds/sprig/v3 v3.1.0
github.com/Masterminds/semver/v3 v3.1.1
github.com/Masterminds/sprig/v3 v3.2.1
github.com/aws/aws-lambda-go v1.15.0
github.com/aws/aws-sdk-go v1.29.16
github.com/briandowns/spinner v1.11.1
Expand All @@ -13,11 +13,8 @@ require (
github.com/fatih/color v1.7.0
github.com/ghodss/yaml v1.0.0
github.com/gogo/protobuf v1.3.1
github.com/google/go-cmp v0.4.1 // indirect
github.com/google/go-github/v30 v30.0.0
github.com/gorilla/mux v1.7.4
github.com/imdario/mergo v0.3.11 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.4 // indirect
github.com/mattn/go-isatty v0.0.11 // indirect
Expand All @@ -29,27 +26,26 @@ require (
github.com/segmentio/encoding v0.2.7
github.com/segmentio/stats v3.0.0+incompatible
github.com/segmentio/stats/v4 v4.5.3
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.6
github.com/stretchr/testify v1.6.0
github.com/sirupsen/logrus v1.6.0
github.com/spf13/cobra v1.1.1
github.com/stretchr/testify v1.6.1
github.com/stripe/skycfg v0.0.0-20200303020846-4f599970a3e6
github.com/x-cray/logrus-prefixed-formatter v0.5.2
github.com/zorkian/go-datadog-api v2.28.0+incompatible // indirect
go.starlark.net v0.0.0-20200330013621-be5394c419b6
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37 // indirect
golang.org/x/net v0.0.0-20200528225125-3c3fba18258b // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
google.golang.org/appengine v1.6.1 // indirect
go.starlark.net v0.0.0-20201204201740-42d4f566359b
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/src-d/go-git.v4 v4.13.1
gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v3 v3.0.0-20200601152816-913338de1bd2 // indirect
gopkg.in/zorkian/go-datadog-api.v2 v2.28.0
k8s.io/api v0.17.2
k8s.io/apimachinery v0.17.2
k8s.io/client-go v0.17.2
k8s.io/klog v1.0.0
k8s.io/kubectl v0.17.2
k8s.io/api v0.20.2
k8s.io/apimachinery v0.20.2
k8s.io/client-go v0.20.2
k8s.io/klog/v2 v2.4.0
k8s.io/kubectl v0.20.2
)

// Need to pin to older version to get around https://github.com/stripe/skycfg/issues/86.
replace github.com/golang/protobuf v1.4.3 => github.com/golang/protobuf v1.3.2
460 changes: 331 additions & 129 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ type ClusterClient interface {
Summary(ctx context.Context) (string, error)

// GetStoreValue gets the value of the given key.
GetStoreValue(key string) (string, error)
GetStoreValue(ctx context.Context, key string) (string, error)

// SetStoreValue sets the given key/value pair in the cluster.
SetStoreValue(key string, value string) error
SetStoreValue(ctx context.Context, key string, value string) error

// Config returns the config for this cluster.
Config() *config.ClusterConfig
Expand Down
9 changes: 6 additions & 3 deletions pkg/cluster/fake_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ func (cc *FakeClusterClient) Summary(ctx context.Context) (string, error) {
}

// GetStoreValue gets the value of the argument key.
func (cc *FakeClusterClient) GetStoreValue(key string) (string, error) {
func (cc *FakeClusterClient) GetStoreValue(ctx context.Context, key string) (string, error) {
return cc.store[key], nil
}

// SetStoreValue sets the argument key to the argument value.
func (cc *FakeClusterClient) SetStoreValue(key string, value string) error {
func (cc *FakeClusterClient) SetStoreValue(ctx context.Context, key string, value string) error {
cc.store[key] = value
return nil
}
Expand All @@ -139,7 +139,10 @@ func (cc *FakeClusterClient) Config() *config.ClusterConfig {
}

// GetNamespaceUID returns the kubernetes identifier for a given namespace in this cluster.
func (cc *FakeClusterClient) GetNamespaceUID(ctx context.Context, namespace string) (string, error) {
func (cc *FakeClusterClient) GetNamespaceUID(
ctx context.Context,
namespace string,
) (string, error) {
return fmt.Sprintf("ns-%s", namespace), cc.kubectlErr
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/cluster/kube_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,13 @@ func (cc *KubeClusterClient) Summary(ctx context.Context) (string, error) {
}

// GetStoreValue gets the value of the argument key.
func (cc *KubeClusterClient) GetStoreValue(key string) (string, error) {
return cc.kubeStore.Get(key)
func (cc *KubeClusterClient) GetStoreValue(ctx context.Context, key string) (string, error) {
return cc.kubeStore.Get(ctx, key)
}

// SetStoreValue sets the value of the argument key to the argument value.
func (cc *KubeClusterClient) SetStoreValue(key string, value string) error {
return cc.kubeStore.Set(key, value)
func (cc *KubeClusterClient) SetStoreValue(ctx context.Context, key string, value string) error {
return cc.kubeStore.Set(ctx, key, value)
}

// Config returns this client's cluster config.
Expand Down Expand Up @@ -311,7 +311,7 @@ func (cc *KubeClusterClient) execApply(

if cc.checkApplyConsistency {
log.Infof("Fetching diff event for key %s", cc.clusterKey)
storeValue, err := cc.GetStoreValue(cc.clusterKey)
storeValue, err := cc.GetStoreValue(ctx, cc.clusterKey)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -388,5 +388,5 @@ func (cc *KubeClusterClient) execDiff(
diffEventStr := string(diffEventBytes)

log.Infof("Setting store key value: %s, %s", cc.clusterKey, diffEventStr)
return diffResult, cc.kubeStore.Set(cc.clusterKey, diffEventStr)
return diffResult, cc.kubeStore.Set(ctx, cc.clusterKey, diffEventStr)
}
82 changes: 37 additions & 45 deletions pkg/store/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -58,18 +55,16 @@ import (
"fmt"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
rl "k8s.io/client-go/tools/leaderelection/resourcelock"

"k8s.io/klog"
)

const (
// JitterFactor is a multiplier used to add jitter to leader renewal times.
JitterFactor = 1.2
)

Expand Down Expand Up @@ -107,8 +102,6 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
return &le, nil
}

// LeaderElectionConfig contains the settings associated with a leader
// election process.
type LeaderElectionConfig struct {
// Lock is the resource that will be used for locking
Lock rl.Interface
Expand Down Expand Up @@ -183,17 +176,17 @@ type LeaderElector struct {

// clock is wrapper around time to allow for less flaky testing
clock clock.Clock

// name is the name of the resource lock for debugging
name string
}

// Run starts the leader election loop
// Run starts the leader election loop. Run will not return
// before leader election loop is stopped by ctx or it has
// stopped holding the leader lease
func (le *LeaderElector) Run(ctx context.Context) {
defer runtime.HandleCrash()
defer func() {
runtime.HandleCrash()
le.config.Callbacks.OnStoppedLeading()
}()

if !le.acquire(ctx) {
return // ctx signalled done
}
Expand All @@ -204,7 +197,8 @@ func (le *LeaderElector) Run(ctx context.Context) {
}

// RunOrDie starts a client with the provided config or panics if the config
// fails to validate.
// fails to validate. RunOrDie blocks until leader election loop is
// stopped by ctx or it has stopped holding the leader lease
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
le, err := NewLeaderElector(lec)
if err != nil {
Expand All @@ -231,16 +225,16 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
defer cancel()
succeeded := false
desc := le.config.Lock.Describe()
klog.Infof("attempting to acquire leader lease %v...", desc)
log.Infof("Attempting to acquire leader lease %v...", desc)
wait.JitterUntil(func() {
succeeded = le.tryAcquireOrRenew()
succeeded = le.tryAcquireOrRenew(ctx)
le.maybeReportTransition()
if !succeeded {
klog.V(4).Infof("failed to acquire lease %v", desc)
log.Infof("Failed to acquire lease %v", desc)
return
}
le.config.Lock.RecordEvent("became leader")
klog.Infof("successfully acquired lease %v", desc)
log.Infof("Successfully acquired lease %v", desc)
cancel()
}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
return succeeded
Expand All @@ -254,48 +248,40 @@ func (le *LeaderElector) renew(ctx context.Context) {
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
defer timeoutCancel()
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
done := make(chan bool, 1)
go func() {
defer close(done)
done <- le.tryAcquireOrRenew()
}()

select {
case <-timeoutCtx.Done():
return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
case result := <-done:
return result, nil
}
return le.tryAcquireOrRenew(timeoutCtx), nil
}, timeoutCtx.Done())

le.maybeReportTransition()
desc := le.config.Lock.Describe()
if err == nil {
klog.V(5).Infof("successfully renewed lease %v", desc)
log.Debugf("Successfully renewed lease %v", desc)
return
}
le.config.Lock.RecordEvent("stopped leading")
klog.Infof("failed to renew lease %v: %v", desc, err)
log.Infof("Failed to renew lease %v: %v", desc, err)
cancel()
}, le.config.RetryPeriod, ctx.Done())

// if we hold the lease, give it up
if le.config.ReleaseOnCancel {
le.release()
le.release(ctx)
}
}

// release attempts to release the leader lease if we have acquired it.
func (le *LeaderElector) release() bool {
func (le *LeaderElector) release(ctx context.Context) bool {
if !le.IsLeader() {
return true
}
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
LeaderTransitions: le.observedRecord.LeaderTransitions,
LeaseDurationSeconds: int(le.config.LeaseDuration.Seconds()),
LeaseDurationSeconds: 1,
RenewTime: now,
AcquireTime: now,
}
if err := le.config.Lock.Update(leaderElectionRecord); err != nil {
klog.Errorf("Failed to release lock: %v", err)
if err := le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
log.Errorf("Failed to release lock: %v", err)
return false
}
le.observedRecord = leaderElectionRecord
Expand All @@ -306,7 +292,7 @@ func (le *LeaderElector) release() bool {
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew() bool {
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
Expand All @@ -316,14 +302,14 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
}

// 1. obtain or create the ElectionRecord
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get()
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
log.Errorf("Error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
return false
}
if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
klog.Errorf("error initially creating leader election record: %v", err)
if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
log.Errorf("Error initially creating leader election record: %v", err)
return false
}
le.observedRecord = leaderElectionRecord
Expand All @@ -337,10 +323,16 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
le.observedRawRecord = oldLeaderElectionRawRecord
le.observedTime = le.clock.Now()
}

// If the renew time is more than 2x the lease duration in the past, don't worry
// about clock skew and just take the lock.
thresholdTime := now.Time.Add(-2 * le.config.LeaseDuration)

if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
oldLeaderElectionRecord.RenewTime.Time.After(thresholdTime) &&
!le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
log.Infof("Lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
}

Expand All @@ -354,8 +346,8 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
}

// update the lock itself
if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
klog.Errorf("Failed to update lock: %v", err)
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
log.Errorf("Failed to update lock: %v", err)
return false
}

Expand Down
Loading