// wfq.go
package wfq
import (
"container/heap"
"sync"
)
/*****************************************************************************
*
*****************************************************************************/
// An implementation of this interface is passed to NewQueue and is used to
// obtain the properties of items passed to Queue().
// Each method is called only once per item passed to Queue().
//
type Interface interface {
// Key returns the identity of the flow for the item.
// All items with the same key are placed in the same flow.
//
Key(item interface{}) uint64
// Size returns the size of the item.
// The value returned can be in any unit, but all items in a queue must be
// sized according to the same unit (e.g. bytes).
Size(item interface{}) uint64
// Weight returns the weight/priority of the item. A higher value represents a
// higher priority. All items with a specific key should (but are not required
// to) have the same weight. Internally the Queue adds 1 to the weight so that
// the weight range is shifted from 0-255 to 1-256.
Weight(item interface{}) uint8
}
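
// The types below are an illustrative sketch only; they are not part of the
// original package. exampleMessage is a hypothetical item keyed by sender ID,
// sized in bytes and weighted by a priority class, and exampleHelper is one
// possible Interface implementation for it.
type exampleMessage struct {
    sender   uint64
    payload  []byte
    priority uint8
}

type exampleHelper struct{}

// Key groups all messages from the same sender into a single flow.
func (exampleHelper) Key(item interface{}) uint64 { return item.(*exampleMessage).sender }

// Size reports the payload length in bytes.
func (exampleHelper) Size(item interface{}) uint64 { return uint64(len(item.(*exampleMessage).payload)) }

// Weight maps the message's priority class directly to its queue weight.
func (exampleHelper) Weight(item interface{}) uint8 { return item.(*exampleMessage).priority }
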
/*****************************************************************************
*
*****************************************************************************/
type heapItem struct {
fi *flowInfo
value interface{}
size uint64
weight uint8
key uint64
vft uint64
}
var hi_pool sync.Pool
func newHeapItem() interface{} {
return new(heapItem)
}
func getHeapItem() *heapItem {
return hi_pool.Get().(*heapItem)
}
func putHeapItem(hi *heapItem) {
hi.fi = nil
hi.value = nil
hi_pool.Put(hi)
}
/*****************************************************************************
*
*****************************************************************************/
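// itemHeap is a min-heap of queued items ordered by virtual finish time (vft);
// the item with the smallest vft is the next to be dequeued.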
type itemHeap []*heapItem
func (h *itemHeap) Len() int {
return len(*h)
}
func (h *itemHeap) Less(i, j int) bool {
return (*h)[i].vft < (*h)[j].vft
}
func (h *itemHeap) Swap(i, j int) {
(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
}
func (h *itemHeap) Push(x interface{}) {
item := x.(*heapItem)
*h = append(*h, item)
}
func (h *itemHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
old[n-1] = nil
return item
}
/*****************************************************************************
*
*****************************************************************************/
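// overflowHeapItem wraps a heapItem whose Queue() caller is blocked because the
// queue is full. Overflow items are ordered strictly by weight (higher first)
// and then by arrival order; the WaitGroup releases the blocked caller once the
// item is admitted to the main heap or the queue is closed.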
type overflowHeapItem struct {
hi *heapItem
arrord uint64
wg sync.WaitGroup
}
func (i *overflowHeapItem) less(o *overflowHeapItem) bool {
if i.hi.weight > o.hi.weight {
return true
} else if i.hi.weight == o.hi.weight && i.arrord < o.arrord {
return true
}
return false
}
var ohi_pool sync.Pool
func newOverflowHeapItem() interface{} {
return new(overflowHeapItem)
}
func getOverflowHeapItem() *overflowHeapItem {
return ohi_pool.Get().(*overflowHeapItem)
}
func putOverflowHeapItem(ohi *overflowHeapItem) {
ohi.hi = nil
ohi_pool.Put(ohi)
}
/*****************************************************************************
*
*****************************************************************************/
type itemOverflowHeap []*overflowHeapItem
func (h *itemOverflowHeap) Len() int {
return len(*h)
}
func (h *itemOverflowHeap) Less(i, j int) bool {
return (*h)[i].less((*h)[j])
}
func (h *itemOverflowHeap) Swap(i, j int) {
(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
}
func (h *itemOverflowHeap) Push(x interface{}) {
item := x.(*overflowHeapItem)
*h = append(*h, item)
}
func (h *itemOverflowHeap) Pop() interface{} {
old := *h
n := len(old)
ohi := old[n-1]
*h = old[0 : n-1]
old[n-1] = nil
return ohi
}
/*****************************************************************************
*
*****************************************************************************/
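// flowInfo holds the per-flow state: last_vft is the virtual finish time assigned
// to the flow's most recently queued item, size is the total size of the flow's
// items currently in the queue, pendSize is the total size of items waiting for
// room in the flow, weight is the flow's shifted weight (1-256), and inv_w is its
// precomputed scaled inverse. The condition variable wakes Queue() callers that
// are waiting for room in the flow.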
type flowInfo struct {
cond sync.Cond
last_vft uint64
size uint64
pendSize uint64
weight uint8
inv_w uint64
}
var fi_pool sync.Pool
func newFlowInfo() interface{} {
return new(flowInfo)
}
func getFlowInfo() *flowInfo {
return fi_pool.Get().(*flowInfo)
}
func putFlowInfo(fi *flowInfo) {
fi.cond.L = nil
// Return the flowInfo to the pool so that getFlowInfo can reuse it.
fi_pool.Put(fi)
}
/*****************************************************************************
*
*****************************************************************************/
func init() {
hi_pool.New = newHeapItem
ohi_pool.New = newOverflowHeapItem
fi_pool.New = newFlowInfo
}
/*****************************************************************************
*
*****************************************************************************/
// A Queue implements a version of the weighted fair queueing (WFQ) algorithm.
// When all items have the same weight, each flow's throughput will be
// <total throughput>/<number of flows>.
//
// If items have different weights, then all flows with the same weight share
// their portion of the throughput evenly.
// Each weight "class" receives a portion of the total throughput according to
// the formula R*Wi/(W1 + W2 + ... + WN), where R is the total throughput and
// W1 through WN are the weights of the individual flows.
// For example, if the total size of all items that passed through the queue
// was 10,000 and the weights of 3 flows were 1, 4 and 18, then the portion of
// the total dedicated to each flow would be 10000*1/(1+4+18) = 435 (4.35%),
// 10000*4/(1+4+18) = 1739 (17.39%) and 10000*18/(1+4+18) = 7826 (78.26%).
//
type Queue struct {
lock sync.Mutex
cond sync.Cond
closed bool
maxQueueSize uint64
maxFlowSize uint64
helper Interface
items itemHeap
overflow itemOverflowHeap
next_ohi *overflowHeapItem
flows map[uint64]*flowInfo
ovfcnt uint64
vt uint64
size uint64
wsum uint64
inv_wsum uint64
}
const (
// scaledOne is the fixed-point scale factor (2^16) used for the inverse
// weights (inv_w, inv_wsum), so integer division by a weight keeps precision.
scaledOne uint64 = 1 << 16
)
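
// exampleShare is an illustrative sketch (not used by the package) of the
// per-flow throughput formula described in the Queue documentation above:
// share_i = R*Wi/(W1 + W2 + ... + WN).
// For R = 10000 and weights 1, 4 and 18 it yields 434, 1739 and 7826
// (integer division; the documentation above rounds 434.8 up to 435).
func exampleShare(total uint64, weights []uint64, i int) uint64 {
    var wsum uint64
    for _, w := range weights {
        wsum += w
    }
    return total * weights[i] / wsum
}
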
// NewQueue creates a new Queue instance.
// It panics if maxFlowSize > maxQueueSize or if helper is nil.
// The maxFlowSize value limits the total size of all items that can be queued in a single flow.
// The maxQueueSize value limits the total size of all items that can be in the queue.
// It is recommended that maxQueueSize be set to maxFlowSize*<max # of expected flows>, and
// that maxFlowSize be at least twice the largest expected item size.
//
func NewQueue(maxQueueSize, maxFlowSize uint64, helper Interface) *Queue {
if maxFlowSize > maxQueueSize {
panic("MaxFlowSize > MaxQueueSize")
}
if helper == nil {
panic("helper is nil")
}
q := new(Queue)
q.cond.L = &q.lock
q.maxQueueSize = maxQueueSize
q.maxFlowSize = maxFlowSize
q.helper = helper
q.flows = make(map[uint64]*flowInfo)
return q
}
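
// exampleUsage is an illustrative sketch (not part of the original package API)
// of how a producer and a consumer goroutine might drive the queue. The limits
// below are hypothetical: a 1 MiB queue limit and a 64 KiB per-flow limit.
func exampleUsage() {
    q := NewQueue(1<<20, 1<<16, exampleHelper{})

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            item, ok := q.DeQueue()
            if !ok {
                return // the queue is closed and drained
            }
            _ = item.(*exampleMessage) // process the dequeued item here
        }
    }()

    // Queue a few messages on two flows (senders 0 and 1) with different priorities.
    for i := 0; i < 4; i++ {
        msg := &exampleMessage{
            sender:   uint64(i % 2),
            payload:  make([]byte, 128),
            priority: uint8(i % 2),
        }
        if !q.Queue(msg) {
            break // the queue was closed while we were blocked
        }
    }

    q.Close()
    wg.Wait()
}
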
// Queue places an item on the queue. Queue will not return (i.e. it blocks) until the item can be
// placed on the queue or the queue is closed. If Queue returns true, then DeQueue will eventually
// return the item. If Queue returns false, then the item was not placed on the queue because the
// queue has been closed.
// Queue panics if the size of the item is zero or greater than maxFlowSize (set in NewQueue).
// Queue is safe for concurrent use.
//
func (q *Queue) Queue(item interface{}) bool {
hi := getHeapItem()
hi.value = item
hi.key = q.helper.Key(item)
hi.size = q.helper.Size(item)
hi.weight = q.helper.Weight(item)
if hi.size == 0 {
panic("Item size is zero")
}
if hi.size > q.maxFlowSize {
panic("Item size is larger than MaxFlowSize")
}
q.lock.Lock()
if q.closed {
q.lock.Unlock()
return false
}
// Get the flowInfo, or add one if there is none
fi, ok := q.flows[hi.key]
if !ok {
fi = getFlowInfo()
fi.cond.L = &q.lock
fi.last_vft = q.vt
fi.weight = hi.weight + 1
fi.inv_w = scaledOne / uint64(fi.weight)
q.flows[hi.key] = fi
q.wsum += uint64(fi.weight)
q.inv_wsum = scaledOne / uint64(q.wsum)
}
hi.fi = fi
// This prevents DeQueue from deleting the flowInfo from q.flows
// while the flow is still active
fi.pendSize += hi.size
// Wait till there is room in the flow queue
for !q.closed && fi.size+hi.size > q.maxFlowSize {
fi.cond.Wait()
}
if q.closed {
q.lock.Unlock()
return false
}
// Calculate the item's virtual finish time (in scaledOne fixed-point units)
hi.vft = fi.last_vft + hi.size*fi.inv_w
fi.last_vft = hi.vft
// Add the item's size to the flow
fi.size += hi.size
// Subtract its size from pendSize since it is no longer pending
fi.pendSize -= hi.size
if q.size+hi.size > q.maxQueueSize {
/*
The queue is full, place our request in the overflow heap.
Unlike the main heap, the overflow heap is strictly prioritized by
weight and arrival order. A higher priority flow could completely starve out
a lower priority flow if the incoming rate of the higher priority flow exceeds
the total outgoing rate.
*/
ohi := getOverflowHeapItem()
ohi.hi = hi
ohi.arrord = q.ovfcnt
q.ovfcnt++
ohi.wg.Add(1)
if q.next_ohi == nil {
q.next_ohi = ohi
} else {
if ohi.less(q.next_ohi) {
heap.Push(&q.overflow, q.next_ohi)
q.next_ohi = ohi
} else {
heap.Push(&q.overflow, ohi)
}
}
q.lock.Unlock()
ohi.wg.Wait()
putOverflowHeapItem(ohi)
// Re-acquire the lock to read q.closed, since Close may write it concurrently.
q.lock.Lock()
closed := q.closed
q.lock.Unlock()
if closed {
return false
}
} else {
q.size += hi.size
// The queue has room, place our item in the main heap
heap.Push(&q.items, hi)
q.cond.Signal()
q.lock.Unlock()
}
return true
}
// DeQueue removes the next item from the queue. DeQueue will not return (i.e. it blocks) until an
// item can be returned or the queue is empty and closed. DeQueue returns an item and true if an
// item could be removed from the queue, or nil and false if the queue is empty and closed.
// DeQueue is safe for concurrent use.
//
func (q *Queue) DeQueue() (interface{}, bool) {
q.lock.Lock()
defer q.lock.Unlock()
if q.closed && q.items.Len() == 0 {
return nil, false
}
for !q.closed && q.items.Len() == 0 {
q.cond.Wait()
}
if q.closed && q.items.Len() == 0 {
return nil, false
}
hi := heap.Pop(&q.items).(*heapItem)
item := hi.value
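// Advance the queue's virtual time by the item's size scaled by the
// (fixed-point) inverse of the sum of the active flows' weights.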
q.vt += hi.size * q.inv_wsum
hi.fi.size -= hi.size
q.size -= hi.size
if hi.fi.size == 0 && hi.fi.pendSize == 0 {
// The flow is empty (i.e. inactive): delete it and remove its weight
// from the running weight sum.
delete(q.flows, hi.key)
q.wsum -= uint64(hi.fi.weight)
if q.wsum > 0 {
q.inv_wsum = scaledOne / uint64(q.wsum)
}
putFlowInfo(hi.fi)
putHeapItem(hi)
} else {
hi.fi.cond.Signal()
putHeapItem(hi)
}
if !q.closed {
// While there is room in the queue move items from the overflow to the main heap.
for q.next_ohi != nil && q.size+q.next_ohi.hi.size <= q.maxQueueSize {
q.size += q.next_ohi.hi.size
heap.Push(&q.items, q.next_ohi.hi)
q.next_ohi.wg.Done()
if q.overflow.Len() > 0 {
q.next_ohi = heap.Pop(&q.overflow).(*overflowHeapItem)
} else {
q.next_ohi = nil
}
}
}
return item, true
}
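// Close closes the queue. Any Queue() callers blocked waiting for room are
// released and return false. DeQueue continues to drain items that are already
// in the queue and returns false once the queue is empty.
// Close is safe for concurrent use.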
func (q *Queue) Close() {
q.lock.Lock()
defer q.lock.Unlock()
q.closed = true
// Release every Queue() caller waiting in the overflow heap; each will
// observe the closed flag and return false.
for q.next_ohi != nil {
q.next_ohi.wg.Done()
if q.overflow.Len() > 0 {
q.next_ohi = heap.Pop(&q.overflow).(*overflowHeapItem)
} else {
q.next_ohi = nil
}
}
// Wake up all those waiting to get into a flow queue
for _, fi := range q.flows {
fi.cond.Broadcast()
}
// Wake up all DeQueue'ers
q.cond.Broadcast()
}