Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions DEVELOPING.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ store = DiskStore(path="/path/to/cache", size_limit=1024*1024*1024) # 1GB
store = DiskStore(cache=existing_cache_instance)
```

### Memcached Store
High-performance distributed caching with Memcached:

```python
from kv_store_adapter import MemcachedStore

# Connection options
store = MemcachedStore(host="localhost", port=11211)
store = MemcachedStore(client=existing_pymemcache_client)
```

**Limitations:**
- `keys()`, `clear_collection()`, and `list_collections()` return empty results due to Memcached's lack of key enumeration
- Keys longer than 250 characters are automatically hashed with MD5
- TTL is handled natively by Memcached

### Elasticsearch Store
Full-text searchable storage with Elasticsearch:

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Choose the store that best fits your needs. All stores implement the same `KVSto
### Production Stores

- **RedisStore**: `RedisStore(url="redis://localhost:6379/0")`
- **MemcachedStore**: `MemcachedStore(host="localhost", port=11211)`
- **ElasticsearchStore**: `ElasticsearchStore(url="https://localhost:9200", api_key="your-api-key")`
- **DiskStore**: A sqlite-based store for local persistence `DiskStore(path="./cache")`
- **MemoryStore**: A fast in-memory cache `MemoryStore()`
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ memory = ["cachetools>=6.0.0"]
disk = ["diskcache>=5.6.0"]
redis = ["redis>=6.0.0"]
elasticsearch = ["elasticsearch>=9.0.0", "aiohttp>=3.12"]
memcached = ["pymemcache>=4.0.0"]
pydantic = ["pydantic>=2.11.9"]

[tool.pytest.ini_options]
Expand All @@ -40,7 +41,7 @@ env_files = [".env"]

