Skip to content

Commit

Permalink
Feat: support leader election for kube-trigger (#45)
Browse files Browse the repository at this point in the history
Signed-off-by: yangsoon <songyang.song@alibaba-inc.com>
Co-authored-by: yangsoon <songyang.song@alibaba-inc.com>
  • Loading branch information
yangsoon and yangsoon committed Apr 17, 2023
1 parent 87c090d commit 2a3243f
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 16 deletions.
142 changes: 126 additions & 16 deletions pkg/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,22 @@ import (
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
v1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/kubevela/kube-trigger/pkg/config"
"github.com/kubevela/kube-trigger/pkg/eventhandler"
Expand All @@ -54,6 +63,11 @@ const (
FlagTimeout = "timeout"

FlagRegistrySize = "registry-size"

FlagLeaderElect = "leader-elect"
FlagLeaderElectionLeaseDuration = "leader-election-lease-duration"
FlagLeaderElectionRenewDeadline = "leader-election-renew-deadline"
FlagLeaderElectionRetryPeriod = "leader-election-retry-period"
)

const (
Expand All @@ -68,8 +82,13 @@ Options have a priority like this: cli-flags > env > default-values`
)

var (
logger = logrus.WithField("kubetrigger", "main")
opt = newOption()
logger = logrus.WithField("kubetrigger", "main")
opt = newOption()
enableLeaderElection bool

leaseDuration time.Duration
renewDeadline time.Duration
retryPeriod time.Duration
)

// NewCommand creates a new command
Expand Down Expand Up @@ -115,10 +134,35 @@ func addFlags(opt *option, f *pflag.FlagSet) {
f.IntVar(&opt.Timeout, FlagTimeout, defaultTimeout, "Timeout for running each action")
f.IntVar(&opt.RegistrySize, FlagRegistrySize, defaultRegistrySize, "Cache size for filters and actions")
f.StringVar(&k8sresourcewatcher.MultiClusterConfigType, "multi-cluster-config-type", k8sresourcewatcher.TypeClusterGateway, "Multi-cluster config type, supported types: cluster-gateway, cluster-gateway-kubeconfig")
f.BoolVar(&enableLeaderElection, FlagLeaderElect, false, "Enable leader election for kube-trigger. Enabling this will ensure there is only one active kube-trigger.")
f.DurationVar(&leaseDuration, FlagLeaderElectionLeaseDuration, defaultLeaseDuration, "The duration that non-leader candidates will wait to force acquire leadership.")
f.DurationVar(&renewDeadline, FlagLeaderElectionRenewDeadline, defaultRenewDeadline, "The duration that the acting controlplane will retry refreshing leadership before giving up.")
f.DurationVar(&retryPeriod, FlagLeaderElectionRetryPeriod, defaultRetryPeriod, "The duration the LeaderElector clients should wait between tries of actions.")
}

// runCli is the cobra run function. It starts a Runner and then blocks until
// one of the following happens:
//   - the runner reports an error, which is logged and returned;
//   - the command context is cancelled, in which case nil is returned;
//   - SIGTERM or SIGINT is received, in which case nil is returned.
//
// The context handed to the runner is cancelled when runCli returns, so the
// task is asked to stop on every exit path.
func runCli(cmd *cobra.Command, args []string) error {
	ctx, cancel := context.WithCancel(cmd.Context())
	defer cancel()

	r := NewRunner()
	if err := r.Start(ctx); err != nil {
		return err
	}

	// Listen to termination signals.
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGTERM, syscall.SIGINT)
	// Unregister the channel on return so the signal package stops relaying to it.
	defer signal.Stop(sigterm)

	select {
	case err := <-r.Err():
		logger.Errorf("runner stop with err: %v", err)
		return err
	case <-ctx.Done():
		logger.Infof("context cancelled, stopping")
	case <-sigterm:
		logger.Infof("received termination signal, stopping")
	}
	return nil
}

func run(ctx context.Context) error {
// Set log level. No need to check error, we validated it previously.
level, _ := logrus.ParseLevel(opt.LogLevel)
logrus.SetLevel(level)
Expand Down Expand Up @@ -149,9 +193,6 @@ func runCli(cmd *cobra.Command, args []string) error {
return errors.Wrap(err, "error when creating executor")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

instances := make(map[string]types.Source)

// Run watchers.
Expand Down Expand Up @@ -192,18 +233,87 @@ func runCli(cmd *cobra.Command, args []string) error {
}

// Let the workers run Actions.
go exe.RunJobs(ctx)
exe.RunJobs(ctx)
return nil
}

// Listen to termination signals.
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGTERM)
signal.Notify(sigterm, syscall.SIGINT)
select {
case <-ctx.Done():
logger.Infof("context cancelled, stopping")
case <-sigterm:
logger.Infof("received termination signal, stopping")
// Runner manages the task execution.
type Runner struct {
// errChan carries runtime errors from the task (and from the leader-election
// callbacks) to the reader of Err().
errChan chan error
// start is the task to execute; NewRunner sets it to run.
start func(ctx context.Context) error
// once guards start so the leader-election callback invokes it at most once.
once sync.Once
}

// NewRunner creates a Runner whose task is the package-level run function.
//
// The error channel is buffered so that reporting an error never blocks the
// reporting goroutine once the consumer has stopped reading from Err().
func NewRunner() *Runner {
	return &Runner{
		// Capacity 2 covers the worst case: one error returned by the task
		// itself plus one "leader election lost" notification.
		errChan: make(chan error, 2),
		start:   run,
	}
}

// Start launches the task without blocking the caller.
//
// With leader election enabled the work is delegated to startLeaderElection
// and its result is returned. Otherwise the task runs in a background
// goroutine, nil is returned immediately, and a non-nil task error is
// delivered on the channel returned by Err().
func (r *Runner) Start(ctx context.Context) error {
	if !enableLeaderElection {
		go func() {
			taskErr := r.start(ctx)
			if taskErr == nil {
				return
			}
			r.errChan <- taskErr
		}()
		return nil
	}
	return r.startLeaderElection(ctx)
}

// startLeaderElection runs the task only while this process holds a Lease
// lock. The lease lives in the "vela-system" namespace and is named after the
// kube-trigger version.
//
// It builds the lease client and the leader elector synchronously and returns
// any setup error. The election loop itself runs in a background goroutine
// until ctx is cancelled. Errors that occur after setup (a task failure or
// loss of leadership) are delivered on the channel returned by Err().
func (r *Runner) startLeaderElection(ctx context.Context) error {
	// Use GetConfig rather than GetConfigOrDie: this function already returns
	// an error, so a missing kubeconfig should be reported, not exit the process.
	cfg, err := ctrl.GetConfig()
	if err != nil {
		return errors.Wrap(err, "error when loading kubeconfig for leader election")
	}
	cclient, err := v1.NewForConfig(cfg)
	if err != nil {
		return err
	}

	// The lease name embeds the version with dots replaced by dashes and
	// lower-cased; the identity adds a UUID so each process is distinct.
	leaderElectionName := fmt.Sprintf("kube-trigger-%s",
		strings.ToLower(strings.ReplaceAll(version.Version, ".", "-")),
	)
	leaderElectionID := fmt.Sprintf("%s-%s", leaderElectionName, uuid.NewUUID())
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaderElectionName,
			Namespace: "vela-system",
		},
		Client: cclient,
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: leaderElectionID,
		},
	}

	// report hands an error to the reader of Err(), but gives up once ctx is
	// cancelled so the calling goroutine cannot block forever during shutdown.
	report := func(e error) {
		select {
		case r.errChan <- e:
		case <-ctx.Done():
		}
	}

	callbacks := leaderelection.LeaderCallbacks{
		OnStartedLeading: func(leadCtx context.Context) {
			r.once.Do(func() {
				// Keep the error local: writing to the enclosing function's
				// err from this goroutine would be a data race.
				if startErr := r.start(leadCtx); startErr != nil {
					report(startErr)
				}
			})
		},
		OnStoppedLeading: func() {
			report(errors.New("leader election lost"))
		},
	}

	elector, err := leaderelection.NewLeaderElector(
		leaderelection.LeaderElectionConfig{
			Lock:          lock,
			LeaseDuration: leaseDuration,
			RenewDeadline: renewDeadline,
			RetryPeriod:   retryPeriod,
			Callbacks:     callbacks,
		})
	if err != nil {
		return err
	}

	go elector.Run(ctx)
	return nil
}

// Err returns the channel on which the runner reports runtime errors.
func (r *Runner) Err() chan error {
return r.errChan
}
5 changes: 5 additions & 0 deletions pkg/cmd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ const (
defaultTimeout = 10

defaultRegistrySize = 100

// Values taken from: https://github.com/kubernetes/component-base/blob/master/config/v1alpha1/defaults.go
defaultLeaseDuration = 15 * time.Second
defaultRenewDeadline = 10 * time.Second
defaultRetryPeriod = 2 * time.Second
)

func newOption() *option {
Expand Down

0 comments on commit 2a3243f

Please sign in to comment.