Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Perform leaderelection in GenericPilot #186

Merged
merged 3 commits into from
Jan 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions contrib/charts/navigator/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ items:
verbs: ["*"]
- apiGroups: [""]
resources: ["services", "configmaps", "serviceaccounts", "pods"]
verbs: ["get", "list", "watch", "update", "create", "delete"]
verbs: ["get", "list", "watch", "update", "create", "delete", "patch"]
- apiGroups: ["apps"]
resources: ["statefulsets"]
verbs: ["get", "list", "watch", "update", "create", "delete"]
verbs: ["get", "list", "watch", "update", "create", "delete", "patch"]
- apiGroups: [""]
resources: ["endpoints"]
verbs: ["*"]
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/elasticsearch/nodepool/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func elasticsearchPodTemplateSpec(controllerName string, c *v1alpha1.Elasticsear
"--elasticsearch-master-url=$(CLUSTER_URL)",
"--elasticsearch-roles=$(ROLES)",
"--elasticsearch-plugins=$(PLUGINS)",
"--leader-election-config-map=$(LEADER_ELECTION_CONFIG_MAP)",
},
Env: []apiv1.EnvVar{
{
Expand All @@ -177,6 +178,11 @@ func elasticsearchPodTemplateSpec(controllerName string, c *v1alpha1.Elasticsear
Name: "PLUGINS",
Value: plugins,
},
{
Name: "LEADER_ELECTION_CONFIG_MAP",
// TODO: trim the length of this string
Value: fmt.Sprintf("elastic-%s-leaderelection", c.Name),
},
{
Name: "CLUSTER_URL",
Value: "http://" + util.ClientServiceName(c) + ":9200",
Expand Down
5 changes: 5 additions & 0 deletions pkg/controllers/elasticsearch/role/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ func roleForCluster(c *v1alpha1.ElasticsearchCluster) *rbacv1beta1.Role {
Verbs: []string{"create", "update", "patch"},
Resources: []string{"events"},
},
{
APIGroups: []string{""},
Verbs: []string{"create", "update", "patch", "get", "list", "watch"},
Resources: []string{"configmaps"},
},
{
APIGroups: []string{navigator.GroupName},
Verbs: []string{"get", "list", "watch"},
Expand Down
12 changes: 8 additions & 4 deletions pkg/pilot/elasticsearch/v5/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/golang/glog"

"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
)

Expand All @@ -12,10 +14,7 @@ const (
)

func (p *Pilot) syncFunc(pilot *v1alpha1.Pilot) error {
// TODO: perform cluster wide actions if we are a leader
if !p.genericPilot.IsThisPilot(pilot) {
return nil
}
glog.V(4).Infof("ElasticsearchController: syncing current pilot %q", pilot.Name)
if pilot.Status.Elasticsearch == nil {
pilot.Status.Elasticsearch = &v1alpha1.ElasticsearchPilotStatus{}
}
Expand All @@ -39,3 +38,8 @@ func (p *Pilot) syncFunc(pilot *v1alpha1.Pilot) error {
}
return nil
}

// leaderElectedSyncFunc is the Elasticsearch pilot's leader-only sync hook.
// It currently only logs the name of the pilot being synced at verbosity 4
// and always returns nil.
func (p *Pilot) leaderElectedSyncFunc(pilot *v1alpha1.Pilot) error {
glog.V(4).Infof("ElasticsearchController: leader elected sync of pilot %q", pilot.Name)
return nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What operations might be performed here?
#97 talks about excludeShards, but that seems like something that would be performed by the Pilot that had been selected to leave the cluster by the ES controller.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So in the ES pilot case, excludeShards will be handled here. I've got some more work locally to actually do this. This needs to be leader-elected because it requires updating the ES API with the new list of nodes that should be excluded. We could do it in each individual Pilot and just hope there are no races, but this way is far more reliable.

1 change: 1 addition & 0 deletions pkg/pilot/elasticsearch/v5/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (o *PilotOptions) Complete() error {
o.GenericPilotOptions.SharedInformerFactory = o.sharedInformerFactory
o.GenericPilotOptions.CmdFunc = o.pilot.CmdFunc
o.GenericPilotOptions.SyncFunc = o.pilot.syncFunc
o.GenericPilotOptions.LeaderElectedSyncFunc = o.pilot.leaderElectedSyncFunc
o.GenericPilotOptions.Hooks = o.pilot.Hooks()
o.GenericPilotOptions.ReadinessProbe = o.pilot.ReadinessCheck()
o.GenericPilotOptions.LivenessProbe = o.pilot.LivenessCheck()
Expand Down
34 changes: 23 additions & 11 deletions pkg/pilot/genericpilot/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,35 @@ const (
)

func (g *GenericPilot) syncPilot(pilot *v1alpha1.Pilot) (err error) {
// don't perform status updates for any Pilot other than our own
if !g.controller.IsThisPilot(pilot) {
return g.Options.SyncFunc(pilot)
}

// we defer this to the end of execution to ensure it is run even if a part
// of the sync errors
defer func() {
errs := []error{err}
errs = append(errs, g.updatePilotStatus(pilot))
// err will be nil if the contents of errs is all nil
err = utilerrors.NewAggregate(errs)
if g.IsThisPilot(pilot) {
errs := []error{err}
errs = append(errs, g.updatePilotStatus(pilot))
// err will be nil if the contents of errs is all nil
err = utilerrors.NewAggregate(errs)
}
}()

g.lock.Lock()
defer g.lock.Unlock()

// we defer this to ensure it is run regardless of whether the pilot has
// actually started, else progress across the rest of the application may
// be stalled
defer func() {
if g.Options.LeaderElectedSyncFunc != nil && g.elector.Leading() {
errs := []error{err}
errs = append(errs, g.Options.LeaderElectedSyncFunc(pilot))
err = utilerrors.NewAggregate(errs)
}
}()

if !g.IsThisPilot(pilot) {
return
}

err = g.Options.Hooks.Transition(v1alpha1.PilotPhasePreStart, pilot)
if err != nil {
g.recorder.Eventf(pilot, corev1.EventTypeWarning, ErrExecHook, MessageErrExecHook, err)
Expand Down Expand Up @@ -65,11 +77,11 @@ func (g *GenericPilot) syncPilot(pilot *v1alpha1.Pilot) (err error) {
return err
}

return g.Options.SyncFunc(pilot)
err = g.Options.SyncFunc(pilot)
return
}

func (g *GenericPilot) stop(pilot *v1alpha1.Pilot) error {
// set g.shutdown = true to signal preStop hooks to run
g.shutdown = true
glog.V(4).Infof("Waiting for process exit and hooks to execute")

Expand Down
37 changes: 29 additions & 8 deletions pkg/pilot/genericpilot/genericpilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
clientset "github.com/jetstack/navigator/pkg/client/clientset/versioned"
listersv1alpha1 "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/pilot/genericpilot/controller"
"github.com/jetstack/navigator/pkg/pilot/genericpilot/leaderelection"
"github.com/jetstack/navigator/pkg/pilot/genericpilot/processmanager"
)

Expand All @@ -34,7 +35,8 @@ type GenericPilot struct {
shutdown bool
// lock is used internally to coordinate updates to fields on the
// GenericPilot structure
lock sync.Mutex
lock sync.Mutex
elector leaderelection.Interface
}

func (g *GenericPilot) Run() error {
Expand All @@ -50,25 +52,32 @@ func (g *GenericPilot) Run() error {
// block until told to shutdown
select {
case <-g.Options.StopCh:
glog.Infof("Shutdown signal received")
case <-g.waitForProcess():
if err = g.process.Error(); err != nil {
glog.V(4).Infof("Underlying process failed with error: %s", err)
glog.Errorf("Underlying process failed with error: %s", err)
} else {
glog.V(4).Infof("Underlying process unexpectedly exited")
glog.Errorf("Underlying process unexpectedly exited")
}
case err = <-g.runController(ctrlStopCh):
if err != nil {
glog.V(4).Infof("Control loop failed with error: %s", err)
glog.Errorf("Control loop failed with error: %s", err)
} else {
glog.V(4).Infof("Control loop unexpectedly exited")
glog.Errorf("Control loop unexpectedly exited")
}
case err = <-g.runElector(ctrlStopCh):
if err != nil {
glog.Errorf("Leader elector failed with error: %s", err)
} else {
glog.Errorf("Leader elector unexpectedly exited")
}
}

glog.V(4).Infof("Shutdown signal received")
thisPilot, err := g.controller.ThisPilot()
if err != nil {
return err
}

return g.stop(thisPilot)
}

Expand All @@ -95,8 +104,20 @@ func (g *GenericPilot) runController(stopCh <-chan struct{}) <-chan error {
out := make(chan error, 1)
go func() {
defer close(out)
res := g.controller.Run(stopCh)
out <- res
out <- g.controller.Run(stopCh)
}()
return out
}

// runElector launches the leader elector on its own goroutine and hands back
// a channel carrying the value returned by g.elector.Run(). The channel has a
// buffer of one, so the goroutine can deliver its result even when nobody is
// receiving, and it is closed once the goroutine finishes.
//
// NOTE(review): stopCh is not referenced in this function; Interface.Run
// takes no stop channel, so nothing here stops the elector.
func (g *GenericPilot) runElector(stopCh <-chan struct{}) <-chan error {
	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)
		result := g.elector.Run()
		errCh <- result
	}()
	return errCh
}

// Elector returns the leader elector held by this GenericPilot.
func (g *GenericPilot) Elector() leaderelection.Interface {
return g.elector
}
81 changes: 81 additions & 0 deletions pkg/pilot/genericpilot/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package leaderelection

import (
"fmt"
"os"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
)

// Default timings that Elector.Run passes to client-go's
// leaderelection.LeaderElectionConfig.
const (
// Used as LeaderElectionConfig.LeaseDuration.
defaultLeaderElectionLeaseDuration = 15 * time.Second
// Used as LeaderElectionConfig.RenewDeadline.
defaultLeaderElectionRenewDeadline = 10 * time.Second
// Used as LeaderElectionConfig.RetryPeriod.
defaultLeaderElectionRetryPeriod = 2 * time.Second
)

// Interface is the leader election behaviour exposed by this package.
type Interface interface {
// Run starts leader election and returns any error encountered.
Run() error
// Leading reports whether this instance currently considers itself leader.
Leading() bool
}

// Elector implements Interface on top of client-go's leader election, using
// a ConfigMap as the resource lock.
type Elector struct {
// LockMeta is the metadata of the ConfigMap used as the lock.
LockMeta metav1.ObjectMeta
// Client supplies the CoreV1 client used by the lock.
Client kubernetes.Interface
// Recorder is handed to the lock as its EventRecorder.
Recorder record.EventRecorder

// leading is set by startLeading/stopLeading and read by Leading.
// NOTE(review): no synchronization is visible around this field; if the
// leader election callbacks run on a different goroutine than callers of
// Leading, this is a data race — confirm and guard if so.
leading bool
}

// Compile-time check that *Elector satisfies Interface.
var _ Interface = &Elector{}

func (e *Elector) Run() error {
// Identity used to distinguish between multiple controller manager instances
id, err := os.Hostname()
if err != nil {
return fmt.Errorf("error getting hostname: %s", err)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hostname is unique to the Pod running the pilot process, right?
What will it be? The Pod name?
In which case, it might be better to use the supplied --pilot-name, especially when we get round to writing tests for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure on this one. Pilot name does make more sense; however, if there is a Deployment managing our Pods and the update strategy is set to rollingUpdate, there are instances where two pods can be running at once (and both would have the same pilot name). Typically the hostname is used with the leader election packages because it should be unique (which is why we use the hostname when performing this same leader election in navigator-controller).


// Lock required for leader election
rl := resourcelock.ConfigMapLock{
ConfigMapMeta: e.LockMeta,
Client: e.Client.CoreV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id + "-external-navigator-controller",
EventRecorder: e.Recorder,
},
}

leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: &rl,
LeaseDuration: defaultLeaderElectionLeaseDuration,
RenewDeadline: defaultLeaderElectionRenewDeadline,
RetryPeriod: defaultLeaderElectionRetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ <-chan struct{}) { e.startLeading() },
OnStoppedLeading: e.stopLeading,
},
})
if err != nil {
return err
}
// TODO: detect leader elector crashes
leaderElector.Run()
return nil
}

// Leading returns the current value of the leading flag, which is toggled by
// startLeading and stopLeading.
func (e *Elector) Leading() bool {
return e.leading
}

// startLeading marks this Elector as leader. Run wires it into the
// OnStartedLeading callback.
func (e *Elector) startLeading() {
e.leading = true
}

// stopLeading marks this Elector as no longer leader. Run registers it as the
// OnStoppedLeading callback.
func (e *Elector) stopLeading() {
e.leading = false
}
17 changes: 15 additions & 2 deletions pkg/pilot/genericpilot/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/golang/glog"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
kubescheme "k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -20,6 +21,7 @@ import (
informers "github.com/jetstack/navigator/pkg/client/informers/externalversions"
"github.com/jetstack/navigator/pkg/pilot/genericpilot/controller"
"github.com/jetstack/navigator/pkg/pilot/genericpilot/hook"
"github.com/jetstack/navigator/pkg/pilot/genericpilot/leaderelection"
"github.com/jetstack/navigator/pkg/pilot/genericpilot/probe"
"github.com/jetstack/navigator/pkg/pilot/genericpilot/processmanager"
"github.com/jetstack/navigator/pkg/pilot/genericpilot/signals"
Expand All @@ -41,7 +43,8 @@ type Options struct {
// PilotName is the name of this Pilot
PilotName string
// PilotNamespace is the namespace the corresponding Pilot exists within
PilotNamespace string
PilotNamespace string
LeaderElectionConfigMap string

// CmdFunc returns an *exec.Cmd for a given Pilot resource for the pilot
CmdFunc func(*v1alpha1.Pilot) (*exec.Cmd, error)
Expand All @@ -59,7 +62,8 @@ type Options struct {
ReadinessProbe probe.Check
LivenessProbe probe.Check

SyncFunc func(*v1alpha1.Pilot) error
SyncFunc func(*v1alpha1.Pilot) error
LeaderElectedSyncFunc func(*v1alpha1.Pilot) error
}

func NewDefaultOptions() *Options {
Expand Down Expand Up @@ -151,6 +155,14 @@ func (o *Options) Pilot() (*GenericPilot, error) {
client: o.NavigatorClient,
pilotLister: pilotInformer.Lister(),
recorder: recorder,
elector: &leaderelection.Elector{
LockMeta: metav1.ObjectMeta{
Name: o.LeaderElectionConfigMap,
Namespace: o.PilotNamespace,
},
Client: o.KubernetesClient,
Recorder: recorder,
},
}
genericPilot.controller = controller.NewController(controller.Options{
PilotName: o.PilotName,
Expand All @@ -167,4 +179,5 @@ func (o *Options) Pilot() (*GenericPilot, error) {
// AddFlags registers the --pilot-name, --pilot-namespace and
// --leader-election-config-map string flags on flags, binding each to the
// corresponding Options field. All three default to the empty string.
func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.PilotName, "pilot-name", "", "The name of this Pilot. If not specified, an auto-detected name will be used.")
flags.StringVar(&o.PilotNamespace, "pilot-namespace", "", "The namespace the corresponding Pilot resource for this Pilot exists within.")
flags.StringVar(&o.LeaderElectionConfigMap, "leader-election-config-map", "", "The name of the ConfigMap to use for leader election")
}