
Commit

fix concurrency with multiple workers (#180)
* fix concurrency with multiple workers

* update history
samuelcolvin committed Apr 23, 2020
1 parent 3fec03c commit 20b40fd
Showing 4 changed files with 27 additions and 9 deletions.
HISTORY.rst — 5 changes: 3 additions & 2 deletions
@@ -3,9 +3,10 @@
 History
 -------
 
-v0.18.5 (unreleased)
+v0.19.0 (unreleased)
 ....................
-* Python 3.8 support
+* Python 3.8 support, #178
+* fix concurrency with multiple workers, #180
 
 v0.18.4 (2019-12-19)
 ....................
arq/version.py — 2 changes: 1 addition & 1 deletion
@@ -1,3 +1,3 @@
 __all__ = ('VERSION',)
 
-VERSION = '0.18.5a1'
+VERSION = '0.19a1'
arq/worker.py — 17 changes: 13 additions & 4 deletions
@@ -193,7 +193,7 @@ def __init__(
         self.job_timeout_s = to_seconds(job_timeout)
         self.keep_result_s = to_seconds(keep_result)
         self.poll_delay_s = to_seconds(poll_delay)
-        self.queue_read_limit = queue_read_limit or max_jobs
+        self.queue_read_limit = queue_read_limit or max(max_jobs * 5, 100)
         self._queue_read_offset = 0
         self.max_tries = max_tries
         self.health_check_interval = to_seconds(health_check_interval)
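
The changed line replaces the old default, which read only max_jobs job ids per poll, with an over-fetching default: with several workers competing for the same queue, a worker that reads exactly max_jobs ids may find them all claimed elsewhere and sit idle. A minimal sketch of the new calculation; the helper name default_queue_read_limit is hypothetical, the expression mirrors the committed line:

def default_queue_read_limit(max_jobs: int) -> int:
    # over-fetch: at least 100 job ids per poll, or 5x max_jobs if that is larger
    return max(max_jobs * 5, 100)

assert default_queue_read_limit(1) == 100   # small workers still read 100 ids
assert default_queue_read_limit(18) == 100  # 18 * 5 = 90, below the floor of 100
assert default_queue_read_limit(22) == 110  # 22 * 5 = 110, above the floor

The expected values match test_queue_read_limit_calc added to tests/test_worker.py below.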
@@ -286,7 +286,11 @@ async def main(self):
                     await asyncio.gather(*self.tasks)
                     return
 
-    async def _poll_iteration(self):
+    async def _poll_iteration(self) -> None:
+        """
+        Get ids of pending jobs from the main queue sorted-set data structure and start those jobs, remove
+        any finished tasks from self.tasks.
+        """
         count = self.queue_read_limit
         if self.burst and self.max_burst_jobs >= 0:
             burst_jobs_remaining = self.max_burst_jobs - self._jobs_started()
@@ -299,7 +303,8 @@ async def _poll_iteration(self):
             job_ids = await self.pool.zrangebyscore(
                 self.queue_name, offset=self._queue_read_offset, count=count, max=now
             )
-            await self.run_jobs(job_ids)
+
+        await self.start_jobs(job_ids)
 
         for t in self.tasks:
             if t.done():
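
For context: the queue is a Redis sorted set whose members are job ids and whose scores are due times in milliseconds, so zrangebyscore with max set to "now" returns only jobs that are ready to run, and a job stays in the set until it finishes. A minimal standalone sketch of that read, assuming aioredis 1.x (the Redis client arq uses at this point); poll_once is a hypothetical name:

import time

async def poll_once(redis, queue_name: str, limit: int) -> list:
    # arq scores each job by the time it becomes due, in ms since the epoch
    now = int(time.time() * 1000)
    # read up to `limit` due job ids, starting from the queue's read offset (0 here)
    return await redis.zrangebyscore(queue_name, offset=0, count=limit, max=now)

Note also that the renamed start_jobs call now sits after the semaphore block guarding the read, so a free job slot is no longer held just to fetch ids.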
@@ -309,7 +314,10 @@
 
         await self.heart_beat()
 
-    async def run_jobs(self, job_ids):
+    async def start_jobs(self, job_ids: List[str]) -> None:
+        """
+        For each job id, get the job definition, check it's not running and start it in a task
+        """
         for job_id in job_ids:
             await self.sem.acquire()
             in_progress_key = in_progress_key_prefix + job_id
@@ -323,6 +331,7 @@ async def run_jobs(self, job_ids):
                 if ongoing_exists or not score:
                     # job already started elsewhere, or already finished and removed from queue
                     self.sem.release()
+                    logger.debug('job %s already running elsewhere', job_id)
                     continue
 
                 tr = conn.multi_exec()
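
The heart of the concurrency fix is the claim step above: each worker WATCHes the job's in-progress key, checks that the job is neither running nor already finished, then sets the key inside a MULTI/EXEC transaction. If another worker touched the key between WATCH and EXEC, the transaction aborts and the job is skipped. A minimal sketch of the same optimistic-locking pattern with aioredis 1.x; the key prefix and timeout are illustrative and try_claim is a hypothetical helper:

from aioredis import MultiExecError

async def try_claim(conn, queue_name: str, job_id: str, timeout_ms: int = 60_000) -> bool:
    # conn must be one dedicated connection: WATCH state is per-connection
    in_progress_key = 'arq:in-progress:' + job_id
    await conn.unwatch()
    await conn.watch(in_progress_key)
    ongoing_exists = await conn.exists(in_progress_key)
    score = await conn.zscore(queue_name, job_id)
    if ongoing_exists or not score:
        return False  # started elsewhere, or finished and removed from the queue
    tr = conn.multi_exec()
    tr.psetex(in_progress_key, timeout_ms, b'1')  # claim marker with a TTL
    try:
        await tr.execute()
    except MultiExecError:
        return False  # the watched key changed: another worker won the race
    return True

Only the winner starts the job task. A loser releases its semaphore slot; the pre-check logs the new debug line above, while a lost transaction produces the 'multi-exec error, job ... already started elsewhere' message asserted in the test below.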
tests/test_worker.py — 12 changes: 10 additions & 2 deletions
@@ -501,7 +501,8 @@ async def test_queue_read_limit_equals_max_jobs(arq_redis: ArqRedis, worker):
         await arq_redis.enqueue_job('foobar')
 
     assert await arq_redis.zcard(default_queue_name) == 4
-    worker: Worker = worker(functions=[foobar], max_jobs=2)
+    worker: Worker = worker(functions=[foobar], queue_read_limit=2)
+    assert worker.queue_read_limit == 2
     assert worker.jobs_complete == 0
     assert worker.jobs_failed == 0
     assert worker.jobs_retried == 0
@@ -521,6 +522,13 @@ async def test_queue_read_limit_equals_max_jobs(arq_redis: ArqRedis, worker):
     assert worker.jobs_retried == 0
 
 
+async def test_queue_read_limit_calc(worker):
+    assert worker(functions=[foobar], queue_read_limit=2, max_jobs=1).queue_read_limit == 2
+    assert worker(functions=[foobar], queue_read_limit=200, max_jobs=1).queue_read_limit == 200
+    assert worker(functions=[foobar], max_jobs=18).queue_read_limit == 100
+    assert worker(functions=[foobar], max_jobs=22).queue_read_limit == 110
+
+
 async def test_custom_queue_read_limit(arq_redis: ArqRedis, worker):
     for _ in range(4):
         await arq_redis.enqueue_job('foobar')
@@ -686,7 +694,7 @@ async def foo(ctx, v):
     caplog.set_level(logging.DEBUG, logger='arq.worker')
     await arq_redis.enqueue_job('foo', 1, _job_id='testing')
     worker: Worker = worker(functions=[func(foo, name='foo')])
-    await asyncio.gather(*[worker.run_jobs(['testing']) for _ in range(5)])
+    await asyncio.gather(*[worker.start_jobs(['testing']) for _ in range(5)])
     # debug(caplog.text)
     assert 'multi-exec error, job testing already started elsewhere' in caplog.text
     assert 'WatchVariableError' not in caplog.text
