Skip to content

Commit

Permalink
feat(python): add addBulk method in queue class (#2161)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Aug 31, 2023
1 parent 1b0ff8e commit 555dd44
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 0 deletions.
38 changes: 38 additions & 0 deletions python/bullmq/queue.py
Expand Up @@ -19,6 +19,7 @@ def __init__(self, name: str, redisOpts: dict | str = {}, opts: QueueBaseOptions
self.redisConnection = RedisConnection(redisOpts)
self.client = self.redisConnection.conn
self.opts = opts
self.jobsOpts = opts.get("defaultJobOptions", {})
self.prefix = opts.get("prefix", "bull")
self.scripts = Scripts(
self.prefix, name, self.redisConnection)
Expand All @@ -41,6 +42,43 @@ async def add(self, name: str, data, opts: JobOptions = {}):
job.id = job_id
return job

async def addBulk(self, jobs: list[dict[str, dict | str]]):
    """
    Adds an array of jobs to the queue. This method may be faster than adding
    one job at a time in a sequence, since all jobs are submitted in a single
    Redis pipeline (one network round trip).

    :param jobs: list of dicts, each with a "name" (str), "data" (dict) and
        optionally "opts" (dict). Per-job opts override the queue's
        defaultJobOptions.
    :returns: list of Job instances with their ids populated from Redis.
    """
    jobs_data = []
    for job in jobs:
        # Merge queue-level default job options with per-job overrides
        # (per-job opts win).
        opts = {}
        opts.update(self.jobsOpts)
        opts.update(job.get("opts", {}))

        jobs_data.append({
            "name": job.get("name"),
            "data": job.get("data"),
            "opts": opts
        })

    result = []
    # Queue every addJob script call inside one transactional pipeline so
    # all jobs are added atomically and with a single round trip.
    async with self.redisConnection.conn.pipeline(transaction=True) as pipe:
        for job_data in jobs_data:
            current_job_opts = job_data.get("opts", {})
            job = Job(
                queue=self,
                name=job_data.get("name"),
                data=job_data.get("data"),
                opts=current_job_opts,
                job_id=current_job_opts.get("jobId")
            )
            # Inside a pipeline the script call is only queued; the real
            # job ids are produced by pipe.execute() below, so there is no
            # meaningful return value to capture here.
            await self.scripts.addJob(job, pipe)
            result.append(job)
        job_ids = await pipe.execute()
        # execute() returns one result per queued addJob, in submission
        # order, so indexes line up with the jobs in `result`.
        for index, job_id in enumerate(job_ids):
            result[index].id = job_id

    return result

def pause(self):
"""
Pauses the processing of this queue globally.
Expand Down
1 change: 1 addition & 0 deletions python/run_tests.sh
@@ -1,5 +1,6 @@
#!/bin/bash
# Wipe all Redis data so every test run starts from a clean state.
redis-cli flushall
# Run each test module in its own unittest invocation (verbose output).
python3 -m unittest -v tests.bulk_tests
python3 -m unittest -v tests.delay_tests
python3 -m unittest -v tests.flow_tests
python3 -m unittest -v tests.job_tests
Expand Down
70 changes: 70 additions & 0 deletions python/tests/bulk_tests.py
@@ -0,0 +1,70 @@
"""
Tests for add bulk jobs.
https://bbc.github.io/cloudfit-public-docs/asyncio/testing.html
"""

import unittest

from asyncio import Future
from bullmq import Queue, Job, Worker
from uuid import uuid4

queueName = f"__test_queue__{uuid4().hex}"

class TestJob(unittest.IsolatedAsyncioTestCase):
    """Integration tests for Queue.addBulk against a live Redis instance."""

    async def asyncSetUp(self):
        print("Setting up test queue")
        # Remove any leftover state from earlier runs of this queue.
        cleanup_queue = Queue(queueName)
        await cleanup_queue.pause()
        await cleanup_queue.obliterate()
        await cleanup_queue.close()

    async def test_process_jobs(self):
        name = "test"
        queue = Queue(queueName)

        async def processor(job: Job, token: str):
            # Verify each job carries the payload that addBulk enqueued.
            if job.data.get("idx") == 0:
                self.assertEqual(job.data.get("foo"), "bar")
            else:
                self.assertEqual(job.data.get("idx"), 1)
                self.assertEqual(job.data.get("foo"), "baz")
            return "done"

        worker = Worker(queueName, processor)

        all_done = Future()

        completions = 0

        def on_completed(job: Job, result):
            nonlocal completions
            completions += 1
            # Resolve the future once both jobs have finished.
            if completions == 2:
                all_done.set_result(None)

        worker.on("completed", on_completed)

        jobs = await queue.addBulk(
            [
                {"name": name, "data": {"idx": 0, "foo": "bar"}},
                {"name": name, "data": {"idx": 1, "foo": "baz"}}
            ]
        )

        # Block until the worker has processed both jobs.
        await all_done

        self.assertEqual(len(jobs), 2)

        self.assertIsNotNone(jobs[0].id)
        self.assertEqual(jobs[0].data.get("foo"), "bar")
        self.assertIsNotNone(jobs[1].id)
        self.assertEqual(jobs[1].data.get("foo"), "baz")

        await queue.close()
        await worker.close()

# Allow running this test module directly: python tests/bulk_tests.py
if __name__ == '__main__':
    unittest.main()

0 comments on commit 555dd44

Please sign in to comment.