-
Notifications
You must be signed in to change notification settings - Fork 211
/
peers.go
229 lines (207 loc) · 5.34 KB
/
peers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
package peers
import (
"strings"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap/zapcore"
"github.com/spacemeshos/go-spacemesh/p2p"
)
// data holds the per-peer bookkeeping used to rank peers.
type data struct {
	id peer.ID

	// success and failures count the requests recorded as succeeded and failed.
	success  int
	failures int

	// failRate is failures / (success + failures); it is recomputed on every
	// recorded request.
	failRate float64
	// averageLatency is a moving average of the per-1KiB request latency,
	// held as a float64 number of nanoseconds.
	averageLatency float64
}
// latency returns the score used to rank this peer; lower scores rank first.
//
// A peer with no recorded requests scores slightly below the global latency.
// Any other peer scores its own average latency plus a penalty proportional
// to its failure rate.
func (d *data) latency(global float64) float64 {
	if d.success+d.failures != 0 {
		return d.averageLatency + d.failRate*global
	}
	return 0.9 * global // to prioritize trying out new peer
}
// less reports whether d ranks strictly before other when both are scored
// against the given global latency (see latency).
//
// When the scores are equal, the peer IDs break the tie so the ordering is
// deterministic.
func (d *data) less(other *data, global float64) bool {
	mine, theirs := d.latency(global), other.latency(global)
	switch {
	case mine < theirs:
		return true
	case mine > theirs:
		return false
	default:
		return strings.Compare(string(d.id), string(other.id)) < 0
	}
}
// New returns an empty Peers tracker that is ready for use.
func New() *Peers {
	tracker := new(Peers)
	tracker.peers = make(map[peer.ID]*data)
	return tracker
}
// Peers keeps request statistics for a set of tracked peers and ranks them by
// latency and failure rate. Its methods lock mu before touching the state below.
type Peers struct {
	// mu guards peers and globalLatency.
	mu sync.Mutex
	// peers maps each tracked peer ID to its statistics.
	peers map[peer.ID]*data

	// globalLatency is a moving average of the per-1KiB latency of every
	// recorded request, across all tracked peers, failed requests included.
	// It serves as the reference score for peers with no history yet, and it
	// scales the failure-rate penalty applied to each peer's own average.
	globalLatency float64
}
// Add starts tracking id with empty statistics and returns true. If id is
// already tracked, Add returns false and leaves its statistics untouched.
func (p *Peers) Add(id peer.ID) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, tracked := p.peers[id]; tracked {
		return false
	}
	p.peers[id] = &data{id: id}
	return true
}
// Delete stops tracking id and discards its statistics. Deleting an id that
// is not tracked has no effect.
func (p *Peers) Delete(id peer.ID) {
	p.mu.Lock()
	delete(p.peers, id)
	p.mu.Unlock()
}
// onLatency records one request to peer id that moved size bytes in the given
// latency. It updates the peer's success/failure counters and failure rate,
// and folds the size-normalized latency into both the peer's and the global
// moving averages. Requests for peers that are not tracked are ignored.
func (p *Peers) onLatency(id peer.ID, size int, latency time.Duration, failed bool) {
	// We assume that latency is proportional to the size of the message
	// and define it as a duration to transmit 1KiB.
	// To account for the additional overhead of transmitting small messages,
	// we treat them as if they were 1KiB.
	sample := float64(latency / time.Duration(max(size/1024, 1)))

	p.mu.Lock()
	defer p.mu.Unlock()
	stats, exist := p.peers[id]
	if !exist {
		return
	}
	if failed {
		stats.failures++
	} else {
		stats.success++
	}
	stats.failRate = float64(stats.failures) / float64(stats.success+stats.failures)

	// Exponential moving averages. A zero average is treated as "no sample
	// yet", so the sample is taken as is instead of being blended in.
	if stats.averageLatency != 0 {
		// alpha = 1/10: ~86% of the weight falls on the last 19 samples.
		stats.averageLatency += (sample - stats.averageLatency) / 10
	} else {
		stats.averageLatency = sample
	}
	if p.globalLatency != 0 {
		// alpha = 1/25: ~86% of the weight falls on the last 49 samples.
		p.globalLatency += (sample - p.globalLatency) / 25
	} else {
		p.globalLatency = sample
	}
}
// OnFailure records a failed request to peer id that involved a message of
// size bytes and took the given latency. The latency still feeds the peer's
// and the global latency averages, and the failure raises the peer's
// failure rate.
func (p *Peers) OnFailure(id peer.ID, size int, latency time.Duration) {
	const failed = true
	p.onLatency(id, size, latency, failed)
}
// OnLatency records a successful request to peer id that moved size bytes in
// the given latency, and updates the peer's and the global latency averages.
func (p *Peers) OnLatency(id peer.ID, size int, latency time.Duration) {
	const failed = false
	p.onLatency(id, size, latency, failed)
}
// SelectBestFrom returns the best-ranked peer among the given candidates.
// Candidates that are not tracked are skipped. If none of them is tracked,
// it returns p2p.NoPeer.
func (p *Peers) SelectBestFrom(peers []peer.ID) peer.ID {
	p.mu.Lock()
	defer p.mu.Unlock()
	var best *data
	for _, id := range peers {
		candidate, ok := p.peers[id]
		if !ok {
			continue
		}
		if best == nil || candidate.less(best, p.globalLatency) {
			best = candidate
		}
	}
	if best == nil {
		return p2p.NoPeer
	}
	return best.id
}
// SelectBest returns at most n tracked peers ordered best first by
// responsiveness and latency. It returns nil when n <= 0 or when no peers
// are tracked.
//
// SelectBest is parametrized by n because the sync protocol relies on
// receiving data from redundant connections to guarantee that it gets the
// complete data set. If it doesn't get the complete data set, it has to fall
// back to hash resolution, which is generally more expensive.
func (p *Peers) SelectBest(n int) []peer.ID {
	p.mu.Lock()
	defer p.mu.Unlock()
	lth := min(len(p.peers), n)
	// A negative n would otherwise reach make with a negative capacity and panic.
	if lth <= 0 {
		return nil
	}
	// best stays sorted (best first) and holds at most lth entries. Each
	// candidate is insertion-sorted in: whenever it beats best[i], the two
	// swap and the displaced entry carries on down the slice. Whatever is
	// left at the end is appended only while there is still room.
	best := make([]*data, 0, lth)
	for _, candidate := range p.peers {
		for i := range best {
			if candidate.less(best[i], p.globalLatency) {
				best[i], candidate = candidate, best[i]
			}
		}
		if len(best) < cap(best) {
			best = append(best, candidate)
		}
	}
	ids := make([]peer.ID, len(best))
	for i, d := range best {
		ids[i] = d.id
	}
	return ids
}
// Total returns the number of peers currently tracked.
func (p *Peers) Total() int {
	p.mu.Lock()
	count := len(p.peers)
	p.mu.Unlock()
	return count
}
// Stats summarizes the tracker. It reports the number of tracked peers, the
// global average latency, and detailed statistics for up to five best peers,
// best first.
//
// The best peers are chosen by SelectBest before the lock is taken again.
// A peer deleted in between is simply left out of BestPeers.
func (p *Peers) Stats() Stats {
	const bestCount = 5
	ids := p.SelectBest(bestCount)

	p.mu.Lock()
	defer p.mu.Unlock()
	summary := Stats{
		Total:                len(p.peers),
		GlobalAverageLatency: time.Duration(p.globalLatency),
	}
	for _, id := range ids {
		d, ok := p.peers[id]
		if !ok {
			continue
		}
		summary.BestPeers = append(summary.BestPeers, PeerStats{
			ID:       d.id,
			Success:  d.success,
			Failures: d.failures,
			Latency:  time.Duration(d.averageLatency),
		})
	}
	return summary
}
// Stats is a summary of a Peers tracker, as produced by Peers.Stats.
type Stats struct {
	// Total is the number of tracked peers.
	Total int
	// GlobalAverageLatency is the moving average of per-1KiB request latency
	// across all tracked peers.
	GlobalAverageLatency time.Duration
	// BestPeers holds statistics for the top-ranked peers, best first.
	BestPeers []PeerStats
}
// MarshalLogObject implements zapcore.ObjectMarshaler so that Stats can be
// logged as a structured object. Any error from encoding the "best peers"
// array, or one of its elements, is returned to the caller instead of being
// dropped.
func (s *Stats) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	enc.AddInt("total", s.Total)
	enc.AddDuration("global average latency", s.GlobalAverageLatency)
	return enc.AddArray("best peers", zapcore.ArrayMarshalerFunc(func(arrEnc zapcore.ArrayEncoder) error {
		for i := range s.BestPeers {
			// Index into the slice rather than taking the address of a range copy.
			if err := arrEnc.AppendObject(&s.BestPeers[i]); err != nil {
				return err
			}
		}
		return nil
	}))
}
// PeerStats holds the statistics reported for a single peer in Stats.
type PeerStats struct {
	// ID identifies the peer.
	ID peer.ID
	// Success and Failures count the requests recorded as succeeded and failed.
	Success  int
	Failures int
	// Latency is the peer's moving-average latency per 1KiB (1024 bytes).
	Latency time.Duration
}
// MarshalLogObject implements zapcore.ObjectMarshaler for PeerStats and
// always returns nil.
func (p *PeerStats) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	id := p.ID.String()
	enc.AddString("id", id)
	enc.AddInt("success", p.Success)
	enc.AddInt("failures", p.Failures)
	enc.AddDuration("latency per 1024 bytes", p.Latency)
	return nil
}