Skip to content

Commit

Permalink
make sink a pure subscriber (instead a operator it was before)
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Sep 12, 2018
1 parent 8625a79 commit 9591815
Show file tree
Hide file tree
Showing 43 changed files with 122 additions and 156 deletions.
6 changes: 3 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ be printed (with the prefix 'Voltage too high:')
| op.sliding_window(4) # append the value to a buffer with 4 elements (and drop the oldest value)
| op.map(statistics.mean) # use ``statistics.mean`` to calulate the average over the emitted sequence
| op.filter(lambda v:v>1) # emit only values greater 1
| op.sink(print, 'Voltage too high:') # call ``print`` with 'Voltage too high:' and the value
| op.Sink(print, 'Voltage too high:') # call ``print`` with 'Voltage too high:' and the value
)
.. image:: https://cdn.rawgit.com/semiversus/python-broqer/ec5ddbbd/docs/example1.svg
Expand Down Expand Up @@ -176,7 +176,7 @@ Subscribers
A Subscriber_ is the sink for messages.

+----------------------------------+--------------------------------------------------------------+
| sink_ (func, \*args, \*\*kwargs) | Apply ``func(*args, value, **kwargs)`` to each emitted value |
| Sink_ (func, \*args, \*\*kwargs) | Apply ``func(*args, value, **kwargs)`` to each emitted value |
+----------------------------------+--------------------------------------------------------------+
| to_future_ (timeout=None) | Build a future able to await for |
+----------------------------------+--------------------------------------------------------------+
Expand All @@ -197,7 +197,7 @@ A Subscriber_ is the sink for messages.
.. _partition: https://github.com/semiversus/python-broqer/blob/master/broqer/op/partition.py
.. _reduce: https://github.com/semiversus/python-broqer/blob/master/broqer/op/reduce.py
.. _sample: https://github.com/semiversus/python-broqer/blob/master/broqer/op/sample.py
.. _sink: https://github.com/semiversus/python-broqer/blob/master/broqer/op/sink.py
.. _Sink: https://github.com/semiversus/python-broqer/blob/master/broqer/op/sink.py
.. _sliding_window: https://github.com/semiversus/python-broqer/blob/master/broqer/op/sliding_window.py
.. _switch: https://github.com/semiversus/python-broqer/blob/master/broqer/op/switch.py
.. _throttle: https://github.com/semiversus/python-broqer/blob/master/broqer/op/throttle.py
Expand Down
4 changes: 2 additions & 2 deletions broqer/hub/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
It's possible to subscribe to a topic
>>> _d1 = hub['value1'] | op.sink(print, 'Output:')
>>> _d1 = hub['value1'] | op.Sink(print, 'Output:')
At the moment this hub object is not assigned to a publisher
Expand Down Expand Up @@ -50,7 +50,7 @@
Also assigning publisher first and then subscribing is possible:
>>> _ = hub.assign('value2', Value(2))
>>> _d2 = hub['value2'] | op.sink(print, 'Output:')
>>> _d2 = hub['value2'] | op.Sink(print, 'Output:')
Output: 2
>>> hub['value2'].emit(3)
Expand Down
11 changes: 5 additions & 6 deletions broqer/op/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@

# subscribers
from .subscribers.to_future import ToFuture, to_future
from .subscribers.sink import Sink, sink
from .subscribers.trace import Trace, trace
from .subscribers.topic_mapper import TopicMapper, topic_mapper
from .subscribers.sink import Sink
from .subscribers.trace import Trace
from .subscribers.topic_mapper import TopicMapper

