forked from purpleidea/mgmt
/
etcd.go
2298 lines (2089 loc) · 77.2 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Mgmt
// Copyright (C) 2013-2017+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// TODO: Add TTL's (eg: volunteering)
// TODO: Remove race around leader operations
// TODO: Fix server reuse issue (bind: address already in use)
// TODO: Fix unstarted member
// TODO: Fix excessive StartLoop/FinishLoop
// TODO: Add VIP for servers (incorporate with net resource)
// TODO: Auto assign ports/ip's for peers (if possible)
// TODO: Fix godoc
// Package etcd implements the distributed key value store integration.
// This also takes care of managing and clustering the embedded etcd server.
// The elastic etcd algorithm works in the following way:
// * When you start up mgmt, you can pass it a list of seeds.
// * If no seeds are given, then assume you are the first server and startup.
// * If a seed is given, connect as a client, and optionally volunteer to be a server.
// * All volunteering clients should listen for a message from the master for nomination.
// * If a client has been nominated, it should startup a server.
// * All servers should listen for their nomination to be removed and shutdown if so.
// * The elected leader should decide who to nominate/unnominate to keep the right number of servers.
//
// Smoke testing:
// mkdir /tmp/mgmt{A..E}
// ./mgmt run --yaml examples/etcd1a.yaml --hostname h1 --tmp-prefix --no-pgp
// ./mgmt run --yaml examples/etcd1b.yaml --hostname h2 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2381 --server-urls http://127.0.0.1:2382
// ./mgmt run --yaml examples/etcd1c.yaml --hostname h3 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2383 --server-urls http://127.0.0.1:2384
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/idealClusterSize 3
// ./mgmt run --yaml examples/etcd1d.yaml --hostname h4 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2385 --server-urls http://127.0.0.1:2386
// ./mgmt run --yaml examples/etcd1e.yaml --hostname h5 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2387 --server-urls http://127.0.0.1:2388
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/idealClusterSize 5
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list
package etcd
import (
"bytes"
"errors"
"fmt"
"log"
"math"
"net/url"
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/purpleidea/mgmt/converger"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/resources"
"github.com/purpleidea/mgmt/util"
etcd "github.com/coreos/etcd/clientv3" // "clientv3"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/etcdserver"
rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
etcdtypes "github.com/coreos/etcd/pkg/types"
raft "github.com/coreos/etcd/raft"
context "golang.org/x/net/context"
"google.golang.org/grpc"
)
// constant parameters which may need to be tweaked or customized
// constant parameters which may need to be tweaked or customized
const (
	NS                      = "_mgmt" // root namespace for mgmt operations
	seedSentinel            = "_seed" // you must not name your hostname this
	maxStartServerTimeout   = 60      // max number of seconds to wait for server to start
	maxStartServerRetries   = 3       // number of times to retry starting the etcd server
	maxClientConnectRetries = 5       // number of times to retry consecutive connect failures
	selfRemoveTimeout       = 3       // give unnominated members a chance to self exit
	exitDelay               = 3       // number of sec of inactivity after exit to clean up
	DefaultIdealClusterSize = 5       // default ideal cluster size target for initial seed
	DefaultClientURL        = "127.0.0.1:2379" // default client endpoint (host:port, no scheme)
	DefaultServerURL        = "127.0.0.1:2380" // default server/peer endpoint (host:port, no scheme)
)
var (
	// errApplyDeltaEventsInconsistent is a sentinel error for delta event
	// application. NOTE(review): ApplyDeltaEvents is not in this chunk;
	// confirm the exact triggering condition at its definition.
	errApplyDeltaEventsInconsistent = errors.New("Etcd: ApplyDeltaEvents: Inconsistent key!")
)
// AW is a struct for the AddWatcher queue
type AW struct {
	path       string          // key path (or prefix) to watch
	opts       []etcd.OpOption // watch options, eg: etcd.WithPrefix()
	callback   func(*RE) error // callback to run for each watch event
	errCheck   bool            // should we check the error of the callback?
	skipConv   bool            // ask event to skip converger updates
	resp       event.Resp      // ack/nack channel back to the requester
	cancelFunc func()          // data (output: cancels this watcher when called)
}
// RE is a response + error struct since these two values often occur together
// This is now called an event with the move to the etcd v3 API
type RE struct {
	response  etcd.WatchResponse // raw watch response from etcd
	path      string             // key path (or prefix) this event was watched on
	err       error              // error received with the event, if any
	callback  func(*RE) error    // callback to run for this event
	errCheck  bool               // should we check the error of the callback?
	skipConv  bool               // event skips converger updates
	retryHint bool               // set to true for one event after a watcher failure
	retries   uint               // number of times we've retried on error
}
// KV is a key + value struct to hold the two items together
type KV struct {
	key   string          // full key path
	value string          // value to store at key
	opts  []etcd.OpOption // put options
	resp  event.Resp      // ack/nack channel back to the requester
}
// GQ is a struct for the get queue
type GQ struct {
	path     string            // key path (or prefix) to get
	skipConv bool              // skip converger updates for this get
	opts     []etcd.OpOption   // get options, eg: etcd.WithPrefix()
	resp     event.Resp        // ack/nack channel back to the requester
	data     map[string]string // output: resulting key/value map
}
// DL is a struct for the delete queue
type DL struct {
	path string          // key path to delete
	opts []etcd.OpOption // delete options
	resp event.Resp      // ack/nack channel back to the requester
	data int64           // output: result count from rawDelete (presumably keys deleted — confirm there)
}
// TN is a struct for the txn queue
type TN struct {
	ifcmps  []etcd.Cmp        // txn "if" comparisons
	thenops []etcd.Op         // ops to run when the comparisons succeed
	elseops []etcd.Op         // ops to run otherwise
	resp    event.Resp        // ack/nack channel back to the requester
	data    *etcd.TxnResponse // output: the raw txn response
}
// Flags are some constant flags which are used throughout the program.
// They only affect logging verbosity; they do not change behavior.
type Flags struct {
	Debug   bool // add additional log messages
	Trace   bool // add execution flow log messages
	Verbose bool // add extra log message output
}
// EmbdEtcd provides the embedded server and client etcd functionality
type EmbdEtcd struct { // EMBeddeD etcd
	// etcd client connection related
	cLock  sync.Mutex   // client connect lock
	rLock  sync.RWMutex // client reconnect lock; read-lock wraps each use of client
	client *etcd.Client // the active client connection, nil when disconnected
	cError error        // permanent client error
	ctxErr error        // permanent ctx error

	// exit and cleanup related
	cancelLock  sync.Mutex      // lock for the cancels list
	cancels     []func()        // array of every cancel function for watches
	exiting     bool            // set true once shutdown begins; loops NACK new work
	exitchan    chan struct{}   // signals the main loop to begin draining
	exitTimeout <-chan time.Time // fires after exitDelay secs of inactivity; nil until exit

	hostname   string
	memberID   uint64            // cluster membership id of server if running
	endpoints  etcdtypes.URLsMap // map of servers a client could connect to
	clientURLs etcdtypes.URLs    // locations to listen for clients if i am a server
	serverURLs etcdtypes.URLs    // locations to listen for servers if i am a server (peer)
	noServer   bool              // disable all server peering if true

	// local tracked state
	nominated        etcdtypes.URLsMap // copy of who's nominated to locally track state
	lastRevision     int64             // the revision id of message being processed
	idealClusterSize uint16            // ideal cluster size

	// etcd channels; each request type has its own queue into the main loop
	awq     chan *AW // add watch queue
	wevents chan *RE // response+error
	setq    chan *KV // set queue
	getq    chan *GQ // get queue
	delq    chan *DL // delete queue
	txnq    chan *TN // txn queue

	flags     Flags
	prefix    string              // folder prefix to use for misc storage
	converger converger.Converger // converged tracking

	// etcd server related
	serverwg sync.WaitGroup // wait for server to shutdown
	server   *embed.Etcd    // technically this contains the server struct
	dataDir  string         // our data dir, prefix + "etcd"
}
// NewEmbdEtcd creates the top level embedded etcd struct client and server obj.
// It returns nil if hostname collides with the reserved seed sentinel name.
// If seeds are given, the ideal cluster size is reset to zero so it gets read
// from the running cluster instead.
func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, flags Flags, prefix string, converger converger.Converger) *EmbdEtcd {
	endpoints := make(etcdtypes.URLsMap)
	if hostname == seedSentinel { // safety
		return nil
	}
	if len(seeds) > 0 {
		endpoints[seedSentinel] = seeds
		idealClusterSize = 0 // unset, get from running cluster
	}
	obj := &EmbdEtcd{
		exitchan:    make(chan struct{}), // exit signal for main loop
		exitTimeout: nil,
		awq:         make(chan *AW),
		wevents:     make(chan *RE),
		setq:        make(chan *KV),
		getq:        make(chan *GQ),
		delq:        make(chan *DL),
		txnq:        make(chan *TN),

		nominated: make(etcdtypes.URLsMap),

		hostname:         hostname,
		endpoints:        endpoints,
		clientURLs:       clientURLs,
		serverURLs:       serverURLs,
		noServer:         noServer,
		idealClusterSize: idealClusterSize,
		converger:        converger,
		flags:            flags,
		prefix:           prefix,
		dataDir:          path.Join(prefix, "etcd"),
	}
	// TODO: add some sort of auto assign method for picking these defaults
	// add a default so that our local client can connect locally if needed
	if len(obj.LocalhostClientURLs()) == 0 { // if we don't have any localhost URLs
		u := url.URL{Scheme: "http", Host: DefaultClientURL}       // default
		obj.clientURLs = append([]url.URL{u}, obj.clientURLs...) // prepend
	}
	// add a default for local use and testing, harmless and useful!
	if !obj.noServer && len(obj.serverURLs) == 0 {
		if len(obj.endpoints) > 0 {
			obj.noServer = true // we didn't have enough to be a server
		}
		u := url.URL{Scheme: "http", Host: DefaultServerURL} // default
		obj.serverURLs = []url.URL{u}
	}
	return obj
}
// GetConfig returns the config struct to be used for the etcd client connect.
// The endpoint list is flattened from the URLs map and sorted so that the
// resulting config is deterministic.
func (obj *EmbdEtcd) GetConfig() etcd.Config {
	// XXX: filter out any urls which wouldn't resolve here ?
	hosts := []string{}
	for _, urls := range obj.endpoints { // flatten the map of URL lists
		for _, u := range urls {
			hosts = append(hosts, u.Host) // host only; drops the http:// prefix
		}
	}
	sort.Strings(hosts) // sort for determinism
	return etcd.Config{
		Endpoints: hosts,
		// RetryDialer chooses the next endpoint to use
		// it comes with a default dialer if unspecified
		DialTimeout: 5 * time.Second,
	}
}
// Connect connects the client to a server, and then builds the *API structs.
// If reconnect is true, it will force a reconnect with new config endpoints.
// It blocks (retrying with backoff) until connected or a permanent error is
// hit; the permanent error is memoized in obj.cError for future callers.
func (obj *EmbdEtcd) Connect(reconnect bool) error {
	if obj.flags.Debug {
		log.Println("Etcd: Connect...")
	}
	obj.cLock.Lock()
	defer obj.cLock.Unlock()
	if obj.cError != nil { // stop on permanent error
		return obj.cError
	}
	if obj.client != nil { // memoize
		if reconnect {
			// i think this requires the rLock when using it concurrently
			err := obj.client.Close()
			if err != nil {
				log.Printf("Etcd: (Re)Connect: Close: Error: %+v", err)
			}
			obj.client = nil // for kicks
		} else {
			return nil
		}
	}
	var emax uint16 // = 0; consecutive "no endpoints" failure counter
	for { // loop until connect
		var err error
		cfg := obj.GetConfig()
		if eps := obj.endpoints; len(eps) > 0 {
			log.Printf("Etcd: Connect: Endpoints: %v", eps)
		} else {
			log.Printf("Etcd: Connect: Endpoints: []")
		}
		obj.client, err = etcd.New(cfg) // connect!
		if err == etcd.ErrNoAvailableEndpoints {
			emax++
			if emax > maxClientConnectRetries { // give up; probably a bad dataDir
				log.Printf("Etcd: The dataDir (%s) might be inconsistent or corrupt.", obj.dataDir)
				log.Printf("Etcd: Please see: %s", "https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md#what-does-the-error-message-about-an-inconsistent-datadir-mean")
				obj.cError = fmt.Errorf("can't find an available endpoint")
				return obj.cError
			}
			err = &CtxDelayErr{time.Duration(emax) * time.Second, "No endpoints available yet!"} // retry with backoff...
		}
		if err != nil {
			log.Printf("Etcd: Connect: CtxError...")
			// CtxError sleeps/retries for transient errors, or returns
			// a non-nil error for permanent ones
			if _, e := obj.CtxError(context.TODO(), err); e != nil {
				log.Printf("Etcd: Connect: CtxError: Fatal: %v", e)
				obj.cError = e
				return e // fatal error
			}
			continue
		}
		// check if we're actually connected here, because this must
		// block if we're not connected
		if obj.client == nil {
			log.Printf("Etcd: Connect: Is nil!")
			continue
		}
		break
	}
	return nil
}
// Startup is the main entry point to kick off the embedded etcd client & server.
// It starts the callback and main loops, registers the standard watchers, and
// either bootstraps a new cluster (no seeds) or joins an existing one. It does
// not return until a client connection has been established (or failed).
func (obj *EmbdEtcd) Startup() error {
	bootstrapping := len(obj.endpoints) == 0 // because value changes after start

	// connect but don't block here, because servers might not be up yet...
	go func() {
		if err := obj.Connect(false); err != nil {
			log.Printf("Etcd: Startup: Error: %v", err)
			// XXX: Now cause Startup() to exit with error somehow!
		}
	}()

	go obj.CbLoop() // start callback loop
	go obj.Loop()   // start main loop

	// TODO: implement native etcd watcher method on member API changes
	path := fmt.Sprintf("/%s/nominated/", NS)
	go obj.AddWatcher(path, obj.nominateCallback, true, false, etcd.WithPrefix()) // no block

	// setup ideal cluster size watcher
	key := fmt.Sprintf("/%s/idealClusterSize", NS)
	go obj.AddWatcher(key, obj.idealClusterSizeCallback, true, false) // no block

	// if we have no endpoints, it means we are bootstrapping...
	if !bootstrapping {
		log.Println("Etcd: Startup: Getting initial values...")
		if nominated, err := Nominated(obj); err == nil {
			obj.nominated = nominated // store a local copy
		} else {
			log.Printf("Etcd: Startup: Nominate lookup error.")
			obj.Destroy()
			return fmt.Errorf("Etcd: Startup: Error: %v", err)
		}

		// get initial ideal cluster size
		if idealClusterSize, err := GetClusterSize(obj); err == nil {
			obj.idealClusterSize = idealClusterSize
			log.Printf("Etcd: Startup: Ideal cluster size is: %d", idealClusterSize)
		} else {
			// perhaps the first server didn't set it yet. it's ok,
			// we can get it from the watcher if it ever gets set!
			log.Printf("Etcd: Startup: Ideal cluster size lookup error.")
		}
	}

	if !obj.noServer {
		path := fmt.Sprintf("/%s/volunteers/", NS)
		go obj.AddWatcher(path, obj.volunteerCallback, true, false, etcd.WithPrefix()) // no block
	}

	// if i am alone and will have to be a server...
	if !obj.noServer && bootstrapping {
		log.Printf("Etcd: Bootstrapping...")
		// give an initial value to the obj.nominate map we keep in sync
		// this emulates Nominate(obj, obj.hostname, obj.serverURLs)
		obj.nominated[obj.hostname] = obj.serverURLs // initial value
		// NOTE: when we are stuck waiting for the server to start up,
		// it is probably happening on this call right here...
		obj.nominateCallback(nil) // kick this off once
	}

	// self volunteer
	if !obj.noServer && len(obj.serverURLs) > 0 {
		// we run this in a go routine because it blocks waiting for server
		log.Printf("Etcd: Startup: Volunteering...")
		go Volunteer(obj, obj.serverURLs)
	}

	if bootstrapping {
		// store the initial ideal cluster size so others can read it
		if err := SetClusterSize(obj, obj.idealClusterSize); err != nil {
			log.Printf("Etcd: Startup: Ideal cluster size storage error.")
			obj.Destroy()
			return fmt.Errorf("Etcd: Startup: Error: %v", err)
		}
	}

	go obj.AddWatcher(fmt.Sprintf("/%s/endpoints/", NS), obj.endpointCallback, true, false, etcd.WithPrefix())

	if err := obj.Connect(false); err != nil { // don't exit from this Startup function until connected!
		return err
	}
	return nil
}
// Destroy cleans up the entire embedded etcd system. Use DestroyServer if you
// only want to shutdown the embedded server portion.
// The sequence matters: unvolunteer first (which triggers an unnominate and
// server shutdown), wait for the server, then cancel watchers and stop the
// main loop, then close the client.
func (obj *EmbdEtcd) Destroy() error {
	// this should also trigger an unnominate, which should cause a shutdown
	log.Printf("Etcd: Destroy: Unvolunteering...")
	if err := Volunteer(obj, nil); err != nil { // unvolunteer so we can shutdown...
		log.Printf("Etcd: Destroy: Error: %v", err) // we have a problem
	}

	obj.serverwg.Wait() // wait for server shutdown signal

	obj.exiting = true // must happen before we run the cancel functions!

	// clean up any watchers which might want to continue
	obj.cancelLock.Lock() // TODO: do we really need the lock here on exit?
	log.Printf("Etcd: Destroy: Cancelling %d operations...", len(obj.cancels))
	for _, cancelFunc := range obj.cancels {
		cancelFunc()
	}
	obj.cancelLock.Unlock()

	obj.exitchan <- struct{}{} // cause main loop to exit

	obj.rLock.Lock()
	if obj.client != nil {
		obj.client.Close()
	}
	obj.client = nil
	obj.rLock.Unlock()

	// this happens in response to the unnominate callback. not needed here!
	//if obj.server != nil {
	//	return obj.DestroyServer()
	//}
	return nil
}
// CtxDelayErr requests a retry in Delta duration. It is recognized by
// CtxError, which sleeps for Delta before allowing the retry.
type CtxDelayErr struct {
	Delta   time.Duration // how long to wait before retrying
	Message string        // human readable reason for the delay
}

