cmd/contour: simplify leadership election setup #2348

Merged · 1 commit · Mar 18, 2020
63 changes: 56 additions & 7 deletions cmd/contour/leadership.go
@@ -18,24 +18,73 @@ import (
 	"os"

 	"github.com/google/uuid"
+	"github.com/projectcontour/contour/internal/k8s"
+	"github.com/projectcontour/contour/internal/workgroup"
 	"github.com/sirupsen/logrus"
-	"k8s.io/client-go/kubernetes"
-	coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
 	"k8s.io/client-go/tools/leaderelection"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
 )

+// setupLeadershipElection registers leadership workers with the group and returns
+// a channel which will become ready when this process becomes the leader, or, in the
+// event that leadership election is disabled, the channel will be ready immediately.
+func setupLeadershipElection(g *workgroup.Group, log logrus.FieldLogger, ctx *serveContext, clients *k8s.Clients, updateNow func()) chan struct{} {
+	if ctx.DisableLeaderElection {

	[Inline review comment, Contributor]: Suggest you pull this into a separate function becomeElectedLeader(), then check ctx.DisableLeaderElection from the caller.

log.Info("Leader election disabled")

leader := make(chan struct{})
close(leader)
return leader
}

le, leader, deposed := newLeaderElector(log, ctx, clients)

g.AddContext(func(electionCtx context.Context) {
log.WithFields(logrus.Fields{
"configmapname": ctx.LeaderElectionConfig.Name,
"configmapnamespace": ctx.LeaderElectionConfig.Namespace,
}).Info("started leader election")

le.Run(electionCtx)
log.Info("stopped leader election")
})

g.Add(func(stop <-chan struct{}) error {
log := log.WithField("context", "leaderelection")
for {
select {
case <-stop:
// shut down
log.Info("stopped leader election")
return nil
case <-leader:
log.Info("elected as leader, triggering rebuild")
updateNow()

// disable this case
leader = nil
case <-deposed:
// If we get deposed as leader, shut it down.
log.Info("deposed as leader, shutting down")
return nil
}
}
})

return leader
}
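
One way to read the inline review suggestion above, as a sketch: the DisableLeaderElection check stays in the caller-facing wrapper, and a separate becomeElectedLeader() wires up a real election. The function name comes from the review comment; this is not the code that was merged, it reuses this file's newLeaderElector helper, and the merged version's logging is elided for brevity.

func setupLeadershipElection(g *workgroup.Group, log logrus.FieldLogger, ctx *serveContext, clients *k8s.Clients, updateNow func()) chan struct{} {
	if ctx.DisableLeaderElection {
		log.Info("Leader election disabled")

		// Hand back an already-closed channel so callers unblock immediately.
		leader := make(chan struct{})
		close(leader)
		return leader
	}
	return becomeElectedLeader(g, log, ctx, clients, updateNow)
}

// becomeElectedLeader registers the election loop and the watcher with the
// group, as the merged function does for the enabled case.
func becomeElectedLeader(g *workgroup.Group, log logrus.FieldLogger, ctx *serveContext, clients *k8s.Clients, updateNow func()) chan struct{} {
	le, leader, deposed := newLeaderElector(log, ctx, clients)

	// le.Run already has the func(context.Context) shape AddContext expects.
	g.AddContext(le.Run)

	g.Add(func(stop <-chan struct{}) error {
		for {
			select {
			case <-stop:
				return nil
			case <-leader:
				updateNow()
				leader = nil // disable this case after the first election
			case <-deposed:
				return nil
			}
		}
	})

	return leader
}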

 // newLeaderElector creates a new leaderelection.LeaderElector and associated
 // channels by which to observe elections and depositions.
-func newLeaderElector(log logrus.FieldLogger, ctx *serveContext, client *kubernetes.Clientset, coordinationClient *coordinationv1.CoordinationV1Client) (*leaderelection.LeaderElector, chan struct{}, chan struct{}) {
+func newLeaderElector(log logrus.FieldLogger, ctx *serveContext, clients *k8s.Clients) (*leaderelection.LeaderElector, chan struct{}, chan struct{}) {

 	// leaderOK will block gRPC startup until it's closed.
 	leaderOK := make(chan struct{})
 	// deposed is closed by the leader election callback when
 	// we are deposed as leader so that we can clean up.
 	deposed := make(chan struct{})

-	rl := newResourceLock(ctx, client, coordinationClient)
+	rl := newResourceLock(ctx, clients)

 	// Make the leader elector, ready to be used in the Workgroup.
 	le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
@@ -64,7 +113,7 @@ func newLeaderElector(log logrus.FieldLogger, ctx *serveContext, client *kuberne
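
The hunk above collapses the body of the LeaderElectionConfig. Going only by the comments in this file (leaderOK is closed once elected, deposed is closed when leadership is lost), the hidden callbacks plausibly take the following shape. This is a hypothetical reconstruction: the durations and the error handling are illustrative placeholders, not the committed settings.

// Hypothetical reconstruction of the collapsed region; Contour's real
// values come from ctx.LeaderElectionConfig.
le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
	Lock:          rl,
	LeaseDuration: 15 * time.Second, // placeholder
	RenewDeadline: 10 * time.Second, // placeholder
	RetryPeriod:   2 * time.Second,  // placeholder
	Callbacks: leaderelection.LeaderCallbacks{
		OnStartedLeading: func(_ context.Context) {
			close(leaderOK) // unblocks everything gated on leadership
		},
		OnStoppedLeading: func() {
			close(deposed) // signals the watcher goroutine to shut down
		},
	},
})
if err != nil {
	log.WithError(err).Fatal("failed to create leader elector") // assumed handling
}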

 // newResourceLock creates a new resourcelock.Interface based on the Pod's name,
 // or a uuid if the name cannot be determined.
-func newResourceLock(ctx *serveContext, client *kubernetes.Clientset, coordinationClient *coordinationv1.CoordinationV1Client) resourcelock.Interface {
+func newResourceLock(ctx *serveContext, clients *k8s.Clients) resourcelock.Interface {
 	resourceLockID, found := os.LookupEnv("POD_NAME")
 	if !found {
 		resourceLockID = uuid.New().String()
@@ -78,8 +127,8 @@ func newResourceLock(ctx *serveContext, client *kubernetes.Clientset, coordinati
 		resourcelock.ConfigMapsResourceLock,
 		ctx.LeaderElectionConfig.Namespace,
 		ctx.LeaderElectionConfig.Name,
-		client.CoreV1(),
-		coordinationClient,
+		clients.ClientSet().CoreV1(),
+		clients.CoordinationClient(),
 		resourcelock.ResourceLockConfig{
 			Identity: resourceLockID,
 		},
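
Two channel idioms do the heavy lifting in setupLeadershipElection above: the disabled branch returns an already-closed channel, since a receive from a closed channel completes immediately, and the watcher sets leader = nil after the first election, since a receive from a nil channel blocks forever. A standalone, runnable illustration of both:

package main

import "fmt"

func main() {
	// Closed channel: a receive completes immediately, so consumers
	// waiting for leadership proceed at once when election is disabled.
	leader := make(chan struct{})
	close(leader)
	<-leader // does not block
	fmt.Println("proceeding as leader")

	// Nil channel: a receive blocks forever, so nil-ing a channel
	// variable disables its case in subsequent select iterations.
	var deposed chan struct{} // nil
	select {
	case <-deposed:
		fmt.Println("unreachable")
	default:
		fmt.Println("nil channel case never fires")
	}
}
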
50 changes: 2 additions & 48 deletions cmd/contour/serve.go
@@ -14,7 +14,6 @@
 package main

 import (
-	"context"
 	"fmt"
 	"net"
 	"os"
@@ -42,7 +41,6 @@ import (
 	"gopkg.in/alecthomas/kingpin.v2"
 	"gopkg.in/yaml.v2"
 	coreinformers "k8s.io/client-go/informers"
-	"k8s.io/client-go/tools/leaderelection"
 )

 // registerServe registers the serve subcommand and flags
@@ -279,52 +277,8 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
 	}
 	g.Add(debugsvc.Start)

-	// step 10. if enabled, register leader election
-	if !ctx.DisableLeaderElection {
-		var le *leaderelection.LeaderElector
-		var deposed chan struct{}
-		le, eventHandler.IsLeader, deposed = newLeaderElector(log, ctx, clients.ClientSet(), clients.CoordinationClient())
-
-		g.AddContext(func(electionCtx context.Context) {
-			log.WithFields(logrus.Fields{
-				"configmapname":      ctx.LeaderElectionConfig.Name,
-				"configmapnamespace": ctx.LeaderElectionConfig.Namespace,
-			}).Info("started leader election")
-
-			le.Run(electionCtx)
-			log.Info("stopped leader election")
-		})
-
-		g.Add(func(stop <-chan struct{}) error {
-			log := log.WithField("context", "leaderelection")
-			leader := eventHandler.IsLeader
-			for {
-				select {
-				case <-stop:
-					// shut down
-					log.Info("stopped leader election")
-					return nil
-				case <-leader:
-					log.Info("elected as leader, triggering rebuild")
-					eventHandler.UpdateNow()
-
-					// disable this case
-					leader = nil
-				case <-deposed:
-					// If we get deposed as leader, shut it down.
-					log.Info("deposed as leader, shutting down")
-					return nil
-				}
-			}
-		})
-	} else {
-		log.Info("Leader election disabled")
-
-		// leadership election disabled, hardwire IsLeader to be always readable.
-		leader := make(chan struct{})
-		close(leader)
-		eventHandler.IsLeader = leader
-	}
+	// step 10. register leadership election
+	eventHandler.IsLeader = setupLeadershipElection(&g, log, ctx, clients, eventHandler.UpdateNow)

 	// step 12. create grpc handler and register with workgroup.
 	g.Add(func(stop <-chan struct{}) error {
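
The simplification rides on the internal/k8s.Clients wrapper, which bundles the client types that doServe previously threaded through individually. Its definition is not part of this diff; inferred only from the two accessors the diff calls, a minimal hypothetical shape would be:

package k8s

import (
	"k8s.io/client-go/kubernetes"
	coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
)

// Hypothetical shape of Clients, inferred from the accessors used in
// this diff (ClientSet and CoordinationClient); the real type in
// internal/k8s may differ.
type Clients struct {
	clientSet          *kubernetes.Clientset
	coordinationClient *coordinationv1.CoordinationV1Client
}

func (c *Clients) ClientSet() *kubernetes.Clientset {
	return c.clientSet
}

func (c *Clients) CoordinationClient() *coordinationv1.CoordinationV1Client {
	return c.coordinationClient
}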