Skip to content

Commit

Permalink
wip: introducing build_ decorators
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Sep 22, 2018
1 parent 89915ac commit 2870391
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 79 deletions.
9 changes: 5 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ Make your own operators on the fly with ``map`` and ``filter`` decorators:

.. code-block:: python3
>>> @map
>>> @build_map
... def count_vowels(s):
... return sum([s.count(v) for v in 'aeiou'])
Expand All @@ -153,7 +153,8 @@ You can even make configurable ``Map`` s and ``Filter`` s:
.. code-block:: python3
>>> import re
>>> @filter
>>> @build_filter
... def filter_pattern(pattern, s):
... return re.search(pattern, s) is not None
Expand All @@ -164,8 +165,8 @@ You can even make configurable ``Map`` s and ``Filter`` s:
>>> msg.emit('Only 1 car has passed')
Only 1 car has passed
Decorators are also available for ``Accumulate``, ``MapAsync``, ``MapThreaded``
and ``Reduce``.
Decorators are also available for ``Accumulate``, ``MapAsync``, ``MapThreaded``,
``Reduce`` and ``Sink``.

Install
=======
Expand Down
3 changes: 2 additions & 1 deletion broqer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .subject import Subject, Value

from .op import operator_overloading # noqa: F401 enable operator overloading
from . import op

__author__ = 'Günther Jena'
__email__ = 'guenther@jena.at'
Expand All @@ -20,5 +21,5 @@
__all__ = [
'StatefulPublisher', 'Disposable', 'Publisher', 'Subscriber',
'SubscriptionDisposable', 'SubscriptionError', 'NONE', 'Hub',
'SubHub', 'Subject', 'Value', 'default_error_handler',
'SubHub', 'Subject', 'Value', 'default_error_handler', 'op'
]
20 changes: 10 additions & 10 deletions broqer/op/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
# synchronous operators
from .any_ import Any
from .all_ import All
from .accumulate import Accumulate, accumulate
from .accumulate import Accumulate, build_accumulate
from .cache import Cache
from .catch_exception import CatchException
from .combine_latest import CombineLatest
from .filter_ import Filter, True_, False_, filter_, filter_unpacked
from .map_ import Map, map_, map_unpacked
from .filter_ import Filter, True_, False_, build_filter
from .map_ import Map, build_map
from .merge import Merge
from .partition import Partition
from .reduce import Reduce, reduce
from .reduce import Reduce, build_reduce
from .replace import Replace
from .sliding_window import SlidingWindow
from .switch import Switch
Expand All @@ -20,16 +20,16 @@
from .debounce import Debounce
from .delay import Delay
from .sample import Sample
from .map_async import MapAsync, MODE, map_async, map_async_unpacked
from .map_threaded import MapThreaded, map_threaded, map_threaded_unpacked
from .map_async import MapAsync, MODE, build_map_async
from .map_threaded import MapThreaded, build_map_threaded
from .throttle import Throttle

# publishers
from .publishers.from_polling import FromPolling

# subscribers
from .subscribers.on_emit_future import OnEmitFuture
from .subscribers.sink import Sink
from .subscribers.sink import Sink, build_sink
from .subscribers.trace import Trace
from .subscribers.topic_mapper import TopicMapper

Expand All @@ -43,7 +43,7 @@
'Filter', 'Map', 'Merge', 'Partition', 'Reduce', 'Replace', 'Sink',
'SlidingWindow', 'Switch', 'Debounce', 'Delay', 'FromPolling', 'Sample',
'MapAsync', 'MODE', 'MapThreaded', 'Throttle', 'OnEmitFuture', 'True_',
'False_', 'Trace', 'TopicMapper', 'map_', 'map_unpacked',
'filter_', 'filter_unpacked', 'accumulate', 'reduce', 'map_async',
'map_async_unpacked', 'map_threaded', 'map_threaded_unpacked'
'False_', 'Trace', 'TopicMapper', 'build_map', 'build_reduce',
'build_filter', 'build_accumulate', 'build_map_async',
'build_map_threaded', 'build_sink'
]
2 changes: 1 addition & 1 deletion broqer/op/accumulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def reset(self, state: Any) -> None:
self._state = state


def accumulate(function):
def build_accumulate(function):
@wraps(function)
def wrapper_accumulate_function(init):
return Accumulate(function, init)
Expand Down
24 changes: 12 additions & 12 deletions broqer/op/filter_.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ def emit(self, value: Any, who: Publisher) -> asyncio.Future:
return None


def filter_(predicate):
@wraps(predicate)
def wrapper_filter_function(*args, **kwargs):
return Filter(predicate, *args, unpack=False, **kwargs)
return wrapper_filter_function


def filter_unpacked(predicate):
@wraps(predicate)
def wrapper_filter_function(*args, **kwargs):
return Filter(predicate, *args, unpack=True, **kwargs)
return wrapper_filter_function
def build_filter(predicate: Callable[[Any], bool] = None,
unpack: bool = False):
def _build_filter(predicate: Callable[[Any], bool]):
@wraps(predicate)
def _wrapper(*args, **kwargs) -> Filter:
return Filter(predicate, *args, unpack=unpack, **kwargs)
return _wrapper

