-
Notifications
You must be signed in to change notification settings - Fork 210
/
priority.go
145 lines (118 loc) · 3.15 KB
/
priority.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package priorityq
import (
"container/heap"
"errors"
"sync"
"github.com/spacemeshos/go-spacemesh/common/util"
)
var (
// ErrEmptyQueue is returned on attempt to read from an empty queue.
ErrEmptyQueue = errors.New("attempt to read from an empty queue")
// ErrClosed is returned when a queue is closed.
ErrClosed = errors.New("queue is closed")
)
// PriorityQueue is the interface used to interact with the queue.
type PriorityQueue interface {
Write(priority Priority, value interface{}) error
Read() (interface{}, error)
Close()
Length() int
}
// Priority is the type that indicates the priority of different queues.
type Priority int
const (
// High indicates the highest priority.
High = Priority(0)
// Mid indicates the medium priority.
Mid = Priority(1)
// Low indicates the lowest priority.
Low = Priority(2)
)
// An HeapQueueItem is something we manage in a priority queue.
type HeapQueueItem struct {
index int
value interface{}
priority Priority
}
// A HeapQueue implements priority.Interface using container/heap and holds Items.
type HeapQueue struct {
util.Closer
mu sync.Mutex
queue []*HeapQueueItem
}
// New creates a new priority queue based on HeapQueue.
func New() PriorityQueue {
pq := &HeapQueue{
Closer: util.NewCloser(),
queue: make([]*HeapQueueItem, 0),
}
heap.Init(pq)
return pq
}
// Close closes priority queue.
func (pq *HeapQueue) Close() {
pq.Closer.Close()
}
// Length returns priority queue length. It's just a wrapper for Len now.
func (pq *HeapQueue) Length() int {
pq.mu.Lock()
defer pq.mu.Unlock()
return pq.Len()
}
// Read reads a value from the priority queue. It's a wrapper for Pop.
func (pq *HeapQueue) Read() (interface{}, error) {
pq.mu.Lock()
defer pq.mu.Unlock()
if pq.IsClosed() {
return nil, ErrClosed
}
if pq.Len() == 0 {
return nil, ErrEmptyQueue
}
// We may consider adding more logic in the future, so it always returns nil error for now.
return heap.Pop(pq).(*HeapQueueItem).value, nil
}
// Write pushes a value in the priority queue. It's a wrapper for Push.
func (pq *HeapQueue) Write(priority Priority, value interface{}) error {
pq.mu.Lock()
defer pq.mu.Unlock()
if pq.IsClosed() {
return ErrClosed
}
heap.Push(pq, &HeapQueueItem{
value: value,
priority: priority,
})
// We may consider adding more logic in the future, so it always returns nil error for now.
return nil
}
// Len implements heap.Interface.
func (pq *HeapQueue) Len() int { return len(pq.queue) }
// Less implements heap.Interface.
func (pq *HeapQueue) Less(i, j int) bool {
// Pop lowest first
return pq.queue[i].priority < pq.queue[j].priority
}
// Swap implements heap.Interface.
func (pq *HeapQueue) Swap(i, j int) {
pq.queue[i], pq.queue[j] = pq.queue[j], pq.queue[i]
pq.queue[i].index = i
pq.queue[j].index = j
}
// Push implements heap.Interface.
func (pq *HeapQueue) Push(x interface{}) {
n := len(pq.queue)
item := x.(*HeapQueueItem)
item.index = n
pq.queue = append(pq.queue, item)
}
// Pop implements heap.Interface.
func (pq *HeapQueue) Pop() interface{} {
old := pq.queue
n := len(old)
item := old[n-1]
old[n-1] = nil
item.index = -1
pq.queue = old[0 : n-1]
return item
}