diff --git a/leaderelection/leader_election.go b/leaderelection/leader_election.go index 59779f1f8..d23c22bf2 100644 --- a/leaderelection/leader_election.go +++ b/leaderelection/leader_election.go @@ -19,8 +19,10 @@ package leaderelection import ( "context" "fmt" + "io/ioutil" "os" "regexp" + "strings" "time" "k8s.io/api/core/v1" @@ -62,16 +64,15 @@ type leaderElection struct { } // NewLeaderElection returns the default & preferred leader election type -func NewLeaderElection(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection { - return NewLeaderElectionWithLeases(clientset, lockName, lockNamespace, runFunc) +func NewLeaderElection(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection { + return NewLeaderElectionWithLeases(clientset, lockName, runFunc) } // NewLeaderElectionWithLeases returns an implementation of leader election using Leases -func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection { +func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection { return &leaderElection{ runFunc: runFunc, lockName: lockName, - namespace: lockNamespace, resourceLock: resourcelock.LeasesResourceLock, leaseDuration: defaultLeaseDuration, renewDeadline: defaultRenewDeadline, @@ -81,11 +82,10 @@ func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName, lockN } // NewLeaderElectionWithEndpoints returns an implementation of leader election using Endpoints -func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection { +func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection { return &leaderElection{ runFunc: runFunc, lockName: lockName, - namespace: lockNamespace, resourceLock: resourcelock.EndpointsResourceLock, leaseDuration: defaultLeaseDuration, renewDeadline: defaultRenewDeadline, @@ -95,11 +95,10 @@ func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName, lo } // NewLeaderElectionWithConfigMaps returns an implementation of leader election using ConfigMaps -func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection { +func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection { return &leaderElection{ runFunc: runFunc, lockName: lockName, - namespace: lockNamespace, resourceLock: resourcelock.ConfigMapsResourceLock, leaseDuration: defaultLeaseDuration, renewDeadline: defaultRenewDeadline, @@ -108,20 +107,29 @@ func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName, l } } -func (l *leaderElection) WithIdentity(identity string) { +func (l *leaderElection) WithIdentity(identity string) *leaderElection { l.identity = identity + return l } -func (l *leaderElection) WithLeaseDuration(leaseDuration time.Duration) { +func (l *leaderElection) WithNamespace(namespace string) *leaderElection { + l.namespace = namespace + return l +} + +func (l *leaderElection) WithLeaseDuration(leaseDuration time.Duration) *leaderElection { l.leaseDuration = leaseDuration + return l } -func (l *leaderElection) WithRenewDeadline(renewDeadline time.Duration) { +func (l *leaderElection) WithRenewDeadline(renewDeadline time.Duration) *leaderElection { l.renewDeadline = renewDeadline + return l } -func (l *leaderElection) WithRetryPeriod(retryPeriod time.Duration) { +func (l *leaderElection) WithRetryPeriod(retryPeriod time.Duration) *leaderElection { l.retryPeriod = retryPeriod + return l } func (l *leaderElection) Run() error { @@ -134,6 +142,10 @@ func (l *leaderElection) Run() error { l.identity = id } + if l.namespace == "" { + l.namespace = inClusterNamespace() + } + broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: l.clientset.CoreV1().Events(l.namespace)}) eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s/%s", l.lockName, string(l.identity))}) @@ -185,3 +197,20 @@ func sanitizeName(name string) string { } return name } + +// inClusterNamespace returns the namespace in which the pod is running in by checking +// the env var POD_NAMESPACE, then the file /var/run/secrets/kubernetes.io/serviceaccount/namespace. +// if neither returns a valid namespace, the "default" namespace is returned +func inClusterNamespace() string { + if ns := os.Getenv("POD_NAMESPACE"); ns != "" { + return ns + } + + if data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { + if ns := strings.TrimSpace(string(data)); len(ns) > 0 { + return ns + } + } + + return "default" +}