Skip to content

Commit

Permalink
added LAST_DISTINCT to op.MapThreaded
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Jun 8, 2018
1 parent 3523a3b commit ffb07ce
Showing 1 changed file with 21 additions and 21 deletions.
42 changes: 21 additions & 21 deletions broqer/op/map_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def __init__(self, publisher: Publisher, map_func, *args,
* CONCURRENT - just run coroutines concurrent
* QUEUE - queue the value(s) and call after coroutine is finished
* LAST - use last emitted value after coroutine is finished
* LAST_DISTINCT - like LAST but only when value has changed
* SKIP - skip values emitted during coroutine is running
( INTERRUPT like MapAsync is not possible with MapThreaded )
"""
Expand All @@ -121,16 +122,17 @@ def __init__(self, publisher: Publisher, map_func, *args,
self._mode = mode
self._error_callback = error_callback
self._future = None # type: asyncio.Future
self._last_emit = None # type: Any

if args or kwargs:
self._map_func = \
partial(map_func, *args, **kwargs) # type: Callable
else:
self._map_func = map_func # type: Callable

if mode in (Mode.QUEUE, Mode.LAST):
self._queue = deque(maxlen=(None if mode == Mode.QUEUE else 1)
) # type: MutableSequence
if mode in (Mode.QUEUE, Mode.LAST, Mode.LAST_DISTINCT):
maxlen = (None if mode == Mode.QUEUE else 1)
self._queue = deque(maxlen=maxlen) # type: MutableSequence
else: # no queue for CONCURRENT and SKIP
self._queue = None

Expand All @@ -144,18 +146,14 @@ def __init__(self, publisher: Publisher, map_func, *args,
def emit(self, *args: Any, who: Publisher) -> None:
assert who == self._publisher, 'emit from non assigned publisher'

if (self._mode == Mode.CONCURRENT or
self._future is None or
if (self._mode == Mode.CONCURRENT or self._future is None or
self._future.done()):

self._future = asyncio.ensure_future(
asyncio.get_event_loop().run_in_executor(
self._executor,
self._map_func,
*args
)
)
self._future.add_done_callback(self._future_done)
self._last_emit = args
future = asyncio.get_event_loop().run_in_executor(
self._executor, self._map_func, *args)
self._future = asyncio.ensure_future(future)
self._future.add_done_callback(self._future_done)
elif self._mode in (Mode.QUEUE, Mode.LAST):
self._queue.append(args)

Expand All @@ -169,16 +167,18 @@ def _future_done(self, future):
result = ()
elif not isinstance(result, tuple):
result = (result, )
self._emit(*result)
try:
self._emit(*result)
except Exception:
self._error_callback(*sys.exc_info())

if self._queue:
self._future = asyncio.ensure_future(
asyncio.get_event_loop().run_in_executor(
self._executor,
self._map_func,
*self._queue.popleft()
)
)
args = self._queue.popleft()
if self._mode == Mode.LAST_DISTINCT and args == self._last_emit:
return
future = asyncio.get_event_loop().run_in_executor(
self._executor, self._map_func, *args)
self._future = asyncio.ensure_future(future)
self._future.add_done_callback(self._future_done)


Expand Down

0 comments on commit ffb07ce

Please sign in to comment.