# enable operator overloading
from .operator_overloading import apply_operator_overloading
Expand All @@ -43,10 +43,9 @@
'CatchException', 'catch_exception', 'CombineLatest', 'combine_latest',
'Filter', 'filter_', 'Map', 'map_', 'Merge',
'merge', 'Partition', 'partition', 'Reduce', 'reduce', 'Replace',
'replace', 'Sink', 'sink', 'SlidingWindow', 'sliding_window', 'Switch',
'replace', 'Sink', 'SlidingWindow', 'sliding_window', 'Switch',
'switch', 'Debounce', 'debounce', 'Delay', 'delay', 'FromPolling',
'Sample', 'sample', 'MapAsync', 'map_async', 'MODE', 'MapThreaded',
'map_threaded', 'Throttle', 'throttle', 'ToFuture', 'to_future',
'True_', 'true', 'False_', 'false', 'Trace', 'trace', 'TopicMapper',
'topic_mapper'
'True_', 'true', 'False_', 'false', 'Trace', 'TopicMapper',
]
2 changes: 1 addition & 1 deletion broqer/op/accumulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
... return state, sum(state)/len(state)
>>> lowpass = s | op.accumulate(moving_average, init=[0]*3)
>>> lowpass | op.sink(print)
>>> lowpass | op.Sink(print)
<...>
>>> s.emit(3)
1.0
Expand Down
2 changes: 1 addition & 1 deletion broqer/op/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
>>> s = Subject()
>>> cached_publisher = s | op.cache(0)
>>> _disposable = cached_publisher | op.sink(print, sep=' - ')
>>> _disposable = cached_publisher | op.Sink(print, sep=' - ')
0
>>> s.emit(3)
Expand Down
4 changes: 2 additions & 2 deletions broqer/op/catch_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Example with exception:
>>> disposable = s | op.map_(lambda s:1/s) | op.sink(print)
>>> disposable = s | op.map_(lambda s:1/s) | op.Sink(print)
>>> s.emit(1)
1.0
Expand All @@ -18,7 +18,7 @@
Now with ``catch_exception``:
>>> excp = ZeroDivisionError
>>> s | op.catch_exception(excp) | op.map_(lambda s:1/s) | op.sink(print)
>>> s | op.catch_exception(excp) | op.map_(lambda s:1/s) | op.Sink(print)
<...>
>>> s.emit(1)
Expand Down
4 changes: 2 additions & 2 deletions broqer/op/combine_latest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
>>> s2 = Subject()
>>> combination = s1 | op.combine_latest(s2)
>>> disposable = combination | op.sink(print)
>>> disposable = combination | op.Sink(print)
CombineLatest is only emitting, when all values are collected:
Expand All @@ -17,7 +17,7 @@
Subscribing to a CombineLatest with all values available is emitting the values
immediate on subscription:
>>> combination | op.sink(print, 'Second sink:')
>>> combination | op.Sink(print, 'Second sink:')
Second sink: (1, 3)
<...>
Expand Down
4 changes: 2 additions & 2 deletions broqer/op/debounce.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
>>> import asyncio
>>> from broqer import Subject, op
>>> s = Subject()
>>> _d = s | op.debounce(0.1) | op.sink(print)
>>> _d = s | op.debounce(0.1) | op.Sink(print)
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05))
Expand All @@ -19,7 +19,7 @@
When debounce is retriggered you can specify a value to emit:
>>> debounce_publisher = s | op.debounce(0.1, False)
>>> _d = debounce_publisher | op.sink(print)
>>> _d = debounce_publisher | op.Sink(print)
>>> s.emit(False)
False
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.15))
Expand Down
2 changes: 1 addition & 1 deletion broqer/op/delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
>>> import asyncio
>>> from broqer import Subject, op
>>> s = Subject()
>>> s | op.delay(0.1) | op.sink(print)
>>> s | op.delay(0.1) | op.Sink(print)
<...>
>>> s.emit(1)
>>> s.emit(2)
Expand Down
4 changes: 2 additions & 2 deletions broqer/op/filter_.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
>>> s = Subject()
>>> filtered_publisher = s | op.filter_(lambda v:v>0)
>>> _disposable = filtered_publisher | op.sink(print)
>>> _disposable = filtered_publisher | op.Sink(print)
>>> s.emit(1)
1
Expand All @@ -19,7 +19,7 @@
>>> import operator
>>> filtered_publisher = s | op.filter_(operator.and_, 0x01)
>>> _disposable = filtered_publisher | op.sink(print)
>>> _disposable = filtered_publisher | op.Sink(print)
>>> s.emit(100)
>>> s.emit(101)
101
Expand Down
6 changes: 3 additions & 3 deletions broqer/op/map_.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
>>> s = Subject()
>>> mapped_publisher = s | op.map_(lambda v:v*2)
>>> _disposable = mapped_publisher | op.sink(print)
>>> _disposable = mapped_publisher | op.Sink(print)
>>> s.emit(1)
2
Expand All @@ -21,12 +21,12 @@
>>> import operator
>>> mapped_publisher = s | op.map_(operator.add, 3)
>>> _disposable = mapped_publisher | op.sink(print)
>>> _disposable = mapped_publisher | op.Sink(print)
>>> s.emit(100)
103
>>> _disposable.dispose()
>>> _disposable = s | op.map_(print, 'Output:') | op.sink(print, 'EMITTED')
>>> _disposable = s | op.map_(print, 'Output:') | op.Sink(print, 'EMITTED')
>>> s.emit(1)
Output: 1
EMITTED None
Expand Down
12 changes: 6 additions & 6 deletions broqer/op/map_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
MODE: CONCURRENT (is default)
>>> _d = s | op.map_async(delay_add) | op.sink()
>>> _d = s | op.map_async(delay_add) | op.Sink()
>>> s.emit(0)
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Expand All @@ -28,7 +28,7 @@
MODE: INTERRUPT
>>> _d = s | op.map_async(delay_add, mode=op.MODE.INTERRUPT) | op.sink(print)
>>> _d = s | op.map_async(delay_add, mode=op.MODE.INTERRUPT) | op.Sink(print)
>>> s.emit(0)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.005))
Starting with argument 0
Expand All @@ -41,7 +41,7 @@
MODE: QUEUE
>>> _d = s | op.map_async(delay_add, mode=op.MODE.QUEUE) | op.sink(print)
>>> _d = s | op.map_async(delay_add, mode=op.MODE.QUEUE) | op.Sink(print)
>>> s.emit(0)
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Expand All @@ -55,7 +55,7 @@
MODE: LAST
>>> _d = s | op.map_async(delay_add, mode=op.MODE.LAST) | op.sink(print)
>>> _d = s | op.map_async(delay_add, mode=op.MODE.LAST) | op.Sink(print)
>>> s.emit(0)
>>> s.emit(1)
>>> s.emit(2)
Expand All @@ -70,7 +70,7 @@
MODE: SKIP
>>> _d = s | op.map_async(delay_add, mode=op.MODE.SKIP) | op.sink(print)
>>> _d = s | op.map_async(delay_add, mode=op.MODE.SKIP) | op.Sink(print)
>>> s.emit(0)
>>> s.emit(1)
>>> s.emit(2)
Expand All @@ -85,7 +85,7 @@
>>> def cb(*e):
... print('Got error')
>>> _d = s | op.map_async(delay_add, error_callback=cb) | op.sink(print)
>>> _d = s | op.map_async(delay_add, error_callback=cb) | op.Sink(print)
>>> s.emit('abc')
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument abc
Expand Down
10 changes: 5 additions & 5 deletions broqer/op/map_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
Mode: CONCURRENT (is default)
>>> _d = s | op.map_threaded(delay_add) | op.sink()
>>> _d = s | op.map_threaded(delay_add) | op.Sink()
>>> s.emit(0)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.001))
Starting with argument 0
Expand All @@ -29,7 +29,7 @@
Mode: QUEUE
>>> _d = s | op.map_threaded(delay_add, mode=op.MODE.QUEUE) | op.sink(print)
>>> _d = s | op.map_threaded(delay_add, mode=op.MODE.QUEUE) | op.Sink(print)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.001))
>>> s.emit(0)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.001))
Expand All @@ -45,7 +45,7 @@
Mode: LAST
>>> _d = s | op.map_threaded(delay_add, mode=op.MODE.LAST) | op.sink(print)
>>> _d = s | op.map_threaded(delay_add, mode=op.MODE.LAST) | op.Sink(print)
>>> s.emit(0)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.001))
Starting with argument 0
Expand All @@ -61,7 +61,7 @@
Mode: SKIP
>>> _d = s | op.map_threaded(delay_add, mode=op.MODE.SKIP) | op.sink(print)
>>> _d = s | op.map_threaded(delay_add, mode=op.MODE.SKIP) | op.Sink(print)
>>> s.emit(0)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.001))
Starting with argument 0
Expand All @@ -77,7 +77,7 @@
>>> def cb(*e):
... print('Got error')
>>> _d = s | op.map_threaded(delay_add, error_callback=cb) | op.sink(print)
>>> _d = s | op.map_threaded(delay_add, error_callback=cb) | op.Sink(print)
>>> s.emit('abc')
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.001))
Starting with argument abc
Expand Down
2 changes: 1 addition & 1 deletion broqer/op/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
>>> s1 = Subject()
>>> s2 = Subject()
>>> _d = s1 | op.merge(s2) | op.sink(print, 'Merge:')
>>> _d = s1 | op.merge(s2) | op.Sink(print, 'Merge:')
>>> s1.emit(1)
Merge: 1
>>> s2.emit('abc')
Expand Down
2 changes: 1 addition & 1 deletion broqer/op/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
>>> s = Subject()
>>> partitioned_publisher = s | op.partition(3)
>>> _d = partitioned_publisher | op.sink(print, 'Partition:')
>>> _d = partitioned_publisher | op.Sink(print, 'Partition:')
>>> s.emit(1)
>>> s.emit(2)
Expand Down
4 changes: 2 additions & 2 deletions broqer/op/publishers/from_polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
>>> import itertools
>>> from broqer import op
>>> _d = op.FromPolling(0.015, itertools.count().__next__) | op.sink(print)
>>> _d = op.FromPolling(0.015, itertools.count().__next__) | op.Sink(print)
0
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.07))
1
Expand All @@ -18,7 +18,7 @@
>>> def foo(arg):
... print('Foo:', arg)
>>> _d = op.FromPolling(0.015, foo, 5) | op.sink()
>>> _d = op.FromPolling(0.015, foo, 5) | op.Sink()
Foo: 5
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05))
Foo: 5
Expand Down
2 changes: 1 addition & 1 deletion broqer/op/reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
... return last_result*10+value
>>> reduce_publisher = s | op.reduce(build_number, 0)
>>> _d = reduce_publisher | op.sink(print, 'Reduce:')
>>> _d = reduce_publisher | op.Sink(print, 'Reduce:')
>>> s.emit(4)
Reduce: 4
>>> s.emit(7)
Expand Down
4 changes: 2 additions & 2 deletions broqer/op/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
>>> sample_publisher = s | op.sample(0.015)
>>> _d = sample_publisher | op.sink(print, 'Sample:')
>>> _d = sample_publisher | op.Sink(print, 'Sample:')
>>> s.emit(1)
Sample: 1
Expand All @@ -24,7 +24,7 @@
...
Sample: (2, 3)
>>> _d2 = sample_publisher | op.sink(print, 'Sample 2:')
>>> _d2 = sample_publisher | op.Sink(print, 'Sample 2:')
Sample 2: (2, 3)
>>> _d.dispose()
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.06))
Expand Down
4 changes: 2 additions & 2 deletions broqer/op/sliding_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
>>> s = Subject()
>>> window_publisher = s | op.sliding_window(3)
>>> _d = window_publisher | op.sink(print, 'Sliding Window:')
>>> _d = window_publisher | op.Sink(print, 'Sliding Window:')
>>> s.emit(1)
>>> s.emit(2)
>>> s.emit(3)
Sliding Window: (1, 2, 3)
>>> with window_publisher | op.sink(print, '2nd subscriber:'):
>>> with window_publisher | op.Sink(print, '2nd subscriber:'):
... pass
2nd subscriber: (1, 2, 3)
>>> s.emit((4, 5))
Expand Down

0 comments on commit 9591815

Please sign in to comment.