/
peertracker.go
401 lines (331 loc) · 11 KB
/
peertracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
package peertracker
import (
"math"
"math/bits"
"sync"
"github.com/benbjohnson/clock"
pq "github.com/ipfs/go-ipfs-pq"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/libp2p/go-libp2p/core/peer"
)
// clockInstance is the package-level clock used to timestamp queued tasks
// (only Now() is called on it in this file). It is a benbjohnson/clock
// instance — presumably so tests can substitute a mock clock; TODO confirm.
var clockInstance = clock.New()
// TaskMerger is an interface that is used to merge new tasks into the active
// and pending queues.
type TaskMerger interface {
	// HasNewInfo indicates whether the given task has more information than
	// the existing group of tasks (which have the same Topic), and thus should
	// be merged.
	HasNewInfo(task peertask.Task, existing []*peertask.Task) bool
	// Merge copies relevant fields from a new task to an existing task.
	Merge(task peertask.Task, existing *peertask.Task)
}
// DefaultTaskMerger is the TaskMerger used by default. It never overwrites an
// existing task (with the same Topic).
type DefaultTaskMerger struct{}

// HasNewInfo always reports false: under the default policy a new task never
// carries information worth merging into the existing group.
func (*DefaultTaskMerger) HasNewInfo(peertask.Task, []*peertask.Task) bool {
	return false
}

// Merge is a no-op: the default policy never copies anything into an
// existing task.
func (*DefaultTaskMerger) Merge(peertask.Task, *peertask.Task) {
}
// PeerTracker tracks task blocks for a single peer, as well as active tasks
// for that peer
type PeerTracker struct {
	// target is the peer whose tasks this tracker manages.
	target peer.ID
	// Tasks that are pending being made active, keyed by Topic (at most one
	// pending task per Topic; duplicates are merged via taskMerger).
	pendingTasks map[peertask.Topic]*peertask.QueueTask

	// activelk guards the mutable state below; note that in this file it is
	// taken around pendingTasks accesses as well as the active state.
	activelk sync.Mutex
	// Tasks that have been made active. Unfortunately, we can have multiple for the same topic
	// as we might get a "superior" request after starting to handle the initial one.
	activeTasks map[peertask.Topic][]*peertask.Task
	// activeWork is the summed Work of every task in activeTasks.
	activeWork int

	// maxActiveWorkPerPeer caps activeWork in PopTasks; a value <= 0 means
	// no cap.
	maxActiveWorkPerPeer int

	// for the PQ interface
	index int
	// freezeVal > 0 means the peer is frozen and PopTasks yields nothing.
	freezeVal int

	queueTaskComparator peertask.QueueTaskComparator

	// priority queue of tasks belonging to this peer
	taskQueue pq.PQ

	taskMerger TaskMerger
}
// Option is a function that configures the peer tracker.
type Option func(*PeerTracker)

