From 2caaffcabe9f05abd971cad664381eba152b8091 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Tue, 31 Dec 2024 07:45:01 +0000 Subject: [PATCH 1/8] add ThreadSafeHandle --- Lib/asyncio/base_events.py | 11 +++++++---- Lib/asyncio/events.py | 20 +++++++++++++++++++- Lib/asyncio/unix_events.py | 2 +- Lib/test/test_asyncio/test_tasks.py | 4 ++-- 4 files changed, 29 insertions(+), 8 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 5dbe4b28d236d3..27a2d2ab639779 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -873,7 +873,10 @@ def call_soon_threadsafe(self, callback, *args, context=None): self._check_closed() if self._debug: self._check_callback(callback, 'call_soon_threadsafe') - handle = self._call_soon(callback, args, context) + handle = events._ThreadSafeHandle(callback, args, self, context) + self._ready.append(handle) + if handle._source_traceback: + del handle._source_traceback[-1] if handle._source_traceback: del handle._source_traceback[-1] self._write_to_self() @@ -1937,7 +1940,7 @@ def call_exception_handler(self, context): def _add_callback(self, handle): """Add a Handle to _ready.""" - if not handle._cancelled: + if not handle.cancelled(): self._ready.append(handle) def _add_callback_signalsafe(self, handle): @@ -1966,7 +1969,7 @@ def _run_once(self): # is too high new_scheduled = [] for handle in self._scheduled: - if handle._cancelled: + if handle.cancelled(): handle._scheduled = False else: new_scheduled.append(handle) @@ -2016,7 +2019,7 @@ def _run_once(self): ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() - if handle._cancelled: + if handle.cancelled(): continue if self._debug: try: diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 6e291d28ec81ae..fdf63a9d9f5357 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -59,7 +59,7 @@ def __init__(self, callback, args, loop, context=None): def _repr_info(self): info = [self.__class__.__name__] - if self._cancelled: + if self.cancelled(): info.append('cancelled') if self._callback is not None: info.append(format_helpers._format_callback_source( @@ -113,6 +113,24 @@ def _run(self): self._loop.call_exception_handler(context) self = None # Needed to break cycles when an exception occurs. +class _ThreadSafeHandle(Handle): + + def __init__(self, callback, args, loop, context=None): + super().__init__(callback, args, loop, context) + self._lock = threading.RLock() + + def cancel(self): + with self._lock: + return super().cancel() + + def cancelled(self): + with self._lock: + return super().cancelled() + + def _run(self): + with self._lock: + return super()._run() + class TimerHandle(Handle): """Object returned by timed callback registration methods.""" diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index f69c6a64c39ae6..1aab2ba3203954 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -138,7 +138,7 @@ def _handle_signal(self, sig): handle = self._signal_handlers.get(sig) if handle is None: return # Assume it's some race condition. - if handle._cancelled: + if handle.cancelled(): self.remove_signal_handler(sig) # Remove it properly. else: self._add_callback_signalsafe(handle) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index b5363226ad79f4..bb59e5117108b0 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -1810,11 +1810,11 @@ def call_later(delay, callback, *args): loop.call_later = call_later test_utils.run_briefly(loop) - self.assertFalse(handle._cancelled) + self.assertFalse(handle.cancelled()) t.cancel() test_utils.run_briefly(loop) - self.assertTrue(handle._cancelled) + self.assertTrue(handle.cancelled()) def test_task_cancel_sleeping_task(self): From 9c8d0cf3eafcf0161468ee4e861ca64bc6593678 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Tue, 31 Dec 2024 07:49:11 +0000 Subject: [PATCH 2/8] fixes --- Lib/asyncio/base_events.py | 6 +++--- Lib/asyncio/events.py | 2 ++ Lib/asyncio/unix_events.py | 2 +- Lib/test/test_asyncio/test_tasks.py | 4 ++-- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 27a2d2ab639779..9e6f6e3ee7e3ec 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -1940,7 +1940,7 @@ def call_exception_handler(self, context): def _add_callback(self, handle): """Add a Handle to _ready.""" - if not handle.cancelled(): + if not handle._cancelled: self._ready.append(handle) def _add_callback_signalsafe(self, handle): @@ -1969,7 +1969,7 @@ def _run_once(self): # is too high new_scheduled = [] for handle in self._scheduled: - if handle.cancelled(): + if handle._cancelled: handle._scheduled = False else: new_scheduled.append(handle) @@ -2019,7 +2019,7 @@ def _run_once(self): ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() - if handle.cancelled(): + if handle._cancelled: continue if self._debug: try: diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index fdf63a9d9f5357..1ee17d8b649b5e 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -129,6 +129,8 @@ def cancelled(self): def _run(self): with self._lock: + if self._cancelled: + return return super()._run() diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 1aab2ba3203954..f69c6a64c39ae6 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -138,7 +138,7 @@ def _handle_signal(self, sig): handle = self._signal_handlers.get(sig) if handle is None: return # Assume it's some race condition. - if handle.cancelled(): + if handle._cancelled: self.remove_signal_handler(sig) # Remove it properly. else: self._add_callback_signalsafe(handle) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index bb59e5117108b0..b5363226ad79f4 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -1810,11 +1810,11 @@ def call_later(delay, callback, *args): loop.call_later = call_later test_utils.run_briefly(loop) - self.assertFalse(handle.cancelled()) + self.assertFalse(handle._cancelled) t.cancel() test_utils.run_briefly(loop) - self.assertTrue(handle.cancelled()) + self.assertTrue(handle._cancelled) def test_task_cancel_sleeping_task(self): From 894b26f79487401e4697c580830bc4ff2757c439 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Sun, 5 Jan 2025 11:23:30 +0000 Subject: [PATCH 3/8] more tests --- Lib/test/test_asyncio/test_events.py | 117 +++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index c8439c9af5e6ba..7bec15d1e9e170 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -353,6 +353,123 @@ def run_in_thread(): t.join() self.assertEqual(results, ['hello', 'world']) + def test_call_soon_threadsafe_handle_block_check_cancelled(self): + results = [] + + callback_started = threading.Event() + callback_finished = threading.Event() + def callback(arg): + callback_started.set() + results.append(arg) + time.sleep(1) + callback_finished.set() + + def run_in_thread(): + handle = self.loop.call_soon_threadsafe(callback, 'hello') + self.assertIsInstance(handle, events._ThreadSafeHandle) + callback_started.wait() + # callback started so it should block checking for cancellation + # until it finishes + self.assertFalse(handle.cancelled()) + self.assertTrue(callback_finished.is_set()) + self.loop.call_soon_threadsafe(self.loop.stop) + + t = threading.Thread(target=run_in_thread) + t.start() + + self.loop.run_forever() + t.join() + self.assertEqual(results, ['hello']) + + def test_call_soon_threadsafe_handle_block_cancellation(self): + results = [] + + callback_started = threading.Event() + callback_finished = threading.Event() + def callback(arg): + callback_started.set() + results.append(arg) + time.sleep(1) + callback_finished.set() + + def run_in_thread(): + handle = self.loop.call_soon_threadsafe(callback, 'hello') + self.assertIsInstance(handle, events._ThreadSafeHandle) + callback_started.wait() + # callback started so it should not be cancel it from other thread until it finishes + handle.cancel() + self.assertTrue(callback_finished.is_set()) + self.loop.call_soon_threadsafe(self.loop.stop) + + t = threading.Thread(target=run_in_thread) + t.start() + + self.loop.run_forever() + t.join() + self.assertEqual(results, ['hello']) + + def test_call_soon_threadsafe_handle_cancel_same_thread(self): + results = [] + callback_started = threading.Event() + callback_finished = threading.Event() + + fut = concurrent.futures.Future() + def callback(arg): + callback_started.set() + handle = fut.result() + handle.cancel() + results.append(arg) + callback_finished.set() + self.loop.stop() + + def run_in_thread(): + handle = self.loop.call_soon_threadsafe(callback, 'hello') + fut.set_result(handle) + self.assertIsInstance(handle, events._ThreadSafeHandle) + callback_started.wait() + # callback cancels itself from same thread so it has no effect + # it runs to completion + self.assertTrue(handle.cancelled()) + self.assertTrue(callback_finished.is_set()) + self.loop.call_soon_threadsafe(self.loop.stop) + + t = threading.Thread(target=run_in_thread) + t.start() + + self.loop.run_forever() + t.join() + self.assertEqual(results, ['hello']) + + def test_call_soon_threadsafe_handle_cancel_other_thread(self): + results = [] + ev = threading.Event() + + callback_finished = threading.Event() + def callback(arg): + results.append(arg) + callback_finished.set() + self.loop.stop() + + def run_in_thread(): + handle = self.loop.call_soon_threadsafe(callback, 'hello') + # handle can be cancelled from other thread if not started yet + self.assertIsInstance(handle, events._ThreadSafeHandle) + handle.cancel() + self.assertTrue(handle.cancelled()) + self.assertFalse(callback_finished.is_set()) + ev.set() + self.loop.call_soon_threadsafe(self.loop.stop) + + # block the main loop until the callback is added and cancelled in the + # other thread + self.loop.call_soon(ev.wait) + t = threading.Thread(target=run_in_thread) + t.start() + self.loop.run_forever() + t.join() + self.assertEqual(results, []) + self.assertFalse(callback_finished.is_set()) + def test_call_soon_threadsafe_same_thread(self): results = [] From ff3600dd7c0ac1c337b43aff39027d7eb9395c06 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sun, 5 Jan 2025 11:46:18 +0000 Subject: [PATCH 4/8] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2025-01-05-11-46-14.gh-issue-128340.gKI0uU.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2025-01-05-11-46-14.gh-issue-128340.gKI0uU.rst diff --git a/Misc/NEWS.d/next/Library/2025-01-05-11-46-14.gh-issue-128340.gKI0uU.rst b/Misc/NEWS.d/next/Library/2025-01-05-11-46-14.gh-issue-128340.gKI0uU.rst new file mode 100644 index 00000000000000..790400a19f334b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-01-05-11-46-14.gh-issue-128340.gKI0uU.rst @@ -0,0 +1 @@ +Add internal thread safe handle to be used in :meth:`asyncio.loop.call_soon_threadsafe` for thread safe cancellation. From 419cfa65e9408db76e8a65769942beceefba9d98 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Sun, 5 Jan 2025 11:47:13 +0000 Subject: [PATCH 5/8] fix comment --- Lib/test/test_asyncio/test_events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 7bec15d1e9e170..ed75b909317357 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -396,7 +396,8 @@ def run_in_thread(): handle = self.loop.call_soon_threadsafe(callback, 'hello') self.assertIsInstance(handle, events._ThreadSafeHandle) callback_started.wait() - # callback started so it should not be cancel it from other thread until it finishes + # callback started so it cannot be cancelled from other thread until + # it finishes handle.cancel() self.assertTrue(callback_finished.is_set()) self.loop.call_soon_threadsafe(self.loop.stop) From edc4a9f5ba25f9d8665069eb923894c71668173f Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Sun, 5 Jan 2025 11:48:56 +0000 Subject: [PATCH 6/8] add slots --- Lib/asyncio/events.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 1ee17d8b649b5e..daf1064f4c9077 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -115,6 +115,8 @@ def _run(self): class _ThreadSafeHandle(Handle): + __slots__ = ('_lock',) + def __init__(self, callback, args, loop, context=None): super().__init__(callback, args, loop, context) self._lock = threading.RLock() From 87960e33ed72ef3a8b7ef5ea51d44817a31ab3e0 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Sun, 5 Jan 2025 12:30:31 +0000 Subject: [PATCH 7/8] add comments --- Lib/asyncio/events.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index daf1064f4c9077..e69292b239a970 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -59,7 +59,7 @@ def __init__(self, callback, args, loop, context=None): def _repr_info(self): info = [self.__class__.__name__] - if self.cancelled(): + if self._cancelled: info.append('cancelled') if self._callback is not None: info.append(format_helpers._format_callback_source( @@ -130,6 +130,10 @@ def cancelled(self): return super().cancelled() def _run(self): + # The event loop checks for cancellation without holding the lock + # It is possible that the handle is cancelled after the check + # but before the callback is called so check it again after acquiring + # the lock and return without calling the callback if it is cancelled. with self._lock: if self._cancelled: return From d8650ae0a130671bdb38feaf998dbe2ccda29c3d Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Sun, 5 Jan 2025 12:34:53 +0000 Subject: [PATCH 8/8] more comments --- Lib/asyncio/events.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index e69292b239a970..2ee9870e80f20b 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -113,6 +113,8 @@ def _run(self): self._loop.call_exception_handler(context) self = None # Needed to break cycles when an exception occurs. +# _ThreadSafeHandle is used for callbacks scheduled with call_soon_threadsafe +# and is thread safe unlike Handle which is not thread safe. class _ThreadSafeHandle(Handle): __slots__ = ('_lock',)