-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
c2223d5
commit b1a2fbf
Showing
4 changed files
with
104 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
from collections import OrderedDict | ||
from itertools import count | ||
from time import time | ||
from typing import Any | ||
from typing import Dict | ||
from typing import Optional | ||
|
||
# TODO: There needs to be two versions, the one with a fixed TTL and the other with | ||
# varying TTL, for which priority queue may be necessary. | ||
|
||
|
||
class TimeAwareSink: | ||
def __init__(self, arg=None, ttl: int = 10, eviction_method=None): | ||
self._dic: Dict[Any, int] = OrderedDict() | ||
self.ttl = ttl | ||
self.eviction_method = NotImplemented | ||
if arg: | ||
for i in arg: | ||
self.add(i) | ||
|
||
def __contains__(self, item: Any) -> bool: | ||
return self.has(item) | ||
|
||
def __len__(self) -> int: | ||
self.trim() | ||
return len(self._dic) | ||
|
||
def _set_expire(self, item: Any, ttl: Optional[int] = None): | ||
t = time() | ||
expire = t + (ttl or self.ttl) | ||
if t < expire: | ||
self._dic[item] = expire | ||
elif item in self._dic: | ||
del self._dic[item] | ||
|
||
def add(self, item: Any, ttl: Optional[int] = None): | ||
if item in self._dic: | ||
self._dic.move_to_end(item) | ||
else: | ||
self.trim(1) | ||
self._set_expire(item, ttl) | ||
|
||
def has(self, item: Any) -> bool: | ||
return item in self._dic | ||
|
||
def remove(self, item: Any): | ||
try: | ||
self._dic.pop(item) | ||
except KeyError: | ||
raise ValueError("item not found") | ||
|
||
def touch(self, item: Any, ttl: Optional[int] = None): | ||
if item not in self._dic: | ||
raise KeyError("item not found") | ||
self._dic.move_to_end(item) | ||
self._set_expire(item, ttl) | ||
|
||
def trim(self, n: Optional[int] = None): | ||
rng = range(0, n) if n else count() | ||
for _ in rng: | ||
try: | ||
item = next(iter(self._dic)) | ||
except StopIteration: | ||
break | ||
expire = self._dic[item] | ||
if expire > time(): | ||
break | ||
del self._dic[item] |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import pytest | ||
|
||
from pyaides.collections.time_aware_sink import TimeAwareSink | ||
|
||
|
||
class TestTimeAwareSink: | ||
def test_basic(self): | ||
chars = "abcde" | ||
sink = TimeAwareSink(chars, ttl=3600) | ||
assert len(sink) == len(chars) | ||
|
||
def test_expire(self): | ||
chars = "abcde" | ||
sink = TimeAwareSink(chars, ttl=-3600) | ||
assert len(sink) == 0 | ||
|
||
def test_in(self): | ||
chars = "abcde" | ||
sink = TimeAwareSink(chars, ttl=3600) | ||
for c in chars: | ||
assert c in sink | ||
|
||
def test_not_in(self): | ||
chars = "abcde" | ||
sink = TimeAwareSink(chars, ttl=-3600) | ||
for c in chars: | ||
assert c not in sink | ||
|
||
def test_touch(self): | ||
sink = TimeAwareSink("a", ttl=2) | ||
sink.touch("a") | ||
|
||
def test_touch_with_missing_key(self): | ||
sink = TimeAwareSink() | ||
with pytest.raises(KeyError): | ||
sink.touch("a") |