Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-32309: Implement asyncio.ThreadPool #18410

Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
95 changes: 95 additions & 0 deletions Doc/library/asyncio-pools.rst
@@ -0,0 +1,95 @@
.. currentmodule:: asyncio

.. versionadded:: 3.9

.. _asyncio-pools:

=====
Pools
=====

**Source code:** :source:`Lib/asyncio/pools.py`

-----------------------------------------------

.. note::
This section of the documentation and all of its members have been
added *provisionally* to asyncio's API. For more details, see
:term:`provisional api`.

Asyncio pools are high-level, asynchronous context managers that can be used
to concurrently run blocking functions and methods.

There are many potential use cases, but a particularly useful one is for
combining libraries without asyncio support with existing asyncio programs.
Normally, calling a non-async function within the event loop would will result
in blocking the event loop until the function returns. However, by using a
pool to run the function, it can be executed in a separate worker (such as a
thread or process) without blocking the event loop.

.. class:: ThreadPool(concurrency=None)

An asynchronous threadpool that provides methods to concurrently
aeros marked this conversation as resolved.
Show resolved Hide resolved
run IO-bound functions, without blocking the event loop.

*concurrency* is an optional argument that limits the number of
threads to utilize in the threadpool. With the default value of
``None``, the amount of threads used will scale based on the
number of processors.

.. coroutinemethod:: run(/, func, *args, **kwargs)
aeros marked this conversation as resolved.
Show resolved Hide resolved

Asynchronously run *func* with its arguments and keyword-arguments
within the threadpool, and return a :class:`asyncio.Future` object
that represents the eventual result of its execution. ::

async with asyncio.ThreadPool() as pool:
await pool.run(time.sleep, 1)

Raises a :exc:`RuntimeError` if the threadpool is *not* running.

.. coroutinemethod:: astart()

Schedule the start of the threadpool and spawn its threads. Note that
this function is called automatically when using ``asyncio.ThreadPool``
as an asynchronous context manager, and does not need to be called
directly.

Raises a :exc:`RuntimeError` if the threadpool is already running or
if it's been closed.

.. coroutinemethod:: aclose()
aeros marked this conversation as resolved.
Show resolved Hide resolved

Schedule the closing of the threadpool. Note that this function is
called automatically when using ``asyncio.ThreadPool`` as an
asynchronous context manager, and does not need to be called directly.

Raises a :exc:`RuntimeError` if the threadpool has already been closed.

Examples
========

Here's an example of concurrently running two IO-bound functions using
:class:`asyncio.ThreadPool`::

import asyncio

def blocking_io():
print("start blocking_io")
with open('/dev/urandom', 'rb') as f:
f.read(100_000)
print("blocking_io complete")

def other_blocking_io():
print("start other_blocking_io")
with open('/dev/zero', 'rb') as f:
f.read(10)
print("other_blocking_io complete")

async def main():
async with asyncio.ThreadPool() as pool:
await asyncio.gather(
pool.run(blocking_io),
pool.run(other_blocking_io))

asyncio.run(main())
3 changes: 3 additions & 0 deletions Doc/library/asyncio.rst
Expand Up @@ -43,6 +43,8 @@ asyncio provides a set of **high-level** APIs to:

* :ref:`synchronize <asyncio-sync>` concurrent code;

* concurrently run blocking functions in a :ref:`pool <asyncio-pools>`;

Additionally, there are **low-level** APIs for
*library and framework developers* to:

Expand Down Expand Up @@ -73,6 +75,7 @@ Additionally, there are **low-level** APIs for
asyncio-subprocess.rst
asyncio-queue.rst
asyncio-exceptions.rst
asyncio-pools.rst

.. toctree::
:caption: Low-level APIs
Expand Down
7 changes: 7 additions & 0 deletions Doc/whatsnew/3.9.rst
Expand Up @@ -137,6 +137,13 @@ details, see the documentation for ``loop.create_datagram_endpoint()``.
(Contributed by Kyle Stanley, Antoine Pitrou, and Yury Selivanov in
:issue:`37228`.)

Added :class:`asyncio.ThreadPool`, an asynchronous context manager for
concurrently running IO-bound functions without blocking the event loop.
It essentially works as a higher-level version of
:meth:`asyncio.loop.run_in_executor` that can take keyword arguments and
be used as a context manager using ``async with``.
(Contributed by Kyle Stanley in :issue:`32309`.)

Added a new :term:`coroutine` :meth:`~asyncio.loop.shutdown_default_executor`
that schedules a shutdown for the default executor that waits on the
:class:`~concurrent.futures.ThreadPoolExecutor` to finish closing. Also,
Expand Down
2 changes: 2 additions & 0 deletions Lib/asyncio/__init__.py
Expand Up @@ -11,6 +11,7 @@
from .exceptions import *
from .futures import *
from .locks import *
from .pools import *
from .protocols import *
from .runners import *
from .queues import *
Expand All @@ -29,6 +30,7 @@
exceptions.__all__ +
futures.__all__ +
locks.__all__ +
pools.__all__ +
protocols.__all__ +
runners.__all__ +
queues.__all__ +
Expand Down
173 changes: 173 additions & 0 deletions Lib/asyncio/pools.py
@@ -0,0 +1,173 @@
"""Support for high-level asynchronous pools in asyncio."""

