From 4ed3638f94fd99d83e9d8ac958776bfab6e6d5c9 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Tue, 28 Feb 2023 09:39:19 -0500 Subject: [PATCH 1/7] news fragment --- src/twisted/newsfragments/11817.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 src/twisted/newsfragments/11817.feature diff --git a/src/twisted/newsfragments/11817.feature b/src/twisted/newsfragments/11817.feature new file mode 100644 index 00000000000..ffeaae09b2c --- /dev/null +++ b/src/twisted/newsfragments/11817.feature @@ -0,0 +1 @@ +`twisted.internet.defer.race` has been added as a way to get the first available result from a list of Deferreds. \ No newline at end of file From d55476466fe28f6715afac88ce3ea5dbe51ee6dd Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Tue, 28 Feb 2023 11:02:33 -0500 Subject: [PATCH 2/7] Add `twisted.internet.defer.race` and tests --- src/twisted/internet/defer.py | 90 +++++++++++++++++++ src/twisted/test/test_defer.py | 154 ++++++++++++++++++++++++++++++++- 2 files changed, 243 insertions(+), 1 deletion(-) diff --git a/src/twisted/internet/defer.py b/src/twisted/internet/defer.py index 9aa8938cb24..78807ce528a 100644 --- a/src/twisted/internet/defer.py +++ b/src/twisted/internet/defer.py @@ -29,6 +29,7 @@ Mapping, NoReturn, Optional, + Sequence, Tuple, Type, TypeVar, @@ -1435,6 +1436,95 @@ def gatherResults( return cast(Deferred[List[_T]], d) +class MultiFailure(Exception): + """ + More than one failure occurred. + """ + + def __init__(self, failures: Sequence[Failure]) -> None: + super(MultiFailure, self).__init__() + self.failures = failures + + +def race(ds: Sequence[Deferred[_T]]) -> Deferred[tuple[int, _T]]: + """ + Select the first available result from the sequence of Deferreds and + cancel the rest. + + @return: A cancellable L{Deferred} that fires with the index and output of + the element of C{ds} to have a success result first, or that fires + with L{MultiFailure} holding a list of their failures if they all + fail. + """ + # Keep track of the Deferred for the action which completed first. When + # it completes, all of the other Deferreds will get cancelled but this one + # shouldn't be. Even though it "completed" it isn't really done - the + # caller will still be using it for something. If we cancelled it, + # cancellation could propagate down to them. + winner: Optional[Deferred] = None + + # The cancellation function for the Deferred this function returns. + def cancel(result: Deferred) -> None: + # If it is cancelled then we cancel all of the Deferreds for the + # individual actions because there is no longer the possibility of + # delivering any of their results anywhere. We don't have to fire + # `result` because the Deferred will do that for us. + for d in to_cancel: + d.cancel() + + # The Deferred that this function will return. It will fire with the + # input and output of the action that completes first, or None if all of + # the actions fail. If it is cancelled, all of the actions will be + # cancelled. + final_result: Deferred[tuple[int, _T]] = Deferred(canceller=cancel) + + # A callback for an individual action. + def succeeded(this_output: _T, this_index: int) -> None: + # If it is the first action to succeed then it becomes the "winner", + # its input/output become the externally visible result, and the rest + # of the action Deferreds get cancelled. If it is not the first + # action to succeed (because some action did not support + # cancellation), just ignore the result. It is uncommon for this + # callback to be entered twice. The only way it can happen is if one + # of the input Deferreds has a cancellation function that fires the + # Deferred with a success result. + nonlocal winner + if winner is None: + # This is the first success. Act on it. + winner = to_cancel[this_index] + + # Cancel the rest. + for d in to_cancel: + if d is not winner: + d.cancel() + + # Fire our Deferred + final_result.callback((this_index, this_output)) + + # Keep track of how many actions have failed. If they all fail we need to + # deliver failure notification on our externally visible result. + failure_state = [] + + def failed(failure: Failure, this_index: int) -> None: + failure_state.append((this_index, failure)) + if len(failure_state) == len(to_cancel): + # Every operation failed. + failure_state.sort() + failures = [f for (ignored, f) in failure_state] + final_result.errback(MultiFailure(failures)) + + # Copy the sequence of Deferreds so we know it doesn't get mutated out + # from under us. + to_cancel = list(ds) + for index, d in enumerate(ds): + # Propagate the position of this action as well as the argument to f + # to the success callback so we can cancel the right Deferreds and + # propagate the result outwards. + d.addCallbacks(succeeded, failed, callbackArgs=(index,), errbackArgs=(index,)) + + return final_result + + # Constants for use with DeferredList SUCCESS = True FAILURE = False diff --git a/src/twisted/test/test_defer.py b/src/twisted/test/test_defer.py index 88f4559baa6..a81c680943e 100644 --- a/src/twisted/test/test_defer.py +++ b/src/twisted/test/test_defer.py @@ -36,7 +36,9 @@ cast, ) -from hamcrest import assert_that, equal_to, is_ +from hamcrest import assert_that, empty, equal_to, is_ +from hypothesis import given +from hypothesis.strategies import integers from twisted.internet import defer, reactor from twisted.internet.defer import ( @@ -46,10 +48,12 @@ DeferredLock, DeferredQueue, DeferredSemaphore, + MultiFailure, _DeferredListResultListT, _DeferredListSingleResultT, _DeferredResultT, ensureDeferred, + race, ) from twisted.internet.task import Clock from twisted.python import log @@ -1674,6 +1678,154 @@ def test_fromCoroutineRequiresCoroutine(self) -> None: self.assertRaises(defer.NotACoroutineError, Deferred.fromCoroutine, thing) +def _setupRaceState(numDeferreds: int) -> tuple[list[int], list[Deferred[object]]]: + """ + Create a list of Deferreds and a corresponding list of integers + tracking how many times each Deferred has been cancelled. Without + additional steps the Deferreds will never fire. + """ + cancelledState = [0] * numDeferreds + + ds: list[Deferred[object]] = [] + for n in range(numDeferreds): + + def cancel(d: Deferred, n: int = n) -> None: + cancelledState[n] += 1 + + ds.append(Deferred(canceller=cancel)) + + return cancelledState, ds + + +class RaceTests(unittest.SynchronousTestCase): + """ + Tests for L{race}. + """ + + @given( + beforeWinner=integers(min_value=0, max_value=3), + afterWinner=integers(min_value=0, max_value=3), + ) + def test_success(self, beforeWinner: int, afterWinner: int) -> None: + """ + When one of the L{Deferred}s passed to L{race} fires successfully, + the L{Deferred} return by L{race} fires with the index of that + L{Deferred} and its result and cancels the rest of the L{Deferred}s. + + @param beforeWinner: A randomly selected number of Deferreds to + appear before the "winning" Deferred in the list passed in. + + @param beforeWinner: A randomly selected number of Deferreds to + appear after the "winning" Deferred in the list passed in. + """ + cancelledState, ds = _setupRaceState(beforeWinner + 1 + afterWinner) + + raceResult = race(ds) + expected = object() + ds[beforeWinner].callback(expected) + + # The result should be the index and result of the only Deferred that + # fired. + assert_that( + self.successResultOf(raceResult), + equal_to((beforeWinner, expected)), + ) + # All Deferreds except the winner should have been cancelled once. + expectedCancelledState = [1] * beforeWinner + [0] + [1] * afterWinner + assert_that( + cancelledState, + equal_to(expectedCancelledState), + ) + + @given( + beforeWinner=integers(min_value=0, max_value=3), + afterWinner=integers(min_value=0, max_value=3), + ) + def test_failure(self, beforeWinner: int, afterWinner: int) -> None: + """ + When all of the L{Deferred}s passed to L{race} fire with failures, + the L{Deferred} return by L{race} fires with L{MultiFailure} wrapping + all of their failures. + + @param beforeWinner: A randomly selected number of Deferreds to + appear before the "winning" Deferred in the list passed in. + + @param beforeWinner: A randomly selected number of Deferreds to + appear after the "winning" Deferred in the list passed in. + """ + cancelledState, ds = _setupRaceState(beforeWinner + 1 + afterWinner) + + failure = Failure(Exception("The test demands failures.")) + raceResult = race(ds) + for d in ds: + d.errback(failure) + + actualFailure = self.failureResultOf(raceResult, MultiFailure) + assert_that( + actualFailure.value.failures, + equal_to([failure] * len(ds)), + ) + assert_that( + cancelledState, + equal_to([0] * len(ds)), + ) + + @given( + beforeWinner=integers(min_value=0, max_value=3), + afterWinner=integers(min_value=0, max_value=3), + ) + def test_resultAfterCancel(self, beforeWinner: int, afterWinner: int) -> None: + """ + If one of the Deferreds fires after it was cancelled its result + goes nowhere. In particular, it does not cause any errors to be + logged. + """ + # Ensure we have a Deferred to win and at least one other Deferred + # that can ignore cancellation. + ds: list[Deferred[None]] = [ + Deferred() for n in range(beforeWinner + 2 + afterWinner) + ] + + raceResult = race(ds) + ds[beforeWinner].callback(None) + ds[beforeWinner + 1].callback(None) + + self.successResultOf(raceResult) + assert_that(self.flushLoggedErrors(), empty()) + + def test_resultFromCancel(self) -> None: + """ + If one of the input Deferreds has a cancel function that fires it + with success, nothing bad happens. + """ + winner: Deferred[object] = Deferred() + ds: list[Deferred[object]] = [ + winner, + Deferred(canceller=lambda d: d.callback(object())), + ] + expected = object() + raceResult = race(ds) + winner.callback(expected) + + assert_that(self.successResultOf(raceResult), equal_to((0, expected))) + + @given( + numDeferreds=integers(min_value=1, max_value=3), + ) + def test_cancel(self, numDeferreds: int) -> None: + """ + If the result of L{race} is cancelled then all of the L{Deferred}s + passed in are cancelled. + """ + cancelledState, ds = _setupRaceState(numDeferreds) + + raceResult = race(ds) + raceResult.cancel() + + assert_that(cancelledState, equal_to([1] * numDeferreds)) + self.failureResultOf(raceResult, MultiFailure) + + class FirstErrorTests(unittest.SynchronousTestCase): """ Tests for L{FirstError}. From 9d8d21817083c948760264d26995c6bd5c9c93c6 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Tue, 28 Feb 2023 14:47:49 -0500 Subject: [PATCH 3/7] Use __future__.annotations for indexable builtins --- src/twisted/internet/defer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/twisted/internet/defer.py b/src/twisted/internet/defer.py index 78807ce528a..60368b0e5a2 100644 --- a/src/twisted/internet/defer.py +++ b/src/twisted/internet/defer.py @@ -8,6 +8,8 @@ Maintainer: Glyph Lefkowitz """ +from __future__ import annotations + import traceback import warnings from abc import ABC, abstractmethod @@ -1448,7 +1450,7 @@ def __init__(self, failures: Sequence[Failure]) -> None: def race(ds: Sequence[Deferred[_T]]) -> Deferred[tuple[int, _T]]: """ - Select the first available result from the sequence of Deferreds and + Select the first available result from the sequence of Deferreds and cancel the rest. @return: A cancellable L{Deferred} that fires with the index and output of From eaaabd32b151dc0f739b2b5a2bbcc160cb43b000 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Tue, 28 Feb 2023 14:49:00 -0500 Subject: [PATCH 4/7] whitespace... --- src/twisted/internet/defer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/twisted/internet/defer.py b/src/twisted/internet/defer.py index 60368b0e5a2..43cfa9e8141 100644 --- a/src/twisted/internet/defer.py +++ b/src/twisted/internet/defer.py @@ -1450,7 +1450,7 @@ def __init__(self, failures: Sequence[Failure]) -> None: def race(ds: Sequence[Deferred[_T]]) -> Deferred[tuple[int, _T]]: """ - Select the first available result from the sequence of Deferreds and + Select the first available result from the sequence of Deferreds and cancel the rest. @return: A cancellable L{Deferred} that fires with the index and output of From 93e0aabaad78661d2fb35304883b952cde58a1bb Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Tue, 28 Feb 2023 14:50:16 -0500 Subject: [PATCH 5/7] It fires with an index, not an input --- src/twisted/internet/defer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/twisted/internet/defer.py b/src/twisted/internet/defer.py index 43cfa9e8141..42bb88d24dd 100644 --- a/src/twisted/internet/defer.py +++ b/src/twisted/internet/defer.py @@ -1475,7 +1475,7 @@ def cancel(result: Deferred) -> None: d.cancel() # The Deferred that this function will return. It will fire with the - # input and output of the action that completes first, or None if all of + # index and output of the action that completes first, or None if all of # the actions fail. If it is cancelled, all of the actions will be # cancelled. final_result: Deferred[tuple[int, _T]] = Deferred(canceller=cancel) @@ -1483,7 +1483,7 @@ def cancel(result: Deferred) -> None: # A callback for an individual action. def succeeded(this_output: _T, this_index: int) -> None: # If it is the first action to succeed then it becomes the "winner", - # its input/output become the externally visible result, and the rest + # its index/output become the externally visible result, and the rest # of the action Deferreds get cancelled. If it is not the first # action to succeed (because some action did not support # cancellation), just ignore the result. It is uncommon for this From cd1cb485d52088ef9b5fe4050ce6ded16719eca4 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Tue, 28 Feb 2023 14:52:21 -0500 Subject: [PATCH 6/7] Also use __future__.annotations in the test suite --- src/twisted/test/test_defer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/twisted/test/test_defer.py b/src/twisted/test/test_defer.py index a81c680943e..a1fe5aafb0b 100644 --- a/src/twisted/test/test_defer.py +++ b/src/twisted/test/test_defer.py @@ -5,6 +5,7 @@ Test cases for L{twisted.internet.defer}. """ +from __future__ import annotations import functools import gc From f8613e9f2bed1b142e430c3b1148d0e558c1b78b Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Mon, 13 Mar 2023 17:09:07 -0400 Subject: [PATCH 7/7] use FailureGroup to parallel stdlib ExceptionGroup --- src/twisted/internet/defer.py | 12 ++++++------ src/twisted/test/test_defer.py | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/twisted/internet/defer.py b/src/twisted/internet/defer.py index 42bb88d24dd..e76bd1feb48 100644 --- a/src/twisted/internet/defer.py +++ b/src/twisted/internet/defer.py @@ -1438,13 +1438,13 @@ def gatherResults( return cast(Deferred[List[_T]], d) -class MultiFailure(Exception): +class FailureGroup(Exception): """ More than one failure occurred. """ def __init__(self, failures: Sequence[Failure]) -> None: - super(MultiFailure, self).__init__() + super(FailureGroup, self).__init__() self.failures = failures @@ -1455,7 +1455,7 @@ def race(ds: Sequence[Deferred[_T]]) -> Deferred[tuple[int, _T]]: @return: A cancellable L{Deferred} that fires with the index and output of the element of C{ds} to have a success result first, or that fires - with L{MultiFailure} holding a list of their failures if they all + with L{FailureGroup} holding a list of their failures if they all fail. """ # Keep track of the Deferred for the action which completed first. When @@ -1475,8 +1475,8 @@ def cancel(result: Deferred) -> None: d.cancel() # The Deferred that this function will return. It will fire with the - # index and output of the action that completes first, or None if all of - # the actions fail. If it is cancelled, all of the actions will be + # index and output of the action that completes first, or errback if all + # of the actions fail. If it is cancelled, all of the actions will be # cancelled. final_result: Deferred[tuple[int, _T]] = Deferred(canceller=cancel) @@ -1513,7 +1513,7 @@ def failed(failure: Failure, this_index: int) -> None: # Every operation failed. failure_state.sort() failures = [f for (ignored, f) in failure_state] - final_result.errback(MultiFailure(failures)) + final_result.errback(FailureGroup(failures)) # Copy the sequence of Deferreds so we know it doesn't get mutated out # from under us. diff --git a/src/twisted/test/test_defer.py b/src/twisted/test/test_defer.py index a1fe5aafb0b..f2c487ba05e 100644 --- a/src/twisted/test/test_defer.py +++ b/src/twisted/test/test_defer.py @@ -49,7 +49,7 @@ DeferredLock, DeferredQueue, DeferredSemaphore, - MultiFailure, + FailureGroup, _DeferredListResultListT, _DeferredListSingleResultT, _DeferredResultT, @@ -1745,7 +1745,7 @@ def test_success(self, beforeWinner: int, afterWinner: int) -> None: def test_failure(self, beforeWinner: int, afterWinner: int) -> None: """ When all of the L{Deferred}s passed to L{race} fire with failures, - the L{Deferred} return by L{race} fires with L{MultiFailure} wrapping + the L{Deferred} return by L{race} fires with L{FailureGroup} wrapping all of their failures. @param beforeWinner: A randomly selected number of Deferreds to @@ -1761,7 +1761,7 @@ def test_failure(self, beforeWinner: int, afterWinner: int) -> None: for d in ds: d.errback(failure) - actualFailure = self.failureResultOf(raceResult, MultiFailure) + actualFailure = self.failureResultOf(raceResult, FailureGroup) assert_that( actualFailure.value.failures, equal_to([failure] * len(ds)), @@ -1824,7 +1824,7 @@ def test_cancel(self, numDeferreds: int) -> None: raceResult.cancel() assert_that(cancelledState, equal_to([1] * numDeferreds)) - self.failureResultOf(raceResult, MultiFailure) + self.failureResultOf(raceResult, FailureGroup) class FirstErrorTests(unittest.SynchronousTestCase):