diff --git a/HISTORY.rst b/HISTORY.rst
index 5a966763..7c336f37 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -3,9 +3,10 @@
 History
 -------
 
-v0.18.5 (unreleased)
+v0.19.0 (unreleased)
 ....................
-* Python 3.8 support
+* Python 3.8 support, #178
+* fix concurrency with multiple workers, #180
 
 v0.18.4 (2019-12-19)
 ....................
diff --git a/arq/version.py b/arq/version.py
index 2f5683b6..731728a3 100644
--- a/arq/version.py
+++ b/arq/version.py
@@ -1,3 +1,3 @@
 __all__ = ('VERSION',)
 
-VERSION = '0.18.5a1'
+VERSION = '0.19a1'
diff --git a/arq/worker.py b/arq/worker.py
index be8afc7b..af7f0cf0 100644
--- a/arq/worker.py
+++ b/arq/worker.py
@@ -193,7 +193,7 @@ def __init__(
         self.job_timeout_s = to_seconds(job_timeout)
         self.keep_result_s = to_seconds(keep_result)
         self.poll_delay_s = to_seconds(poll_delay)
-        self.queue_read_limit = queue_read_limit or max_jobs
+        self.queue_read_limit = queue_read_limit or max(max_jobs * 5, 100)
         self._queue_read_offset = 0
         self.max_tries = max_tries
         self.health_check_interval = to_seconds(health_check_interval)
@@ -286,7 +286,11 @@ async def main(self):
                     await asyncio.gather(*self.tasks)
                     return
 
-    async def _poll_iteration(self):
+    async def _poll_iteration(self) -> None:
+        """
+        Get ids of pending jobs from the main queue sorted-set data structure and start those jobs, remove
+        any finished tasks from self.tasks.
+        """
         count = self.queue_read_limit
         if self.burst and self.max_burst_jobs >= 0:
             burst_jobs_remaining = self.max_burst_jobs - self._jobs_started()
@@ -299,7 +303,8 @@ async def _poll_iteration(self):
             job_ids = await self.pool.zrangebyscore(
                 self.queue_name, offset=self._queue_read_offset, count=count, max=now
             )
-        await self.run_jobs(job_ids)
+
+        await self.start_jobs(job_ids)
 
         for t in self.tasks:
             if t.done():
@@ -309,7 +314,10 @@ async def _poll_iteration(self):
 
         await self.heart_beat()
 
-    async def run_jobs(self, job_ids):
+    async def start_jobs(self, job_ids: List[str]) -> None:
+        """
+        For each job id, get the job definition, check it's not running and start it in a task
+        """
         for job_id in job_ids:
             await self.sem.acquire()
             in_progress_key = in_progress_key_prefix + job_id
@@ -323,6 +331,7 @@ async def run_jobs(self, job_ids):
                 if ongoing_exists or not score:
                     # job already started elsewhere, or already finished and removed from queue
                     self.sem.release()
+                    logger.debug('job %s already running elsewhere', job_id)
                     continue
 
                 tr = conn.multi_exec()
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 5a55eae4..d8987664 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -501,7 +501,8 @@ async def test_queue_read_limit_equals_max_jobs(arq_redis: ArqRedis, worker):
         await arq_redis.enqueue_job('foobar')
     assert await arq_redis.zcard(default_queue_name) == 4
 
-    worker: Worker = worker(functions=[foobar], max_jobs=2)
+    worker: Worker = worker(functions=[foobar], queue_read_limit=2)
+    assert worker.queue_read_limit == 2
     assert worker.jobs_complete == 0
     assert worker.jobs_failed == 0
     assert worker.jobs_retried == 0
@@ -521,6 +522,13 @@ async def test_queue_read_limit_equals_max_jobs(arq_redis: ArqRedis, worker):
     assert worker.jobs_retried == 0
 
 
+async def test_queue_read_limit_calc(worker):
+    assert worker(functions=[foobar], queue_read_limit=2, max_jobs=1).queue_read_limit == 2
+    assert worker(functions=[foobar], queue_read_limit=200, max_jobs=1).queue_read_limit == 200
+    assert worker(functions=[foobar], max_jobs=18).queue_read_limit == 100
+    assert worker(functions=[foobar], max_jobs=22).queue_read_limit == 110
+
+
 async def test_custom_queue_read_limit(arq_redis: ArqRedis, worker):
     for _ in range(4):
         await arq_redis.enqueue_job('foobar')
@@ -686,7 +694,7 @@ async def foo(ctx, v):
     caplog.set_level(logging.DEBUG, logger='arq.worker')
    await arq_redis.enqueue_job('foo', 1, _job_id='testing')
    worker: Worker = worker(functions=[func(foo, name='foo')])
-    await asyncio.gather(*[worker.run_jobs(['testing']) for _ in range(5)])
+    await asyncio.gather(*[worker.start_jobs(['testing']) for _ in range(5)])
     # debug(caplog.text)
     assert 'multi-exec error, job testing already started elsewhere' in caplog.text
     assert 'WatchVariableError' not in caplog.text
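
Note on the new default (illustration only, not part of the diff): the worker now reads well ahead of its concurrency limit, so a single poll can keep the job semaphore saturated. A minimal sketch of the expression ``queue_read_limit or max(max_jobs * 5, 100)`` from arq/worker.py as a pure function — the helper name ``default_queue_read_limit`` is hypothetical:

    # Hypothetical helper mirroring the default added in arq/worker.py above.
    def default_queue_read_limit(max_jobs: int) -> int:
        # Read five job ids per concurrent slot, but never fewer
        # than 100 per poll iteration.
        return max(max_jobs * 5, 100)

    # These match test_queue_read_limit_calc above:
    assert default_queue_read_limit(18) == 100  # 18 * 5 = 90, the floor of 100 wins
    assert default_queue_read_limit(22) == 110  # 22 * 5 = 110, above the floor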
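
The concurrency fix itself is the claim step in ``start_jobs``: a worker may only launch a job after atomically setting the job's in-progress key under Redis WATCH + MULTI/EXEC, so when several workers poll the same queue exactly one wins each job. The sketch below shows that optimistic-locking pattern in isolation, assuming aioredis 1.x (the client arq used at the time); ``try_claim_job``, the key prefix, and the 60-second expiry are illustrative, not arq's actual API:

    from aioredis import MultiExecError

    async def try_claim_job(conn, job_id: str) -> bool:
        in_progress_key = 'arq:in-progress:' + job_id
        await conn.watch(in_progress_key)    # EXEC fails if another client touches the key
        if await conn.exists(in_progress_key):
            await conn.unwatch()
            return False                     # already running elsewhere
        tr = conn.multi_exec()
        tr.setex(in_progress_key, 60, b'1')  # claim the job for up to 60s
        try:
            await tr.execute()               # raises if the watched key changed
        except MultiExecError:
            return False                     # another worker won the race
        return True

If two workers race past the ``exists`` check, both attempt the transaction, but WATCH guarantees only one EXEC succeeds; the loser releases its semaphore slot and logs the 'multi-exec error, job ... already started elsewhere' message that the final test above asserts.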