[dependency-groups]
dev = [
"kv-store-adapter[memory,disk,redis,elasticsearch]",
"kv-store-adapter[memory,disk,redis,elasticsearch,memcached]",
"kv-store-adapter[pydantic]",
"pytest",
"pytest-mock",
Expand Down
3 changes: 3 additions & 0 deletions src/kv_store_adapter/stores/memcached/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .store import MemcachedStore

__all__ = ["MemcachedStore"]
154 changes: 154 additions & 0 deletions src/kv_store_adapter/stores/memcached/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import hashlib
from typing import Any, overload

from pymemcache.client.base import Client
from typing_extensions import override

from kv_store_adapter.errors import StoreConnectionError
from kv_store_adapter.stores.base.managed import BaseManagedKVStore
from kv_store_adapter.stores.utils.compound import compound_key
from kv_store_adapter.stores.utils.managed_entry import ManagedEntry

# Memcached key length limit
MEMCACHED_MAX_KEY_LENGTH = 250


class MemcachedStore(BaseManagedKVStore):
"""Memcached-based key-value store."""

_client: Client

@overload
def __init__(self, *, client: Client) -> None: ...

@overload
def __init__(self, *, host: str = "localhost", port: int = 11211) -> None: ...

def __init__(
self,
*,
client: Client | None = None,
host: str = "localhost",
port: int = 11211,
) -> None:
"""Initialize the Memcached store.

Args:
client: An existing pymemcache Client to use.
host: Memcached host. Defaults to localhost.
port: Memcached port. Defaults to 11211.
"""
if client:
self._client = client
else:
self._client = Client((host, port))

super().__init__()

def _get_safe_key(self, combo_key: str) -> str:
"""Get a safe key for memcached, hashing if necessary."""
if len(combo_key) > MEMCACHED_MAX_KEY_LENGTH:
# Use MD5 hash for long keys - this is not for security, just for key shortening
return hashlib.md5(combo_key.encode()).hexdigest() # noqa: S324
return combo_key

def _test_connection(self) -> None:
"""Test the memcached connection."""
test_key = "__memcached_test__"
self._client.set(test_key, "test_value", expire=1)
result = self._client.get(test_key)
if result is None:
msg = "Failed to connect to Memcached"
raise StoreConnectionError(message=msg)
# Clean up test key
self._client.delete(test_key)

@override
async def setup(self) -> None:
# Test the connection by performing a simple operation
try:
self._test_connection()
except Exception as e:
msg = f"Failed to connect to Memcached: {e}"
raise StoreConnectionError(message=msg) from e

@override
async def get_entry(self, collection: str, key: str) -> ManagedEntry | None:
combo_key: str = compound_key(collection=collection, key=key)
safe_key = self._get_safe_key(combo_key)

cache_entry: Any = self._client.get(safe_key)

if cache_entry is None:
return None

if not isinstance(cache_entry, (str, bytes)):
return None

# Convert bytes to string if necessary
if isinstance(cache_entry, bytes):
cache_entry = cache_entry.decode("utf-8")

return ManagedEntry.from_json(json_str=cache_entry)

@override
async def put_entry(
self,
collection: str,
key: str,
cache_entry: ManagedEntry,
*,
ttl: float | None = None,
) -> None:
combo_key: str = compound_key(collection=collection, key=key)
safe_key = self._get_safe_key(combo_key)

json_value: str = cache_entry.to_json()

if ttl is not None:
# Memcached TTL must be an integer
ttl = max(int(ttl), 1)
self._client.set(safe_key, json_value, expire=ttl)
else:
self._client.set(safe_key, json_value)

@override
async def delete(self, collection: str, key: str) -> bool:
await self.setup_collection_once(collection=collection)

combo_key: str = compound_key(collection=collection, key=key)
safe_key = self._get_safe_key(combo_key)

return self._client.delete(safe_key)

@override
async def keys(self, collection: str) -> list[str]:
await self.setup_collection_once(collection=collection)

# Memcached doesn't support pattern matching or listing keys
# This is a limitation of memcached - we return an empty list
# In practice, applications should track keys separately if needed
return []

@override
async def clear_collection(self, collection: str) -> int:
await self.setup_collection_once(collection=collection)

# Memcached doesn't support pattern matching or selective deletion
# This is a limitation of memcached - we return 0
# In practice, applications would need to track keys separately
return 0

@override
async def list_collections(self) -> list[str]:
await self.setup_once()

# Memcached doesn't support listing all keys or pattern matching
# This is a limitation of memcached - we return an empty list
return []

@override
async def cull(self) -> None:
# Memcached handles expiration automatically
# No need to manually cull expired entries
pass
1 change: 1 addition & 0 deletions tests/stores/memcached/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Memcached store tests
145 changes: 145 additions & 0 deletions tests/stores/memcached/test_memcached.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import asyncio
import contextlib
from collections.abc import AsyncGenerator

import pytest
from pymemcache.client.base import Client
from typing_extensions import override

from kv_store_adapter.errors import StoreConnectionError
from kv_store_adapter.stores.base.unmanaged import BaseKVStore
from kv_store_adapter.stores.memcached import MemcachedStore
from tests.stores.conftest import BaseStoreTests

# Memcached test configuration
MEMCACHED_HOST = "localhost"
MEMCACHED_PORT = 11211

WAIT_FOR_MEMCACHED_TIMEOUT = 30


async def ping_memcached() -> bool:
client = Client((MEMCACHED_HOST, MEMCACHED_PORT))
try:
client.set("__test__", "test_value", expire=1)
result = client.get("__test__")
client.delete("__test__")
except Exception:
return False
else:
return result is not None


async def wait_memcached() -> bool:
# with a timeout of 30 seconds
for _ in range(WAIT_FOR_MEMCACHED_TIMEOUT):
if await ping_memcached():
return True
await asyncio.sleep(1)
return False


@pytest.mark.skip_on_ci
class TestMemcachedStore(BaseStoreTests):
@pytest.fixture
@override
async def store(self) -> AsyncGenerator[BaseKVStore, None]:
if not await wait_memcached():
pytest.skip("Memcached is not available")

store = MemcachedStore(host=MEMCACHED_HOST, port=MEMCACHED_PORT)
yield store

# Cleanup - flush all keys
with contextlib.suppress(Exception):
store._client.flush_all()

async def test_memcached_store_initialization(self):
"""Test that MemcachedStore can be initialized with different parameters."""
# Test with host and port
store1 = MemcachedStore(host="localhost", port=11211)
assert store1._client is not None

# Test with existing client
client = Client(("localhost", 11211))
store2 = MemcachedStore(client=client)
assert store2._client is client

async def test_memcached_store_long_keys(self):
"""Test that memcached store handles long keys correctly by hashing them."""
if not await wait_memcached():
pytest.skip("Memcached is not available")

store = MemcachedStore(host=MEMCACHED_HOST, port=MEMCACHED_PORT)
await store.setup()

# Create a very long key that exceeds memcached's 250 character limit
long_collection = "a" * 200
long_key = "b" * 200
test_value = {"test": "value"}

# This should work despite the long key
await store.put(collection=long_collection, key=long_key, value=test_value)
result = await store.get(collection=long_collection, key=long_key)

assert result == test_value

# Cleanup
await store.delete(collection=long_collection, key=long_key)

async def test_memcached_store_limitations(self):
"""Test that memcached store correctly handles its limitations."""
if not await wait_memcached():
pytest.skip("Memcached is not available")

store = MemcachedStore(host=MEMCACHED_HOST, port=MEMCACHED_PORT)
await store.setup()

# Set some test data
await store.put(collection="test", key="key1", value={"test": "value1"})
await store.put(collection="test", key="key2", value={"test": "value2"})

# Test that keys() returns empty list (memcached limitation)
keys = await store.keys(collection="test")
assert keys == []

# Test that clear_collection() returns 0 (memcached limitation)
cleared = await store.clear_collection(collection="test")
assert cleared == 0

# Test that list_collections() returns empty list (memcached limitation)
collections = await store.list_collections()
assert collections == []

# Cleanup
store._client.flush_all()

async def test_memcached_store_ttl(self):
"""Test TTL functionality with memcached."""
if not await wait_memcached():
pytest.skip("Memcached is not available")

store = MemcachedStore(host=MEMCACHED_HOST, port=MEMCACHED_PORT)
await store.setup()

# Test with TTL
await store.put(collection="test", key="ttl_key", value={"test": "value"}, ttl=2)

# Should exist immediately
result = await store.get(collection="test", key="ttl_key")
assert result == {"test": "value"}

# Wait for expiration
await asyncio.sleep(3)

# Should be expired now
result = await store.get(collection="test", key="ttl_key")
assert result is None

async def test_memcached_store_connection_error(self):
"""Test that connection errors are properly handled."""
# Create store with invalid port to test connection error
store = MemcachedStore(host="localhost", port=99999)

with pytest.raises(StoreConnectionError):
await store.setup()