// Copyright 2015 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package leadership

import (
	"time"

	"github.com/juju/errors"
	"github.com/juju/loggo"
	"github.com/juju/names"
	"launchpad.net/tomb"

	"github.com/juju/juju/core/leadership"
)

var logger = loggo.GetLogger("juju.worker.leadership")

// Tracker maintains a service leadership claim on behalf of a single unit,
// and reports the results to interested clients as leadership.Tickets.
type Tracker struct {
	tomb              tomb.Tomb
	claimer           leadership.Claimer
	unitName          string
	serviceName       string
	duration          time.Duration
	isMinion          bool
	claimLease        chan struct{}
	renewLease        <-chan time.Time
	claimTickets      chan chan bool
	waitLeaderTickets chan chan bool
	waitMinionTickets chan chan bool
	waitingLeader     []chan bool
	waitingMinion     []chan bool
}

// NewTracker returns a *Tracker that attempts to claim and retain service
// leadership for the supplied unit. It will claim leadership for twice the
// supplied duration, and once it's leader it will renew leadership every
// time the duration elapses.
// Thus, successful leadership claims on the resulting Tracker will guarantee
// leadership for the duration supplied here without generating additional
// calls to the supplied manager (which may very well be on the other side of
// a network connection).
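//
// A usage sketch (hedged; the claimer comes from elsewhere, and the unit tag
// is a placeholder): with duration set to time.Minute, the tracker claims a
// two-minute lease and renews it every minute for as long as it holds
// leadership.
//
//	tracker := NewTracker(names.NewUnitTag("wordpress/0"), claimer, time.Minute)
//	defer func() { tracker.Kill(); tracker.Wait() }()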
func NewTracker(tag names.UnitTag, claimer leadership.Claimer, duration time.Duration) *Tracker {
	unitName := tag.Id()
	serviceName, _ := names.UnitService(unitName)
	t := &Tracker{
		unitName:          unitName,
		serviceName:       serviceName,
		claimer:           claimer,
		duration:          duration,
		claimTickets:      make(chan chan bool),
		waitLeaderTickets: make(chan chan bool),
		waitMinionTickets: make(chan chan bool),
	}
	go func() {
		defer t.tomb.Done()
		defer func() {
			for _, ticketCh := range t.waitingLeader {
				close(ticketCh)
			}
			for _, ticketCh := range t.waitingMinion {
				close(ticketCh)
			}
		}()
		err := t.loop()
		// TODO: jam 2015-04-02 is this the most elegant way to make
		// sure we shutdown cleanly? Essentially the lowest level sees
		// that we are dying, and propagates an ErrDying up to us so
		// that we shut down, which we then are passing back into
		// Tomb.Kill().
		// Tomb.Kill() special cases the exact object ErrDying, and has
		// no idea about errors.Cause and the general errors.Trace
		// mechanisms that we use.
		// So we explicitly unwrap before calling tomb.Kill() else
		// tomb.Stop() thinks that we have a genuine error.
		switch cause := errors.Cause(err); cause {
		case tomb.ErrDying:
			err = cause
		}
		t.tomb.Kill(err)
	}()
	return t
}

// Kill is part of the worker.Worker interface.
func (t *Tracker) Kill() {
	t.tomb.Kill(nil)
}

// Wait is part of the worker.Worker interface.
func (t *Tracker) Wait() error {
	return t.tomb.Wait()
}

// ServiceName is part of the leadership.Tracker interface.
func (t *Tracker) ServiceName() string {
	return t.serviceName
}

// ClaimDuration is part of the leadership.Tracker interface.
func (t *Tracker) ClaimDuration() time.Duration {
	return t.duration
}

// ClaimLeader is part of the leadership.Tracker interface.
func (t *Tracker) ClaimLeader() leadership.Ticket {
	return t.submit(t.claimTickets)
}

// WaitLeader is part of the leadership.Tracker interface.
func (t *Tracker) WaitLeader() leadership.Ticket {
	return t.submit(t.waitLeaderTickets)
}

// WaitMinion is part of the leadership.Tracker interface.
func (t *Tracker) WaitMinion() leadership.Ticket {
	return t.submit(t.waitMinionTickets)
}
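
// A typical client pattern for the three ticket methods above (a sketch, not
// code from this package): block until the ticket resolves, then read the
// result.
//
//	ticket := tracker.ClaimLeader()
//	<-ticket.Ready()
//	if ticket.Wait() {
//		// Leadership is guaranteed for at least tracker.ClaimDuration().
//	} else {
//		// Some other unit is leader, or the tracker is shutting down.
//	}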

func (t *Tracker) loop() error {
	logger.Debugf("%s making initial claim for %s leadership", t.unitName, t.serviceName)
	if err := t.refresh(); err != nil {
		return errors.Trace(err)
	}
	for {
		select {
		case <-t.tomb.Dying():
			return tomb.ErrDying
		case <-t.claimLease:
			logger.Debugf("%s claiming lease for %s leadership", t.unitName, t.serviceName)
			t.claimLease = nil
			if err := t.refresh(); err != nil {
				return errors.Trace(err)
			}
		case <-t.renewLease:
			logger.Debugf("%s renewing lease for %s leadership", t.unitName, t.serviceName)
			t.renewLease = nil
			if err := t.refresh(); err != nil {
				return errors.Trace(err)
			}
		case ticketCh := <-t.claimTickets:
			logger.Debugf("%s got claim request for %s leadership", t.unitName, t.serviceName)
			if err := t.resolveClaim(ticketCh); err != nil {
				return errors.Trace(err)
			}
		case ticketCh := <-t.waitLeaderTickets:
			logger.Debugf("%s got wait request for %s leadership", t.unitName, t.serviceName)
			if err := t.resolveWaitLeader(ticketCh); err != nil {
				return errors.Trace(err)
			}
		case ticketCh := <-t.waitMinionTickets:
			logger.Debugf("%s got wait request for %s leadership loss", t.unitName, t.serviceName)
			if err := t.resolveWaitMinion(ticketCh); err != nil {
				return errors.Trace(err)
			}
		}
	}
}

// refresh makes a leadership request, and updates Tracker state to conform to
// latest known reality.
func (t *Tracker) refresh() error {
	logger.Debugf("checking %s for %s leadership", t.unitName, t.serviceName)
	leaseDuration := 2 * t.duration
	// TODO(fwereade): 2016-03-17 lp:1558657
	untilTime := time.Now().Add(leaseDuration)
	err := t.claimer.ClaimLeadership(t.serviceName, t.unitName, leaseDuration)
	switch {
	case err == nil:
		return t.setLeader(untilTime)
	case errors.Cause(err) == leadership.ErrClaimDenied:
		return t.setMinion()
	}
	return errors.Annotatef(err, "leadership failure")
}
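
// For tests, a minimal in-memory Claimer is enough to drive refresh. This is
// a hedged sketch assuming only the two methods this file actually calls,
// not necessarily the full core/leadership interface:
//
//	type fakeClaimer struct{ leader string }
//
//	func (c *fakeClaimer) ClaimLeadership(serviceName, unitName string, duration time.Duration) error {
//		if c.leader == "" || c.leader == unitName {
//			c.leader = unitName // grant a new lease, or extend the existing one
//			return nil
//		}
//		return leadership.ErrClaimDenied
//	}
//
//	func (c *fakeClaimer) BlockUntilLeadershipReleased(serviceName string) error {
//		select {} // never releases; a real claimer returns once the lease lapses
//	}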

// setLeader arranges for lease renewal.
func (t *Tracker) setLeader(untilTime time.Time) error {
	logger.Debugf("%s confirmed for %s leadership until %s", t.unitName, t.serviceName, untilTime)
	renewTime := untilTime.Add(-t.duration)
	logger.Infof("%s will renew %s leadership at %s", t.unitName, t.serviceName, renewTime)
	t.isMinion = false
	t.claimLease = nil
	// TODO(fwereade): 2016-03-17 lp:1558657
	t.renewLease = time.After(renewTime.Sub(time.Now()))

	for len(t.waitingLeader) > 0 {
		logger.Debugf("notifying %s ticket of impending %s leadership", t.unitName, t.serviceName)
		var ticketCh chan bool
		ticketCh, t.waitingLeader = t.waitingLeader[0], t.waitingLeader[1:]
		defer close(ticketCh)
		if err := t.sendTrue(ticketCh); err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

// setMinion arranges for lease acquisition when there's an opportunity.
func (t *Tracker) setMinion() error {
	logger.Infof("%s leadership for %s denied", t.serviceName, t.unitName)
	t.isMinion = true
	t.renewLease = nil
	if t.claimLease == nil {
		t.claimLease = make(chan struct{})
		go func() {
			defer close(t.claimLease)
			logger.Debugf("%s waiting for %s leadership release", t.unitName, t.serviceName)
			err := t.claimer.BlockUntilLeadershipReleased(t.serviceName)
			if err != nil {
				logger.Warningf("error while %s waiting for %s leadership release: %v", t.unitName, t.serviceName, err)
			}
			// We don't need to do anything else with the error, because we just
			// close the claimLease channel and trigger a leadership claim on the
			// main loop; if anything's gone seriously wrong we'll find out right
			// away and shut down anyway. (And if this goroutine outlives the
			// Tracker, it keeps it around as a zombie, but I don't see a way
			// around that...)
		}()
	}

	for len(t.waitingMinion) > 0 {
		logger.Debugf("notifying %s ticket of impending loss of %s leadership", t.unitName, t.serviceName)
		var ticketCh chan bool
		ticketCh, t.waitingMinion = t.waitingMinion[0], t.waitingMinion[1:]
		defer close(ticketCh)
		if err := t.sendTrue(ticketCh); err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

// isLeader returns true if leadership is guaranteed for the Tracker's duration.
func (t *Tracker) isLeader() (bool, error) {
	if !t.isMinion {
		// Last time we looked, we were leader.
		select {
		case <-t.tomb.Dying():
			return false, errors.Trace(tomb.ErrDying)
		case <-t.renewLease:
			logger.Debugf("%s renewing lease for %s leadership", t.unitName, t.serviceName)
			t.renewLease = nil
			if err := t.refresh(); err != nil {
				return false, errors.Trace(err)
			}
		default:
			logger.Debugf("%s still has %s leadership", t.unitName, t.serviceName)
		}
	}
	return !t.isMinion, nil
}

// resolveClaim will send true on the supplied channel if leadership can be
// successfully verified, and will always close it whether or not it sent.
func (t *Tracker) resolveClaim(ticketCh chan bool) error {
	logger.Debugf("resolving %s leadership ticket for %s...", t.serviceName, t.unitName)
	defer close(ticketCh)
	if leader, err := t.isLeader(); err != nil {
		return errors.Trace(err)
	} else if !leader {
		logger.Debugf("%s is not %s leader", t.unitName, t.serviceName)
		return nil
	}
	logger.Debugf("confirming %s leadership for %s", t.serviceName, t.unitName)
	return t.sendTrue(ticketCh)
}

// resolveWaitLeader will send true on the supplied channel if leadership can be
// guaranteed for the Tracker's duration. It will then close the channel. If
// leadership cannot be guaranteed, the channel is left untouched until either
// the termination of the Tracker or the next invocation of setLeader, at which
// point true is sent if applicable, and the channel is closed.
func (t *Tracker) resolveWaitLeader(ticketCh chan bool) error {
	var dontClose bool
	defer func() {
		if !dontClose {
			close(ticketCh)
		}
	}()
	if leader, err := t.isLeader(); err != nil {
		return errors.Trace(err)
	} else if leader {
		logger.Debugf("reporting %s leadership for %s", t.serviceName, t.unitName)
		return t.sendTrue(ticketCh)
	}
	logger.Debugf("waiting for %s to attain %s leadership", t.unitName, t.serviceName)
	t.waitingLeader = append(t.waitingLeader, ticketCh)
	dontClose = true
	return nil
}

// resolveWaitMinion will close the supplied channel as soon as leadership cannot
// be guaranteed beyond the Tracker's duration.
func (t *Tracker) resolveWaitMinion(ticketCh chan bool) error {
	var dontClose bool
	defer func() {
		if !dontClose {
			close(ticketCh)
		}
	}()
	if leader, err := t.isLeader(); err != nil {
		return errors.Trace(err)
	} else if leader {
		logger.Debugf("waiting for %s to lose %s leadership", t.unitName, t.serviceName)
		t.waitingMinion = append(t.waitingMinion, ticketCh)
		dontClose = true
	} else {
		logger.Debugf("reporting %s leadership loss for %s", t.serviceName, t.unitName)
	}
	return nil
}

// sendTrue sends true on the supplied channel, giving up only if the Tracker
// starts shutting down first.
func (t *Tracker) sendTrue(ticketCh chan bool) error {
	select {
	case <-t.tomb.Dying():
		return tomb.ErrDying
	case ticketCh <- true:
		return nil
	}
}

// submit hands a fresh ticket channel to the main loop via the supplied
// tickets channel, and wraps it in a ticket for the caller. The channel is
// buffered so a send from the loop can never block behind a slow client.
func (t *Tracker) submit(tickets chan chan bool) leadership.Ticket {
	ticketCh := make(chan bool, 1)
	select {
	case <-t.tomb.Dying():
		close(ticketCh)
	case tickets <- ticketCh:
	}
	ticket := &ticket{
		ch:    ticketCh,
		ready: make(chan struct{}),
	}
	go ticket.run()
	return ticket
}

// ticket is used by Tracker to communicate leadership status back to a client.
type ticket struct {
	ch      chan bool
	ready   chan struct{}
	success bool
}

func (t *ticket) run() {
	defer close(t.ready)
	// This is only safe/sane because the Tracker promises to close all pending
	// ticket channels when it shuts down.
	if <-t.ch {
		t.success = true
	}
}

// Ready is part of the leadership.Ticket interface.
func (t *ticket) Ready() <-chan struct{} {
	return t.ready
}

// Wait is part of the leadership.Ticket interface.
func (t *ticket) Wait() bool {
	<-t.ready
	return t.success
}
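
// Clients that must not block can poll a ticket instead of waiting on it (a
// sketch; once Ready has been closed, Wait returns immediately):
//
//	select {
//	case <-ticket.Ready():
//		if ticket.Wait() {
//			// leadership (or its loss, for WaitMinion tickets) confirmed
//		}
//	default:
//		// result not yet known; check again later
//	}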