Add EventStream class based on 'pulse' prototype #1990

Open · wants to merge 2 commits into master
11 changes: 11 additions & 0 deletions docs/source/reference-core.rst
@@ -1433,6 +1433,17 @@ deadlock. Using an unbounded channel avoids this, because it means
that :meth:`~trio.abc.SendChannel.send` never blocks.


Higher-level synchronization primitives
---------------------------------------

While events and channels are useful in a very wide range of
applications, some less common problems are best tackled with
higher-level concurrency primitives that focus on a specific problem.

.. autoclass:: EventStream
:members:


Comment (Member):
I'd put the docs for this right after the docs for Event -- "if you clicked here because you were looking for multiple events, then maybe you want...". (Maybe add a cross-ref at the top of the Event docs too.)

Lower-level synchronization primitives
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

1 change: 1 addition & 0 deletions trio/__init__.py
@@ -52,6 +52,7 @@
Lock,
StrictFIFOLock,
Condition,
EventStream,
)

from ._highlevel_generic import aclose_forcefully, StapledStream
94 changes: 94 additions & 0 deletions trio/_sync.py
@@ -784,3 +784,97 @@ def statistics(self):
return _ConditionStatistics(
tasks_waiting=len(self._lot), lock_statistics=self._lock.statistics()
)


@attr.s
class EventStream(metaclass=Final):
"""A concurrency primitive for a sequence of events.

Multiple tasks can subscribe for events on the stream using an ``async
for`` loop::

events = EventStream()

...

async for _ in events.subscribe():
...

    On each loop iteration, a subscriber blocks if there are no new
    events on the stream. An event can be "fired" on a stream, which wakes
    all subscribers::

events.fire()

By default, events are coalesced, but will never be lost. That is, if any
events are fired while a subscriber is processing its last wakeup, that
subscriber will not block on the next loop iteration.

    Note that EventStream does not hold any data items associated with
    events. However, subscribe() yields integer indices that identify
    positions in the event stream, and fire() returns the index of the
    event it added, so callers can correlate events with external data if
    needed.

"""
_write_cursor = attr.ib(default=-1)
_wakeup = attr.ib(default=None)
_closed = attr.ib(default=False)

def close(self):
"""Close the stream.

This causes all subscribers to terminate once they have consumed
all events.
"""
self._closed = True
self._wake()

def _wake(self):
"""Wake blocked tasks."""
if self._wakeup is not None:
self._wakeup.set()
self._wakeup = None

def fire(self):
"""Fire an event on the stream."""
if self._closed:
raise RuntimeError(
"Cannot fire an event on a closed event stream."
)
self._write_cursor += 1
self._wake()
return self._write_cursor

async def _wait(self):
"""Wait for the next wakeup.

We lazily create the Event object to block on if one does not yet
exist; this avoids creating event objects that are never awaited.

"""
if self._wakeup is None:
self._wakeup = trio.Event()
await self._wakeup.wait()
Comment (Member):

If you're worried about efficiency, then also consider implementing this with a set and wait_task_rescheduled – basically inlining Event.
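In Trio itself, inlining Event would use `trio.lowlevel.wait_task_rescheduled` as the reviewer suggests. As a rough stdlib illustration of the same "set of waiters" idea (asyncio-based so it runs standalone; the `WaiterSet` name and shape are hypothetical, not part of this PR):

```python
import asyncio

class WaiterSet:
    """Track one future per blocked task instead of a single shared Event."""

    def __init__(self):
        self._waiters = set()

    def wake(self):
        # Resolve every pending future, waking all tasks blocked in wait().
        for fut in self._waiters:
            if not fut.done():
                fut.set_result(None)
        self._waiters.clear()

    async def wait(self):
        fut = asyncio.get_running_loop().create_future()
        self._waiters.add(fut)
        try:
            await fut
        finally:
            # Covers cancellation: drop our future if we were never woken.
            self._waiters.discard(fut)
```

The point of the per-waiter bookkeeping is that cancellation removes exactly one waiter, which is what `wait_task_rescheduled`'s abort callback does in Trio.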


async def subscribe(self, from_start=False, coalesce=True):
"""Subscribe for events on the stream.

If from_start is True, then subscribe for events from the start of
the stream.

If coalesce is True, then each iteration 'consumes' all previous
events; otherwise, each iteration consumes just one event.
"""
read_cursor = -1 if from_start else self._write_cursor
while True:
if self._write_cursor > read_cursor:
if coalesce:
read_cursor = self._write_cursor
else:
read_cursor += 1
yield read_cursor
else:
if self._closed:
return
else:
await self._wait()
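The cursor arithmetic in `subscribe()` above is the whole coalescing story, so it can be modelled synchronously. A minimal sketch (the `CursorModel` class is hypothetical, with all blocking elided; only the fire/drain logic mirrors the PR):

```python
class CursorModel:
    """Synchronous model of EventStream's cursor/coalescing logic."""

    def __init__(self):
        self.write_cursor = -1
        self.closed = False

    def fire(self):
        if self.closed:
            raise RuntimeError("Cannot fire an event on a closed event stream.")
        self.write_cursor += 1
        return self.write_cursor

    def drain(self, read_cursor, coalesce=True):
        # Indices a subscriber at read_cursor would see if it woke up now.
        seen = []
        while self.write_cursor > read_cursor:
            if coalesce:
                read_cursor = self.write_cursor  # skip ahead: one wakeup
            else:
                read_cursor += 1                 # one wakeup per event
            seen.append(read_cursor)
        return seen, read_cursor
```

With three fired events, a coalescing subscriber sees a single index (the latest), while a non-coalescing one sees all three in order.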
125 changes: 124 additions & 1 deletion trio/tests/test_sync.py
Expand Up @@ -6,7 +6,7 @@

from .. import _core
from .. import _timeouts
from .._timeouts import sleep_forever, move_on_after
from .._timeouts import sleep_forever, move_on_after, sleep
from .._sync import *


@@ -568,3 +568,126 @@ async def lock_taker():
await wait_all_tasks_blocked()
assert record == ["started"]
lock_like.release()


async def test_EventStream_basics():
p = EventStream()

wakeups = 0

async def background():
nonlocal wakeups
async for i in p.subscribe():
wakeups += 1

async with _core.open_nursery() as nursery:
nursery.start_soon(background)

# The event stream starts in a blocked state (no event fired)
await wait_all_tasks_blocked()
assert wakeups == 0

# Calling fire() lets it run:
p.fire()
await wait_all_tasks_blocked()
assert wakeups == 1

# Multiple events are coalesced into one:
p.fire()
p.fire()
p.fire()
await wait_all_tasks_blocked()
assert wakeups == 2

p.close()


async def test_EventStream_while_task_is_elsewhere(autojump_clock):
p = EventStream()

wakeups = 0

async def background():
nonlocal wakeups
async for _ in p.subscribe():
wakeups += 1
await sleep(10)

async with _core.open_nursery() as nursery:
nursery.start_soon(background)

        # Double-check that it's all idle and settled waiting for an event
await sleep(5)
assert wakeups == 0
await sleep(10)
assert wakeups == 0

# Wake it up
p.fire()

# Now it's sitting in sleep()...
await sleep(5)
assert wakeups == 1

# ...when another event arrives.
p.fire()

# It still wakes up though
await sleep(10)
assert wakeups == 2

p.close()


async def test_EventStream_subscribe_independence(autojump_clock):
p = EventStream()

wakeups = [0, 0]

async def background(i, sleep_time):
nonlocal wakeups
async for _ in p.subscribe():
wakeups[i] += 1
await sleep(sleep_time)

try:
async with _core.open_nursery() as nursery:
nursery.start_soon(background, 0, 10)
nursery.start_soon(background, 1, 100)

# Initially blocked, no event fired
await sleep(200)
assert wakeups == [0, 0]

# Firing an event wakes both tasks
p.fire()
await sleep(5)
assert wakeups == [1, 1]

# Now
# task 0 is sleeping for 5 more seconds
# task 1 is sleeping for 95 more seconds

# Fire events at a 10s interval; task 0 will wake up for each
# task 1 will only wake up after its sleep
p.fire()
await sleep(10)
p.fire()
assert wakeups == [2, 1]
await sleep(100)
assert wakeups == [3, 2]

# Now task 0 is blocked on the next event
# Task 1 is sleeping for 100s

p.fire()
await sleep(1)
assert wakeups == [4, 2]
await sleep(100)
assert wakeups == [4, 3]

p.close()
except:
import traceback
traceback.print_exc()
raise
Comment (Member):

Was pytest not printing the traceback properly somehow while you were developing?

Comment (Author):

Yes, it was reporting a ResourceWarning about an async generator being disposed:

E               Traceback (most recent call last):
E                 File "/home/mauve/dev/trio/trio/_core/_asyncgens.py", line 79, in finalizer
E                   warnings.warn(
E               ResourceWarning: Async generator 'trio._sync.EventStream.subscribe' was garbage collected before it had been exhausted. Surround its use in 'async with aclosing(...):' to ensure that it gets cleaned up as soon as you're done using it.

I didn't dig into what you're doing here, and it sounds a bit odd to me, but I rolled with it. This is one reason I added the .close() method (but I think it is a good idea anyway).

Comment (Member):

Huh, ok. A lot going on here :-)

I don't see how the except block could interact with that warning. The warning is generated in "GC context", where raising exceptions is impossible. Well, it's really a fake GC context, because we re-enter Trio and run the finalizer in async context, so Trio tries to emulate the interpreter's regular GC context... it's possible we don't do it perfectly and some kind of exception escapes somewhere. But I don't think so?

Also, that warning ought to be harmless in this case. It's pointing out that if the async generator is holding any resources like file descriptors, then they are being cleaned up by the GC rather than explicitly. Is this a useful thing to point out? tbh I'm not sure. ResourceWarnings are hidden by default so it's not necessarily a big deal to have some false positives. But in this case it's pure noise, since subscribe doesn't hold any resources. And CPython doesn't bother pointing this out when it GC's regular sync generators -- the only reason we can do it is because the GC delegates async generator cleanup to Trio. Maybe it would be better to just drop the warning entirely? It's kind of a mess.

@oremanj, any thoughts you want to add?