diff --git a/contrib/charts/navigator/templates/rbac.yaml b/contrib/charts/navigator/templates/rbac.yaml index 1c65f2cf8..42960ff50 100644 --- a/contrib/charts/navigator/templates/rbac.yaml +++ b/contrib/charts/navigator/templates/rbac.yaml @@ -91,10 +91,10 @@ items: verbs: ["*"] - apiGroups: [""] resources: ["services", "configmaps", "serviceaccounts", "pods"] - verbs: ["get", "list", "watch", "update", "create", "delete"] + verbs: ["get", "list", "watch", "update", "create", "delete", "patch"] - apiGroups: ["apps"] resources: ["statefulsets"] - verbs: ["get", "list", "watch", "update", "create", "delete"] + verbs: ["get", "list", "watch", "update", "create", "delete", "patch"] - apiGroups: [""] resources: ["endpoints"] verbs: ["*"] diff --git a/pkg/controllers/elasticsearch/nodepool/resources.go b/pkg/controllers/elasticsearch/nodepool/resources.go index 7e9178ea0..c0bdfb8a6 100644 --- a/pkg/controllers/elasticsearch/nodepool/resources.go +++ b/pkg/controllers/elasticsearch/nodepool/resources.go @@ -163,6 +163,7 @@ func elasticsearchPodTemplateSpec(controllerName string, c *v1alpha1.Elasticsear "--elasticsearch-master-url=$(CLUSTER_URL)", "--elasticsearch-roles=$(ROLES)", "--elasticsearch-plugins=$(PLUGINS)", + "--leader-election-config-map=$(LEADER_ELECTION_CONFIG_MAP)", }, Env: []apiv1.EnvVar{ { @@ -177,6 +178,11 @@ func elasticsearchPodTemplateSpec(controllerName string, c *v1alpha1.Elasticsear Name: "PLUGINS", Value: plugins, }, + { + Name: "LEADER_ELECTION_CONFIG_MAP", + // TODO: trim the length of this string + Value: fmt.Sprintf("elastic-%s-leaderelection", c.Name), + }, { Name: "CLUSTER_URL", Value: "http://" + util.ClientServiceName(c) + ":9200", diff --git a/pkg/controllers/elasticsearch/role/resources.go b/pkg/controllers/elasticsearch/role/resources.go index 40a56fc8b..acefebb9d 100644 --- a/pkg/controllers/elasticsearch/role/resources.go +++ b/pkg/controllers/elasticsearch/role/resources.go @@ -23,6 +23,11 @@ func roleForCluster(c *v1alpha1.ElasticsearchCluster) *rbacv1beta1.Role { Verbs: []string{"create", "update", "patch"}, Resources: []string{"events"}, }, + { + APIGroups: []string{""}, + Verbs: []string{"create", "update", "patch", "get", "list", "watch"}, + Resources: []string{"configmaps"}, + }, { APIGroups: []string{navigator.GroupName}, Verbs: []string{"get", "list", "watch"}, diff --git a/pkg/pilot/elasticsearch/v5/controller.go b/pkg/pilot/elasticsearch/v5/controller.go index 73c6fd1a7..b0ea8e837 100644 --- a/pkg/pilot/elasticsearch/v5/controller.go +++ b/pkg/pilot/elasticsearch/v5/controller.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/golang/glog" + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" ) @@ -12,10 +14,7 @@ const ( ) func (p *Pilot) syncFunc(pilot *v1alpha1.Pilot) error { - // TODO: perform cluster wide actions if we are a leader - if !p.genericPilot.IsThisPilot(pilot) { - return nil - } + glog.V(4).Infof("ElasticsearchController: syncing current pilot %q", pilot.Name) if pilot.Status.Elasticsearch == nil { pilot.Status.Elasticsearch = &v1alpha1.ElasticsearchPilotStatus{} } @@ -39,3 +38,8 @@ func (p *Pilot) syncFunc(pilot *v1alpha1.Pilot) error { } return nil } + +func (p *Pilot) leaderElectedSyncFunc(pilot *v1alpha1.Pilot) error { + glog.V(4).Infof("ElasticsearchController: leader elected sync of pilot %q", pilot.Name) + return nil +} diff --git a/pkg/pilot/elasticsearch/v5/options.go b/pkg/pilot/elasticsearch/v5/options.go index a3c74707f..002a9238f 100644 --- a/pkg/pilot/elasticsearch/v5/options.go +++ b/pkg/pilot/elasticsearch/v5/options.go @@ -118,6 +118,7 @@ func (o *PilotOptions) Complete() error { o.GenericPilotOptions.SharedInformerFactory = o.sharedInformerFactory o.GenericPilotOptions.CmdFunc = o.pilot.CmdFunc o.GenericPilotOptions.SyncFunc = o.pilot.syncFunc + o.GenericPilotOptions.LeaderElectedSyncFunc = o.pilot.leaderElectedSyncFunc o.GenericPilotOptions.Hooks = o.pilot.Hooks() o.GenericPilotOptions.ReadinessProbe = o.pilot.ReadinessCheck() o.GenericPilotOptions.LivenessProbe = o.pilot.LivenessCheck() diff --git a/pkg/pilot/genericpilot/controller.go b/pkg/pilot/genericpilot/controller.go index a5dc0b8d1..50c2bad55 100644 --- a/pkg/pilot/genericpilot/controller.go +++ b/pkg/pilot/genericpilot/controller.go @@ -20,23 +20,35 @@ const ( ) func (g *GenericPilot) syncPilot(pilot *v1alpha1.Pilot) (err error) { - // don't perform status updates for any Pilot other than our own - if !g.controller.IsThisPilot(pilot) { - return g.Options.SyncFunc(pilot) - } - // we defer this to the end of execution to ensure it is run even if a part // of the sync errors defer func() { - errs := []error{err} - errs = append(errs, g.updatePilotStatus(pilot)) - // err will be nil if the contents of errs is all nil - err = utilerrors.NewAggregate(errs) + if g.IsThisPilot(pilot) { + errs := []error{err} + errs = append(errs, g.updatePilotStatus(pilot)) + // err will be nil if the contents of errs is all nil + err = utilerrors.NewAggregate(errs) + } }() g.lock.Lock() defer g.lock.Unlock() + // we defer this to ensure it is run regardless of whether the pilot has + // actually started, else progress across the rest of the application may + // be stalled + defer func() { + if g.Options.LeaderElectedSyncFunc != nil && g.elector.Leading() { + errs := []error{err} + errs = append(errs, g.Options.LeaderElectedSyncFunc(pilot)) + err = utilerrors.NewAggregate(errs) + } + }() + + if !g.IsThisPilot(pilot) { + return + } + err = g.Options.Hooks.Transition(v1alpha1.PilotPhasePreStart, pilot) if err != nil { g.recorder.Eventf(pilot, corev1.EventTypeWarning, ErrExecHook, MessageErrExecHook, err) @@ -65,11 +77,11 @@ func (g *GenericPilot) syncPilot(pilot *v1alpha1.Pilot) (err error) { return err } - return g.Options.SyncFunc(pilot) + err = g.Options.SyncFunc(pilot) + return } func (g *GenericPilot) stop(pilot *v1alpha1.Pilot) error { - // set g.shutdown = true to signal preStop hooks to run g.shutdown = true glog.V(4).Infof("Waiting for process exit and hooks to execute") diff --git a/pkg/pilot/genericpilot/genericpilot.go b/pkg/pilot/genericpilot/genericpilot.go index 6494738e9..61662944b 100644 --- a/pkg/pilot/genericpilot/genericpilot.go +++ b/pkg/pilot/genericpilot/genericpilot.go @@ -11,6 +11,7 @@ import ( clientset "github.com/jetstack/navigator/pkg/client/clientset/versioned" listersv1alpha1 "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/pilot/genericpilot/controller" + "github.com/jetstack/navigator/pkg/pilot/genericpilot/leaderelection" "github.com/jetstack/navigator/pkg/pilot/genericpilot/processmanager" ) @@ -34,7 +35,8 @@ type GenericPilot struct { shutdown bool // lock is used internally to coordinate updates to fields on the // GenericPilot structure - lock sync.Mutex + lock sync.Mutex + elector leaderelection.Interface } func (g *GenericPilot) Run() error { @@ -50,25 +52,32 @@ func (g *GenericPilot) Run() error { // block until told to shutdown select { case <-g.Options.StopCh: + glog.Infof("Shutdown signal received") case <-g.waitForProcess(): if err = g.process.Error(); err != nil { - glog.V(4).Infof("Underlying process failed with error: %s", err) + glog.Errorf("Underlying process failed with error: %s", err) } else { - glog.V(4).Infof("Underlying process unexpectedly exited") + glog.Errorf("Underlying process unexpectedly exited") } case err = <-g.runController(ctrlStopCh): if err != nil { - glog.V(4).Infof("Control loop failed with error: %s", err) + glog.Errorf("Control loop failed with error: %s", err) } else { - glog.V(4).Infof("Control loop unexpectedly exited") + glog.Errorf("Control loop unexpectedly exited") + } + case err = <-g.runElector(ctrlStopCh): + if err != nil { + glog.Errorf("Leader elector failed with error: %s", err) + } else { + glog.Errorf("Leader elector unexpectedly exited") } } - glog.V(4).Infof("Shutdown signal received") thisPilot, err := g.controller.ThisPilot() if err != nil { return err } + return g.stop(thisPilot) } @@ -95,8 +104,20 @@ func (g *GenericPilot) runController(stopCh <-chan struct{}) <-chan error { out := make(chan error, 1) go func() { defer close(out) - res := g.controller.Run(stopCh) - out <- res + out <- g.controller.Run(stopCh) }() return out } + +func (g *GenericPilot) runElector(stopCh <-chan struct{}) <-chan error { + out := make(chan error, 1) + go func() { + defer close(out) + out <- g.elector.Run() + }() + return out +} + +func (g *GenericPilot) Elector() leaderelection.Interface { + return g.elector +} diff --git a/pkg/pilot/genericpilot/leaderelection/leaderelection.go b/pkg/pilot/genericpilot/leaderelection/leaderelection.go new file mode 100644 index 000000000..d04d2f398 --- /dev/null +++ b/pkg/pilot/genericpilot/leaderelection/leaderelection.go @@ -0,0 +1,81 @@ +package leaderelection + +import ( + "fmt" + "os" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" +) + +const ( + defaultLeaderElectionLeaseDuration = 15 * time.Second + defaultLeaderElectionRenewDeadline = 10 * time.Second + defaultLeaderElectionRetryPeriod = 2 * time.Second +) + +type Interface interface { + Run() error + Leading() bool +} + +type Elector struct { + LockMeta metav1.ObjectMeta + Client kubernetes.Interface + Recorder record.EventRecorder + + leading bool +} + +var _ Interface = &Elector{} + +func (e *Elector) Run() error { + // Identity used to distinguish between multiple controller manager instances + id, err := os.Hostname() + if err != nil { + return fmt.Errorf("error getting hostname: %s", err) + } + + // Lock required for leader election + rl := resourcelock.ConfigMapLock{ + ConfigMapMeta: e.LockMeta, + Client: e.Client.CoreV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: id + "-external-navigator-controller", + EventRecorder: e.Recorder, + }, + } + + leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: &rl, + LeaseDuration: defaultLeaderElectionLeaseDuration, + RenewDeadline: defaultLeaderElectionRenewDeadline, + RetryPeriod: defaultLeaderElectionRetryPeriod, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(_ <-chan struct{}) { e.startLeading() }, + OnStoppedLeading: e.stopLeading, + }, + }) + if err != nil { + return err + } + // TODO: detect leader elector crashes + leaderElector.Run() + return nil +} + +func (e *Elector) Leading() bool { + return e.leading +} + +func (e *Elector) startLeading() { + e.leading = true +} + +func (e *Elector) stopLeading() { + e.leading = false +} diff --git a/pkg/pilot/genericpilot/options.go b/pkg/pilot/genericpilot/options.go index 94e98f97c..17e61edde 100644 --- a/pkg/pilot/genericpilot/options.go +++ b/pkg/pilot/genericpilot/options.go @@ -9,6 +9,7 @@ import ( "github.com/golang/glog" "github.com/spf13/pflag" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" kubescheme "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -20,6 +21,7 @@ import ( informers "github.com/jetstack/navigator/pkg/client/informers/externalversions" "github.com/jetstack/navigator/pkg/pilot/genericpilot/controller" "github.com/jetstack/navigator/pkg/pilot/genericpilot/hook" + "github.com/jetstack/navigator/pkg/pilot/genericpilot/leaderelection" "github.com/jetstack/navigator/pkg/pilot/genericpilot/probe" "github.com/jetstack/navigator/pkg/pilot/genericpilot/processmanager" "github.com/jetstack/navigator/pkg/pilot/genericpilot/signals" @@ -41,7 +43,8 @@ type Options struct { // PilotName is the name of this Pilot PilotName string // PilotNamespace is the namespace the corresponding Pilot exists within - PilotNamespace string + PilotNamespace string + LeaderElectionConfigMap string // CmdFunc returns an *exec.Cmd for a given Pilot resource for the pilot CmdFunc func(*v1alpha1.Pilot) (*exec.Cmd, error) @@ -59,7 +62,8 @@ type Options struct { ReadinessProbe probe.Check LivenessProbe probe.Check - SyncFunc func(*v1alpha1.Pilot) error + SyncFunc func(*v1alpha1.Pilot) error + LeaderElectedSyncFunc func(*v1alpha1.Pilot) error } func NewDefaultOptions() *Options { @@ -151,6 +155,14 @@ func (o *Options) Pilot() (*GenericPilot, error) { client: o.NavigatorClient, pilotLister: pilotInformer.Lister(), recorder: recorder, + elector: &leaderelection.Elector{ + LockMeta: metav1.ObjectMeta{ + Name: o.LeaderElectionConfigMap, + Namespace: o.PilotNamespace, + }, + Client: o.KubernetesClient, + Recorder: recorder, + }, } genericPilot.controller = controller.NewController(controller.Options{ PilotName: o.PilotName, @@ -167,4 +179,5 @@ func (o *Options) Pilot() (*GenericPilot, error) { func (o *Options) AddFlags(flags *pflag.FlagSet) { flags.StringVar(&o.PilotName, "pilot-name", "", "The name of this Pilot. If not specified, an auto-detected name will be used.") flags.StringVar(&o.PilotNamespace, "pilot-namespace", "", "The namespace the corresponding Pilot resource for this Pilot exists within.") + flags.StringVar(&o.LeaderElectionConfigMap, "leader-election-config-map", "", "The name of the ConfigMap to use for leader election") }