Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ def __init__(
self._raise_on_validation_error = raise_on_validation_error

def _validate_model(self, value: dict[str, Any]) -> T | None:
"""Validate and deserialize a dict into the configured Pydantic model.

This method handles both single models and list models. For list models, it expects the value
to contain an "items" key with the list data, following the convention used by `_serialize_model`.
If validation fails and `raise_on_validation_error` is False, returns None instead of raising.

Args:
value: The dict to validate and convert to a Pydantic model.

Returns:
The validated model instance, or None if validation fails and errors are suppressed.

Raises:
DeserializationError: If validation fails and `raise_on_validation_error` is True.
"""
try:
if self._is_list_model:
return self._type_adapter.validate_python(value.get("items", []))
Expand All @@ -62,6 +77,22 @@ def _validate_model(self, value: dict[str, Any]) -> T | None:
return None

def _serialize_model(self, value: T) -> dict[str, Any]:
"""Serialize a Pydantic model to a dict for storage.

This method handles both single models and list models. For list models, it wraps the serialized
list in a dict with an "items" key (e.g., {"items": [...]}) to ensure consistent dict-based storage
format across all value types. This wrapping convention is expected by `_validate_model` during
deserialization.

Args:
value: The Pydantic model instance to serialize.

Returns:
A dict representation of the model suitable for storage.

Raises:
SerializationError: If the model cannot be serialized.
"""
try:
if self._is_list_model:
return {"items": self._type_adapter.dump_python(value, mode="json")}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

@runtime_checkable
class AsyncKeyValueProtocol(Protocol):
"""A subset of KV operations: get/put/delete and TTL variants, including bulk calls."""
"""A subset of KV operations: get/put/delete and TTL variants, including bulk calls.

This protocol defines the minimal contract for key-value store implementations. All methods are
async and may raise exceptions on connection failures, validation errors, or other operational issues.
Implementations should handle backend-specific errors appropriately.
"""

async def get(
self,
Expand Down Expand Up @@ -54,6 +59,9 @@ async def delete(self, key: str, *, collection: str | None = None) -> bool:
Args:
key: The key to delete the value from.
collection: The collection to delete the value from. If no collection is provided, it will use the default collection.

Returns:
True if the key was deleted, False if the key did not exist.
"""
...

Expand Down
34 changes: 34 additions & 0 deletions key-value/key-value-aio/src/key_value/aio/stores/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@


def _seed_to_frozen_seed_data(seed: SEED_DATA_TYPE) -> FROZEN_SEED_DATA_TYPE:
"""Convert mutable seed data to an immutable frozen structure.

This function converts the nested mapping structure of seed data into immutable MappingProxyType
objects at all levels. Using immutable structures prevents accidental modification of seed data
after store initialization and ensures thread-safety.

Args:
seed: The mutable seed data mapping: {collection: {key: {field: value}}}.

Returns:
An immutable frozen version of the seed data using MappingProxyType.
"""
return MappingProxyType(
{collection: MappingProxyType({key: MappingProxyType(value) for key, value in items.items()}) for collection, items in seed.items()}
)
Expand Down Expand Up @@ -101,6 +113,15 @@ async def _seed_store(self) -> None:
await self.put(key=key, value=dict(value), collection=collection)

async def setup(self) -> None:
"""Initialize the store if not already initialized.

This method is called automatically before any store operations and uses a lock to ensure
thread-safe lazy initialization. It can also be called manually to ensure the store is ready
before performing operations. The setup process includes calling the `_setup()` hook and
seeding the store with initial data if provided.

This method is idempotent - calling it multiple times has no additional effect after the first call.
"""
if not self._setup_complete:
async with self._setup_lock:
if not self._setup_complete:
Expand All @@ -116,6 +137,19 @@ async def setup(self) -> None:
await self._seed_store()

async def setup_collection(self, *, collection: str) -> None:
"""Initialize a specific collection if not already initialized.

This method is called automatically before any collection-specific operations and uses a per-collection
lock to ensure thread-safe lazy initialization. It can also be called manually to ensure a collection
is ready before performing operations on it. The setup process includes calling the `_setup_collection()`
hook for store-specific collection initialization.

This method is idempotent - calling it multiple times for the same collection has no additional effect
after the first call.

Args:
collection: The name of the collection to initialize.
"""
await self.setup()

if not self._setup_collection_complete[collection]:
Expand Down
39 changes: 35 additions & 4 deletions key-value/key-value-aio/src/key_value/aio/stores/mongodb/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,34 @@


def document_to_managed_entry(document: dict[str, Any]) -> ManagedEntry:
"""
Convert a MongoDB document to a ManagedEntry.
"""Convert a MongoDB document back to a ManagedEntry.

This function deserializes a MongoDB document (created by `managed_entry_to_document`) back to a
ManagedEntry object, parsing the stringified value field and preserving all metadata.

Args:
document: The MongoDB document to convert.

Returns:
A ManagedEntry object reconstructed from the document.
"""
return ManagedEntry.from_dict(data=document, stringified_value=True)


def managed_entry_to_document(key: str, managed_entry: ManagedEntry) -> dict[str, Any]:
"""
Convert a ManagedEntry to a MongoDB document.
"""Convert a ManagedEntry to a MongoDB document for storage.

This function serializes a ManagedEntry to a MongoDB document format, including the key and all
metadata (TTL, creation, and expiration timestamps). The value is stringified to ensure proper
storage in MongoDB. The serialization is designed to preserve all entry information for round-trip
conversion back to a ManagedEntry.

Args:
key: The key associated with this entry.
managed_entry: The ManagedEntry to serialize.

Returns:
A MongoDB document dict containing the key, value, and all metadata.
"""
return {
"key": key,
Expand Down Expand Up @@ -127,6 +146,18 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: #
await self._client.__aexit__(exc_type, exc_val, exc_tb)

def _sanitize_collection_name(self, collection: str) -> str:
"""Sanitize a collection name to meet MongoDB naming requirements.

MongoDB has specific requirements for collection names (length limits, allowed characters).
This method ensures collection names are compliant by truncating to the maximum allowed length
and replacing invalid characters with safe alternatives.

Args:
collection: The collection name to sanitize.

Returns:
A sanitized collection name that meets MongoDB requirements.
"""
return sanitize_string(value=collection, max_length=MAX_COLLECTION_LENGTH, allowed_characters=ALPHANUMERIC_CHARACTERS)

@override
Expand Down
25 changes: 21 additions & 4 deletions key-value/key-value-aio/src/key_value/aio/stores/redis/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,32 @@


def managed_entry_to_json(managed_entry: ManagedEntry) -> str:
"""
Convert a ManagedEntry to a JSON string.
"""Convert a ManagedEntry to a JSON string for Redis storage.

This function serializes a ManagedEntry to JSON format including all metadata (TTL, creation,
and expiration timestamps). The serialization is designed to preserve all entry information
for round-trip conversion back to a ManagedEntry.

Args:
managed_entry: The ManagedEntry to serialize.

Returns:
A JSON string representation of the ManagedEntry with full metadata.
"""
return managed_entry.to_json(include_metadata=True, include_expiration=True, include_creation=True)


def json_to_managed_entry(json_str: str) -> ManagedEntry:
"""
Convert a JSON string to a ManagedEntry.
"""Convert a JSON string from Redis storage back to a ManagedEntry.

This function deserializes a JSON string (created by `managed_entry_to_json`) back to a
ManagedEntry object, preserving all metadata including TTL, creation, and expiration timestamps.

Args:
json_str: The JSON string to deserialize.

Returns:
A ManagedEntry object reconstructed from the JSON string.
"""
return ManagedEntry.from_json(json_str=json_str, includes_metadata=True)

Expand Down
19 changes: 18 additions & 1 deletion key-value/key-value-aio/src/key_value/aio/wrappers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,24 @@


class BaseWrapper(AsyncKeyValue):
"""A base wrapper for KVStore implementations that passes through to the underlying store."""
"""A base wrapper for KVStore implementations that passes through to the underlying store.

This class implements the passthrough pattern where all operations are delegated to the wrapped
key-value store without modification. It serves as a foundation for creating custom wrappers that
need to intercept, modify, or enhance specific operations while passing through others unchanged.

To create a custom wrapper, subclass this class and override only the methods you need to customize.
All other operations will automatically pass through to the underlying store.

Example:
class LoggingWrapper(BaseWrapper):
async def get(self, key: str, *, collection: str | None = None):
logger.info(f"Getting key: {key}")
return await super().get(key, collection=collection)

Attributes:
key_value: The underlying AsyncKeyValue store that operations are delegated to.
"""

key_value: AsyncKeyValue

Expand Down