-
Notifications
You must be signed in to change notification settings - Fork 269
/
election.go
515 lines (445 loc) · 14 KB
/
election.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package election
import (
"context"
"encoding/json"
"fmt"
"math"
"sync"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.uber.org/atomic"
"go.uber.org/zap"
)
const (
	// newSessionDefaultRetryCnt is the default number of attempts made when creating a new session.
	newSessionDefaultRetryCnt = 3
	// newSessionRetryUnlimited is the attempt count used to keep retrying session creation
	// until it succeeds, the client is closed, or the context is done.
	// NOTE(review): math.MaxInt64 overflows int on 32-bit platforms; math.MaxInt (Go 1.17+)
	// would be the portable choice if the module's Go version allows it.
	newSessionRetryUnlimited = math.MaxInt64
	// newSessionRetryInterval is the interval between attempts to create a new session.
	newSessionRetryInterval = 200 * time.Millisecond

	// IsLeader means the current campaigner has become the leader.
	IsLeader = "isLeader"
	// RetireFromLeader means the current campaigner was the old leader and has retired.
	RetireFromLeader = "retireFromLeader"
	// IsNotLeader means the current campaigner is neither the old leader nor the current leader.
	IsNotLeader = "isNotLeader"
)
// CampaignerInfo is the campaigner's information. Its JSON encoding is the
// value a member writes to etcd when it campaigns.
type CampaignerInfo struct {
	// ID is the campaigner's unique member ID.
	ID string `json:"id"`
	// Addr is the campaigner's advertise address.
	Addr string `json:"addr"`
}
// String implements fmt.Stringer by rendering the campaigner as JSON, which is
// also the form used as the election value when campaigning.
func (c *CampaignerInfo) String() string {
	if infoBytes, err := json.Marshal(c); err == nil {
		return string(infoBytes)
	}
	// Marshal of two string fields is not expected to fail; keep a readable fallback anyway.
	return fmt.Sprintf("id: %s, addr: %s", c.ID, c.Addr)
}
// getCampaignerInfo decodes a JSON-encoded CampaignerInfo (the form produced
// by CampaignerInfo.String). On a decode error it returns a nil info.
func getCampaignerInfo(infoBytes []byte) (*CampaignerInfo, error) {
	var decoded CampaignerInfo
	if err := json.Unmarshal(infoBytes, &decoded); err != nil {
		return nil, err
	}
	return &decoded, nil
}
// Election implements the leader election based on etcd.
//
// It is created by NewElection, which starts a background campaign loop, and
// stopped by Close. Leadership changes are published on LeaderNotify and
// campaign/session errors on ErrorNotify.
type Election struct {
	// the Election instance does not own the client instance,
	// so do not close it in the methods of Election.
	cli *clientv3.Client
	// sessionTTL is passed to concurrency.WithTTL when creating etcd sessions.
	sessionTTL int
	// key is the election key prefix passed to concurrency.NewElection.
	key string
	// info identifies this member; infoStr is info.String(), the value
	// written when campaigning.
	info    *CampaignerInfo
	infoStr string
	// ech carries campaign/session errors to ErrorNotify (buffer size 1).
	ech chan error
	// leaderCh carries leader changes to LeaderNotify (buffer size 1); a nil
	// value means this member retired from leadership.
	leaderCh chan *CampaignerInfo
	// isLeader is updated by notifyLeader and read by IsLeader.
	isLeader atomic.Bool
	// closed makes Close idempotent.
	closed atomic.Bool
	// cancel cancels the context of the campaign loop goroutine.
	cancel context.CancelFunc
	// bgWg tracks the campaign loop goroutine so Close can wait for it.
	bgWg sync.WaitGroup
	// campaignMu guards cancelCampaign and resignCh.
	// NOTE(review): only Lock/Unlock are used in the code shown, never RLock.
	campaignMu sync.RWMutex
	// cancelCampaign cancels the current round's campaign goroutine and waits
	// for it to exit. campaignLoop sets it each round; Resign and
	// CancelEvictLeader call it and then reset it to nil.
	cancelCampaign func()
	// notifyBlockTime is the max time notifyLeader blocks when sending to leaderCh.
	notifyBlockTime time.Duration
	// evictLeader is set to true when this member should not become leader:
	// campaigning is skipped and a current leader resigns.
	evictLeader atomic.Bool
	// resignCh is non-nil only while watchLeader runs; Resign closes it to
	// make the current leader resign.
	resignCh chan struct{}
	// l is the logger tagged with component=election.
	l log.Logger
}
// NewElection creates a new etcd leader Election instance and starts its
// campaign loop in a background goroutine.
//
// The first session is created eagerly, with up to newSessionDefaultRetryCnt
// attempts, so that a failure is returned to the caller right away. The loop
// runs until ctx is done or Close is called.
func NewElection(ctx context.Context, cli *clientv3.Client, sessionTTL int, key, id, addr string, notifyBlockTime time.Duration) (*Election, error) {
	loopCtx, cancelLoop := context.WithCancel(ctx)
	self := &CampaignerInfo{ID: id, Addr: addr}
	e := &Election{
		cli:             cli,
		sessionTTL:      sessionTTL,
		key:             key,
		info:            self,
		infoStr:         self.String(),
		notifyBlockTime: notifyBlockTime,
		leaderCh:        make(chan *CampaignerInfo, 1),
		ech:             make(chan error, 1), // size 1 is enough
		cancel:          cancelLoop,
		l:               log.With(zap.String("component", "election")),
	}

	firstSession, err := e.newSession(ctx, newSessionDefaultRetryCnt)
	if err != nil {
		cancelLoop()
		return nil, terror.ErrElectionCampaignFail.Delegate(err, "create the initial session")
	}

	e.bgWg.Add(1)
	go func() {
		defer e.bgWg.Done()
		e.campaignLoop(loopCtx, firstSession)
	}()
	return e, nil
}
// IsLeader reports whether this member is the leader, as last recorded by
// the campaign loop.
func (e *Election) IsLeader() bool {
	leading := e.isLeader.Load()
	return leading
}
// ID returns the member ID this Election was created with.
func (e *Election) ID() string {
	self := e.info
	return self.ID
}
// LeaderInfo returns the current leader's key, ID and address, read from the
// earliest-created key under the election prefix (clientv3.WithFirstCreate).
// It's similar with https://github.com/etcd-io/etcd/blob/v3.4.3/clientv3/concurrency/election.go#L147.
//
// Every failure, including "no leader elected", is wrapped in
// terror.ErrElectionGetLeaderIDFail.
func (e *Election) LeaderInfo(ctx context.Context) (string, string, string, error) {
	resp, err := e.cli.Get(ctx, e.key, clientv3.WithFirstCreate()...)
	if err != nil {
		return "", "", "", terror.ErrElectionGetLeaderIDFail.Delegate(err)
	}
	if len(resp.Kvs) == 0 {
		// no leader currently elected
		return "", "", "", terror.ErrElectionGetLeaderIDFail.Delegate(concurrency.ErrElectionNoLeader)
	}

	first := resp.Kvs[0]
	leader, err := getCampaignerInfo(first.Value)
	if err != nil {
		return "", "", "", terror.ErrElectionGetLeaderIDFail.Delegate(err)
	}
	return string(first.Key), leader.ID, leader.Addr, nil
}
// LeaderNotify returns a channel that delivers leadership changes: the
// leader's information when this member becomes the leader or observes another
// leader, and nil when this member retires from leadership.
func (e *Election) LeaderNotify() <-chan *CampaignerInfo {
	notifications := e.leaderCh
	return notifications
}
// ErrorNotify returns a channel that delivers errors that occur while
// campaigning or (re)creating etcd sessions.
func (e *Election) ErrorNotify() <-chan error {
	errs := e.ech
	return errs
}
// Close stops the campaign loop and waits for it to exit. Only the first call
// has an effect; later calls just log and return. The etcd client is not closed.
func (e *Election) Close() {
	member := zap.Stringer("current member", e.info)
	e.l.Info("election is closing", member)
	if firstClose := e.closed.CAS(false, true); !firstClose {
		e.l.Info("election was already closed", member)
		return
	}

	e.cancel()
	e.bgWg.Wait()
	e.l.Info("election is closed", member)
}
// campaignLoop drives leader election until ctx is done. NewElection runs it
// in a goroutine tracked by e.bgWg.
//
// Each iteration:
//  1. replaces session if it has expired (exiting on failure), and exits if ctx is done;
//  2. starts a campaign goroutine (which returns immediately while evictLeader is
//     set), publishing its cancel function in e.cancelCampaign so that Resign and
//     CancelEvictLeader can abort it;
//  3. observes the election until a leader different from the last observed one
//     appears, or ctx/session is done, or the observe channel is closed;
//  4. reports the result through notifyLeader and, if this member is the leader,
//     blocks in watchLeader until leadership is lost.
//
// On exit the latest session is closed, and a terminating error that is not
// ctx.Err() is sent on e.ech.
func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Session) {
	closeSession := func(se *concurrency.Session) {
		err2 := se.Close() // only log this error
		if err2 != nil {
			e.l.Error("fail to close etcd session", zap.Int64("lease", int64(se.Lease())), zap.Error(err2))
		}
	}

	failpoint.Inject("mockCampaignLoopExitedAbnormally", func(_ failpoint.Value) {
		closeSession = func(_ *concurrency.Session) {
			e.l.Info("skip closeSession", zap.String("failpoint", "mockCampaignLoopExitedAbnormally"))
		}
	})

	// err is reserved for the error that terminates the loop, because the
	// deferred function below reports it on e.ech. Errors the loop recovers
	// from must use their own variables.
	var err error
	defer func() {
		if session != nil {
			closeSession(session) // close the latest session.
		}
		if err != nil && errors.Cause(err) != ctx.Err() { // only send non-ctx.Err() error
			// NOTE(review): this is a blocking send on a channel with buffer size 1.
			// If nobody drains ErrorNotify() it also blocks Close(), which waits on bgWg.
			e.ech <- err
		}
	}()

	var (
		oldLeaderID string
		campaignWg  sync.WaitGroup
	)
	for {
		// check context canceled/timeout
		select {
		case <-session.Done():
			e.l.Info("etcd session is done, will try to create a new one", zap.Int64("old lease", int64(session.Lease())))
			closeSession(session)
			session, err = e.newSession(ctx, newSessionRetryUnlimited) // retry until context is done
			if err != nil {
				err = terror.ErrElectionCampaignFail.Delegate(err, "create a new session")
				return
			}
		case <-ctx.Done():
			e.l.Info("break campaign loop, context is done", zap.Stringer("current member", e.info), zap.Error(ctx.Err()))
			return
		default:
		}

		// try to campaign
		elec := concurrency.NewElection(session, e.key)
		ctx2, cancel2 := context.WithCancel(ctx)

		e.campaignMu.Lock()
		campaignWg.Add(1)
		e.cancelCampaign = func() {
			cancel2()
			campaignWg.Wait()
		}
		e.campaignMu.Unlock()

		go func() {
			defer campaignWg.Done()
			if e.evictLeader.Load() {
				// skip campaign
				return
			}

			e.l.Debug("begin to campaign", zap.Stringer("current member", e.info))

			err2 := elec.Campaign(ctx2, e.infoStr)
			if err2 != nil {
				// because inner commit may return undetermined error, we try to delete the election key manually
				deleted, err3 := e.ClearSessionIfNeeded(ctx, e.ID())
				if err3 != nil {
					e.l.Warn("failed to clean election key", zap.Error(err3))
				} else if deleted {
					e.l.Info("successful manually clean election key",
						zap.String("campaign error", err2.Error()))
				}
				// err may be ctx.Err(), but this can be handled in `case <-ctx.Done()`
				e.l.Info("fail to campaign", zap.Stringer("current member", e.info), zap.Error(err2))
			}
		}()

		var (
			leaderKey  string
			leaderInfo *CampaignerInfo
		)
		eleObserveCh := elec.Observe(ctx2)

	observeElection:
		for {
			select {
			case <-ctx.Done():
				break observeElection
			case <-session.Done():
				break observeElection
			case resp, ok := <-eleObserveCh:
				if !ok {
					break observeElection
				}

				e.l.Info("get response from election observe", zap.String("key", string(resp.Kvs[0].Key)), zap.String("value", string(resp.Kvs[0].Value)))
				leaderKey = string(resp.Kvs[0].Key)
				// Use a local error. Assigning the outer err here would leave a stale
				// decode error behind, which the deferred function would wrongly report
				// if the loop later exits on ctx.Done().
				var parseErr error
				leaderInfo, parseErr = getCampaignerInfo(resp.Kvs[0].Value)
				if parseErr != nil {
					// this should never happen
					e.l.Error("fail to get leader information", zap.String("value", string(resp.Kvs[0].Value)), zap.Error(parseErr))
					continue
				}

				if oldLeaderID == leaderInfo.ID {
					continue
				}
				oldLeaderID = leaderInfo.ID
				break observeElection
			}
		}

		if leaderInfo == nil || len(leaderInfo.ID) == 0 {
			cancel2()
			campaignWg.Wait()
			continue
		}

		if leaderInfo.ID != e.info.ID {
			e.l.Info("current member is not the leader", zap.Stringer("current member", e.info), zap.Stringer("leader", leaderInfo))
			e.notifyLeader(ctx, leaderInfo)
			cancel2()
			campaignWg.Wait()
			continue
		}

		e.l.Info("become leader", zap.Stringer("current member", e.info))
		e.notifyLeader(ctx, leaderInfo) // become the leader now
		e.watchLeader(ctx, session, leaderKey, elec)
		e.l.Info("retire from leader", zap.Stringer("current member", e.info))
		e.notifyLeader(ctx, nil) // need to re-campaign
		oldLeaderID = ""

		cancel2()
		campaignWg.Wait()
	}
}
// notifyLeader records whether this member is the leader and publishes
// leaderInfo on leaderCh. A nil leaderInfo is used when this member retires
// from leadership.
//
// The send is abandoned, with a log entry, if it blocks longer than
// notifyBlockTime or ctx is done.
func (e *Election) notifyLeader(ctx context.Context, leaderInfo *CampaignerInfo) {
	selfLeads := leaderInfo != nil && leaderInfo.ID == e.info.ID
	e.isLeader.Store(selfLeads)

	select {
	case e.leaderCh <- leaderInfo:
	case <-time.After(e.notifyBlockTime):
		// this should not happen
		e.l.Error("ignore notify the leader's information after block a period of time", zap.Stringer("current member", e.info), zap.Stringer("leader", leaderInfo))
	case <-ctx.Done():
		e.l.Warn("ignore notify the leader's information because context canceled", zap.Stringer("current member", e.info), zap.Stringer("leader", leaderInfo))
	}
}
// watchLeader blocks while this member holds leadership under key. It returns
// when any of the following happens:
//   - evictLeader is set (checked before each wait); leadership is resigned first;
//   - Resign closes the resign channel; leadership is resigned first;
//   - the watch on key is closed or canceled, or a DELETE event is seen on it;
//   - session or ctx is done.
//
// While it runs, e.resignCh is non-nil so that Resign can signal it. The field
// is reset to nil on return.
func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string, elec *concurrency.Election) {
	e.l.Debug("watch leader key", zap.String("key", key))

	// Only this function assigns e.resignCh, so the local copy always equals the field.
	resignSignal := make(chan struct{})
	e.campaignMu.Lock()
	e.resignCh = resignSignal
	e.campaignMu.Unlock()
	defer func() {
		e.campaignMu.Lock()
		e.resignCh = nil
		e.campaignMu.Unlock()
	}()

	resign := func() {
		if err := elec.Resign(ctx); err != nil {
			e.l.Info("fail to resign leader", zap.Stringer("current member", e.info), zap.Error(err))
		}
	}

	watchCtx, cancelWatch := context.WithCancel(ctx)
	defer cancelWatch()
	watchCh := e.cli.Watch(watchCtx, key)

	for {
		if e.evictLeader.Load() {
			resign()
			return
		}

		select {
		case resp, ok := <-watchCh:
			if !ok {
				e.l.Info("watch channel is closed")
				return
			}
			if resp.Canceled {
				e.l.Info("watch canceled")
				return
			}
			for _, ev := range resp.Events {
				// user may use some etcd client (like etcdctl) to delete the leader key and trigger a new campaign.
				if ev.Type == mvccpb.DELETE {
					e.l.Info("fail to watch, the leader is deleted", zap.ByteString("key", ev.Kv.Key))
					return
				}
			}
		case <-session.Done():
			return
		case <-ctx.Done():
			return
		case <-resignSignal:
			resign()
			return
		}
	}
}
// EvictLeader marks this member as ineligible for leadership and resigns
// (see Resign). Calling it again while already evicted does nothing.
func (e *Election) EvictLeader() {
	if e.evictLeader.CAS(false, true) {
		e.Resign()
	}
}
// Resign makes this member give up leadership, or abandon its in-flight
// campaign if it is not the leader.
//
// It cancels the current campaign goroutine (waiting for it to exit). If
// watchLeader is running, it also closes resignCh so that watchLeader resigns
// through etcd. Resign may be called repeatedly: resignCh is only closed while
// campaignMu is held, and an already-closed channel is left alone. Closing it
// twice would panic, e.g. on a second Resign that arrives before watchLeader
// has cleared the field.
func (e *Election) Resign() {
	e.campaignMu.Lock()
	defer e.campaignMu.Unlock()

	if e.cancelCampaign != nil {
		e.cancelCampaign()
		e.cancelCampaign = nil
	}
	if e.resignCh != nil {
		select {
		case <-e.resignCh:
			// already closed by an earlier Resign; nothing is ever sent on it.
		default:
			close(e.resignCh)
		}
	}
}
// CancelEvictLeader makes this member eligible for leadership again. If it was
// evicted, the current campaign round is cancelled (and waited for) so that
// campaignLoop can move on to a round that actually campaigns. It does nothing
// if the member was not evicted.
func (e *Election) CancelEvictLeader() {
	if wasEvicted := e.evictLeader.CAS(true, false); !wasEvicted {
		return
	}

	e.campaignMu.Lock()
	defer e.campaignMu.Unlock()
	if cancelRound := e.cancelCampaign; cancelRound != nil {
		cancelRound()
		e.cancelCampaign = nil
	}
}
// newSession creates an etcd session with TTL e.sessionTTL, making up to
// retryCnt attempts spaced newSessionRetryInterval apart.
//
// Before each retry, the previous failure is offered to e.ech without
// blocking. Retrying stops early when an attempt fails because the client's
// context is done, or when ctx is done while waiting between attempts. The
// result of the last attempt is returned. With retryCnt <= 0 no attempt is
// made and (nil, nil) is returned.
func (e *Election) newSession(ctx context.Context, retryCnt int) (*concurrency.Session, error) {
	var (
		sess    *concurrency.Session
		lastErr error
	)
	for attempt := 0; attempt < retryCnt; attempt++ {
		if attempt > 0 {
			select {
			case e.ech <- terror.ErrElectionCampaignFail.Delegate(lastErr, "create a new session"):
			default:
			}
			select {
			case <-time.After(newSessionRetryInterval):
			case <-ctx.Done():
				return sess, lastErr
			}
		}

		// The session is bound to the client's context rather than ctx
		// (no `concurrency.WithContext(ctx)`), so it can still be closed
		// while the client is valid even after ctx is done.
		sess, lastErr = concurrency.NewSession(e.cli, concurrency.WithTTL(e.sessionTTL))
		if lastErr == nil || errors.Cause(lastErr) == e.cli.Ctx().Err() {
			return sess, lastErr
		}
	}
	return sess, lastErr
}
// ClearSessionIfNeeded deletes the election key holding campaign info for
// member id, e.g. one left behind when that member quit abnormally.
//
// It scans the keys under the election prefix and deletes the first whose
// value decodes to the given ID. It returns true if a key was deleted. Failing
// to decode any scanned value aborts the scan with that error.
func (e *Election) ClearSessionIfNeeded(ctx context.Context, id string) (bool, error) {
	resp, err := e.cli.Get(ctx, e.key, clientv3.WithPrefix())
	if err != nil {
		return false, err
	}

	var target string
	for _, kv := range resp.Kvs {
		info, decodeErr := getCampaignerInfo(kv.Value)
		if decodeErr != nil {
			return false, decodeErr
		}
		if info.ID != id {
			continue
		}
		target = string(kv.Key)
		break
	}
	if len(target) == 0 {
		// no campaign info left in etcd, no need to trigger re-campaign
		return false, nil
	}

	delResp, err := e.cli.Delete(ctx, target)
	if err != nil {
		return false, err
	}
	return delResp.Deleted > 0, nil
}