// WithQueueTaskComparator sets a custom QueueTask comparison function for the
// peer tracker's task queue.
func WithQueueTaskComparator(f peertask.QueueTaskComparator) Option {
	return func(tracker *PeerTracker) {
		tracker.queueTaskComparator = f
	}
}
// New creates a new PeerTracker for the given peer, using taskMerger to
// combine duplicate-Topic tasks and maxActiveWorkPerPeer (<= 0 for no limit)
// to cap concurrently active work.
func New(target peer.ID, taskMerger TaskMerger, maxActiveWorkPerPeer int, opts ...Option) *PeerTracker {
	tracker := &PeerTracker{
		target:               target,
		queueTaskComparator:  peertask.PriorityCompare,
		pendingTasks:         make(map[peertask.Topic]*peertask.QueueTask),
		activeTasks:          make(map[peertask.Topic][]*peertask.Task),
		taskMerger:           taskMerger,
		maxActiveWorkPerPeer: maxActiveWorkPerPeer,
	}

	for _, apply := range opts {
		apply(tracker)
	}

	// Build the queue only after the options have run: an option may have
	// replaced the default comparator.
	tracker.taskQueue = pq.New(peertask.WrapCompare(tracker.queueTaskComparator))

	return tracker
}
// PeerComparator is used for peer prioritization.
// It should return true if peer 'a' has higher priority than peer 'b'.
type PeerComparator func(a, b *PeerTracker) bool
// DefaultPeerComparator implements the default peer prioritization logic:
// peers with pending work beat idle ones, less-frozen peers beat more-frozen
// ones, and among equals the peer with less active (and then more pending)
// work wins, so that busy peers are kept fed without being overloaded.
func DefaultPeerComparator(pa, pb *PeerTracker) bool {
	pendingA := len(pa.pendingTasks)
	pendingB := len(pb.pendingTasks)

	switch {
	// Having no pending tasks means lowest priority.
	case pendingA == 0:
		return false
	case pendingB == 0:
		return true

	// Frozen peers have lowest priority.
	case pa.freezeVal > pb.freezeVal:
		return false
	case pa.freezeVal < pb.freezeVal:
		return true

	// If each peer has an equal amount of work in its active queue, choose
	// the peer with the most work pending.
	case pa.activeWork == pb.activeWork:
		return pendingA > pendingB

	// Otherwise choose the peer with the least amount of work in its active
	// queue. This way we "keep peers busy" by sending them as much data as
	// they can process.
	default:
		return pa.activeWork < pb.activeWork
	}
}
// TaskPriorityPeerComparator prioritizes peers based on their highest
// priority task: the supplied task comparator is applied to the head of each
// peer's queue. A peer with an empty queue always loses.
func TaskPriorityPeerComparator(comparator peertask.QueueTaskComparator) PeerComparator {
	return func(pa, pb *PeerTracker) bool {
		headA := pa.taskQueue.Peek()
		headB := pb.taskQueue.Peek()
		if headA == nil {
			// 'a' has nothing queued, so it cannot outrank 'b'.
			return false
		}
		if headB == nil {
			return true
		}
		return comparator(headA.(*peertask.QueueTask), headB.(*peertask.QueueTask))
	}
}
// Target returns the peer that this peer tracker tracks tasks for.
func (p *PeerTracker) Target() peer.ID {
	return p.target
}
// IsIdle returns true if the peer has no active tasks or queued tasks.
func (p *PeerTracker) IsIdle() bool {
	p.activelk.Lock()
	defer p.activelk.Unlock()

	nothingPending := len(p.pendingTasks) == 0
	nothingActive := len(p.activeTasks) == 0
	return nothingPending && nothingActive
}
// PeerTrackerStats captures number of active and pending tasks for this peer.
type PeerTrackerStats struct {
	// NumPending is the number of distinct pending topics.
	NumPending int
	// NumActive is the number of distinct active topics (note: a topic may
	// have several active tasks; this counts topics, not tasks).
	NumActive int
}
// Stats returns a snapshot of the current pending/active topic counts for
// this peer.
func (p *PeerTracker) Stats() *PeerTrackerStats {
	p.activelk.Lock()
	defer p.activelk.Unlock()

	stats := &PeerTrackerStats{
		NumPending: len(p.pendingTasks),
		NumActive:  len(p.activeTasks),
	}
	return stats
}
// PeerTrackerTopics captures the current state of topics in this peer's queue.
type PeerTrackerTopics struct {
	// Pending lists the topics awaiting activation.
	Pending []peertask.Topic
	// Active lists the topics with at least one active task.
	Active []peertask.Topic
}
// Topics gives a full list of pending and active topics for this peer,
// snapshotted under the tracker's lock. Ordering within each list is
// unspecified (map iteration order).
func (p *PeerTracker) Topics() *PeerTrackerTopics {
	p.activelk.Lock()
	defer p.activelk.Unlock()

	snapshot := &PeerTrackerTopics{
		Pending: make([]peertask.Topic, 0, len(p.pendingTasks)),
		Active:  make([]peertask.Topic, 0, len(p.activeTasks)),
	}
	for topic := range p.pendingTasks {
		snapshot.Pending = append(snapshot.Pending, topic)
	}
	for topic := range p.activeTasks {
		snapshot.Active = append(snapshot.Active, topic)
	}
	return snapshot
}
// Index implements pq.Elem: it reports this tracker's current position in
// the peer priority queue.
func (p *PeerTracker) Index() int {
	return p.index
}

