Merged
53 changes: 41 additions & 12 deletions leaderelection/leader_election.go
@@ -19,8 +19,10 @@ package leaderelection
 import (
 	"context"
 	"fmt"
+	"io/ioutil"
 	"os"
 	"regexp"
+	"strings"
 	"time"
 
 	"k8s.io/api/core/v1"
@@ -62,16 +64,15 @@ type leaderElection struct {
 }
 
 // NewLeaderElection returns the default & preferred leader election type
-func NewLeaderElection(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
-	return NewLeaderElectionWithLeases(clientset, lockName, lockNamespace, runFunc)
+func NewLeaderElection(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection {
+	return NewLeaderElectionWithLeases(clientset, lockName, runFunc)
 }
 
 // NewLeaderElectionWithLeases returns an implementation of leader election using Leases
-func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
+func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection {
 	return &leaderElection{
 		runFunc: runFunc,
 		lockName: lockName,
-		namespace: lockNamespace,
 		resourceLock: resourcelock.LeasesResourceLock,
 		leaseDuration: defaultLeaseDuration,
 		renewDeadline: defaultRenewDeadline,
@@ -81,11 +82,10 @@ func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName, lockN
 }
 
 // NewLeaderElectionWithEndpoints returns an implementation of leader election using Endpoints
-func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
+func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection {
 	return &leaderElection{
 		runFunc: runFunc,
 		lockName: lockName,
-		namespace: lockNamespace,
 		resourceLock: resourcelock.EndpointsResourceLock,
 		leaseDuration: defaultLeaseDuration,
 		renewDeadline: defaultRenewDeadline,
@@ -95,11 +95,10 @@ func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName, lo
 }
 
 // NewLeaderElectionWithConfigMaps returns an implementation of leader election using ConfigMaps
-func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
+func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection {
 	return &leaderElection{
 		runFunc: runFunc,
 		lockName: lockName,
-		namespace: lockNamespace,
 		resourceLock: resourcelock.ConfigMapsResourceLock,
 		leaseDuration: defaultLeaseDuration,
 		renewDeadline: defaultRenewDeadline,
@@ -108,20 +107,29 @@ func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName, l
 	}
 }
 
-func (l *leaderElection) WithIdentity(identity string) {
+func (l *leaderElection) WithIdentity(identity string) *leaderElection {
 	l.identity = identity
+	return l
 }
 
-func (l *leaderElection) WithLeaseDuration(leaseDuration time.Duration) {
+func (l *leaderElection) WithNamespace(namespace string) *leaderElection {
+	l.namespace = namespace
+	return l
+}
+
+func (l *leaderElection) WithLeaseDuration(leaseDuration time.Duration) *leaderElection {
 	l.leaseDuration = leaseDuration
+	return l
 }
 
-func (l *leaderElection) WithRenewDeadline(renewDeadline time.Duration) {
+func (l *leaderElection) WithRenewDeadline(renewDeadline time.Duration) *leaderElection {
 	l.renewDeadline = renewDeadline
+	return l
 }
 
-func (l *leaderElection) WithRetryPeriod(retryPeriod time.Duration) {
+func (l *leaderElection) WithRetryPeriod(retryPeriod time.Duration) *leaderElection {
 	l.retryPeriod = retryPeriod
+	return l
 }
 
 func (l *leaderElection) Run() error {
@@ -134,6 +142,10 @@ func (l *leaderElection) Run() error {
 		l.identity = id
 	}
 
+	if l.namespace == "" {
+		l.namespace = inClusterNamespace()
+	}
+
 	broadcaster := record.NewBroadcaster()
 	broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: l.clientset.CoreV1().Events(l.namespace)})
 	eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s/%s", l.lockName, string(l.identity))})
@@ -185,3 +197,20 @@ func sanitizeName(name string) string {
 	}
 	return name
 }
+
+// inClusterNamespace returns the namespace in which the pod is running by checking
+// the env var POD_NAMESPACE, then the file /var/run/secrets/kubernetes.io/serviceaccount/namespace.
+// If neither yields a valid namespace, the "default" namespace is returned.
+func inClusterNamespace() string {
Contributor Author:
I prefer not to fall back to default here and just return an error. I left the fallback to default for now since this is used in external-provisioner. Do we know of any situations where a sidecar wouldn't run in the cluster? If not I think the fallback to default should be removed and instead exit with error

Collaborator:
Can we say that for Lease mechanism (which should be all new), require a namespace field to be set if not found, but for the other two, we allow the default for backwards compatibility?

Contributor Author:
We could, though the implementation will be a bit messy I think. I added the WithNamespace method for the cases where we need to account for backwards compatibility. I guess this boils down to two questions:

  1. how reliable is it for the namespace file to exist (I assume this is not going to change anytime in the near future)
  2. would any user run this "out of cluster"

Collaborator:
  1. I don't expect this to change. It would probably be considered an API break
  2. It's possible someone may want to run this out of cluster, but we can wait for a feature request to add the option back.

Contributor Author (@andrewsykim, Apr 1, 2019):
I'm okay with the current PR merging as is, but having the --leader-election-namespace flag as optional in all the sidecars. Setting --leader-election-namespace calls the namespace override using WithNamespace. Otherwise, default to cluster namespace. Thoughts?

Collaborator:
I think it's fine to maintain backwards compatibility. If the default doesn't work, then the user needs to explicitly specify something.

+	if ns := os.Getenv("POD_NAMESPACE"); ns != "" {
+		return ns
+	}
+
+	if data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
+		if ns := strings.TrimSpace(string(data)); len(ns) > 0 {
+			return ns
+		}
+	}
+
+	return "default"
+}
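
For reference, a minimal sketch of how a sidecar might wire the new chainable API together with the optional --leader-election-namespace flag discussed in the thread above. The flag name, import path, and lock name are assumptions for illustration, not part of this PR; when the flag is left empty, the library falls back to inClusterNamespace().

package main

import (
	"context"
	"flag"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	"github.com/kubernetes-csi/csi-lib-utils/leaderelection" // assumed import path
)

func main() {
	// Hypothetical flag: overrides the lock namespace; empty means "detect the in-cluster namespace".
	leaderElectionNamespace := flag.String("leader-election-namespace", "", "namespace of the leader election lock")
	flag.Parse()

	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// run is only executed while this replica holds the lease.
	run := func(ctx context.Context) {
		<-ctx.Done()
	}

	le := leaderelection.NewLeaderElection(clientset, "example-sidecar-leader", run) // lock name is illustrative
	if *leaderElectionNamespace != "" {
		le.WithNamespace(*leaderElectionNamespace) // explicit override; otherwise POD_NAMESPACE, then the service account file, then "default"
	}
	if err := le.Run(); err != nil {
		panic(err)
	}
}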