Skip to content

Commit

Permalink
better handling of immediate dispose for ToFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Jun 15, 2018
1 parent adb591b commit b20de9c
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions broqer/op/to_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ def __init__(self, publisher, timeout=None, loop=None):
else:
self._timeout_handle = None

self._disposable = None
self._disposable = publisher.subscribe(self)
if self.done():
self._disposable.dispose()

def _timeout(self):
self.set_exception(asyncio.TimeoutError)
Expand All @@ -51,12 +54,8 @@ def _future_done(self, future):
self._timeout_handle.cancel()

def emit(self, *args: Any, who: Publisher) -> None:
# handle special case: _disposable is set after
# publisher.subscribe(self) in __init__
assert not hasattr(self, '_disposable') or \
who == self._disposable._publisher, \
'emit comming from non assigned publisher'
self._disposable.dispose()
if self._disposable is not None:
self._disposable.dispose()
if len(args) == 1:
self.set_result(args[0])
else:
Expand Down

0 comments on commit b20de9c

Please sign in to comment.