-
Notifications
You must be signed in to change notification settings - Fork 3
/
gradual.go
98 lines (79 loc) · 2.11 KB
/
gradual.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package survey
import (
"context"
"time"
"github.com/jpillora/backoff"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/peer"
)
// GradualSurveyor surveys the cluster keyspace in stages, widening the
// queried subset at each stage. Prefer it when bootstrapping large
// clusters, where a multicast storm could degrade the service.
//
// Details are in specs/survey.md.
type GradualSurveyor struct {
	// Factor determines how long the GradualSurveyor waits for peers
	// at each distance. The raw wait time is obtained by evaluating
	// 'distance * factor'; min, max and jitter are applied afterwards.
	// A zero value selects the default of 1.5.
	Factor float64

	// Min is the shortest wait duration. A zero value selects the
	// default of 5s.
	Min time.Duration
	// Max is the longest wait duration. A zero value selects the
	// default of 90s.
	Max time.Duration

	// DisableJitter, when true, makes the surveyor use the absolute
	// wait durations derived from min, max and factor. When false
	// (the default), the final wait time is sampled uniformly from
	// the interval (w/2, w), where 'w' is the wait duration.
	DisableJitter bool

	*Surveyor
}
// FindPeers discovers peers in namespace ns. It starts a survey at
// distance 0 and, each time the backoff timer fires, cancels the current
// survey and issues a new one at the next distance.
//
// The initial survey is started synchronously and its error, if any, is
// returned to the caller. Later surveys are started from a background
// goroutine; a failed retry is logged at debug level and the next attempt
// is made when the timer fires again. The goroutine exits, closing the
// returned channel, once ctx is done.
//
// Delivery is best-effort: a peer is dropped when the returned channel's
// single-slot buffer is full.
func (g GradualSurveyor) FindPeers(ctx context.Context, ns string, opt ...discovery.Option) (<-chan peer.AddrInfo, error) {
	// Cap the capacity so that every append below allocates a fresh slice
	// rather than writing into (and racing on) the caller's backing array.
	opt = opt[:len(opt):len(opt)]

	ctxSurv, cancel := context.WithCancel(ctx)
	found, err := g.Surveyor.FindPeers(ctxSurv, ns, append(opt, WithDistance(uint8(0)))...)
	if err != nil {
		cancel()
		return nil, err
	}

	out := make(chan peer.AddrInfo, 1)
	go func() {
		defer close(out)
		// cancel is reassigned on every retry; defer through a closure so
		// the most recent cancel func is the one released on exit.
		defer func() { cancel() }()

		b := g.backoff()

		// Use a single timer that is re-armed only after it fires. Calling
		// b.Duration() on every loop iteration would advance the backoff
		// attempt (and restart the wait) each time a peer is received.
		timer := time.NewTimer(b.Duration())
		defer timer.Stop()

		for {
			select {
			case <-timer.C:
				// Abandon the survey at the previous distance.
				cancel()
				ctxSurv, cancel = context.WithCancel(ctx)

				// Converting an out-of-range float to uint8 is
				// implementation-dependent, so clamp first.
				dist := b.Attempt()
				if dist > 255 {
					dist = 255
				}

				next, ferr := g.Surveyor.FindPeers(ctxSurv, ns, append(opt, WithDistance(uint8(dist)))...)
				if ferr != nil {
					g.sock.Log().WithError(ferr).Debug("retry failed")
					// A nil channel blocks forever in select, so the loop
					// simply waits for the next timer tick.
					next = nil
				}
				found = next

				timer.Reset(b.Duration())

			case info, ok := <-found:
				if !ok {
					// The survey's channel was closed; stop selecting on
					// it so we do not spin forwarding zero values.
					found = nil
					continue
				}

				// Non-blocking send: drop the peer if the consumer is
				// not keeping up.
				select {
				case out <- info:
				default:
				}

			case <-ctx.Done():
				return
			}
		}
	}()

	return out, nil
}
// backoff builds the backoff.Backoff used to pace successive surveys from
// the GradualSurveyor's Factor, Min, Max and DisableJitter fields. Zero
// values are replaced by defaults: Factor 1.5, Min 5s and Max 90s. Jitter
// is enabled unless DisableJitter is set.
func (g GradualSurveyor) backoff() backoff.Backoff {
	b := backoff.Backoff{
		Factor: g.Factor,
		Min:    g.Min, // was g.Max, which ignored the configured minimum
		Max:    g.Max,
		Jitter: !g.DisableJitter,
	}

	if b.Factor == 0 {
		b.Factor = 1.5
	}

	if b.Min == 0 {
		b.Min = 5 * time.Second
	}

	if b.Max == 0 {
		b.Max = 90 * time.Second
	}

	return b
}