generated from ipfs/ipfs-repository-template
-
Notifications
You must be signed in to change notification settings - Fork 95
/
scoreledger.go
353 lines (304 loc) · 9.24 KB
/
scoreledger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
package decision
import (
"sync"
"time"
"github.com/benbjohnson/clock"
peer "github.com/libp2p/go-libp2p/core/peer"
)
const (
// the alpha for the EWMA used to track short term usefulness
shortTermAlpha = 0.5
// the alpha for the EWMA used to track long term usefulness
longTermAlpha = 0.05
// how frequently the engine should sample usefulness. Peers that
// interact every shortTerm time period are considered "active".
shortTerm = 10 * time.Second
// long term ratio defines what "long term" means in terms of the
// shortTerm duration. Peers that interact once every longTermRatio are
// considered useful over the long term.
longTermRatio = 10
// long/short term scores for tagging peers
longTermScore = 10 // this is a high tag but it grows _very_ slowly.
shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.
)
// Stores the data exchange relationship between two peers.
type scoreledger struct {
// Partner is the remote Peer.
partner peer.ID
// tracks bytes sent...
bytesSent uint64
// ...and received.
bytesRecv uint64
// lastExchange is the time of the last data exchange.
lastExchange time.Time
// These scores keep track of how useful we think this peer is. Short
// tracks short-term usefulness and long tracks long-term usefulness.
shortScore, longScore float64
// Score keeps track of the score used in the peer tagger. We track it
// here to avoid unnecessarily updating the tags in the connection manager.
score int
// exchangeCount is the number of exchanges with this peer
exchangeCount uint64
// the record lock
lock sync.RWMutex
clock clock.Clock
}
// Receipt is a summary of the ledger for a given peer
// collecting various pieces of aggregated data for external
// reporting purposes.
type Receipt struct {
Peer string
Value float64
Sent uint64
Recv uint64
Exchanged uint64
}
// Increments the sent counter.
func (l *scoreledger) AddToSentBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()
l.exchangeCount++
l.lastExchange = l.clock.Now()
l.bytesSent += uint64(n)
}
// Increments the received counter.
func (l *scoreledger) AddToReceivedBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()
l.exchangeCount++
l.lastExchange = l.clock.Now()
l.bytesRecv += uint64(n)
}
// Returns the Receipt for this ledger record.
func (l *scoreledger) Receipt() *Receipt {
l.lock.RLock()
defer l.lock.RUnlock()
return &Receipt{
Peer: l.partner.String(),
Value: float64(l.bytesSent) / float64(l.bytesRecv+1),
Sent: l.bytesSent,
Recv: l.bytesRecv,
Exchanged: l.exchangeCount,
}
}
// DefaultScoreLedger is used by Engine as the default ScoreLedger.
type DefaultScoreLedger struct {
// the score func
scorePeer ScorePeerFunc
// is closed on Close
closing chan struct{}
// protects the fields immediatly below
lock sync.RWMutex
// ledgerMap lists score ledgers by their partner key.
ledgerMap map[peer.ID]*scoreledger
// how frequently the engine should sample peer usefulness
peerSampleInterval time.Duration
// used by the tests to detect when a sample is taken
sampleCh chan struct{}
clock clock.Clock
}
// scoreWorker keeps track of how "useful" our peers are, updating scores in the
// connection manager.
//
// It does this by tracking two scores: short-term usefulness and long-term
// usefulness. Short-term usefulness is sampled frequently and highly weights
// new observations. Long-term usefulness is sampled less frequently and highly
// weights on long-term trends.
//
// In practice, we do this by keeping two EWMAs. If we see an interaction
// within the sampling period, we record the score, otherwise, we record a 0.
// The short-term one has a high alpha and is sampled every shortTerm period.
// The long-term one has a low alpha and is sampled every
// longTermRatio*shortTerm period.
//
// To calculate the final score, we sum the short-term and long-term scores then
// adjust it ±25% based on our debt ratio. Peers that have historically been
// more useful to us than we are to them get the highest score.
func (dsl *DefaultScoreLedger) scoreWorker() {
ticker := dsl.clock.Ticker(dsl.peerSampleInterval)
defer ticker.Stop()
type update struct {
peer peer.ID
score int
}
var (
lastShortUpdate, lastLongUpdate time.Time
updates []update
)
for i := 0; ; i = (i + 1) % longTermRatio {
var now time.Time
select {
case now = <-ticker.C:
case <-dsl.closing:
return
}
// The long term update ticks every `longTermRatio` short
// intervals.
updateLong := i == 0
dsl.lock.Lock()
for _, l := range dsl.ledgerMap {
l.lock.Lock()
// Update the short-term score.
if l.lastExchange.After(lastShortUpdate) {
l.shortScore = ewma(l.shortScore, shortTermScore, shortTermAlpha)
} else {
l.shortScore = ewma(l.shortScore, 0, shortTermAlpha)
}
// Update the long-term score.
if updateLong {
if l.lastExchange.After(lastLongUpdate) {
l.longScore = ewma(l.longScore, longTermScore, longTermAlpha)
} else {
l.longScore = ewma(l.longScore, 0, longTermAlpha)
}
}
// Calculate the new score.
//
// The accounting score adjustment prefers peers _we_
// need over peers that need us. This doesn't help with
// leeching.
var lscore float64
if l.bytesRecv == 0 {
lscore = 0
} else {
lscore = float64(l.bytesRecv) / float64(l.bytesRecv+l.bytesSent)
}
score := int((l.shortScore + l.longScore) * (lscore*.5 + .75))
// Avoid updating the connection manager unless there's a change. This can be expensive.
if l.score != score {
// put these in a list so we can perform the updates outside _global_ the lock.
updates = append(updates, update{l.partner, score})
l.score = score
}
l.lock.Unlock()
}
dsl.lock.Unlock()
// record the times.
lastShortUpdate = now
if updateLong {
lastLongUpdate = now
}
// apply the updates
for _, update := range updates {
dsl.scorePeer(update.peer, update.score)
}
// Keep the memory. It's not much and it saves us from having to allocate.
updates = updates[:0]
// Used by the tests
if dsl.sampleCh != nil {
dsl.sampleCh <- struct{}{}
}
}
}
// Returns the score ledger for the given peer or nil if that peer
// is not on the ledger.
func (dsl *DefaultScoreLedger) find(p peer.ID) *scoreledger {
// Take a read lock (as it's less expensive) to check if we have
// a ledger for the peer.
dsl.lock.RLock()
l, ok := dsl.ledgerMap[p]
dsl.lock.RUnlock()
if ok {
return l
}
return nil
}
// Returns a new scoreledger.
func newScoreLedger(p peer.ID, clock clock.Clock) *scoreledger {
return &scoreledger{
partner: p,
clock: clock,
}
}
// Lazily instantiates a ledger.
func (dsl *DefaultScoreLedger) findOrCreate(p peer.ID) *scoreledger {
l := dsl.find(p)
if l != nil {
return l
}
// There's no ledger, so take a write lock, then check again and
// create the ledger if necessary.
dsl.lock.Lock()
defer dsl.lock.Unlock()
l, ok := dsl.ledgerMap[p]
if !ok {
l = newScoreLedger(p, dsl.clock)
dsl.ledgerMap[p] = l
}
return l
}
// GetReceipt returns aggregated data communication with a given peer.
func (dsl *DefaultScoreLedger) GetReceipt(p peer.ID) *Receipt {
l := dsl.find(p)
if l != nil {
return l.Receipt()
}
// Return a blank receipt otherwise.
return &Receipt{
Peer: p.String(),
Value: 0,
Sent: 0,
Recv: 0,
Exchanged: 0,
}
}
// Starts the default ledger sampling process.
func (dsl *DefaultScoreLedger) Start(scorePeer ScorePeerFunc) {
dsl.init(scorePeer)
go dsl.scoreWorker()
}
// Stops the sampling process.
func (dsl *DefaultScoreLedger) Stop() {
close(dsl.closing)
}
// Initializes the score ledger.
func (dsl *DefaultScoreLedger) init(scorePeer ScorePeerFunc) {
dsl.lock.Lock()
defer dsl.lock.Unlock()
dsl.scorePeer = scorePeer
}
// Increments the sent counter for the given peer.
func (dsl *DefaultScoreLedger) AddToSentBytes(p peer.ID, n int) {
l := dsl.findOrCreate(p)
l.AddToSentBytes(n)
}
// Increments the received counter for the given peer.
func (dsl *DefaultScoreLedger) AddToReceivedBytes(p peer.ID, n int) {
l := dsl.findOrCreate(p)
l.AddToReceivedBytes(n)
}
// PeerConnected should be called when a new peer connects, meaning
// we should open accounting.
func (dsl *DefaultScoreLedger) PeerConnected(p peer.ID) {
dsl.lock.Lock()
defer dsl.lock.Unlock()
_, ok := dsl.ledgerMap[p]
if !ok {
dsl.ledgerMap[p] = newScoreLedger(p, dsl.clock)
}
}
// PeerDisconnected should be called when a peer disconnects to
// clean up the accounting.
func (dsl *DefaultScoreLedger) PeerDisconnected(p peer.ID) {
dsl.lock.Lock()
defer dsl.lock.Unlock()
delete(dsl.ledgerMap, p)
}
// Creates a new instance of the default score ledger.
func NewDefaultScoreLedger() *DefaultScoreLedger {
return &DefaultScoreLedger{
ledgerMap: make(map[peer.ID]*scoreledger),
closing: make(chan struct{}),
peerSampleInterval: shortTerm,
clock: clock.New(),
}
}
// Creates a new instance of the default score ledger with testing
// parameters.
func NewTestScoreLedger(peerSampleInterval time.Duration, sampleCh chan struct{}, clock clock.Clock) *DefaultScoreLedger {
dsl := NewDefaultScoreLedger()
dsl.peerSampleInterval = peerSampleInterval
dsl.sampleCh = sampleCh
dsl.clock = clock
return dsl
}