Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from datetime import datetime
from typing import Any, overload

from elastic_transport import ObjectApiResponse
from elastic_transport import SerializationError as ElasticsearchSerializationError
from key_value.shared.errors import DeserializationError, SerializationError
from key_value.shared.utils.managed_entry import ManagedEntry
from key_value.shared.utils.sanitization import AlwaysHashStrategy, HashFragmentMode, HybridSanitizationStrategy
from key_value.shared.utils.sanitization import (
AlwaysHashStrategy,
HashFragmentMode,
HybridSanitizationStrategy,
SanitizationStrategy,
)
from key_value.shared.utils.sanitize import (
ALPHANUMERIC_CHARACTERS,
LOWERCASE_ALPHABET,
Expand All @@ -29,7 +32,10 @@
from key_value.aio.stores.elasticsearch.utils import LessCapableJsonSerializer, LessCapableNdjsonSerializer, new_bulk_action

try:
from elastic_transport import ObjectApiResponse
from elastic_transport import SerializationError as ElasticsearchSerializationError
from elasticsearch import AsyncElasticsearch
from elasticsearch.exceptions import BadRequestError

from key_value.aio.stores.elasticsearch.utils import (
get_aggregations_from_body,
Expand Down Expand Up @@ -63,12 +69,6 @@
},
"value": {
"properties": {
# You might think the `string` field should be a text/keyword field
# but this is the recommended mapping for large stringified json
"string": {
"type": "object",
"enabled": False,
},
"flattened": {
"type": "flattened",
},
Expand All @@ -83,80 +83,100 @@
MAX_KEY_LENGTH = 256
ALLOWED_KEY_CHARACTERS: str = ALPHANUMERIC_CHARACTERS

MAX_INDEX_LENGTH = 240
MAX_INDEX_LENGTH = 200
ALLOWED_INDEX_CHARACTERS: str = LOWERCASE_ALPHABET + NUMBERS + "_" + "-" + "."


class ElasticsearchSerializationAdapter(SerializationAdapter):
"""Adapter for Elasticsearch with support for native and string storage modes."""

_native_storage: bool

def __init__(self, *, native_storage: bool = True) -> None:
"""Initialize the Elasticsearch adapter.
"""Adapter for Elasticsearch."""

Args:
native_storage: If True (default), store values as flattened dicts.
If False, store values as JSON strings.
"""
def __init__(self) -> None:
"""Initialize the Elasticsearch adapter"""
super().__init__()

self._native_storage = native_storage
self._date_format = "isoformat"
self._value_format = "dict" if native_storage else "string"
self._value_format = "dict"

@override
def prepare_dump(self, data: dict[str, Any]) -> dict[str, Any]:
value = data.pop("value")

data["value"] = {}

if self._native_storage:
data["value"]["flattened"] = value
else:
data["value"]["string"] = value
data["value"] = {
"flattened": value,
}

return data

@override
def prepare_load(self, data: dict[str, Any]) -> dict[str, Any]:
value = data.pop("value")

if "flattened" in value:
data["value"] = value["flattened"]
elif "string" in value:
data["value"] = value["string"]
else:
msg = "Value field not found in Elasticsearch document"
raise DeserializationError(message=msg)
data["value"] = data.pop("value").get("flattened")

Comment on lines 111 to 113
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Restore legacy load compatibility.

Existing indices still hold "value" as the old JSON string. Calling .get("flattened") on that string raises immediately, breaking every read after deployment. Please keep the guard so both legacy and new documents deserialize.

-        data["value"] = data.pop("value").get("flattened")
+        raw_value = data.pop("value")
+        if isinstance(raw_value, dict):
+            data["value"] = raw_value.get("flattened", raw_value)
+        else:
+            data["value"] = raw_value
🤖 Prompt for AI Agents
In key-value/key-value-aio/src/key_value/aio/stores/elasticsearch/store.py
around lines 111 to 113, the prepare_load function currently assumes
data["value"] is a dict and calls .get("flattened") which will break when legacy
documents store a JSON string; update the logic to handle both cases: if
data["value"] is a string, json.loads it first; if it’s a dict, use
value.get("flattened") when present otherwise treat the dict as already
flattened; assign the resolved value back to data["value"] and return data.

return data


class ElasticsearchV1KeySanitizationStrategy(AlwaysHashStrategy):
def __init__(self) -> None:
super().__init__(
hash_length=64,
)


class ElasticsearchV1CollectionSanitizationStrategy(HybridSanitizationStrategy):
def __init__(self) -> None:
super().__init__(
replacement_character="_",
max_length=MAX_INDEX_LENGTH,
allowed_characters=UPPERCASE_ALPHABET + ALLOWED_INDEX_CHARACTERS,
hash_fragment_mode=HashFragmentMode.ALWAYS,
)


class ElasticsearchStore(
BaseEnumerateCollectionsStore, BaseEnumerateKeysStore, BaseDestroyCollectionStore, BaseCullStore, BaseContextManagerStore, BaseStore
):
"""A elasticsearch-based store."""
"""An Elasticsearch-based store.

Stores collections in their own indices and stores values in Flattened fields.

This store has specific restrictions on what is allowed in keys and collections. Keys and collections are not sanitized
by default which may result in errors when using the store.

To avoid issues, you may want to consider leveraging the `ElasticsearchV1KeySanitizationStrategy` and
`ElasticsearchV1CollectionSanitizationStrategy` strategies.
"""

_client: AsyncElasticsearch

_is_serverless: bool

_index_prefix: str

_native_storage: bool
_default_collection: str | None

_serializer: SerializationAdapter

_key_sanitization_strategy: SanitizationStrategy
_collection_sanitization_strategy: SanitizationStrategy

@overload
def __init__(
self,
*,
elasticsearch_client: AsyncElasticsearch,
index_prefix: str,
native_storage: bool = True,
default_collection: str | None = None,
) -> None: ...
key_sanitization_strategy: SanitizationStrategy | None = None,
collection_sanitization_strategy: SanitizationStrategy | None = None,
) -> None:
"""Initialize the elasticsearch store.

Args:
elasticsearch_client: The elasticsearch client to use.
index_prefix: The index prefix to use. Collections will be prefixed with this prefix.
default_collection: The default collection to use if no collection is provided.
key_sanitization_strategy: The sanitization strategy to use for keys.
collection_sanitization_strategy: The sanitization strategy to use for collections.
"""

@overload
def __init__(
Expand All @@ -165,9 +185,18 @@ def __init__(
url: str,
api_key: str | None = None,
index_prefix: str,
native_storage: bool = True,
default_collection: str | None = None,
) -> None: ...
key_sanitization_strategy: SanitizationStrategy | None = None,
collection_sanitization_strategy: SanitizationStrategy | None = None,
) -> None:
"""Initialize the elasticsearch store.

Args:
url: The url of the elasticsearch cluster.
api_key: The api key to use.
index_prefix: The index prefix to use. Collections will be prefixed with this prefix.
default_collection: The default collection to use if no collection is provided.
"""

def __init__(
self,
Expand All @@ -176,8 +205,9 @@ def __init__(
url: str | None = None,
api_key: str | None = None,
index_prefix: str,
native_storage: bool = True,
default_collection: str | None = None,
key_sanitization_strategy: SanitizationStrategy | None = None,
collection_sanitization_strategy: SanitizationStrategy | None = None,
) -> None:
"""Initialize the elasticsearch store.

Expand All @@ -186,9 +216,9 @@ def __init__(
url: The url of the elasticsearch cluster.
api_key: The api key to use.
index_prefix: The index prefix to use. Collections will be prefixed with this prefix.
native_storage: Whether to use native storage mode (flattened field type) or serialize
all values to JSON strings. Defaults to True.
default_collection: The default collection to use if no collection is provided.
key_sanitization_strategy: The sanitization strategy to use for keys.
collection_sanitization_strategy: The sanitization strategy to use for collections.
"""
if elasticsearch_client is None and url is None:
msg = "Either elasticsearch_client or url must be provided"
Expand All @@ -209,29 +239,14 @@ def __init__(
LessCapableNdjsonSerializer.install_serializer(client=self._client)

self._index_prefix = index_prefix.lower()
self._native_storage = native_storage
self._is_serverless = False

# We have 240 characters to work with
# We need to account for the index prefix and the hyphen.
max_index_length = MAX_INDEX_LENGTH - (len(self._index_prefix) + 1)

self._serializer = ElasticsearchSerializationAdapter(native_storage=native_storage)

# We allow uppercase through the sanitizer so we can lowercase them instead of them
# all turning into underscores.
collection_sanitization = HybridSanitizationStrategy(
replacement_character="_",
max_length=max_index_length,
allowed_characters=UPPERCASE_ALPHABET + ALLOWED_INDEX_CHARACTERS,
hash_fragment_mode=HashFragmentMode.ALWAYS,
)
key_sanitization = AlwaysHashStrategy()
self._serializer = ElasticsearchSerializationAdapter()

super().__init__(
default_collection=default_collection,
collection_sanitization_strategy=collection_sanitization,
key_sanitization_strategy=key_sanitization,
collection_sanitization_strategy=collection_sanitization_strategy,
key_sanitization_strategy=key_sanitization_strategy,
)

@override
Expand All @@ -247,7 +262,12 @@ async def _setup_collection(self, *, collection: str) -> None:
if await self._client.options(ignore_status=404).indices.exists(index=index_name):
return

_ = await self._client.options(ignore_status=404).indices.create(index=index_name, mappings=DEFAULT_MAPPING, settings={})
try:
_ = await self._client.options(ignore_status=404).indices.create(index=index_name, mappings=DEFAULT_MAPPING, settings={})
except BadRequestError as e:
if "index_already_exists_exception" in str(e).lower():
return
raise

def _get_index_name(self, collection: str) -> str:
return self._index_prefix + "-" + self._sanitize_collection(collection=collection).lower()
Expand Down
40 changes: 32 additions & 8 deletions key-value/key-value-aio/src/key_value/aio/stores/keyring/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from key_value.shared.utils.compound import compound_key
from key_value.shared.utils.managed_entry import ManagedEntry
from key_value.shared.utils.sanitization import HybridSanitizationStrategy
from key_value.shared.utils.sanitization import HybridSanitizationStrategy, SanitizationStrategy
from key_value.shared.utils.sanitize import ALPHANUMERIC_CHARACTERS
from typing_extensions import override

Expand All @@ -21,12 +21,36 @@
ALLOWED_KEY_COLLECTION_CHARACTERS: str = ALPHANUMERIC_CHARACTERS


class KeyringV1KeySanitizationStrategy(HybridSanitizationStrategy):
def __init__(self) -> None:
super().__init__(
replacement_character="_",
max_length=MAX_KEY_COLLECTION_LENGTH,
allowed_characters=ALLOWED_KEY_COLLECTION_CHARACTERS,
)


class KeyringV1CollectionSanitizationStrategy(HybridSanitizationStrategy):
def __init__(self) -> None:
super().__init__(
replacement_character="_",
max_length=MAX_KEY_COLLECTION_LENGTH,
allowed_characters=ALLOWED_KEY_COLLECTION_CHARACTERS,
)
Comment on lines +33 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider reducing duplication between strategy classes.

KeyringV1CollectionSanitizationStrategy is identical to KeyringV1KeySanitizationStrategy. While the semantic separation may be intentional to allow future divergence, consider whether a single KeyringV1SanitizationStrategy would suffice, or extract the common initialization into a shared base.

🤖 Prompt for AI Agents
In key-value/key-value-aio/src/key_value/aio/stores/keyring/store.py around
lines 33 to 39, the two sanitization classes are duplicates; consolidate them by
either renaming one to a single KeyringV1SanitizationStrategy and removing the
duplicate class, or extract the common init args into a small shared base (e.g.,
KeyringV1BaseSanitizationStrategy) that both classes call; update references
across the codebase to use the new class name(s) and remove the redundant class
definition.



class KeyringStore(BaseStore):
"""Python keyring-based key-value store using keyring library.

This store uses the Python keyring to persist key-value pairs. Each entry is stored
This store uses the system's keyring to persist key-value pairs. Each entry is stored
as a password in the keychain with the combination of collection and key as the username.

This store has specific restrictions on what is allowed in keys and collections. Keys and collections are not sanitized
by default which may result in errors when using the store.

To avoid issues, you may want to consider leveraging the `KeyringV1KeySanitizationStrategy`
and `KeyringV1CollectionSanitizationStrategy` strategies.

Note: TTL is not natively supported by Python keyring, so TTL information is stored
within the JSON payload and checked at retrieval time.
"""
Expand All @@ -38,23 +62,23 @@ def __init__(
*,
service_name: str = DEFAULT_KEYCHAIN_SERVICE,
default_collection: str | None = None,
key_sanitization_strategy: SanitizationStrategy | None = None,
collection_sanitization_strategy: SanitizationStrategy | None = None,
) -> None:
"""Initialize the Python keyring store.

Args:
service_name: The service name to use in the keychain. Defaults to "py-key-value".
default_collection: The default collection to use if no collection is provided.
key_sanitization_strategy: The sanitization strategy to use for keys.
collection_sanitization_strategy: The sanitization strategy to use for collections.
"""
self._service_name = service_name

sanitization_strategy = HybridSanitizationStrategy(
replacement_character="_", max_length=MAX_KEY_COLLECTION_LENGTH, allowed_characters=ALLOWED_KEY_COLLECTION_CHARACTERS
)

super().__init__(
default_collection=default_collection,
collection_sanitization_strategy=sanitization_strategy,
key_sanitization_strategy=sanitization_strategy,
collection_sanitization_strategy=collection_sanitization_strategy,
key_sanitization_strategy=key_sanitization_strategy,
)

@override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from key_value.aio.stores.memcached.store import MemcachedStore
from key_value.aio.stores.memcached.store import MemcachedStore, MemcachedV1KeySanitizationStrategy

__all__ = ["MemcachedStore"]
__all__ = ["MemcachedStore", "MemcachedV1KeySanitizationStrategy"]
Loading