__all__ = 'ThreadPool',


import concurrent.futures
import functools
import threading
import os

from abc import ABC, abstractmethod

from . import events
from . import exceptions
from . import futures


class AbstractPool(ABC):
"""Abstract base class for asynchronous pools."""

@abstractmethod
async def astart(self):
raise NotImplementedError
aeros marked this conversation as resolved.
Show resolved Hide resolved

@abstractmethod
async def __aenter__(self):
await self.astart()
return self

@abstractmethod
async def aclose(self):
raise NotImplementedError

@abstractmethod
async def __aexit__(self, exc_type, exc_value, exc_traceback):
await self.aclose()

@abstractmethod
async def run(self, /, func, *args, **kwargs):
"""Asynchronously run function *func* using the pool.

Return a future, representing the eventual result of *func*.
"""
raise NotImplementedError


class ThreadPool(AbstractPool):
"""Asynchronous threadpool for running IO-bound functions.

Directly calling an IO-bound function within the main thread will block
other operations from occurring until it is completed. By using a
threadpool, several IO-bound functions can be ran concurrently within
their own threads, without blocking other operations.

The optional argument *concurrency* sets the number of threads within the
threadpool. If *concurrency* is `None`, the maximum number of threads will
be used; based on the number of CPU cores.

This threadpool is intended to be used as an asynchronous context manager,
using the `async with` syntax, which provides automatic initialization and
finalization of resources. For example:

import asyncio

def blocking_io():
print("start blocking_io")
with open('/dev/urandom', 'rb') as f:
f.read(100_000)
print("blocking_io complete")

def other_blocking_io():
print("start other_blocking_io")
with open('/dev/zero', 'rb') as f:
f.read(10)
print("other_blocking_io complete")

async def main():
async with asyncio.ThreadPool() as pool:
await asyncio.gather(
pool.run(blocking_io),
pool.run(other_blocking_io))

asyncio.run(main())
"""

def __init__(self, concurrency=None):
if concurrency is None:
concurrency = min(32, (os.cpu_count() or 1) + 4)
aeros marked this conversation as resolved.
Show resolved Hide resolved

self._concurrency = concurrency
self._running = False
self._closed = False
self._loop = None
self._pool = None

async def astart(self):
self._loop = events.get_running_loop()
await self._spawn_threadpool()

async def __aenter__(self):
await self.astart()
return self

async def aclose(self):
await self._shutdown_threadpool()

async def __aexit__(self, exc_type, exc_value, exc_traceback):
await self.aclose()

async def run(self, /, func, *args, **kwargs):
aeros marked this conversation as resolved.
Show resolved Hide resolved
if not self._running:
aeros marked this conversation as resolved.
Show resolved Hide resolved
raise RuntimeError(f"unable to run {func!r}, "
"threadpool is not running")

func_call = functools.partial(func, *args, **kwargs)
executor = self._pool
return await futures.wrap_future(
executor.submit(func_call), loop=self._loop)

async def _spawn_threadpool(self):
"""Schedule the spawning of the threadpool.

Asynchronously spawns a threadpool with *concurrency* threads.
"""
if self._running:
raise RuntimeError("threadpool is already running")

if self._closed:
raise RuntimeError("threadpool is closed")

future = self._loop.create_future()
thread = threading.Thread(target=self._do_spawn, args=(future,))
aeros marked this conversation as resolved.
Show resolved Hide resolved
thread.start()
try:
await future
finally:
thread.join()

def _do_spawn(self, future):
try:
self._pool = concurrent.futures.ThreadPoolExecutor(
max_workers=self._concurrency)
self._running = True
self._loop.call_soon_threadsafe(future.set_result, None)
except Exception as ex:
self._loop.call_soon_threadsafe(future.exception, ex)
aeros marked this conversation as resolved.
Show resolved Hide resolved

async def _shutdown_threadpool(self):
"""Schedule the shutdown of the threadpool.

Asynchronously joins all of the threads in the threadpool.
"""
if self._closed:
raise RuntimeError("threadpool is already closed")

# Set _running to False as early as possible
self._running = False

future = self._loop.create_future()
thread = threading.Thread(target=self._do_shutdown, args=(future,))
aeros marked this conversation as resolved.
Show resolved Hide resolved
thread.start()
try:
await future
finally:
thread.join()

def _do_shutdown(self, future):
try:
self._pool.shutdown()
self._closed = True
self._loop.call_soon_threadsafe(future.set_result, None)
except Exception as ex:
self._loop.call_soon_threadsafe(future.exception, ex)
aeros marked this conversation as resolved.
Show resolved Hide resolved