This repository has been archived by the owner on Feb 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 230
/
gossip.go
639 lines (564 loc) · 18.6 KB
/
gossip.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
// Copyright 2017 Pilosa Corp.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gossip
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/hashicorp/memberlist"
"github.com/pilosa/pilosa/v2"
"github.com/pilosa/pilosa/v2/logger"
"github.com/pilosa/pilosa/v2/roaring"
"github.com/pilosa/pilosa/v2/toml"
"github.com/pkg/errors"
)
// Ensure GossipMemberSet implements interfaces.
var _ memberlist.Delegate = &memberSet{}

// memberSet represents a gossip implementation of MemberSet using memberlist.
type memberSet struct {
	// mu guards memberlist, which is written in Open and read concurrently
	// by the broadcast queue's NumNodes callback.
	mu         sync.RWMutex
	memberlist *memberlist.Memberlist
	broadcasts *memberlist.TransmitLimitedQueue

	// papi is the pilosa API used to serialize/deliver cluster messages.
	papi   *pilosa.API
	config *config

	Logger logger.Logger
	// stdLogger is only used when passed into memberlist library things that take a std library logger rather than an interface.
	stdLogger *log.Logger
	// logOutput is similar to stdLogger in that it's passed to memberlist things which can't take a pilosa Logger.
	logOutput io.Writer

	transport     *Transport
	eventReceiver *eventReceiver
}
// Open implements the MemberSet interface to start network activity.
func (g *memberSet) Open() (err error) {
	// Create the memberlist under the write lock; the broadcast queue's
	// NumNodes callback (below) reads g.memberlist concurrently.
	g.mu.Lock()
	g.memberlist, err = memberlist.Create(g.config.memberlistConfig)
	g.mu.Unlock()
	if err != nil {
		return errors.Wrap(err, "creating memberlist")
	}
	// The broadcast queue retransmits user messages; NumNodes lets it scale
	// retransmissions with the current cluster size.
	g.broadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			g.mu.RLock()
			defer g.mu.RUnlock()
			return g.memberlist.NumMembers()
		},
		RetransmitMult: 3,
	}
	// Parse each configured gossip seed address into a URI.
	var uris = make([]*pilosa.URI, len(g.config.gossipSeeds))
	for i, addr := range g.config.gossipSeeds {
		uris[i], err = pilosa.NewURIFromAddress(addr)
		if err != nil {
			return fmt.Errorf("new uri from address: %s", err)
		}
	}
	// Wrap the seed URIs in Node values so they can be converted to
	// host:port strings for memberlist's Join.
	var nodes = make([]*pilosa.Node, len(uris))
	for i, uri := range uris {
		nodes[i] = &pilosa.Node{URI: *uri}
	}
	// Join the cluster via the seed hosts, retrying on failure.
	g.mu.RLock()
	err = g.joinWithRetry(pilosa.URIs(pilosa.Nodes(nodes).URIs()).HostPortStrings())
	g.mu.RUnlock()
	if err != nil {
		return errors.Wrap(err, "joinWithRetry")
	}
	return nil
}
// Close attempts to gracefully leave the cluster, and finally calls shutdown
// after (at most) a timeout period. The event receiver is stopped first so
// no further node events are processed while leaving.
func (g *memberSet) Close() error {
	g.eventReceiver.Close()
	errLeave := g.memberlist.Leave(5 * time.Second)
	errShutdown := g.memberlist.Shutdown()
	if errLeave == nil && errShutdown == nil {
		return nil
	}
	return fmt.Errorf("leaving: '%v', shutting down: '%v'", errLeave, errShutdown)
}
// joinWithRetry wraps the standard memberlist Join function in a retry,
// attempting the join up to 60 times with a two-second pause between tries.
func (g *memberSet) joinWithRetry(hosts []string) error {
	return retry(60, 2*time.Second, func() error {
		_, joinErr := g.memberlist.Join(hosts)
		return joinErr
	})
}
// retry periodically retries function fn a specified number of attempts.
func retry(attempts int, sleep time.Duration, fn func() error) (err error) { // nolint: unparam
for i := 0; ; i++ {
err = fn()
if err == nil {
return
}
if i >= (attempts - 1) {
break
}
time.Sleep(sleep)
log.Println("retrying after error:", err)
}
return fmt.Errorf("after %d attempts, last error: %s", attempts, err)
}
////////////////////////////////////////////////////////////////

