Skip to content

Commit

Permalink
Add a send_async method to the Signal
Browse files Browse the repository at this point in the history
This allows for signals to send to coroutine receivers by awaiting
them. The _async_wrapper and _sync_wrapper allows for conversion of
sync and async receivers as required if defined. If not defined a
runtime error is raised.

The wrappers are used to avoid any direct tie into asyncio, trio,
greenbacks, asgiref, or other specific async implementation.
  • Loading branch information
pgjones committed Jan 24, 2023
1 parent 0d4ca6e commit 5ed9c95
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 11 deletions.
63 changes: 63 additions & 0 deletions docs/index.rst
Expand Up @@ -272,6 +272,69 @@ See the documentation of the :obj:`receiver_connected` built-in signal
for an example.


Async receivers
---------------

Receivers can be coroutine functions which can be called and awaited
via the :meth:`~Signal.send_async` method:

.. code-block:: python
sig = blinker.Signal()
async def receiver():
...
sig.connect(receiver)
await sig.send_async()
This however requires that all receivers are awaitable which then
precludes the usage of :meth:`~Signal.send`. To mix and match the
:meth:`~Signal.send_async` method takes a ``_sync_wrapper`` argument
such as:

.. code-block:: python
sig = blinker.Signal()
def receiver():
...
sig.connect(receiver)
def wrapper(func):
async def inner(*args, **kwargs):
func(*args, **kwargs)
return inner
await sig.send_async(_sync_wrapper=wrapper)
The equivalent usage for :meth:`~Signal.send` is via the
``_async_wrapper`` argument. This usage is will depend on your event
loop, and in the simple case whereby you aren't running within an
event loop the following example can be used:

.. code-block:: python
sig = blinker.Signal()
async def receiver():
...
sig.connect(receiver)
def wrapper(func):
def inner(*args, **kwargs):
asyncio.run(func(*args, **kwargs))
return inner
await sig.send(_async_wrapper=wrapper)
API Documentation
-----------------

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Expand Up @@ -44,5 +44,6 @@ include-package-data = false
version = {attr = "blinker.__version__"}

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
filterwarnings = ["error"]
1 change: 1 addition & 0 deletions requirements/tests.in
@@ -1 +1,2 @@
pytest
pytest-asyncio
10 changes: 8 additions & 2 deletions requirements/tests.txt
@@ -1,4 +1,4 @@
# SHA1:0eaa389e1fdb3a1917c0f987514bd561be5718ee
# SHA1:738f1ea95febe383951f6eb64bdad13fefc1a97a
#
# This file is autogenerated by pip-compile-multi
# To update, run:
Expand All @@ -20,10 +20,16 @@ packaging==23.0
pluggy==1.0.0
# via pytest
pytest==7.2.1
# via
# -r requirements/tests.in
# pytest-asyncio
pytest-asyncio==0.20.3
# via -r requirements/tests.in
tomli==2.0.1
# via pytest
typing-extensions==4.4.0
# via importlib-metadata
# via
# importlib-metadata
# pytest-asyncio
zipp==3.11.0
# via importlib-metadata
37 changes: 37 additions & 0 deletions src/blinker/_utilities.py
@@ -1,3 +1,8 @@
import asyncio
import inspect
import sys
from functools import partial
from typing import Any
from weakref import ref

from blinker._saferef import BoundMethodWeakref
Expand Down Expand Up @@ -93,3 +98,35 @@ def __get__(self, obj, cls):
value = self._deferred(obj)
setattr(obj, self._deferred.__name__, value)
return value


def is_coroutine_function(func: Any) -> bool:
# Python < 3.8 does not correctly determine partially wrapped
# coroutine functions are coroutine functions, hence the need for
# this to exist. Code taken from CPython.
if sys.version_info >= (3, 8):
return asyncio.iscoroutinefunction(func)
else:
# Note that there is something special about the AsyncMock
# such that it isn't determined as a coroutine function
# without an explicit check.
try:
from unittest.mock import AsyncMock

