-
Notifications
You must be signed in to change notification settings - Fork 31
Perform leaderelection in GenericPilot #186
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,8 @@ import ( | |
"context" | ||
"fmt" | ||
|
||
"github.com/golang/glog" | ||
|
||
"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" | ||
) | ||
|
||
|
@@ -12,10 +14,7 @@ const ( | |
) | ||
|
||
func (p *Pilot) syncFunc(pilot *v1alpha1.Pilot) error { | ||
// TODO: perform cluster wide actions if we are a leader | ||
if !p.genericPilot.IsThisPilot(pilot) { | ||
return nil | ||
} | ||
glog.V(4).Infof("ElasticsearchController: syncing current pilot %q", pilot.Name) | ||
if pilot.Status.Elasticsearch == nil { | ||
pilot.Status.Elasticsearch = &v1alpha1.ElasticsearchPilotStatus{} | ||
} | ||
|
@@ -39,3 +38,8 @@ func (p *Pilot) syncFunc(pilot *v1alpha1.Pilot) error { | |
} | ||
return nil | ||
} | ||
|
||
// leaderElectedSyncFunc is the leader elected sync handler for a Pilot resource. | ||
// As written it only logs the pilot's name at verbosity level 4 and returns | ||
// nil; no cluster-wide work is performed yet. | ||
func (p *Pilot) leaderElectedSyncFunc(pilot *v1alpha1.Pilot) error { | ||
glog.V(4).Infof("ElasticsearchController: leader elected sync of pilot %q", pilot.Name) | ||
return nil | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What operations might be performed here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So in the ES pilot case, excludeShards will be handled here. I've got some more work locally to actually do this. This needs to be leader elected as it requires updating the ES API with the new list of nodes that should be excluded. We could do it in each individual Pilot and just hope there are no races, but this way is far more concrete. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ import ( | |
clientset "github.com/jetstack/navigator/pkg/client/clientset/versioned" | ||
listersv1alpha1 "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1" | ||
"github.com/jetstack/navigator/pkg/pilot/genericpilot/controller" | ||
"github.com/jetstack/navigator/pkg/pilot/genericpilot/leaderelection" | ||
"github.com/jetstack/navigator/pkg/pilot/genericpilot/processmanager" | ||
) | ||
|
||
|
@@ -34,7 +35,8 @@ type GenericPilot struct { | |
shutdown bool | ||
// lock is used internally to coordinate updates to fields on the | ||
// GenericPilot structure | ||
lock sync.Mutex | ||
lock sync.Mutex | ||
elector leaderelection.Interface | ||
} | ||
|
||
func (g *GenericPilot) Run() error { | ||
|
@@ -62,6 +64,12 @@ func (g *GenericPilot) Run() error { | |
} else { | ||
glog.V(4).Infof("Control loop unexpectedly exited") | ||
} | ||
case err = <-g.runElector(ctrlStopCh): | ||
if err != nil { | ||
glog.V(4).Infof("Leader elector failed with error: %s", err) | ||
} else { | ||
glog.V(4).Infof("Leader elector unexpectedly exited") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Log these as errors? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So as soon as the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So we don't explicitly wait for each goroutine to exit right now - instead we rely on the call to We can change this to explicitly wait for the controller to finish 'draining' its workers, and perhaps we should. As we don't currently rely on that though I've left it out for now in an attempt to not further block up process exit. I can foresee us needing to change this like you say in future though, as the controller is responsible for reporting Pilot status etc. |
||
} | ||
} | ||
|
||
glog.V(4).Infof("Shutdown signal received") | ||
|
@@ -95,8 +103,20 @@ func (g *GenericPilot) runController(stopCh <-chan struct{}) <-chan error { | |
out := make(chan error, 1) | ||
go func() { | ||
defer close(out) | ||
res := g.controller.Run(stopCh) | ||
out <- res | ||
out <- g.controller.Run(stopCh) | ||
}() | ||
return out | ||
} | ||
|
||
// runElector starts g.elector.Run in a new goroutine and returns a channel | ||
// that receives Run's result and is closed afterwards. | ||
// NOTE(review): stopCh is accepted but never read in this function, so the | ||
// elector is not stopped through it. | ||
func (g *GenericPilot) runElector(stopCh <-chan struct{}) <-chan error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The stopCh isn't used. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
// Capacity 1 lets the goroutine deliver its result without waiting for a receiver. | ||
out := make(chan error, 1) | ||
go func() { | ||
defer close(out) | ||
out <- g.elector.Run() | ||
}() | ||
return out | ||
} | ||
|
||
// Elector returns the leader elector held in the GenericPilot's elector field. | ||
func (g *GenericPilot) Elector() leaderelection.Interface { | ||
return g.elector | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package leaderelection | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"time" | ||
|
||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/client-go/kubernetes" | ||
"k8s.io/client-go/tools/leaderelection" | ||
"k8s.io/client-go/tools/leaderelection/resourcelock" | ||
"k8s.io/client-go/tools/record" | ||
) | ||
|
||
const ( | ||
defaultLeaderElectionLeaseDuration = 15 * time.Second | ||
defaultLeaderElectionRenewDeadline = 10 * time.Second | ||
defaultLeaderElectionRetryPeriod = 2 * time.Second | ||
) | ||
|
||
// Interface is the leader election contract exposed by this package; Elector | ||
// is asserted to implement it. | ||
type Interface interface { | ||
// Run runs the leader election and returns any error encountered. | ||
Run() error | ||
// Leading reports whether the implementation currently considers itself the leader. | ||
Leading() bool | ||
} | ||
|
||
// Elector implements Interface using a ConfigMap resource lock. | ||
type Elector struct { | ||
// LockMeta is used as the ConfigMapMeta of the lock built in Run. | ||
LockMeta metav1.ObjectMeta | ||
// Client provides the CoreV1 client handed to the lock. | ||
Client kubernetes.Interface | ||
// Recorder is passed to the lock as its EventRecorder. | ||
Recorder record.EventRecorder | ||
|
||
// leading is set to true by startLeading and to false by stopLeading, and | ||
// is returned by Leading. | ||
// NOTE(review): this is a plain bool with no locking in this file; confirm | ||
// whether Leading can be called concurrently with the election callbacks. | ||
leading bool | ||
} | ||
|
||
var _ Interface = &Elector{} | ||
|
||
// Run builds a ConfigMap resource lock from LockMeta, Client and Recorder and | ||
// runs a client-go leader elector with it, using the package default lease | ||
// duration, renew deadline and retry period. | ||
// | ||
// It returns an error if the hostname cannot be read or if the leader elector | ||
// cannot be constructed. Otherwise it returns nil after leaderElector.Run | ||
// returns. | ||
func (e *Elector) Run() error { | ||
// The hostname is the base of the identity that distinguishes this instance from others. | ||
id, err := os.Hostname() | ||
if err != nil { | ||
return fmt.Errorf("error getting hostname: %s", err) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hostname is unique to the Pod running the pilot process, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not 100% sure on this one. Pilot name does make more sense, however if there is a Deployment managing our Pods, and the update strategy is set to rollingUpdate, there are instances where two pods can be running at once (and both would have the same pilot name). Typically hostname is used when using the leader election packages as it should be unique (hence why we use Hostname when performing this same leader election in navigator-controller) |
||
|
||
// ConfigMap-backed lock required for leader election; its identity is the | ||
// hostname with a fixed "-external-navigator-controller" suffix. | ||
rl := resourcelock.ConfigMapLock{ | ||
ConfigMapMeta: e.LockMeta, | ||
Client: e.Client.CoreV1(), | ||
LockConfig: resourcelock.ResourceLockConfig{ | ||
Identity: id + "-external-navigator-controller", | ||
EventRecorder: e.Recorder, | ||
}, | ||
} | ||
|
||
leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ | ||
Lock: &rl, | ||
LeaseDuration: defaultLeaderElectionLeaseDuration, | ||
RenewDeadline: defaultLeaderElectionRenewDeadline, | ||
RetryPeriod: defaultLeaderElectionRetryPeriod, | ||
Callbacks: leaderelection.LeaderCallbacks{ | ||
// The stop channel given to OnStartedLeading is ignored; the callbacks only flip the leading flag. | ||
OnStartedLeading: func(_ <-chan struct{}) { e.startLeading() }, | ||
OnStoppedLeading: e.stopLeading, | ||
}, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
// TODO: detect leader elector crashes | ||
// Until then, nil is returned unconditionally once leaderElector.Run returns. | ||
leaderElector.Run() | ||
return nil | ||
} | ||
|
||
// Leading returns the current value of the leading flag, which startLeading | ||
// sets to true and stopLeading sets to false. | ||
func (e *Elector) Leading() bool { | ||
return e.leading | ||
} | ||
|
||
// startLeading marks this Elector as the leader. Run invokes it from the | ||
// OnStartedLeading callback. | ||
func (e *Elector) startLeading() { | ||
e.leading = true | ||
} | ||
|
||
// stopLeading marks this Elector as no longer the leader. Run registers it as | ||
// the OnStoppedLeading callback. | ||
func (e *Elector) stopLeading() { | ||
e.leading = false | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: this config map name is not unique enough