Skip to content

Commit

Permalink
added docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Sep 10, 2018
1 parent 013d45a commit 0f5b9a0
Show file tree
Hide file tree
Showing 32 changed files with 204 additions and 43 deletions.
3 changes: 3 additions & 0 deletions broqer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# -*- coding: utf-8 -*-
""" Broqer is a carefully crafted library to operate with continuous streams
of data in a reactive style with publish/subscribe and broker functionality.
"""
from .default_error_handler import default_error_handler
from .types import NONE
from .disposable import Disposable, SubscriptionDisposable
Expand Down
17 changes: 17 additions & 0 deletions broqer/default_error_handler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
""" Implementing DefaultErrorHandler. Object default_error_handler is used
as global object to register a callbacks for exceptions in asynchronous
operators, """
import traceback


Expand All @@ -8,16 +11,30 @@ def _default_error_callback(exc_type, exc_value, exc_traceback):


class DefaultErrorHandler:
""" DefaultErrorHandler object is a callable which is calling a registred
callback and is used for handling exceptions when asynchronous operators
receiving an exception during .emit(). The callback can be registred via
the .set(callback) method. The default callback is _default_error_callback
which is dumping the traceback of the exception.
"""
def __init__(self):
self._error_callback = _default_error_callback

def __call__(self, exc_type, exc_value, exc_traceback):
""" When calling the call will be forwarded to the registered
callback """
self._error_callback(exc_type, exc_value, exc_traceback)

def set(self, error_callback):
""" Register a new callback
:param error_callback: the callback to be registered
"""
self._error_callback = error_callback

def reset(self):
""" Reset to the default callback (dumping traceback)
"""
self._error_callback = _default_error_callback


Expand Down
3 changes: 2 additions & 1 deletion broqer/disposable.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from broqer import Publisher, Subscriber # noqa: F401
from broqer import (Publisher, # noqa: F401 pylint: disable=unused-import
Subscriber)


class Disposable(metaclass=ABCMeta):
Expand Down
1 change: 1 addition & 0 deletions broqer/hub/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
""" This module is containing the Hub class and all depending classes """
from .hub import Hub, SubHub, Topic, MetaTopic

