Skip to content

Commit

Permalink
Merge branch 'master' of github.com:semiversus/python-broqer
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Sep 17, 2018
2 parents 07df53b + abab84a commit 7a45377
Show file tree
Hide file tree
Showing 21 changed files with 153 additions and 110 deletions.
93 changes: 62 additions & 31 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Python Broqer
.. image:: https://img.shields.io/github/license/semiversus/python-broqer.svg
:target: https://en.wikipedia.org/wiki/MIT_License

+--------------------------------------------------------------------------------+
| **Broqer 1.0.0 will be released soon (till end of october 2018) - stay tuned** |
+--------------------------------------------------------------------------------+

Initial focus on embedded systems *Broqer* can be used wherever continuous streams of data have to be processed - and they are everywhere. Watch out!

.. image:: https://cdn.rawgit.com/semiversus/python-broqer/7beb7379/docs/logo.svg
Expand All @@ -29,11 +33,9 @@ Synopsis
- Source is hosted on GitHub.com_
- Documentation is hosted on ReadTheDocs.com_
- Tested on Python 3.5, 3.6 and 3.7
- Compact library (<1000 lines of code) and well documented (>1000 lines of comments)
- Fully unit tested with pytest_, coding style checked with flake8_, static type checked with mypy_, static code checked with pylint_
- Operators known from ReactiveX_ and other streaming frameworks (like distinct_, combine_latest_, ...)
- Supporting ``asyncio`` for time depended operations and using coroutines (e.g. map_async_, debounce_, ...)
- Publishers are *awaitable* (e.g. ``await adc_raw``)
- Compact library (~1000 lines of code) and well documented
- Unit tested with pytest_, coding style checked with flake8_, static type checked with mypy_, static code checked with pylint_
- Operators known from ReactiveX_ and other streaming frameworks (like map_, combine_latest_, ...)
- Broker functionality via Hub_

+ Centralised object to keep track of publishers and subscribers
Expand All @@ -47,24 +49,28 @@ Synopsis
.. _ReadTheDocs.com: http://python-broqer.readthedocs.io
.. _ReactiveX: http://reactivex.io/

.. _Hub: https://github.com/semiversus/python-broqer/blob/master/broqer/hub.py
.. _Hub: https://github.com/semiversus/python-broqer/blob/master/broqer/hub/hub.py
.. _debounce: https://github.com/semiversus/python-broqer/blob/master/broqer/op/debounce.py
.. _map_async: https://github.com/semiversus/python-broqer/blob/master/broqer/op/map_async.py
.. _combine_latest: https://github.com/semiversus/python-broqer/blob/master/broqer/op/combine_latest.py
.. _distinct: https://github.com/semiversus/python-broqer/blob/master/broqer/op/distinct.py

.. marker-for-example
Showcase
========

In other frameworks Publisher_ are sometimes called ``Oberservable``. A subscriber
is able to observe changes the publisher is emitting.
is able to observe changes the publisher is emitting. With this basics you're
able to use the observer pattern - let's see!

Observer pattern
----------------

.. code-block:: python
Subscribing to a publisher is done via the `|` - here used as a pipe. A simple
subscriber is `op.Sink` which is calling a function with optional positional
and keyword arguments.

.. code-block:: python3
>>> from broqer import Value, op
>>> a = Value(5) # create a value (publisher and subscriber with state)
Expand All @@ -79,20 +85,53 @@ Observer pattern
Combining publishers
--------------------

.. code-block:: python
You're able to create publishers on the fly by combining two publishers with
the common operators (like `+`, `>`, `<<`, ...).

.. code-block:: python3
>>> from broqer import Value, op
>>> a = Value(1)
>>> b = Value(3)
>>> c = a * 3 > b # create a new publisher via operator overloading
>>> c | op.Sink(print, 'c =')
c = False
>>> c | op.Sink(print, 'c:')
c: False
>>> a.emit(1) # will not change the state of c
>>> a.emit(2)
c = True
c: True
Asyncio Support
---------------

A lot of operators are made for asynchronous operations. You're able to debounce
and throttle emits (via `op.debounce` and `op.throttle`), sample and delay (via
`op.Sample` and `op.Delay`) or start coroutines and when finishing the result
will be emitted.

