Skip to content

Commit 53da1e8

Browse files
gh-134173: optimize state transfer between concurrent.futures.Future and asyncio.Future (#134174)
Co-authored-by: Kumar Aditya <kumaraditya@python.org>
1 parent f2de1e6 commit 53da1e8

File tree

5 files changed

+148
-14
lines changed

5 files changed

+148
-14
lines changed

Lib/asyncio/futures.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -351,22 +351,19 @@ def _set_concurrent_future_state(concurrent, source):
351351
def _copy_future_state(source, dest):
352352
"""Internal helper to copy state from another Future.
353353
354-
The other Future may be a concurrent.futures.Future.
354+
The other Future must be a concurrent.futures.Future.
355355
"""
356-
assert source.done()
357356
if dest.cancelled():
358357
return
359358
assert not dest.done()
360-
if source.cancelled():
359+
done, cancelled, result, exception = source._get_snapshot()
360+
assert done
361+
if cancelled:
361362
dest.cancel()
363+
elif exception is not None:
364+
dest.set_exception(_convert_future_exc(exception))
362365
else:
363-
exception = source.exception()
364-
if exception is not None:
365-
dest.set_exception(_convert_future_exc(exception))
366-
else:
367-
result = source.result()
368-
dest.set_result(result)
369-
366+
dest.set_result(result)
370367

371368
def _chain_future(source, destination):
372369
"""Chain two futures so that when one completes, so does the other.

Lib/concurrent/futures/_base.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,33 @@ def set_exception(self, exception):
558558
self._condition.notify_all()
559559
self._invoke_callbacks()
560560

561+
def _get_snapshot(self):
562+
"""Get a snapshot of the future's current state.
563+
564+
This method atomically retrieves the state in one lock acquisition,
565+
which is significantly faster than multiple method calls.
566+
567+
Returns:
568+
Tuple of (done, cancelled, result, exception)
569+
- done: True if the future is done (cancelled or finished)
570+
- cancelled: True if the future was cancelled
571+
- result: The result if available and not cancelled
572+
- exception: The exception if available and not cancelled
573+
"""
574+
# Fast path: check if already finished without lock
575+
if self._state == FINISHED:
576+
return True, False, self._result, self._exception
577+
578+
# Need lock for other states since they can change
579+
with self._condition:
580+
# We have to check the state again after acquiring the lock
581+
# because it may have changed in the meantime.
582+
if self._state == FINISHED:
583+
return True, False, self._result, self._exception
584+
if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
585+
return True, True, None, None
586+
return False, False, None, None
587+
561588
__class_getitem__ = classmethod(types.GenericAlias)
562589

563590
class Executor(object):

Lib/test/test_asyncio/test_futures.py

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -413,23 +413,23 @@ def func_repr(func):
413413
def test_copy_state(self):
414414
from asyncio.futures import _copy_future_state
415415

416-
f = self._new_future(loop=self.loop)
416+
f = concurrent.futures.Future()
417417
f.set_result(10)
418418

419419
newf = self._new_future(loop=self.loop)
420420
_copy_future_state(f, newf)
421421
self.assertTrue(newf.done())
422422
self.assertEqual(newf.result(), 10)
423423

424-
f_exception = self._new_future(loop=self.loop)
424+
f_exception = concurrent.futures.Future()
425425
f_exception.set_exception(RuntimeError())
426426

427427
newf_exception = self._new_future(loop=self.loop)
428428
_copy_future_state(f_exception, newf_exception)
429429
self.assertTrue(newf_exception.done())
430430
self.assertRaises(RuntimeError, newf_exception.result)
431431

432-
f_cancelled = self._new_future(loop=self.loop)
432+
f_cancelled = concurrent.futures.Future()
433433
f_cancelled.cancel()
434434

435435
newf_cancelled = self._new_future(loop=self.loop)
@@ -441,7 +441,7 @@ def test_copy_state(self):
441441
except BaseException as e:
442442
f_exc = e
443443

444-
f_conexc = self._new_future(loop=self.loop)
444+
f_conexc = concurrent.futures.Future()
445445
f_conexc.set_exception(f_exc)
446446

447447
newf_conexc = self._new_future(loop=self.loop)
@@ -454,6 +454,56 @@ def test_copy_state(self):
454454
newf_tb = ''.join(traceback.format_tb(newf_exc.__traceback__))
455455
self.assertEqual(newf_tb.count('raise concurrent.futures.InvalidStateError'), 1)
456456

457+
def test_copy_state_from_concurrent_futures(self):
458+
"""Test _copy_future_state from concurrent.futures.Future.
459+
460+
This tests the optimized path using _get_snapshot when available.
461+
"""
462+
from asyncio.futures import _copy_future_state
463+
464+
# Test with a result
465+
f_concurrent = concurrent.futures.Future()
466+
f_concurrent.set_result(42)
467+
f_asyncio = self._new_future(loop=self.loop)
468+
_copy_future_state(f_concurrent, f_asyncio)
469+
self.assertTrue(f_asyncio.done())
470+
self.assertEqual(f_asyncio.result(), 42)
471+
472+
# Test with an exception
473+
f_concurrent_exc = concurrent.futures.Future()
474+
f_concurrent_exc.set_exception(ValueError("test exception"))
475+
f_asyncio_exc = self._new_future(loop=self.loop)
476+
_copy_future_state(f_concurrent_exc, f_asyncio_exc)
477+
self.assertTrue(f_asyncio_exc.done())
478+
with self.assertRaises(ValueError) as cm:
479+
f_asyncio_exc.result()
480+
self.assertEqual(str(cm.exception), "test exception")
481+
482+
# Test with cancelled state
483+
f_concurrent_cancelled = concurrent.futures.Future()
484+
f_concurrent_cancelled.cancel()
485+
f_asyncio_cancelled = self._new_future(loop=self.loop)
486+
_copy_future_state(f_concurrent_cancelled, f_asyncio_cancelled)
487+
self.assertTrue(f_asyncio_cancelled.cancelled())
488+
489+
# Test that destination already cancelled prevents copy
490+
f_concurrent_result = concurrent.futures.Future()
491+
f_concurrent_result.set_result(10)
492+
f_asyncio_precancelled = self._new_future(loop=self.loop)
493+
f_asyncio_precancelled.cancel()
494+
_copy_future_state(f_concurrent_result, f_asyncio_precancelled)
495+
self.assertTrue(f_asyncio_precancelled.cancelled())
496+
497+
# Test exception type conversion
498+
f_concurrent_invalid = concurrent.futures.Future()
499+
f_concurrent_invalid.set_exception(concurrent.futures.InvalidStateError("invalid"))
500+
f_asyncio_invalid = self._new_future(loop=self.loop)
501+
_copy_future_state(f_concurrent_invalid, f_asyncio_invalid)
502+
self.assertTrue(f_asyncio_invalid.done())
503+
with self.assertRaises(asyncio.exceptions.InvalidStateError) as cm:
504+
f_asyncio_invalid.result()
505+
self.assertEqual(str(cm.exception), "invalid")
506+
457507
def test_iter(self):
458508
fut = self._new_future(loop=self.loop)
459509

Lib/test/test_concurrent_futures/test_future.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
77

88
from test import support
9+
from test.support import threading_helper
910

1011
from .util import (
1112
PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE,
@@ -282,6 +283,62 @@ def test_multiple_set_exception(self):
282283

283284
self.assertEqual(f.exception(), e)
284285

286+
def test_get_snapshot(self):
287+
"""Test the _get_snapshot method for atomic state retrieval."""
288+
# Test with a pending future
289+
f = Future()
290+
done, cancelled, result, exception = f._get_snapshot()
291+
self.assertFalse(done)
292+
self.assertFalse(cancelled)
293+
self.assertIsNone(result)
294+
self.assertIsNone(exception)
295+
296+
# Test with a finished future (successful result)
297+
f = Future()
298+
f.set_result(42)
299+
done, cancelled, result, exception = f._get_snapshot()
300+
self.assertTrue(done)
301+
self.assertFalse(cancelled)
302+
self.assertEqual(result, 42)
303+
self.assertIsNone(exception)
304+
305+
# Test with a finished future (exception)
306+
f = Future()
307+
exc = ValueError("test error")
308+
f.set_exception(exc)
309+
done, cancelled, result, exception = f._get_snapshot()
310+
self.assertTrue(done)
311+
self.assertFalse(cancelled)
312+
self.assertIsNone(result)
313+
self.assertIs(exception, exc)
314+
315+
# Test with a cancelled future
316+
f = Future()
317+
f.cancel()
318+
done, cancelled, result, exception = f._get_snapshot()
319+
self.assertTrue(done)
320+
self.assertTrue(cancelled)
321+
self.assertIsNone(result)
322+
self.assertIsNone(exception)
323+
324+
# Test concurrent access (basic thread safety check)
325+
f = Future()
326+
f.set_result(100)
327+
results = []
328+
329+
def get_snapshot():
330+
for _ in range(1000):
331+
snapshot = f._get_snapshot()
332+
results.append(snapshot)
333+
334+
threads = [threading.Thread(target=get_snapshot) for _ in range(4)]
335+
with threading_helper.start_threads(threads):
336+
pass
337+
# All snapshots should be identical for a finished future
338+
expected = (True, False, 100, None)
339+
for result in results:
340+
self.assertEqual(result, expected)
341+
285342

286343
def setUpModule():
287344
setup_module()
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Speed up :mod:`asyncio` performance of transferring state from thread
2+
pool :class:`concurrent.futures.Future` by up to 4.4x. Patch by J. Nick
3+
Koston.

0 commit comments

Comments
 (0)