Skip to content

Commit

Permalink
feat(python): add changePriority method (#1943)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jun 3, 2023
1 parent f391f2a commit 945bcd3
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
3 changes: 3 additions & 0 deletions python/bullmq/job.py
Expand Up @@ -65,6 +65,9 @@ def retry(self, state: str = "failed"):
def getState(self):
return self.scripts.getState(self.id)

def changePriority(self, opts: dict):
return self.scripts.changePriority(self.id, opts.get("priority", 0), opts.get("lifo", False))

def updateProgress(self, progress):
self.progress = progress
return self.scripts.updateProgress(self.id, progress)
Expand Down
18 changes: 17 additions & 1 deletion python/bullmq/scripts.py
Expand Up @@ -31,6 +31,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
self.redisClient = redisConnection.conn
self.commands = {
"addJob": self.redisClient.register_script(self.getScript("addJob-8.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-4.lua")),
"extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
"getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")),
"getState": self.redisClient.register_script(self.getScript("getState-7.lua")),
Expand All @@ -39,8 +40,8 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-12.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-4.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-4.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-6.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-8.lua")),
"retryJobs": self.redisClient.register_script(self.getScript("retryJobs-6.lua")),
Expand Down Expand Up @@ -168,6 +169,21 @@ async def getState(self, job_id):
result = await self.commands["getStateV2"](keys=keys, args=args)
return result

async def changePriority(self, job_id: str, priority:int = 0, lifo:bool = False):
keys = [self.keys['wait'],
self.keys['paused'],
self.keys['meta'],
self.keys['priority']]

args = [priority, self.toKey(job_id), job_id, 1 if lifo else 0]

result = await self.commands["changePriority"](keys=keys, args=args)

if result is not None:
if result < 0:
raise self.finishedErrors(result, job_id, 'updateData')
return None

async def updateData(self, job_id: str, data):
keys = [self.toKey(job_id)]
data_json = json.dumps(data, separators=(',', ':'))
Expand Down

0 comments on commit 945bcd3

Please sign in to comment.