/
moveToFinished-14.lua
274 lines (232 loc) · 9.46 KB
/
moveToFinished-14.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
--[[
Move job from active to a finished status (completed o failed)
A job can only be moved to completed if it was active.
The job must be locked before it can be moved to a finished status,
and the lock must be released in this script.
Input:
KEYS[1] wait key
KEYS[2] active key
KEYS[3] prioritized key
KEYS[4] event stream key
KEYS[5] stalled key
-- Rate limiting
KEYS[6] rate limiter key
KEYS[7] delayed key
KEYS[8] paused key
KEYS[9] meta key
KEYS[10] pc priority counter
KEYS[11] completed/failed key
KEYS[12] jobId key
KEYS[13] metrics key
KEYS[14] marker key
ARGV[1] jobId
ARGV[2] timestamp
ARGV[3] msg property returnvalue / failedReason
ARGV[4] return value / failed reason
ARGV[5] target (completed/failed)
ARGV[6] fetch next?
ARGV[7] keys prefix
ARGV[8] opts
opts - token - lock token
opts - keepJobs
opts - lockDuration - lock duration in milliseconds
opts - attempts max attempts
opts - maxMetricsSize
opts - fpof - fail parent on fail
opts - rdof - remove dependency on fail
Output:
0 OK
-1 Missing key.
-2 Missing lock.
-3 Job not in active set
-4 Job has pending dependencies
-6 Lock is not owned by this client
Events:
'completed/failed'
]]
local rcall = redis.call
--- Includes
--- @include "includes/collectMetrics"
--- @include "includes/getNextDelayedTimestamp"
--- @include "includes/moveJobFromPriorityToActive"
--- @include "includes/prepareJobForProcessing"
--- @include "includes/moveParentFromWaitingChildrenToFailed"
--- @include "includes/moveParentToWaitIfNeeded"
--- @include "includes/promoteDelayedJobs"
--- @include "includes/removeJobsByMaxAge"
--- @include "includes/removeJobsByMaxCount"
--- @include "includes/removeParentDependencyKey"
--- @include "includes/trimEvents"
--- @include "includes/updateParentDepsIfNeeded"
--- @include "includes/getRateLimitTTL"
local jobIdKey = KEYS[12]
if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
local opts = cmsgpack.unpack(ARGV[8])
local token = opts['token']
local attempts = opts['attempts']
local maxMetricsSize = opts['maxMetricsSize']
local maxCount = opts['keepJobs']['count']
local maxAge = opts['keepJobs']['age']
if token ~= "0" then
local lockKey = jobIdKey .. ':lock'
local lockToken = rcall("GET", lockKey)
if lockToken == token then
rcall("DEL", lockKey)
rcall("SREM", KEYS[5], ARGV[1])
else
if lockToken then
-- Lock exists but token does not match
return -6
else
-- Lock is missing completely
return -2
end
end
end
if rcall("SCARD", jobIdKey .. ":dependencies") ~= 0 then -- // Make sure it does not have pending dependencies
return -4
end
local parentReferences = rcall("HMGET", jobIdKey, "parentKey", "parent")
local parentKey = parentReferences[1] or ""
local parentId = ""
local parentQueueKey = ""
if parentReferences[2] ~= false then
local jsonDecodedParent = cjson.decode(parentReferences[2])
parentId = jsonDecodedParent['id']
parentQueueKey = jsonDecodedParent['queueKey']
end
local jobId = ARGV[1]
local timestamp = ARGV[2]
-- Remove from active list (if not active we shall return error)
local numRemovedElements = rcall("LREM", KEYS[2], -1, jobId)
if (numRemovedElements < 1) then return -3 end
local metaKey = KEYS[9]
-- Trim events before emiting them to avoid trimming events emitted in this script
trimEvents(metaKey, KEYS[4])
-- If job has a parent we need to
-- 1) remove this job id from parents dependencies
-- 2) move the job Id to parent "processed" set
-- 3) push the results into parent "results" list
-- 4) if parent's dependencies is empty, then move parent to "wait/paused". Note it may be a different queue!.
if parentId == "" and parentKey ~= "" then
parentId = getJobIdFromKey(parentKey)
parentQueueKey = getJobKeyPrefix(parentKey, ":" .. parentId)
end
if parentId ~= "" then
if ARGV[5] == "completed" then
local dependenciesSet = parentKey .. ":dependencies"
if rcall("SREM", dependenciesSet, jobIdKey) == 1 then
updateParentDepsIfNeeded(parentKey, parentQueueKey,
dependenciesSet, parentId, jobIdKey,
ARGV[4], timestamp)
end
else
if opts['fpof'] then
moveParentFromWaitingChildrenToFailed(parentQueueKey, parentKey,
parentId, jobIdKey,
timestamp)
elseif opts['rdof'] then
local dependenciesSet = parentKey .. ":dependencies"
if rcall("SREM", dependenciesSet, jobIdKey) == 1 then
moveParentToWaitIfNeeded(parentQueueKey, dependenciesSet,
parentKey, parentId, timestamp)
end
end
end
end
local attemptsMade = rcall("HINCRBY", jobIdKey, "atm", 1)
-- Remove job?
if maxCount ~= 0 then
local targetSet = KEYS[11]
-- Add to complete/failed set
rcall("ZADD", targetSet, timestamp, jobId)
rcall("HMSET", jobIdKey, ARGV[3], ARGV[4], "finishedOn", timestamp)
-- "returnvalue" / "failedReason" and "finishedOn"
-- Remove old jobs?
local prefix = ARGV[7]
if maxAge ~= nil then
removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix)
end
if maxCount ~= nil and maxCount > 0 then
removeJobsByMaxCount(maxCount, targetSet, prefix)
end
else
rcall("DEL", jobIdKey, jobIdKey .. ':logs', jobIdKey .. ':processed')
if parentKey ~= "" then
removeParentDependencyKey(jobIdKey, false, parentKey)
end
end
rcall("XADD", KEYS[4], "*", "event", ARGV[5], "jobId", jobId, ARGV[3],
ARGV[4])
if ARGV[5] == "failed" then
if tonumber(attemptsMade) >= tonumber(attempts) then
rcall("XADD", KEYS[4], "*", "event", "retries-exhausted", "jobId",
jobId, "attemptsMade", attemptsMade)
end
end
-- Collect metrics
if maxMetricsSize ~= "" then
collectMetrics(KEYS[13], KEYS[13] .. ':data', maxMetricsSize, timestamp)
end
-- Try to get next job to avoid an extra roundtrip if the queue is not closing,
-- and not rate limited.
if (ARGV[6] == "1") then
local target, paused = getTargetQueueList(metaKey, KEYS[1], KEYS[8])
-- Check if there are delayed jobs that can be promoted
promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], KEYS[4], ARGV[7],
timestamp, KEYS[10], paused)
local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max'])
-- Check if we are rate limited first.
local expireTime = getRateLimitTTL(maxJobs, KEYS[6])
if expireTime > 0 then return {0, 0, expireTime, 0} end
-- paused queue
if paused then return {0, 0, 0, 0} end
jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2])
if jobId then
-- Markers in waitlist DEPRECATED in v5: Remove in v6.
if string.sub(jobId, 1, 2) == "0:" then
rcall("LREM", KEYS[2], 1, jobId)
-- If jobId is special ID 0:delay (delay greater than 0), then there is no job to process
-- but if ID is 0:0, then there is at least 1 prioritized job to process
if jobId == "0:0" then
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2],
KEYS[10])
return prepareJobForProcessing(KEYS, ARGV[7], target, jobId,
timestamp, maxJobs,
expireTime, opts)
end
else
return prepareJobForProcessing(KEYS, ARGV[7], target, jobId,
timestamp, maxJobs, expireTime,
opts)
end
else
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10])
if jobId then
return prepareJobForProcessing(KEYS, ARGV[7], target, jobId,
timestamp, maxJobs, expireTime,
opts)
end
end
-- Return the timestamp for the next delayed job if any.
local nextTimestamp = getNextDelayedTimestamp(KEYS[7])
if nextTimestamp ~= nil then
-- The result is guaranteed to be positive, since the
-- ZRANGEBYSCORE command would have return a job otherwise.
return {0, 0, 0, nextTimestamp}
end
end
local waitLen = rcall("LLEN", KEYS[1])
if waitLen == 0 then
local activeLen = rcall("LLEN", KEYS[2])
if activeLen == 0 then
local prioritizedLen = rcall("ZCARD", KEYS[3])
if prioritizedLen == 0 then
rcall("XADD", KEYS[4], "*", "event", "drained")
end
end
end
return 0
else
return -1
end