Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
glyph committed Nov 21, 2022
1 parent a3d6a5c commit a2cf925
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 53 deletions.
229 changes: 184 additions & 45 deletions src/twisted/internet/defer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Maintainer: Glyph Lefkowitz
"""
from __future__ import annotations

import traceback
import warnings
Expand All @@ -21,6 +22,7 @@
Any,
Awaitable,
Callable,
Concatenate,
Coroutine,
Generator,
Generic,
Expand Down Expand Up @@ -78,7 +80,87 @@ def _copy_context() -> Type[_NoContext]:


_T = TypeVar("_T")

_ResultParam = TypeVar("_ResultParam", contravariant=True)
_P = ParamSpec("_P")
_R = TypeVar("_R")
_R_co = TypeVar("_R_co", covariant=True)


class _SyncCallback(Protocol[_ResultParam, _P, _R_co]):
def __call__(
self, result: _ResultParam, *args: _P.args, **kwargs: _P.kwargs
) -> _R_co:
...


class _AsyncCallback(Protocol[_ResultParam, _P, _R]):
def __call__(
self, result: _ResultParam, *args: _P.args, **kwargs: _P.kwargs
) -> Deferred[_R]:
...


class _FailingCallback(Protocol[_ResultParam, _P]):
def __call__(
self, result: _ResultParam, *args: _P.args, **kwargs: _P.kwargs
) -> Failure:
...


class _SyncRecoveryCallback(Protocol[_P, _R_co]):
def __call__(self, result: Failure, *args: _P.args, **kwargs: _P.kwargs) -> _R_co:
...


class _AsyncRecoveryCallback(Protocol[_P, _R]):
def __call__(
self, result: Failure, *args: _P.args, **kwargs: _P.kwargs
) -> Deferred[_R]:
...


class _FailingRecoveryCallback(Protocol[_P]):
def __call__(self, result: Failure, *args: _P.args, **kwargs: _P.kwargs) -> Failure:
...


class _SyncEitherCallback(Protocol[_ResultParam, _P, _R_co]):
def __call__(
self, result: Union[_ResultParam, Failure], *args: _P.args, **kwargs: _P.kwargs
) -> _R_co:
...


class _AsyncEitherCallback(Protocol[_ResultParam, _P, _R]):
def __call__(
self, result: Union[_ResultParam, Failure], *args: _P.args, **kwargs: _P.kwargs
) -> Deferred[_R]:
...


class _FailingEitherCallback(Protocol[_ResultParam, _P]):
def __call__(
self, result: Union[_ResultParam, Failure], *args: _P.args, **kwargs: _P.kwargs
) -> Failure:
...


# class _EitherCallback(Protocol[_ResultParam, _P, _R_co]):
# def __call__(
# self, result: Union[_ResultParam, Failure], /, *args: _P.args, **kwargs: _P.kwargs
# ) -> Union[_R_co, Deferred[_R_co], Failure]:
# ...


# _DeferredCallback: TypeAlias = Union[
# _SyncCallback[_T, _P, _R],
# _AsyncCallback[_T, _P, _R],
# _FailingCallback[_T, _P],
# ]

# ^ This must be expanded in-line everywhere, due to this issue:
# https://github.com/python/mypy/issues/11855


class AlreadyCalledError(Exception):
Expand Down Expand Up @@ -246,10 +328,16 @@ def passthru(arg: _T) -> _T:
return arg


def _failthru(arg: Failure) -> Failure:
_passthru: _SyncCallback[Any, Any, Any]


def _failthruImpl(arg: Failure) -> Failure:
return arg


_failthru: _FailingRecoveryCallback[Any]


def setDebugging(on: bool) -> None:
"""
Enable or disable L{Deferred} debugging.
Expand Down Expand Up @@ -333,8 +421,8 @@ class _Sentinel(Enum):
_NONE_KWARGS: _CallbackKeywordArguments = MappingProxyType({})


_DeferredResultT = TypeVar("_DeferredResultT", contravariant=True)
_NextDeferredResultT = TypeVar("_NextDeferredResultT", covariant=True)
_DeferredResultT = TypeVar("_DeferredResultT")
_NextDeferredResultT = TypeVar("_NextDeferredResultT")


