Skip to content

Commit

Permalink
Merge pull request #18 from jek/feature/send-async
Browse files Browse the repository at this point in the history
Adds Signal.send_async for asyncio

Author: @jek
  • Loading branch information
yackx committed Jan 29, 2021
2 parents 904d8d3 + c7b83a1 commit b5e9f06
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Not yet released.
- Verified Python 3.7 support (no changes needed).
- Verified Python 3.6 support (no changes needed).
- Verified Python 3.5 support (no changes needed).
- Added Signal.send_async, dispatching to an arbitrary mix of connected
coroutines and receiver functions.

Version 1.4
-----------
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ interested parties to subscribe to events, or "signals".
Signal receivers can subscribe to specific senders or receive signals
sent by any sender.

It supports dispatching to an arbitrary mix of connected
coroutines and receiver functions.

```python
>>> from blinker import signal
>>> started = signal('round-started')
Expand Down
5 changes: 5 additions & 0 deletions blinker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@


__version__ = '1.5dev'

try:
import blinker._async
except (ImportError, SyntaxError):
pass
28 changes: 28 additions & 0 deletions blinker/_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import asyncio

from blinker.base import Signal


try:
schedule = asyncio.create_task
except AttributeError:
schedule = asyncio.ensure_future


@asyncio.coroutine
def _wrap_plain_value(value):
"""Pass through a coroutine *value* or wrap a plain value."""
if asyncio.iscoroutine(value):
value = yield from value
return value


def send_async(self, *sender, **kwargs):
return [(receiver, schedule(_wrap_plain_value(value)))
for receiver, value
in self.send(*sender, **kwargs)]


send_async.__doc__ = Signal.send_async.__doc__
Signal.send_async = send_async

11 changes: 11 additions & 0 deletions blinker/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,17 @@ def send(self, *sender, **kwargs):
return [(receiver, receiver(sender, **kwargs))
for receiver in self.receivers_for(sender)]

def send_async(self, *sender, **kwargs):
"""Send and collect results from connected functions and coroutines.
As `Signal.send`, but also schedules any coroutines connected to the
signal, and uniformly presents all receiver return values as futures,
even if one or more receivers are regular functions.
Available only if asyncio and `yield from` are present.
"""
raise NotImplementedError("asyncio support unavailable")

def has_receivers_for(self, sender):
"""True if there is probably a receiver for *sender*.
Expand Down
30 changes: 30 additions & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The core of Blinker is quite small but provides powerful features:
- sending arbitrary data payloads
- collecting return values from signal receivers
- thread safety
- coroutines as signal receivers

Blinker was written by Jason Kirtand and is provided under the MIT
License. The library supports Python 2.7 and Python 3.5 or later;
Expand Down Expand Up @@ -94,6 +95,35 @@ notifications being sent, and these no-op sends are optimized to be as
inexpensive as possible.


Async support
-------------

Send a signal asynchronously to coroutine receivers.

>>> async def receiver_a(sender):
... return 'value a'
...
>>> async def receiver_b(sender):
... return 'value b'
...
>>> ready = signal('ready')
>>> ready.connect(receiver_a)
>>> ready.connect(receiver_b)
...
>>> async def collect():
... return ready.send_async('sender')
...
>>> loop = asyncio.get_event_loop()
>>> results = loop.run_until_complete(collect())
>>> len(results)
2
>>> [v.result() for r, v in results][0]
value a

Dispatching to an arbitrary mix of connected
coroutines and receiver functions is supported.


Subscribing to Specific Senders
-------------------------------

Expand Down
43 changes: 43 additions & 0 deletions tests/_test_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import asyncio

import blinker


def test_send_async():
calls = []

@asyncio.coroutine
def receiver_a(sender):
calls.append(receiver_a)
return 'value a'

@asyncio.coroutine
def receiver_b(sender):
calls.append(receiver_b)
return 'value b'

def receiver_c(sender):
calls.append(receiver_c)
return 'value c'

sig = blinker.Signal()
sig.connect(receiver_a)
sig.connect(receiver_b)
sig.connect(receiver_c)

@asyncio.coroutine
def collect():
return sig.send_async()

loop = asyncio.get_event_loop()
results = loop.run_until_complete(collect())

expected = {
receiver_a: 'value a',
receiver_b: 'value b',
receiver_c: 'value c',
}

assert set(calls) == set(expected.keys())
collected_results = {v.result() for r, v in results}
assert collected_results == set(expected.values())
5 changes: 5 additions & 0 deletions tests/test_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
jython = sys.platform.startswith('java')
pypy = hasattr(sys, 'pypy_version_info')

try:
from _test_async import test_send_async
except (SyntaxError, ImportError):
pass


def collect_acyclic_refs():
# cpython releases these immediately without a collection
Expand Down

0 comments on commit b5e9f06

Please sign in to comment.