/
queues.go
209 lines (189 loc) · 4.82 KB
/
queues.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package tasks
import (
"container/ring"
"time"
)
func newMergeTaskQueue(group string) *mergeTaskQueue {
return &mergeTaskQueue{
name: group,
tasks: make([]Task, 0),
cleanupTimestamp: time.Now(),
}
}
type mergeTaskQueue struct {
name string
tasks []Task
cleanupTimestamp time.Time
}
// len returns the length of taskQueue.
func (q *mergeTaskQueue) len() int {
return len(q.tasks)
}
// push add a new task to the end of taskQueue.
func (q *mergeTaskQueue) push(t Task) {
q.tasks = append(q.tasks, t)
}
// front returns the first element of taskQueue,
// returns nil if task queue is empty.
func (q *mergeTaskQueue) front() Task {
if q.len() > 0 {
return q.tasks[0]
}
return nil
}
// pop pops the first element of taskQueue,
func (q *mergeTaskQueue) pop() {
if q.len() > 0 {
q.tasks = q.tasks[1:]
if q.len() == 0 {
q.cleanupTimestamp = time.Now()
}
}
}
// Return true if user based task is empty and empty for d time.
func (q *mergeTaskQueue) expire(d time.Duration) bool {
if q.len() != 0 {
return false
}
if time.Since(q.cleanupTimestamp) > d {
return true
}
return false
}
// tryMerge try to a new task to any task in queue.
func (q *mergeTaskQueue) tryMerge(task MergeTask, maxNQ int64) bool {
nqRest := maxNQ - task.NQ()
// No need to perform any merge if task.nq is greater than maxNQ.
if nqRest <= 0 {
return false
}
for i := q.len() - 1; i >= 0; i-- {
if taskInQueue := tryIntoMergeTask(q.tasks[i]); taskInQueue != nil {
// Try to merge it if limit of nq is enough.
if taskInQueue.NQ() <= nqRest && taskInQueue.MergeWith(task) {
return true
}
}
}
return false
}
// newFairPollingTaskQueue create a fair polling task queue.
func newFairPollingTaskQueue() *fairPollingTaskQueue {
return &fairPollingTaskQueue{
count: 0,
route: make(map[string]*ring.Ring),
checkpoint: nil,
}
}
// fairPollingTaskQueue is a fairly polling queue.
type fairPollingTaskQueue struct {
count int
route map[string]*ring.Ring
checkpoint *ring.Ring
}
// len returns the item count in FairPollingQueue.
func (q *fairPollingTaskQueue) len() int {
return q.count
}
// groupLen returns the length of a group.
func (q *fairPollingTaskQueue) groupLen(group string) int {
if r, ok := q.route[group]; ok {
return r.Value.(*mergeTaskQueue).len()
}
return 0
}
// tryMergeWithOtherGroup try to merge given task into exists tasks in the other group.
func (q *fairPollingTaskQueue) tryMergeWithOtherGroup(group string, task MergeTask, maxNQ int64) bool {
if q.count == 0 {
return false
}
// Try to merge task into other group before checkpoint.
node := q.checkpoint.Prev()
queuesLen := q.checkpoint.Len()
for i := 0; i < queuesLen; i++ {
prev := node.Prev()
queue := node.Value.(*mergeTaskQueue)
if queue.len() == 0 || queue.name == group {
continue
}
if queue.tryMerge(task, maxNQ) {
return true
}
node = prev
}
return false
}
// tryMergeWithSameGroup try to merge given task into exists tasks in the same group.
func (q *fairPollingTaskQueue) tryMergeWithSameGroup(group string, task MergeTask, maxNQ int64) bool {
if q.count == 0 {
return false
}
// Applied to task with same group first.
if r, ok := q.route[group]; ok {
// Try to merge task into queue.
if r.Value.(*mergeTaskQueue).tryMerge(task, maxNQ) {
return true
}
}
return false
}
// push add a new task into queue, try merge first.
func (q *fairPollingTaskQueue) push(group string, task Task) {
// Add a new task.
if r, ok := q.route[group]; ok {
// Add new task to the back of queue if queue exist.
r.Value.(*mergeTaskQueue).push(task)
} else {
// Create a new task queue, and add it to the route and queues.
newQueue := newMergeTaskQueue(group)
newQueue.push(task)
newRing := ring.New(1)
newRing.Value = newQueue
q.route[group] = newRing
if q.checkpoint == nil {
// Create new ring if not exist.
q.checkpoint = newRing
} else {
// Add the new ring before the checkpoint.
q.checkpoint.Prev().Link(newRing)
}
}
q.count++
}
// pop pop next ready task.
func (q *fairPollingTaskQueue) pop(queueExpire time.Duration) (task Task) {
// Return directly if there's no task exists.
if q.count == 0 {
return
}
checkpoint := q.checkpoint
queuesLen := q.checkpoint.Len()
for i := 0; i < queuesLen; i++ {
next := checkpoint.Next()
// Find task in this queue.
queue := checkpoint.Value.(*mergeTaskQueue)
// empty task queue for this user.
if queue.len() == 0 {
// expire the queue.
if queue.expire(queueExpire) {
delete(q.route, queue.name)
if checkpoint.Len() == 1 {
checkpoint = nil
break
} else {
checkpoint.Prev().Unlink(1)
}
}
checkpoint = next
continue
}
task = queue.front()
queue.pop()
q.count--
checkpoint = next
break
}
// Update checkpoint.
q.checkpoint = checkpoint
return
}