class DebugInfo:
Expand Down Expand Up @@ -466,16 +554,27 @@ def __init__(

def addCallbacks(
self,
callback: Callable[
...,
"Union[_NextDeferredResultT, Deferred[_NextDeferredResultT]]",
callback: Union[
Callable[..., _NextDeferredResultT],
Callable[..., Deferred[_NextDeferredResultT]],
Callable[..., Failure],
Callable[
...,
Union[_NextDeferredResultT, Deferred[_NextDeferredResultT], Failure],
],
],
errback: Callable[
...,
"Union[Failure, _NextDeferredResultT, Deferred[_NextDeferredResultT]]",
] = _failthru,
callbackArgs: _CallbackOrderedArguments = (),
callbackKeywords: _CallbackKeywordArguments = _NONE_KWARGS,
errback: Union[
Callable[..., _NextDeferredResultT],
Callable[..., Deferred[_NextDeferredResultT]],
Callable[..., Failure],
Callable[
...,
Union[_NextDeferredResultT, Deferred[_NextDeferredResultT], Failure],
],
None,
] = None,
callbackArgs: Tuple[Any, ...] = (),
callbackKeywords: Mapping[str, Any] = _NONE_KWARGS,
errbackArgs: _CallbackOrderedArguments = (),
errbackKeywords: _CallbackKeywordArguments = _NONE_KWARGS,
) -> "Deferred[_NextDeferredResultT]":
Expand All @@ -486,9 +585,10 @@ def addCallbacks(
@return: C{self}.
"""
# Default value used to be None and callers may be using None
if errback is None:
errback = _failthru # type: ignore[unreachable]
errback = _failthru

# Default value used to be None and callers may be using None
if callbackArgs is None:
callbackArgs = () # type: ignore[unreachable]
if callbackKeywords is None:
Expand Down Expand Up @@ -517,12 +617,27 @@ def addCallbacks(

def addCallback(
self,
callback: Callable[
...,
"Union[_NextDeferredResultT, Deferred[_NextDeferredResultT]]",
callback:
# Union[
Callable[
Concatenate[_DeferredResultT, _P],
Union[_NextDeferredResultT, Deferred[_NextDeferredResultT], Failure],
],
*args: object,
**kwargs: object,
# Callable[
# Concatenate[_DeferredResultT, _P],
# _NextDeferredResultT
# ],
# Callable[
# Concatenate[_DeferredResultT, _P],
# Deferred[_NextDeferredResultT],
# ],
# Callable[
# Concatenate[_DeferredResultT, _P],
# Failure,
# ],
# ],
*args: _P.args,
**kwargs: _P.kwargs,
) -> "Deferred[_NextDeferredResultT]":
"""
Convenience method for adding just a callback.
Expand All @@ -533,13 +648,10 @@ def addCallback(

def addErrback(
self,
errback: Callable[
...,
"Union[Failure, _NextDeferredResultT, Deferred[_NextDeferredResultT]]",
],
*args: object,
**kwargs: object,
) -> "Deferred[Union[_DeferredResultT, _NextDeferredResultT]]":
errback: Callable[Concatenate[Failure, _P], _NextDeferredResultT],
*args: _P.args,
**kwargs: _P.kwargs,
) -> "Deferred[Union[_NextDeferredResultT]]":
"""
Convenience method for adding just an errback.
Expand All @@ -549,20 +661,34 @@ def addErrback(
# can't propagate through to _NextDeferredResultT, so we have to
# ignore a type error.
return self.addCallbacks(
passthru,
errback, # type: ignore[arg-type]
_passthru,
errback,
errbackArgs=args,
errbackKeywords=kwargs,
)

def addBoth(
self,
callback: Callable[
...,
"Union[_NextDeferredResultT, Deferred[_NextDeferredResultT]]",
callback: Union[
Callable[
Concatenate[Union[_DeferredResultT, Failure], _P], _NextDeferredResultT
],
Callable[
Concatenate[Union[_DeferredResultT, Failure], _P],
Deferred[_NextDeferredResultT],
],
Callable[Concatenate[Union[_DeferredResultT, Failure], _P], Failure],
Callable[
Concatenate[Union[_DeferredResultT, Failure], _P],
Union[Failure, _NextDeferredResultT],
],
Callable[
Concatenate[Union[_DeferredResultT, Failure], _P],
Union[Failure, _NextDeferredResultT, Deferred[_NextDeferredResultT]],
],
],
*args: object,
**kwargs: object,
*args: _P.args,
**kwargs: _P.kwargs,
) -> "Deferred[_NextDeferredResultT]":
"""
Convenience method for adding a single callable as both a callback
Expand All @@ -583,7 +709,9 @@ def addTimeout(
self,
timeout: float,
clock: IReactorTime,
onTimeoutCancel: Optional[Callable[[object, float], object]] = None,
onTimeoutCancel: Optional[
Callable[[_DeferredResultT, float], _DeferredResultT]
] = None,
) -> "Deferred[_DeferredResultT]":
"""
Time out this L{Deferred} by scheduling it to be cancelled after
Expand Down Expand Up @@ -621,13 +749,16 @@ def timeItOut() -> None:

delayedCall = clock.callLater(timeout, timeItOut)

def convertCancelled(value: object) -> object:
def convertCancelled(
result: _DeferredResultT | Failure,
) -> _DeferredResultT | Failure:
# if C{deferred} was timed out, call the translation function,
# if provided, otherwise just use L{cancelledToTimedOutError}
if timedOut[0]:
assert not isinstance(result, Failure)
toCall = onTimeoutCancel or _cancelledToTimedOutError
return toCall(value, timeout)
return value
return toCall(result, timeout)
return result

self.addBoth(convertCancelled)

Expand Down Expand Up @@ -1141,6 +1272,14 @@ def main(reactor):
raise NotACoroutineError(f"{coro!r} is not a coroutine")


# _EitherCallback = Callable[[Union[_ResultParam, Failure]], Union[_R_co, Deferred[_R_co]]]
class _EitherCallback(Protocol[_ResultParam, _P, _R_co]):
def __call__(
self, result: Union[_ResultParam, Failure], *args: _P.args, **kwargs: _P.kwargs
) -> Union[_R_co, Deferred[_R_co], Failure]:
...


def ensureDeferred(
coro: Union[
Coroutine[Deferred[_T], Any, _T],
Expand Down Expand Up @@ -1631,7 +1770,7 @@ def returnValue(val: object) -> NoReturn:


@attr.s(auto_attribs=True)
class _CancellationStatus:
class _CancellationStatus(Generic[_DeferredResultT]):
"""
Cancellation status of an L{inlineCallbacks} invocation.
Expand All @@ -1641,8 +1780,8 @@ class _CancellationStatus:
L{_inlineCallbacks} must fill out before returning)
"""

deferred: Deferred[object]
waitingOn: Optional[Deferred[object]] = None
deferred: Deferred[_DeferredResultT]
waitingOn: Optional[Deferred[_DeferredResultT]] = None


@_extraneous
Expand Down Expand Up @@ -1826,16 +1965,16 @@ def _cancellableInlineCallbacks(
@return: L{Deferred} for the C{@}L{inlineCallbacks} that is cancellable.
"""

def cancel(it: Deferred[object]) -> None:
def cancel(it: Deferred[_T]) -> None:
it.callbacks, tmp = [], it.callbacks
it.addErrback(handleCancel)
it.callbacks.extend(tmp)
it.errback(_InternalInlineCallbacksCancelledError())

deferred: Deferred[object] = Deferred(cancel)
deferred: Deferred[_T] = Deferred(cancel)
status = _CancellationStatus(deferred)

def handleCancel(result: Failure) -> Deferred[object]:
def handleCancel(result: Failure) -> Union[Failure, _T, Deferred[_T]]:
"""
Propagate the cancellation of an C{@}L{inlineCallbacks} to the
L{Deferred} it is waiting on.
Expand Down Expand Up @@ -1933,7 +2072,7 @@ def loadData(url):
"""

@wraps(f)
def unwindGenerator(*args: object, **kwargs: object) -> Deferred[object]:
def unwindGenerator(*args: object, **kwargs: object) -> Deferred[_T]:
try:
gen = f(*args, **kwargs)
except _DefGen_Return:
Expand Down Expand Up @@ -2193,7 +2332,7 @@ def __init__(
self.size = size
self.backlog = backlog

def _cancelGet(self, d: Deferred[object]) -> None:
def _cancelGet(self, d: Deferred[_T]) -> None:
"""
Remove a deferred d from our waiting list, as the deferred has been
canceled.
Expand Down

0 comments on commit a2cf925

Please sign in to comment.