// Error implements the error interface for CtxDelayErr.
func (obj *CtxDelayErr) Error() string {
	msg := fmt.Sprintf("CtxDelayErr(%s): %s", obj.Delta, obj.Message)
	return msg
}
// CtxRetriesErr lets you retry as long as you have retries available.
// TODO: consider combining this with CtxDelayErr
type CtxRetriesErr struct {
	Retries uint   // how many retries remain
	Message string // human readable reason for the retry
}

// Error implements the error interface for CtxRetriesErr.
func (obj *CtxRetriesErr) Error() string {
	msg := fmt.Sprintf("CtxRetriesErr(%d): %s", obj.Retries, obj.Message)
	return msg
}
// CtxPermanentErr is a permanent failure error to notify about borkage.
// CtxError stops retrying when it sees one of these.
type CtxPermanentErr struct {
	Message string // human readable reason for the permanent failure
}

// Error implements the error interface for CtxPermanentErr.
func (obj *CtxPermanentErr) Error() string {
	s := fmt.Sprintf("CtxPermanentErr: %s", obj.Message)
	return s
}
// CtxReconnectErr requests a client reconnect to the new endpoint list.
// CtxError responds by cancelling in-flight operations and reconnecting.
type CtxReconnectErr struct {
	Message string // human readable reason for the reconnect
}