if predicate:
return _build_filter(predicate)

return _build_filter
24 changes: 12 additions & 12 deletions broqer/op/map_.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ def emit(self, value: Any, who: Publisher) -> asyncio.Future:
return None


def map_(callback):
@wraps(callback)
def wrapper_map_function(*args, **kwargs):
return Map(callback, *args, unpack=False, **kwargs)
return wrapper_map_function


def map_unpacked(callback):
@wraps(callback)
def wrapper_map_function(*args, **kwargs):
return Map(callback, *args, unpack=True, **kwargs)
return wrapper_map_function
def build_map(function: Callable[[Any], Any] = None,
unpack: bool = False):
def _build_map(function: Callable[[Any], Any]):
@wraps(function)
def _wrapper(*args, **kwargs) -> Map:
return Map(function, *args, unpack=unpack, **kwargs)
return _wrapper

if function:
return _build_map(function)

return _build_map
27 changes: 8 additions & 19 deletions broqer/op/map_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,25 +226,14 @@ def _run_coro(self, value):
self._future.add_done_callback(self._future_done)


def map_async(coroutine=None, mode=MODE.CONCURRENT):
def _map_async_unpacked(coroutine):
@wraps(coroutine)
def _wrapper(*args, error_callback=default_error_handler, **kwargs):
return MapAsync(coroutine, *args, unpack=False, mode=mode,
error_callback=error_callback, **kwargs)
def build_map_async(coro=None, mode=MODE.CONCURRENT, unpack: bool = False):
def _build_map_async(coro):
@wraps(coro)
def _wrapper(*args, **kwargs) -> MapAsync:
return MapAsync(coro, *args, mode=mode, unpack=unpack, **kwargs)
return _wrapper
if coroutine:
return _map_async_unpacked(coroutine)
return _map_async_unpacked

if coro:
return _build_map_async(coro)

def map_async_unpacked(coroutine=None, mode=MODE.CONCURRENT):
def _map_async_unpacked(coroutine):
@wraps(coroutine)
def _wrapper(*args, error_callback=default_error_handler, **kwargs):
return MapAsync(coroutine, *args, unpack=True, mode=mode,
error_callback=error_callback, **kwargs)
return _wrapper
if coroutine:
return _map_async_unpacked(coroutine)
return _map_async_unpacked
return _build_map_async
27 changes: 9 additions & 18 deletions broqer/op/map_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,25 +125,16 @@ async def _thread_coro(self, *args):
self._executor, self._map_func, *args)


def map_threaded(function=None, mode=MODE.CONCURRENT):
def _map_threaded_unpacked(function):
def build_map_threaded(function=None, mode=MODE.CONCURRENT,
unpack: bool = False):
def _build_map_threaded(function):
@wraps(function)
def _wrapper(*args, error_callback=default_error_handler, **kwargs):
return MapThreaded(function, *args, unpack=False, mode=mode,
error_callback=error_callback, **kwargs)
def _wrapper(*args, **kwargs) -> MapThreaded:
return MapThreaded(function, *args, mode=mode, unpack=unpack,
**kwargs)
return _wrapper
if function:
return _map_threaded_unpacked(function)
return _map_threaded_unpacked


def map_threaded_unpacked(function=None, mode=MODE.CONCURRENT):
def _map_threaded_unpacked(function):
@wraps(function)
def _wrapper(*args, error_callback=default_error_handler, **kwargs):
return MapThreaded(function, *args, unpack=True, mode=mode,
error_callback=error_callback, **kwargs)
return _wrapper
if function:
return _map_threaded_unpacked(function)
return _map_threaded_unpacked
return _build_map_threaded(function)

return _build_map_threaded
2 changes: 1 addition & 1 deletion broqer/op/reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _func(state, value):
Accumulate.__init__(self, _func, init)


def reduce(function):
def build_reduce(function):
@wraps(function)
def _wrapper(init: Any):
return Reduce(function, init)
Expand Down
16 changes: 15 additions & 1 deletion broqer/op/subscribers/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
>>> len(s.subscriptions)
0
"""
from functools import partial
from functools import partial, wraps
from typing import Any, Callable, Optional

from broqer import Subscriber, Publisher
Expand Down Expand Up @@ -57,3 +57,17 @@ def emit(self, value: Any, who: Publisher):
self._callback(*value)
else:
self._callback(value)


def build_sink(function: Callable[..., None] = None,
unpack: bool = False):
def _build_sink(function: Callable[..., None]):
@wraps(function)
def _wrapper(*args, **kwargs) -> Sink:
return Sink(function, *args, unpack=unpack, **kwargs)
return _wrapper

if function:
return _build_sink(function)

return _build_sink

0 comments on commit 2870391

Please sign in to comment.