Skip to content

Commit

Permalink
✨ (store) Iterate keys
Browse files Browse the repository at this point in the history
  • Loading branch information
simonwoerpel committed Mar 6, 2024
1 parent d7fdbdb commit dcb05f9
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 9 deletions.
13 changes: 11 additions & 2 deletions anystore/store/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, Generator

from pydantic import field_validator

Expand Down Expand Up @@ -37,6 +37,12 @@ def _get_key_prefix(self) -> str:
"""
raise NotImplementedError

def _iterate_keys(self, prefix: str | None = None) -> Generator[str, None, None]:
    """
    Hook for the backend specific key iterator.

    This base implementation has no backend to iterate, so it always
    raises `NotImplementedError`.
    """
    raise NotImplementedError()

def get(
self,
key: Uri,
Expand Down Expand Up @@ -71,7 +77,10 @@ def ensure_kwargs(self, **kwargs) -> dict[str, Any]:
return {**config, **clean_dict(kwargs)}

def get_key(self, key: Uri) -> str:
    """
    Build the full backend key for `key`.

    The backend prefix (`_get_key_prefix()`) and the key are joined with a
    single "/". Slashes around the key are dropped so that "/foo" and "foo"
    address the same entry, and no trailing "/" is left when the key is
    empty. Without a prefix the bare key is returned.
    """
    prefix = self._get_key_prefix()
    relative = str(key).strip("/")
    if not prefix:
        return relative
    # Only strip on the right: a leading "/" belongs to an absolute prefix
    # and must survive the join.
    return f"{prefix}/{relative}".rstrip("/")

def iterate_keys(self, prefix: str | None = None) -> Generator[str, None, None]:
    """
    Iterate the keys of this store, optionally limited to `prefix`.

    Delegates to the backend specific `_iterate_keys`.
    """
    backend_keys = self._iterate_keys(prefix)
    yield from backend_keys

@field_validator("uri", mode="before")
@classmethod
Expand Down
16 changes: 15 additions & 1 deletion anystore/store/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
Store backend using any file-like location usable via `fsspec`
"""

from typing import Generator

import fsspec
from banal import ensure_dict

