Skip to content

Commit

Permalink
rework operators
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Jun 1, 2021
1 parent f68d038 commit c545085
Show file tree
Hide file tree
Showing 15 changed files with 146 additions and 220 deletions.
5 changes: 1 addition & 4 deletions broqer/op/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
from broqer.op.cache import Cache
from broqer.op.throttle import Throttle

# utils
from broqer.op.concat import Concat

# enable operator overloading
from .py_operators import Str, Bool, Int, Float, Repr, Len, In, All, Any, \
BitwiseAnd, BitwiseOr, Not
Expand All @@ -22,7 +19,7 @@
'CombineLatest', 'BitwiseCombineLatest',
'Filter', 'Map', 'EvalTrue', 'MapAsync', 'build_map_async', 'AsyncMode',
'EvalFalse', 'build_map', 'build_map_factory', 'build_combine_latest',
'build_filter', 'build_filter_factory', 'Concat', 'Str', 'Bool', 'Int',
'build_filter', 'build_filter_factory', 'Str', 'Bool', 'Int',
'Float', 'Repr', 'map_bit', 'build_map_async_factory',
'Len', 'In', 'All', 'Any', 'BitwiseAnd', 'BitwiseOr', 'Not', 'Throttle',
'Cache'
Expand Down
23 changes: 7 additions & 16 deletions broqer/op/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,19 @@

from broqer import Publisher, NONE
from broqer.publisher import TValue
from broqer.operator import Operator, OperatorFactory
from broqer.operator import Operator


class AppliedCache(Operator):
class Cache(Operator):
""" Cache object applied to publisher (see Map) """
def __init__(self, publisher: Publisher, init: Any = NONE) -> None:
Operator.__init__(self, publisher)
def __init__(self, init: Any = NONE) -> None:
Operator.__init__(self)
self._state = init

def get(self) -> TValue:
if self._originator is None:
raise ValueError('Operator is missing originator')

return self._originator.get()

def emit(self, value: TValue, who: Publisher) -> None:
Expand All @@ -54,15 +57,3 @@ def emit(self, value: TValue, who: Publisher) -> None:
return Publisher.notify(self, value)

return None


class Cache(OperatorFactory): # pylint: disable=too-few-public-methods
""" Cache emitted values. Suppress duplicated value emits.
:param init: optional initialization state
"""
def __init__(self, init: Any = NONE) -> None:
self._init = init

def apply(self, publisher: Publisher):
return AppliedCache(publisher, self._init)
22 changes: 0 additions & 22 deletions broqer/op/concat.py

This file was deleted.

53 changes: 25 additions & 28 deletions broqer/op/filter_.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,27 @@
from typing import Any, Callable

from broqer import NONE, Publisher
from broqer.operator import Operator, OperatorFactory, OperatorMeta
from broqer.operator import Operator, ClassOperatorMeta


class AppliedFilter(Operator):
""" Filter object applied to publisher (see Filter) """
def __init__(self, publisher: Publisher, predicate: Callable[[Any], bool],
unpack: bool = False) -> None:
Operator.__init__(self, publisher)
self._predicate = predicate
class Filter(Operator):
""" Filter object applied to publisher
:param predicate: function to evaluate the filtering
:param \\*args: variable arguments to be used for evaluating predicate
:param unpack: value from emits will be unpacked (\\*value)
:param \\*\\*kwargs: keyword arguments to be used for evaluating predicate
"""
def __init__(self, predicate: Callable[[Any], bool], *args,
unpack: bool = False, **kwargs) -> None:
Operator.__init__(self)
self._predicate = partial(predicate, *args, **kwargs) # type: Callable
self._unpack = unpack

def get(self) -> Any:
if self._originator is None:
raise ValueError('Operator is missing originator')

if self._subscriptions:
return self._state

Expand Down Expand Up @@ -68,30 +77,16 @@ def emit(self, value: Any, who: Publisher) -> None:
return None


