-
Notifications
You must be signed in to change notification settings - Fork 211
/
priorityq.go
121 lines (100 loc) · 2.95 KB
/
priorityq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package priorityq
import (
"errors"
)
var (
// ErrUnknownPriority indicates an incorrect attempt to make a write with an unknown priority
ErrUnknownPriority = errors.New("unknown priority")
// ErrQueueClosed indicates an attempt to read from a closed priority queue instance
ErrQueueClosed = errors.New("the queue is closed")
// ErrUnexpected indicates an unexpected error attempting to read from the queue
ErrUnexpected = errors.New("unexpected error reading from queues")
)
// Priority is the type that indicates the priority of different queues
type Priority int
const (
prioritiesCount = 3 // the number of priorities
// High indicates the highest priority
High = Priority(0)
// Mid indicates the medium priority
Mid = Priority(1)
// Low indicates the lowest priority
Low = Priority(2)
)
// Queue is the priority queue
type Queue struct {
waitCh chan struct{} // the channel that indicates if there is a message ready in the queue
queues []chan interface{} // the queues where the index is the priority
}
// PriorityQueue is the interface used to interact with the queue
type PriorityQueue interface {
Write(prio Priority, m interface{}) error
Read() (interface{}, error)
Close()
Length() int
}
// New returns a new priority queue where each priority has a buffer of prioQueueLimit
func New(prioQueueLimit int) *Queue {
// set queue for each priority
qs := make([]chan interface{}, prioritiesCount)
for i := range qs {
qs[i] = make(chan interface{}, prioQueueLimit)
}
return &Queue{
waitCh: make(chan struct{}, prioQueueLimit*prioritiesCount),
queues: qs,
}
}
// Write a message m to the associated queue with the provided priority
// This method blocks iff the queue is full
// Returns an error iff a queue does not exist for the provided name
// Note: writing to the pq after a call to close is forbidden and will result in a panic
func (pq *Queue) Write(prio Priority, m interface{}) error {
if int(prio) >= cap(pq.queues) {
return ErrUnknownPriority
}
pq.queues[prio] <- m
pq.waitCh <- struct{}{}
return nil
}
// Read returns the next message by priority
// An error is returned iff the priority queue has been closed
func (pq *Queue) Read() (interface{}, error) {
<-pq.waitCh // wait for m
// pick by priority
for _, q := range pq.queues {
if q == nil { // if not set just continue
continue
}
// if set, try read
select {
case m, ok := <-q:
if !ok {
// channels are starting to close
return nil, ErrQueueClosed
}
return m, nil
default: // empty, try next
continue
}
}
// should be unreachable
return nil, ErrUnexpected
}
// Length returns the number of pending messages in all of the queues
func (pq *Queue) Length() (length int) {
for _, q := range pq.queues {
length += len(q)
}
return
}
// Close the priority queue
// No messages should be expected to be read after a call to Close
func (pq *Queue) Close() {
for _, q := range pq.queues {
if q != nil {
close(q)
}
}
close(pq.waitCh)
}