/
addJob-9.lua
180 lines (154 loc) · 5.39 KB
/
addJob-9.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
--[[
Adds a job to the queue by doing the following:
- Increases the job counter if needed.
- Creates a new job key with the job data.
- if delayed:
- computes timestamp.
- adds to delayed zset.
- Emits a global event 'delayed' if the job is delayed.
- if not delayed
- Adds the jobId to the wait/paused list in one of three ways:
- LIFO
- FIFO
- prioritized.
- Adds the job to the "added" list so that workers gets notified.
Input:
KEYS[1] 'wait',
KEYS[2] 'paused'
KEYS[3] 'meta'
KEYS[4] 'id'
KEYS[5] 'delayed'
KEYS[6] 'priority'
KEYS[7] 'completed'
KEYS[8] events stream key
KEYS[9] delay stream key
ARGV[1] msgpacked arguments array
[1] key prefix,
[2] custom id (will not generate one automatically)
[3] name
[4] timestamp
[5] parentKey?
[6] waitChildrenKey key.
[7] parent dependencies key.
[8] repeat job key
ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Output:
jobId - OK
-5 - Missing parent key
]]
local jobId
local jobIdKey
local rcall = redis.call
local args = cmsgpack.unpack(ARGV[1])
local data = ARGV[2]
local opts = cmsgpack.unpack(ARGV[3])
local parentKey = args[5]
local repeatJobKey = args[8]
local parentId
local parentQueueKey
local parentData
-- Includes
--- @include "includes/destructureJobKey"
--- @include "includes/getTargetQueueList"
--- @include "includes/trimEvents"
if parentKey ~= nil then
if rcall("EXISTS", parentKey) ~= 1 then
return -5
end
parentId = getJobIdFromKey(parentKey)
parentQueueKey = getJobKeyPrefix(parentKey, ":" .. parentId)
local parent = {}
parent['id'] = parentId
parent['queueKey'] = parentQueueKey
parentData = cjson.encode(parent)
end
local jobCounter = rcall("INCR", KEYS[4])
-- Includes
--- @include "includes/updateParentDepsIfNeeded"
-- Trim events before emiting them to avoid trimming events emitted in this script
trimEvents(KEYS[3], KEYS[8])
local parentDependenciesKey = args[7]
if args[2] == "" then
jobId = jobCounter
jobIdKey = args[1] .. jobId
else
jobId = args[2]
jobIdKey = args[1] .. jobId
if rcall("EXISTS", jobIdKey) == 1 then
if parentKey ~= nil then
if rcall("ZSCORE", KEYS[7], jobId) ~= false then
local returnvalue = rcall("HGET", jobIdKey, "returnvalue")
updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey, parentId, jobIdKey, returnvalue)
else
if parentDependenciesKey ~= nil then
rcall("SADD", parentDependenciesKey, jobIdKey)
end
end
rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
end
return jobId .. "" -- convert to string
end
end
-- Store the job.
local jsonOpts = cjson.encode(opts)
local delay = opts['delay'] or 0
local priority = opts['priority'] or 0
local timestamp = args[4]
local optionalValues = {}
if parentKey ~= nil then
table.insert(optionalValues, "parentKey")
table.insert(optionalValues, parentKey)
table.insert(optionalValues, "parent")
table.insert(optionalValues, parentData)
end
if repeatJobKey ~= nil then
table.insert(optionalValues, "rjk")
table.insert(optionalValues, repeatJobKey)
end
rcall("HMSET", jobIdKey, "name", args[3], "data", ARGV[2], "opts", jsonOpts,
"timestamp", timestamp, "delay", delay, "priority", priority, unpack(optionalValues))
-- TODO: do not send data and opts to the event added (for performance reasons).
rcall("XADD", KEYS[8], "*", "event", "added", "jobId", jobId, "name", args[3], "data", ARGV[2], "opts", jsonOpts)
-- Check if job is delayed
local delayedTimestamp = (delay > 0 and (timestamp + delay)) or 0
-- Check if job is a parent, if so add to the parents set
local waitChildrenKey = args[6]
if waitChildrenKey ~= nil then
rcall("ZADD", waitChildrenKey, timestamp, jobId)
rcall("XADD", KEYS[8], "*", "event", "waiting-children", "jobId", jobId)
elseif (delayedTimestamp ~= 0) then
local timestamp = delayedTimestamp * 0x1000 + bit.band(jobCounter, 0xfff)
rcall("ZADD", KEYS[5], timestamp, jobId)
rcall("XADD", KEYS[8], "*", "event", "delayed", "jobId", jobId, "delay",
delayedTimestamp)
rcall("XADD", KEYS[9], "*", "nextTimestamp", delayedTimestamp)
else
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])
-- Standard or priority add
if priority == 0 then
-- LIFO or FIFO
local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH';
rcall(pushCmd, target, jobId)
else
-- Priority add
rcall("ZADD", KEYS[6], priority, jobId)
local count = rcall("ZCOUNT", KEYS[6], 0, priority)
local len = rcall("LLEN", target)
local id = rcall("LINDEX", target, len - (count - 1))
if id then
rcall("LINSERT", target, "BEFORE", id, jobId)
else
rcall("RPUSH", target, jobId)
end
end
-- Emit waiting event
rcall("XADD", KEYS[8], "*", "event", "waiting", "jobId", jobId)
end
-- Check if this job is a child of another job, if so add it to the parents dependencies
-- TODO: Should not be possible to add a child job to a parent that is not in the "waiting-children" status.
-- fail in this case.
if parentDependenciesKey ~= nil then
rcall("SADD", parentDependenciesKey, jobIdKey)
end
return jobId .. "" -- convert to string