Skip to content

Commit

Permalink
add Timer
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed May 3, 2021
1 parent 588e554 commit 449a0c1
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 22 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.3.3

* added `broqer.Timer` class and rewrite `op.Throttle` to use that class

## 2.3.2

* added `PollPublisher`
Expand Down
43 changes: 21 additions & 22 deletions broqer/op/throttle.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
"""
import asyncio
import sys
from typing import Any, Optional # noqa: F401
from typing import Any # noqa: F401

from broqer import Publisher, default_error_handler, NONE

from broqer.operator import Operator, OperatorFactory
from broqer.timer import Timer


class AppliedThrottle(Operator):
Expand All @@ -42,8 +43,7 @@ def __init__(self, publisher: Publisher, duration: float,

self._duration = duration
self._loop = loop or asyncio.get_event_loop()
self._call_later_handler = None # type: Optional[asyncio.Handle]
self._last_state = NONE # type: Any
self._timer = Timer(self._delayed_emit_cb, loop=loop)
self._error_callback = error_callback

def get(self):
Expand All @@ -53,29 +53,28 @@ def emit(self, value: Any, who: Publisher) -> None:
if who is not self._orginator:
raise ValueError('Emit from non assigned publisher')

if self._call_later_handler is None:
self._last_state = value
self._wait_done_cb()
if not self._timer.is_running():
self._timer.start(timeout=0, args=(value,))
else:
self._last_state = value

def _wait_done_cb(self):
if self._last_state is not NONE:
try:
Publisher.notify(self, self._last_state)
except Exception: # pylint: disable=broad-except
self._error_callback(*sys.exc_info())
self._last_state = NONE
self._call_later_handler = self._loop.call_later(
self._duration, self._wait_done_cb)
else:
self._call_later_handler = None
self._timer.change_arguments(args=(value,))

def _delayed_emit_cb(self, value=NONE):
if value is NONE:
# since the last emit the given duration has passed without another
# emit
return

try:
Publisher.notify(self, value)
except Exception: # pylint: disable=broad-except
self._error_callback(*sys.exc_info())

self._timer.start(self._duration)


def reset(self):
""" Reseting duration for throttling """
if self._call_later_handler is not None:
self._call_later_handler.cancel()
self._call_later_handler = None
self._timer.cancel()


class Throttle(OperatorFactory): # pylint: disable=too-few-public-methods
Expand Down
71 changes: 71 additions & 0 deletions broqer/timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
""" Asynchronous timer object """
import asyncio
from typing import Callable, Optional


class Timer:
""" The timer object is used to have an abstraction for handling time
dependend functionality. The timer works non-periodically.
:param callback: an optional callback function called when the time
`timeout` has passed after calling `.start()` or when
calling `.end_early()`
:param loop: optional asyncio event loop
"""
def __init__(self, callback: Optional[Callable[[], None]] = None,
loop: Optional[asyncio.BaseEventLoop] = None):
self._callback = callback
self._handle = None # type: asyncio.Handle
self._loop = loop or asyncio.get_event_loop()
self._args = None

def start(self, timeout: float, args=()) -> None:
""" start the timer with given timeout. Optional arguments for the
callback can be provided. When the timer is currently running, the timer
will be re-set to the new timeout.
:param timeout: time in seconds to the end of the timer
:param args: optional tuple with arguments for the callback
"""
if self._handle:
self._handle.cancel()

self._args = args

if timeout > 0:
self._handle = self._loop.call_later(timeout, self._trigger)
else:
self._trigger()

def change_arguments(self, args=()):
""" Will chance the arguments for the scheduled callback
:param args: Positional arguments
"""
self._args = args

def cancel(self) -> None:
""" Cancel the timer. An optional callback will not be called. """

if self._handle:
self._handle.cancel()
self._handle = None

def end_early(self) -> None:
""" immediate stopping the timer and call optional callback """
self._handle = None
if self._handle and self._callback:
self._callback(*self._args)

def is_running(self) -> bool:
""" tells if the timer is currently running
:returns: boolean, True when timer is running
"""
return self._handle is not None

def _trigger(self):
""" internal method called when timer is finished """
self._handle = None
if self._callback:
self._callback(*self._args)

0 comments on commit 449a0c1

Please sign in to comment.