class Filter(OperatorFactory): # pylint: disable=too-few-public-methods
""" Filters values based on a ``predicate`` function
:param predicate: function to evaluate the filtering
:param \\*args: variable arguments to be used for evaluating predicate
:param unpack: value from emits will be unpacked (\\*value)
:param \\*\\*kwargs: keyword arguments to be used for evaluating predicate
"""
def __init__(self, predicate: Callable[[Any], bool],
*args, unpack: bool = False, **kwargs) -> None:
self._predicate = partial(predicate, *args, **kwargs) # type: Callable
self._unpack = unpack

def apply(self, publisher: Publisher):
return AppliedFilter(publisher, self._predicate, self._unpack)


class EvalTrue(Operator, metaclass=OperatorMeta):
class EvalTrue(Operator, metaclass=ClassOperatorMeta):
""" Emits all values which evaluates for True.
This operator can be used in the pipline style (v | EvalTrue) or as
standalone operation (EvalTrue(v)).
"""
def __init__(self, publisher: Publisher) -> None:
Operator.__init__(self, publisher)
def __init__(self, publisher: Publisher = None) -> None:
Operator.__init__(self)
if publisher is not None:
self.originator = publisher

def get(self) -> Any:
if self._subscriptions:
Expand All @@ -115,13 +110,15 @@ def emit(self, value: Any, who: Publisher) -> None:
return None


class EvalFalse(Operator, metaclass=OperatorMeta):
class EvalFalse(Operator, metaclass=ClassOperatorMeta):
""" Filters all emits which evaluates for False.
This operator can be used in the pipline style (v | EvalFalse or as
standalone operation (EvalFalse(v))."""
def __init__(self, publisher: Publisher) -> None:
Operator.__init__(self, publisher)
Operator.__init__(self)
if publisher is not None:
self.originator = publisher

def get(self) -> Any:
if self._subscriptions:
Expand Down
40 changes: 16 additions & 24 deletions broqer/op/map_.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,37 @@

from broqer import Publisher, NONE
from broqer.publisher import TValue
from broqer.operator import Operator, OperatorFactory
from broqer.operator import Operator


class AppliedMap(Operator):
""" Map object applied to publisher (see Map) """
def __init__(self, publisher: Publisher, function: Callable[[Any], Any],
unpack: bool = False) -> None:
class Map(Operator):
""" Map object applied to publisher
:param function: function to be applied for each emit
:param \\*args: variable arguments to be used for calling function
:param unpack: value from emits will be unpacked (\\*value)
:param \\*\\*kwargs: keyword arguments to be used for calling function
"""
def __init__(self, function: Callable[[Any], Any], *args,
unpack: bool = False, **kwargs) -> None:
""" Special care for return values:
- return `None` (or nothing) if you don't want to return a result
- return `None, ` if you want to return `None`
- return `(a, b), ` to return a tuple as value
- every other return value will be unpacked
"""

Operator.__init__(self, publisher)
self._function = function
Operator.__init__(self)
self._function = partial(function, *args, **kwargs)
self._unpack = unpack

def get(self) -> TValue:
if self._subscriptions:
return self._state

if self._originator is None:
raise ValueError('Operator is missing originator')

value = self._originator.get() # type: TValue

if value is NONE:
Expand Down Expand Up @@ -89,23 +98,6 @@ def emit(self, value: TValue, who: Publisher) -> None:
return None


class Map(OperatorFactory): # pylint: disable=too-few-public-methods
""" Apply ``function(*args, value, **kwargs)`` to each emitted value.
:param function: function to be applied for each emit
:param \\*args: variable arguments to be used for calling function
:param unpack: value from emits will be unpacked (\\*value)
:param \\*\\*kwargs: keyword arguments to be used for calling function
"""
def __init__(self, function: Callable[[Any], Any],
*args, unpack: bool = False, **kwargs) -> None:
self._function = partial(function, *args, **kwargs)
self._unpack = unpack

def apply(self, publisher: Publisher):
return AppliedMap(publisher, self._function, self._unpack)


