Skip to content

Commit

Permalink
fix(python): respect concurrency in worker (#2062) fixes #2063
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jul 17, 2023
1 parent 2e30d34 commit 1b95185
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 33 deletions.
4 changes: 2 additions & 2 deletions python/bullmq/scripts.py
Expand Up @@ -286,7 +286,7 @@ async def reprocessJob(self, job: Job, state: str):
keys.append(self.keys['wait'])
keys.append(self.keys['meta'])
keys.append(self.keys['paused'])

args = [
job.id,
("R" if job.opts.get("lifo") else "L") + "PUSH",
Expand Down Expand Up @@ -333,7 +333,7 @@ async def retryJobs(self, state: str, count: int, timestamp: int):
result = await self.commands["retryJobs"](keys=keys, args=[count or 1000, timestamp or round(time.time()*1000), current_state])
return result

async def moveToActive(self, token: str, opts: dict, jobId: str = "") -> list[Any]:
async def moveToActive(self, token: str, opts: dict, jobId: str = None) -> list[Any]:
"""
Add an item to the queue
"""
Expand Down
94 changes: 63 additions & 31 deletions python/bullmq/worker.py
Expand Up @@ -41,6 +41,10 @@ def __init__(self, name: str, processor: Callable[[Job, str], asyncio.Future], o
self.processing = set()
self.jobs = set()
self.id = uuid4().hex
self.waiting = None
self.blockUntil = 0
self.limitUntil = 0
self.drained = False

if opts.get("autorun", True):
asyncio.ensure_future(self.run())
Expand All @@ -59,21 +63,19 @@ async def run(self):
token_postfix = 0

while not self.closed:
if len(jobs) == 0 and len(self.processing) < self.opts.get("concurrency") and not self.closing:
while not self.waiting and len(self.processing) < self.opts.get("concurrency") and not self.closing:
token_postfix+=1
token = f'{self.id}:{token_postfix}'
waiting_job = asyncio.ensure_future(self.getNextJob(token))
self.processing.add(waiting_job)

if len(jobs) > 0:
jobs_to_process = [self.processJob(job, job.token) for job in jobs]
processing_jobs = [asyncio.ensure_future(
j) for j in jobs_to_process]
self.processing.update(processing_jobs)

try:
jobs, pending = await getCompleted(self.processing)

jobs_to_process = [self.processJob(job, job.token) for job in jobs]
processing_jobs = [asyncio.ensure_future(
j) for j in jobs_to_process]
pending.update(processing_jobs)
self.processing = pending

if (len(jobs) == 0 or len(self.processing) == 0) and self.closing:
Expand All @@ -96,34 +98,65 @@ async def getNextJob(self, token: str):
@param token: worker token to be assigned to retrieved job
@returns a Job or undefined if no job was available in the queue.
"""
# First try to move a job from the waiting list to the active list
result = await self.scripts.moveToActive(token, self.opts)
job = None
job_id = None

if not self.waiting:
self.waiting = self.waitForJob()

try:
job_id = await self.waiting
job_instance = await self.moveToActive(token, job_id)
return job_instance
finally:
self.waiting = None
else:
job_instance = await self.moveToActive(token)
return job_instance

async def moveToActive(self, token: str, job_id: str = None):
    """
    Atomically promote a job from the wait list to the active list and
    return it as a Job instance (or None when nothing is available).

    @param token: worker token that will own the fetched job
    @param job_id: id popped by BRPOPLPUSH, if any. A marker of the form
        '0:<timestamp>' is not a real job id: it signals that only
        delayed jobs exist, and the timestamp tells us when to wake up.
    """
    if job_id and job_id.startswith('0:'):
        # Delay marker, not a job id: remember when the next delayed
        # job becomes available so waitForJob can size its timeout.
        self.blockUntil = int(job_id.split(':')[1]) or 0

    result = await self.scripts.moveToActive(token, self.opts, job_id)

    # Default to "no job" values. Use 0 (not None) for the numeric
    # fields so that max(limit_until, 0) in nextJobFromJobData does not
    # raise TypeError when the script returns an empty result.
    job_data = None
    fetched_id = None
    limit_until = 0
    delay_until = 0

    if result:
        job_data, fetched_id, limit_until, delay_until = result

    return self.nextJobFromJobData(job_data, fetched_id, limit_until, delay_until, token)

def nextJobFromJobData(self, job_data = None, job_id: str = None, limit_until: int = 0,
delay_until: int = 0, token: str = None):
self.limitUntil = max(limit_until, 0) or 0

if not job_data:
if not self.drained:
self.drained = True
self.blockUntil = 0

if delay_until:
self.blockUntil = max(delay_until, 0) or 0

if job_data:
self.drained = False
job_instance = Job.fromJSON(self, job_data, job_id)
job_instance.token = token
return job_instance

async def waitForJob(self):
    """
    Block on Redis (BRPOPLPUSH wait -> active) until a job id appears
    or the timeout elapses. Returns the popped job id, or None.
    """
    now_ms = int(time.time() * 1000)
    # Wait at most 5 seconds, or less if a delayed job is due sooner.
    block_ms = min(self.blockUntil - now_ms, 5000) if self.blockUntil else 5000
    # Keep the timeout strictly positive: 0 would mean "block forever".
    timeout = max(block_ms / 1000, 0.00001)

    redis_version = await self.blockingRedisConnection.getRedisVersion()
    if isRedisVersionLowerThan(redis_version, '6.0.0'):
        # Only Redis v6.0.0 and above supports doubles as block time.
        timeout = int(math.ceil(timeout))

    return await self.bclient.brpoplpush(
        self.scripts.keys["wait"], self.scripts.keys["active"], timeout)

async def processJob(self, job: Job, token: str):
try:
self.jobs.add((job, token))
Expand Down Expand Up @@ -181,12 +214,11 @@ async def close(self, force: bool = False):
"""
Close the worker
"""
self.closing = True
if force:
self.forceClosing = True
self.cancelProcessing()

self.closing = True

await self.blockingRedisConnection.close()
await self.redisConnection.close()

Expand All @@ -196,7 +228,7 @@ def cancelProcessing(self):
job.cancel()


async def getCompleted(task_set: set) -> tuple[list[Job], list]:
async def getCompleted(task_set: set) -> tuple[list[Job], set]:
job_set, pending = await asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED)
jobs = [extract_result(job_task) for job_task in job_set]
# we filter `None` out to remove:
Expand Down
40 changes: 40 additions & 0 deletions python/tests/worker_tests.py
Expand Up @@ -272,6 +272,46 @@ def completing(job: Job, result):
await parent_queue.close()
await queue.close()

async def test_process_job_respecting_the_concurrency_set(self):
    """
    Regression test for #2063: with concurrency=4 and 8 queued jobs,
    the worker must never run more than 4 jobs at the same time.
    """
    num_jobs_processing = 0         # jobs currently inside the processor
    pending_message_to_process = 8  # jobs not yet finished
    wait = 0.01                     # per-job sleep; grows so jobs overlap
    job_count = 0                   # completed-event counter
    queue = Queue(queueName)

    async def process(job: Job, token: str):
        nonlocal num_jobs_processing
        nonlocal wait
        nonlocal pending_message_to_process
        num_jobs_processing += 1
        # Concurrency is 4, so strictly fewer than 5 may be active.
        self.assertLess(num_jobs_processing, 5)
        wait += 0.1
        await asyncio.sleep(wait)
        # While work remains, all 4 slots should stay busy.
        self.assertEqual(num_jobs_processing, min(pending_message_to_process, 4))
        pending_message_to_process -= 1
        num_jobs_processing -= 1

        return None

    for _ in range(8):
        await queue.add("test", data={})

    worker = Worker(queueName, process, {"concurrency": 4 })

    completed_events = Future()

    def completing(job: Job, result):
        nonlocal job_count
        # Resolve once the eighth (index 7) job has completed.
        if job_count == 7:
            completed_events.set_result(None)
        job_count += 1

    worker.on("completed", completing)

    await completed_events

    await queue.close()
    await worker.close()

if __name__ == '__main__':
    # Allow running this test module directly: python worker_tests.py
    unittest.main()

0 comments on commit 1b95185

Please sign in to comment.