__all__ = [
Expand Down
7 changes: 4 additions & 3 deletions broqer/hub/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@


class Topic(Publisher, Subscriber):
def __init__(self, hub: 'Hub', path: str) -> None:
def __init__(self, hub: 'Hub', # pylint: disable=unused-argument
path: str) -> None:
Publisher.__init__(self)
self._subject = None # type: Publisher
self._path = path
Expand Down Expand Up @@ -161,7 +162,7 @@ def emit(self, value: Any,

return self._subject.emit(value, who=self)

def assign(self, subject):
def assign(self, subject, *_args, **_kwargs):
assert isinstance(subject, (Publisher, Subscriber))

if self._subject is not None:
Expand Down Expand Up @@ -201,7 +202,7 @@ def __init__(self, hub: 'Hub', path: str) -> None:
Topic.__init__(self, hub, path)
self._meta = dict() # type: Dict[str, Any]

def assign(self, subject, meta=None):
def assign(self, subject, meta=None): # pylint: disable=arguments-differ
Topic.assign(self, subject)
if meta is not None:
self._meta.update(meta)
Expand Down
2 changes: 2 additions & 0 deletions broqer/op/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
""" The op module contains all operators broqer offers """

# synchronous operators
from .any_ import Any, any_
from .all_ import All, all_
Expand Down
15 changes: 13 additions & 2 deletions broqer/op/debounce.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,18 @@


class Debounce(Operator):
""" Emit a value only after a given idle time (emits meanwhile are
skipped). Debounce can also be used for a timeout functionality.
:param publisher: source publisher
:param duetime: time in seconds to be waited for debounce
:param retrigger_value: value used to emit when value has changed
:param error_callback: error callback to be registered
:param loop: asyncio loop to be used
"""
def __init__(self, publisher: Publisher, duetime: float,
retrigger_value: Any = NONE,
error_callback=default_error_handler) -> None:
error_callback=default_error_handler,
loop=None) -> None:
assert duetime >= 0, 'duetime has to be positive'

Operator.__init__(self, publisher)
Expand All @@ -67,6 +76,7 @@ def __init__(self, publisher: Publisher, duetime: float,
self._error_callback = error_callback
self._state = NONE # type: Any
self._next_state = NONE # type: Any
self._loop = loop or asyncio.get_event_loop()

def unsubscribe(self, subscriber: Subscriber) -> None:
Operator.unsubscribe(self, subscriber)
Expand Down Expand Up @@ -109,7 +119,7 @@ def emit(self, value: Any, who: Publisher) -> None:
self._next_state = value

self._call_later_handler = \
asyncio.get_event_loop().call_later(self.duetime, self._debounced)
self._loop.call_later(self.duetime, self._debounced)

def _debounced(self):
self._call_later_handler = None
Expand All @@ -120,6 +130,7 @@ def _debounced(self):
self._error_callback(*sys.exc_info())

def reset(self):
""" Reset the debounce time """
if self._retrigger_value is not NONE:
self.notify(self._retrigger_value)
self._state = self._retrigger_value
Expand Down
6 changes: 6 additions & 0 deletions broqer/op/delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@


class Delay(Operator):
""" Emit every value delayed by the given time.
:param publisher: source publisher
:param duration: time of delay in seconds
:param error_callback: the error callback to be registered
:param loop: asyncio event loop to use
"""
def __init__(self, publisher: Publisher, duration: float,
error_callback=default_error_handler, loop=None) -> None:
assert duration >= 0, 'delay has to be positive'
Expand Down
13 changes: 11 additions & 2 deletions broqer/op/filter_.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@


class Filter(Operator):
""" Filters values based on a ``predicate`` function
:param publisher: source publisher
:param predicate: function to evaluate the filtering
:param \\*args: variable arguments to be used for evaluating predicate
:param unpack: value from emits will be unpacked as (*value)
:param \\**kwargs: keyword arguments to be used for evaluating predicate
"""
def __init__(self, publisher: Publisher,
predicate: Callable[[Any], bool],
*args, unpack: bool = False, **kwargs) -> None:
Expand Down Expand Up @@ -67,7 +74,8 @@ def emit(self, value: Any, who: Publisher) -> asyncio.Future:
return None


class True_(Operator):
class True_(Operator): # pylint: disable=invalid-name
""" Filters all emits which evaluates for True """
def get(self):
value = self._publisher.get() # may raise ValueError
if bool(value):
Expand All @@ -81,7 +89,8 @@ def emit(self, value: Any, who: Publisher) -> asyncio.Future:
return None


class False_(Operator):
class False_(Operator): # pylint: disable=invalid-name
""" Filters all emits which evaluates for False """
def get(self):
value = self._publisher.get() # may raise ValueError
if not bool(value):
Expand Down
7 changes: 7 additions & 0 deletions broqer/op/map_.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@


class Map(Operator):
""" Apply ``map_func(*args, value, **kwargs)`` to each emitted value.
:param publisher: source publisher
:param map_func: function to be applied for each emit
:param \\*args: variable arguments to be used for calling map_coro
:param unpack: value from emits will be unpacked as (*value)
:param \\**kwargs: keyword arguments to be used for calling map_coro
"""
def __init__(self, publisher: Publisher, map_func: Callable[[Any], Any],
*args, unpack=False, **kwargs) -> None:
""" special care for return values:
Expand Down
11 changes: 10 additions & 1 deletion broqer/op/map_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,18 @@


class MapAsync(Operator):
""" Apply ``map_coro`` to each emitted value allowing async processing
:param publisher: source publisher
:param map_coro: coroutine to be applied on emit
:param \\*args: variable arguments to be used for calling map_coro
:param mode: behavior when a value is currently processed
:param error_callback: error callback to be registered
:param unpack: value from emits will be unpacked as (*value)
:param \\*kwargs: keyword arguments to be used for calling map_coro
"""
def __init__(self, publisher: Publisher, map_coro, *args,
mode=MODE.CONCURRENT, error_callback=default_error_handler,
unpack=False, **kwargs) -> None:
unpack: bool = False, **kwargs) -> None:
"""
mode uses one of the following enumerations:
* CONCURRENT - just run coroutines concurrent
Expand Down
27 changes: 19 additions & 8 deletions broqer/op/map_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, MutableSequence # noqa: F401
from typing import Callable # noqa: F401

from broqer import Publisher, default_error_handler

Expand All @@ -97,26 +97,37 @@


class MapThreaded(MapAsync):
""" Apply ``map_func`` to each emitted value allowing threaded processing.
:param publisher: source publisher
:param map_func: function called to apply
:param \\*args: variable arguments to be used for calling map_coro
:param mode: behavior when a value is currently processed
:param error_callback: error callback to be registered
:param unpack: value from emits will be unpacked as (*value)
:param \\**kwargs: keyword arguments to be used for calling map_coro
"""
def __init__(self, publisher: Publisher, map_func, *args,
mode: MODE = MODE.CONCURRENT, # type: ignore
error_callback=default_error_handler, **kwargs) -> None:
error_callback=default_error_handler,
unpack: bool = False, loop=None, **kwargs) -> None:

assert mode != MODE.INTERRUPT, 'mode INTERRUPT is not supported'

MapAsync.__init__(self, publisher, self._thread_coro, mode=mode,
error_callback=error_callback)
error_callback=error_callback, unpack=unpack)

