Skip to content

Commit

Permalink
remove .wait_for_assignment for hub topics and return subject on topi…
Browse files Browse the repository at this point in the history
…c.assign()
  • Loading branch information
semiversus committed Sep 18, 2018
1 parent 3b9efa1 commit d4d154a
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 63 deletions.
18 changes: 10 additions & 8 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ able to use the observer pattern - let's see!
Observer pattern
----------------

Subscribing to a publisher is done via the `|` - here used as a pipe. A simple
subscriber is `op.Sink` which is calling a function with optional positional
Subscribing to a publisher is done via the ``|`` - here used as a pipe. A simple
subscriber is ``op.Sink`` which is calling a function with optional positional
and keyword arguments.

.. code-block:: python3
Expand All @@ -86,7 +86,7 @@ Combining publishers
--------------------

You're able to create publishers on the fly by combining two publishers with
the common operators (like `+`, `>`, `<<`, ...).
the common operators (like ``+``, ``>``, ``<<``, ...).

.. code-block:: python3
Expand All @@ -106,9 +106,9 @@ Asyncio Support
---------------

A lot of operators are made for asynchronous operations. You're able to debounce
and throttle emits (via `op.debounce` and `op.throttle`), sample and delay (via
`op.Sample` and `op.Delay`) or start coroutines and when finishing the result
will be emitted.
and throttle emits (via ``op.Debounce`` and ``op.Throttle``), sample and delay
(via ``op.Sample`` and ``op.Delay``) or start coroutines and when finishing the
result will be emitted.

.. code-block:: python3
Expand All @@ -121,9 +121,11 @@ will be emitted.
After 3 seconds the result will be:

Result:0
.. code-block:: bash
Result: 0
`map_async` supports various modes how to handle a new emit when a coroutine
``map_async`` supports various modes how to handle a new emit when a coroutine
is running. Default is a concurrent run of coroutines, but also various queue
or interrupt mode is available.

Expand Down
61 changes: 22 additions & 39 deletions broqer/hub/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,30 +68,6 @@
>>> _ = hub['value3'].assign(Value(0), meta={'maximum':10})
>>> hub['value3'].meta
{'maximum': 10}
Wait for assignment
-------------------
It's also possible to wait for an assignment:
>>> import asyncio
>>> _f2 = asyncio.ensure_future(hub['value4'].wait_for_assignment())
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
>>> _f2.done()
False
>>> _ = hub['value4'].assign(Value(0))
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.01))
>>> _f2.done()
True
When already assigned it will not wait at all:
>>> _f2 = asyncio.ensure_future(hub['value4'].wait_for_assignment())
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.01))
>>> _f2.done()
True
"""
import asyncio
from collections import OrderedDict
Expand All @@ -114,14 +90,16 @@ def __init__(self, hub: 'Hub', # pylint: disable=unused-argument
self._subject = None # type: Publisher
self._path = path
self._hub = hub
self.assignment_future = None
self._pre_assign_emit = None # type: list

def subscribe(self, subscriber: 'Subscriber',
def subscribe(self, subscriber: Subscriber,
prepend: bool = False) -> SubscriptionDisposable:
disposable = Publisher.subscribe(self, subscriber, prepend)

if self._subject is not None:
assert isinstance(self, Publisher), 'Topic %r has to be a ' \
'Publisher when using .subscribe()' % self._path

if len(self._subscriptions) == 1:
self._subject.subscribe(self)
else:
Expand All @@ -142,40 +120,51 @@ def unsubscribe(self, subscriber: Subscriber) -> None:
def get(self):
try:
return self._subject.get()
except AttributeError: # if self._subject is None
raise ValueError('Topic is not yet assigned')
except AttributeError: # if self._subject is None or not a Publisher
raise ValueError('Topic %r is not yet assigned or is not a '
'Publisher' % self._path)

def emit(self, value: Any,
who: Optional[Publisher] = None) -> asyncio.Future:

if self._subject is None:
# if yet unassigned store the emit for later replay
if self._pre_assign_emit is None:
self._pre_assign_emit = []
self._pre_assign_emit.append(value)
return None

assert isinstance(self._subject, Subscriber), \
'Topic %r has to be a Subscriber when using .emit()' % self._path

# notify all subscribers when the source of the .emit is the subject
if who is self._subject:
return self.notify(value)

assert isinstance(self._subject, Subscriber), \
'Topic has to be a subscriber'

# otherwise pass this .emit to the subject
return self._subject.emit(value, who=self)

def assign(self, subject):
""" Assigns the given subject to the topic """
assert isinstance(subject, (Publisher, Subscriber))

# check if not already assigned
if self._subject is not None:
raise SubscriptionError('Topic %r already assigned' % self._path)

self._subject = subject

# subscribe to subject if topic has subscriptions
if self._subscriptions:
self._subject.subscribe(self)

# if topic received emits before assignment replay those emits
if self._pre_assign_emit is not None:
for value in self._pre_assign_emit:
self._subject.emit(value, who=self)
self._pre_assign_emit = None
if self.assignment_future is not None:
self.assignment_future.set_result(None)

return subject

def freeze(self):
""" Called by hub when hub is going to be frozen """
Expand All @@ -193,12 +182,6 @@ def subject(self):
""" The assigned subject """
return self._subject

async def wait_for_assignment(self):
""" Coroutine to wait until the assignment is finished """
if not self.assigned:
self.assignment_future = asyncio.get_event_loop().create_future()
await self.assignment_future

@property
def path(self) -> str:
""" Topic path used as key in the hub """
Expand Down
16 changes: 0 additions & 16 deletions test/test_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,22 +178,6 @@ def test_subscribe_emit_assign(factory):
hub['value1'].assign(value)
mock_sink.calls(mock.call(0), mock.call(1), mock.call(2))

@pytest.mark.asyncio
async def test_wait_for_assignment(event_loop):
hub = Hub()

assign_future = asyncio.ensure_future(hub['value1'].wait_for_assignment())
assert not assign_future.done()
await asyncio.sleep(0.001)
assert not assign_future.done()
hub['value1'].assign(Value(0))
await asyncio.sleep(0.001)
assert assign_future.done()

assign_future = asyncio.ensure_future(hub['value1'].wait_for_assignment())
await asyncio.sleep(0.001)
assert assign_future.done()

def test_meta_topic():
hub = Hub(topic_factory=MetaTopic)
assert hub['value1'].meta == dict()
Expand Down

0 comments on commit d4d154a

Please sign in to comment.