package queue
import (
"strconv"
"sync"
"time"
"github.com/garyburd/redigo/redis"
"github.com/sirupsen/logrus"
)
// Priority represents the priority of an item in a queue
type Priority int64
// WorkerToken represents a token that a worker should return when a task is complete
type WorkerToken string
const (
	// DefaultPriority is the normal priority for msgs; messages are sent first in, first out
	DefaultPriority = 1
	// BulkPriority is our priority for bulk messages (sent in batches). These will only be
	// processed after all default priority messages are dealt with
BulkPriority = 0
)
const (
	// EmptyQueue means there are no items to retrieve, the caller should sleep and try again later
EmptyQueue = WorkerToken("empty")
// Retry means the caller should immediately call again to get the next value
Retry = WorkerToken("retry")
)
var luaPush = redis.NewScript(6, `-- KEYS: [EpochMS, QueueType, QueueName, TPS, Priority, Value]
-- first push onto our specific queue
-- our queue name is built from the type, name and tps, usually something like: "msgs:uuid1-uuid2-uuid3-uuid4|tps"
local queueKey = KEYS[2] .. ":" .. KEYS[3] .. "|" .. KEYS[4]
-- our priority queue name also includes the priority of the message (we have one queue for default and one for bulk)
local priorityQueueKey = queueKey .. "/" .. KEYS[5]
redis.call("zadd", priorityQueueKey, KEYS[1], KEYS[6])
local tps = tonumber(KEYS[4])
-- if we have a TPS, check whether we are currently throttled
local curr = -1
if tps > 0 then
local tpsKey = queueKey .. ":tps:" .. math.floor(KEYS[1])
curr = tonumber(redis.call("get", tpsKey))
end
-- if we aren't then add to our active
if not curr or curr < tps then
redis.call("zincrby", KEYS[2] .. ":active", 0, queueKey)
return 1
else
return 0
end
`)
// PushOntoQueue pushes the passed in value to the passed in queue, making sure that no more than the
// specified number of transactions per second are popped off at a time. A tps value of 0 means there is
// no limit to the rate at which messages can be consumed.
func PushOntoQueue(conn redis.Conn, qType string, queue string, tps int, value string, priority Priority) error {
epochMS := strconv.FormatFloat(float64(time.Now().UnixNano()/int64(time.Microsecond))/float64(1000000), 'f', 6, 64)
_, err := redis.Int(luaPush.Do(conn, epochMS, qType, queue, tps, priority, value))
return err
}
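// The example below is an illustrative sketch, not part of the original file: it shows how a
// caller might enqueue a task with PushOntoQueue. The queue type "msgs", the queue name (a
// channel UUID here) and the JSON payload are assumed placeholder values.
func examplePush(conn redis.Conn) error {
	// push a single task at default priority, capping the queue at 10 transactions per second
	return PushOntoQueue(conn, "msgs", "0a77a158-1dcb-4c06-9aee-e15bdf64653e", 10, `{"text":"hello"}`, DefaultPriority)
}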
var luaPop = redis.NewScript(2, `-- KEYS: [EpochMS, QueueType]
-- get the first key off our active list
local result = redis.call("zrange", KEYS[2] .. ":active", 0, 0, "WITHSCORES")
local queue = result[1]
local workers = result[2]
-- nothing? return nothing
if not queue then
return {"empty", ""}
end
	-- figure out our max transactions per second
local delim = string.find(queue, "|")
local tps = 0
local tpsKey = ""
if delim then
tps = tonumber(string.sub(queue, delim+1))
end
-- if we have a tps, then check whether we exceed it
if tps > 0 then
tpsKey = queue .. ":tps:" .. math.floor(KEYS[1])
local curr = redis.call("get", tpsKey)
-- we are at or above our tps, move to our throttled queue
if curr and tonumber(curr) >= tps then
redis.call("zincrby", KEYS[2] .. ":throttled", workers, queue)
redis.call("zrem", KEYS[2] .. ":active", queue)
return {"retry", ""}
end
end
-- pop our next value out, first from our default queue
local resultQueue = queue .. "/1"
local result = redis.call("zrangebyscore", resultQueue, 0, "+inf", "WITHSCORES", "LIMIT", 0, 1)
	-- keep track of whether this result is in the future (and therefore ineligible)
local isFutureResult = result[1] and tonumber(result[2]) > tonumber(KEYS[1])
-- if we didn't find one, try again from our bulk queue
if not result[1] or isFutureResult then
local bulkQueue = queue .. "/0"
local bulkResult = redis.call("zrangebyscore", bulkQueue, 0, "+inf", "WITHSCORES", "LIMIT", 0, 1)
-- if we got a result
if bulkResult[1] then
-- if it is in the future, set ourselves as in the future
if tonumber(bulkResult[2]) > tonumber(KEYS[1]) then
isFutureResult = true
-- otherwise, this is a valid result
else
redis.call("echo", "found result")
isFutureResult = false
result = bulkResult
resultQueue = bulkQueue
end
end
end
-- if we found one
if result[1] and not isFutureResult then
-- then remove it from the queue
redis.call('zremrangebyrank', resultQueue, 0, 0)
-- increment our tps for this second if we have a limit
if tps > 0 then
redis.call("incr", tpsKey)
redis.call("expire", tpsKey, 10)
end
-- and add a worker to this queue
redis.call("zincrby", KEYS[2] .. ":active", 1, queue)
		-- is this a compound message (a JSON array)? if so, we return the first element but schedule the others
		-- for 5 seconds from now
local popValue = result[1]
if string.sub(popValue, 1, 1) == "[" then
-- parse it as JSON to get the first element out
local valueList = cjson.decode(popValue)
popValue = cjson.encode(valueList[1])
table.remove(valueList, 1)
-- encode it back if there is anything left
if table.getn(valueList) > 0 then
local remaining = ""
if table.getn(valueList) == 1 then
remaining = cjson.encode(valueList[1])
else
remaining = cjson.encode(valueList)
end
				-- schedule it 5 seconds in the future on our main queue
redis.call("zadd", queue .. "/1", tonumber(KEYS[1]) + 5, remaining)
redis.call("zincrby", KEYS[2] .. ":future", 0, queue)
end
end
return {queue, popValue}
-- otherwise, the queue only contains future results, remove from active and add to future, have the caller retry
elseif isFutureResult then
redis.call("zincrby", KEYS[2] .. ":future", 0, queue)
redis.call("zrem", KEYS[2] .. ":active", queue)
return {"retry", ""}
-- otherwise, the queue is empty, remove it from active
else
redis.call("zrem", KEYS[2] .. ":active", queue)
return {"retry", ""}
end
`)
// PopFromQueue pops the next available message from the passed in queue. If Retry
// is returned the caller should immediately make another call to get the next value. A
// worker token of EmptyQueue will be returned if there are no more items to retrieve.
// Otherwise the WorkerToken should be saved in order to mark the task as complete later.
func PopFromQueue(conn redis.Conn, qType string) (WorkerToken, string, error) {
epochMS := strconv.FormatFloat(float64(time.Now().UnixNano()/int64(time.Microsecond))/float64(1000000), 'f', 6, 64)
values, err := redis.Strings(luaPop.Do(conn, epochMS, qType))
if err != nil {
logrus.Error(err)
return "", "", err
}
return WorkerToken(values[0]), values[1], nil
}
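// The example below is an illustrative sketch, not part of the original file: it shows the
// worker loop implied by the doc comment above, handling the EmptyQueue and Retry tokens
// and releasing the worker slot with MarkComplete (defined further down). The queue type
// "msgs" and the process callback are assumed placeholders.
func exampleWorkerLoop(conn redis.Conn, process func(task string) error) {
	for {
		token, value, err := PopFromQueue(conn, "msgs")
		if err != nil {
			logrus.WithError(err).Error("error popping from queue")
			return
		}
		if token == EmptyQueue {
			// nothing to do right now, sleep before checking again
			time.Sleep(time.Second)
			continue
		}
		if token == Retry {
			// the queue was throttled or only held future items, poll again immediately
			continue
		}
		// we have a real task: process it, then mark it complete so workers stay
		// evenly spread across queues
		if err := process(value); err != nil {
			logrus.WithError(err).Error("error processing task")
		}
		if err := MarkComplete(conn, "msgs", token); err != nil {
			logrus.WithError(err).Error("error marking task complete")
		}
	}
}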
var luaComplete = redis.NewScript(2, `-- KEYS: [QueueType, Queue]
-- decrement throttled if present
local throttled = tonumber(redis.call("zadd", KEYS[1] .. ":throttled", "XX", "CH", "INCR", -1, KEYS[2]))
-- if we didn't decrement anything, do so to our active set
if not throttled or throttled == 0 then
local active = tonumber(redis.call("zincrby", KEYS[1] .. ":active", -1, KEYS[2]))
-- reset to zero if we somehow go below
if active < 0 then
redis.call("zadd", KEYS[1] .. ":active", 0, KEYS[2])
end
end
`)
// MarkComplete marks a task as complete for the passed in queue and queue result. It is
// important for callers to call this so that workers are evenly spread across all
// queues with jobs in them
func MarkComplete(conn redis.Conn, qType string, token WorkerToken) error {
_, err := luaComplete.Do(conn, qType, token)
return err
}
var luaDethrottle = redis.NewScript(1, `-- KEYS: [QueueType]
-- get all the keys from our throttle list
local throttled = redis.call("zrange", KEYS[1] .. ":throttled", 0, -1, "WITHSCORES")
-- add them to our active list
if next(throttled) then
local activeKey = KEYS[1] .. ":active"
for i=1,#throttled,2 do
redis.call("zincrby", activeKey, throttled[i+1], throttled[i])
end
redis.call("del", KEYS[1] .. ":throttled")
end
-- get all the keys in the future
local future = redis.call("zrange", KEYS[1] .. ":future", 0, -1, "WITHSCORES")
-- add them to our active list
if next(future) then
local activeKey = KEYS[1] .. ":active"
for i=1,#future,2 do
redis.call("zincrby", activeKey, future[i+1], future[i])
end
redis.call("del", KEYS[1] .. ":future")
end
`)
// StartDethrottler starts a goroutine responsible for dethrottling, every second, any queues
// that were throttled. The passed in quitter chan can be used to shut down the goroutine
func StartDethrottler(redis *redis.Pool, quitter chan bool, wg *sync.WaitGroup, qType string) {
	wg.Add(1)
	go func() {
		// figure out our next delay, we want to land just on the other side of a second boundary
		delay := time.Second - time.Duration(time.Now().UnixNano()%int64(time.Second))
		for {
select {
case <-quitter:
wg.Done()
return
case <-time.After(delay):
conn := redis.Get()
_, err := luaDethrottle.Do(conn, qType)
if err != nil {
logrus.WithError(err).Error("error dethrottling")
}
conn.Close()
delay = time.Second - time.Duration(time.Now().UnixNano()%int64(time.Second))
}
}
}()
}
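// The example below is an illustrative sketch, not part of the original file: it shows how a
// caller might wire up the dethrottler against a redis pool and shut it down cleanly. The
// queue type "msgs" is an assumed placeholder.
func exampleStartStop(pool *redis.Pool) {
	quitter := make(chan bool)
	wg := &sync.WaitGroup{}

	// every second, move throttled and future queues back into the active set
	StartDethrottler(pool, quitter, wg, "msgs")

	// ... run workers ...

	// signal shutdown and wait for the dethrottler goroutine to exit
	close(quitter)
	wg.Wait()
}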