Skip to content

Commit

Permalink
fix on_emit_future
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Feb 23, 2021
1 parent a39fc09 commit 0e9a86d
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions broqer/subscribers/on_emit_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
class OnEmitFuture(broqer.Subscriber, asyncio.Future):
""" Build a future able to await for.
:param publisher: source publisher
:param timeout: timeout in seconds
:param timeout: timeout in seconds, None for no timeout
:param omit_subscription: omit any emit while subscription
:param loop: asyncio loop to be used
"""
def __init__(self, publisher: 'Publisher', timeout=None,
omit_first_emit=False, loop=None):
omit_subscription=False, loop=None):
if loop is None:
loop = asyncio.get_event_loop()

Expand All @@ -42,7 +43,7 @@ def __init__(self, publisher: 'Publisher', timeout=None,

self._publisher = publisher

self._omit_first_emit = omit_first_emit
self._omit_subscription = omit_subscription

if timeout is not None:
self._timeout_handle = loop.call_later(
Expand All @@ -51,6 +52,7 @@ def __init__(self, publisher: 'Publisher', timeout=None,
self._timeout_handle = None

publisher.subscribe(self)
self._omit_subscription = False

def _cleanup(self, _future=None):
self._publisher.unsubscribe(self)
Expand All @@ -63,8 +65,7 @@ def emit(self, value: Any, who: Optional['Publisher'] = None) -> None:
if who is not self._publisher:
raise ValueError('Emit from non assigned publisher')

if self._omit_first_emit:
self._omit_first_emit = False
if self._omit_subscription:
return

if not self.done():
Expand Down

0 comments on commit 0e9a86d

Please sign in to comment.