From f69f70626da2324a3b9d27b4e7e39bf8110f605b Mon Sep 17 00:00:00 2001 From: iaalm Date: Tue, 18 Nov 2025 15:48:39 +0800 Subject: [PATCH 01/15] Fix gh-105836 --- Lib/asyncio/futures.py | 26 +++++++++++++++---- Misc/ACKS | 1 + ...-11-18-15-48-13.gh-issue-105836.sbUw24.rst | 2 ++ 3 files changed, 24 insertions(+), 5 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2025-11-18-15-48-13.gh-issue-105836.sbUw24.rst diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index 6bd00a644789f1..f6885066ad541f 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -365,6 +365,25 @@ def _copy_future_state(source, dest): else: dest.set_result(result) + +def _cancel_future_in_loop(fut, loop, timeout=None): + """Cancel a future in (maybe another) event loop. + + We need to check loop is not running loop to avoid dead lock. + """ + if loop is None or loop is events._get_running_loop(): + return fut.cancel() + cancel_fut = concurrent.futures.Future() + def _cancel(): + try: + result = fut.cancel() + cancel_fut.set_result(result) + except BaseException as exc: + cancel_fut.set_exception(exc) + loop.call_soon_threadsafe(_cancel) + return cancel_fut.result(timeout=timeout) + + def _chain_future(source, destination): """Chain two futures so that when one completes, so does the other. @@ -389,16 +408,13 @@ def _set_state(future, other): def _call_check_cancel(destination): if destination.cancelled(): - if source_loop is None or source_loop is dest_loop: - source.cancel() - else: - source_loop.call_soon_threadsafe(source.cancel) + _cancel_future_in_loop(source, source_loop) def _call_set_state(source): if (destination.cancelled() and dest_loop is not None and dest_loop.is_closed()): return - if dest_loop is None or dest_loop is source_loop: + if dest_loop is None or dest_loop is events._get_running_loop(): _set_state(destination, source) else: if dest_loop.is_closed(): diff --git a/Misc/ACKS b/Misc/ACKS index f5f15f2eb7ea24..05b409633c4019 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -2118,6 +2118,7 @@ Xiang Zhang Robert Xiao Florent Xicluna Yanbo, Xie +Kaisheng Xu Xinhang Xu Arnon Yaari Alakshendra Yadav diff --git a/Misc/NEWS.d/next/Library/2025-11-18-15-48-13.gh-issue-105836.sbUw24.rst b/Misc/NEWS.d/next/Library/2025-11-18-15-48-13.gh-issue-105836.sbUw24.rst new file mode 100644 index 00000000000000..a13afd818bdbfe --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-11-18-15-48-13.gh-issue-105836.sbUw24.rst @@ -0,0 +1,2 @@ +Fix :meth:`asyncio.run_coroutine_threadsafe` leaving underlying cancelled +asyncio task running From 8fa2cac33cc785a5907bb50e9bfdea3ae1daedb9 Mon Sep 17 00:00:00 2001 From: iaalm Date: Tue, 25 Nov 2025 15:56:41 +0800 Subject: [PATCH 02/15] Add unit test --- Lib/test/test_asyncio/test_tasks.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 931a43816a257a..bfe1e2d6c05f79 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -1,6 +1,7 @@ """Tests for tasks.py.""" import collections +from concurrent.futures import thread import contextlib import contextvars import gc @@ -3680,6 +3681,18 @@ def task_factory(loop, coro): (loop, context), kwargs = callback.call_args self.assertEqual(context['exception'], exc_context.exception) + def test_run_coroutine_threadsafe_and_cancel(self): + async def target(): + # self.loop.run_in_executor(None, _in_another_thread) + thread_future = asyncio.run_coroutine_threadsafe(self.add(1, 2), self.loop) + await asyncio.sleep(0) + + thread_future.cancel() + await asyncio.sleep(0) + + self.loop.run_until_complete(target()) + self.assertEqual(0, len(self.loop._ready)) + class SleepTests(test_utils.TestCase): def setUp(self): From 42f3885dbde8cb2c8acaebc21d4ea851f92e6a11 Mon Sep 17 00:00:00 2001 From: iaalm Date: Tue, 25 Nov 2025 16:56:53 +0800 Subject: [PATCH 03/15] Fix code style --- Lib/test/test_asyncio/test_tasks.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index bfe1e2d6c05f79..b6a506521c3221 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -1,7 +1,6 @@ """Tests for tasks.py.""" import collections -from concurrent.futures import thread import contextlib import contextvars import gc @@ -3689,7 +3688,7 @@ async def target(): thread_future.cancel() await asyncio.sleep(0) - + self.loop.run_until_complete(target()) self.assertEqual(0, len(self.loop._ready)) From 539821928e4591b04c78f8d2a8256c3c77f3e177 Mon Sep 17 00:00:00 2001 From: iaalm Date: Tue, 25 Nov 2025 17:34:11 +0800 Subject: [PATCH 04/15] Revert test --- Lib/test/test_asyncio/test_tasks.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index b6a506521c3221..931a43816a257a 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -3680,18 +3680,6 @@ def task_factory(loop, coro): (loop, context), kwargs = callback.call_args self.assertEqual(context['exception'], exc_context.exception) - def test_run_coroutine_threadsafe_and_cancel(self): - async def target(): - # self.loop.run_in_executor(None, _in_another_thread) - thread_future = asyncio.run_coroutine_threadsafe(self.add(1, 2), self.loop) - await asyncio.sleep(0) - - thread_future.cancel() - await asyncio.sleep(0) - - self.loop.run_until_complete(target()) - self.assertEqual(0, len(self.loop._ready)) - class SleepTests(test_utils.TestCase): def setUp(self): From d7675d3d9a1ce951edb313557b283821bff2d7cd Mon Sep 17 00:00:00 2001 From: iaalm Date: Wed, 26 Nov 2025 14:49:36 +0800 Subject: [PATCH 05/15] Improve test case --- Lib/asyncio/futures.py | 24 ++++-------------------- Lib/test/test_asyncio/test_tasks.py | 12 ++++++++++++ 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index f6885066ad541f..29652295218a22 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -365,25 +365,6 @@ def _copy_future_state(source, dest): else: dest.set_result(result) - -def _cancel_future_in_loop(fut, loop, timeout=None): - """Cancel a future in (maybe another) event loop. - - We need to check loop is not running loop to avoid dead lock. - """ - if loop is None or loop is events._get_running_loop(): - return fut.cancel() - cancel_fut = concurrent.futures.Future() - def _cancel(): - try: - result = fut.cancel() - cancel_fut.set_result(result) - except BaseException as exc: - cancel_fut.set_exception(exc) - loop.call_soon_threadsafe(_cancel) - return cancel_fut.result(timeout=timeout) - - def _chain_future(source, destination): """Chain two futures so that when one completes, so does the other. @@ -408,7 +389,10 @@ def _set_state(future, other): def _call_check_cancel(destination): if destination.cancelled(): - _cancel_future_in_loop(source, source_loop) + if source_loop is None or source_loop is events._get_running_loop(): + source.cancel() + else: + source_loop.call_soon_threadsafe(source.cancel) def _call_set_state(source): if (destination.cancelled() and diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 931a43816a257a..b6a506521c3221 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -3680,6 +3680,18 @@ def task_factory(loop, coro): (loop, context), kwargs = callback.call_args self.assertEqual(context['exception'], exc_context.exception) + def test_run_coroutine_threadsafe_and_cancel(self): + async def target(): + # self.loop.run_in_executor(None, _in_another_thread) + thread_future = asyncio.run_coroutine_threadsafe(self.add(1, 2), self.loop) + await asyncio.sleep(0) + + thread_future.cancel() + await asyncio.sleep(0) + + self.loop.run_until_complete(target()) + self.assertEqual(0, len(self.loop._ready)) + class SleepTests(test_utils.TestCase): def setUp(self): From f430392ea4b17cd74435c86348e49c4e7ed3e538 Mon Sep 17 00:00:00 2001 From: iaalm Date: Thu, 27 Nov 2025 21:33:47 +0800 Subject: [PATCH 06/15] Run test in another thread --- Lib/test/test_asyncio/test_tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index b6a506521c3221..d09bc64ddbf0e3 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -3682,14 +3682,14 @@ def task_factory(loop, coro): def test_run_coroutine_threadsafe_and_cancel(self): async def target(): - # self.loop.run_in_executor(None, _in_another_thread) thread_future = asyncio.run_coroutine_threadsafe(self.add(1, 2), self.loop) await asyncio.sleep(0) thread_future.cancel() await asyncio.sleep(0) - self.loop.run_until_complete(target()) + future = self.loop.run_in_executor(None, target) + self.loop.run_until_complete(future) self.assertEqual(0, len(self.loop._ready)) From 670e875b45fceb55feb1302536dcef45d81acfc2 Mon Sep 17 00:00:00 2001 From: iaalm Date: Thu, 27 Nov 2025 23:38:58 +0800 Subject: [PATCH 07/15] Use lock to make sure target started --- Lib/test/test_asyncio/test_tasks.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index d09bc64ddbf0e3..40a61efd78fa74 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -8,6 +8,7 @@ import random import re import sys +from threading import Event import traceback import types import unittest @@ -3681,15 +3682,21 @@ def task_factory(loop, coro): self.assertEqual(context['exception'], exc_context.exception) def test_run_coroutine_threadsafe_and_cancel(self): - async def target(): - thread_future = asyncio.run_coroutine_threadsafe(self.add(1, 2), self.loop) - await asyncio.sleep(0) + target_started = Event() + async def _target(): + target_started.set() + await asyncio.sleep(0.1) + return 1 + def _in_thread(): + thread_future = asyncio.run_coroutine_threadsafe(_target(), self.loop) + # wait target started then cancel + target_started.wait() thread_future.cancel() - await asyncio.sleep(0) + _ = self.loop.run_in_executor(None, _in_thread) - future = self.loop.run_in_executor(None, target) - self.loop.run_until_complete(future) + # start main loop to do things + self.loop.run_until_complete(asyncio.sleep(0.05)) self.assertEqual(0, len(self.loop._ready)) From f061b1df0be327a36e78aabee5c6023ff4438f99 Mon Sep 17 00:00:00 2001 From: iaalm Date: Thu, 4 Dec 2025 20:11:08 +0800 Subject: [PATCH 08/15] Revert "Run test in another thread" --- Lib/test/test_asyncio/test_tasks.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 40a61efd78fa74..b6a506521c3221 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -8,7 +8,6 @@ import random import re import sys -from threading import Event import traceback import types import unittest @@ -3682,21 +3681,15 @@ def task_factory(loop, coro): self.assertEqual(context['exception'], exc_context.exception) def test_run_coroutine_threadsafe_and_cancel(self): - target_started = Event() - async def _target(): - target_started.set() - await asyncio.sleep(0.1) - return 1 + async def target(): + # self.loop.run_in_executor(None, _in_another_thread) + thread_future = asyncio.run_coroutine_threadsafe(self.add(1, 2), self.loop) + await asyncio.sleep(0) - def _in_thread(): - thread_future = asyncio.run_coroutine_threadsafe(_target(), self.loop) - # wait target started then cancel - target_started.wait() thread_future.cancel() - _ = self.loop.run_in_executor(None, _in_thread) + await asyncio.sleep(0) - # start main loop to do things - self.loop.run_until_complete(asyncio.sleep(0.05)) + self.loop.run_until_complete(target()) self.assertEqual(0, len(self.loop._ready)) From e5afb6d0aad380a0f677e32c87681e07be0280e3 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Fri, 5 Dec 2025 06:28:40 -0800 Subject: [PATCH 09/15] Apply nits from code review --- Lib/test/test_asyncio/test_tasks.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index b6a506521c3221..6835e4cee072dc 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -3687,6 +3687,7 @@ async def target(): await asyncio.sleep(0) thread_future.cancel() + await asyncio.sleep(0) self.loop.run_until_complete(target()) From 6602b275fbad4e8f8053e3e35ed5c3b40b3329b2 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Fri, 5 Dec 2025 06:29:47 -0800 Subject: [PATCH 10/15] Apply style nits from code review --- Lib/test/test_asyncio/test_tasks.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 6835e4cee072dc..99763c4db3a6ec 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -3688,6 +3688,7 @@ async def target(): thread_future.cancel() + await asyncio.sleep(0) self.loop.run_until_complete(target()) From 3567e94d79802ff3d59cba1bf0777e6227528eff Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Fri, 5 Dec 2025 06:34:45 -0800 Subject: [PATCH 11/15] Too many blank lines (my fault) --- Lib/test/test_asyncio/test_tasks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 99763c4db3a6ec..6835e4cee072dc 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -3688,7 +3688,6 @@ async def target(): thread_future.cancel() - await asyncio.sleep(0) self.loop.run_until_complete(target()) From 0598eed76d47ef99d401b88b3c1330bc6d41dc8f Mon Sep 17 00:00:00 2001 From: iaalm Date: Fri, 5 Dec 2025 23:34:20 +0800 Subject: [PATCH 12/15] Fix test for windows --- Lib/test/test_asyncio/test_tasks.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 6835e4cee072dc..76f348315bc695 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -3691,7 +3691,14 @@ async def target(): await asyncio.sleep(0) self.loop.run_until_complete(target()) - self.assertEqual(0, len(self.loop._ready)) + + # For windows, it's likely to see asyncio.proactor_events + # .BaseProactorEventLoop._loop_self_reading as ready task + # We should filter it out. + ready_tasks = [ + i for i in self.loop._ready + if i._callback.__name__ != "_loop_self_reading"] + self.assertEqual(0, len(ready_tasks)) class SleepTests(test_utils.TestCase): From 787c9fc5de0fc46391353b0df991fd5aefffba7f Mon Sep 17 00:00:00 2001 From: iaalm Date: Fri, 5 Dec 2025 23:36:58 +0800 Subject: [PATCH 13/15] Fix white space --- Lib/test/test_asyncio/test_tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 76f348315bc695..b76b450fa66bdc 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -3696,7 +3696,7 @@ async def target(): # .BaseProactorEventLoop._loop_self_reading as ready task # We should filter it out. ready_tasks = [ - i for i in self.loop._ready + i for i in self.loop._ready if i._callback.__name__ != "_loop_self_reading"] self.assertEqual(0, len(ready_tasks)) From c97e497139f01524dfcb89132d70d5a1f26be4ec Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Sat, 6 Dec 2025 21:54:32 +0530 Subject: [PATCH 14/15] improve test --- Lib/test/test_asyncio/test_tasks.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index b76b450fa66bdc..9809621a324450 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -3681,24 +3681,28 @@ def task_factory(loop, coro): self.assertEqual(context['exception'], exc_context.exception) def test_run_coroutine_threadsafe_and_cancel(self): + task = None + thread_future = None + # Use a custom task factory to capture the created Task + def task_factory(loop, coro): + nonlocal task + task = asyncio.Task(coro, loop=loop) + return task + + self.addCleanup(self.loop.set_task_factory, + self.loop.get_task_factory()) + async def target(): - # self.loop.run_in_executor(None, _in_another_thread) - thread_future = asyncio.run_coroutine_threadsafe(self.add(1, 2), self.loop) + nonlocal thread_future + self.loop.set_task_factory(task_factory) + thread_future = asyncio.run_coroutine_threadsafe(asyncio.sleep(10), self.loop) await asyncio.sleep(0) thread_future.cancel() - await asyncio.sleep(0) - self.loop.run_until_complete(target()) - - # For windows, it's likely to see asyncio.proactor_events - # .BaseProactorEventLoop._loop_self_reading as ready task - # We should filter it out. - ready_tasks = [ - i for i in self.loop._ready - if i._callback.__name__ != "_loop_self_reading"] - self.assertEqual(0, len(ready_tasks)) + self.assertTrue(task.cancelled()) + self.assertTrue(thread_future.cancelled()) class SleepTests(test_utils.TestCase): From e3a4011734557cdac8f6022a4c1a0453862e8a24 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Sun, 7 Dec 2025 00:38:49 +0530 Subject: [PATCH 15/15] Apply suggestion from @kumaraditya303 --- .../next/Library/2025-11-18-15-48-13.gh-issue-105836.sbUw24.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2025-11-18-15-48-13.gh-issue-105836.sbUw24.rst b/Misc/NEWS.d/next/Library/2025-11-18-15-48-13.gh-issue-105836.sbUw24.rst index a13afd818bdbfe..d2edc5b2cb743d 100644 --- a/Misc/NEWS.d/next/Library/2025-11-18-15-48-13.gh-issue-105836.sbUw24.rst +++ b/Misc/NEWS.d/next/Library/2025-11-18-15-48-13.gh-issue-105836.sbUw24.rst @@ -1,2 +1,2 @@ Fix :meth:`asyncio.run_coroutine_threadsafe` leaving underlying cancelled -asyncio task running +asyncio task running.