Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(priority): change priority as a new state #1984

Merged
merged 33 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
193b92a
perf(priority): change priority as a new state
roggervalf Jun 14, 2023
d0b196e
refactor(priority): reuse addJobWithPriority include
roggervalf Jun 14, 2023
0f87cf8
chore: pass paused param in missing places
roggervalf Jun 14, 2023
d42e66c
refactor(change-priority): consider last job in wait list
roggervalf Jun 14, 2023
f386db6
refactor(pause): consider moving prioritized job to wait if needed
roggervalf Jun 14, 2023
6b77001
chore: fix test cases
roggervalf Jun 15, 2023
0f98015
chore: use correct variables in addJobWIthPriority
roggervalf Jun 15, 2023
18b3c1d
refactor(priority): set marker when wait len is 0
roggervalf Jun 15, 2023
499017f
chore: remove not needed check for last job
roggervalf Jun 15, 2023
933dc97
docs: add better description for addPriorityMarkerIfNeeded
roggervalf Jun 15, 2023
5c3db07
chore: update python pause script
roggervalf Jun 15, 2023
e36049c
chore: delete extra param
roggervalf Jun 15, 2023
be80d90
refactor: use timestamp for fifo order in priority zset
roggervalf Jun 17, 2023
b28fa1b
refactor(priority): consider fifo order with lexicographical order
roggervalf Jun 19, 2023
fb73c30
chore: restore python changelog
roggervalf Jun 19, 2023
220e0a4
chore(python): fix scripts references
roggervalf Jun 19, 2023
f535033
test: fix repeat test case
roggervalf Jun 19, 2023
0be0c0b
fix: flaky test
roggervalf Jun 19, 2023
bbf3afb
refactor: add moveJobFromPriorityToActive include
roggervalf Jun 19, 2023
a5de7da
test(priority): fix flaky test
roggervalf Jun 19, 2023
fab6b3a
chore: merge branch 'master' into better-priority
roggervalf Jun 20, 2023
527eef5
refactor(priority): use priority counter key
roggervalf Jun 20, 2023
9020413
chore: remove extra args
roggervalf Jun 20, 2023
acd0fef
refactor: update params
roggervalf Jun 20, 2023
3d3a8e3
refactor(priority): use counter and priority as score
roggervalf Jun 21, 2023
ab92413
feat(queue): add removePriorityKey method
roggervalf Jun 21, 2023
570ef51
refactor: reset priority counter when prioritized state is empty
roggervalf Jun 21, 2023
6f20687
refactor(drained): consider prioritized length
roggervalf Jun 21, 2023
8fc8fb7
refactor: change methods names updateData and removeDeprecatedPriorit…
roggervalf Jun 21, 2023
4f069c6
chore: fix typo
roggervalf Jun 21, 2023
070af22
refactor(push-back): re-add job at the head of same prioritized jobs
roggervalf Jun 21, 2023
6d13180
chore: address comments
roggervalf Jun 21, 2023
0a53a05
chore: merge branch 'master' into better-priority
roggervalf Jun 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-12.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-4.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-5.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-6.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-8.lua")),
Expand Down Expand Up @@ -231,7 +231,7 @@ def pause(self, pause: bool = True):
"""
src = "wait" if pause else "paused"
dst = "paused" if pause else "wait"
keys = self.getKeys([src, dst, 'meta', 'events'])
keys = self.getKeys([src, dst, 'meta', 'priority', 'events'])
return self.commands["pause"](keys, args=["paused" if pause else "resumed"])

async def obliterate(self, count: int, force: bool = False):
Expand Down
1 change: 1 addition & 0 deletions src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export class QueueGetters<
'delayed',
'failed',
'paused',
'priority',
'waiting',
'waiting-children',
];
Expand Down
1 change: 1 addition & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ export class Queue<
| 'wait'
| 'active'
| 'paused'
| 'priority'
| 'delayed'
| 'failed' = 'completed',
): Promise<string[]> {
Expand Down
4 changes: 2 additions & 2 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ export class Scripts {
dst = 'wait';
}

const keys = [src, dst, 'meta'].map((name: string) =>
const keys = [src, dst, 'meta', 'priority'].map((name: string) =>
this.queue.toKey(name),
);

Expand Down Expand Up @@ -837,7 +837,7 @@ export class Scripts {
const args: (string | number | boolean | Buffer)[] = [
queueKeys[''],
Date.now(),
jobId,
jobId || '',
pack({
token,
lockDuration: opts.lockDuration,
Expand Down
5 changes: 2 additions & 3 deletions src/commands/addJob-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,15 @@ elseif (delayedTimestamp ~= 0) then
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])
addDelayMarkerIfNeeded(target, KEYS[5])
else
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])
local target, paused = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])

-- Standard or priority add
if priority == 0 then
-- LIFO or FIFO
local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
rcall(pushCmd, target, jobId)
else
-- Priority add
addJobWithPriority(KEYS[6], priority, target, jobId)
addJobWithPriority(KEYS[1], KEYS[6], priority, paused, jobId)
end
-- Emit waiting event
rcall("XADD", KEYS[8], "*", "event", "waiting", "jobId", jobId)
Expand Down
34 changes: 19 additions & 15 deletions src/commands/changePriority-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,25 @@ local rcall = redis.call
--- @include "includes/getTargetQueueList"

if rcall("EXISTS", jobKey) == 1 then
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])

local numRemovedElements = rcall("LREM", target, -1, jobId)
if numRemovedElements > 0 then
rcall("ZREM", KEYS[4], jobId)

-- Standard or priority add
if priority == 0 then
-- LIFO or FIFO
local pushCmd = ARGV[4] == '1' and 'RPUSH' or 'LPUSH';
rcall(pushCmd, target, jobId)
else
-- Priority add
addJobWithPriority(KEYS[4], priority, target, jobId)
end
local target, paused = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])

local isPrioritized = rcall("ZREM", KEYS[4], jobId) > 0
if isPrioritized then
-- Priority add
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for the comment as the function is descriptive enough

addJobWithPriority(KEYS[1], KEYS[4], priority, paused, jobId)
else
local numRemovedElements = rcall("LREM", target, -1, jobId)
if numRemovedElements > 0 then
-- Standard or priority add
if priority == 0 then
-- LIFO or FIFO
local pushCmd = ARGV[4] == '1' and 'RPUSH' or 'LPUSH';
rcall(pushCmd, target, jobId)
else
-- Priority add
addJobWithPriority(KEYS[1], KEYS[4], priority, paused, jobId)
end
end
end

rcall("HSET", jobKey, "priority", priority)
Expand Down
2 changes: 2 additions & 0 deletions src/commands/cleanJobsInSet-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ if ARGV[4] == "active" then
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false)
elseif ARGV[4] == "delayed" then
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"processedOn", "timestamp"})
elseif ARGV[4] == "priority" then
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"timestamp"})
elseif ARGV[4] == "wait" or ARGV[4] == "paused" then
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true)
else
Expand Down
2 changes: 1 addition & 1 deletion src/commands/drain-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ if KEYS[3] ~= "" then
removeZSetJobs(KEYS[3], true, queueBaseKey, 0) --delayed
end

rcall("DEL", KEYS[4])
removeZSetJobs(KEYS[4], true, queueBaseKey, 0) --priority
15 changes: 6 additions & 9 deletions src/commands/includes/addJobWithPriority.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@
Function to add job considering priority.
]]

local function addJobWithPriority(priorityKey, priority, targetKey, jobId)
rcall("ZADD", priorityKey, priority, jobId)
local count = rcall("ZCOUNT", priorityKey, 0, priority)
-- Includes
--- @include "addPriorityMarkerIfNeeded"

local len = rcall("LLEN", targetKey)
local id = rcall("LINDEX", targetKey, len - (count - 1))
if id then
rcall("LINSERT", targetKey, "BEFORE", id, jobId)
else
rcall("RPUSH", targetKey, jobId)
local function addJobWithPriority(waitKey, priorityKey, priority, paused, jobId)
rcall("ZADD", priorityKey, priority, jobId)
if not paused then
addPriorityMarkerIfNeeded(waitKey)
end
end
12 changes: 12 additions & 0 deletions src/commands/includes/addPriorityMarkerIfNeeded.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
--[[
Function priority marker to wait if needed
in order to wake up our workers and to respect priority
order as much as possible
]]
local function addPriorityMarkerIfNeeded(waitKey)
local waitLen = rcall("LLEN", waitKey)

if waitLen == 0 then
rcall("LPUSH", waitKey, "0:0")
end
end
3 changes: 0 additions & 3 deletions src/commands/includes/cleanList.lua
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd,
-- occur at the end of the script
rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker)
removeJob(job, true, jobKeyPrefix)
if isWaiting then
rcall("ZREM", jobKeyPrefix .. "priority", job)
end
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
Expand Down
2 changes: 1 addition & 1 deletion src/commands/includes/getRateLimitTTL.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ local function getRateLimitTTL(maxJobs, rateLimiterKey)
if maxJobs then
local pttl = rcall("PTTL", rateLimiterKey)

if pttl <= 0 then
if pttl == 0 then
rcall("DEL", rateLimiterKey)
end

Expand Down
32 changes: 21 additions & 11 deletions src/commands/includes/moveJobFromWaitToActive.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,49 @@
opts - limiter
]]

-- Includes
--- @include "addJobWithPriority"

local function moveJobFromWaitToActive(keys, keyPrefix, targetKey, jobId, processedOn,
maxJobs, expireTime, opts)
maxJobs, expireTime, paused, opts)
local jobKey = keyPrefix .. jobId
-- Check if we need to perform rate limiting.
if maxJobs then
local rateLimiterKey = keys[6];
local jobCounter = tonumber(rcall("INCR", rateLimiterKey))

if jobCounter == 1 then
local limiterDuration = opts['limiter'] and opts['limiter']['duration']
local integerDuration = math.floor(math.abs(limiterDuration))
rcall("PEXPIRE", rateLimiterKey, integerDuration)
end

-- check if we passed rate limit, we need to remove the job and return expireTime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should rename "passed" to "exceeded" here.

if expireTime > 0 then
-- remove from active queue and add back to the wait list
rcall("LREM", keys[2], 1, jobId)
rcall("RPUSH", targetKey, jobId)

local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0

if priority == 0 then
rcall("RPUSH", targetKey, jobId)
else
addJobWithPriority(keys[1], keys[3], priority, paused, jobId)
end

-- Return when we can process more jobs
return {0, 0, expireTime, 0}
end

local jobCounter = tonumber(rcall("INCR", rateLimiterKey))

if jobCounter == 1 then
local limiterDuration = opts['limiter'] and opts['limiter']['duration']
local integerDuration = math.floor(math.abs(limiterDuration))
rcall("PEXPIRE", rateLimiterKey, integerDuration)
end
end

local jobKey = keyPrefix .. jobId
local lockKey = jobKey .. ':lock'

-- get a lock
if opts['token'] ~= "0" then
rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration'])
end

rcall("ZREM", keys[3], jobId) -- remove from priority
rcall("XADD", keys[4], "*", "event", "active", "jobId", jobId, "prev", "waiting")
rcall("HSET", jobKey, "processedOn", processedOn)
rcall("HINCRBY", jobKey, "attemptsMade", 1)
Expand Down
12 changes: 7 additions & 5 deletions src/commands/includes/promoteDelayedJobs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,30 @@
--- @include "addJobWithPriority"

-- Try to get as much as 1000 jobs at once
local function promoteDelayedJobs(delayedKey, targetKey, priorityKey,
eventStreamKey, prefix, timestamp)
local function promoteDelayedJobs(delayedKey, waitKey, targetKey, priorityKey,
eventStreamKey, prefix, timestamp, paused)
local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000, "LIMIT", 0, 1000)

if (#jobs > 0) then
rcall("ZREM", delayedKey, unpack(jobs))

for _, jobId in ipairs(jobs) do
local jobKey = prefix .. jobId
local priority =
tonumber(rcall("HGET", prefix .. jobId, "priority")) or 0
tonumber(rcall("HGET", jobKey, "priority")) or 0

if priority == 0 then
-- LIFO or FIFO
rcall("LPUSH", targetKey, jobId)
else
addJobWithPriority(priorityKey, priority, targetKey, jobId)
rcall("SET", "DEBUG", "DELAYED")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't forget to remove the debug code 👍

addJobWithPriority(waitKey, priorityKey, priority, paused, jobId)
end

-- Emit waiting event
rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId",
jobId, "prev", "delayed")
rcall("HSET", prefix .. jobId, "delay", 0)
rcall("HSET", jobKey, "delay", 0)
end
end
end
3 changes: 3 additions & 0 deletions src/commands/includes/removeJobFromAnyState.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ local function removeJobFromAnyState( prefix, jobId)
elseif rcall("ZSCORE", prefix .. "failed", jobId) then
rcall("ZREM", prefix .. "failed", jobId)
return "failed"
elseif rcall("ZSCORE", prefix .. "priority", jobId) then
rcall("ZREM", prefix .. "priority", jobId)
return "priority"
-- We remove only 1 element from the list, since we assume they are not added multiple times
elseif rcall("LREM", prefix .. "wait", 1, jobId) == 1 then
return "wait"
Expand Down
5 changes: 3 additions & 2 deletions src/commands/includes/updateParentDepsIfNeeded.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDepende
local activeParent = rcall("ZSCORE", parentQueueKey .. ":waiting-children", parentId)
if rcall("SCARD", parentDependenciesKey) == 0 and activeParent then
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId)
local parentTarget = getTargetQueueList(parentQueueKey .. ":meta", parentQueueKey .. ":wait",
local parentWaitKey = parentQueueKey .. ":wait"
local parentTarget, paused = getTargetQueueList(parentQueueKey .. ":meta", parentWaitKey,
parentQueueKey .. ":paused")
local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
local priority = tonumber(jobAttributes[1]) or 0
Expand All @@ -30,7 +31,7 @@ local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDepende
elseif priority == 0 then
rcall("RPUSH", parentTarget, parentId)
else
addJobWithPriority(parentQueueKey .. ":priority", priority, parentTarget, parentId)
addJobWithPriority(parentWaitKey, parentQueueKey .. ":priority", priority, paused, parentId)
end

rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
Expand Down
4 changes: 2 additions & 2 deletions src/commands/moveJobFromActiveToWait-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ local pttl = rcall("PTTL", KEYS[7])
if lockToken == token and pttl > 0 then
local removed = rcall("LREM", KEYS[1], 1, jobId)
if (removed > 0) then
local target = getTargetQueueList(KEYS[6], KEYS[2], KEYS[5])
local target, paused = getTargetQueueList(KEYS[6], KEYS[2], KEYS[5])

rcall("SREM", KEYS[3], jobId)

local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0

if priority > 0 then
addJobWithPriority(KEYS[8], priority, target, jobId)
addJobWithPriority(KEYS[2], KEYS[8], priority, paused, jobId)
else
rcall("RPUSH", target, jobId)
end
Expand Down
28 changes: 24 additions & 4 deletions src/commands/moveToActive-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ local rcall = redis.call
--- @include "includes/getTargetQueueList"
--- @include "includes/promoteDelayedJobs"

local target = getTargetQueueList(KEYS[9], KEYS[1], KEYS[8])
local target, paused = getTargetQueueList(KEYS[9], KEYS[1], KEYS[8])

-- Check if there are delayed jobs that we can move to wait.
promoteDelayedJobs(KEYS[7], target, KEYS[3], KEYS[4], ARGV[1], ARGV[2])
promoteDelayedJobs(KEYS[7], KEYS[1], target, KEYS[3], KEYS[4], ARGV[1], ARGV[2], paused)

local opts = cmsgpack.unpack(ARGV[4])
local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max'])
Expand All @@ -59,6 +59,9 @@ else
return { 0, 0, expireTime, 0 }
end

-- paused queue
if paused then return {0, 0, 0, 0} end

-- no job ID, try non-blocking move from wait to active
jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2])
end
Expand All @@ -72,6 +75,9 @@ if jobId then
return { 0, 0, expireTime, 0 }
end

-- paused queue
if paused then return {0, 0, 0, 0} end

-- Move again since we just got the marker job.
jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2])

Expand All @@ -85,14 +91,28 @@ if jobId then

if jobId then
-- this script is not really moving, it is preparing the job for processing
return moveJobFromWaitToActive(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, opts)
return moveJobFromWaitToActive(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, paused, opts)
else
local prioritizedJob = rcall("ZPOPMIN", KEYS[3])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be refactored.

if #prioritizedJob > 0 then
jobId = prioritizedJob[1]
rcall("LPUSH", KEYS[2], jobId)
return moveJobFromWaitToActive(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, paused, opts)
end
end
else
local prioritizedJob = rcall("ZPOPMIN", KEYS[3])
if #prioritizedJob > 0 then
jobId = prioritizedJob[1]
rcall("LPUSH", KEYS[2], jobId)
return moveJobFromWaitToActive(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, paused, opts)
end
end

-- Return the timestamp for the next delayed job if any.
local nextTimestamp = getNextDelayedTimestamp(KEYS[7])
if (nextTimestamp ~= nil) then
return { 0, 0, 0, nextTimestamp}
return { 0, 0, 0, nextTimestamp }
end

return { 0, 0, 0, 0}