Feat worker limit #142

Merged (15 commits, Aug 11, 2019)

Changes from 8 commits
4 changes: 4 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,10 @@
History
-------

v0.17.0 (unreleased)
....................
* add ``worker.queue_read_limit``, fix #141, by @rubik

v0.16.1 (2019-08-02)
....................
* prevent duplicate ``job_id`` when job result exists, fix #137
44 changes: 32 additions & 12 deletions arq/worker.py
@@ -138,9 +138,13 @@ class Worker:
:param job_timeout: default job timeout (max run time)
:param keep_result: default duration to keep job results for
:param poll_delay: duration between polling the queue for new jobs
:param queue_read_limit: the maximum number of jobs to pull from the queue each time it's polled; by default it
equals ``max_jobs``
:param max_tries: default maximum number of times to retry a job
:param health_check_interval: how often to set the health check key
:param health_check_key: redis key under which health check is set
:param retry_jobs: whether to retry jobs on Retry or CancelledError or not
:param max_burst_jobs: the maximum number of jobs to process in burst mode (disabled with negative values)
"""

def __init__(
@@ -158,6 +162,7 @@ def __init__(
job_timeout: SecondsTimedelta = 300,
keep_result: SecondsTimedelta = 3600,
poll_delay: SecondsTimedelta = 0.5,
queue_read_limit: Optional[int] = None,

Member (@samuelcolvin):
maybe this shouldn't be optional?

Is there any reason not to set it to 1000 or max_jobs?

Member (@samuelcolvin):
@rubik did you see this comment? What do you think?

Contributor Author (@rubik):
@samuelcolvin I agree, yes. One of the last commits sets it to max_jobs if it's None.

Member (@samuelcolvin):
okay, so the type shouldn't be Optional on the class

should be:

self.queue_read_limit: int = queue_read_limit or max_jobs

Contributor Author (@rubik):
@samuelcolvin I don't follow. In your line with or you allow queue_read_limit to be None. That means that we need to keep Optional in the type hint. If the user does not specify it, it's None. I changed the assignment to be a one-liner like yours.

Member (@samuelcolvin):
What you've done is correct.

My point is that if you do:

foo: Optional[int] = 1

bar = foo

if bar is None:
    bar = 123

Mypy reads this as implicitly setting the type of bar to Optional[int], and that doesn't change in the if block.

If instead you do

foo: Optional[int] = 1
bar = foo or 123

The type of bar is int; this is what we do above, except we're explicit and add a type hint.

Contributor Author (@rubik):
@samuelcolvin Now I understand, thanks for the explanation.

max_tries: int = 5,
health_check_interval: SecondsTimedelta = 3600,
health_check_key: Optional[str] = None,
@@ -180,6 +185,10 @@ def __init__(
self.job_timeout_s = to_seconds(job_timeout)
self.keep_result_s = to_seconds(keep_result)
self.poll_delay_s = to_seconds(poll_delay)
self.queue_read_limit = queue_read_limit
if self.queue_read_limit is None:
    self.queue_read_limit = max_jobs
self._queue_read_offset = 0
self.max_tries = max_tries
self.health_check_interval = to_seconds(health_check_interval)
if health_check_key is None:
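
To make the typing point from the queue_read_limit review thread above concrete, here is a minimal sketch of the one-liner suggested there; the class name is a hypothetical stand-in, not arq's code, and only the assignment behaviour shown in this hunk is assumed:

from typing import Optional

class WorkerSketch:  # hypothetical stand-in for arq's Worker
    def __init__(self, queue_read_limit: Optional[int] = None, max_jobs: int = 10):
        # Assign-then-check leaves mypy inferring the attribute as Optional[int]:
        #     self.queue_read_limit = queue_read_limit
        #     if self.queue_read_limit is None:
        #         self.queue_read_limit = max_jobs
        # An explicit annotation plus `or` keeps the attribute a plain int.
        # (`or` also maps 0 to max_jobs, which is harmless for a read limit.)
        self.queue_read_limit: int = queue_read_limit or max_jobs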
@@ -258,18 +267,7 @@ async def main(self):
await self.on_startup(self.ctx)

async for _ in poll(self.poll_delay_s): # noqa F841
async with self.sem: # don't bother with zrangebyscore until we have "space" to run the jobs
now = timestamp_ms()
job_ids = await self.pool.zrangebyscore(self.queue_name, max=now)
await self.run_jobs(job_ids)

# required to make sure errors in run_job get propagated
for t in self.tasks:
if t.done():
self.tasks.remove(t)
t.result()

await self.heart_beat()
await self._poll_iteration()

if self.burst:
if (
@@ -280,6 +278,28 @@
queued_jobs = await self.pool.zcard(self.queue_name)
if queued_jobs == 0:
return
async with self.sem: # don't bother with zrangebyscore until we have "space" to run the jobs

Member (@samuelcolvin):
I don't understand why we're now doing zrangebyscore in two different places.

Is this really required?

Contributor Author (@rubik):
@samuelcolvin This was introduced by mistake; it's absolutely not required. The only thing needed is what is inside _poll_iteration. This has been fixed now.

    now = timestamp_ms()
    job_ids = await self.pool.zrangebyscore(
        self.queue_name, offset=self._queue_read_offset, count=self.queue_read_limit, max=now
    )
    await self.run_jobs(job_ids)

async def _poll_iteration(self):
    async with self.sem:  # don't bother with zrangebyscore until we have "space" to run the jobs
        now = timestamp_ms()
        job_ids = await self.pool.zrangebyscore(
            self.queue_name, offset=self._queue_read_offset, count=self.queue_read_limit, max=now
        )
        await self.run_jobs(job_ids)

    # required to make sure errors in run_job get propagated
    for t in self.tasks:
        if t.done():
            self.tasks.remove(t)
            t.result()

    await self.heart_beat()

async def run_jobs(self, job_ids):
for job_id in job_ids:
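For orientation, a short usage sketch of the new parameter; the job function and the numbers are illustrative, and only the keyword arguments shown in the diff above are assumed. Each poll reads at most queue_read_limit job ids from the queue, while max_jobs still caps how many jobs run concurrently.

from arq.worker import Worker, func

async def doit(ctx):  # hypothetical job function
    return 42

# Pull at most 50 job ids per poll while allowing up to 100 concurrent jobs;
# when queue_read_limit is omitted it falls back to max_jobs, as implemented above.
worker = Worker(
    functions=[func(doit, name='doit')],
    max_jobs=100,
    queue_read_limit=50,
)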
6 changes: 4 additions & 2 deletions tests/conftest.py
@@ -18,9 +18,11 @@ async def arq_redis(loop):
async def worker(arq_redis):
worker_: Worker = None

def create(functions=[], burst=True, poll_delay=0, **kwargs):
def create(functions=[], burst=True, poll_delay=0, max_jobs=10, **kwargs):
nonlocal worker_
worker_ = Worker(functions=functions, redis_pool=arq_redis, burst=burst, poll_delay=poll_delay, **kwargs)
worker_ = Worker(
functions=functions, redis_pool=arq_redis, burst=burst, poll_delay=poll_delay, max_jobs=max_jobs, **kwargs
)
return worker_

yield create
55 changes: 54 additions & 1 deletion tests/test_worker.py
@@ -7,7 +7,7 @@
import pytest
from aioredis import create_redis_pool

from arq.connections import ArqRedis
from arq.connections import ArqRedis, create_pool
from arq.constants import default_queue_name, health_check_key_suffix, job_key_prefix
from arq.jobs import Job, JobStatus
from arq.worker import FailedJobs, JobExecutionFailed, Retry, Worker, async_check_health, check_health, func, run_worker
@@ -26,6 +26,7 @@ class Settings:
    functions = [func(foobar, name='foobar')]
    burst = True
    poll_delay = 0
    queue_read_limit = 10

loop.run_until_complete(arq_redis.enqueue_job('foobar'))
worker = run_worker(Settings)
@@ -449,3 +450,55 @@ async def test_repeat_job_result(arq_redis: ArqRedis, worker):
    assert await j1.status() == JobStatus.complete

    assert await arq_redis.enqueue_job('foobar', _job_id='job_id') is None


async def test_queue_read_limit_equals_max_jobs(arq_redis: ArqRedis, worker):
    for _ in range(4):
        await arq_redis.enqueue_job('foobar')

    assert await arq_redis.zcard(default_queue_name) == 4
    worker: Worker = worker(functions=[foobar], max_jobs=2)
    worker.pool = await create_pool(worker.redis_settings)

Member (@samuelcolvin):
why do you need this?

Contributor Author (@rubik):
@samuelcolvin My understanding was that the pool is initialized in Worker.main, which we don't call in this test. It seems that it's initialized separately too, so I removed this line.

Member (@samuelcolvin):
I think the point is that the worker fixture uses the standard redis connection, thus this isn't required.

    assert worker.jobs_complete == 0
    assert worker.jobs_failed == 0
    assert worker.jobs_retried == 0

    await worker._poll_iteration()
    await asyncio.sleep(0.01)

Member (@samuelcolvin):
what is this sleep doing? I doubt we need it.

Contributor Author (@rubik):
@samuelcolvin It introduces a small delay because, if Redis and Python are not fast enough, the assertion below fails. In fact, if I remove it the test fails on my machine.

Member (@samuelcolvin):
okay, I would guess you need a bigger delay to avoid intermittent failures on CI, where resources are more constrained (I've had problems like this a lot before); I would increase it to 0.1.

Contributor Author (@rubik):
@samuelcolvin It makes sense. I increased it to 0.1.

Member (@samuelcolvin):
looks like you've only updated this number in some places.

    assert await arq_redis.zcard(default_queue_name) == 2
    assert worker.jobs_complete == 2
    assert worker.jobs_failed == 0
    assert worker.jobs_retried == 0

    await worker._poll_iteration()
    await asyncio.sleep(0.01)
    assert await arq_redis.zcard(default_queue_name) == 0
    assert worker.jobs_complete == 4
    assert worker.jobs_failed == 0
    assert worker.jobs_retried == 0


async def test_custom_queue_read_limit(arq_redis: ArqRedis, worker):
    for _ in range(4):
        await arq_redis.enqueue_job('foobar')

    assert await arq_redis.zcard(default_queue_name) == 4
    worker: Worker = worker(functions=[foobar], max_jobs=4, queue_read_limit=2)
    worker.pool = await create_pool(worker.redis_settings)
    assert worker.jobs_complete == 0
    assert worker.jobs_failed == 0
    assert worker.jobs_retried == 0

    await worker._poll_iteration()
    await asyncio.sleep(0.01)
    assert await arq_redis.zcard(default_queue_name) == 2
    assert worker.jobs_complete == 2
    assert worker.jobs_failed == 0
    assert worker.jobs_retried == 0

    await worker._poll_iteration()
    await asyncio.sleep(0.01)
    assert await arq_redis.zcard(default_queue_name) == 0
    assert worker.jobs_complete == 4
    assert worker.jobs_failed == 0
    assert worker.jobs_retried == 0
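

Finally, mirroring the Settings-class pattern already used by the burst test near the top of this file, a long-running (non-burst) configuration could look roughly like this; treating max_jobs as a Settings attribute is an assumption, since the tests above only pass it to Worker directly.

from arq.worker import func, run_worker

async def foobar(ctx):
    return 42

class Settings:
    functions = [func(foobar, name='foobar')]
    max_jobs = 20
    queue_read_limit = 5  # read at most 5 job ids from the queue per poll

run_worker(Settings)  # runs until interrupted, since burst is not enabled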