Skip to content

Commit

Permalink
store each .emit until hub topic is assigned
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Sep 12, 2018
1 parent 09ee148 commit 6c19b4a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 19 deletions.
25 changes: 10 additions & 15 deletions broqer/hub/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,16 @@
>>> 'value2' in hub
False
It will store the first .emit for an unassigned topic:
It will store each .emit for an unassigned topic:
>>> hub['value1'].emit(2)
And will raise an exception if .emit is used a second time on unassigned topic:
>>> hub['value1'].emit(3)
Traceback (most recent call last):
...
broqer.publisher.SubscriptionError: Only one emit will be stored ...
Assign a publisher to a hub topic:
>>> _ = hub.assign('value1', Value(1))
Output: 1
Output: 2
Output: 3
>>> hub['value1'].assigned
True
Expand Down Expand Up @@ -114,7 +109,7 @@ def __init__(self, hub: 'Hub', # pylint: disable=unused-argument
self._subject = None # type: Publisher
self._path = path
self.assignment_future = None
self._pre_assign_emit = NONE # type: Any
self._pre_assign_emit = None # type: list

def subscribe(self, subscriber: 'Subscriber',
prepend: bool = False) -> SubscriptionDisposable:
Expand Down Expand Up @@ -147,11 +142,9 @@ def get(self):
def emit(self, value: Any,
who: Optional[Publisher] = None) -> asyncio.Future:
if self._subject is None:
if self._pre_assign_emit is not NONE:
# method will be replaced by .__call__
raise SubscriptionError('Only one emit will be stored before' +
' assignment')
self._pre_assign_emit = value
if self._pre_assign_emit is None:
self._pre_assign_emit = []
self._pre_assign_emit.append(value)
return None

if who is self._subject:
Expand All @@ -170,8 +163,10 @@ def assign(self, subject, *_args, **_kwargs):
self._subject = subject
if self._subscriptions:
self._subject.subscribe(self)
if self._pre_assign_emit is not NONE:
self._subject.emit(self._pre_assign_emit, who=self)
if self._pre_assign_emit is not None:
for value in self._pre_assign_emit:
self._subject.emit(value, who=self)
self._pre_assign_emit = None
if self.assignment_future is not None:
self.assignment_future.set_result(None)

Expand Down
6 changes: 2 additions & 4 deletions test/test_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,7 @@ def test_subscribe_emit_assign(factory):
assert len(hub['value1']._subscriptions) == 1

hub['value1'].emit(1)

with pytest.raises(ValueError):
hub['value1'].emit(2)
hub['value1'].emit(2)

mock_sink.assert_not_called()

Expand All @@ -178,7 +176,7 @@ def test_subscribe_emit_assign(factory):
value = Value(0)

hub.assign('value1', value)
mock_sink.calls(mock.call(0), mock.call(1))
mock_sink.calls(mock.call(0), mock.call(1), mock.call(2))

@pytest.mark.asyncio
async def test_wait_for_assignment(event_loop):
Expand Down

0 comments on commit 6c19b4a

Please sign in to comment.