Skip to content

Commit

Permalink
Merge pull request #38 from flofeurstein/master
Browse files Browse the repository at this point in the history
migrate throttle operator. Thx - will release a new version
  • Loading branch information
semiversus committed Jan 11, 2021
2 parents bb67299 + 24b4dd8 commit e41fd87
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 1 deletion.
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ Operators
+-------------------------------------+-----------------------------------------------------------------------------+
| MapAsync_ (coro, mode, ...) | Apply ``coro(*args, value, **kwargs)`` to each emitted value |
+-------------------------------------+-----------------------------------------------------------------------------+
| Throttle (duration) | Limit the number of emits per duration |
+-------------------------------------+-----------------------------------------------------------------------------+

Subscribers
-----------
Expand Down
3 changes: 2 additions & 1 deletion broqer/op/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

# utils
from broqer.op.concat import Concat
from broqer.op.throttle import Throttle

# enable operator overloading
from .py_operators import Str, Bool, Int, Float, Repr, Len, In, All, Any, \
Expand All @@ -22,5 +23,5 @@
'EvalFalse', 'build_map', 'build_map_factory', 'build_combine_latest',
'build_filter', 'build_filter_factory', 'Concat', 'Str', 'Bool', 'Int',
'Float', 'Repr', 'map_bit', 'build_map_async_factory',
'Len', 'In', 'All', 'Any', 'BitwiseAnd', 'BitwiseOr', 'Not'
'Len', 'In', 'All', 'Any', 'BitwiseAnd', 'BitwiseOr', 'Not', 'Throttle'
]
92 changes: 92 additions & 0 deletions broqer/op/throttle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""
Rate limit emits by the given time.
Usage:
>>> import asyncio
>>> from broqer import Value, op, Sink
>>> v = Value()
>>> throttle_publisher = v | op.Throttle(0.1)
>>> _d = throttle_publisher.subscribe(Sink(print))
>>> v.emit(1)
1
>>> v.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05))
>>> v.emit(3)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.2))
3
>>> # It's also possible to reset the throttling duration:
>>> v.emit(4)
4
>>> v.emit(5)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05))
>>> throttle_publisher.reset()
"""
import asyncio
import sys
from typing import Any # noqa: F401

from broqer import Publisher, default_error_handler, NONE

from broqer.operator import Operator, OperatorFactory


class AppliedThrottle(Operator):
""" Rate limit emits by the given time.
:param duration: time for throttling in seconds
:param error_callback: the error callback to be registered
:param loop: asyncio event loop to use
"""
def __init__(self, publisher: Publisher, duration: float,
error_callback=default_error_handler, loop=None) -> None:

Operator.__init__(self, publisher)

self._duration = duration
self._loop = loop or asyncio.get_event_loop()
self._call_later_handler = None # type: asyncio.Handle
self._last_state = NONE # type: Any
self._error_callback = error_callback

def get(self):
return Publisher.get(self)

def emit(self, value: Any, who: Publisher) -> None:
if who is not self._orginator:
raise ValueError('Emit from non assigned publisher')

if self._call_later_handler is None:
self._last_state = value
self._wait_done_cb()
else:
self._last_state = value

def _wait_done_cb(self):
if self._last_state is not NONE:
try:
Publisher.notify(self, self._last_state)
except Exception: # pylint: disable=broad-except
self._error_callback(*sys.exc_info())
self._last_state = NONE
self._call_later_handler = self._loop.call_later(
self._duration, self._wait_done_cb)
else:
self._call_later_handler = None

def reset(self):
""" Reseting duration for throttling """
if self._call_later_handler is not None:
self._call_later_handler.cancel()
self._call_later_handler = None


class Throttle(OperatorFactory): # pylint: disable=too-few-public-methods
""" Apply throttling to each emitted value.
:param duration: time for throttling in seconds
"""
def __init__(self, duration: float) -> None:
if duration < 0:
raise ValueError('Duration has to be bigger than zero')

self._duration = duration

def apply(self, publisher: Publisher):
return AppliedThrottle(publisher, self._duration)
138 changes: 138 additions & 0 deletions tests/test_op_throttle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import asyncio
import pytest
from unittest import mock

from broqer import NONE, Sink, Publisher, op
from broqer.op import Throttle

from .eventloop import VirtualTimeEventLoop


@pytest.yield_fixture()
def event_loop():
loop = VirtualTimeEventLoop()
yield loop
loop.close()


@pytest.mark.asyncio
async def test_throttle_errorhandler(event_loop):
from broqer import default_error_handler

p = Publisher()
mock_sink = mock.Mock()
mock_error_handler = mock.Mock()

default_error_handler.set(mock_error_handler)

throttle = p | op.Throttle(0.1)
disposable = throttle.subscribe(Sink(mock_sink))

mock_sink.side_effect = (None, ZeroDivisionError('FAIL'))

# test error_handler
p.notify(1)
await asyncio.sleep(0.05, loop=event_loop)
mock_sink.assert_called_once_with(1)
p.notify(2)
await asyncio.sleep(0.1, loop=event_loop)
mock_error_handler.assert_called_once_with(ZeroDivisionError, mock.ANY, mock.ANY)
mock_sink.assert_has_calls((mock.call(1), mock.call(2)))

mock_sink.reset_mock()


@pytest.mark.asyncio
async def test_throttle_unsubscribe(event_loop):
p = Publisher()
mock_sink = mock.Mock()

throttle = p | op.Throttle(0.1)
disposable = throttle.subscribe(Sink(mock_sink))

# test subscription and unsubscribe
p.notify(2)
mock_sink.assert_called_once_with(2)

await asyncio.sleep(0.05, loop=event_loop)
mock_sink.reset_mock()

disposable.dispose()
await asyncio.sleep(0.1, loop=event_loop)

# dispose must not emit anything
mock_sink.assert_not_called()

p.notify(3)

await asyncio.sleep(0.1, loop=event_loop)

# after dispose was called, p.notify must not emit to mock_sink
mock_sink.assert_not_called()


@pytest.mark.asyncio
async def test_throttle_reset(event_loop):
p = Publisher()
mock_sink = mock.Mock()

throttle = p | op.Throttle(0.1)
disposable = throttle.subscribe(Sink(mock_sink))

p.notify(1)
await asyncio.sleep(0.05, loop=event_loop)
throttle.reset()
p.notify(3)

await asyncio.sleep(0.05, loop=event_loop)

# reset is called after "1" was emitted
mock_sink.assert_has_calls((mock.call(1), mock.call(3)))

## wait until initial state is set and reset mock
await asyncio.sleep(0.1, loop=event_loop)
mock_sink.reset_mock()

p.notify(1)
await asyncio.sleep(0.05, loop=event_loop)
p.notify(2)
throttle.reset()
p.notify(3)

await asyncio.sleep(0.05, loop=event_loop)

# reset is called after "1" was emitted, and while "2" was hold back,
# therefore "1" and "3" are emitted, but "2" is ignored
mock_sink.assert_has_calls((mock.call(1), mock.call(3)))

disposable.dispose()


@pytest.mark.parametrize('emit_sequence, expected_emits', [
(((0, 0), (0.05, 1), (0.4, 2), (0.6, 3), (0.2, 4), (0.2, 5)),
(mock.call(0), mock.call(2), mock.call(3), mock.call(5))),
(((0.001, 0), (0.6, 1), (0.5, 2), (0.05, 3), (0.44, 4)),
(mock.call(0), mock.call(1), mock.call(2), mock.call(4))),
])
@pytest.mark.asyncio
async def test_throttle(event_loop, emit_sequence, expected_emits):
p = Publisher()
mock_sink = mock.Mock()

throttle = p | op.Throttle(0.5)
disposable = throttle.subscribe(Sink(mock_sink))

mock_sink.assert_not_called()

for item in emit_sequence:
await asyncio.sleep(item[0], loop=event_loop)
p.notify(item[1])

await asyncio.sleep(0.5, loop=event_loop)

mock_sink.assert_has_calls(expected_emits)


def test_argument_check():
with pytest.raises(ValueError):
Throttle(-1)

0 comments on commit e41fd87

Please sign in to comment.