More informative Queue class: new method that returns number of unfinished tasks #73789
Class queue.Queue controls the number of unfinished tasks via the method task_done(). But the only information it exposes is whether all tasks are done (via the join() method). Giving write access to the internal unfinished_tasks variable is not a good idea either, as it controls the internal state of the queue. A better way is to provide read-only access via a method, like qsize() or empty(). It could look like this:

    def unfinished(self):
        return self.unfinished_tasks

One example of this method's usage: the function _adjust_thread_count in concurrent.futures.ThreadPoolExecutor is not optimal, and it carries the following comment:

It can easily be done with the following condition:

    if self._work_queue.unfinished() <= len(self._threads):
        return
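A minimal sketch of the proposal written as a subclass, so it can be tried on an unmodified interpreter. The class name `UnfinishedQueue` is hypothetical. `unfinished_tasks` and `mutex` are existing but undocumented attributes of CPython's `queue.Queue`; reading the counter under `mutex` is an assumption about how a read-only accessor might be written, mirroring what `qsize()` does for the item count.

```python
import queue


class UnfinishedQueue(queue.Queue):
    """queue.Queue with a read-only view of the unfinished-task counter (hypothetical name)."""

    def unfinished(self):
        # put() increments unfinished_tasks and task_done() decrements it;
        # read it under the queue's own lock, as qsize() does for the item count.
        with self.mutex:
            return self.unfinished_tasks


q = UnfinishedQueue()
q.put("a")
q.put("b")
q.get()                           # "a" taken by a consumer but not yet finished
print(q.qsize(), q.unfinished())  # 1 2
q.task_done()                     # the consumer reports "a" as done
print(q.qsize(), q.unfinished())  # 1 1
```

Unlike `qsize()`, the counter still includes items a consumer has already taken out with `get()` until that consumer calls `task_done()`.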
Sorry, but I don't share your certainty about the usefulness of this method. Since the task_done/join API was added many years ago, I've not seen any use cases arise where we needed to quantify the number of unfinished tasks. Have you seen an actual need in real code, or is this PR more of an intuitive guess that the extra method might be useful? In the examples of task_done/join that I've seen, the number of unfinished tasks is typically in the range qsize() plus between zero and the number of parallel consumers. Where within this range the actual value falls doesn't seem very useful. Once the queue is empty, join() just waits for the parallel consumers to complete the final tasks that are already under way. Also, I'm reluctant to expand the API for several reasons. Keeping it small makes it more intelligible (giving users additional options for things they rarely need makes it more difficult to make correct decisions in the common cases). Also, it would be nice to avoid a ripple effect into other APIs, such as multiprocessing, which are supersets of this API. And lastly, I'm disinclined to add another informational method like empty, full, and qsize, which have to be documented as potentially introducing unexpected race conditions in user code (all three methods return information that may be completely out of date or wrong by the time the caller sees the result).
I concur with Raymond.
Raymond, Serhiy, thanks for your opinions. I agree that this method, like empty, full, and qsize, returns information that may be out of date by the time it is used. But, like those methods, it provides information that helps in making decisions. What I disagree with is that this information can be correctly estimated from qsize and the number of parallel consumers, as there is no information about what exactly the consumers are doing at the moment. Such an estimate would be absolutely useless for decision making. In the example (see https://github.com/slytomcat/cpython/commit/ea313986d86c083e81624fbc8a7ab6a75784e15d) we need to estimate the number of unfinished tasks; by comparing it with the number of active threads we can decide whether or not to create a new thread. There is no reason to wait until all tasks are done (via join); we need to make the decision while the pool is working. I've actually implemented this solution (Queue.unfinished + a modified concurrent.futures.ThreadPoolExecutor) in my project (see https://github.com/slytomcat/yandex-disk-client) and it works perfectly: the number of working threads increases only under a heavy flow of tasks, while a light flow of tasks keeps the number of working threads low.
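To make the proposed heuristic concrete, here is a hedged toy pool, not `ThreadPoolExecutor` itself, that starts a new worker only when the unfinished-task count exceeds the number of existing threads. `LazyThreadPool`, its methods, and the `UnfinishedQueue` helper are hypothetical names; the spawning rule is the condition from the first comment. It assumes a single submitting thread.

```python
import queue
import threading


class UnfinishedQueue(queue.Queue):
    """queue.Queue plus the proposed read-only accessor (hypothetical)."""

    def unfinished(self):
        with self.mutex:
            return self.unfinished_tasks


class LazyThreadPool:
    """Toy pool: add a thread only when unfinished tasks outnumber threads."""

    def __init__(self, max_workers):
        self._max_workers = max_workers
        self._work_queue = UnfinishedQueue()
        self._threads = []

    def submit(self, fn, *args):
        self._work_queue.put((fn, args))
        self._adjust_thread_count()

    def _adjust_thread_count(self):
        # unfinished() counts queued tasks plus tasks a worker is still running.
        # With at least as many threads as unfinished tasks, every task has a thread.
        if self._work_queue.unfinished() <= len(self._threads):
            return
        if len(self._threads) < self._max_workers:
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self._threads.append(t)

    def _worker(self):
        while True:
            item = self._work_queue.get()
            try:
                if item is None:  # shutdown sentinel
                    return
                fn, args = item
                try:
                    fn(*args)
                except Exception:
                    pass  # a real executor would store this in a Future
            finally:
                self._work_queue.task_done()

    def wait(self):
        # Block until every submitted task has been marked done.
        self._work_queue.join()

    def shutdown(self):
        # One sentinel per thread; each worker exits after taking one.
        for _ in self._threads:
            self._work_queue.put(None)
        for t in self._threads:
            t.join()


pool = LazyThreadPool(max_workers=4)

# Light flow: each task finishes before the next one arrives.
for _ in range(5):
    pool.submit(lambda: None)
    pool.wait()
after_weak = len(pool._threads)

# Burst: three tasks block at once, so unfinished() reaches 3.
gate = threading.Event()
for _ in range(3):
    pool.submit(gate.wait)
after_burst = len(pool._threads)
gate.set()
pool.wait()
pool.shutdown()
print(after_weak, after_burst)  # 1 3
```

The counts are deterministic here because `unfinished()` only drops when a worker calls `task_done()`, so the blocked burst tasks are all still counted when each new submission is checked.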
Shouldn't this just check to see if qsize() is greater than zero? That would mean that there are no idle threads. I'm not seeing why there would be any idle threads if there is work in the queue (the queue unblocks idle threads). I don't think "unfinished() - num_threads" makes sense. The meaning of "unfinished() - qsize()" is the number of non-idle worker threads (work that has been taken out of the queue but not yet reported as done).
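The quantity "unfinished() - qsize()" can be read off a plain `queue.Queue`. In this sketch `busy_workers` is a hypothetical helper, and it relies on two CPython internals: the `unfinished_tasks` counter and the private `_qsize()` method. Both are read under `mutex` so they describe the same moment; calling the public `qsize()` there would deadlock, because `mutex` is a non-reentrant lock that `qsize()` itself acquires.

```python
import queue


def busy_workers(q):
    """Items taken out with get() but not yet reported via task_done()."""
    with q.mutex:
        # Read both counters under the same lock so they are consistent.
        return q.unfinished_tasks - q._qsize()


q = queue.Queue()
for item in range(3):
    q.put(item)
q.get()                 # a consumer takes item 0 and starts working on it
print(busy_workers(q))  # 1
q.task_done()           # ...and reports it finished
print(busy_workers(q))  # 0
```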
Not exactly... there are 3 cases: if qsize() != 0, there are no idle consumer threads and all of them must be busy, so we need to create one more. No doubt. If qsize() == 0, it means one of two cases:
But there is no way to distinguish the last two cases. That's why I consider that (unfinished() <= num_threads) gives a clear key for the decision.
One more problem: the number of threads is adjusted immediately after the new task is placed in the queue. In some cases we can find that qsize() != 0, but that doesn't mean there are no idle threads. qsize() can be equal to 1 (the task just queued) because no thread has managed to get it yet (don't forget about the GIL). So even qsize() != 0 doesn't mean that we need to create a new thread. But (unfinished() <= num_threads) works perfectly in this case too.
num_threads - unfinished() gives an estimate of the number of idle threads.
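The disagreement can be stated as two candidate rules evaluated on a snapshot of the pool. The function names and snapshot values are hypothetical; the first snapshot encodes the situation described above: two threads, one busy with an older task and one idle, and a task that has just been queued but not yet picked up.

```python
def spawn_by_qsize(qsize, unfinished, num_threads):
    # Rule suggested in review: any queued work means no thread is idle.
    return qsize > 0


def spawn_by_unfinished(qsize, unfinished, num_threads):
    # Proposed rule: spawn only if queued plus in-progress tasks exceed threads.
    # num_threads - unfinished is the estimated number of idle threads.
    return unfinished > num_threads


# One busy thread, one idle thread, one freshly queued task.
snapshot = dict(qsize=1, unfinished=2, num_threads=2)
print(spawn_by_qsize(**snapshot))       # True: would add a third thread
print(spawn_by_unfinished(**snapshot))  # False: the idle thread will take it

# Both threads busy and one task waiting: the rules agree.
saturated = dict(qsize=1, unfinished=3, num_threads=2)
print(spawn_by_qsize(**saturated), spawn_by_unfinished(**saturated))  # True True
```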
IMO, this isn't worth expanding the Queue API. The thread pool concept explicitly allows up to poolsize threads, and occasionally having one extra thread within the allowed poolsize isn't important. Also, it is unclear why a new thread is being launched if there is nothing in the work queue (unfinished tasks are only interesting when qsize == 0). For other users, the new method is problematic because it is distracting and prone to misuse (potentially out of date upon return and potentially meaningless if task_done isn't being used). I believe the new method will cause more problems than it fixes. Also, I really don't like the mission creep. Queue objects are primarily about managing a queue of inputs. Monitoring and managing consumers is another (mostly orthogonal) realm.
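The concern that the count is potentially meaningless if task_done isn't being used is easy to reproduce: `put()` always increments the counter, so a consumer that never calls `task_done()` leaves it high even after the queue has been drained. This sketch reads the undocumented `unfinished_tasks` attribute directly, purely for illustration.

```python
import queue

q = queue.Queue()
for item in range(3):
    q.put(item)

while not q.empty():
    q.get()  # the consumer handles the item but never calls task_done()

# The queue is empty, yet the counter still reports 3 unfinished tasks,
# so q.join() here would block forever.
print(q.qsize(), q.unfinished_tasks)  # 0 3
```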
I can't fully agree with this:
Monitoring of consumers is already provided by the task_done() and join() methods. At least these two methods make it possible to tell that all consumers are idle.
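For reference, the existing mechanism referred to here is the documented `task_done()`/`join()` pattern: `join()` returns only once every item passed to `put()` has been matched by a `task_done()` call, i.e. once the consumers have finished everything handed to them. The worker function, thread count, and items are illustrative.

```python
import queue
import threading

q = queue.Queue()
results = []


def worker():
    while True:
        item = q.get()
        results.append(item * item)  # list.append is safe across threads in CPython
        q.task_done()                # tell join() this item is finished


for _ in range(3):
    threading.Thread(target=worker, daemon=True).start()

for n in range(10):
    q.put(n)

q.join()  # blocks until all 10 items have been reported done
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Note that `join()` only answers "are all consumers idle?"; it cannot say how many are busy, which is the gap the proposed method was meant to fill.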
This sounds reasonable, because I also understand that this method is not usable in all cases, as in most cases task_done is not used. Probably my idea for the unfinished method really is not so good... Actually, I managed to find the undocumented Queue.unfinished_tasks variable within just a few minutes when I was looking for a solution for the threaded PoolExecutor. I hope any curious developer can find it too. I don't mind if you close this CR. Thanks for the pleasant discussion.
I've closed the pull request on GitHub.