forked from asynkron/protoactor-go
/
priority_queue.go
70 lines (59 loc) · 2.02 KB
/
priority_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package actor
// A priority queue is a sort of meta-queue that uses a queue per priority level.
// The underlying queues can be anything that implements the queue interface.
//
// Messages that implement the PriorityMessage interface (i.e. have a GetPriority
// method) will be consumed in priority order first, queue order second. So if a
// higher priority message arrives, it will jump to the front of the queue from
// the consumer's perspective.
//
// There are 8 priority levels (0-7) because having too many levels impacts
// performance. And 8 priority levels ought to be enough for anybody. ;)
// This means your GetPriority method should return int8s between 0 and 7. If any
// return values are higher or lower, they will be reset to 7 or 0, respectively.
//
// The default priority level is 4 for messages that don't implement PriorityMessage.
// If you want your message processed sooner than un-prioritized messages, have its
// GetPriority method return a larger int8 value.
// Likewise, if you'd like to de-prioritize your message, have its GetPriority method
// return an int8 less than 4.
const (
priorityLevels = 8
DefaultPriority = int8(priorityLevels / 2)
)
type PriorityMessage interface {
GetPriority() int8
}
type priorityQueue struct {
priorityQueues []queue
}
func NewPriorityQueue(queueProducer func() queue) *priorityQueue {
q := &priorityQueue{
priorityQueues: make([]queue, priorityLevels),
}
for p := 0; p < priorityLevels; p++ {
q.priorityQueues[p] = queueProducer()
}
return q
}
func (q *priorityQueue) Push(item interface{}) {
itemPriority := DefaultPriority
if priorityItem, ok := item.(PriorityMessage); ok {
itemPriority = priorityItem.GetPriority()
if itemPriority < 0 {
itemPriority = 0
}
if itemPriority > priorityLevels-1 {
itemPriority = priorityLevels - 1
}
}
q.priorityQueues[itemPriority].Push(item)
}
func (q *priorityQueue) Pop() interface{} {
for p := priorityLevels - 1; p >= 0; p-- {
if item := q.priorityQueues[p].Pop(); item != nil {
return item
}
}
return nil
}