Skip to content

Commit

Permalink
add Cache
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Jan 14, 2021
1 parent e41fd87 commit a39fc09
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 4 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## 2.3.0

* added `Cache` operator

## 2.2.0

* added `Throttle` operator

## 2.1.0

* .reset_state is now calling .reset_state for all subscribers
Expand Down
6 changes: 4 additions & 2 deletions broqer/op/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
from broqer.op.map_async import MapAsync, build_map_async, \
build_map_async_factory, AsyncMode
from broqer.op.bitwise import BitwiseCombineLatest, map_bit
from broqer.op.cache import Cache
from broqer.op.throttle import Throttle

# utils
from broqer.op.concat import Concat
from broqer.op.throttle import Throttle

# enable operator overloading
from .py_operators import Str, Bool, Int, Float, Repr, Len, In, All, Any, \
Expand All @@ -23,5 +24,6 @@
'EvalFalse', 'build_map', 'build_map_factory', 'build_combine_latest',
'build_filter', 'build_filter_factory', 'Concat', 'Str', 'Bool', 'Int',
'Float', 'Repr', 'map_bit', 'build_map_async_factory',
'Len', 'In', 'All', 'Any', 'BitwiseAnd', 'BitwiseOr', 'Not', 'Throttle'
'Len', 'In', 'All', 'Any', 'BitwiseAnd', 'BitwiseOr', 'Not', 'Throttle',
'Cache'
]
68 changes: 68 additions & 0 deletions broqer/op/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""
Cache the latest emit - the result is suppressing multiple emits with the same
value. Also initialization can be defined in the case the source publisher does
not emit on subscription.
Usage:
>>> from broqer import Value, op, Sink
>>> s = Value(1)
>>> cached_publisher = s | op.Cache()
>>> _disposable = cached_publisher.subscribe(Sink(print))
1
>>> s.emit(2)
2
>>> s.emit(2)
>>> _disposable.dispose()
Using the initial value for cache:
>>> from broqer import Value, op, Sink
>>> s = Value()
>>> cached_publisher = s | op.Cache(1)
>>> _disposable = cached_publisher.subscribe(Sink(print))
1
>>> s.emit(1)
>>> s.emit(2)
2
>>> _disposable.dispose()
"""
from typing import Any

from broqer import Publisher, NONE
from broqer.publisher import TValue
from broqer.operator import Operator, OperatorFactory


class AppliedCache(Operator):
""" Cache object applied to publisher (see Map) """
def __init__(self, publisher: Publisher, init: Any = NONE) -> None:
Operator.__init__(self, publisher)
self._state = init

def get(self) -> TValue:
return self._orginator.get()

def emit(self, value: TValue, who: Publisher) -> None:
if who is not self._orginator:
raise ValueError('Emit from non assigned publisher')

if value != self._state:
self._state = value
return Publisher.notify(self, value)

return None


class Cache(OperatorFactory): # pylint: disable=too-few-public-methods
""" Cache emitted values. Suppress duplicated value emits.
:param init: optional initialization state
"""
def __init__(self, init: Any = NONE) -> None:
self._init = init

def apply(self, publisher: Publisher):
return AppliedCache(publisher, self._init)
4 changes: 2 additions & 2 deletions broqer/op/throttle.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"""
import asyncio
import sys
from typing import Any # noqa: F401
from typing import Any, Optional # noqa: F401

from broqer import Publisher, default_error_handler, NONE

Expand All @@ -42,7 +42,7 @@ def __init__(self, publisher: Publisher, duration: float,

self._duration = duration
self._loop = loop or asyncio.get_event_loop()
self._call_later_handler = None # type: asyncio.Handle
self._call_later_handler = None # type: Optional[asyncio.Handle]
self._last_state = NONE # type: Any
self._error_callback = error_callback

Expand Down

0 comments on commit a39fc09

Please sign in to comment.