feat(queue): add getJobLogs method [python] (#2523) ref #2472
roggervalf committed Apr 13, 2024
1 parent deca5d0 commit a24a16e
Showing 3 changed files with 27 additions and 1 deletion.
25 changes: 25 additions & 0 deletions python/bullmq/queue.py
@@ -116,6 +116,31 @@ def getRateLimitTtl(self):
        """
        return self.client.pttl(self.keys["limiter"])

    async def getJobLogs(self, job_id:str, start = 0, end = -1, asc = True):
        """
        Returns the logs for a given Job.
        @param job_id: The id of the job to get the logs for.
        @param start: Zero based index from where to start returning log lines.
        @param end: Zero based index where to stop returning log lines.
        @param asc: If true, the log lines will be returned in ascending order.
        """

        logs_key = self.toKey(job_id + ":logs")
        pipe = self.redisConnection.conn.pipeline(transaction=True)
        if asc:
            pipe.lrange(logs_key, start, end)
        else:
            pipe.lrange(logs_key, -(end+1), -(start+1))
        pipe.llen(logs_key)
        result = await pipe.execute()
        if not asc:
            result[0].reverse()
        return {
            "logs": result[0],
            "count": result[1]
        }

    async def obliterate(self, force: bool = False):
        """
        Completely destroys the queue and all of its contents irreversibly.
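A minimal usage sketch of the new method (assuming a reachable Redis instance; the queue name, job id, and the surrounding async wrapper are illustrative and not part of this change):

from bullmq import Queue

async def print_job_logs(job_id: str):
    queue = Queue("myQueue")  # hypothetical queue name

    # First ten log lines, oldest first; start/end are zero based and end=-1 means "up to the last entry".
    page = await queue.getJobLogs(job_id, start=0, end=9, asc=True)
    print(page["count"], page["logs"])

    # Ten most recent log lines, newest first.
    latest = await queue.getJobLogs(job_id, start=0, end=9, asc=False)
    print(latest["logs"])

    await queue.close()

For the descending case the method maps the zero-based range onto LRANGE's negative indices (-(end+1) to -(start+1)) and reverses the returned slice, so start and end keep the same meaning in either order; the returned count is the total length of the job's log list, not the size of the page.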
1 change: 0 additions & 1 deletion python/bullmq/worker.py
@@ -154,7 +154,6 @@ def nextJobFromJobData(self, job_data = None, job_id: str = None, limit_until: i
    async def waitForJob(self):
        block_timeout = self.getBlockTimeout(self.blockUntil)
        block_timeout = block_timeout if self.blockingRedisConnection.capabilities.get("canDoubleTimeout", False) else math.ceil(block_timeout)
        block_timeout = min(block_timeout, maximum_block_timeout)

        result = await self.bclient.bzpopmin(self.scripts.keys["marker"], block_timeout)
        if result:
2 changes: 2 additions & 0 deletions python/tests/job_tests.py
@@ -58,6 +58,8 @@ async def test_job_log(self):

        self.assertEqual(log_count, 2)

        logs = await queue.getJobLogs(job.id)
        self.assertEqual(logs, {"logs": ["some log text 1", "some log text 2"], "count": 2})
        await queue.close()

    async def test_update_job_data(self):