// Error implements the error interface for CtxReconnectErr.
func (obj *CtxReconnectErr) Error() string {
	s := fmt.Sprintf("CtxReconnectErr: %s", obj.Message)
	return s
}
// CancelCtx wraps an existing context with a cancel function which is also
// recorded in obj.cancels, so that reconnects and shutdown can cancel every
// outstanding operation.
func (obj *EmbdEtcd) CancelCtx(ctx context.Context) (context.Context, func()) {
	newCtx, cancel := context.WithCancel(ctx)
	obj.cancelLock.Lock()
	defer obj.cancelLock.Unlock()
	// the cancels slice is shared; it must only be touched under the lock
	obj.cancels = append(obj.cancels, cancel)
	return newCtx, cancel
}
// TimeoutCtx wraps an existing context with a timeout of t. The cancel
// function is also recorded in obj.cancels, so that reconnects and shutdown
// can cancel every outstanding operation.
func (obj *EmbdEtcd) TimeoutCtx(ctx context.Context, t time.Duration) (context.Context, func()) {
	newCtx, cancel := context.WithTimeout(ctx, t)
	obj.cancelLock.Lock()
	defer obj.cancelLock.Unlock()
	// the cancels slice is shared; it must only be touched under the lock
	obj.cancels = append(obj.cancels, cancel)
	return newCtx, cancel
}
// CtxError is called whenever there is a connection or other client problem
// that needs to be resolved before we can continue, eg: connection disconnected,
// change of server to connect to, etc... It modifies the context if needed.
// It returns (ctx, nil) when the caller should retry, or (ctx, err) when the
// error is permanent; permanent errors are memoized in obj.ctxErr.
func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, error) {
	if obj.ctxErr != nil { // stop on permanent error
		return ctx, obj.ctxErr
	}
	// keys used to stash the previous error and iteration count in the ctx
	const ctxErr = "ctxErr"
	const ctxIter = "ctxIter"
	expBackoff := func(tmin, texp, iter, tmax int) time.Duration {
		// https://en.wikipedia.org/wiki/Exponential_backoff
		// tmin <= texp^iter - 1 <= tmax // TODO: check my math
		return time.Duration(math.Min(math.Max(math.Pow(float64(texp), float64(iter))-1.0, float64(tmin)), float64(tmax))) * time.Millisecond
	}
	var isTimeout = false
	var iter int // = 0
	if ctxerr, ok := ctx.Value(ctxErr).(error); ok { // previous error stored?
		if obj.flags.Debug {
			log.Printf("Etcd: CtxError: err(%v), ctxerr(%v)", err, ctxerr)
		}
		if i, ok := ctx.Value(ctxIter).(int); ok {
			iter = i + 1 // load and increment
			if obj.flags.Debug {
				log.Printf("Etcd: CtxError: Iter: %v", iter)
			}
		}
		isTimeout = err == context.DeadlineExceeded
		if obj.flags.Debug {
			log.Printf("Etcd: CtxError: isTimeout: %v", isTimeout)
		}
		if !isTimeout {
			iter = 0 // reset timer
		}
		err = ctxerr // restore error
	} else if obj.flags.Debug {
		log.Printf("Etcd: CtxError: No value found")
	}
	// builds a new ctx with a backoff timeout and the saved err/iter values
	ctxHelper := func(tmin, texp, tmax int) context.Context {
		t := expBackoff(tmin, texp, iter, tmax)
		if obj.flags.Debug {
			log.Printf("Etcd: CtxError: Timeout: %v", t)
		}
		ctxT, _ := obj.TimeoutCtx(ctx, t)
		ctxV := context.WithValue(ctxT, ctxIter, iter) // save iter
		ctxF := context.WithValue(ctxV, ctxErr, err)   // save err
		return ctxF
	}
	_ = ctxHelper // TODO
	// NOTE: compares the *outer* err's grpc description against e's message
	isGrpc := func(e error) bool { // helper function
		return grpc.ErrorDesc(err) == e.Error()
	}
	if err == nil {
		log.Fatal("Etcd: CtxError: Error: Unexpected lack of error!")
	}
	if obj.exiting {
		obj.ctxErr = fmt.Errorf("Etcd: CtxError: Exit in progress!")
		return ctx, obj.ctxErr
	}
	// happens when we trigger the cancels during reconnect
	if err == context.Canceled {
		// TODO: do we want to create a fresh ctx here for all cancels?
		//ctx = context.Background()
		ctx, _ = obj.CancelCtx(ctx) // add a new one
		return ctx, nil             // we should retry, reconnect probably happened
	}
	if delayErr, ok := err.(*CtxDelayErr); ok { // custom delay error
		log.Printf("Etcd: CtxError: Reason: %s", delayErr.Error())
		time.Sleep(delayErr.Delta) // sleep the amount of time requested
		return ctx, nil
	}
	if retriesErr, ok := err.(*CtxRetriesErr); ok { // custom retry error
		log.Printf("Etcd: CtxError: Reason: %s", retriesErr.Error())
		if retriesErr.Retries == 0 {
			obj.ctxErr = fmt.Errorf("Etcd: CtxError: CtxRetriesErr: No more retries!")
			return ctx, obj.ctxErr
		}
		return ctx, nil
	}
	if permanentErr, ok := err.(*CtxPermanentErr); ok { // custom permanent error
		obj.ctxErr = fmt.Errorf("Etcd: CtxError: Reason: %s", permanentErr.Error())
		return ctx, obj.ctxErr // quit
	}
	if err == etcd.ErrNoAvailableEndpoints { // etcd server is probably starting up
		// TODO: tmin, texp, tmax := 500, 2, 16000 // ms, exp base, ms
		// TODO: return ctxHelper(tmin, texp, tmax), nil
		log.Printf("Etcd: CtxError: No endpoints available yet!")
		time.Sleep(500 * time.Millisecond) // a ctx timeout won't help!
		return ctx, nil                    // passthrough
	}
	// etcd server is apparently still starting up...
	if err == rpctypes.ErrNotCapable { // isGrpc(rpctypes.ErrNotCapable) also matches
		log.Printf("Etcd: CtxError: Server is starting up...")
		time.Sleep(500 * time.Millisecond) // a ctx timeout won't help!
		return ctx, nil                    // passthrough
	}
	if err == grpc.ErrClientConnTimeout { // sometimes caused by "too many colons" misconfiguration
		return ctx, fmt.Errorf("Etcd: Error: Misconfiguration: %v", err) // permanent failure?
	}
	// this can happen if my client connection shuts down, but without any
	// available alternatives. in this case, rotate it off to someone else
	reconnectErr, isReconnectErr := err.(*CtxReconnectErr) // custom reconnect error
	switch {
	case isReconnectErr:
		log.Printf("Etcd: CtxError: Reason: %s", reconnectErr.Error())
		fallthrough
	case err == raft.ErrStopped: // TODO: does this ever happen?
		fallthrough
	case err == etcdserver.ErrStopped: // TODO: does this ever happen?
		fallthrough
	case isGrpc(raft.ErrStopped):
		fallthrough
	case isGrpc(etcdserver.ErrStopped):
		fallthrough
	case isGrpc(grpc.ErrClientConnClosing):
		// any of the above conditions trigger a full cancel + reconnect
		if obj.flags.Debug {
			log.Printf("Etcd: CtxError: Error(%T): %+v", err, err)
			log.Printf("Etcd: Endpoints are: %v", obj.client.Endpoints())
			log.Printf("Etcd: Client endpoints are: %v", obj.endpoints)
		}
		if obj.flags.Debug {
			log.Printf("Etcd: CtxError: Locking...")
		}
		obj.rLock.Lock()
		// TODO: should this really be nested inside the other lock?
		obj.cancelLock.Lock()
		// we need to cancel any WIP connections like Txn()'s and so on
		// we run the cancel()'s that are stored up so they don't block
		log.Printf("Etcd: CtxError: Cancelling %d operations...", len(obj.cancels))
		for _, cancelFunc := range obj.cancels {
			cancelFunc()
		}
		obj.cancels = []func(){} // reset
		obj.cancelLock.Unlock()
		log.Printf("Etcd: CtxError: Reconnecting...")
		if err := obj.Connect(true); err != nil {
			defer obj.rLock.Unlock()
			obj.ctxErr = fmt.Errorf("Etcd: Permanent connect error: %v", err)
			return ctx, obj.ctxErr
		}
		if obj.flags.Debug {
			log.Printf("Etcd: CtxError: Unlocking...")
		}
		obj.rLock.Unlock()
		log.Printf("Etcd: CtxError: Reconnected!")
		return ctx, nil
	}
	// FIXME: we might be one of the members in a two member cluster that
	// had the other member crash.. hmmm bork?!
	if isGrpc(context.DeadlineExceeded) {
		log.Printf("Etcd: CtxError: DeadlineExceeded(%T): %+v", err, err) // TODO
	}
	if err == rpctypes.ErrDuplicateKey {
		log.Fatalf("Etcd: CtxError: Programming error: %+v", err)
	}
	// if you hit this code path here, please report the unmatched error!
	log.Printf("Etcd: CtxError: Unknown error(%T): %+v", err, err)
	time.Sleep(1 * time.Second)
	obj.ctxErr = fmt.Errorf("Etcd: CtxError: Unknown error!")
	return ctx, obj.ctxErr
}
// CbLoop is the loop where callback execution is serialized.
// It drains obj.wevents and runs each event's callback, retrying through
// CtxError on failure, until the exit timeout fires.
func (obj *EmbdEtcd) CbLoop() {
	cuid := obj.converger.Register()
	cuid.SetName("Etcd: CbLoop")
	defer cuid.Unregister()
	if e := obj.Connect(false); e != nil {
		return // fatal
	}
	// we use this timer because when we ignore un-converge events and loop,
	// we reset the ConvergedTimer case statement, ruining the timeout math!
	cuid.StartTimer()
	for {
		ctx := context.Background() // TODO: inherit as input argument?
		select {
		// etcd watcher event
		case re := <-obj.wevents:
			if !re.skipConv { // if we want to count it...
				cuid.ResetTimer() // activity!
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: CbLoop: Event: StartLoop")
			}
			// retry the callback until success, exit, or permanent error
			for {
				if obj.exiting { // the exit signal has been sent!
					//re.resp.NACK() // nope!
					break
				}
				if obj.flags.Trace {
					log.Printf("Trace: Etcd: CbLoop: rawCallback()")
				}
				err := rawCallback(ctx, re)
				if obj.flags.Trace {
					log.Printf("Trace: Etcd: CbLoop: rawCallback(): %v", err)
				}
				if err == nil {
					//re.resp.ACK() // success
					break
				}
				re.retries++ // increment error retry count
				if ctx, err = obj.CtxError(ctx, err); err != nil {
					break // TODO: it's bad, break or return?
				}
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: CbLoop: Event: FinishLoop")
			}

		// exit loop commit; exitTimeout is nil until Loop arms it
		case <-obj.exitTimeout:
			log.Println("Etcd: Exiting callback loop!")
			cuid.StopTimer() // clean up nicely
			return
		}
	}
}
// Loop is the main loop where everything is serialized.
// It services the add-watcher, set, get, delete, and txn queues, each with a
// retry-through-CtxError pattern. The AW queue is drained with priority.
// Shutdown is two-phase: exitchan arms exitTimeout, which fires after
// exitDelay seconds of inactivity so pending requests can drain first.
func (obj *EmbdEtcd) Loop() {
	cuid := obj.converger.Register()
	cuid.SetName("Etcd: Loop")
	defer cuid.Unregister()
	if e := obj.Connect(false); e != nil {
		return // fatal
	}
	cuid.StartTimer()
	for {
		ctx := context.Background() // TODO: inherit as input argument?
		// priority channel...
		select {
		case aw := <-obj.awq:
			cuid.ResetTimer() // activity!
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: PriorityAW: StartLoop")
			}
			obj.loopProcessAW(ctx, aw)
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: PriorityAW: FinishLoop")
			}
			continue // loop to drain the priority channel first!
		default:
			// passthrough to normal channel
		}

		select {
		// add watcher
		case aw := <-obj.awq:
			cuid.ResetTimer() // activity!
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: AW: StartLoop")
			}
			obj.loopProcessAW(ctx, aw)
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: AW: FinishLoop")
			}

		// set kv pair
		case kv := <-obj.setq:
			cuid.ResetTimer() // activity!
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Set: StartLoop")
			}
			for {
				if obj.exiting { // the exit signal has been sent!
					kv.resp.NACK() // nope!
					break
				}
				err := obj.rawSet(ctx, kv)
				if err == nil {
					kv.resp.ACK() // success
					break
				}
				if ctx, err = obj.CtxError(ctx, err); err != nil { // try to reconnect, etc...
					break // TODO: it's bad, break or return?
				}
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Set: FinishLoop")
			}

		// get value
		case gq := <-obj.getq:
			if !gq.skipConv {
				cuid.ResetTimer() // activity!
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Get: StartLoop")
			}
			for {
				if obj.exiting { // the exit signal has been sent!
					gq.resp.NACK() // nope!
					break
				}
				data, err := obj.rawGet(ctx, gq)
				if err == nil {
					gq.data = data // update struct
					gq.resp.ACK()  // success
					break
				}
				if ctx, err = obj.CtxError(ctx, err); err != nil {
					break // TODO: it's bad, break or return?
				}
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Get: FinishLoop")
			}

		// delete value
		case dl := <-obj.delq:
			cuid.ResetTimer() // activity!
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Delete: StartLoop")
			}
			for {
				if obj.exiting { // the exit signal has been sent!
					dl.resp.NACK() // nope!
					break
				}
				data, err := obj.rawDelete(ctx, dl)
				if err == nil {
					dl.data = data // update struct
					dl.resp.ACK()  // success
					break
				}
				if ctx, err = obj.CtxError(ctx, err); err != nil {
					break // TODO: it's bad, break or return?
				}
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Delete: FinishLoop")
			}

		// run txn
		case tn := <-obj.txnq:
			cuid.ResetTimer() // activity!
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Txn: StartLoop")
			}
			for {
				if obj.exiting { // the exit signal has been sent!
					tn.resp.NACK() // nope!
					break
				}
				data, err := obj.rawTxn(ctx, tn)
				if err == nil {
					tn.data = data // update struct
					tn.resp.ACK()  // success
					break
				}
				if ctx, err = obj.CtxError(ctx, err); err != nil {
					break // TODO: it's bad, break or return?
				}
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Txn: FinishLoop")
			}

		// exit loop signal
		case <-obj.exitchan:
			log.Println("Etcd: Exiting loop shortly...")
			// activate exitTimeout switch which only opens after N
			// seconds of inactivity in this select switch, which
			// lets everything get bled dry to avoid blocking calls
			// which would otherwise block us from exiting cleanly!
			obj.exitTimeout = util.TimeAfterOrBlock(exitDelay)

		// exit loop commit
		case <-obj.exitTimeout:
			log.Println("Etcd: Exiting loop!")
			cuid.StopTimer() // clean up nicely
			return
		}
	}
}
// loopProcessAW is a helper function to facilitate creating priority channels!
// It services one AW request: retry through CtxError until the watcher is
// added (ACK), we begin exiting (NACK), or the error is permanent.
func (obj *EmbdEtcd) loopProcessAW(ctx context.Context, aw *AW) {
	for {
		if obj.exiting { // the exit signal has been sent!
			aw.resp.NACK() // nope!
			return
		}
		// cancelFunc is our data payload
		cancelFunc, err := obj.rawAddWatcher(ctx, aw)
		if err == nil {
			aw.cancelFunc = cancelFunc // update struct
			aw.resp.ACK()              // success
			return
		}
		if ctx, err = obj.CtxError(ctx, err); err != nil {
			return // TODO: do something else ?
		}
	}
}
// Set queues up a set operation to occur using our mainloop.
// It blocks until the main loop ACKs (stored) or NACKs (exit in progress).
func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error {
	ack := event.NewResp()
	kv := &KV{
		key:   key,
		value: value,
		opts:  opts,
		resp:  ack,
	}
	obj.setq <- kv // hand off to the main loop for serialized processing
	err := ack.Wait() // wait for ack/nack
	if err == nil {
		return nil
	}
	return fmt.Errorf("Etcd: Set: Probably received an exit: %v", err)
}
// rawSet actually implements the key set operation.
// It must only run from the main loop; the rLock read-lock guards the client
// pointer against a concurrent reconnect swapping it out.
func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error {
	if obj.flags.Trace {
		log.Printf("Trace: Etcd: rawSet()")
	}
	// key is the full key path
	// TODO: should this be : obj.client.KV.Put or obj.client.Put ?
	obj.rLock.RLock() // these read locks need to wrap any use of obj.client
	response, err := obj.client.KV.Put(ctx, kv.key, kv.value, kv.opts...)
	obj.rLock.RUnlock()
	log.Printf("Etcd: Set(%s): %v", kv.key, response) // w00t... bonus
	if obj.flags.Trace {
		log.Printf("Trace: Etcd: rawSet(): %v", err)
	}
	return err
}
// Get performs a get operation and waits for an ACK to continue.
// It is the common-case wrapper over ComplexGet with skipConv disabled, so
// the get counts as converger activity.
func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, error) {
	result, err := obj.ComplexGet(path, false, opts...)
	return result, err
}
// ComplexGet performs a get operation and waits for an ACK to continue. It can
// accept more arguments that are useful for the less common operations.
// TODO: perhaps a get should never cause an un-converge ?
func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOption) (map[string]string, error) {
	ack := event.NewResp()
	request := &GQ{
		path:     path,
		skipConv: skipConv,
		opts:     opts,
		resp:     ack,
		data:     nil,
	}
	obj.getq <- request // hand off to the main loop for serialized processing
	// block until ack/nack; the loop fills request.data before the ACK
	if err := ack.Wait(); err != nil {
		return nil, fmt.Errorf("Etcd: Get: Probably received an exit: %v", err)
	}
	return request.data, nil
}
func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]string, err error) {
if obj.flags.Trace {
log.Printf("Trace: Etcd: rawGet()")
}
obj.rLock.RLock()
response, err := obj.client.KV.Get(ctx, gq.path, gq.opts...)
obj.rLock.RUnlock()
if err != nil || response == nil {
return nil, err
}
// TODO: write a response.ToMap() function on https://godoc.org/github.com/coreos/etcd/etcdserver/etcdserverpb#RangeResponse
result = make(map[string]string)
for _, x := range response.Kvs {
result[bytes.NewBuffer(x.Key).String()] = bytes.NewBuffer(x.Value).String()
}
if obj.flags.Trace {
log.Printf("Trace: Etcd: rawGet(): %v", result)
}