Retrieving new jobs from the queue with a limit #141
Comments
I don't think that's the case. The job IDs are retrieved, but they are not "taken": another worker could still execute the job. However, I may be wrong. Please try this out and let me know if you do find a problem.
@samuelcolvin Yes, that's what I meant. The reason I was checking is that in my case the queue usually has tens to hundreds of thousands of jobs. It doesn't seem optimal at all for a worker to take all the IDs each time if there's no way to run the corresponding jobs. I will try it out, though.
Makes sense, and a limit would make sense. To be honest I haven't run arq v0.16 with that many pending jobs, so it's slightly unknown territory. Very happy to try and help make it work, though.
@samuelcolvin From my tests, after the queue gets big, the problem is not so much in the workers as in Redis. Since Redis is single-threaded, operations start to slow down because the job ID read takes a lot of time. I think adding a coefficient setting would help, something like:

```python
import math

if self.queue_read_limit_coef is not None:
    free_actors = self.max_jobs - self._sem._value
    limit = math.ceil(free_actors * self.queue_read_limit_coef)
    job_ids = await self.pool.zrangebyscore(self.queue_name, limit=limit)
else:
    job_ids = await self.pool.zrangebyscore(self.queue_name, max=now)
```

What do you think?
It should be possible to use a limit while keeping the max score filter. We definitely don't want to be processing any jobs not yet scheduled to be executed. I also think it's better to use a setting that's simpler to understand, e.g. a fixed read limit rather than a coefficient.
@samuelcolvin Fair enough, I guess I can simply derive the limit from my worker settings then.
Yes, makes sense.
@samuelcolvin Are you planning to work on this soon, or should I open a PR? I think I'll be able to do this tomorrow or over the weekend.
Would be great if you could submit a PR.
By checking the source code, it seems to me that a worker always takes as many jobs as possible from the queue, independently of how many free actors there are:
https://github.com/samuelcolvin/arq/blob/b2d397ae3b07340674f6f0edad144beaf4d4ef54/arq/worker.py#L260-L264
I think this has great potential for improvement. If the queue is large, a worker will keep all the job IDs in memory without being able to execute them in a meaningful time. Conversely, if the semaphore indicates that it's full, why take jobs at all? (Or maybe we should take just a few.) I was also planning to run multiple instances of my worker, and in that case parallelism would benefit from each worker taking a subset of jobs that it can manage.
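To make the concern concrete, here is a small in-memory simulation: a score-sorted list of `(score, job_id)` pairs stands in for the Redis sorted set, and the two functions contrast an unbounded read with a bounded one (all names here are illustrative, not arq's actual code):

```python
# A score-sorted list stands in for the Redis sorted set used as the
# job queue; scores are the jobs' scheduled timestamps.
queue = [(i, f"job:{i}") for i in range(100_000)]

def poll_unbounded(now):
    # Current behaviour: every eligible job ID is pulled into memory.
    return [job_id for score, job_id in queue if score <= now]

def poll_bounded(now, limit):
    # Proposed behaviour: stop after `limit` IDs, like ZRANGEBYSCORE
    # with a LIMIT clause.
    out = []
    for score, job_id in queue:
        if score > now or len(out) >= limit:
            break
        out.append(job_id)
    return out

print(len(poll_unbounded(99_999)))          # 100000 IDs held in memory
print(len(poll_bounded(99_999, limit=20)))  # 20
```

With multiple worker processes, the bounded version also means each worker claims only a manageable slice per poll instead of every worker fetching the same hundred-thousand IDs.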
With that in mind, I propose that the above call to Redis is changed to take a `limit`, where `limit` is the number of free spots in the semaphore or some multiple of that (ideally configurable; I think the optimal value would mostly depend on the use case, but it's probably slightly above `1` in most cases).

I also don't quite understand the usefulness of `now`. I think that in the great majority of cases, all the enqueued jobs have a timestamp below the current timestamp. But even if they had a slightly higher timestamp, why would a free worker not run them?