Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat worker limit #142

Merged
merged 15 commits into from
Aug 11, 2019
Merged

Feat worker limit #142

merged 15 commits into from
Aug 11, 2019

Conversation

rubik
Copy link
Contributor

@rubik rubik commented Aug 9, 2019

This PR adds a new parameter to the Worker class: queue_read_limit. It allows limiting the number of job IDs that are read from the queue at each polling interval.

Redis requires to set both offset and count, so I've added another instance attribute called queue_read_offset that sets the offset to 0 in case queue_read_limit is specified, and to None otherwise. I haven't added a corresponding parameter for this attribute because I think that in the majority of the cases it's not needed. I think it could be useful to investigate it in case multiple workers are run in parallel, but I haven't tested this so I refrained from complicating the public interface further.

This PR passes all tests except two, which also fail in the master branch. So they are most likely unrelated to these changes.

Fixes #141

Copy link
Owner

@samuelcolvin samuelcolvin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

otherwise looks good, please update HISTORY.rst adding a new section for v0.17.

tests/conftest.py Outdated Show resolved Hide resolved
@codecov
Copy link

codecov bot commented Aug 10, 2019

Codecov Report

Merging #142 into master will increase coverage by <.01%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master     #142      +/-   ##
==========================================
+ Coverage   98.74%   98.75%   +<.01%     
==========================================
  Files           8        8              
  Lines         638      642       +4     
  Branches       90       91       +1     
==========================================
+ Hits          630      634       +4     
  Misses          6        6              
  Partials        2        2

Repository owner deleted a comment from codecov bot Aug 10, 2019
Copy link
Owner

@samuelcolvin samuelcolvin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update history, also I guess it would be good to have a test that this limit actually works, eg. set queue_read_limit = 5, add 10 jobs to the queue, and check that on the first read only 5 items are retrieved.

@@ -158,6 +161,7 @@ def __init__(
job_timeout: SecondsTimedelta = 300,
keep_result: SecondsTimedelta = 3600,
poll_delay: SecondsTimedelta = 0.5,
queue_read_limit: Optional[int] = None,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this shouldn't be optional?

Is there any reason not to set it to 1000 or max_jobs?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rubik did you see this comment? What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samuelcolvin I agree, yes. One of the last commits sets it to max_jobs if it's None.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, so the type shouldn't be Optional, on the class

should be:

self.queue_read_limit: int = queue_read_limit or max_jobs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samuelcolvin I don't follow. In your line with or you allow queue_read_limit to be None. That means that we need to keep Optional in the type hint. If the user does not specify it, it's None. I changed the assignment to be a one-liner like yours.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you've done is correct.

My point is that if you do:

foo: Optional[int] = 1

bar = foo

if bar is None:
    bar = 123

Mypy reads this as implicitly setting the type of bar to Optional[int], that doesn't change in the if block.

If instead you do

foo: Optional[int] = 1
bar = foo or 123

The type of bar is int, this is what we do above, except we're explicit and add a type hint.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samuelcolvin Now I understand, thanks for the explanation.

@codecov
Copy link

codecov bot commented Aug 11, 2019

Codecov Report

Merging #142 into master will increase coverage by <.01%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master     #142      +/-   ##
==========================================
+ Coverage   98.74%   98.75%   +<.01%     
==========================================
  Files           8        8              
  Lines         638      642       +4     
  Branches       90       91       +1     
==========================================
+ Hits          630      634       +4     
  Misses          6        6              
  Partials        2        2

@codecov
Copy link

codecov bot commented Aug 11, 2019

Codecov Report

Merging #142 into master will increase coverage by 0.01%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master     #142      +/-   ##
==========================================
+ Coverage   98.05%   98.06%   +0.01%     
==========================================
  Files           8        8              
  Lines         667      671       +4     
  Branches       95       95              
==========================================
+ Hits          654      658       +4     
  Misses         10       10              
  Partials        3        3

@rubik
Copy link
Contributor Author

rubik commented Aug 11, 2019

@samuelcolvin Yes, tests are definitely needed. I added them in the second-to-last commit. I also added the history note.

@@ -158,6 +161,7 @@ def __init__(
job_timeout: SecondsTimedelta = 300,
keep_result: SecondsTimedelta = 3600,
poll_delay: SecondsTimedelta = 0.5,
queue_read_limit: Optional[int] = None,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rubik did you see this comment? What do you think?

arq/worker.py Outdated
@@ -280,6 +278,28 @@ def run(self) -> None:
queued_jobs = await self.pool.zcard(self.queue_name)
if queued_jobs == 0:
return
async with self.sem: # don't bother with zrangebyscore until we have "space" to run the jobs
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we're now doing zrangebyscore in two different places.

Is this really required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samuelcolvin This was introduced by mistake, it's absolutely not required. The only thing needed is what is inside _poll_iteration. This has been fixed now.


assert await arq_redis.zcard(default_queue_name) == 4
worker: Worker = worker(functions=[foobar], max_jobs=2)
worker.pool = await create_pool(worker.redis_settings)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samuelcolvin My understanding was that the pool is initialized in Worker.main, which we don't call in this test. It seems that it's initialized separately too, so I removed this line.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the point is that the worker fixture uses the standard redis connection, thus this isn't required.

assert worker.jobs_retried == 0

await worker._poll_iteration()
await asyncio.sleep(0.01)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this sleep doing? I doubt we need it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samuelcolvin It introduces a small delay because if Redis and Python are not fast enough, the assert below is not verified. In fact, if I remove it the test fails on my machine.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, I would guess you might need a big delay to avoid intermittent failures on CI where resources are more constrained (I've had problems like this a lot before), would increase to 0.1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samuelcolvin It makes sense. I increased it to 0.1.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like you've only updated this number in some places.

@rubik
Copy link
Contributor Author

rubik commented Aug 11, 2019

@samuelcolvin I have fixed the linting errors so that the checks can pass. The errors on Python 3.8 remain, but they are not related to this PR. I am not able to understand what causes them.

@samuelcolvin
Copy link
Owner

no problem, looks like aioredis is not compatible with python 3.8.

@samuelcolvin samuelcolvin merged commit 3b35919 into samuelcolvin:master Aug 11, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Retrieving new jobs from the queue with a limit
2 participants