diff --git a/DEVELOPING.md b/DEVELOPING.md index 1db0721e..2131e025 100644 --- a/DEVELOPING.md +++ b/DEVELOPING.md @@ -98,6 +98,22 @@ store = DiskStore(path="/path/to/cache", size_limit=1024*1024*1024) # 1GB store = DiskStore(cache=existing_cache_instance) ``` +### Memcached Store +High-performance distributed caching with Memcached: + +```python +from kv_store_adapter import MemcachedStore + +# Connection options +store = MemcachedStore(host="localhost", port=11211) +store = MemcachedStore(client=existing_pymemcache_client) +``` + +**Limitations:** +- `keys()`, `clear_collection()`, and `list_collections()` return empty results due to Memcached's lack of key enumeration +- Keys longer than 250 characters are automatically hashed with MD5 +- TTL is handled natively by Memcached + ### Elasticsearch Store Full-text searchable storage with Elasticsearch: diff --git a/README.md b/README.md index 78f7f767..541b2884 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,7 @@ Choose the store that best fits your needs. All stores implement the same `KVSto ### Production Stores - **RedisStore**: `RedisStore(url="redis://localhost:6379/0")` +- **MemcachedStore**: `MemcachedStore(host="localhost", port=11211)` - **ElasticsearchStore**: `ElasticsearchStore(url="https://localhost:9200", api_key="your-api-key")` - **DiskStore**: A sqlite-based store for local persistence `DiskStore(path="./cache")` - **MemoryStore**: A fast in-memory cache `MemoryStore()` diff --git a/pyproject.toml b/pyproject.toml index 92b4117d..087e8621 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ memory = ["cachetools>=6.0.0"] disk = ["diskcache>=5.6.0"] redis = ["redis>=6.0.0"] elasticsearch = ["elasticsearch>=9.0.0", "aiohttp>=3.12"] +memcached = ["pymemcache>=4.0.0"] pydantic = ["pydantic>=2.11.9"] [tool.pytest.ini_options] @@ -40,7 +41,7 @@ env_files = [".env"] [dependency-groups] dev = [ - "kv-store-adapter[memory,disk,redis,elasticsearch]", + "kv-store-adapter[memory,disk,redis,elasticsearch,memcached]", "kv-store-adapter[pydantic]", "pytest", "pytest-mock", diff --git a/src/kv_store_adapter/stores/memcached/__init__.py b/src/kv_store_adapter/stores/memcached/__init__.py new file mode 100644 index 00000000..a70097ac --- /dev/null +++ b/src/kv_store_adapter/stores/memcached/__init__.py @@ -0,0 +1,3 @@ +from .store import MemcachedStore + +__all__ = ["MemcachedStore"] diff --git a/src/kv_store_adapter/stores/memcached/store.py b/src/kv_store_adapter/stores/memcached/store.py new file mode 100644 index 00000000..8d2c8c2d --- /dev/null +++ b/src/kv_store_adapter/stores/memcached/store.py @@ -0,0 +1,154 @@ +import hashlib +from typing import Any, overload + +from pymemcache.client.base import Client +from typing_extensions import override + +from kv_store_adapter.errors import StoreConnectionError +from kv_store_adapter.stores.base.managed import BaseManagedKVStore +from kv_store_adapter.stores.utils.compound import compound_key +from kv_store_adapter.stores.utils.managed_entry import ManagedEntry + +# Memcached key length limit +MEMCACHED_MAX_KEY_LENGTH = 250 + + +class MemcachedStore(BaseManagedKVStore): + """Memcached-based key-value store.""" + + _client: Client + + @overload + def __init__(self, *, client: Client) -> None: ... + + @overload + def __init__(self, *, host: str = "localhost", port: int = 11211) -> None: ... + + def __init__( + self, + *, + client: Client | None = None, + host: str = "localhost", + port: int = 11211, + ) -> None: + """Initialize the Memcached store. + + Args: + client: An existing pymemcache Client to use. + host: Memcached host. Defaults to localhost. + port: Memcached port. Defaults to 11211. + """ + if client: + self._client = client + else: + self._client = Client((host, port)) + + super().__init__() + + def _get_safe_key(self, combo_key: str) -> str: + """Get a safe key for memcached, hashing if necessary.""" + if len(combo_key) > MEMCACHED_MAX_KEY_LENGTH: + # Use MD5 hash for long keys - this is not for security, just for key shortening + return hashlib.md5(combo_key.encode()).hexdigest() # noqa: S324 + return combo_key + + def _test_connection(self) -> None: + """Test the memcached connection.""" + test_key = "__memcached_test__" + self._client.set(test_key, "test_value", expire=1) + result = self._client.get(test_key) + if result is None: + msg = "Failed to connect to Memcached" + raise StoreConnectionError(message=msg) + # Clean up test key + self._client.delete(test_key) + + @override + async def setup(self) -> None: + # Test the connection by performing a simple operation + try: + self._test_connection() + except Exception as e: + msg = f"Failed to connect to Memcached: {e}" + raise StoreConnectionError(message=msg) from e + + @override + async def get_entry(self, collection: str, key: str) -> ManagedEntry | None: + combo_key: str = compound_key(collection=collection, key=key) + safe_key = self._get_safe_key(combo_key) + + cache_entry: Any = self._client.get(safe_key) + + if cache_entry is None: + return None + + if not isinstance(cache_entry, (str, bytes)): + return None + + # Convert bytes to string if necessary + if isinstance(cache_entry, bytes): + cache_entry = cache_entry.decode("utf-8") + + return ManagedEntry.from_json(json_str=cache_entry) + + @override + async def put_entry( + self, + collection: str, + key: str, + cache_entry: ManagedEntry, + *, + ttl: float | None = None, + ) -> None: + combo_key: str = compound_key(collection=collection, key=key) + safe_key = self._get_safe_key(combo_key) + + json_value: str = cache_entry.to_json() + + if ttl is not None: + # Memcached TTL must be an integer + ttl = max(int(ttl), 1) + self._client.set(safe_key, json_value, expire=ttl) + else: + self._client.set(safe_key, json_value) + + @override + async def delete(self, collection: str, key: str) -> bool: + await self.setup_collection_once(collection=collection) + + combo_key: str = compound_key(collection=collection, key=key) + safe_key = self._get_safe_key(combo_key) + + return self._client.delete(safe_key) + + @override + async def keys(self, collection: str) -> list[str]: + await self.setup_collection_once(collection=collection) + + # Memcached doesn't support pattern matching or listing keys + # This is a limitation of memcached - we return an empty list + # In practice, applications should track keys separately if needed + return [] + + @override + async def clear_collection(self, collection: str) -> int: + await self.setup_collection_once(collection=collection) + + # Memcached doesn't support pattern matching or selective deletion + # This is a limitation of memcached - we return 0 + # In practice, applications would need to track keys separately + return 0 + + @override + async def list_collections(self) -> list[str]: + await self.setup_once() + + # Memcached doesn't support listing all keys or pattern matching + # This is a limitation of memcached - we return an empty list + return [] + + @override + async def cull(self) -> None: + # Memcached handles expiration automatically + # No need to manually cull expired entries + pass diff --git a/tests/stores/memcached/__init__.py b/tests/stores/memcached/__init__.py new file mode 100644 index 00000000..072b7147 --- /dev/null +++ b/tests/stores/memcached/__init__.py @@ -0,0 +1 @@ +# Memcached store tests diff --git a/tests/stores/memcached/test_memcached.py b/tests/stores/memcached/test_memcached.py new file mode 100644 index 00000000..88ea9d8c --- /dev/null +++ b/tests/stores/memcached/test_memcached.py @@ -0,0 +1,145 @@ +import asyncio +import contextlib +from collections.abc import AsyncGenerator + +import pytest +from pymemcache.client.base import Client +from typing_extensions import override + +from kv_store_adapter.errors import StoreConnectionError +from kv_store_adapter.stores.base.unmanaged import BaseKVStore +from kv_store_adapter.stores.memcached import MemcachedStore +from tests.stores.conftest import BaseStoreTests + +# Memcached test configuration +MEMCACHED_HOST = "localhost" +MEMCACHED_PORT = 11211 + +WAIT_FOR_MEMCACHED_TIMEOUT = 30 + + +async def ping_memcached() -> bool: + client = Client((MEMCACHED_HOST, MEMCACHED_PORT)) + try: + client.set("__test__", "test_value", expire=1) + result = client.get("__test__") + client.delete("__test__") + except Exception: + return False + else: + return result is not None + + +async def wait_memcached() -> bool: + # with a timeout of 30 seconds + for _ in range(WAIT_FOR_MEMCACHED_TIMEOUT): + if await ping_memcached(): + return True + await asyncio.sleep(1) + return False + + +@pytest.mark.skip_on_ci +class TestMemcachedStore(BaseStoreTests): + @pytest.fixture + @override + async def store(self) -> AsyncGenerator[BaseKVStore, None]: + if not await wait_memcached(): + pytest.skip("Memcached is not available") + + store = MemcachedStore(host=MEMCACHED_HOST, port=MEMCACHED_PORT) + yield store + + # Cleanup - flush all keys + with contextlib.suppress(Exception): + store._client.flush_all() + + async def test_memcached_store_initialization(self): + """Test that MemcachedStore can be initialized with different parameters.""" + # Test with host and port + store1 = MemcachedStore(host="localhost", port=11211) + assert store1._client is not None + + # Test with existing client + client = Client(("localhost", 11211)) + store2 = MemcachedStore(client=client) + assert store2._client is client + + async def test_memcached_store_long_keys(self): + """Test that memcached store handles long keys correctly by hashing them.""" + if not await wait_memcached(): + pytest.skip("Memcached is not available") + + store = MemcachedStore(host=MEMCACHED_HOST, port=MEMCACHED_PORT) + await store.setup() + + # Create a very long key that exceeds memcached's 250 character limit + long_collection = "a" * 200 + long_key = "b" * 200 + test_value = {"test": "value"} + + # This should work despite the long key + await store.put(collection=long_collection, key=long_key, value=test_value) + result = await store.get(collection=long_collection, key=long_key) + + assert result == test_value + + # Cleanup + await store.delete(collection=long_collection, key=long_key) + + async def test_memcached_store_limitations(self): + """Test that memcached store correctly handles its limitations.""" + if not await wait_memcached(): + pytest.skip("Memcached is not available") + + store = MemcachedStore(host=MEMCACHED_HOST, port=MEMCACHED_PORT) + await store.setup() + + # Set some test data + await store.put(collection="test", key="key1", value={"test": "value1"}) + await store.put(collection="test", key="key2", value={"test": "value2"}) + + # Test that keys() returns empty list (memcached limitation) + keys = await store.keys(collection="test") + assert keys == [] + + # Test that clear_collection() returns 0 (memcached limitation) + cleared = await store.clear_collection(collection="test") + assert cleared == 0 + + # Test that list_collections() returns empty list (memcached limitation) + collections = await store.list_collections() + assert collections == [] + + # Cleanup + store._client.flush_all() + + async def test_memcached_store_ttl(self): + """Test TTL functionality with memcached.""" + if not await wait_memcached(): + pytest.skip("Memcached is not available") + + store = MemcachedStore(host=MEMCACHED_HOST, port=MEMCACHED_PORT) + await store.setup() + + # Test with TTL + await store.put(collection="test", key="ttl_key", value={"test": "value"}, ttl=2) + + # Should exist immediately + result = await store.get(collection="test", key="ttl_key") + assert result == {"test": "value"} + + # Wait for expiration + await asyncio.sleep(3) + + # Should be expired now + result = await store.get(collection="test", key="ttl_key") + assert result is None + + async def test_memcached_store_connection_error(self): + """Test that connection errors are properly handled.""" + # Create store with invalid port to test connection error + store = MemcachedStore(host="localhost", port=99999) + + with pytest.raises(StoreConnectionError): + await store.setup()