def build_map(function: Callable[..., None] = None, *,
unpack: bool = False):
""" Decorator to wrap a function to return a Map operator.
Expand Down
53 changes: 17 additions & 36 deletions broqer/op/map_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
from typing import Any, MutableSequence, Optional # noqa: F401

# pylint: disable=cyclic-import
from broqer.operator import Operator, OperatorFactory
from broqer.operator import Operator
from broqer import Publisher, default_error_handler, NONE


Expand Down Expand Up @@ -133,19 +133,27 @@ class AsyncMode(Enum):
SKIP = 6 # skip values emitted during coroutine is running


class AppliedMapAsync(Operator):
""" Apply ``coro`` to each emitted value allowing async processing
class MapAsync(Operator): # pylint: disable=too-many-instance-attributes
""" Apply ``coro(*args, value, **kwargs)`` to each emitted value allow
async processing.
:param coro: coroutine to be applied on emit
:param \\*args: variable arguments to be used for calling coro
:param mode: behavior when a value is currently processed
:param error_callback: error callback to be registered
:param unpack: value from emits will be unpacked as (\\*value)
:param \\*\\*kwargs: keyword arguments to be used for calling coro
:ivar scheduled: Publisher emitting the value when coroutine is actually
started.
"""
def __init__(self, publisher: Publisher,
coro_with_args, mode=AsyncMode.CONCURRENT,
error_callback=default_error_handler,
def __init__(self,
coro, *args, mode=AsyncMode.CONCURRENT,
error_callback=default_error_handler, unpack: bool = False,
**kwargs
) -> None:
Operator.__init__(self, publisher)

self._coro = coro_with_args
Operator.__init__(self)
self._coro = build_coro(coro, unpack, *args, **kwargs)
self._mode = mode
self._error_callback = error_callback

Expand Down Expand Up @@ -237,33 +245,6 @@ def _run_coro(self, value):
self._future.add_done_callback(self._future_done)


class MapAsync(OperatorFactory): # pylint: disable=too-few-public-methods
""" Apply ``coro(*args, value, **kwargs)`` to each emitted value allow
async processing.
:param coro: coroutine to be applied on emit
:param \\*args: variable arguments to be used for calling coro
:param mode: behavior when a value is currently processed
:param error_callback: error callback to be registered
:param unpack: value from emits will be unpacked as (\\*value)
:param \\*\\*kwargs: keyword arguments to be used for calling coro
"""
def __init__(self, coro, *args, mode=AsyncMode.CONCURRENT,
error_callback=default_error_handler,
unpack: bool = False, **kwargs) -> None:
self._coro = build_coro(coro, unpack, *args, **kwargs)
self._mode = mode
self._error_callback = error_callback
self._unpack = unpack

def apply(self, publisher: Publisher):
return AppliedMapAsync(publisher,
coro_with_args=self._coro,
mode=self._mode,
error_callback=self._error_callback,
)


def build_map_async(coro=None, *,
mode: AsyncMode = AsyncMode.CONCURRENT,
error_callback=default_error_handler,
Expand Down
26 changes: 8 additions & 18 deletions broqer/op/throttle.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,24 @@

from broqer import Publisher, default_error_handler, NONE

from broqer.operator import Operator, OperatorFactory
from broqer.operator import Operator
from broqer.timer import Timer


class AppliedThrottle(Operator):
class Throttle(Operator):
""" Rate limit emits by the given time.
:param duration: time for throttling in seconds
:param error_callback: the error callback to be registered
:param loop: asyncio event loop to use
"""
def __init__(self, publisher: Publisher, duration: float,
def __init__(self, duration: float,
error_callback=default_error_handler, loop=None) -> None:

Operator.__init__(self, publisher)
Operator.__init__(self)

if duration < 0:
raise ValueError('Duration has to be bigger than zero')

self._duration = duration
self._loop = loop or asyncio.get_event_loop()
Expand Down Expand Up @@ -74,17 +78,3 @@ def _delayed_emit_cb(self, value=NONE):
def reset(self):
""" Reseting duration for throttling """
self._timer.cancel()


class Throttle(OperatorFactory): # pylint: disable=too-few-public-methods
""" Apply throttling to each emitted value.
:param duration: time for throttling in seconds
"""
def __init__(self, duration: float) -> None:
if duration < 0:
raise ValueError('Duration has to be bigger than zero')

self._duration = duration

def apply(self, publisher: Publisher):
return AppliedThrottle(publisher, self._duration)

0 comments on commit c545085

Please sign in to comment.