Skip to content

Commit

Permalink
GH-82448: Add thread timeout for loop.shutdown_default_executor (#97561)
Browse files Browse the repository at this point in the history
Co-authored-by: Kyle Stanley <aeros167@gmail.com>
  • Loading branch information
kumaraditya303 and aeros committed Sep 28, 2022
1 parent 9a404b1 commit 575a253
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 9 deletions.
11 changes: 10 additions & 1 deletion Doc/library/asyncio-eventloop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,27 @@ Running and stopping the loop

.. versionadded:: 3.6

.. coroutinemethod:: loop.shutdown_default_executor()
.. coroutinemethod:: loop.shutdown_default_executor(timeout=None)

Schedule the closure of the default executor and wait for it to join all of
the threads in the :class:`ThreadPoolExecutor`. After calling this method, a
:exc:`RuntimeError` will be raised if :meth:`loop.run_in_executor` is called
while using the default executor.

The *timeout* parameter specifies the amount of time the executor will
be given to finish joining. The default value is ``None``, which means the
executor will be given an unlimited amount of time.

If the timeout duration is reached, a warning is emitted and executor is
terminated without waiting for its threads to finish joining.

Note that there is no need to call this function when
:func:`asyncio.run` is used.

.. versionadded:: 3.9

.. versionchanged:: 3.12
Added the *timeout* parameter.

Scheduling callbacks
^^^^^^^^^^^^^^^^^^^^
Expand Down
6 changes: 5 additions & 1 deletion Doc/library/asyncio-runner.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Running an asyncio Program

This function runs the passed coroutine, taking care of
managing the asyncio event loop, *finalizing asynchronous
generators*, and closing the threadpool.
generators*, and closing the executor.

This function cannot be called when another asyncio event loop is
running in the same thread.
Expand All @@ -41,6 +41,10 @@ Running an asyncio Program
the end. It should be used as a main entry point for asyncio
programs, and should ideally only be called once.

The executor is given a timeout duration of 5 minutes to shutdown.
If the executor hasn't finished within that duration, a warning is
emitted and the executor is closed.

Example::

async def main():
Expand Down
17 changes: 14 additions & 3 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,13 @@ async def shutdown_asyncgens(self):
'asyncgen': agen
})

async def shutdown_default_executor(self):
"""Schedule the shutdown of the default executor."""
async def shutdown_default_executor(self, timeout=None):
"""Schedule the shutdown of the default executor.
The timeout parameter specifies the amount of time the executor will
be given to finish joining. The default value is None, which means
that the executor will be given an unlimited amount of time.
"""
self._executor_shutdown_called = True
if self._default_executor is None:
return
Expand All @@ -572,7 +577,13 @@ async def shutdown_default_executor(self):
try:
await future
finally:
thread.join()
thread.join(timeout)

if thread.is_alive():
warnings.warn("The executor did not finishing joining "
f"its threads within {timeout} seconds.",
RuntimeWarning, stacklevel=2)
self._default_executor.shutdown(wait=False)

def _do_shutdown(self, future):
try:
Expand Down
3 changes: 3 additions & 0 deletions Lib/asyncio/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
FLOW_CONTROL_HIGH_WATER_SSL_READ = 256 # KiB
FLOW_CONTROL_HIGH_WATER_SSL_WRITE = 512 # KiB

# Default timeout for joining the threads in the threadpool
THREAD_JOIN_TIMEOUT = 300

# The enum should be here to break circular dependencies between
# base_events and sslproto
class _SendfileMode(enum.Enum):
Expand Down
13 changes: 9 additions & 4 deletions Lib/asyncio/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from . import events
from . import exceptions
from . import tasks

from . import constants

class _State(enum.Enum):
CREATED = "created"
Expand Down Expand Up @@ -69,7 +69,8 @@ def close(self):
loop = self._loop
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
loop.run_until_complete(
loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT))
finally:
if self._set_event_loop:
events.set_event_loop(None)
Expand Down Expand Up @@ -160,8 +161,8 @@ def run(main, *, debug=None):
"""Execute the coroutine and return the result.
This function runs the passed coroutine, taking care of
managing the asyncio event loop and finalizing asynchronous
generators.
managing the asyncio event loop, finalizing asynchronous
generators and closing the default executor.
This function cannot be called when another asyncio event loop is
running in the same thread.
Expand All @@ -172,6 +173,10 @@ def run(main, *, debug=None):
It should be used as a main entry point for asyncio programs, and should
ideally only be called once.
The executor is given a timeout duration of 5 minutes to shutdown.
If the executor hasn't finished within that duration, a warning is
emitted and the executor is closed.
Example:
async def main():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add *timeout* parameter to :meth:`asyncio.loop.shutdown_default_executor`.
The default value is ``None``, which means the executor will be given an unlimited amount of time.
When called from :class:`asyncio.Runner` or :func:`asyncio.run`, the default timeout is 5 minutes.

0 comments on commit 575a253

Please sign in to comment.