.. code-block:: python3
>>> async def long_running_coro(value):
... await asyncio.sleep(3)
... return value + 1
...
>>> a = Value(0)
>>> a | op.map_async(long_running_coro) | op.Sink(print, 'Result:')
After 3 seconds the result will be:

Result:0

`map_async` supports various modes how to handle a new emit when a coroutine
is running. Default is a concurrent run of coroutines, but also various queue
or interrupt mode is available.

Every publisher can be awaited in coroutines:

.. code-block:: python3
await signal_publisher
Install
=======
Expand All @@ -103,12 +142,6 @@ Install
.. marker-for-api
How it works
============

Basically it's based on the observable pattern - an object you can register on and you will be informed as
soon the state has changed. The observable are called ``Publishers``.

API
===

Expand Down Expand Up @@ -139,8 +172,6 @@ Operators
+-------------------------------------+-----------------------------------------------------------------------------+
| combine_latest_ (\*publishers) | Combine the latest emit of multiple publishers and emit the combination |
+-------------------------------------+-----------------------------------------------------------------------------+
| distinct_ (\*init) | Only emit values which changed regarding to the cached state |
+-------------------------------------+-----------------------------------------------------------------------------+
| filter_ (predicate, ...) | Filters values based on a ``predicate`` function |
+-------------------------------------+-----------------------------------------------------------------------------+
| map_ (map_func, \*args, \*\*kwargs) | Apply ``map_func(*args, value, **kwargs)`` to each emitted value |
Expand Down Expand Up @@ -198,28 +229,28 @@ Subjects

.. _Subject: https://github.com/semiversus/python-broqer/blob/master/broqer/subject.py
.. _Value: https://github.com/semiversus/python-broqer/blob/master/broqer/subject.py
.. _Publisher: https://github.com/semiversus/python-broqer/blob/master/broqer/core.py
.. _StatefulPublisher: https://github.com/semiversus/python-broqer/blob/master/broqer/core.py
.. _Subscriber: https://github.com/semiversus/python-broqer/blob/master/broqer/core.py
.. _Publisher: https://github.com/semiversus/python-broqer/blob/master/broqer/publisher.py
.. _StatefulPublisher: https://github.com/semiversus/python-broqer/blob/master/broqer/publisher.py
.. _Subscriber: https://github.com/semiversus/python-broqer/blob/master/broqer/subscriber.py
.. _accumulate: https://github.com/semiversus/python-broqer/blob/master/broqer/op/accumulate.py
.. _cache: https://github.com/semiversus/python-broqer/blob/master/broqer/op/cache.py
.. _catch_exception: https://github.com/semiversus/python-broqer/blob/master/broqer/op/catch_exception.py
.. _delay: https://github.com/semiversus/python-broqer/blob/master/broqer/op/delay.py
.. _filter: https://github.com/semiversus/python-broqer/blob/master/broqer/op/filter.py
.. _FromPolling: https://github.com/semiversus/python-broqer/blob/master/broqer/op/from_polling.py
.. _filter: https://github.com/semiversus/python-broqer/blob/master/broqer/op/filter_.py
.. _FromPolling: https://github.com/semiversus/python-broqer/blob/master/broqer/op/publishers/from_polling.py
.. _map_threaded: https://github.com/semiversus/python-broqer/blob/master/broqer/op/map_threaded.py
.. _map: https://github.com/semiversus/python-broqer/blob/master/broqer/op/map.py
.. _map: https://github.com/semiversus/python-broqer/blob/master/broqer/op/map_.py
.. _merge: https://github.com/semiversus/python-broqer/blob/master/broqer/op/merge.py
.. _partition: https://github.com/semiversus/python-broqer/blob/master/broqer/op/partition.py
.. _reduce: https://github.com/semiversus/python-broqer/blob/master/broqer/op/reduce.py
.. _sample: https://github.com/semiversus/python-broqer/blob/master/broqer/op/sample.py
.. _Sink: https://github.com/semiversus/python-broqer/blob/master/broqer/op/sink.py
.. _Sink: https://github.com/semiversus/python-broqer/blob/master/broqer/op/subscribers/sink.py
.. _sliding_window: https://github.com/semiversus/python-broqer/blob/master/broqer/op/sliding_window.py
.. _switch: https://github.com/semiversus/python-broqer/blob/master/broqer/op/switch.py
.. _throttle: https://github.com/semiversus/python-broqer/blob/master/broqer/op/throttle.py
.. _OnEmitFuture: https://github.com/semiversus/python-broqer/blob/master/broqer/op/emit_future.py
.. _Trace: https://github.com/semiversus/python-broqer/blob/master/broqer/op/sink/trace.py
.. _TopicMapper: https://github.com/semiversus/python-broqer/blob/master/broqer/op/sink/topic_mapper.py
.. _OnEmitFuture: https://github.com/semiversus/python-broqer/blob/master/broqer/op/subscribers/on_emit_future.py
.. _Trace: https://github.com/semiversus/python-broqer/blob/master/broqer/op/subscribers/trace.py
.. _TopicMapper: https://github.com/semiversus/python-broqer/blob/master/broqer/op/subscribers/topic_mapper.py

