Skip to content

Commit

Permalink
feat(python): use new queue markers
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Dec 3, 2023
1 parent 0bac0fb commit 4276eb7
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 41 deletions.
2 changes: 1 addition & 1 deletion python/bullmq/queue_keys.py
Expand Up @@ -8,7 +8,7 @@ def __init__(self, prefix: str = 'bull'):

def getKeys(self, name: str):
names = ["", "active", "wait", "waiting-children", "paused", "completed", "failed", "delayed",
"stalled", "limiter", "prioritized", "id", "stalled-check", "meta", "pc", "events"]
"stalled", "limiter", "prioritized", "id", "stalled-check", "meta", "pc", "events", "marker"]
keys = {}
for name_type in names:
keys[name_type] = self.toKey(name, name_type)
Expand Down
52 changes: 27 additions & 25 deletions python/bullmq/scripts.py
Expand Up @@ -31,31 +31,31 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
self.redisConnection = redisConnection
self.redisClient = redisConnection.conn
self.commands = {
"addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-6.lua")),
"addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-7.lua")),
"addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-7.lua")),
"addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-6.lua")),
"addParentJob": self.redisClient.register_script(self.getScript("addParentJob-4.lua")),
"addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-8.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-5.lua")),
"addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-7.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-6.lua")),
"cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-2.lua")),
"extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
"getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")),
"getRanges": self.redisClient.register_script(self.getScript("getRanges-1.lua")),
"getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")), #
"getRanges": self.redisClient.register_script(self.getScript("getRanges-1.lua")), #
"getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
"getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-10.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-13.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-7.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-4.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-5.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-7.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-6.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-9.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-6.lua")),
"saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")),
"updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")),
"updateProgress": self.redisClient.register_script(self.getScript("updateProgress-2.lua")),
"updateProgress": self.redisClient.register_script(self.getScript("updateProgress-3.lua")), #
}

self.queue_keys = QueueKeys(prefix)
Expand Down Expand Up @@ -116,7 +116,7 @@ def addStandardJob(self, job: Job, timestamp: int, pipe = None):
Add a standard job to the queue
"""
keys = self.getKeys(['wait', 'paused', 'meta', 'id',
'completed', 'events'])
'completed', 'events', 'marker'])
args = self.addJobArgs(job, None)
args.append(timestamp)

Expand All @@ -126,7 +126,7 @@ def addDelayedJob(self, job: Job, timestamp: int, pipe = None):
"""
Add a delayed job to the queue
"""
keys = self.getKeys(['wait', 'paused', 'meta', 'id',
keys = self.getKeys(['marker', 'meta', 'id',
'delayed', 'completed', 'events'])
args = self.addJobArgs(job, None)
args.append(timestamp)
Expand All @@ -137,7 +137,7 @@ def addPrioritizedJob(self, job: Job, timestamp: int, pipe = None):
"""
Add a prioritized job to the queue
"""
keys = self.getKeys(['wait', 'paused', 'meta', 'id',
keys = self.getKeys(['marker', 'meta', 'id',
'prioritized', 'completed', 'events', 'pc'])
args = self.addJobArgs(job, None)
args.append(timestamp)
Expand Down Expand Up @@ -247,6 +247,7 @@ def retryJobArgs(self, job_id: str, lifo: bool, token: str):
keys.append(self.keys['delayed'])
keys.append(self.keys['prioritized'])
keys.append(self.keys['pc'])
keys.append(self.keys['marker'])

push_cmd = "R" if lifo else "L"

Expand All @@ -261,10 +262,9 @@ def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str, delay: int
if timestamp > 0:
max_timestamp = max_timestamp * 0x1000 + (convert_to_int(job_id) & 0xfff)

keys = self.getKeys(['wait', 'active', 'prioritized', 'delayed'])
keys = self.getKeys(['marker', 'active', 'prioritized', 'delayed'])
keys.append(self.toKey(job_id))
keys.append(self.keys['events'])
keys.append(self.keys['paused'])
keys.append(self.keys['meta'])

args = [self.keys[''], round(time.time() * 1000), str(max_timestamp),
Expand Down Expand Up @@ -315,7 +315,8 @@ async def changePriority(self, job_id: str, priority:int = 0, lifo:bool = False)
self.keys['paused'],
self.keys['meta'],
self.keys['prioritized'],
self.keys['pc']]
self.keys['pc'],
self.keys['marker']]

args = [priority, self.toKey(job_id), job_id, 1 if lifo else 0]

Expand Down Expand Up @@ -366,7 +367,7 @@ def pause(self, pause: bool = True):
"""
src = "wait" if pause else "paused"
dst = "paused" if pause else "wait"
keys = self.getKeys([src, dst, 'meta', 'prioritized', 'events'])
keys = self.getKeys([src, dst, 'meta', 'prioritized', 'events', 'delayed', 'marker'])
return self.commands["pause"](keys, args=["paused" if pause else "resumed"])

async def obliterate(self, count: int, force: bool = False):
Expand All @@ -392,7 +393,7 @@ async def retryJobs(self, state: str, count: int, timestamp: int):
result = await self.commands["moveJobsToWait"](keys=keys, args=[count or 1000, timestamp or round(time.time()*1000), current_state])
return result

async def moveToActive(self, token: str, opts: dict, jobId: str = None) -> list[Any]:
async def moveToActive(self, token: str, opts: dict) -> list[Any]:
"""
Add an item to the queue
"""
Expand All @@ -401,10 +402,10 @@ async def moveToActive(self, token: str, opts: dict, jobId: str = None) -> list[
limiter = opts.get("limiter", None)

keys = self.getKeys(['wait', 'active', 'prioritized', 'events',
'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc'])
'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', 'marker'])
packedOpts = msgpack.packb(
{"token": token, "lockDuration": lockDuration, "limiter": limiter}, use_bin_type=True)
args = [self.keys[''], timestamp, jobId or "", packedOpts]
args = [self.keys[''], timestamp, packedOpts]