if args or kwargs:
self._map_func = \
partial(map_func, *args, **kwargs) # type: Callable
partial(map_func, *args, **kwargs) # type: Callable
else:
self._map_func = map_func # type: Callable
self._map_func = map_func

self._loop = loop or asyncio.get_event_loop()
self._executor = ThreadPoolExecutor()

async def _thread_coro(self, *args, **kwargs):
return await asyncio.get_event_loop().run_in_executor(
self._executor, self._map_func, *args, **kwargs)
async def _thread_coro(self, *args):
return await self._loop.run_in_executor(
self._executor, self._map_func, *args)


map_threaded = build_operator(MapThreaded) # pylint: disable=invalid-name
3 changes: 3 additions & 0 deletions broqer/op/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@


class Merge(MultiOperator):
""" Merge emits of multiple publishers into one stream
:param \\*publishers: source publishers to be merged
"""
def __init__(self, *publishers: Publisher) -> None:
MultiOperator.__init__(self, *publishers)

Expand Down
4 changes: 4 additions & 0 deletions broqer/op/operator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
""" Module implementing Operator, MultiOperator and build_operator.
"""
from abc import abstractmethod

from broqer import Publisher, Subscriber, SubscriptionDisposable
Expand Down Expand Up @@ -37,6 +39,7 @@ def unsubscribe(self, subscriber: Subscriber) -> None:

@property
def source_publishers(self):
""" tuple with all source publishers """
return (self._publisher, )

@abstractmethod
Expand Down Expand Up @@ -79,6 +82,7 @@ def unsubscribe(self, subscriber: Subscriber) -> None:

@property
def source_publishers(self):
""" tuple with all source publishers """
return self._publishers

@abstractmethod
Expand Down
2 changes: 2 additions & 0 deletions broqer/op/operator_overloading.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
""" This module enables the operator overloading of publishers """
import asyncio
import operator
from typing import Any
Expand Down Expand Up @@ -41,6 +42,7 @@ def emit(self, value: Any, who: Publisher) -> asyncio.Future:


def apply_operator_overloading():
""" function to apply operator overloading to Publisher class """
for method in ('__lt__', '__le__', '__eq__', '__ne__', '__ge__', '__gt__',
'__add__', '__and__', '__lshift__', '__mod__', '__mul__',
'__pow__', '__rshift__', '__sub__', '__xor__', '__concat__',
Expand Down
5 changes: 5 additions & 0 deletions broqer/op/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@


class Partition(Operator):
""" Group ``size`` emits into one emit as tuple.
:param publisher: source publisher
:param size: emits to be collected before emit
"""
def __init__(self, publisher: Publisher, size: int) -> None:
# use size = 0 for unlimited partition size
# (only make sense when using .flush() )
Expand Down Expand Up @@ -58,6 +62,7 @@ def emit(self, value: Any, who: Publisher) -> asyncio.Future:
return None

def flush(self):
""" Emits the current queue and clears the queue """
self.notify(tuple(self._queue))
self._queue.clear()

Expand Down
11 changes: 10 additions & 1 deletion broqer/op/publishers/from_polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@


class FromPolling(Publisher):
""" Call ``func(*args, **kwargs)`` periodically and emit the returned
values.
:param interval: periodic interval in seconds
:param poll_func: function to be called
:param \\*args: variable arguments to be used for calling poll_func
:param error_callback: error callback to be registered
:param loop: asyncio event loop to use
:param \\*kwargs: keyword arguments to be used for calling poll_func
"""
def __init__(self, interval, poll_func: Callable[[Any], Any], *args,
error_callback=default_error_handler, loop=None,
**kwargs) -> None:
Expand Down Expand Up @@ -70,7 +79,7 @@ def _poll_callback(self):
except Exception: # pylint: disable=broad-except
self._error_callback(*sys.exc_info())

self._call_later_handler = asyncio.get_event_loop().call_later(
self._call_later_handler = self._loop.call_later(
self._interval, self._poll_callback)
else:
self._call_later_handler = None
11 changes: 10 additions & 1 deletion broqer/op/reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,17 @@


class Reduce(Accumulate):
""" Apply ``func`` to the current emitted value and the last result of
``func``.
:param publisher: source publisher
:param func: function taking the emitted value and the last result of the
last run.
:param init: initialisation used as "first result" for the first call of
``func`` on first emit.
"""
def __init__(self, publisher: Publisher, func: Callable[[Any, Any], Any],
init=None) -> None:
init) -> None:
def _func(state, value):
result = func(state, value)
return (result, result) # new state and result is the same
Expand Down

0 comments on commit 0f5b9a0

Please sign in to comment.