Skip to content

Commit

Permalink
feat(job): add isWaiting method [python] (#2328)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Dec 13, 2023
1 parent 0365b3f commit 5db9f95
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 5 deletions.
9 changes: 5 additions & 4 deletions python/bullmq/job.py
Expand Up @@ -92,16 +92,17 @@ async def remove(self, opts: dict = {}):
def isDelayed(self):
return self.isInZSet('delayed')

async def isInZSet(self, set: str):
score = await self.queue.client.zscore(self.scripts.toKey(set), self.id)

return score is not None
async def isWaiting(self):
return ( await self.isInList('wait') or await self.isInList('paused'))

async def isInZSet(self, set: str):
score = await self.queue.client.zscore(self.scripts.toKey(set), self.id)

return score is not None

def isInList(self, list_name: str):
return self.scripts.isJobInList(self.scripts.toKey(list_name), self.id)

async def moveToFailed(self, err, token:str, fetchNext:bool = False):
error_message = str(err)
self.failedReason = error_message
Expand Down
13 changes: 12 additions & 1 deletion python/bullmq/scripts.py
Expand Up @@ -42,6 +42,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"getRanges": self.redisClient.register_script(self.getScript("getRanges-1.lua")),
"getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
"getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
"isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-10.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
Expand Down Expand Up @@ -317,7 +318,7 @@ def getCounts(self, types):

return self.commands["getCounts"](keys=keys, args=transformed_types)

async def getState(self, job_id):
async def getState(self, job_id: str):
keys = self.getKeys(['completed', 'failed', 'delayed', 'active', 'wait',
'paused', 'waiting-children', 'prioritized'])

Expand All @@ -332,6 +333,16 @@ async def getState(self, job_id):
result = await self.commands["getStateV2"](keys=keys, args=args)
return result

async def isJobInList(self, list_key: str, job_id: str):
redis_version = await self.redisConnection.getRedisVersion()

result = None
if isRedisVersionLowerThan(redis_version, '6.0.6'):
result = await self.commands["getState"](keys=[list_key], args=[job_id])
else:
result = await self.redisClient.lpos(list_key, job_id)
return isinstance(result, int)

async def changePriority(self, job_id: str, priority:int = 0, lifo:bool = False):
keys = [self.keys['wait'],
self.keys['paused'],
Expand Down
2 changes: 2 additions & 0 deletions python/tests/job_tests.py
Expand Up @@ -76,6 +76,8 @@ async def test_promote_delayed_job(self):
self.assertEqual(job.delay, 0)
isDelayedAfterPromote = await job.isDelayed()
self.assertEqual(isDelayedAfterPromote, False)
isWaiting = await job.isWaiting()
self.assertEqual(isWaiting, True)

await queue.close()

Expand Down

0 comments on commit 5db9f95

Please sign in to comment.