from anystore.io import smart_read, smart_write
from anystore.exceptions import DoesNotExist
from anystore.store.base import BaseStore
Expand All @@ -22,5 +27,14 @@ def _read(
raise DoesNotExist(f"Key does not exist: `{key}`")
return None

# def _exists(self, key: Uri) -> bool:
def _get_key_prefix(self) -> str:
    """
    Return the store uri as key prefix, without trailing slashes.
    """
    # Keys are appended as "<prefix>/<key>", so a trailing slash on the uri
    # would end up as a doubled separator.
    return str(self.uri).rstrip("/")

def _iterate_keys(self, prefix: str | None = None) -> Generator[str, None, None]:
    """
    Yield the keys found below this store's location via an `fsspec` mapper.

    The mapper is rooted at `get_key(prefix)`, so when a prefix is given
    each yielded key gets "<prefix>/" put back in front of it.
    """
    root = self.get_key(prefix or "")
    options = ensure_dict(self.backend_config)
    mapper = fsspec.get_mapper(root, **options)
    if prefix:
        yield from (f"{prefix}/{name}" for name in mapper.keys())
    else:
        yield from mapper.keys()
14 changes: 12 additions & 2 deletions anystore/store/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Store backend using redis-like stores such as Redis, Fakeredis or Apache Kvrocks
"""

from typing import Any
from typing import Any, Generator
import logging
import os
from functools import cache
Expand Down Expand Up @@ -51,4 +51,14 @@ def _read(self, key: Uri, raise_on_nonexist: bool | None = True, **kwargs) -> An
return res

def _get_key_prefix(self) -> str:
    """
    Return the namespace put in front of every key of this store.

    Uses the `redis_prefix` entry of `backend_config` when it is set to a
    non-empty value, otherwise falls back to "anystore".
    """
    config = self.backend_config or {}
    return config.get("redis_prefix") or "anystore"

def _iterate_keys(self, prefix: str | None = None) -> Generator[str, None, None]:
    """
    Yield the keys of this store, optionally limited to those starting
    with `prefix`.

    Keys are scanned with a "<key_prefix>/<prefix>*" match pattern and
    yielded without the leading "<key_prefix>/".
    """
    con = get_redis(self.uri)
    key_prefix = self._get_key_prefix()
    # Always keep the "/" after the store prefix in the pattern, so that a
    # scan without `prefix` cannot pick up keys of another namespace that
    # merely starts with the same characters.
    pattern = f"{key_prefix}/{(prefix or '').lstrip('/')}*"
    offset = len(key_prefix) + 1
    for key in con.scan_iter(pattern):
        # Keys come back as bytes unless the client decodes responses itself.
        if isinstance(key, bytes):
            key = key.decode()
        yield key[offset:]
22 changes: 18 additions & 4 deletions anystore/store/sql.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from contextlib import contextmanager
from functools import cache
from typing import Generator, Optional, Union

Expand Down Expand Up @@ -73,6 +72,7 @@ def make_table(name: str, metadata: MetaData) -> Table:


class SqlStore(BaseStore):
_engine: Engine | None = None
_conn: Connection | None = None
_insert: Insert | None = None
_table: Table | None = None
Expand All @@ -89,6 +89,7 @@ def __init__(self, **data):
metadata.create_all(engine, tables=[table], checkfirst=True)
self._insert = get_insert(engine)
self._table = table
self._engine = engine
self._conn = engine.connect()
self._sqlite = "sqlite" in engine.name.lower()

Expand All @@ -98,9 +99,7 @@ def _write(self, key: Uri, value: Value, **kwargs) -> None:
exists = select(self._table).where(self._table.c.key == key)
if self._conn.execute(exists).first():
stmt = (
update(self._table)
.where(self._table.c.key == key)
.values(value=value)
update(self._table).where(self._table.c.key == key).values(value=value)
)
else:
stmt = insert(self._table).values(key=key, value=value)
Expand All @@ -122,3 +121,18 @@ def _read(self, key: Uri, raise_on_nonexist: bool | None = True, **kwargs) -> Va

def _get_key_prefix(self) -> str:
    """
    Return the key prefix of this store, which is always the empty string.
    """
    no_prefix = ""
    return no_prefix

def _iterate_keys(self, prefix: str | None = None) -> Generator[str, None, None]:
    """
    Yield the keys stored in the table, optionally limited to those
    starting with `prefix`.

    Rows are read on a separate connection with `stream_results` enabled
    and fetched in batches of 10,000.
    """
    table = self._table
    stmt = select(table.c.key)
    if prefix:
        # autoescape makes "%" and "_" inside the prefix match literally
        # instead of acting as LIKE wildcards.
        stmt = stmt.where(
            table.c.key.startswith(self.get_key(prefix), autoescape=True)
        )
    with self._engine.connect() as conn:
        conn = conn.execution_options(stream_results=True)
        cursor = conn.execute(stmt)
        while rows := cursor.fetchmany(10_000):
            for row in rows:
                yield row[0]
11 changes: 11 additions & 0 deletions tests/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ def _test_store(uri: str) -> bool:
with pytest.raises(DoesNotExist):
store.get("nothing")
assert store.get("nothing", raise_on_nonexist=False) is None

# iterate
keys = [k for k in store.iterate_keys()]
assert len(keys) == 3
# assert all(store.exists(k) for k in keys)
keys = [k for k in store.iterate_keys("foo")]
assert keys[0] == "foo/bar/baz"
assert len(keys) == 1
keys = [k for k in store.iterate_keys("foo/bar")]
assert len(keys) == 1
assert keys[0] == "foo/bar/baz"
return True


Expand Down

0 comments on commit dcb05f9

Please sign in to comment.