concurrent.futures.ThreadPoolExecutor/ProcessPoolExecutor should accept an initializer argument #65622
Comments
It would be useful if concurrent.futures.ThreadPoolExecutor took an initializer argument, like multiprocessing.Pool. This is useful, for example, to load a large dataset once upon initialization of each worker process, without having to pass the dataset as an argument with every job submission, which requires serialization. concurrent.futures has some advantages over multiprocessing, such as detecting killed processes (http://bugs.python.org/issue9205), so it would be good if the advantages of both could be combined. It appears that the following fork of concurrent.futures has added these arguments: https://github.com/enthought/encore/blob/7101984bc384da8e7975876ca2809cc0103c3efc/encore/concurrent/futures/enhanced_thread_pool_executor.py |
Do you mean ProcessPoolExecutor? Thread based pools don't involve serialization. It would be good for ThreadPoolExecutor to have it as well, to make it easier to switch between executors, but only ProcessPoolExecutor is suffering from serialization overhead. Threads share the same memory space after all; references to data get passed directly, though you might choose to copy.copy or copy.deepcopy a root data "template" so each thread has its own unique copy that it can mutate. |
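A minimal sketch of the per-thread copy idea from the comment above, assuming the `initializer` and `initargs` keywords as they later became available on ThreadPoolExecutor (Python 3.7 and newer). `TEMPLATE`, `_local`, `_init_thread`, and `bump` are hypothetical names chosen for illustration only.

```python
import copy
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical shared "template"; workers must not mutate it directly.
TEMPLATE = {"counts": [0, 0, 0]}

# Thread-local storage holding each worker thread's private copy.
_local = threading.local()


def _init_thread(template):
    # Runs once in each worker thread, before that thread executes any task.
    _local.data = copy.deepcopy(template)


def bump(index):
    # Mutates only the calling thread's private copy of the template.
    _local.data["counts"][index] += 1
    return _local.data["counts"][index]


with ThreadPoolExecutor(max_workers=2, initializer=_init_thread,
                        initargs=(TEMPLATE,)) as pool:
    results = list(pool.map(bump, [0, 1, 2, 0]))
```

No serialization happens here: the initializer receives a direct reference to `TEMPLATE`, and the deep copy is the only thing that isolates the threads from each other.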
Yes I did mean ProcessPoolExecutor, but indeed, it's good to have for threads as well. I could try to make a patch if it is likely that it would be accepted. |
I'm not a core developer, but writing the patch is usually considered helpful. Two notes:
|
BTW, I think there's a design mistake in the EnhancedThreadPoolExecutor that's worth avoiding in any std. lib. implementation: the initialiser and uninitialiser for the EnhancedThreadPoolExecutor accept no arguments. In retrospect, it would have been better to have them take the thread itself as a single argument. We often found ourselves needing this - it's not hard to work around with a threading.current_thread() call, but it's mildly annoying to have to do so. |
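The `threading.current_thread()` workaround mentioned above could look roughly like this. The sketch uses the standard library ThreadPoolExecutor (Python 3.7 and newer) rather than EnhancedThreadPoolExecutor, and `worker_names`, `_registry_lock`, `_init_thread`, and `task` are hypothetical names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_registry_lock = threading.Lock()
worker_names = set()


def _init_thread():
    # The initializer is not handed the thread object, so look it up.
    me = threading.current_thread()
    with _registry_lock:
        worker_names.add(me.name)


def task(x):
    return x * 2


with ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker",
                        initializer=_init_thread) as pool:
    doubled = list(pool.map(task, range(4)))
```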
Here's a patch. I have added initializer and initargs keywords to both ThreadPoolExecutor and ProcessPoolExecutor, with the same semantics as multiprocessing.Pool. I couldn't figure out what to do if the initializer fails with a ProcessPoolExecutor: how do we properly send the traceback back? I also haven't gotten around to figuring out how to write tests. |
Here's a version with tests. Detecting an exception in the initializer works with ProcessPoolExecutor, but not with ThreadPoolExecutor. |
Related: |
Giampaolo, this patch is for ProcessPoolExecutor as well. About keyboard interrupts, if my tests are correct, they work |
It seems like everyone agrees that this functionality is useful, so I'm reviving this in hopes of getting a patch pushed through. I've updated Andreas' patch so that it applies cleanly against the latest tree, and tweaked the handling of exceptions in the initializer. Now, ProcessPoolExecutor will raise a BrokenPoolException should an initializer method fail, and ThreadPoolExecutor will raise a RuntimeError stating that the pool can't be used because an initializer failed. I was hoping to use multiprocessing.Pool's handling of initializer exceptions as a guide for the right behavior here, but it actually does a terrible job: an exception raised in the initializer is completely unhandled, and results in an endless loop of new processes being started up and immediately failing. But that's a separate bug report. :) For now there are still unit tests for exceptions being raised in the initializer, but they're noisy; the traceback for each initializer exception gets printed to stdout. I'm not sure if that's undesirable behavior or not. If the new behavior looks ok, the docs will need an update too. |
Here's an updated patch. Changes:
* Updated the docs. |
It seems the addition of the initargs argument doesn't tackle Mark's objection here:
Regardless, I'm going to review the patch soon. |
This would be inconsistent with multiprocessing.Pool's initializer/initargs behavior. I'm not sure if consistency is important there or not, but it's worth pointing out. I'm also not sure how useful it would be for ProcessPoolExecutor to receive an instance of itself. With multiprocessing.Pool, initializer is commonly used to pass objects that can only be inherited to workers (Queues, Locks, etc.). The same pattern would be useful for ProcessPoolExecutor, which means initializer needs to be able to take an arbitrary number of arguments, rather than just an instance of the Process object itself. |
Another updated patch. This one changes ProcessPoolExecutor behavior so that RuntimeErrors are raised in any active Futures, and on subsequent calls to submit after the initializer fails. This makes its behavior consistent with ThreadPoolExecutor. |
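A small sketch of the failure behavior described above, using ThreadPoolExecutor as shipped in Python 3.7 and newer (where, as far as I know, the exception raised is BrokenThreadPool, a RuntimeError subclass). `_bad_init`, `_work`, and `errors` are hypothetical names, and the logger name is an assumption about a CPython implementation detail.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

# Assumption: CPython reports initializer failures on the
# "concurrent.futures" logger; raise its level to keep this demo quiet.
logging.getLogger("concurrent.futures").setLevel(logging.CRITICAL + 1)


def _bad_init():
    raise ValueError("error in initializer")


def _work():
    return "done"


errors = []
pool = ThreadPoolExecutor(max_workers=1, initializer=_bad_init)
try:
    future = pool.submit(_work)
    try:
        future.result(timeout=10)
    except RuntimeError as exc:
        # The pending future fails once the initializer has failed.
        errors.append(type(exc).__name__)
    try:
        pool.submit(_work)
    except RuntimeError as exc:
        # Later submissions are rejected because the pool is unusable.
        errors.append(type(exc).__name__)
finally:
    pool.shutdown(wait=True)
```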
Agreed that this doesn't make much sense; I hadn't really thought about the ProcessPoolExecutor case. I withdraw my comments! |
Any chance of this getting into 3.5.2? I have some gross code to get around it (setting global properties). |
Would also love to see this in the stdlib soon. My use case is logging setup (forwarding logs to the main process). |
For cross-referencing purposes: I have proposed in http://bugs.python.org/issue25293 to allow passing a Thread/Process subclass as argument instead of an initializer function, which would both handle Mark Dickinson's comment (http://bugs.python.org/issue21423#msg218040) about passing the thread object as argument, and also allow for finalization. |
I've opened a PR for this at #4241, loosely based on Dan's last patch. |
By the way, the submitted PR follows Dan's argument about the initializer's argument: the actual call is If you want to comment on the PR, I would recommend doing it quickly, as I plan to merge in a day or two :-) |
test_concurrent_futures now produces too much output on stderr.
$ ./python -m test test_concurrent_futures >/dev/null
Exception in initializer:
Traceback (most recent call last):
  File "/home/serhiy/py/cpython/Lib/concurrent/futures/process.py", line 170, in _process_worker
    initializer(*initargs)
  File "/home/serhiy/py/cpython/Lib/test/test_concurrent_futures.py", line 66, in init_fail
    raise ValueError('error in initializer')
ValueError: error in initializer
Exception in initializer:
Traceback (most recent call last):
  File "/home/serhiy/py/cpython/Lib/concurrent/futures/process.py", line 170, in _process_worker
    initializer(*initargs)
  File "/home/serhiy/py/cpython/Lib/test/test_concurrent_futures.py", line 66, in init_fail
    raise ValueError('error in initializer')
ValueError: error in initializer
...
What is worse, this output looks like an error report. |
And please don't forget to edit the commit message when merging a PR. |
Unfortunately, there is no obvious way to capture the output of the child process here, short of running the entire test under a subprocess. |
What is the best way to silence logging in subprocesses? |
Are you referring to the output shown in msg305601? If it's caused by logging statements, the best way would be either to pipe stderr to /dev/null or to change the logging to use sys.stdout (sys.stderr is just the default) and then pipe stdout to /dev/null. I hope I haven't misunderstood your question, but I fear I may have. |
I'm not well experienced with logging, but if we can change the logging in subprocesses, I think we could change it to not output messages at all. It would be better to save logging messages in subprocesses and check that expected logging messages are emitted in the main process. There is assertLogs(), but it works only with logging in the same process. |
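For the same-process case, where assertLogs() does work, a test could capture the message instead of letting it reach stderr. This is a sketch under the assumption that the failure is logged on the "concurrent.futures" logger at CRITICAL level with the "Exception in initializer:" text shown in the output above; it covers ThreadPoolExecutor only and does not address the subprocess case. `_init_fail` and `InitializerLoggingTest` are hypothetical names.

```python
import unittest
from concurrent.futures import ThreadPoolExecutor


def _init_fail():
    raise ValueError("error in initializer")


class InitializerLoggingTest(unittest.TestCase):
    def test_failure_is_logged(self):
        # Assumption: logger name and level are CPython implementation
        # details and are not guaranteed by the documentation.
        with self.assertLogs("concurrent.futures",
                             level="CRITICAL") as captured:
            with ThreadPoolExecutor(max_workers=1,
                                    initializer=_init_fail) as pool:
                future = pool.submit(int)
                with self.assertRaises(RuntimeError):
                    future.result(timeout=10)
        # The record was captured rather than printed to stderr.
        self.assertIn("Exception in initializer", captured.output[0])
```

The test can be run with `python -m unittest` against the file that contains it.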
Serhiy, I think I have found a way to deal with the logging output: |
Thank you Antoine! |