Skip to content

Commit

Permalink
feat(python): add updateProgress method in job class(#1830)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Apr 21, 2023
1 parent 182b4bb commit e1e1aa2
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 12 deletions.
2 changes: 1 addition & 1 deletion python/README.md
Expand Up @@ -18,7 +18,7 @@ have been ported so far:

- [x] Workers
- [ ] Job events.
- [ ] Job progress.
- [x] Job progress.
- [ ] Job retries.
- [ ] Job backoff.
- [x] Getters.
Expand Down
18 changes: 13 additions & 5 deletions python/bullmq/job.py
@@ -1,5 +1,8 @@
from redis import Redis
from typing import List, Any
from __future__ import annotations
from typing import List, Any, TYPE_CHECKING
from bullmq.scripts import Scripts
if TYPE_CHECKING:
from bullmq.queue import Queue
from bullmq.types import JobOptions

import json
Expand All @@ -22,7 +25,7 @@ class Job:
A Job instance is also passed to the Worker's process function.
"""

def __init__(self, client: Redis, name: str, data: Any, opts: JobOptions = {}):
def __init__(self, queue: Queue, name: str, data: Any, opts: JobOptions = {}):
self.name = name
self.id = opts.get("jobId", None)
self.progress = 0
Expand All @@ -40,9 +43,14 @@ def __init__(self, client: Redis, name: str, data: Any, opts: JobOptions = {}):
self.failedReason = None
self.repeatJobKey = None
self.stacktrace: List[str] = []
self.scripts = Scripts(queue.prefix, queue.name, queue.redisConnection.conn)

def updateProgress(self, progress):
self.progress = progress
return self.scripts.updateProgress(self.id, progress)


def fromJSON(client: Redis, rawData: dict, jobId: str | None = None):
def fromJSON(queue: Queue, rawData: dict, jobId: str | None = None):
"""
Instantiates a Job from a JobJsonRaw object (coming from a deserialized JSON object)
Expand All @@ -53,7 +61,7 @@ def fromJSON(client: Redis, rawData: dict, jobId: str | None = None):
data = json.loads(rawData.get("data", '{}'))
opts = optsFromJSON(json.loads(rawData.get("opts", '{}')))

job = Job(client, rawData.get("name"), data, opts)
job = Job(queue, rawData.get("name"), data, opts)
job.id = jobId or rawData.get("id", b'').decode("utf-8")

job.progress = json.loads(rawData.get("progress", '0'))
Expand Down
4 changes: 2 additions & 2 deletions python/bullmq/queue.py
Expand Up @@ -28,7 +28,7 @@ async def add(self, name: str, data, opts: JobOptions = {}):
@param data: Arbitrary data to append to the job.
@param opts: Job options that affects how the job is going to be processed.
"""
job = Job(self.client, name, data, opts)
job = Job(self, name, data, opts)
job_id = await self.scripts.addJob(job)
job.id = job_id
return job
Expand Down Expand Up @@ -152,6 +152,6 @@ def close(self):
async def fromId(queue: Queue, jobId: str):
key = f"{queue.prefix}:{queue.name}:{jobId}"
raw_data = await queue.client.hgetall(key)
return Job.fromJSON(queue.client, raw_data, jobId)
return Job.fromJSON(queue, raw_data, jobId)

Job.fromId = staticmethod(fromId)
21 changes: 18 additions & 3 deletions python/bullmq/scripts.py
Expand Up @@ -2,10 +2,13 @@
This class is used to load and execute Lua scripts.
It is a wrapper around the Redis client.
"""
from typing import Any

from __future__ import annotations
from redis import Redis
from bullmq.job import Job
from bullmq.error_code import ErrorCode
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from bullmq.job import Job

import time
import json
Expand All @@ -25,14 +28,15 @@ def __init__(self, prefix: str, queueName: str, redisClient: Redis):
self.redisClient = redisClient
self.commands = {
"addJob": redisClient.register_script(self.getScript("addJob-8.lua")),
"extendLock": redisClient.register_script(self.getScript("extendLock-2.lua")),
"getCounts": redisClient.register_script(self.getScript("getCounts-1.lua")),
"obliterate": redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": redisClient.register_script(self.getScript("pause-4.lua")),
"moveToActive": redisClient.register_script(self.getScript("moveToActive-9.lua")),
"moveToFinished": redisClient.register_script(self.getScript("moveToFinished-12.lua")),
"extendLock": redisClient.register_script(self.getScript("extendLock-2.lua")),
"moveStalledJobsToWait": redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
"retryJobs": redisClient.register_script(self.getScript("retryJobs-6.lua")),
"updateProgress": redisClient.register_script(self.getScript("updateProgress-2.lua")),
}

# loop all the names and add them to the keys object
Expand Down Expand Up @@ -148,6 +152,17 @@ def moveToCompleted(self, job: Job, val: Any, removeOnComplete, token: str, opts
def moveToFailed(self, job: Job, failedReason: str, removeOnFailed, token: str, opts: dict, fetchNext=True):
return self.moveToFinished(job, failedReason, "failedReason", removeOnFailed, "failed", token, opts, fetchNext)

async def updateProgress(self, job_id: str, progress):
keys = [self.toKey(job_id), self.keys['events']]
progress_json = json.dumps(progress, separators=(',', ':'))
args = [job_id, progress_json]
result = await self.commands["updateProgress"](keys=keys, args=args)

if result is not None:
if result < 0:
raise finishedErrors(result, job_id, 'updateProgress')
return None

async def moveToFinished(self, job: Job, val: Any, propVal: str, shouldRemove, target, token: str, opts: dict, fetchNext=True) -> list[Any] | None:
timestamp = round(time.time() * 1000)
metricsKey = self.toKey('metrics:' + target)
Expand Down
3 changes: 2 additions & 1 deletion python/bullmq/worker.py
Expand Up @@ -27,6 +27,7 @@ def __init__(self, name: str, processor: Callable[[Job, str], asyncio.Future], o
self.blockingRedisConnection = RedisConnection(redis_opts)
self.client = self.redisConnection.conn
self.bclient = self.blockingRedisConnection.conn
self.prefix = opts.get("prefix", "bull")
self.scripts = Scripts(opts.get("prefix", "bull"), name, self.client)
self.closing = False
self.forceClosing = False
Expand Down Expand Up @@ -104,7 +105,7 @@ async def getNextJob(self, token: str):
job, job_id = await self.scripts.moveToActive(token, self.opts, job_id)

if job and job_id:
return Job.fromJSON(self.client, job, job_id)
return Job.fromJSON(self, job, job_id)

async def processJob(self, job: Job, token: str):
try:
Expand Down
1 change: 1 addition & 0 deletions python/run_tests.sh
@@ -1,4 +1,5 @@
#!/bin/bash
redis-cli flushall
python3 -m unittest -v tests.job_tests
python3 -m unittest -v tests.queue_tests
python3 -m unittest -v tests.worker_tests
42 changes: 42 additions & 0 deletions python/tests/job_tests.py
@@ -0,0 +1,42 @@
"""
Tests for job class.
https://bbc.github.io/cloudfit-public-docs/asyncio/testing.html
"""

import unittest

from bullmq import Queue, Job

queueName = "__bullmq_test_queue__"

class TestJob(unittest.IsolatedAsyncioTestCase):

async def asyncSetUp(self):
print("Setting up test queue")
# Delete test queue
queue = Queue(queueName)
await queue.pause()
await queue.obliterate()
await queue.close()

async def test_set_and_get_progress_as_number(self):
queue = Queue(queueName)
job = await queue.add("test-job", {"foo": "bar"}, {})
await job.updateProgress(42)
stored_job = await Job.fromId(queue, job.id)
self.assertEqual(stored_job.progress, 42)

await queue.close()

async def test_set_and_get_progress_as_object(self):
queue = Queue(queueName)
job = await queue.add("test-job", {"foo": "bar"}, {})
await job.updateProgress({"total": 120, "completed": 40})
stored_job = await Job.fromId(queue, job.id)
self.assertEqual(stored_job.progress, {"total": 120, "completed": 40})

await queue.close()

if __name__ == '__main__':
unittest.main()

0 comments on commit e1e1aa2

Please sign in to comment.