leader/leader.go (95 changes: 51 additions, 44 deletions)

@@ -31,72 +31,89 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

type runModeType string

const (
localRunMode runModeType = "local"
)

// forceRunModeEnv indicates if the operator should be forced to run in either local
// or cluster mode (currently only used for local mode)
var forceRunModeEnv = "OSDK_FORCE_RUN_MODE"

// errNoNamespace indicates that a namespace could not be found for the current
// ErrNoNamespace indicates that a namespace could not be found for the current
// environment
var errNoNamespace = fmt.Errorf("namespace not found for current environment")

// errRunLocal indicates that the operator is set to run in local mode (this error
// is returned by functions that only work on operators running in cluster mode)
var errRunLocal = fmt.Errorf("operator run mode forced to local")
var ErrNoNamespace = fmt.Errorf("namespace not found for current environment")

// podNameEnvVar is the constant for env variable POD_NAME
// which is the name of the current pod.
const podNameEnvVar = "POD_NAME"

var readNamespace = func() ([]byte, error) {
return ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
}

var log = logf.Log.WithName("leader")

// maxBackoffInterval defines the maximum amount of time to wait between
// attempts to become the leader.
const maxBackoffInterval = time.Second * 16

type Option func(*Config) error

type Config struct {
Client crclient.Client
}

func (c *Config) setDefaults() error {
if c.Client == nil {
config, err := config.GetConfig()
if err != nil {
return err
}

client, err := crclient.New(config, crclient.Options{})
if err != nil {
return err
}
c.Client = client
}
return nil
}

func WithClient(cl crclient.Client) Option {
return func(c *Config) error {
c.Client = cl
return nil
}
}

// Become ensures that the current pod is the leader within its namespace. If
// run outside a cluster, it will skip leader election and return nil. It
// continuously tries to create a ConfigMap with the provided name and the
// current pod set as the owner reference. Only one can exist at a time with
// the same name, so the pod that successfully creates the ConfigMap is the
// leader. Upon termination of that pod, the garbage collector will delete the
// ConfigMap, enabling a different pod to become the leader.
func Become(ctx context.Context, lockName string) error {
func Become(ctx context.Context, lockName string, opts ...Option) error {
camilamacedo86 (Contributor) commented on Jul 24, 2020:

We replaced the SDK implementation with the controller-runtime one, so I do not think we need to keep this at all.

@joelanford why would we keep our implementation when all operators now use the controller-runtime one? If we do keep it, I need to revert the change made in operator-framework/operator-sdk#3514.

cc @jmrodri

The PR author (Member) replied:

@camilamacedo86 my understanding was that this is leader-for-life election, and we kept it for anyone who wants leader-for-life semantics. It will not be used by the SDK itself, but it could be used by an operator developer.

A maintainer (Member) replied:

A lot of existing operators are using the leader-for-life implementation. They'll either need a way to continue using it, or an easy path to migrate to the lease-based election.

These days the lease-based election is easy to use and robust, which hasn't always been the case. But even so, each pattern has advantages and disadvantages.
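
For readers weighing the two patterns: the lease-based alternative mentioned above is enabled through controller-runtime's manager options. A minimal sketch, assuming a conventional main.go (the lock ID and namespace are illustrative, not from this PR):

```go
package main

import (
	ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
	// Lease-based election: the manager campaigns for a lock and, unlike
	// leader-for-life, releases leadership when the process shuts down.
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		LeaderElection:          true,
		LeaderElectionID:        "my-operator-lock", // illustrative lock name
		LeaderElectionNamespace: "my-operator-ns",   // illustrative namespace
	})
	if err != nil {
		panic(err)
	}
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}
```

The trade-off the thread alludes to: a lease-based leader can hand off quickly on graceful shutdown, while leader-for-life only releases the lock once the owning pod is deleted and garbage-collected.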

log.Info("Trying to become the leader.")

ns, err := getOperatorNamespace()
if err != nil {
if err == errNoNamespace || err == errRunLocal {
log.Info("Skipping leader election; not running in a cluster.")
return nil
config := Config{}

for _, opt := range opts {
if err := opt(&config); err != nil {
return err
}
return err
}

config, err := config.GetConfig()
if err != nil {
if err := config.setDefaults(); err != nil {
return err
}

client, err := crclient.New(config, crclient.Options{})
ns, err := getOperatorNamespace()
if err != nil {
return err
}

owner, err := myOwnerRef(ctx, client, ns)
owner, err := myOwnerRef(ctx, config.Client, ns)
if err != nil {
return err
}

// check for existing lock from this pod, in case we got restarted
existing := &corev1.ConfigMap{}
key := crclient.ObjectKey{Namespace: ns, Name: lockName}
err = client.Get(ctx, key, existing)
err = config.Client.Get(ctx, key, existing)

switch {
case err == nil:
@@ -126,15 +143,15 @@ func Become(ctx context.Context, lockName string) error {
 	// try to create a lock
 	backoff := time.Second
 	for {
-		err := client.Create(ctx, cm)
+		err := config.Client.Create(ctx, cm)
 		switch {
 		case err == nil:
 			log.Info("Became the leader.")
 			return nil
 		case apierrors.IsAlreadyExists(err):
 			// refresh the lock so we use current leader
 			key := crclient.ObjectKey{Namespace: ns, Name: lockName}
-			if err := client.Get(ctx, key, existing); err != nil {
+			if err := config.Client.Get(ctx, key, existing); err != nil {
 				log.Info("Leader lock configmap not found.")
 				continue // configmap got lost ... just wait a bit
 			}
@@ -148,7 +165,7 @@
 			default:
 				leaderPod := &corev1.Pod{}
 				key = crclient.ObjectKey{Namespace: ns, Name: existingOwners[0].Name}
-				err = client.Get(ctx, key, leaderPod)
+				err = config.Client.Get(ctx, key, leaderPod)
 				switch {
 				case apierrors.IsNotFound(err):
 					log.Info("Leader pod has been deleted, waiting for garbage collection to remove the lock.")
@@ -158,7 +175,7 @@
 					log.Info("Operator pod with leader lock has been evicted.", "leader", leaderPod.Name)
 					log.Info("Deleting evicted leader.")
 					// Pod may not delete immediately, continue with backoff
-					err := client.Delete(ctx, leaderPod)
+					err := config.Client.Delete(ctx, leaderPod)
 					if err != nil {
 						log.Error(err, "Leader pod could not be deleted.")
 					}
@@ -210,13 +227,10 @@ func isPodEvicted(pod corev1.Pod) bool {
 
 // getOperatorNamespace returns the namespace the operator should be running in.
 func getOperatorNamespace() (string, error) {
-	if isRunModeLocal() {
-		return "", errRunLocal
-	}
-	nsBytes, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
+	nsBytes, err := readNamespace()
 	if err != nil {
 		if os.IsNotExist(err) {
-			return "", errNoNamespace
+			return "", ErrNoNamespace
 		}
 		return "", err
 	}
@@ -225,17 +239,10 @@
 	return ns, nil
 }
 
-func isRunModeLocal() bool {
-	return os.Getenv(forceRunModeEnv) == string(localRunMode)
-}
-
 // getPod returns a Pod object that corresponds to the pod in which the code
 // is currently running.
 // It expects the environment variable POD_NAME to be set by the downwards API.
 func getPod(ctx context.Context, client crclient.Client, ns string) (*corev1.Pod, error) {
-	if isRunModeLocal() {
-		return nil, errRunLocal
-	}
 	podName := os.Getenv(podNameEnvVar)
 	if podName == "" {
 		return nil, fmt.Errorf("required env %s not set, please configure downward API", podNameEnvVar)
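
The net effect of the diff above: Become keeps its zero-option behavior (building a client via config.GetConfig) while letting callers inject one. A caller sketch, assuming this module is imported as github.com/operator-framework/operator-lib (the lock name is illustrative):

```go
package main

import (
	"context"

	"github.com/operator-framework/operator-lib/leader"
)

func main() {
	ctx := context.Background()

	// Default: Become builds its own client from the in-cluster config.
	if err := leader.Become(ctx, "my-operator-lock"); err != nil {
		panic(err)
	}

	// Alternatively, inject a pre-built client (e.g. the manager's):
	//   err := leader.Become(ctx, "my-operator-lock", leader.WithClient(cl))
}
```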
leader/leader_suite_test.go (13 changes: 13 additions, 0 deletions)

@@ -0,0 +1,13 @@
+package leader
+
+import (
+	"testing"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+func TestLeader(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Leader Suite")
+}
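
The readNamespace seam and the WithClient option make Become testable without a cluster. A sketch of a spec that could sit alongside this suite; the fake-client usage and object names are assumptions, not part of this PR:

```go
package leader

import (
	"context"
	"os"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

var _ = Describe("Become", func() {
	It("creates the lock using an injected client", func() {
		// Stub the in-cluster namespace read and the downward-API pod name.
		readNamespace = func() ([]byte, error) { return []byte("testns"), nil }
		os.Setenv("POD_NAME", "leader-test")

		// Seed the fake client with the pod that will own the lock.
		pod := &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{Name: "leader-test", Namespace: "testns"},
		}
		cl := fake.NewFakeClient(pod)

		Expect(Become(context.TODO(), "test-lock", WithClient(cl))).To(Succeed())
	})
})
```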