// config bundles the gossip seed addresses with the memberlist
// configuration used when creating the member set.
type config struct {
	gossipSeeds      []string
	memberlistConfig *memberlist.Config
}
// memberSetOption describes a functional option for GossipMemberSet.
// Options are applied in order by NewMemberSet; an option returning a
// non-nil error aborts construction.
type memberSetOption func(*memberSet) error
// WithTransport is a functional option for providing a transport to NewMemberSet.
// When set, NewMemberSet skips creating its own transport.
func WithTransport(transport *Transport) memberSetOption {
	return func(ms *memberSet) error {
		ms.transport = transport
		return nil
	}
}
// WithLogger is a functional option for providing a Go logger to NewMemberSet.
// If the memberSet's transport is nil, this logger will be used when creating
// one. If WithLogOutput is not used, this logger will be passed to memberlist
// for it to use internally. This logger is not used for logging by code in this
// (gossip) package - for that, use the WithPilosaLogger option.
func WithLogger(logger *log.Logger) memberSetOption {
	return func(ms *memberSet) error {
		ms.stdLogger = logger
		return nil
	}
}
// WithLogOutput allows one to pass a Writer which will in turn be passed to
// memberlist for use in logging.
func WithLogOutput(o io.Writer) memberSetOption {
	return func(ms *memberSet) error {
		ms.logOutput = o
		return nil
	}
}
// WithPilosaLogger allows one to configure a memberSet with a logger of their
// choice which satisfies the pilosa logger interface. This is the logger the
// gossip package itself logs through.
func WithPilosaLogger(l logger.Logger) memberSetOption {
	return func(ms *memberSet) error {
		ms.Logger = l
		return nil
	}
}
// NewMemberSet returns a new instance of GossipMemberSet based on options. The
// logging options which can be passed to NewMemberSet are complicated for
// historical reasons - please pass WithPilosaLogger, and either WithLogOutput
// or WithLogger. If you pass WithLogOutput, be sure to also pass in a Transport
// using WithTransport.
func NewMemberSet(cfg Config, api *pilosa.API, options ...memberSetOption) (*memberSet, error) {
	host := api.Node().URI.Host

	g := &memberSet{
		papi:   api,
		Logger: logger.NopLogger,
	}

	// Apply functional options; any failure aborts construction.
	for _, opt := range options {
		if err := opt(g); err != nil {
			return nil, errors.Wrap(err, "executing option")
		}
	}

	ger := newEventReceiver(g.Logger, api)
	g.eventReceiver = ger

	// If no transport was supplied via WithTransport, build one from the
	// configured port, defaulting loggers as needed.
	if g.transport == nil {
		port, err := strconv.Atoi(cfg.Port)
		if err != nil {
			return nil, fmt.Errorf("convert port: %s", err)
		}
		if g.stdLogger == nil {
			if g.logOutput != nil {
				g.stdLogger = logger.NewStandardLogger(g.logOutput).Logger()
			} else {
				g.stdLogger = log.New(os.Stderr, "", log.LstdFlags)
			}
		}
		// Set up the transport.
		transport, err := NewTransport(host, port, g.stdLogger)
		if err != nil {
			return nil, fmt.Errorf("new transport: %s", err)
		}
		g.transport = transport
	}
	// Ask the transport for the actual bound port (may have been chosen
	// dynamically when the configured port was 0).
	port := g.transport.net.GetAutoBindPort()

	// Optional symmetric encryption key for gossip traffic.
	var gossipKey []byte
	var err error
	if cfg.Key != "" {
		gossipKey, err = ioutil.ReadFile(cfg.Key)
		if err != nil {
			return nil, fmt.Errorf("reading gossip key: %s", err)
		}
	}

	////////////////////
	// memberlist config
	conf := memberlist.DefaultWANConfig()
	conf.Transport = g.transport.net
	conf.Name = api.Node().ID
	conf.BindAddr = api.Node().URI.Host
	conf.BindPort = port
	// AdvertisePort
	if cfg.AdvertisePort != "" {
		// BUG FIX: this previously parsed cfg.Port, silently ignoring the
		// configured advertise port.
		p, err := strconv.Atoi(cfg.AdvertisePort)
		if err != nil {
			return nil, fmt.Errorf("convert advertise port: %s", err)
		}
		conf.AdvertisePort = p
	} else {
		conf.AdvertisePort = port
	}
	// AdvertiseHost: fall back to this node's (resolved) host.
	if cfg.AdvertiseHost != "" {
		conf.AdvertiseAddr = cfg.AdvertiseHost
	} else {
		conf.AdvertiseAddr = hostToIP(api.Node().URI.Host)
	}
	// Timing/probing knobs mapped from the toml-friendly Config.
	conf.TCPTimeout = time.Duration(cfg.StreamTimeout)
	conf.SuspicionMult = cfg.SuspicionMult
	conf.PushPullInterval = time.Duration(cfg.PushPullInterval)
	conf.ProbeTimeout = time.Duration(cfg.ProbeTimeout)
	conf.ProbeInterval = time.Duration(cfg.ProbeInterval)
	conf.GossipNodes = cfg.Nodes
	conf.GossipInterval = time.Duration(cfg.Interval)
	conf.GossipToTheDeadTime = time.Duration(cfg.ToTheDeadTime)
	// Wire this memberSet in as the delegate and the event receiver for
	// join/leave/update notifications.
	conf.Delegate = g
	conf.SecretKey = gossipKey
	conf.Events = ger
	if g.logOutput != nil {
		conf.LogOutput = g.logOutput
	} else {
		conf.Logger = g.stdLogger
	}

	g.config = &config{
		memberlistConfig: conf,
		gossipSeeds:      cfg.Seeds,
	}
	return g, nil
}
// NodeMeta implementation of the memberlist.Delegate interface.
// It returns the serialized local node; on a marshalling failure the error
// is logged and an empty slice is returned.
func (g *memberSet) NodeMeta(limit int) []byte {
	data, err := g.papi.Serializer.Marshal(g.papi.Node())
	if err != nil {
		g.Logger.Printf("marshal message error: %s", err)
		return []byte{}
	}
	return data
}
// NotifyMsg implementation of the memberlist.Delegate interface
// called when a user-data message is received. The payload is forwarded
// to the local cluster-message handler; failures are logged.
func (g *memberSet) NotifyMsg(b []byte) {
	if err := g.papi.ClusterMessage(context.Background(), bytes.NewBuffer(b)); err != nil {
		g.Logger.Printf("cluster message error: %s", err)
	}
}
// GetBroadcasts implementation of the memberlist.Delegate interface
// called when user data messages can be broadcast. It simply delegates
// to the transmit-limited queue set up in Open.
func (g *memberSet) GetBroadcasts(overhead, limit int) [][]byte {
	return g.broadcasts.GetBroadcasts(overhead, limit)
}
// LocalState implementation of the memberlist.Delegate interface
// sends this Node's state data.
func (g *memberSet) LocalState(join bool) []byte {
	m := &pilosa.NodeStatus{
		Node:   g.papi.Node(),
		Schema: &pilosa.Schema{Indexes: g.papi.Schema(context.Background())},
	}
	// For each index, report per-field available shards so the remote side
	// can merge shard availability on receipt.
	for _, idx := range m.Schema.Indexes {
		is := &pilosa.IndexStatus{Name: idx.Name, CreatedAt: idx.CreatedAt}
		for _, f := range idx.Fields {
			// Default to an empty bitmap; the Field lookup error is
			// intentionally ignored (best-effort state report).
			availableShards := roaring.NewBitmap()
			if field, _ := g.papi.Field(context.Background(), idx.Name, f.Name); field != nil {
				availableShards = field.AvailableShards(false)
			}
			fs := &pilosa.FieldStatus{
				Name:            f.Name,
				CreatedAt:       f.CreatedAt,
				AvailableShards: availableShards,
			}
			is.Fields = append(is.Fields, fs)
		}
		m.Indexes = append(m.Indexes, is)
	}
	// Marshal nodestate data to bytes.
	buf, err := pilosa.MarshalInternalMessage(m, g.papi.Serializer)
	if err != nil {
		g.Logger.Printf("error marshalling nodestate data, err=%s", err)
		return []byte{}
	}
	return buf
}
// MergeRemoteState implementation of the memberlist.Delegate interface
// receive and process the remote side's LocalState. The payload is fed to
// the local cluster-message handler; failures are logged.
func (g *memberSet) MergeRemoteState(buf []byte, join bool) {
	if err := g.papi.ClusterMessage(context.Background(), bytes.NewBuffer(buf)); err != nil {
		g.Logger.Printf("merge state error: %s", err)
	}
}
// eventReceiver is used to enable an application to receive
// events about joins and leaves over a channel.
//
// Care must be taken that events are processed in a timely manner from
// the channel, since this delegate will block until an event can be sent.
type eventReceiver struct {
	// ch carries node events from the Notify* callbacks to listen.
	ch chan memberlist.NodeEvent
	// closed is closed by Close to unblock senders and stop listen.
	closed chan struct{}
	papi   *pilosa.API
	logger logger.Logger
}
// newEventReceiver returns a new instance of GossipEventReceiver and starts
// its background listen goroutine, which runs until Close is called.
func newEventReceiver(logger logger.Logger, papi *pilosa.API) *eventReceiver {
	er := &eventReceiver{
		ch:     make(chan memberlist.NodeEvent, 1),
		closed: make(chan struct{}),
		logger: logger,
		papi:   papi,
	}
	go er.listen()
	return er
}
// NotifyJoin implements memberlist.EventDelegate. It forwards a join event
// to the listen goroutine, or returns without sending once the receiver
// has been closed.
func (g *eventReceiver) NotifyJoin(n *memberlist.Node) {
	// Clone the node (including its Meta bytes) to avoid a data race with
	// memberlist mutating it after the callback returns.
	clone := *n
	clone.Meta = make([]byte, len(n.Meta))
	copy(clone.Meta, n.Meta)
	select {
	case g.ch <- memberlist.NodeEvent{Event: memberlist.NodeJoin, Node: &clone}:
	case <-g.closed:
	}
}
// NotifyLeave implements memberlist.EventDelegate. It forwards a leave event
// to the listen goroutine, or returns without sending once the receiver
// has been closed.
func (g *eventReceiver) NotifyLeave(n *memberlist.Node) {
	// Clone the node (including its Meta bytes) to avoid a data race with
	// memberlist mutating it after the callback returns.
	clone := *n
	clone.Meta = make([]byte, len(n.Meta))
	copy(clone.Meta, n.Meta)
	select {
	case g.ch <- memberlist.NodeEvent{Event: memberlist.NodeLeave, Node: &clone}:
	case <-g.closed:
	}
}
// NotifyUpdate implements memberlist.EventDelegate. It forwards an update
// event to the listen goroutine, or returns without sending once the
// receiver has been closed.
func (g *eventReceiver) NotifyUpdate(n *memberlist.Node) {
	// Clone the node (including its Meta bytes) to avoid a data race with
	// memberlist mutating it after the callback returns.
	clone := *n
	clone.Meta = make([]byte, len(n.Meta))
	copy(clone.Meta, n.Meta)
	select {
	case g.ch <- memberlist.NodeEvent{Event: memberlist.NodeUpdate, Node: &clone}:
	case <-g.closed:
	}
}
// Close stops the receiver: closing the channel unblocks any pending
// Notify* sends and terminates the listen goroutine.
func (g *eventReceiver) Close() {
	close(g.closed)
}
// listen consumes node events from g.ch until Close is called, translating
// each memberlist event into a pilosa NodeEvent and delivering it to the
// local cluster-message handler.
func (g *eventReceiver) listen() {
	var nodeEventType pilosa.NodeEventType
	for {
		var e memberlist.NodeEvent
		select {
		case <-g.closed:
			return
		case e = <-g.ch:
		}
		// Map the memberlist event type onto the pilosa equivalent;
		// unknown event types are skipped.
		switch e.Event {
		case memberlist.NodeJoin:
			nodeEventType = pilosa.NodeJoin
		case memberlist.NodeLeave:
			nodeEventType = pilosa.NodeLeave
		case memberlist.NodeUpdate:
			nodeEventType = pilosa.NodeUpdate
		default:
			continue
		}
		// Get the node from the event.Node meta data.
		var n pilosa.Node
		if err := g.papi.Serializer.Unmarshal(e.Node.Meta, &n); err != nil {
			// NOTE(review): a malformed peer meta payload crashes the whole
			// process here; consider logging and skipping instead.
			panic("failed to unmarshal event node meta into node")
		}
		ne := &pilosa.NodeEvent{
			Event: nodeEventType,
			Node:  &n,
		}
		buf, err := pilosa.MarshalInternalMessage(ne, g.papi.Serializer)
		if err != nil {
			panic(err)
		}
		// Deliver the translated event to the local cluster handler.
		if err := g.papi.ClusterMessage(context.Background(), bytes.NewBuffer(buf)); err != nil {
			g.logger.Printf("receive event error: %s", err)
		}
	}
}
// Transport is a gossip transport for binding to a port.
type Transport struct {
	//memberlist.Transport
	// net is the underlying memberlist network transport.
	net *memberlist.NetTransport
	// URI is the host:port this transport actually bound to.
	URI *pilosa.URI
}
// NewTransport returns a Transport bound to the given host and port.
// It will dynamically bind to a port if port is 0.
// This is useful for test cases where specifying a port is not reasonable.
//func NewTransport(host string, port int) (*memberlist.NetTransport, error) {
func NewTransport(host string, port int, logger *log.Logger) (*Transport, error) {
	// memberlist config
	conf := memberlist.DefaultWANConfig()
	conf.BindAddr = host
	conf.BindPort = port
	conf.AdvertisePort = port
	conf.Logger = logger

	// Renamed from "net" to "nt": the old name shadowed the standard
	// library net package.
	nt, err := newTransport(conf)
	if err != nil {
		return nil, fmt.Errorf("new transport: %s", err)
	}
	// Record the actual bound port (may differ from `port` when it was 0).
	uri, err := pilosa.NewURIFromHostPort(host, uint16(nt.GetAutoBindPort()))
	if err != nil {
		return nil, fmt.Errorf("new uri from host port: %s", err)
	}
	return &Transport{
		net: nt,
		URI: uri,
	}, nil
}
// newTransport returns a NetTransport based on the memberlist configuration.
// It will dynamically bind to a port if conf.BindPort is 0.
func newTransport(conf *memberlist.Config) (*memberlist.NetTransport, error) {
	nc := &memberlist.NetTransportConfig{
		BindAddrs: []string{conf.BindAddr},
		BindPort:  conf.BindPort,
		Logger:    conf.Logger,
	}
	// See comment below for details about the retry in here.
	makeNetRetry := func(limit int) (*memberlist.NetTransport, error) {
		var err error
		for try := 0; try < limit; try++ {
			var nt *memberlist.NetTransport
			if nt, err = memberlist.NewNetTransport(nc); err == nil {
				return nt, nil
			}
			// NOTE(review): this `continue` is redundant — the loop retries on
			// every error, not only "address already in use". The original
			// intent may have been to break on non-retryable errors; confirm
			// before changing.
			if strings.Contains(err.Error(), "address already in use") {
				conf.Logger.Printf("[DEBUG] Got bind error: %v", err)
				continue
			}
		}
		return nil, fmt.Errorf("failed to obtain an address: %v", err)
	}
	// The dynamic bind port operation is inherently racy because
	// even though we are using the kernel to find a port for us, we
	// are attempting to bind multiple protocols (and potentially
	// multiple addresses) with the same port number. We build in a
	// few retries here since this often gets transient errors in
	// busy unit tests.
	limit := 1
	if conf.BindPort == 0 {
		limit = 10
	}
	nt, err := makeNetRetry(limit)
	if err != nil {
		return nil, errors.Wrap(err, "could not set up network transport")
	}
	return nt, nil
}
// Config holds toml-friendly memberlist configuration.
type Config struct {
	// Port indicates the port to which pilosa should bind for internal state sharing.
	Port string `toml:"port"`
	// AdvertiseHost is the hostname or IP other nodes should use to connect to
	// this host. If left blank, the value for Host will be used. This is useful
	// in some proxy and NAT scenarios.
	AdvertiseHost string `toml:"advertise-host"`
	// AdvertisePort is the port other nodes will use to connect to this one.
	// Behaves like AdvertiseHost.
	AdvertisePort string `toml:"advertise-port"`
	// Seeds is the list of host:port addresses used to join the cluster.
	Seeds []string `toml:"seeds"`
	// Key is the path to a file containing the gossip encryption key.
	Key string `toml:"key"`
	// StreamTimeout is the timeout for establishing a stream connection with
	// a remote node for a full state sync, and for stream read and write
	// operations. Maps to memberlist TCPTimeout.
	StreamTimeout toml.Duration `toml:"stream-timeout"`
	// SuspicionMult is the multiplier for determining the time an
	// inaccessible node is considered suspect before declaring it dead.
	// The actual timeout is calculated using the formula:
	//
	//   SuspicionTimeout = SuspicionMult * log(N+1) * ProbeInterval
	//
	// This allows the timeout to scale properly with expected propagation
	// delay with a larger cluster size. The higher the multiplier, the longer
	// an inaccessible node is considered part of the cluster before declaring
	// it dead, giving that suspect node more time to refute if it is indeed
	// still alive.
	SuspicionMult int `toml:"suspicion-mult"`
	// PushPullInterval is the interval between complete state syncs.
	// Complete state syncs are done with a single node over TCP and are
	// quite expensive relative to standard gossiped messages. Setting this
	// to zero will disable state push/pull syncs completely.
	//
	// Setting this interval lower (more frequent) will increase convergence
	// speeds across larger clusters at the expense of increased bandwidth
	// usage.
	PushPullInterval toml.Duration `toml:"push-pull-interval"`
	// ProbeInterval and ProbeTimeout are used to configure probing behavior
	// for memberlist.
	//
	// ProbeInterval is the interval between random node probes. Setting
	// this lower (more frequent) will cause the memberlist cluster to detect
	// failed nodes more quickly at the expense of increased bandwidth usage.
	//
	// ProbeTimeout is the timeout to wait for an ack from a probed node
	// before assuming it is unhealthy. This should be set to 99-percentile
	// of RTT (round-trip time) on your network.
	ProbeInterval toml.Duration `toml:"probe-interval"`
	ProbeTimeout  toml.Duration `toml:"probe-timeout"`
	// Interval and Nodes are used to configure the gossip
	// behavior of memberlist.
	//
	// Interval is the interval between sending messages that need
	// to be gossiped that haven't been able to piggyback on probing messages.
	// If this is set to zero, non-piggyback gossip is disabled. By lowering
	// this value (more frequent) gossip messages are propagated across
	// the cluster more quickly at the expense of increased bandwidth.
	//
	// Nodes is the number of random nodes to send gossip messages to
	// per Interval. Increasing this number causes the gossip messages
	// to propagate across the cluster more quickly at the expense of
	// increased bandwidth.
	//
	// ToTheDeadTime is the interval after which a node has died that
	// we will still try to gossip to it. This gives it a chance to refute.
	Interval      toml.Duration `toml:"interval"`
	Nodes         int           `toml:"nodes"`
	ToTheDeadTime toml.Duration `toml:"to-the-dead-time"`
}
// hostToIP converts host to an IP4 address based on net.LookupIP().
// IP literals are returned unchanged; for hostnames, the first IPv4
// result is returned. On lookup failure, or when no IPv4 address is
// found, host is returned as-is.
func hostToIP(host string) string {
	// Already an IP literal (v4 or v6): nothing to resolve.
	if net.ParseIP(host) != nil {
		return host
	}
	addrs, err := net.LookupIP(host)
	if err != nil {
		return host
	}
	for _, addr := range addrs {
		// this restricts pilosa to IP4
		if addr.To4() != nil {
			return addr.String()
		}
	}
	return host
}