Credits
=======
Expand Down
2 changes: 1 addition & 1 deletion broqer/default_error_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


def _default_error_callback(exc_type, exc_value, exc_traceback):
""" default error callback is printing traceback of the exception
""" Default error callback is printing traceback of the exception
"""
traceback.print_exception(exc_type, exc_value, exc_traceback)

Expand Down
8 changes: 4 additions & 4 deletions broqer/disposable.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ def dispose(self) -> None:
""" .dispose() method has to be overwritten"""

def __enter__(self):
""" called on entry of a new context """
""" Called on entry of a new context """
return self

def __exit__(self, _type, _value, _traceback):
""" called on exit of the context. .dispose() is called here """
""" Called on exit of the context. .dispose() is called here """
self.dispose()


Expand All @@ -51,10 +51,10 @@ def dispose(self) -> None:

@property
def publisher(self) -> 'Publisher':
""" subscripted publisher """
""" Subscripted publisher """
return self._publisher

@property
def subscriber(self) -> 'Subscriber':
""" subscriber used in this subscription """
""" Subscriber used in this subscription """
return self._subscriber
20 changes: 10 additions & 10 deletions broqer/hub/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def emit(self, value: Any,
return self._subject.emit(value, who=self)

def assign(self, subject):
""" assigns the given subject to the topic """
""" Assigns the given subject to the topic """
assert isinstance(subject, (Publisher, Subscriber))

if self._subject is not None:
Expand All @@ -178,35 +178,35 @@ def assign(self, subject):
self.assignment_future.set_result(None)

def freeze(self):
""" called by hub when hub is going to be frozen """
""" Called by hub when hub is going to be frozen """
if self._subject is None:
raise ValueError('Topic %r is not assigned' % self._path)

@property
def assigned(self) -> bool:
""" telling as boolean if topic is assigned with a publisher/subscriber
""" Telling as boolean if topic is assigned with a publisher/subscriber
"""
return self._subject is not None

@property
def subject(self):
""" the assigned subject """
""" The assigned subject """
return self._subject

async def wait_for_assignment(self):
""" coroutine to wait until the assignment is finished """
""" Coroutine to wait until the assignment is finished """
if not self.assigned:
self.assignment_future = asyncio.get_event_loop().create_future()
await self.assignment_future

@property
def path(self) -> str:
""" topic path used as key in the hub """
""" Topic path used as key in the hub """
return self._path

@property
def hub(self) -> 'Hub':
""" reference to hub """
""" Reference to hub """
return self._hub


Expand All @@ -223,7 +223,7 @@ def assign(self, subject, meta=None): # pylint: disable=arguments-differ

@property
def meta(self):
""" the meta dictionary """
""" The meta dictionary """
return self._meta


Expand Down Expand Up @@ -265,13 +265,13 @@ def freeze(self, freeze: bool = True):

@property
def topics(self):
""" ordered dictionary with path:topic ordered by path """
""" Ordered dictionary with path:topic ordered by path """
topics_sorted = sorted(self._topics.items(), key=lambda t: t[0])
return MappingProxyType(OrderedDict(topics_sorted))

@property
def topic_factory(self):
""" used topic_factory """
""" Used topic_factory """
return self._topic_factory


