-
Notifications
You must be signed in to change notification settings - Fork 669
/
leadership.go
156 lines (140 loc) · 4.72 KB
/
leadership.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright Project Contour Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"os"
"github.com/google/uuid"
"github.com/projectcontour/contour/internal/k8s"
"github.com/projectcontour/contour/internal/workgroup"
"github.com/projectcontour/contour/pkg/config"
"github.com/sirupsen/logrus"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
)
// disableLeaderElection logs that election is switched off and returns an
// already-closed channel, so anything gating on leadership unblocks
// immediately (this process behaves as a permanent leader).
func disableLeaderElection(log logrus.FieldLogger) chan struct{} {
	log.Info("Leader election disabled")

	alwaysLeader := make(chan struct{})
	close(alwaysLeader)
	return alwaysLeader
}
// setupLeadershipElection registers leadership workers with the group and returns
// a channel which will become ready when this process becomes the leader, or, in the
// event that leadership election is disabled, the channel will be ready immediately.
func setupLeadershipElection(
	g *workgroup.Group,
	log logrus.FieldLogger,
	conf *config.LeaderElectionParameters,
	clients *k8s.Clients, updateNow func(),
) chan struct{} {
	elector, becameLeader, deposed := newLeaderElector(log, conf, clients)

	// Worker 1: run the elector itself until the group's context is canceled.
	g.AddContext(func(ctx context.Context) {
		log.WithFields(logrus.Fields{
			"configmapname":      conf.Name,
			"configmapnamespace": conf.Namespace,
		}).Info("started leader election")

		elector.Run(ctx)
		log.Info("stopped leader election")
	})

	// Worker 2: react to election outcomes until stopped or deposed.
	g.Add(func(stop <-chan struct{}) error {
		log := log.WithField("context", "leaderelection")
		leaderCh := becameLeader
		for {
			select {
			case <-stop:
				// Group shutdown requested.
				log.Info("stopped leader election")
				return nil
			case <-leaderCh:
				log.Info("elected as leader, triggering rebuild")
				updateNow()
				// becameLeader is closed once and stays closed; nil the
				// local so this case can never fire a second time.
				leaderCh = nil
			case <-deposed:
				// Losing leadership means we must terminate this process.
				log.Info("deposed as leader, shutting down")
				return nil
			}
		}
	})

	return becameLeader
}
// newLeaderElector creates a new leaderelection.LeaderElector and associated
// channels by which to observe elections and depositions.
func newLeaderElector(
	log logrus.FieldLogger,
	conf *config.LeaderElectionParameters,
	clients *k8s.Clients,
) (*leaderelection.LeaderElector, chan struct{}, chan struct{}) {
	log = log.WithField("context", "leaderelection")

	// elected is closed when this process wins the election; gRPC startup
	// blocks on it.
	elected := make(chan struct{})
	// deposed is closed if leadership is subsequently lost, so the caller
	// can clean up.
	deposed := make(chan struct{})

	lock := newResourceLock(log, conf, clients)

	// Build the elector, ready to be handed to the workgroup.
	elector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: conf.LeaseDuration,
		RenewDeadline: conf.RenewDeadline,
		RetryPeriod:   conf.RetryPeriod,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(_ context.Context) {
				log.WithFields(logrus.Fields{
					"lock":     lock.Describe(),
					"identity": lock.Identity(),
				}).Info("elected leader")
				close(elected)
			},
			OnStoppedLeading: func() {
				// The canceled election context triggers a handler
				// elsewhere; closing deposed signals the depose case.
				close(deposed)
			},
		},
	})
	if err != nil {
		log.WithError(err).Fatal("failed to create leader elector")
	}
	return elector, elected, deposed
}
// newResourceLock creates a new resourcelock.Interface based on the Pod's name,
// or a uuid if the name cannot be determined.
func newResourceLock(log logrus.FieldLogger, conf *config.LeaderElectionParameters, clients *k8s.Clients) resourcelock.Interface {
	// Prefer the Pod's own name as the lock-holder identity; fall back to a
	// random UUID only when POD_NAME is entirely absent from the environment.
	identity, ok := os.LookupEnv("POD_NAME")
	if !ok {
		identity = uuid.New().String()
	}

	lock, err := resourcelock.New(
		// TODO(youngnick) switch from the ConfigMap lock to a Lease object
		// once the Lease API has been GA for a full support cycle
		// (ie nine months).
		resourcelock.ConfigMapsResourceLock,
		conf.Namespace,
		conf.Name,
		clients.ClientSet().CoreV1(),
		clients.ClientSet().CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity: identity,
		},
	)
	if err != nil {
		log.WithError(err).
			WithField("name", conf.Name).
			WithField("namespace", conf.Namespace).
			WithField("identity", identity).
			Fatal("failed to create new resource lock")
	}
	return lock
}