// SetIndex implements pq.Elem: the priority queue calls it to record this
// tracker's new position whenever the heap reorders.
func (p *PeerTracker) SetIndex(i int) {
	p.index = i
}
// PushTasks adds a group of tasks onto a peer's queue.
// It is PushTasksTruncated with truncation disabled.
func (p *PeerTracker) PushTasks(tasks ...peertask.Task) {
	// math.MaxUint means the queue length cap can never be exceeded.
	p.PushTasksTruncated(math.MaxUint, tasks...)
}
// PushTasksTruncated is like PushTasks but it will never grow the queue more
// than n. When truncation happens we keep the older tasks already in the
// queue, to avoid infinite task rotation if we are continuously receiving
// work faster than we process it.
func (p *PeerTracker) PushTasksTruncated(n uint, tasks ...peertask.Task) {
	now := clockInstance.Now()

	p.activelk.Lock()
	defer p.activelk.Unlock()

	l := p.taskQueue.Len()
	if l < 0 {
		// pq.PQ.Len should never return a negative value; treat this as a
		// broken invariant rather than silently misbehaving.
		panic("negative length")
	}

	// If accepting the whole batch would exceed n, drop the excess from the
	// tail of the incoming batch (existing, older tasks are kept).
	if wouldBe := uint(l + len(tasks)); wouldBe > n {
		// bits.Sub signals underflow via its borrow result instead of
		// silently wrapping around.
		available, o := bits.Sub(n, uint(l), 0)
		if o != 0 {
			// The queue is already longer than n. This can happen if you mix
			// truncated and untruncated pushes, or vary n between calls.
			available = 0
		}
		tasks = tasks[:available]
	}

	for _, task := range tasks {
		// If the new task doesn't add any more information over what we
		// already have in the active queue, then we can skip the new task
		if !p.taskHasMoreInfoThanActiveTasks(task) {
			continue
		}

		// If there is already a non-active task with this Topic
		if existingTask, ok := p.pendingTasks[task.Topic]; ok {
			// If the new task has a higher priority than the old task,
			if task.Priority > existingTask.Priority {
				// Update the priority and the task's position in the queue
				existingTask.Priority = task.Priority
				p.taskQueue.Update(existingTask.Index())
			}

			p.taskMerger.Merge(task, &existingTask.Task)

			// A task with the Topic exists, so we don't need to add
			// the new task to the queue
			continue
		}

		// Push the new task onto the queue
		qTask := peertask.NewQueueTask(task, p.target, now)
		p.pendingTasks[task.Topic] = qTask
		p.taskQueue.Push(qTask)
	}
}
// PopTasks pops as many tasks off the queue as necessary to cover
// targetMinWork, in priority order. If there are not enough tasks to cover
// targetMinWork it just returns whatever is in the queue.
//
// The second response argument is pending work: the amount of work in the
// queue for this peer.
//
// NOTE(review): taskQueue, freezeVal and pendingTasks are read here without
// holding activelk — presumably the owning queue serializes PopTasks with
// PushTasks under its own lock; confirm against the caller.
func (p *PeerTracker) PopTasks(targetMinWork int) ([]*peertask.Task, int) {
	var out []*peertask.Task
	work := 0
	// Stop once enough work is gathered, the queue empties, or the peer is
	// frozen (freezeVal > 0 disables task execution entirely).
	for p.taskQueue.Len() > 0 && p.freezeVal == 0 && work < targetMinWork {
		if p.maxActiveWorkPerPeer > 0 {
			// Do not add work to a peer that is already maxed out
			p.activelk.Lock()
			activeWork := p.activeWork
			p.activelk.Unlock()
			if activeWork >= p.maxActiveWorkPerPeer {
				break
			}
		}

		// Pop the next task off the queue
		t := p.taskQueue.Pop().(*peertask.QueueTask)

		// Start the task (this makes it "active")
		p.startTask(&t.Task)

		out = append(out, &t.Task)
		work += t.Work
	}
	return out, p.getPendingWork()
}
// startTask signals that a task was started for this peer: it moves the task
// from the pending map into the active set and counts its work against
// activeWork.
func (p *PeerTracker) startTask(task *peertask.Task) {
	p.activelk.Lock()
	defer p.activelk.Unlock()

	// Remove task from pending queue
	delete(p.pendingTasks, task.Topic)

	// Add task to active queue. Multiple active tasks may share a Topic (see
	// the activeTasks field), so always append.
	//
	// The previous guard `if _, ok := p.activeTasks[task]; !ok` keyed the
	// lookup by the *task pointer* rather than task.Topic — it compiled only
	// because Topic is an interface type, and the lookup could never hit
	// since entries are inserted under task.Topic. The dead guard has been
	// removed.
	p.activeTasks[task.Topic] = append(p.activeTasks[task.Topic], task)
	p.activeWork += task.Work
}
// getPendingWork sums the Work of every task still waiting in the pending
// map for this peer.
func (p *PeerTracker) getPendingWork() int {
	var sum int
	for _, pending := range p.pendingTasks {
		sum += pending.Work
	}
	return sum
}
// TaskDone signals that a task was completed for this peer: the task is
// removed from the active set (matched by pointer identity) and its work is
// subtracted from activeWork.
func (p *PeerTracker) TaskDone(task *peertask.Task) {
	p.activelk.Lock()
	defer p.activelk.Unlock()

	// Remove task from active queue
	activeTasks, ok := p.activeTasks[task.Topic]
	if !ok {
		// No active tasks under this topic (e.g. TaskDone for an unknown or
		// already-completed task) — nothing to do.
		return
	}

	// Filter the finished task out in place, reusing the backing array.
	// There will usually be 0 through 2 of these, so this should always be fast.
	newTasks := activeTasks[:0]
	for _, t := range activeTasks {
		if task == t {
			p.activeWork -= t.Work
			continue
		}
		newTasks = append(newTasks, t)
	}
	if p.activeWork < 0 {
		// Invariant: completed work can never exceed started work.
		panic("more tasks finished than started!")
	}

	if len(newTasks) == 0 {
		delete(p.activeTasks, task.Topic)
	} else {
		// Garbage collection: nil out the slots freed by the in-place filter
		// so the removed task pointer is not kept alive by the slice's
		// backing array.
		for i := len(newTasks); i < len(activeTasks); i++ {
			activeTasks[i] = nil
		}
		p.activeTasks[task.Topic] = newTasks
	}
}
// Remove removes the pending task with the given topic from this peer's
// queue, reporting whether such a task existed. Active tasks are unaffected.
func (p *PeerTracker) Remove(topic peertask.Topic) bool {
	task, found := p.pendingTasks[topic]
	if !found {
		return false
	}
	delete(p.pendingTasks, topic)
	p.taskQueue.Remove(task.Index())
	return true
}
// Freeze increments the freeze value for this peer. While a peer is frozen
// (freeze value > 0) it will not execute tasks.
func (p *PeerTracker) Freeze() {
	p.freezeVal++
}
// Thaw decrements the freeze value for this peer. While a peer is frozen
// (freeze value > 0) it will not execute tasks.
// It returns true once the peer is fully thawed (freeze value <= 0).
func (p *PeerTracker) Thaw() bool {
	// Decay by roughly half each call, with the decrement rounded up so the
	// value reaches 0 in a finite number of Thaws (e.g. 5 -> 2 -> 1 -> 0).
	p.freezeVal -= (p.freezeVal + 1) / 2
	return p.freezeVal <= 0
}
// FullThaw completely unfreezes this peer so it can execute tasks.
func (p *PeerTracker) FullThaw() {
	p.freezeVal = 0
}
// IsFrozen returns whether this peer is frozen and unable to execute tasks.
func (p *PeerTracker) IsFrozen() bool {
	return p.freezeVal > 0
}
// taskHasMoreInfoThanActiveTasks indicates whether the new task adds any
// more information over the tasks already active for the same topic,
// delegating the actual comparison to the configured TaskMerger.
func (p *PeerTracker) taskHasMoreInfoThanActiveTasks(task peertask.Task) bool {
	active := p.activeTasks[task.Topic]
	if len(active) == 0 {
		// Nothing active under this topic, so the new task is information.
		return true
	}
	return p.taskMerger.HasNewInfo(task, active)
}