Implement asyncio.run_in_executor shortcut #76490
loop.create_task() and loop.run_in_executor() appear very often in user code, but they require a loop instance, so the actual call looks like:

    loop = asyncio.get_running_loop()
    loop.create_task(coro())

The proposal adds create_task(coro) and run_in_executor(executor, func, *args) shortcuts for this.
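For illustration, the proposed shortcuts would be thin wrappers over the running loop. A minimal sketch (asyncio.create_task() was in fact added in Python 3.7 in essentially this form; the module-level run_in_executor() shown here is hypothetical):

    import asyncio

    def create_task(coro):
        # Fetch the running loop so callers no longer have to.
        loop = asyncio.get_running_loop()
        return loop.create_task(coro)

    def run_in_executor(executor, func, *args):
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(executor, func, *args)
|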
I don't like the low-level API of run_in_executor: "executor" being the first argument, the inability to pass **kwargs, etc. I'd expect to see a more high-level API, perhaps one that supports 'async with':

    async with asyncio.ThreadPool() as pool:
        f1 = pool.run(func1)
        f2 = pool.run(func2)
        r3 = await pool.run(func3)

        r1 = f1.result()
        r2 = f2.result()

    print(r1, r2, r3)

I mean, it's great that we can use 'concurrent.futures' in asyncio, but having a native asyncio pools implementation would be way more convenient for users. In any case, can we focus this issue on the "run_in_executor" API and open a new one for "create_task"? |
Removed create_task() from title |
https://bugs.python.org/issue32311 opened for create_task() |
In Python 3.7, loop.run_in_executor() is the only user-facing method that requires a loop. asyncio.ThreadPool() sounds great. Maybe a thread group can provide a better API. But for Python 3.8, adding… Yury, what do you think? |
I agree that there's an issue with this, but I think the high-level API can be an asynchronous context manager that makes use of concurrent.futures.ThreadPoolExecutor, since that one seems to fit the majority of asyncio use cases. If users want to utilize concurrent.futures.ProcessPoolExecutor or a customized one, they can use loop.run_in_executor() instead. Unless there's something I'm missing with regard to concurrent.futures.ThreadPoolExecutor that specifically makes it less usable. Can't asyncio.ThreadPool make use of concurrent.futures.ThreadPoolExecutor in the internal details while providing a better API?
I'd be willing to work on implementing the asyncio.ThreadPool API as Yury described it.
I can see your point, Andrew, but I think that if we implement asyncio.run_in_executor() and then later add asyncio.ThreadPool(), it's going to be against "one obvious way" and we'll end up adding two high-level functions for the same purpose. This principle can't always be adhered to, but I think we should try to when it's possible. Personally, I think the API for asyncio.ThreadPool would be much more robust, so it seems worthwhile to try implementing this one first. If there are a number of unforeseen complications, asyncio.run_in_executor() might still be a decent fallback. |
Clarification: asyncio.run_in_executor() would be a function, but asyncio.ThreadPool would be a context manager class. |
So, here's a prototype implementation of asyncio.ThreadPool that would function exactly as Yury described, but I'm not convinced about the design. Usually, it seems preferable to separate the context manager from the rest of the class (as was done with asyncio.Lock), but I think this one might be simple enough to be a single class:

    import asyncio
    import concurrent.futures
    import functools

    class ThreadPool:
        def __init__(self, timeout=None):
            self.timeout = timeout
            self._loop = None

        async def __aenter__(self):
            self._loop = asyncio.get_running_loop()
            # Ensure that ThreadPoolExecutor is being used
            self._loop.set_default_executor(
                concurrent.futures.ThreadPoolExecutor())
            return self

        async def __aexit__(self, *args):
            await self._loop.shutdown_default_executor(timeout=self.timeout)

        def run(self, func, *args, **kwargs):
            call = functools.partial(func, *args, **kwargs)
            return self._loop.run_in_executor(None, call)

It utilizes the existing lower-level event loop API to provide an asynchronous context manager. I'd say the main advantage is that it's significantly more user friendly, as there's no loop or executor argument to be specified, and users can freely pass kwargs to run() thanks to functools.partial(). Additionally, I also included a timeout parameter for shutting down the ThreadPoolExecutor, which will depend upon #60564. This can be used as such:

    async def main():
        async with asyncio.ThreadPool(timeout=600) as pool:
            fut1 = pool.run(func)
            fut2 = pool.run(func, arg1, arg2)
            fut3 = pool.run(func, arg1, kwarg1=True)
            print(await asyncio.gather(fut1, fut2, fut3))

    asyncio.run(main())

I don't expect that this would be the final design, but I think it's a decent prototype to demonstrate the functionality. Thoughts? |
I don't think changing the default executor is a good approach. What happens if two or more thread pools are running at the same time? In that case they will use the same default executor anyway, so creating a new executor each time seems like a waste. Shutting down the default executor seems unnecessary and could impact lower-level code that is using it. The default executor is shut down at the end of asyncio.run anyway. I also think it would be good to have a synchronous entry point and not require a context manager. Having a ThreadPool per class instance would be a common pattern.

    class ThreadPool:
        def __init__(self, timeout=None):
            self.timeout = timeout
            self._loop = asyncio.get_event_loop()
            self._executor = concurrent.futures.ThreadPoolExecutor()

        async def close(self):
            await self._executor.shutdown(timeout=self.timeout)

        async def __aenter__(self):
            return self

        async def __aexit__(self, *args):
            await self.close()

        def run(self, func, *args, **kwargs):
            call = functools.partial(func, *args, **kwargs)
            return self._loop.run_in_executor(self._executor, call)

I'm not sure if a new ThreadPoolExecutor really needs to be created for each ThreadPool though. |
The run method should be:

    async def run(self, func, *args, **kwargs):
        call = functools.partial(func, *args, **kwargs)
        return await self._loop.run_in_executor(None, call)

|
Paul's version looks better. Two notes: … |
I agree that it would be better to have ThreadPool use an internal executor rather than relying on the event loop's default executor. The main reason I hadn't was because we hadn't implemented an asynchronous executor shutdown outside of loop.shutdown_default_executor(), but we could potentially move the functionality to a private function (in Lib/asyncio/base_events.py) so it's reusable for ThreadPool. It could be something like this:

    async def _shutdown_executor(executor, loop):
        future = loop.create_future()
        thread = threading.Thread(target=_do_shutdown,
                                  args=(loop, executor, future))
        thread.start()
        try:
            await future
        finally:
            thread.join()

    def _do_shutdown(loop, executor, future):
        try:
            executor.shutdown(wait=True)
            loop.call_soon_threadsafe(future.set_result, None)
        except Exception as ex:
            loop.call_soon_threadsafe(future.set_exception, ex)

Note that the above would be for internal use only, for the existing loop.shutdown_default_executor() and the new asyncio.ThreadPool. For it to support both, it would have to accept an explicit loop argument. It also does not need a robust API, since it's private.
I agree with your point regarding shutting down the default executor. But I think we should shut down the internal executor for the ThreadPool, as a main point of context managers is to start and clean up their own resources. Also, I'm aware that asyncio.run() shuts down the default executor; I implemented that fairly recently in #59940. ;) Another substantial concern is the case of a coroutine that contains asyncio.ThreadPool being executed without asyncio.run(). There are still use cases for using the lower-level loop.run_until_complete() for more complex asyncio programs. I don't think we should make asyncio.ThreadPool dependent on the coroutine being executed with asyncio.run(). Thus, it makes sense that ThreadPool should create a new instance of ThreadPoolExecutor upon entry of the context manager and then shut it down upon exit.
IMO, a context manager should create and then finalize its own resources, rather than sharing the same executor across contexts. Sharing the same one seems to defeat the purpose of using a context manager in the first place, no?
    async def run(self, func, *args, **kwargs):
        call = functools.partial(func, *args, **kwargs)
        return await self._loop.run_in_executor(None, call)

Correction: if we're using an internal executor now, this should instead be self._loop.run_in_executor(self._executor, call). With…
Agreed, good point.
I think he had some good points, particularly around using an internal executor instead of the event loop's default executor; but there are some parts that I disagree with, see above reasons.
Note: if get_running_loop() is used instead, it has to set self._loop within a coroutine (since get_running_loop() can only be used within coroutines); that's why in my version it was within __aenter__. I think this would make the most sense.
That's why in my version I was using the existing event loop API, since we had already implemented an asynchronous loop.shutdown_default_executor() method that calls executor.shutdown(). However, if we added a private _shutdown_executor() and _do_shutdown() as I mentioned above, that wouldn't be an issue. Thanks for the feedback on the prototype, Paul and Andrew; both of you brought up some good points. I'll start working on a PR. |
Also, I agree with Paul's idea of initializing the ThreadPoolExecutor in the __init__ instead of __aenter__; that makes more sense now that I think about it. |
Correction:
Also, it might make more sense to rename _do_shutdown() to _do_executor_shutdown() to give the function's name more context; renaming shouldn't be an issue since it's private. Plus, it was just added recently in 3.9, so there's even less backwards compatibility to be concerned with. |
Actually, I think it would be better to move the functionality of loop.shutdown_default_executor() to a new private method loop._shutdown_executor() that takes an executor argument rather than shutting down the default one. This could be used in both loop.shutdown_default_executor() and ThreadPool. There's no need to move it to a function instead of a method of BaseEventLoop though; that doesn't make sense now that I think about it more. Sorry if my thoughts are a bit disorganized, I think I need to get some sleep. (: |
Good points. I made a mistake in run. It should be:

    async def run(self, func, *args, **kwargs):
        call = functools.partial(func, *args, **kwargs)
        return await self._loop.run_in_executor(self._executor, call)

Also, in this case run awaits and returns the result. Yury suggested earlier to just return the future and not await. I have no strong opinion either way. The above example does seem higher level, but Yury's example is more flexible. I agree that shutdown_default_executor and _do_shutdown should be changed to accept an executor argument so that any executor can be shut down asynchronously. So the loop API would have a shutdown_executor method, and shutdown_default_executor would just call shutdown_executor with the default executor as argument. That would be a good first step.
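With that split, shutdown_default_executor() reduces to a one-line delegation. A sketch (assuming the loop stores its default executor in the internal _default_executor attribute, as BaseEventLoop does):

    async def shutdown_executor(self, executor):
        # Asynchronous executor shutdown, e.g. via the thread-based
        # _shutdown_executor/_do_shutdown approach shown earlier.
        ...

    async def shutdown_default_executor(self):
        # Delegate, passing the loop's internal default executor.
        await self.shutdown_executor(self._default_executor)
|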
Yeah, that's roughly what my initial version was doing. I'm personally leaning a bit more towards returning the future rather than the result, but I'm okay with either option. What are your thoughts on this, Yury and Andrew?
We could potentially add an internal method _shutdown_executor, but this would also require modification of _do_shutdown (so that it shuts down the executor passed, rather than the default one). I mentioned this in an earlier example, but this one shows all three together and changes _shutdown_executor to a method of BaseEventLoop:

    async def _shutdown_executor(self, executor):
        if executor is None:
            return
        future = self.create_future()
        thread = threading.Thread(target=self._do_shutdown,
                                  args=(executor, future))
        thread.start()
        try:
            await future
        finally:
            thread.join()

    def _do_shutdown(self, executor, future):
        try:
            executor.shutdown(wait=True)
            self.call_soon_threadsafe(future.set_result, None)
        except Exception as ex:
            self.call_soon_threadsafe(future.set_exception, ex)

Functionally, it works the same for shutdown_default_executor(), but allows _shutdown_executor to be used for asyncio.ThreadPool as well. Since #60564 (adding the timeout param) also makes changes to shutdown_default_executor(), it will be blocking this issue. |
A few thoughts:

    async def main():
        async with asyncio.ThreadPool(concurrency=10) as pool:
            await something(pool)
            await something_else(pool)

    asyncio.run(main())
|
I'm definitely on board with the usage of an async context manager and the functionality shown in the example, but I'm not sure that I entirely understand what the "concurrency" kwarg in "concurrency=10" is supposed to represent in this case. Could you elaborate on what that would do functionally?
Strong +1 on all of those points, I agree.
I believe this behavior occurs within shutdown_default_executor(), correct? Specifically, for ThreadPoolExecutor, when executor.shutdown(wait=True) is called and all of the threads are joined without a timeout, it simply waits for each thread to terminate gracefully. So if we moved that functionality to a private coroutine method _shutdown_executor() as suggested in my above example, this could also be done for ThreadPool. Unless you want ThreadPool to be its own entirely separate implementation that doesn't depend at all on ThreadPoolExecutor. Personally, I think we can use ThreadPoolExecutor in the internal details; it seems that the main issue with it isn't the functionality, but rather the low-level API offered with loop.run_in_executor(). Also, another point to consider is whether we should have users explicitly call pool.aclose() or if this should be done automatically when exiting the context manager through __aexit__. I prefer the latter myself for similar reasons to what I previously mentioned, with a context manager initializing its own resources on entry and finalizing them upon exit.
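To illustrate the latter option: if __aexit__ simply delegates to aclose(), explicit cleanup and the context manager share one code path (a sketch; _shutdown_executor() here is a hypothetical internal helper):

    async def aclose(self):
        # Explicit cleanup for users who don't use `async with`.
        await self._shutdown_executor()  # hypothetical internal helper

    async def __aexit__(self, *exc):
        # The context manager form just reuses the explicit API.
        await self.aclose()
|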
Correction, I phrased this poorly: I believe this behavior occurs within shutdown_default_executor(), correct? Specifically, when executor.shutdown(wait=True) is called within _do_shutdown() and ... |
Number of OS threads to spawn. |
Ah I see, so this would correspond with the "max_workers" argument of ThreadPoolExecutor then, correct? If so, we could pass this in the __init__ for ThreadPool:

    def __init__(self, concurrency):
        ...
        self._executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=concurrency)

IMO, it would be a bit more clear to just explicitly call it "threads" or "max_threads", as that explains what it's effectively doing. While "concurrency" is still a perfectly correct way of describing the behavior, I think it might be a little too abstract for an argument name. |
Minor clarification: the default should probably be None, which would effectively set the default maximum number of threads to min(32, os.cpu_count() + 4), once it's passed to ThreadPoolExecutor. |
And that's why I like it. If we add ProcessPool it will have the same argument: concurrency. max_workers isn't correct, as we want to spawn all threads and all processes when we start. This, btw, makes me think that initializing threads/processes in __init__ is a bad idea, as spawning can be asynchronous. |
Ah, I see, that would make sense then if we're considering adding a ProcessPool at some point and want to make the argument name the same. Based on your ideas so far, it seems that it will likely not be compatible with the existing ThreadPoolExecutor. From my understanding, the executor classes are designed around spawning the threads (or processes in the case of ProcessPoolExecutor) as needed up to max_workers, rather than spawning them upon startup. The asynchronous spawning of threads or processes would also not be compatible with the executor subclasses as far as I can tell. I can start working on a draft/prototype for a design. It will likely take more time to implement this, but it will give us the chance to have a native asyncio ThreadPool that doesn't directly rely upon the API in concurrent.futures. Also, would you prefer for there to be an abstract asyncio.AbstractPool class that ThreadPool inherits from? I think this would make it more streamlined to implement a similar ProcessPool at some point in the future. This would be similar to the relationship between the Executor class and ThreadPoolExecutor, or AbstractEventLoop and BaseEventLoop. Let me know if you approve of this idea, Yury and Andrew. It's quite a bit more involved than implementing a simple high-level version of loop.run_in_executor(), but I think it would prove to be worthwhile in the long term. |
No, that would be too much work. Writing a thread pool or process pool from scratch is an extremely tedious task and it will take us years to stabilize the code. It's not simple. We should design *our* API correctly though. And that means that we can't initialize our pools in __init__. Something along these lines would work:

    class ThreadPool:
        async def start(self): ...

        async def __aenter__(self):
            await self.start()
            return self

        async def aclose(self): ...

        async def __aexit__(self, *exc):
            await self.aclose()

It shouldn't be much harder than run_in_executor() as we continue to rely on concurrent.futures (at least for the first version). We need to start the discussion about this API on discourse. Please give me a few days to organize that.
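For concreteness, filling in that shape while still relying on concurrent.futures might look roughly like this (a sketch only; the concurrency parameter and the _loop/_executor attributes are assumptions drawn from the discussion above):

    import asyncio
    import concurrent.futures
    import functools

    class ThreadPool:
        def __init__(self, concurrency=None):
            self._concurrency = concurrency
            self._loop = None
            self._executor = None

        async def start(self):
            self._loop = asyncio.get_running_loop()
            self._executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=self._concurrency)

        async def aclose(self):
            # Run the blocking shutdown in another thread so the event
            # loop isn't stalled while the workers are joined.
            await self._loop.run_in_executor(
                None, functools.partial(self._executor.shutdown, wait=True))

        async def __aenter__(self):
            await self.start()
            return self

        async def __aexit__(self, *exc):
            await self.aclose()

        async def run(self, func, *args, **kwargs):
            call = functools.partial(func, *args, **kwargs)
            return await self._loop.run_in_executor(self._executor, call)
|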
I can see that it would be a lot of additional work; that's why I was using ThreadPoolExecutor in my earlier prototype. The main thing that I'm not seeing, though, is how exactly we're going to actually spawn the threads or processes asynchronously upon *startup* in ThreadPool using ThreadPoolExecutor's existing public API, with only the submit(), map(), and shutdown() methods. Unless I'm misunderstanding something, the executor classes were not designed with that intention in mind. With Executor, a new thread/process (worker) is spawned *as needed* when submit() is called throughout the lifespan of the Executor up to max_workers, rather than upon startup as you're wanting ThreadPool to do. Thus, it seemed to make more sense to me to build a new Pool class from scratch that was largely based on Executor, but with significantly differing functionality. Otherwise, it seems like we would have to make some modifications to ThreadPoolExecutor, or inherit from it and then redesign the internals of some of the methods to change the way the threads/processes are spawned.
I think that run_in_executor() was far simpler than this. The functionality of run_in_executor() almost maps directly to executor.submit(), other than a few conditional checks and converting the returned concurrent.futures.Future to an asyncio.Future through wrap_future().
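The core of that mapping, simplified from BaseEventLoop.run_in_executor() (the real method also performs a few checks and lazily creates the default executor):

    def run_in_executor(self, executor, func, *args):
        if executor is None:
            executor = self._default_executor  # fall back to the default
        # submit() returns a concurrent.futures.Future; wrap it so it
        # can be awaited on this loop. (`futures` is asyncio's internal
        # futures module, imported in base_events.py.)
        return futures.wrap_future(executor.submit(func, *args), loop=self)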
I agree that we should probably continue this discussion on discourse, as it goes beyond the scope of a single issue. No problem. |
I'm going to have to rescind the above statements. I was able to implement a new prototype of asyncio.ThreadPool (using ThreadPoolExecutor) that spawns its threads asynchronously on startup. Since this one is a bit more involved than the previous code examples, I created a gist: https://gist.github.com/aeros/8a86de6b13f17b9f717ea539ee1ee78f It's by no means a complete implementation, but it at least proves that the functionality Yury described is very much possible using the existing ThreadPoolExecutor class. |
Nice work! This is a great exercise, but we can really just use concurrent.futures.ThreadPoolExecutor as is. Spawning threads is fast. As I mentioned before, all we need to do is design *our* API to NOT initialize pools in __init__, that's it. The design outlined in https://bugs.python.org/msg355881 would do that. |
Thanks, it was quite helpful for better understanding the internals of ThreadPoolExecutor. I think that I'm still not understanding something important though. Even if we initialize our ThreadPoolExecutor outside of __init__ (in a start() coroutine method, as you're proposing), it seems like the threads will be spawned throughout the lifespan of the thread pool, rather than upon startup, since new threads are spawned in ThreadPoolExecutor *after* executor.submit() is called (up to max_workers) rather than upon initialization. So even if an instance of ThreadPoolExecutor is initialized asynchronously within a start() coroutine method, the individual threads within it won't be spawned at the same time. That's why I wrote an explicit way of spawning threads in the above example, based on the way that ThreadPoolExecutor spawns threads in _adjust_thread_count(), which is called at the end of submit(). |
It's also worth mentioning that ThreadPoolExecutor spawns at most one additional thread per executor.submit() call.
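One way to force eager spawning using only the public API is to submit max_workers tasks that all block on a shared barrier, so that every submit() finds no idle thread and must start a new one. A sketch (spawn_threads_eagerly is a hypothetical helper; it deadlocks if n exceeds the executor's max_workers):

    import concurrent.futures
    import threading

    def spawn_threads_eagerly(executor, n):
        # Each task blocks on the barrier, so no thread is ever idle
        # when the next submit() arrives; the executor must spawn a
        # new thread for each of the n tasks. The barrier releases
        # once all n threads exist.
        barrier = threading.Barrier(n)
        for _ in range(n):
            executor.submit(barrier.wait)
|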
Correct.
Correct. There are a few points to this approach: (a) design the API correctly;
Yeah. In your current approach you're using ThreadPoolExecutor's private API, which makes the code a bit fragile. There are two alternatives:
I'm in favor of (2), but let's go through (a-d) steps to get there. |
That sounds like a good strategy. I'll start working on step (a), to build a more robust working implementation (that only uses ThreadPoolExecutor's public API), and then work on (c) and (d) once the API is approved.
Agreed, I'm also in favor of option (2), but doing (a-d) first. I think this approach will provide far more stability (rather than implementing (2) immediately), as we'll be able to write extensive test coverage using a ThreadPoolExecutor implementation as a base, and then ensure the native asyncio thread pool implementation has the same intended behavior. Afterwards, we could even keep the ThreadPoolExecutor version as private for testing purposes. The native asyncio version will likely require some additional tests to ensure that the threads are being spawned eagerly, but they should have very similar overall behavior, with the asyncio version having better performance. I'm thinking that we could create a new Lib/asyncio/pools.py for the initial ThreadPoolExecutor implementation, to give us a separate area to work with for the native asyncio one in the future. A similar asyncio.ProcessPool API could also eventually be created there as well. It might be feasible to fit the initial implementation in Lib/asyncio/base_events.py, but IMO the native asyncio version would not fit. (From the user end it would make no difference, as long as we add pools.__all__ to Lib/asyncio/__init__.py.) My only remaining question that I can think of: should we implement an asyncio.ProcessPool API (using ProcessPoolExecutor's public API) before working on the native asyncio version of asyncio.ThreadPool? This would likely allow us to release the executor versions well before the 3.9 beta (2020-05-18), and then the native asyncio versions (with eager spawning) either before 3.9rc1 (2020-08-10) or at some point during the 3.10 alpha. I suspect that writing the extensive test coverage will take the most time.
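The package wiring could follow the existing pattern in Lib/asyncio/__init__.py, where each submodule is star-imported and its __all__ is concatenated. A sketch:

    # In Lib/asyncio/__init__.py (sketch). Star-importing a submodule
    # also binds it as an attribute of the package, so `pools` is
    # usable below. (Other existing submodule imports omitted.)
    from .base_events import *
    from .pools import *

    __all__ = (base_events.__all__ +
               pools.__all__)
|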
Yury and Andrew, here's my latest API design for asyncio.ThreadPool: master...aeros:asyncio-threadpool. This is for the initial ThreadPoolExecutor version, using the design based on Yury's suggestions. I plan on extending the docstrings, writing tests, and working on the documentation for it. My idea was to use the new Lib/asyncio/pools.py and AbstractPool to eventually implement an asyncio.ProcessPool (and the native version of asyncio.ThreadPool). I plan on opening a PR after I finish writing some tests and documentation; I'd like to include it all in the same PR if possible. But let me know what you think about the current API design, it would be much easier for me to make modifications at this stage. |
So, I just had an interesting idea... what if ThreadPool.run() returned a Task instead of a coroutine object? With the current version of asyncio.ThreadPool, if a user wants to create a Task, they would have to do something like this:

    async with asyncio.ThreadPool() as pool:
        task = asyncio.create_task(pool.run(io_blocking_func, 10, kwarg1='test'))
        other_task = asyncio.create_task(pool.run(io_blocking_func, 12))
        if some_conditional:
            task.cancel()
        results = await asyncio.gather(task, other_task, return_exceptions=True)
        ...

To me, this looks like excessive boilerplate, particularly for a higher-level API. Even for rather straightforward behavior, it requires nested function calls. If we were to return a task directly, this would be significantly cleaner:

    async with asyncio.ThreadPool() as pool:
        task = pool.run(io_blocking_func, 10, kwarg1='test')
        other_task = pool.run(io_blocking_func, 12)
        if some_conditional:
            task.cancel()
        results = await asyncio.gather(task, other_task, return_exceptions=True)
        ...

Since asyncio.ThreadPool is intended to be a high-level API, I don't think it's an issue to return a Task from its run() method. It would make it significantly easier and more convenient to work with from a user perspective. Thoughts?
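For reference, a sketch of what the Task-returning variant could look like (illustrative only; it wraps the executor call in a coroutine so create_task() can schedule it):

    def run(self, func, *args, **kwargs):
        call = functools.partial(func, *args, **kwargs)

        async def _run():
            return await self._loop.run_in_executor(self._executor, call)

        # create_task() schedules _run() immediately and returns a Task,
        # which can be cancelled or passed straight to gather().
        return self._loop.create_task(_run())
|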
After having some time to think this over, I prefer the current behavior. I don't think there would be a significant enough improvement from returning a Task instead, and it would likely result in an overall performance loss. Also, as a general update on the project, I'm close to being ready to open a PR to implement asyncio.ThreadPool. I finished the basic implementation and added a decent number of new tests to ensure its functionality. Here's my current progress: master...aeros:asyncio-threadpool I just need to work on adding the new documentation, and more specifically finding a good place for it in the current asyncio docs. Do you have any ideas for that, Yury? I figured that you might already have a preference in mind. |
Note for myself: Python 3.9 release manager (Lukasz) approved adding the feature to Python 3.9.0 beta2: |
Now that the |