Expand Down
4 changes: 2 additions & 2 deletions broqer/op/all_.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Applying any or all build in function to multiple publishers"""
""" Applying any or all build in function to multiple publishers"""
from typing import Callable
from typing import Any as Any_

Expand All @@ -9,7 +9,7 @@


class All(_MultiPredicate):
"""Applying any built in to source publishers"""
""" Applying any built in to source publishers"""
def __init__(self, *publishers: Publisher,
predicate: Callable[[Any_], bool] = None) -> None:
_MultiPredicate.__init__(self, *publishers, predicate=predicate)
Expand Down
4 changes: 2 additions & 2 deletions broqer/op/any_.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Applying any or all build in function to multiple publishers"""
""" Applying any or all build in function to multiple publishers"""
import asyncio
from typing import Dict, MutableSequence, Callable # noqa: F401
from typing import Any as Any_
Expand Down Expand Up @@ -55,7 +55,7 @@ def emit(self, value: Any_, who: Publisher) -> asyncio.Future:


class Any(_MultiPredicate):
"""Applying any built in to source publishers"""
""" Applying any built in to source publishers"""
def __init__(self, *publishers: Publisher,
predicate: Callable[[Any_], bool] = None) -> None:
_MultiPredicate.__init__(self, *publishers, predicate=predicate)
Expand Down
2 changes: 1 addition & 1 deletion broqer/op/map_.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Map(Operator):
"""
def __init__(self, publisher: Publisher, map_func: Callable[[Any], Any],
*args, unpack=False, **kwargs) -> None:
""" special care for return values:
""" Special care for return values:
* return `None` (or nothing) if you don't want to return a result
* return `None, ` if you want to return `None`
* return `(a, b), ` to return a tuple as value
Expand Down
4 changes: 2 additions & 2 deletions broqer/op/map_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def emit(self, value: Any, who: Publisher) -> None:
self._run_coro(value)

def _future_done(self, future):
""" will be called when the coroutine is done """
""" Will be called when the coroutine is done """
try:
# notify the subscribers (except result is an exception or NONE)
result = future.result() # may raise exception
Expand All @@ -202,7 +202,7 @@ def _future_done(self, future):
self._run_coro(value)

def _run_coro(self, value):
""" start the coroutine as task """
""" Start the coroutine as task """

# when LAST_DISTINCT is used only start coroutine when value changed
if self._options.mode is MODE.LAST_DISTINCT and \
Expand Down
8 changes: 4 additions & 4 deletions broqer/op/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ def unsubscribe(self, subscriber: Subscriber) -> None:

@property
def source_publishers(self):
""" tuple with all source publishers """
""" Tuple with all source publishers """
return (self._publisher, )

@abstractmethod
def get(self): # pylint: disable=useless-return, no-self-use
""" get value of operator """
""" Get value of operator """


class MultiOperator(Publisher, Subscriber): # pylint: disable=abstract-method
Expand Down Expand Up @@ -82,12 +82,12 @@ def unsubscribe(self, subscriber: Subscriber) -> None:

@property
def source_publishers(self):
""" tuple with all source publishers """
""" Tuple with all source publishers """
return self._publishers

@abstractmethod
def get(self): # pylint: disable=useless-return, no-self-use
""" get value of operator """
""" Get value of operator """


def build_operator(operator_cls):
Expand Down
2 changes: 1 addition & 1 deletion broqer/op/operator_overloading.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def emit(self, value: Any, who: Publisher) -> asyncio.Future:


def apply_operator_overloading():
""" function to apply operator overloading to Publisher class """
""" Function to apply operator overloading to Publisher class """
for method in ('__lt__', '__le__', '__eq__', '__ne__', '__ge__', '__gt__',
'__add__', '__and__', '__lshift__', '__mod__', '__mul__',
'__pow__', '__rshift__', '__sub__', '__xor__', '__concat__',
Expand Down
2 changes: 1 addition & 1 deletion broqer/op/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def get(self):
return Publisher.get(self) # raises ValueError

def _periodic_callback(self):
""" will be started on first emit """
""" Will be started on first emit """
try:
self.notify(self._state) # emit to all subscribers
except Exception: # pylint: disable=broad-except
Expand Down

0 comments on commit 7a45377

Please sign in to comment.