Skip to content

Commit

Permalink
✨ (store) add stream method for file-like store
Browse files Browse the repository at this point in the history
  • Loading branch information
simonwoerpel committed Mar 19, 2024
1 parent cc4ffec commit f5c80aa
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 3 deletions.
5 changes: 4 additions & 1 deletion anystore/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,12 @@ def cli_keys(
prefix: Annotated[Optional[str], typer.Argument(..., help="Key prefix")] = None,
o: Annotated[str, typer.Option("-o", help="Output uri")] = "-",
):
"""
Iterate keys in given store
"""
with ErrorHandler():
S = get_store(uri=state["uri"], use_pickle=state["pickle"])
keys = "\n".join(S.iterate_keys(prefix))
keys = "\n".join(S.iterate_keys(prefix)) + "\n"
smart_write(o, keys.encode())


Expand Down
22 changes: 22 additions & 0 deletions anystore/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ def _read(self, key: Uri, raise_on_nonexist: bool | None = True, **kwargs) -> An
"""
raise NotImplementedError

def _stream(self, key: Uri, raise_on_nonexist: bool | None = True, **kwargs) -> Any:
"""
Stream key line by line from actual backend (for file-like powered backend)
"""
raise NotImplementedError

def _get_key_prefix(self) -> str:
"""
Get backend specific key prefix
Expand Down Expand Up @@ -64,6 +70,22 @@ def get(
raise DoesNotExist(f"Key does not exist: `{key}`")
return None

def stream(
self,
key: Uri,
raise_on_nonexist: bool | None = None,
serialization_mode: Mode | None = None,
**kwargs,
) -> Generator[Any, None, None]:
key = self.get_key(key)
try:
for line in self._stream(key, raise_on_nonexist, **kwargs):
yield from_store(line, serialization_mode)
except FileNotFoundError: # fsspec
if raise_on_nonexist:
raise DoesNotExist(f"Key does not exist: `{key}`")
return None

def put(
self, key: Uri, value: Any, serialization_mode: Mode | None = None, **kwargs
):
Expand Down
12 changes: 10 additions & 2 deletions anystore/store/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import fsspec
from banal import ensure_dict

from anystore.io import smart_read, smart_write
from anystore.io import smart_read, smart_stream, smart_write
from anystore.exceptions import DoesNotExist
from anystore.store.base import BaseStore
from anystore.types import Uri, Value
Expand All @@ -27,7 +27,15 @@ def _read(
raise DoesNotExist(f"Key does not exist: `{key}`")
return None

# def _exists(self, key: Uri) -> bool:
def _stream(
self, key: Uri, raise_on_nonexist: bool | None = True, **kwargs
) -> Generator[Value, None, None]:
try:
yield from smart_stream(str(key), **kwargs)
except FileNotFoundError:
if raise_on_nonexist:
raise DoesNotExist(f"Key does not exist: `{key}`")

def _get_key_prefix(self) -> str:
return str(self.uri).rstrip("/")

Expand Down
10 changes: 10 additions & 0 deletions tests/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ def test_store_fs(tmp_path, fixtures_path):
assert (tmp_path / "foo/bar/baz").exists()
assert store.get("/bar/baz") == 1

# stream
store = Store(uri=fixtures_path)
tested = False
for ix, line in enumerate(store.stream("lorem.txt", mode="r")):
if ix == 1:
assert line.startswith("tempor")
tested = True
break
assert tested


def test_store_intialize(fixtures_path):
# initialize (take env vars into account)
Expand Down

0 comments on commit f5c80aa

Please sign in to comment.