-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
c2223d5
commit c1c3794
Showing
4 changed files
with
172 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
from collections import OrderedDict | ||
from itertools import count | ||
from time import time | ||
from typing import Any | ||
from typing import Dict | ||
from typing import Iterable | ||
from typing import Optional | ||
|
||
# TODO: There needs to be two versions, the one with a fixed TTL and the other with | ||
# varying TTL, for which priority queue may be necessary. | ||
|
||
|
||
class TimeAwareSink: | ||
"""A set with an object expiry time. | ||
This is a set which holds objects with expiry time. | ||
Args: | ||
arg: An iterable from which to initialize a set. | ||
ttl: The time to live in seconds. | ||
""" | ||
|
||
def __init__( | ||
self, arg: Optional[Iterable[Any]] = None, ttl: float = 10, eviction_method=None | ||
): | ||
self._dic: Dict[Any, int] = OrderedDict() | ||
self.ttl = ttl | ||
self.eviction_method = NotImplemented | ||
if arg: | ||
for i in arg: | ||
self.add(i) | ||
|
||
def __contains__(self, item: Any) -> bool: | ||
return self.has(item) | ||
|
||
def __len__(self) -> int: | ||
self.trim() | ||
return len(self._dic) | ||
|
||
def _set_expire(self, item: Any): | ||
t = time() | ||
expire = t + self.ttl | ||
if t < expire: | ||
self._dic[item] = expire | ||
elif item in self._dic: | ||
del self._dic[item] | ||
|
||
def add(self, item: Any): | ||
"""Add an item to a set.""" | ||
if item in self._dic: | ||
self._dic.move_to_end(item) | ||
else: | ||
self.trim(1) | ||
self._set_expire(item) | ||
|
||
def has(self, item: Any) -> bool: | ||
"""Test if an item in a set.""" | ||
return item in self._dic | ||
|
||
def remove(self, item: Any): | ||
"""Remove an item from a set.""" | ||
try: | ||
self._dic.pop(item) | ||
except KeyError: | ||
raise KeyError("item not found") | ||
|
||
def touch(self, item: Any): | ||
"""Touch an item in a set. | ||
Raises: | ||
KeyError: If an item is not in a set. | ||
""" | ||
if item not in self._dic: | ||
raise KeyError("item not found") | ||
self._dic.move_to_end(item) | ||
self._set_expire(item) | ||
|
||
def trim(self, n: Optional[int] = None): | ||
"""Trim a set by ejecting expired item(s) from a set. | ||
Args: | ||
n: The max number of items to eject. | ||
""" | ||
rng = range(0, n) if n else count() | ||
for _ in rng: | ||
try: | ||
item = next(iter(self._dic)) | ||
except StopIteration: | ||
break | ||
expire = self._dic[item] | ||
if expire > time(): | ||
break | ||
del self._dic[item] |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
from time import sleep | ||
|
||
import pytest | ||
|
||
from pyaides.collections.time_aware_sink import TimeAwareSink | ||
|
||
|
||
class TestTimeAwareSink: | ||
def test_basic(self): | ||
chars = "abcde" | ||
sink = TimeAwareSink(chars, ttl=1) | ||
assert len(sink) == len(chars) | ||
|
||
def test_expire(self): | ||
chars = "abcde" | ||
sink = TimeAwareSink(chars, ttl=-1) | ||
assert len(sink) == 0 | ||
|
||
def test_in(self): | ||
chars = "abcde" | ||
sink = TimeAwareSink(chars, ttl=1) | ||
for c in chars: | ||
assert c in sink | ||
|
||
def test_not_in(self): | ||
chars = "abcde" | ||
sink = TimeAwareSink(chars, ttl=-1) | ||
for c in chars: | ||
assert c not in sink | ||
|
||
def test_add(self): | ||
sink = TimeAwareSink(ttl=1) | ||
sink.add("a") | ||
assert "a" in sink | ||
|
||
def test_add_keepalive(self): | ||
sink = TimeAwareSink(ttl=0.05) | ||
for _ in range(10): | ||
sink.add("a") | ||
sleep(0.05) | ||
assert "a" in sink | ||
|
||
def test_has(self): | ||
key = "a" | ||
sink = TimeAwareSink(key, ttl=1) | ||
assert (key in sink) is sink.has(key) | ||
|
||
def test_has_not(self): | ||
key = "a" | ||
sink = TimeAwareSink() | ||
assert (key not in sink) is not sink.has(key) | ||
|
||
def test_remove(self): | ||
key = "a" | ||
sink = TimeAwareSink(key, ttl=1) | ||
assert key in sink | ||
sink.remove(key) | ||
assert key not in sink | ||
|
||
def test_remove_nonexisting(self): | ||
sink = TimeAwareSink("a", ttl=-1) | ||
with pytest.raises(KeyError): | ||
sink.remove("a") | ||
|
||
def test_touch(self): | ||
sink = TimeAwareSink("a", ttl=0.1) | ||
sink.touch("a") | ||
|
||
def test_touch_with_missing_key(self): | ||
sink = TimeAwareSink() | ||
with pytest.raises(KeyError): | ||
sink.touch("a") | ||
|
||
def test_trim(self): | ||
sink = TimeAwareSink("abc", ttl=0.1) | ||
assert len(sink._dic) == 3 | ||
sleep(0.2) | ||
sink.trim() | ||
assert len(sink._dic) == 0 |