Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions DEVELOPING.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ store = RedisStore(url="redis://localhost:6379/0")
store = RedisStore(client=existing_redis_client)
```

### Valkey Store
High-performance Valkey-compatible store with native TTL support:

```python
from kv_store_adapter import ValkeyStore

# Connection options
store = ValkeyStore(host="localhost", port=6379, db=0, password="secret")
store = ValkeyStore(url="valkey://localhost:6379/0")
store = ValkeyStore(client=existing_valkey_client)
```

### Memory Store
In-memory TLRU (Time-aware Least Recently Used) cache:

Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ build-backend = "hatchling.build"
memory = ["cachetools>=6.0.0"]
disk = ["diskcache>=5.6.0"]
redis = ["redis>=6.0.0"]
valkey = ["valkey>=6.0.0"]
elasticsearch = ["elasticsearch>=9.0.0", "aiohttp>=3.12"]
pydantic = ["pydantic>=2.11.9"]

Expand All @@ -40,7 +41,7 @@ env_files = [".env"]

[dependency-groups]
dev = [
"kv-store-adapter[memory,disk,redis,elasticsearch]",
"kv-store-adapter[memory,disk,redis,valkey,elasticsearch]",
"kv-store-adapter[pydantic]",
"pytest",
"pytest-mock",
Expand Down
72 changes: 72 additions & 0 deletions src/kv_store_adapter/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,73 @@

"""KV Store Adapter - A pluggable interface for Key-Value Stores."""


def __getattr__(name: str):
"""Lazy import for optional store implementations."""
# Redis Store
if name == "RedisStore":
try:
from kv_store_adapter.stores.redis import RedisStore
return RedisStore
except ImportError as e:
raise ImportError(f"RedisStore requires redis to be installed: {e}") from e

# Valkey Store
elif name == "ValkeyStore":
try:
from kv_store_adapter.stores.valkey import ValkeyStore
return ValkeyStore
except ImportError as e:
raise ImportError(f"ValkeyStore requires valkey to be installed: {e}") from e

# Memory Store
elif name == "MemoryStore":
try:
from kv_store_adapter.stores.memory import MemoryStore
return MemoryStore
except ImportError as e:
raise ImportError(f"MemoryStore requires cachetools to be installed: {e}") from e

# Disk Store
elif name == "DiskStore":
try:
from kv_store_adapter.stores.disk import DiskStore
return DiskStore
except ImportError as e:
raise ImportError(f"DiskStore requires diskcache to be installed: {e}") from e

# Elasticsearch Store
elif name == "ElasticsearchStore":
try:
from kv_store_adapter.stores.elasticsearch import ElasticsearchStore
return ElasticsearchStore
except ImportError as e:
raise ImportError(f"ElasticsearchStore requires elasticsearch and aiohttp to be installed: {e}") from e

# Simple Stores
elif name == "SimpleStore":
from kv_store_adapter.stores.simple import SimpleStore
return SimpleStore
elif name == "SimpleJSONStore":
from kv_store_adapter.stores.simple import SimpleJSONStore
return SimpleJSONStore

# Null Store
elif name == "NullStore":
from kv_store_adapter.stores.null import NullStore
return NullStore

# If not found, raise AttributeError
raise AttributeError(f"module '{__name__}' has no attribute '{name}'")


__all__ = [
"RedisStore",
"ValkeyStore",
"MemoryStore",
"DiskStore",
"ElasticsearchStore",
"SimpleStore",
"SimpleJSONStore",
"NullStore",
]
3 changes: 3 additions & 0 deletions src/kv_store_adapter/stores/valkey/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .store import ValkeyStore

__all__ = ["ValkeyStore"]
158 changes: 158 additions & 0 deletions src/kv_store_adapter/stores/valkey/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
from typing import Any, overload
from urllib.parse import urlparse

from typing_extensions import override
from valkey.asyncio import Valkey

from kv_store_adapter.errors import StoreConnectionError
from kv_store_adapter.stores.base.managed import BaseManagedKVStore
from kv_store_adapter.stores.utils.compound import compound_key, get_keys_from_compound_keys, uncompound_key
from kv_store_adapter.stores.utils.managed_entry import ManagedEntry


class ValkeyStore(BaseManagedKVStore):
"""Valkey-based key-value store."""

_client: Valkey

@overload
def __init__(self, *, client: Valkey) -> None: ...

@overload
def __init__(self, *, url: str) -> None: ...

@overload
def __init__(self, *, host: str = "localhost", port: int = 6379, db: int = 0, password: str | None = None) -> None: ...

def __init__(
self,
*,
client: Valkey | None = None,
url: str | None = None,
host: str = "localhost",
port: int = 6379,
db: int = 0,
password: str | None = None,
) -> None:
"""Initialize the Valkey store.

Args:
client: An existing Valkey client to use.
url: Valkey URL (e.g., valkey://localhost:6379/0).
host: Valkey host. Defaults to localhost.
port: Valkey port. Defaults to 6379.
db: Valkey database number. Defaults to 0.
password: Valkey password. Defaults to None.
"""
if client:
self._client = client
elif url:
parsed_url = urlparse(url)
self._client = Valkey(
host=parsed_url.hostname or "localhost",
port=parsed_url.port or 6379,
db=int(parsed_url.path.lstrip("/")) if parsed_url.path and parsed_url.path != "/" else 0,
password=parsed_url.password or password,
decode_responses=True,
)
else:
self._client = Valkey(
host=host,
port=port,
db=db,
password=password,
decode_responses=True,
)

super().__init__()

@override
async def setup(self) -> None:
if not await self._client.ping(): # pyright: ignore[reportUnknownMemberType]
raise StoreConnectionError(message="Failed to connect to Valkey")

@override
async def get_entry(self, collection: str, key: str) -> ManagedEntry | None:
combo_key: str = compound_key(collection=collection, key=key)

cache_entry: Any = await self._client.get(name=combo_key) # pyright: ignore[reportAny]

if cache_entry is None:
return None

if not isinstance(cache_entry, str):
return None

return ManagedEntry.from_json(json_str=cache_entry)

@override
async def put_entry(
self,
collection: str,
key: str,
cache_entry: ManagedEntry,
*,
ttl: float | None = None,
) -> None:
combo_key: str = compound_key(collection=collection, key=key)

json_value: str = cache_entry.to_json()

if ttl is not None:
# Valkey does not support <= 0 TTLs
ttl = max(int(ttl), 1)

_ = await self._client.setex(name=combo_key, time=ttl, value=json_value) # pyright: ignore[reportAny]
else:
_ = await self._client.set(name=combo_key, value=json_value) # pyright: ignore[reportAny]

@override
async def delete(self, collection: str, key: str) -> bool:
await self.setup_collection_once(collection=collection)

combo_key: str = compound_key(collection=collection, key=key)
return await self._client.delete(combo_key) != 0 # pyright: ignore[reportAny]

@override
async def keys(self, collection: str) -> list[str]:
await self.setup_collection_once(collection=collection)

pattern = compound_key(collection=collection, key="*")
compound_keys: list[str] = await self._client.keys(pattern) # pyright: ignore[reportUnknownMemberType, reportAny]

return get_keys_from_compound_keys(compound_keys=compound_keys, collection=collection)

@override
async def clear_collection(self, collection: str) -> int:
await self.setup_collection_once(collection=collection)

pattern = compound_key(collection=collection, key="*")

deleted_count: int = 0

async for key in self._client.scan_iter(name=pattern): # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
if not isinstance(key, str):
continue

deleted_count += await self._client.delete(key) # pyright: ignore[reportAny]

return deleted_count

@override
async def list_collections(self) -> list[str]:
await self.setup_once()

pattern: str = compound_key(collection="*", key="*")

collections: set[str] = set()

async for key in self._client.scan_iter(name=pattern): # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
if not isinstance(key, str):
continue

collections.add(uncompound_key(key=key)[0])

return list[str](collections)

@override
async def cull(self) -> None: ...
Empty file added tests/stores/valkey/__init__.py
Empty file.
88 changes: 88 additions & 0 deletions tests/stores/valkey/test_valkey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import asyncio
from collections.abc import AsyncGenerator

import pytest
from typing_extensions import override
from valkey.asyncio import Valkey

from kv_store_adapter.stores.base.unmanaged import BaseKVStore
from kv_store_adapter.stores.valkey import ValkeyStore
from tests.stores.conftest import BaseStoreTests

# Valkey test configuration
VALKEY_HOST = "localhost"
VALKEY_PORT = 6379
VALKEY_DB = 15 # Use a separate database for tests

WAIT_FOR_VALKEY_TIMEOUT = 30


async def ping_valkey() -> bool:
client = Valkey(host=VALKEY_HOST, port=VALKEY_PORT, db=VALKEY_DB, decode_responses=True)
try:
return await client.ping() # pyright: ignore[reportUnknownMemberType, reportAny]
except Exception:
return False


async def wait_valkey() -> bool:
# with a timeout of 30 seconds
for _ in range(WAIT_FOR_VALKEY_TIMEOUT):
if await ping_valkey():
return True
await asyncio.sleep(delay=1)

return False


class ValkeyFailedToStartError(Exception):
pass


class TestValkeyStore(BaseStoreTests):
@pytest.fixture(autouse=True, scope="session")
async def setup_valkey(self) -> AsyncGenerator[None, None]:
_ = await asyncio.create_subprocess_exec("docker", "stop", "valkey-test")
_ = await asyncio.create_subprocess_exec("docker", "rm", "-f", "valkey-test")

process = await asyncio.create_subprocess_exec("docker", "run", "-d", "--name", "valkey-test", "-p", "6379:6379", "valkey/valkey")
_ = await process.wait()
if not await wait_valkey():
msg = "Valkey failed to start"
raise ValkeyFailedToStartError(msg)
try:
yield
finally:
_ = await asyncio.create_subprocess_exec("docker", "rm", "-f", "valkey-test")

@override
@pytest.fixture
async def store(self, setup_valkey: ValkeyStore) -> ValkeyStore:
"""Create a Valkey store for testing."""
# Create the store with test database
valkey_store = ValkeyStore(host=VALKEY_HOST, port=VALKEY_PORT, db=VALKEY_DB)
_ = await valkey_store._client.flushdb() # pyright: ignore[reportPrivateUsage, reportUnknownMemberType]
return valkey_store

async def test_valkey_url_connection(self):
"""Test Valkey store creation with URL."""
valkey_url = f"valkey://{VALKEY_HOST}:{VALKEY_PORT}/{VALKEY_DB}"
store = ValkeyStore(url=valkey_url)
_ = await store._client.flushdb() # pyright: ignore[reportPrivateUsage, reportUnknownMemberType]
await store.put(collection="test", key="url_test", value={"test": "value"})
result = await store.get(collection="test", key="url_test")
assert result == {"test": "value"}

async def test_valkey_client_connection(self):
"""Test Valkey store creation with existing client."""
client = Valkey(host=VALKEY_HOST, port=VALKEY_PORT, db=VALKEY_DB, decode_responses=True)
store = ValkeyStore(client=client)

_ = await store._client.flushdb() # pyright: ignore[reportPrivateUsage, reportUnknownMemberType]
await store.put(collection="test", key="client_test", value={"test": "value"})
result = await store.get(collection="test", key="client_test")
assert result == {"test": "value"}

@pytest.mark.skip(reason="Distributed Caches are unbounded")
@override
async def test_not_unbounded(self, store: BaseKVStore): ...