Skip to content

Commit

Permalink
Add twisted.internet.defer.race and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
exarkun committed Feb 28, 2023
1 parent 4ed3638 commit d554764
Show file tree
Hide file tree
Showing 2 changed files with 243 additions and 1 deletion.
90 changes: 90 additions & 0 deletions src/twisted/internet/defer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
Mapping,
NoReturn,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Expand Down Expand Up @@ -1435,6 +1436,95 @@ def gatherResults(
return cast(Deferred[List[_T]], d)


class MultiFailure(Exception):
"""
More than one failure occurred.
"""

def __init__(self, failures: Sequence[Failure]) -> None:
super(MultiFailure, self).__init__()
self.failures = failures


def race(ds: Sequence[Deferred[_T]]) -> Deferred[tuple[int, _T]]:
"""
Select the first available result from the sequence of Deferreds and
cancel the rest.
@return: A cancellable L{Deferred} that fires with the index and output of
the element of C{ds} to have a success result first, or that fires
with L{MultiFailure} holding a list of their failures if they all
fail.
"""
# Keep track of the Deferred for the action which completed first. When
# it completes, all of the other Deferreds will get cancelled but this one
# shouldn't be. Even though it "completed" it isn't really done - the
# caller will still be using it for something. If we cancelled it,
# cancellation could propagate down to them.
winner: Optional[Deferred] = None

# The cancellation function for the Deferred this function returns.
def cancel(result: Deferred) -> None:
# If it is cancelled then we cancel all of the Deferreds for the
# individual actions because there is no longer the possibility of
# delivering any of their results anywhere. We don't have to fire
# `result` because the Deferred will do that for us.
for d in to_cancel:
d.cancel()

# The Deferred that this function will return. It will fire with the
# input and output of the action that completes first, or None if all of
# the actions fail. If it is cancelled, all of the actions will be
# cancelled.
final_result: Deferred[tuple[int, _T]] = Deferred(canceller=cancel)

# A callback for an individual action.
def succeeded(this_output: _T, this_index: int) -> None:
# If it is the first action to succeed then it becomes the "winner",
# its input/output become the externally visible result, and the rest
# of the action Deferreds get cancelled. If it is not the first
# action to succeed (because some action did not support
# cancellation), just ignore the result. It is uncommon for this
# callback to be entered twice. The only way it can happen is if one
# of the input Deferreds has a cancellation function that fires the
# Deferred with a success result.
nonlocal winner
if winner is None:
# This is the first success. Act on it.
winner = to_cancel[this_index]

# Cancel the rest.
for d in to_cancel:
if d is not winner:
d.cancel()

# Fire our Deferred
final_result.callback((this_index, this_output))

# Keep track of how many actions have failed. If they all fail we need to
# deliver failure notification on our externally visible result.
failure_state = []

def failed(failure: Failure, this_index: int) -> None:
failure_state.append((this_index, failure))
if len(failure_state) == len(to_cancel):
# Every operation failed.
failure_state.sort()
failures = [f for (ignored, f) in failure_state]
final_result.errback(MultiFailure(failures))

# Copy the sequence of Deferreds so we know it doesn't get mutated out
# from under us.
to_cancel = list(ds)
for index, d in enumerate(ds):
# Propagate the position of this action as well as the argument to f
# to the success callback so we can cancel the right Deferreds and
# propagate the result outwards.
d.addCallbacks(succeeded, failed, callbackArgs=(index,), errbackArgs=(index,))

return final_result


# Constants for use with DeferredList
SUCCESS = True
FAILURE = False
Expand Down
154 changes: 153 additions & 1 deletion src/twisted/test/test_defer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
cast,
)

from hamcrest import assert_that, equal_to, is_
from hamcrest import assert_that, empty, equal_to, is_
from hypothesis import given
from hypothesis.strategies import integers

from twisted.internet import defer, reactor
from twisted.internet.defer import (
Expand All @@ -46,10 +48,12 @@
DeferredLock,
DeferredQueue,
DeferredSemaphore,
MultiFailure,
_DeferredListResultListT,
_DeferredListSingleResultT,
_DeferredResultT,
ensureDeferred,
race,
)
from twisted.internet.task import Clock
from twisted.python import log
Expand Down Expand Up @@ -1674,6 +1678,154 @@ def test_fromCoroutineRequiresCoroutine(self) -> None:
self.assertRaises(defer.NotACoroutineError, Deferred.fromCoroutine, thing)


