Skip to content

Commit

Permalink
added map functionality to op.CombineLatest
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Jun 9, 2018
1 parent 769ed28 commit fa8f56c
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions broqer/op/combine_latest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
1 2
>>> s2.emit(3)
1 3
>>> combination.cache
(1, 3)
>>> disposable.dispose()
Subscribing to a CombineLatest with all values available is emitting the values
Expand All @@ -37,13 +35,14 @@


class CombineLatest(MultiOperator):
def __init__(self, *publishers: Publisher) -> None:
def __init__(self, *publishers: Publisher, map=None) -> None:
MultiOperator.__init__(self, *publishers)
self._cache = [None for _ in publishers] # type: MutableSequence[Any]
self._missing = set(publishers)
self._index = \
{p: i for i, p in enumerate(publishers)
} # type: Dict[Publisher, int]
self._map = map

def subscribe(self, subscriber: Subscriber) -> SubscriptionDisposable:
disposable = MultiOperator.subscribe(self, subscriber)
Expand All @@ -59,11 +58,10 @@ def emit(self, *args: Any, who: Publisher) -> None:
args = args[0]
self._cache[self._index[who]] = args
if not self._missing:
self._emit(*self._cache)

@property
def cache(self):
return tuple(self._cache)
if self._map:
self._emit(self._map(*self._cache))
else:
self._emit(*self._cache)


combine_latest = build_operator(CombineLatest)

0 comments on commit fa8f56c

Please sign in to comment.