result = await self.commands["moveToActive"](keys=keys, args=args)

Expand All @@ -417,7 +418,7 @@ def moveToFailed(self, job: Job, failedReason: str, removeOnFailed, token: str,
return self.moveToFinished(job, failedReason, "failedReason", removeOnFailed, "failed", token, opts, fetchNext)

async def updateProgress(self, job_id: str, progress):
keys = [self.toKey(job_id), self.keys['events']]
keys = [self.toKey(job_id), self.keys['events'], self.keys['meta']]
progress_json = json.dumps(progress, separators=(',', ':'))
args = [job_id, progress_json]
result = await self.commands["updateProgress"](keys=keys, args=args)
Expand All @@ -436,6 +437,7 @@ def moveToFinishedArgs(self, job: Job, val: Any, propVal: str, shouldRemove, tar
'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', target])
keys.append(self.toKey(job.id))
keys.append(metricsKey)
keys.append(self.keys['marker'])

def getKeepJobs(shouldRemove: bool | dict | int | None):
if type(shouldRemove) == int:
Expand Down Expand Up @@ -475,7 +477,7 @@ def getFailParentOnFailure(job: Job):
}, use_bin_type=True)

args = [job.id, timestamp, propVal, transformed_value or "", target, "",
fetchNext and "fetch" or "", self.keys[''], packedOpts]
fetchNext and "1" or "", self.keys[''], packedOpts]
return (keys, args)

def moveToFailedArgs(self, job: Job, failed_reason: str, shouldRemove, token: str, opts: dict, fetchNext=True):
Expand All @@ -490,7 +492,7 @@ async def moveToFinished(self, job: Job, val: Any, propVal: str, shouldRemove, t
result = await self.commands["moveToFinished"](keys=keys, args=args)

if result is not None:
if result < 0:
if type(result) == int and result < 0:
raise self.finishedErrors(result, job.id, 'finished', 'active')
return raw2NextJobData(result)
return None
Expand Down
33 changes: 20 additions & 13 deletions python/bullmq/worker.py
Expand Up @@ -99,25 +99,24 @@ async def getNextJob(self, token: str):
@param token: worker token to be assigned to retrieved job
@returns a Job or undefined if no job was available in the queue.
"""

if not self.waiting:
if not self.waiting and self.drained:
self.waiting = self.waitForJob()

try:
job_id = await self.waiting
job_instance = await self.moveToActive(token, job_id)
return job_instance
self.blockUntil = await self.waiting
timestamp = int(time.time() * 1000)

if self.blockUntil <= 0 or self.blockUntil <= timestamp:
job_instance = await self.moveToActive(token)
return job_instance
finally:
self.waiting = None
else:
job_instance = await self.moveToActive(token)
return job_instance

async def moveToActive(self, token: str, job_id: str = None):
if job_id and job_id.startswith('0:'):
self.blockUntil = int(job_id.split(':')[1]) or 0

result = await self.scripts.moveToActive(token, self.opts, job_id)
async def moveToActive(self, token: str):
result = await self.scripts.moveToActive(token, self.opts)
job_data = None
id = None
limit_until = None
Expand Down Expand Up @@ -154,16 +153,24 @@ async def waitForJob(self):
# Only Redis v6.0.0 and above supports doubles as block time
timeout = int(math.ceil(timeout)) if isRedisVersionLowerThan(redis_version, '6.0.0') else timeout

job_id = await self.bclient.brpoplpush(self.scripts.keys["wait"], self.scripts.keys["active"], timeout)
result = await self.bclient.bzpopmin(self.scripts.keys["marker"], timeout)
if result:
[_key, member, score] = result

return job_id
if member:
return int(score)
else:
return 0
return 0

async def processJob(self, job: Job, token: str):
try:
self.jobs.add((job, token))
result = await self.processor(job, token)
if not self.forceClosing:
await self.scripts.moveToCompleted(job, result, job.opts.get("removeOnComplete", False), token, self.opts, fetchNext=not self.closing)
# Currently we do not support pre-fetching jobs as in NodeJS version.
# nextJob = await self.scripts.moveToCompleted(job, result, job.opts.get("removeOnComplete", False), token, self.opts, fetchNext=not self.closing)
await self.scripts.moveToCompleted(job, result, job.opts.get("removeOnComplete", False), token, self.opts, fetchNext=False)
job.returnvalue = result
self.emit("completed", job, result)
except WaitingChildrenError:
Expand Down
5 changes: 3 additions & 2 deletions src/commands/moveToFinished-14.lua
Expand Up @@ -121,8 +121,9 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists

if (numRemovedElements < 1) then return -3 end

local metaKey = KEYS[9]
-- Trim events before emiting them to avoid trimming events emitted in this script
trimEvents(KEYS[9], KEYS[4])
trimEvents(metaKey, KEYS[4])

-- If job has a parent we need to
-- 1) remove this job id from parents dependencies
Expand Down Expand Up @@ -201,7 +202,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
-- and not rate limited.
if (ARGV[7] == "1") then

local target, paused = getTargetQueueList(KEYS[9], KEYS[1], KEYS[8])
local target, paused = getTargetQueueList(metaKey, KEYS[1], KEYS[8])

-- Check if there are delayed jobs that can be promoted
promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], KEYS[4], ARGV[8],
Expand Down

0 comments on commit 4276eb7

Please sign in to comment.