Skip to content

Commit

Permalink
emit empty tuple on subscription of CombineLatest without publishers
Browse files Browse the repository at this point in the history
fix bug for allow_stateless when all publishers are stateful
  • Loading branch information
semiversus committed Sep 12, 2018
1 parent 9591815 commit 09ee148
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
10 changes: 8 additions & 2 deletions broqer/op/combine_latest.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ def subscribe(self, subscriber: Subscriber,

disposable = MultiOperator.subscribe(self, subscriber, prepend)

# if there are no source publishers emit an empty tuple on subscription
if not self._publishers:
self.notify(())

# check if ._statless is already definied (will be done on first
# subscription)
if self._stateless is not None:
return disposable

Expand Down Expand Up @@ -165,7 +171,7 @@ def emit(self, value: Any, who: Publisher) -> asyncio.Future:
state = tuple(self._partial_state)

# remove stateless publisher emits from ._partial_state
if self._stateless[index]:
if self._stateless and self._stateless[index]:
self._partial_state[index] = NONE

# if result of _map() was NONE don't emit
Expand All @@ -176,7 +182,7 @@ def emit(self, value: Any, who: Publisher) -> asyncio.Future:
is_new_state = (state == self._state)

# store ._state only when all publishers are stateful
if not any(self._stateless):
if self._stateless and not any(self._stateless):
self._state = state

# check if state has changed or stateless publisher has emitted
Expand Down
24 changes: 23 additions & 1 deletion test/test_op_combine_latest.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,26 @@ def reverse(s1, s2, s3, s4):
source4.notify(4)
with pytest.raises(ValueError):
dut.get()
assert collector.result_vector == ((NONE, 3, NONE, 1), (4, 3, NONE, 1),)
assert collector.result_vector == ((NONE, 3, NONE, 1), (4, 3, NONE, 1),)

def test_allow_stateless_with_stateful_publishers():
source1 = StatefulPublisher(0)
source2 = StatefulPublisher(0)

dut = CombineLatest(source1, source2, allow_stateless=True)

collector = Collector()
dut.subscribe(collector)

source1.notify(1)
source2.notify(1)

assert collector.result_vector == ((0,0), (1,0), (1,1))

def test_no_publishers():
dut = CombineLatest()

collector = Collector()
dut.subscribe(collector)

assert collector.result_vector == ((),)

0 comments on commit 09ee148

Please sign in to comment.