-
Notifications
You must be signed in to change notification settings - Fork 350
/
updateDelaySet-7.lua
71 lines (57 loc) · 1.89 KB
/
updateDelaySet-7.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
--[[
Updates the delay set, by picking a delayed job that should
be processed now.
Input:
KEYS[1] 'delayed'
KEYS[2] 'wait'
KEYS[3] 'priority'
KEYS[4] 'paused'
KEYS[5] 'meta'
KEYS[6] event's stream
KEYS[7] delayed stream
ARGV[1] queue.toKey('')
ARGV[2] delayed timestamp
Events:
'waiting'
]]
local rcall = redis.call
-- Includes
--- @include "includes/getTargetQueueList"
-- Try to get as much as 1000 jobs at once
local jobs = rcall("ZRANGEBYSCORE", KEYS[1], 0, tonumber(ARGV[2]) * 0x1000,
"LIMIT", 0, 1000)
if (#jobs > 0) then
rcall("ZREM", KEYS[1], unpack(jobs))
-- check if we need to use push in paused instead of waiting
local target = getTargetQueueList(KEYS[5], KEYS[2], KEYS[4])
for _, jobId in ipairs(jobs) do
local priority =
tonumber(rcall("HGET", ARGV[1] .. jobId, "priority")) or 0
if priority == 0 then
-- LIFO or FIFO
rcall("LPUSH", target, jobId)
else
-- Priority add
rcall("ZADD", KEYS[3], priority, jobId)
local count = rcall("ZCOUNT", KEYS[3], 0, priority)
local len = rcall("LLEN", target)
local id = rcall("LINDEX", target, len - (count - 1))
if id then
rcall("LINSERT", target, "BEFORE", id, jobId)
else
rcall("RPUSH", target, jobId)
end
end
-- Emit waiting event
rcall("XADD", KEYS[6], "*", "event", "waiting", "jobId", jobId, "prev",
"delayed")
rcall("HSET", ARGV[1] .. jobId, "delay", 0)
end
end
local nextTimestamp = rcall("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")[2]
local id
if (nextTimestamp ~= nil) then
nextTimestamp = nextTimestamp / 0x1000
id = rcall("XADD", KEYS[7], "*", "nextTimestamp", nextTimestamp)
end
return {nextTimestamp, id}