def _setupRaceState(numDeferreds: int) -> tuple[list[int], list[Deferred[object]]]:
"""
Create a list of Deferreds and a corresponding list of integers
tracking how many times each Deferred has been cancelled. Without
additional steps the Deferreds will never fire.
"""
cancelledState = [0] * numDeferreds

ds: list[Deferred[object]] = []
for n in range(numDeferreds):

def cancel(d: Deferred, n: int = n) -> None:
cancelledState[n] += 1

ds.append(Deferred(canceller=cancel))

return cancelledState, ds


class RaceTests(unittest.SynchronousTestCase):
"""
Tests for L{race}.
"""

@given(
beforeWinner=integers(min_value=0, max_value=3),
afterWinner=integers(min_value=0, max_value=3),
)
def test_success(self, beforeWinner: int, afterWinner: int) -> None:
"""
When one of the L{Deferred}s passed to L{race} fires successfully,
the L{Deferred} return by L{race} fires with the index of that
L{Deferred} and its result and cancels the rest of the L{Deferred}s.
@param beforeWinner: A randomly selected number of Deferreds to
appear before the "winning" Deferred in the list passed in.
@param beforeWinner: A randomly selected number of Deferreds to
appear after the "winning" Deferred in the list passed in.
"""
cancelledState, ds = _setupRaceState(beforeWinner + 1 + afterWinner)

raceResult = race(ds)
expected = object()
ds[beforeWinner].callback(expected)

# The result should be the index and result of the only Deferred that
# fired.
assert_that(
self.successResultOf(raceResult),
equal_to((beforeWinner, expected)),
)
# All Deferreds except the winner should have been cancelled once.
expectedCancelledState = [1] * beforeWinner + [0] + [1] * afterWinner
assert_that(
cancelledState,
equal_to(expectedCancelledState),
)

@given(
beforeWinner=integers(min_value=0, max_value=3),
afterWinner=integers(min_value=0, max_value=3),
)
def test_failure(self, beforeWinner: int, afterWinner: int) -> None:
"""
When all of the L{Deferred}s passed to L{race} fire with failures,
the L{Deferred} return by L{race} fires with L{MultiFailure} wrapping
all of their failures.
@param beforeWinner: A randomly selected number of Deferreds to
appear before the "winning" Deferred in the list passed in.
@param beforeWinner: A randomly selected number of Deferreds to
appear after the "winning" Deferred in the list passed in.
"""
cancelledState, ds = _setupRaceState(beforeWinner + 1 + afterWinner)

failure = Failure(Exception("The test demands failures."))
raceResult = race(ds)
for d in ds:
d.errback(failure)

actualFailure = self.failureResultOf(raceResult, MultiFailure)
assert_that(
actualFailure.value.failures,
equal_to([failure] * len(ds)),
)
assert_that(
cancelledState,
equal_to([0] * len(ds)),
)

@given(
beforeWinner=integers(min_value=0, max_value=3),
afterWinner=integers(min_value=0, max_value=3),
)
def test_resultAfterCancel(self, beforeWinner: int, afterWinner: int) -> None:
"""
If one of the Deferreds fires after it was cancelled its result
goes nowhere. In particular, it does not cause any errors to be
logged.
"""
# Ensure we have a Deferred to win and at least one other Deferred
# that can ignore cancellation.
ds: list[Deferred[None]] = [
Deferred() for n in range(beforeWinner + 2 + afterWinner)
]

raceResult = race(ds)
ds[beforeWinner].callback(None)
ds[beforeWinner + 1].callback(None)

self.successResultOf(raceResult)
assert_that(self.flushLoggedErrors(), empty())

def test_resultFromCancel(self) -> None:
"""
If one of the input Deferreds has a cancel function that fires it
with success, nothing bad happens.
"""
winner: Deferred[object] = Deferred()
ds: list[Deferred[object]] = [
winner,
Deferred(canceller=lambda d: d.callback(object())),
]
expected = object()
raceResult = race(ds)
winner.callback(expected)

assert_that(self.successResultOf(raceResult), equal_to((0, expected)))

@given(
numDeferreds=integers(min_value=1, max_value=3),
)
def test_cancel(self, numDeferreds: int) -> None:
"""
If the result of L{race} is cancelled then all of the L{Deferred}s
passed in are cancelled.
"""
cancelledState, ds = _setupRaceState(numDeferreds)

raceResult = race(ds)
raceResult.cancel()

assert_that(cancelledState, equal_to([1] * numDeferreds))
self.failureResultOf(raceResult, MultiFailure)


class FirstErrorTests(unittest.SynchronousTestCase):
"""
Tests for L{FirstError}.
Expand Down

0 comments on commit d554764

Please sign in to comment.