if isinstance(func, AsyncMock):
return True
except ImportError:
# Not testing, no asynctest to import
pass

while inspect.ismethod(func):
func = func.__func__
while isinstance(func, partial):
func = func.func
if not inspect.isfunction(func):
return False
result = bool(func.__code__.co_flags & inspect.CO_COROUTINE)
return (
result
or getattr(func, "_is_coroutine", None) is asyncio.coroutines._is_coroutine
)
54 changes: 45 additions & 9 deletions src/blinker/base.py
Expand Up @@ -13,6 +13,7 @@
from weakref import WeakValueDictionary

from blinker._utilities import hashable_identity
from blinker._utilities import is_coroutine_function
from blinker._utilities import lazy_property
from blinker._utilities import reference
from blinker._utilities import symbol
Expand Down Expand Up @@ -242,17 +243,59 @@ def temporarily_connected_to(self, receiver, sender=ANY):
)
return self.connected_to(receiver, sender)

def send(self, *sender, **kwargs):
def send(self, *sender, _async_wrapper=None, **kwargs):
"""Emit this signal on behalf of *sender*, passing on ``kwargs``.
Returns a list of 2-tuples, pairing receivers with their return
value. The ordering of receiver notification is undefined.
:param sender: Any object or ``None``. If omitted, synonymous
with ``None``. Only accepts one positional argument.
:param _async_wrapper: A callable that should wrap a coroutine
receiver and run it when called synchronously.
:param kwargs: Data to be sent to receivers.
"""
if self.is_muted:
return []

sender = self._extract_sender(sender)
results = []
for receiver in self.receivers_for(sender):
if is_coroutine_function(receiver):
if _async_wrapper is None:
raise RuntimeError("Cannot send to a coroutine function")
receiver = _async_wrapper(receiver)
results.append((receiver, receiver(sender, **kwargs)))
return results

async def send_async(self, *sender, _sync_wrapper=None, **kwargs):
"""Emit this signal on behalf of *sender*, passing on ``kwargs``.
Returns a list of 2-tuples, pairing receivers with their return
value. The ordering of receiver notification is undefined.
:param sender: Any object or ``None``. If omitted, synonymous
with ``None``. Only accepts one positional argument.
:param _sync_wrapper: A callable that should wrap a synchronous
receiver and run it when awaited.
:param kwargs: Data to be sent to receivers.
"""
if self.is_muted:
return []

sender = self._extract_sender(sender)
results = []
for receiver in self.receivers_for(sender):
if not is_coroutine_function(receiver):
if _sync_wrapper is None:
raise RuntimeError("Cannot send to a non-coroutine function")
receiver = _sync_wrapper(receiver)
results.append((receiver, await receiver(sender, **kwargs)))
return results

def _extract_sender(self, sender):
if not self.receivers:
# Ensure correct signature even on no-op sends, disable with -O
# for lowest possible cost.
Expand All @@ -273,14 +316,7 @@ def send(self, *sender, **kwargs):
)
else:
sender = sender[0]

if self.is_muted:
return []
else:
return [
(receiver, receiver(sender, **kwargs))
for receiver in self.receivers_for(sender)
]
return sender

def has_receivers_for(self, sender):
"""True if there is probably a receiver for *sender*.
Expand Down
27 changes: 27 additions & 0 deletions tests/test_signals.py
Expand Up @@ -332,6 +332,33 @@ def received(sender):
assert [id(fn) for fn in sig.receivers.values()] == [fn_id]


async def test_async_receiver():
sentinel = []

async def received_async(sender):
sentinel.append(sender)

def received(sender):
sentinel.append(sender)

def wrapper(func):

async def inner(*args, **kwargs):
func(*args, **kwargs)

return inner

sig = blinker.Signal()
sig.connect(received)
sig.connect(received_async)

await sig.send_async(_sync_wrapper=wrapper)
assert len(sentinel) == 2

with pytest.raises(RuntimeError):
sig.send()


def test_instancemethod_receiver():
sentinel = []

Expand Down

0 comments on commit 5ed9c95

Please sign in to comment.