-
Notifications
You must be signed in to change notification settings - Fork 34
/
queue.go
112 lines (84 loc) · 1.97 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package util
import (
"bytes"
"sync"
)
// Queue is a FIFO buffer of byte slices that supports appending at the tail
// (Enqueue), pushing back onto the head (Requeue) and removing from the head
// (Dequeue / DequeueAll). Create one with NewQueue.
type Queue struct {
	// queue holds the pending entries; index 0 is the head.
	queue [][]byte
	// depth is the number of entries the queue methods have accounted for.
	// It is read and written while holding lock.
	depth int
	// depthChan is a single-slot channel carrying the most recently published
	// depth, so the depth can be peeked at without taking lock.
	depthChan chan int
	// lock serializes changes to queue and depth.
	lock *sync.RWMutex
}
// NewQueue builds an empty, ready-to-use Queue: the lock is allocated and the
// depth channel already carries the initial depth of zero.
func NewQueue() *Queue {
	q := &Queue{
		depthChan: make(chan int, 1),
		lock:      new(sync.RWMutex),
	}
	// Seed the single-slot channel; the methods that peek at or update the
	// depth expect to always find exactly one value in it.
	q.depthChan <- 0
	return q
}
// Requeue pushes b back onto the head of the queue, ahead of every entry that
// is already queued, and publishes the new depth.
func (q *Queue) Requeue(b []byte) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Assemble the new contents: b first, then the existing entries in order.
	merged := make([][]byte, 0, len(q.queue)+1)
	merged = append(merged, b)
	merged = append(merged, q.queue...)
	q.queue = merged
	q.depth++

	// Replace the value held by the single-slot depth channel.
	<-q.depthChan
	q.depthChan <- q.depth
}
// Enqueue appends b to the tail of the queue and publishes the new depth.
func (q *Queue) Enqueue(b []byte) {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.queue = append(q.queue, b)
	q.depth++
	updated := q.depth

	// The depth channel always holds exactly one value: discard the old one
	// and store the depth that now applies.
	<-q.depthChan
	q.depthChan <- updated
}
// Dequeue removes and returns the entry at the head of the queue. It returns
// nil when the queue is empty.
func (q *Queue) Dequeue() []byte {
	// Fast path: peek at the published depth without taking the write lock so
	// that callers polling an empty queue do not contend with producers.
	if q.getDepth() == 0 {
		return nil
	}

	q.lock.Lock()
	defer q.lock.Unlock()

	// The peek above ran without the lock, so another goroutine may have
	// drained the queue in the meantime. Re-check under the lock; indexing an
	// empty slice below would panic.
	if len(q.queue) == 0 {
		return nil
	}

	b := q.queue[0]
	// Clear the vacated slot so the backing array no longer references the
	// payload that is being handed to the caller.
	q.queue[0] = nil
	q.queue = q.queue[1:]
	q.depth--

	// Replace the value held by the single-slot depth channel.
	<-q.depthChan
	q.depthChan <- q.depth
	return b
}
// DequeueAll empties the queue and returns every queued entry concatenated,
// in queue order, into a single byte slice. It returns nil when the queue is
// empty.
func (q *Queue) DequeueAll() []byte {
	// Fast path: peek at the published depth without taking the write lock.
	if q.getDepth() == 0 {
		return nil
	}

	q.lock.Lock()
	defer q.lock.Unlock()

	// The peek above ran without the lock, so the queue may have been drained
	// since. Re-check under the lock so an empty queue always yields nil
	// (bytes.Join would otherwise return a non-nil empty slice).
	if len(q.queue) == 0 {
		return nil
	}

	b := q.queue
	q.queue = nil
	q.depth = 0

	// Replace the value held by the single-slot depth channel.
	<-q.depthChan
	q.depthChan <- q.depth

	// No separator: entries are joined back to back.
	return bytes.Join(b, nil)
}
// getDepth reports the most recently published depth without touching the
// queue lock.
func (q *Queue) getDepth() (depth int) {
	// The depth channel holds a single value at all times. Take it out, put
	// the same value straight back, and report it. This is intended as a
	// cheaper alternative to locking/unlocking just to read q.depth.
	depth = <-q.depthChan
	q.depthChan <- depth
	return depth
}
// GetDepth reports how many entries are currently queued, reading the counter
// while holding the read lock.
func (q *Queue) GetDepth() int {
	q.lock.RLock()
	depth := q.depth
	q.lock.RUnlock()
	return depth
}