Skip to content

Commit

Permalink
#11985 fix DeferredLock.run/Semaphore.run/maybeDeferred/ensureDeferre…
Browse files Browse the repository at this point in the history
…d/inlineCallbacks and fromCoroutine type annotation (#11986)
  • Loading branch information
graingert committed Oct 16, 2023
2 parents 105a9f5 + ed25d4a commit 157cd8e
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 53 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ dependencies = [
"Automat >= 0.8.0",
"hyperlink >= 17.1.1",
"attrs >= 21.3.0",
"typing_extensions >= 4.0.0",
"typing_extensions >= 4.2.0",
"twisted-iocpsupport >= 1.0.2, <2; platform_system == 'Windows'",
]

Expand Down
115 changes: 66 additions & 49 deletions src/twisted/internet/defer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

import attr
from incremental import Version
from typing_extensions import Concatenate, Literal, ParamSpec
from typing_extensions import Concatenate, Literal, ParamSpec, Self

from twisted.internet.interfaces import IDelayedCall, IReactorTime
from twisted.logger import Logger
Expand Down Expand Up @@ -161,7 +161,7 @@ def maybeDeferred(

@overload
def maybeDeferred(
f: Callable[_P, Coroutine[Deferred[_T], object, _T]],
f: Callable[_P, Coroutine[Deferred[Any], Any, _T]],
*args: _P.args,
**kwargs: _P.kwargs,
) -> "Deferred[_T]":
Expand All @@ -176,7 +176,7 @@ def maybeDeferred(


def maybeDeferred(
f: Callable[_P, Union[Deferred[_T], Coroutine[Deferred[_T], object, _T], _T]],
f: Callable[_P, Union[Deferred[_T], Coroutine[Deferred[Any], Any, _T], _T]],
*args: _P.args,
**kwargs: _P.kwargs,
) -> "Deferred[_T]":
Expand Down Expand Up @@ -1277,8 +1277,8 @@ def uncancel(
def fromCoroutine(
cls,
coro: Union[
Coroutine["Deferred[_T]", Any, _T],
Generator["Deferred[_T]", Any, _T],
Coroutine[Deferred[Any], Any, _T],
Generator[Deferred[Any], Any, _T],
],
) -> "Deferred[_T]":
"""
Expand Down Expand Up @@ -1326,8 +1326,8 @@ def main(reactor):

def ensureDeferred(
coro: Union[
Coroutine[Deferred[_T], Any, _T],
Generator[Deferred[_T], Any, _T],
Coroutine[Deferred[Any], Any, _T],
Generator[Deferred[Any], Any, _T],
Deferred[_T],
]
) -> Deferred[_T]:
Expand Down Expand Up @@ -1923,8 +1923,8 @@ def _gotResultInlineCallbacks(
r: object,
waiting: List[Any],
gen: Union[
Generator[Deferred[object], object, _T],
Coroutine[Deferred[object], object, _T],
Generator[Deferred[Any], Any, _T],
Coroutine[Deferred[Any], Any, _T],
],
status: _CancellationStatus[_T],
context: _Context,
Expand All @@ -1950,8 +1950,8 @@ def _gotResultInlineCallbacks(
def _inlineCallbacks(
result: object,
gen: Union[
Generator[Deferred[object], object, _T],
Coroutine[Deferred[object], object, _T],
Generator[Deferred[Any], Any, _T],
Coroutine[Deferred[Any], Any, _T],
],
status: _CancellationStatus[_T],
context: _Context,
Expand Down Expand Up @@ -2149,8 +2149,8 @@ def _handleCancelInlineCallbacks(

def _cancellableInlineCallbacks(
gen: Union[
Generator["Deferred[object]", object, _T],
Coroutine["Deferred[object]", object, _T],
Generator[Deferred[Any], object, _T],
Coroutine[Deferred[Any], object, _T],
]
) -> Deferred[_T]:
"""
Expand Down Expand Up @@ -2178,7 +2178,7 @@ class _InternalInlineCallbacksCancelledError(Exception):


def inlineCallbacks(
f: Callable[_P, Generator[Deferred[object], object, _T]]
f: Callable[_P, Generator[Deferred[Any], Any, _T]]
) -> Callable[_P, Deferred[_T]]:
"""
L{inlineCallbacks} helps you write L{Deferred}-using code that looks like a
Expand Down Expand Up @@ -2261,26 +2261,47 @@ def unwindGenerator(*args: _P.args, **kwargs: _P.kwargs) -> Deferred[_T]:
## DeferredLock/DeferredQueue


_ConcurrencyPrimitiveT = TypeVar(
"_ConcurrencyPrimitiveT", bound="_ConcurrencyPrimitive[Any]"
)


class _ConcurrencyPrimitive(ABC, Generic[_SelfResultT]):
def __init__(self: _ConcurrencyPrimitiveT) -> None:
self.waiting: List[Deferred[_ConcurrencyPrimitiveT]] = []
class _ConcurrencyPrimitive(ABC):
def __init__(self: Self) -> None:
self.waiting: List[Deferred[Self]] = []

def _releaseAndReturn(self, r: _T) -> _T:
self.release()
return r

@overload
def run(
self: _ConcurrencyPrimitiveT,
self: Self,
/,
f: Callable[..., _SelfResultT],
*args: object,
**kwargs: object,
) -> Deferred[_SelfResultT]:
f: Callable[_P, Deferred[_T]],
*args: _P.args,
**kwargs: _P.kwargs,
) -> Deferred[_T]:
...

@overload
def run(
self: Self,
/,
f: Callable[_P, Coroutine[Deferred[Any], Any, _T]],
*args: _P.args,
**kwargs: _P.kwargs,
) -> Deferred[_T]:
...

@overload
def run(
self: Self, /, f: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs
) -> Deferred[_T]:
...

def run(
self: Self,
/,
f: Callable[_P, Union[Deferred[_T], Coroutine[Deferred[Any], Any, _T], _T]],
*args: _P.args,
**kwargs: _P.kwargs,
) -> Deferred[_T]:
"""
Acquire, run, release.
Expand All @@ -2295,12 +2316,16 @@ def run(
@return: L{Deferred} of function result.
"""

def execute(ignoredResult: object) -> Deferred[_SelfResultT]:
return maybeDeferred(f, *args, **kwargs).addBoth(self._releaseAndReturn)
def execute(ignoredResult: object) -> Deferred[_T]:
# maybeDeferred arg type requires one of the possible union members
# and won't accept all possible union members
return maybeDeferred(f, *args, **kwargs).addBoth(
self._releaseAndReturn
) # type: ignore[return-value]

return self.acquire().addCallback(execute)

def __aenter__(self: _ConcurrencyPrimitiveT) -> Deferred[_ConcurrencyPrimitiveT]:
def __aenter__(self: Self) -> Deferred[Self]:
"""
We can be used as an asynchronous context manager.
"""
Expand All @@ -2318,18 +2343,15 @@ def __aexit__(
return succeed(False)

@abstractmethod
def acquire(self: _ConcurrencyPrimitiveT) -> Deferred[_ConcurrencyPrimitiveT]:
def acquire(self: Self) -> Deferred[Self]:
pass

@abstractmethod
def release(self) -> None:
pass


_DeferredLockT = TypeVar("_DeferredLockT", bound="DeferredLock")


class DeferredLock(_ConcurrencyPrimitive[Any]):
class DeferredLock(_ConcurrencyPrimitive):
"""
A lock for event driven systems.
Expand All @@ -2340,7 +2362,7 @@ class DeferredLock(_ConcurrencyPrimitive[Any]):

locked = False

def _cancelAcquire(self: _DeferredLockT, d: Deferred[_DeferredLockT]) -> None:
def _cancelAcquire(self: Self, d: Deferred[Self]) -> None:
"""
Remove a deferred d from our waiting list, as the deferred has been
canceled.
Expand All @@ -2354,7 +2376,7 @@ def _cancelAcquire(self: _DeferredLockT, d: Deferred[_DeferredLockT]) -> None:
"""
self.waiting.remove(d)

def acquire(self: _DeferredLockT) -> Deferred[_DeferredLockT]:
def acquire(self: Self) -> Deferred[Self]:
"""
Attempt to acquire the lock. Returns a L{Deferred} that fires on
lock acquisition with the L{DeferredLock} as the value. If the lock
Expand All @@ -2363,15 +2385,15 @@ def acquire(self: _DeferredLockT) -> Deferred[_DeferredLockT]:
@return: a L{Deferred} which fires on lock acquisition.
@rtype: a L{Deferred}
"""
d: Deferred[_DeferredLockT] = Deferred(canceller=self._cancelAcquire)
d: Deferred[Self] = Deferred(canceller=self._cancelAcquire)
if self.locked:
self.waiting.append(d)
else:
self.locked = True
d.callback(self)
return d

def release(self: _DeferredLockT) -> None:
def release(self: Self) -> None:
"""
Release the lock. If there is a waiting list, then the first
L{Deferred} in that waiting list will be called back.
Expand All @@ -2388,10 +2410,7 @@ def release(self: _DeferredLockT) -> None:
d.callback(self)


_DeferredSemaphoreT = TypeVar("_DeferredSemaphoreT", bound="DeferredSemaphore")


class DeferredSemaphore(_ConcurrencyPrimitive[Any]):
class DeferredSemaphore(_ConcurrencyPrimitive):
"""
A semaphore for event driven systems.
Expand All @@ -2415,9 +2434,7 @@ def __init__(self, tokens: int) -> None:
self.tokens = tokens
self.limit = tokens

def _cancelAcquire(
self: _DeferredSemaphoreT, d: Deferred[_DeferredSemaphoreT]
) -> None:
def _cancelAcquire(self: Self, d: Deferred[Self]) -> None:
"""
Remove a deferred d from our waiting list, as the deferred has been
canceled.
Expand All @@ -2431,7 +2448,7 @@ def _cancelAcquire(
"""
self.waiting.remove(d)

def acquire(self: _DeferredSemaphoreT) -> Deferred[_DeferredSemaphoreT]:
def acquire(self: Self) -> Deferred[Self]:
"""
Attempt to acquire the token.
Expand All @@ -2440,15 +2457,15 @@ def acquire(self: _DeferredSemaphoreT) -> Deferred[_DeferredSemaphoreT]:
assert (
self.tokens >= 0
), "Internal inconsistency?? tokens should never be negative"
d: Deferred[_DeferredSemaphoreT] = Deferred(canceller=self._cancelAcquire)
d: Deferred[Self] = Deferred(canceller=self._cancelAcquire)
if not self.tokens:
self.waiting.append(d)
else:
self.tokens = self.tokens - 1
d.callback(self)
return d

def release(self: _DeferredSemaphoreT) -> None:
def release(self: Self) -> None:
"""
Release the token.
Expand Down
1 change: 1 addition & 0 deletions src/twisted/newsfragments/11985.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix the type annotations of DeferredLock.run, DeferredSemaphore.run, maybeDeferred, ensureDeferred, inlineCallbacks and fromCoroutine that used to return Deferred[Any] to return the result of the passed Coroutine/Coroutine function

0 comments on commit 157cd8e

Please sign in to comment.