diff --git a/README.md b/README.md index c190a090..6f7fd179 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ This monorepo contains two libraries: ## Why use this library? -- **Multiple backends**: DynamoDB, Elasticsearch, Memcached, MongoDB, Redis, +- **Multiple backends**: DynamoDB, S3, Elasticsearch, Memcached, MongoDB, Redis, RocksDB, Valkey, and In-memory, Disk, etc - **TTL support**: Automatic expiration handling across all store types - **Type-safe**: Full type hints with Protocol-based interfaces @@ -131,6 +131,7 @@ pip install py-key-value-aio pip install py-key-value-aio[memory] pip install py-key-value-aio[disk] pip install py-key-value-aio[dynamodb] +pip install py-key-value-aio[s3] pip install py-key-value-aio[elasticsearch] # or: redis, mongodb, memcached, valkey, vault, registry, rocksdb, see below for all options ``` @@ -191,7 +192,7 @@ categories: - **Local stores**: In-memory and disk-based storage (Memory, Disk, RocksDB, etc.) - **Secret stores**: Secure OS-level storage for sensitive data (Keyring, Vault) - **Distributed stores**: Network-based storage for multi-node apps (Redis, - DynamoDB, MongoDB, etc.) + DynamoDB, S3, MongoDB, etc.) Each store has a **stability rating** indicating likelihood of backwards-incompatible changes. Stable stores (Redis, Valkey, Disk, Keyring) diff --git a/key-value/key-value-aio/pyproject.toml b/key-value/key-value-aio/pyproject.toml index 43aebd42..d4f4edce 100644 --- a/key-value/key-value-aio/pyproject.toml +++ b/key-value/key-value-aio/pyproject.toml @@ -41,6 +41,7 @@ vault = ["hvac>=2.3.0", "types-hvac>=2.3.0"] memcached = ["aiomcache>=0.8.0"] elasticsearch = ["elasticsearch>=8.0.0", "aiohttp>=3.12"] dynamodb = ["aioboto3>=13.3.0", "types-aiobotocore-dynamodb>=2.16.0"] +s3 = ["aioboto3>=13.3.0", "types-aiobotocore-s3>=2.16.0"] keyring = ["keyring>=25.6.0"] keyring-linux = ["keyring>=25.6.0", "dbus-python>=1.4.0"] pydantic = ["pydantic>=2.11.9"] @@ -67,7 +68,7 @@ env_files = [".env"] [dependency-groups] dev = [ - "py-key-value-aio[memory,disk,redis,elasticsearch,memcached,mongodb,vault,dynamodb,rocksdb]", + "py-key-value-aio[memory,disk,redis,elasticsearch,memcached,mongodb,vault,dynamodb,s3,rocksdb]", "py-key-value-aio[valkey]; platform_system != 'Windows'", "py-key-value-aio[keyring]", "py-key-value-aio[pydantic]", diff --git a/key-value/key-value-aio/src/key_value/aio/stores/s3/__init__.py b/key-value/key-value-aio/src/key_value/aio/stores/s3/__init__.py new file mode 100644 index 00000000..95d93ceb --- /dev/null +++ b/key-value/key-value-aio/src/key_value/aio/stores/s3/__init__.py @@ -0,0 +1,13 @@ +"""AWS S3-based key-value store.""" + +from key_value.aio.stores.s3.store import ( + S3CollectionSanitizationStrategy, + S3KeySanitizationStrategy, + S3Store, +) + +__all__ = [ + "S3CollectionSanitizationStrategy", + "S3KeySanitizationStrategy", + "S3Store", +] diff --git a/key-value/key-value-aio/src/key_value/aio/stores/s3/store.py b/key-value/key-value-aio/src/key_value/aio/stores/s3/store.py new file mode 100644 index 00000000..3804fe52 --- /dev/null +++ b/key-value/key-value-aio/src/key_value/aio/stores/s3/store.py @@ -0,0 +1,456 @@ +from types import TracebackType +from typing import TYPE_CHECKING, Any, overload + +from key_value.shared.utils.managed_entry import ManagedEntry +from key_value.shared.utils.sanitization import SanitizationStrategy +from key_value.shared.utils.sanitize import hash_excess_length +from typing_extensions import Self, override + +from key_value.aio.stores.base import ( + BaseContextManagerStore, + 
BaseStore,
+)
+
+# HTTP status code for not found errors
+HTTP_NOT_FOUND = 404
+
+# S3 object keys are limited to 1024 bytes (UTF-8 encoded)
+# We allocate space for collection, separator, and key:
+# 500 bytes for each side plus the "/" separator keeps the combined path within the limit
+MAX_COLLECTION_LENGTH = 500
+MAX_KEY_LENGTH = 500
+
+try:
+    import aioboto3
+    from aioboto3.session import Session  # noqa: TC002
+except ImportError as e:
+    msg = "S3Store requires py-key-value-aio[s3]"
+    raise ImportError(msg) from e
+
+# aioboto3 generates types at runtime, so we use AioBaseClient at runtime but S3Client during static type checking
+if TYPE_CHECKING:
+    from types_aiobotocore_s3.client import S3Client
+else:
+    from aiobotocore.client import AioBaseClient as S3Client
+
+
+class S3KeySanitizationStrategy(SanitizationStrategy):
+    """Sanitization strategy for S3 keys with byte-aware length limits.
+
+    S3 has a maximum key length of 1024 bytes (UTF-8 encoded). This strategy
+    hashes keys that exceed the specified byte limit to ensure compliance.
+
+    Args:
+        max_bytes: Maximum key length in bytes. Defaults to 500.
+    """
+
+    def __init__(self, max_bytes: int = MAX_KEY_LENGTH) -> None:
+        """Initialize the S3 key sanitization strategy.
+
+        Args:
+            max_bytes: Maximum key length in bytes.
+        """
+        self.max_bytes = max_bytes
+
+    def sanitize(self, value: str) -> str:
+        """Hash the value if it exceeds max_bytes when UTF-8 encoded.
+
+        Args:
+            value: The key to sanitize.
+
+        Returns:
+            The original value if within the limit, or truncated+hashed if too long.
+        """
+        return hash_excess_length(value, self.max_bytes, length_is_bytes=True)
+
+    def validate(self, value: str) -> None:
+        """No validation needed for S3 keys."""
+
+
+class S3CollectionSanitizationStrategy(S3KeySanitizationStrategy):
+    """Sanitization strategy for S3 collection names with byte-aware length limits.
+
+    Behaviorally identical to S3KeySanitizationStrategy, but defined as its own type
+    with its own default (MAX_COLLECTION_LENGTH) so the collection half of the
+    {collection}/{key} object path can be tuned independently of the key half.
+    """
+
+    def __init__(self, max_bytes: int = MAX_COLLECTION_LENGTH) -> None:
+        """Initialize the S3 collection sanitization strategy.
+
+        Args:
+            max_bytes: Maximum collection name length in bytes.
+        """
+        super().__init__(max_bytes=max_bytes)
+
+
+class S3Store(BaseContextManagerStore, BaseStore):
+    """AWS S3-based key-value store.
+
+    This store uses AWS S3 to store key-value pairs as objects. Each entry is stored
+    as a separate S3 object with the path format: {collection}/{key}. The ManagedEntry
+    is serialized to JSON and stored as the object body. TTL information is stored in
+    S3 object metadata and checked client-side during retrieval (S3 lifecycle policies
+    can be configured separately for background cleanup, but don't provide atomic TTL+retrieval).
+
+    By default, collections and keys are not sanitized. This means you must ensure that
+    the combined "{collection}/{key}" path does not exceed S3's 1024-byte limit when UTF-8 encoded.
+
+    To handle long collection or key names, use the S3CollectionSanitizationStrategy and
+    S3KeySanitizationStrategy, which hash values exceeding the byte limit.
+
+    Example:
+        Basic usage with automatic AWS credentials:
+
+        >>> async with S3Store(bucket_name="my-kv-store") as store:
+        ...     await store.put(key="user:123", value={"name": "Alice"}, ttl=3600)
+        ...     user = await store.get(key="user:123")
+
+        With sanitization for long keys/collections:
+
+        >>> async with S3Store(
+        ...     bucket_name="my-kv-store",
+        ...     
collection_sanitization_strategy=S3CollectionSanitizationStrategy(), + ... key_sanitization_strategy=S3KeySanitizationStrategy(), + ... ) as store: + ... await store.put(key="very_long_key" * 100, value={"data": "test"}) + + With custom AWS credentials: + + >>> async with S3Store( + ... bucket_name="my-kv-store", + ... region_name="us-west-2", + ... aws_access_key_id="...", + ... aws_secret_access_key="...", + ... ) as store: + ... await store.put(key="config", value={"setting": "value"}) + + For local testing with LocalStack: + + >>> async with S3Store( + ... bucket_name="test-bucket", + ... endpoint_url="http://localhost:4566", + ... ) as store: + ... await store.put(key="test", value={"data": "test"}) + """ + + _bucket_name: str + _endpoint_url: str | None + _raw_client: Any # S3 client from aioboto3 + _client: S3Client | None + + @overload + def __init__( + self, + *, + client: S3Client, + bucket_name: str, + default_collection: str | None = None, + collection_sanitization_strategy: SanitizationStrategy | None = None, + key_sanitization_strategy: SanitizationStrategy | None = None, + ) -> None: + """Initialize the S3 store with a pre-configured client. + + Args: + client: The S3 client to use. You must have entered the context manager before passing this in. + bucket_name: The name of the S3 bucket to use. + default_collection: The default collection to use if no collection is provided. + collection_sanitization_strategy: Strategy for sanitizing collection names. Defaults to None (no sanitization). + key_sanitization_strategy: Strategy for sanitizing keys. Defaults to None (no sanitization). + """ + + @overload + def __init__( + self, + *, + bucket_name: str, + region_name: str | None = None, + endpoint_url: str | None = None, + aws_access_key_id: str | None = None, + aws_secret_access_key: str | None = None, + aws_session_token: str | None = None, + default_collection: str | None = None, + collection_sanitization_strategy: SanitizationStrategy | None = None, + key_sanitization_strategy: SanitizationStrategy | None = None, + ) -> None: + """Initialize the S3 store with AWS credentials. + + Args: + bucket_name: The name of the S3 bucket to use. + region_name: AWS region name. Defaults to None (uses AWS default). + endpoint_url: Custom endpoint URL (useful for LocalStack/MinIO). Defaults to None. + aws_access_key_id: AWS access key ID. Defaults to None (uses AWS default credentials). + aws_secret_access_key: AWS secret access key. Defaults to None (uses AWS default credentials). + aws_session_token: AWS session token. Defaults to None (uses AWS default credentials). + default_collection: The default collection to use if no collection is provided. + collection_sanitization_strategy: Strategy for sanitizing collection names. Defaults to None (no sanitization). + key_sanitization_strategy: Strategy for sanitizing keys. Defaults to None (no sanitization). + """ + + def __init__( + self, + *, + client: S3Client | None = None, + bucket_name: str, + region_name: str | None = None, + endpoint_url: str | None = None, + aws_access_key_id: str | None = None, + aws_secret_access_key: str | None = None, + aws_session_token: str | None = None, + default_collection: str | None = None, + collection_sanitization_strategy: SanitizationStrategy | None = None, + key_sanitization_strategy: SanitizationStrategy | None = None, + ) -> None: + """Initialize the S3 store. + + Args: + client: The S3 client to use. Defaults to None (creates a new client). + bucket_name: The name of the S3 bucket to use. 
+            region_name: AWS region name. Defaults to None (uses AWS default).
+            endpoint_url: Custom endpoint URL (useful for LocalStack/MinIO). Defaults to None.
+            aws_access_key_id: AWS access key ID. Defaults to None (uses AWS default credentials).
+            aws_secret_access_key: AWS secret access key. Defaults to None (uses AWS default credentials).
+            aws_session_token: AWS session token. Defaults to None (uses AWS default credentials).
+            default_collection: The default collection to use if no collection is provided.
+            collection_sanitization_strategy: Strategy for sanitizing collection names. Defaults to None (no sanitization).
+            key_sanitization_strategy: Strategy for sanitizing keys. Defaults to None (no sanitization).
+        """
+        self._bucket_name = bucket_name
+        self._endpoint_url = endpoint_url
+
+        if client:
+            self._client = client
+            self._raw_client = None
+        else:
+            session: Session = aioboto3.Session(
+                region_name=region_name,
+                aws_access_key_id=aws_access_key_id,
+                aws_secret_access_key=aws_secret_access_key,
+                aws_session_token=aws_session_token,
+            )
+
+            self._raw_client = session.client(service_name="s3", endpoint_url=endpoint_url)  # pyright: ignore[reportUnknownMemberType]
+            self._client = None
+
+        super().__init__(
+            default_collection=default_collection,
+            collection_sanitization_strategy=collection_sanitization_strategy,
+            key_sanitization_strategy=key_sanitization_strategy,
+        )
+
+    async def _connect(self) -> None:
+        if self._client is None and self._raw_client:
+            self._client = await self._raw_client.__aenter__()
+
+    async def _disconnect(self) -> None:
+        if self._client is not None:
+            await self._client.__aexit__(None, None, None)
+            self._client = None
+
+    @override
+    async def __aenter__(self) -> Self:
+        await self._connect()
+        await super().__aenter__()
+        return self
+
+    @override
+    async def __aexit__(
+        self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None
+    ) -> None:
+        await super().__aexit__(exc_type, exc_value, traceback)
+        await self._disconnect()
+
+    @property
+    def _connected_client(self) -> S3Client:
+        """Get the connected S3 client.
+
+        Raises:
+            ValueError: If the client is not connected.
+
+        Returns:
+            The connected S3 client.
+        """
+        if not self._client:
+            msg = "Client not connected"
+            raise ValueError(msg)
+        return self._client
+
+    @override
+    async def _setup(self) -> None:
+        """Set up the S3 client and ensure the bucket exists.
+
+        This method checks for the bucket with a HeadBucket call and creates the
+        bucket if it doesn't already exist.
+        """
+        await self._connect()
+
+        from botocore.exceptions import ClientError
+
+        try:
+            # Check if bucket exists
+            await self._connected_client.head_bucket(Bucket=self._bucket_name)  # pyright: ignore[reportUnknownMemberType]
+        except ClientError as e:
+            # Only proceed with bucket creation if it's a 404/NoSuchBucket error
+            error_code = e.response.get("Error", {}).get("Code", "")  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
+            http_status = e.response.get("ResponseMetadata", {}).get("HTTPStatusCode", 0)  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
+
+            if error_code in ("404", "NoSuchBucket") or http_status == HTTP_NOT_FOUND:
+                # Bucket doesn't exist, create it
+                import contextlib
+
+                with contextlib.suppress(self._connected_client.exceptions.BucketAlreadyOwnedByYou):  # pyright: ignore[reportUnknownMemberType]
+                    # Build create_bucket parameters
+                    create_params: dict[str, Any] = {"Bucket": self._bucket_name}
+
+                    # Get region from client metadata
+                    region_name = getattr(self._connected_client.meta, "region_name", None)  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
+
+                    # For regions other than us-east-1, we need to specify LocationConstraint
+                    # Skip this for custom endpoints (LocalStack, MinIO) which may not support it
+                    if region_name and region_name != "us-east-1" and not self._endpoint_url:
+                        create_params["CreateBucketConfiguration"] = {"LocationConstraint": region_name}
+
+                    await self._connected_client.create_bucket(**create_params)  # pyright: ignore[reportUnknownMemberType]
+            else:
+                # Re-raise authentication, permission, or other errors
+                raise
+
+    def _get_s3_key(self, *, collection: str, key: str) -> str:
+        """Generate the S3 object key for a given collection and key.
+
+        The collection and key are sanitized using the configured sanitization strategies
+        before being combined into the S3 object key format: {collection}/{key}.
+
+        Args:
+            collection: The collection name.
+            key: The key within the collection.
+
+        Returns:
+            The S3 object key in format: {collection}/{key}
+        """
+        # Use the sanitization strategies from BaseStore
+        sanitized_collection, sanitized_key = self._sanitize_collection_and_key(collection=collection, key=key)
+        return f"{sanitized_collection}/{sanitized_key}"
+
+    @override
+    async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None:
+        """Retrieve a managed entry from S3.
+
+        This method fetches the object from S3, deserializes the JSON body to a ManagedEntry,
+        and checks for client-side TTL expiration. If the entry has expired, None is returned;
+        the expired object is left in place for separate cleanup (for example, an S3
+        lifecycle policy).
+
+        Args:
+            key: The key to retrieve.
+            collection: The collection to retrieve from.
+
+        Returns:
+            The ManagedEntry if found and not expired, otherwise None.
+ """ + s3_key = self._get_s3_key(collection=collection, key=key) + + try: + response = await self._connected_client.get_object( # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + Bucket=self._bucket_name, + Key=s3_key, + ) + + # Read the object body and ensure the streaming body is closed + async with response["Body"] as stream: # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + body_bytes = await stream.read() # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + json_value = body_bytes.decode("utf-8") # pyright: ignore[reportUnknownMemberType] + + # Deserialize to ManagedEntry + managed_entry = self._serialization_adapter.load_json(json_str=json_value) + + # Check for client-side expiration + if managed_entry.is_expired: + # Entry expired, return None without deleting + return None + return managed_entry # noqa: TRY300 + + except self._connected_client.exceptions.NoSuchKey: # pyright: ignore[reportUnknownMemberType] + # Object doesn't exist + return None + + @override + async def _put_managed_entry( + self, + *, + key: str, + collection: str, + managed_entry: ManagedEntry, + ) -> None: + """Store a managed entry in S3. + + This method serializes the ManagedEntry to JSON and stores it as an S3 object. + TTL information is stored in the object metadata for potential use by S3 lifecycle + policies (though lifecycle policies don't support atomic TTL+retrieval, so client-side + checking is still required). + + Args: + key: The key to store. + collection: The collection to store in. + managed_entry: The ManagedEntry to store. + """ + s3_key = self._get_s3_key(collection=collection, key=key) + json_value = self._serialization_adapter.dump_json(entry=managed_entry) + + # Prepare metadata + metadata: dict[str, str] = {} + if managed_entry.expires_at: + metadata["expires-at"] = managed_entry.expires_at.isoformat() + if managed_entry.created_at: + metadata["created-at"] = managed_entry.created_at.isoformat() + + await self._connected_client.put_object( # pyright: ignore[reportUnknownMemberType] + Bucket=self._bucket_name, + Key=s3_key, + Body=json_value.encode("utf-8"), + ContentType="application/json", + Metadata=metadata, + ) + + @override + async def _delete_managed_entry(self, *, key: str, collection: str) -> bool: + """Delete a managed entry from S3. + + Args: + key: The key to delete. + collection: The collection to delete from. + + Returns: + True if an object was deleted, False if the object didn't exist. + """ + s3_key = self._get_s3_key(collection=collection, key=key) + + from botocore.exceptions import ClientError + + try: + # Check if object exists before deletion + await self._connected_client.head_object( # pyright: ignore[reportUnknownMemberType] + Bucket=self._bucket_name, + Key=s3_key, + ) + + except ClientError as e: + # Check if it's a 404/not found error + error_code = e.response.get("Error", {}).get("Code", "") # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + http_status = e.response.get("ResponseMetadata", {}).get("HTTPStatusCode", 0) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + + if error_code in ("404", "NoSuchKey") or http_status == HTTP_NOT_FOUND: + # Object doesn't exist + return False + # Re-raise other errors (auth, network, etc.) 
+            raise
+        else:
+            # Object exists, delete it
+            await self._connected_client.delete_object(  # pyright: ignore[reportUnknownMemberType]
+                Bucket=self._bucket_name,
+                Key=s3_key,
+            )
+            return True
+
+    @override
+    async def _close(self) -> None:
+        """Close the S3 client."""
+        await self._disconnect()
diff --git a/key-value/key-value-aio/tests/stores/base.py b/key-value/key-value-aio/tests/stores/base.py
index 43177ee2..c2e57acf 100644
--- a/key-value/key-value-aio/tests/stores/base.py
+++ b/key-value/key-value-aio/tests/stores/base.py
@@ -30,7 +30,7 @@ async def eventually_consistent(self) -> None:  # noqa: B027
     @abstractmethod
     async def store(self) -> BaseStore | AsyncGenerator[BaseStore, None]: ...
 
-    @pytest.mark.timeout(60)
+    @pytest.mark.timeout(90)
     async def test_store(self, store: BaseStore):
         """Tests that the store is a valid AsyncKeyValueProtocol."""
         assert isinstance(store, AsyncKeyValueProtocol) is True
diff --git a/key-value/key-value-aio/tests/stores/dynamodb/test_dynamodb.py b/key-value/key-value-aio/tests/stores/dynamodb/test_dynamodb.py
index 235b3572..f2a748d4 100644
--- a/key-value/key-value-aio/tests/stores/dynamodb/test_dynamodb.py
+++ b/key-value/key-value-aio/tests/stores/dynamodb/test_dynamodb.py
@@ -40,7 +40,7 @@ async def ping_dynamodb() -> bool:
 
         session = aioboto3.Session(
             aws_access_key_id="test",
-            aws_secret_access_key="test",  # noqa: S106
+            aws_secret_access_key="test",
             region_name="us-east-1",
         )
         async with session.client(service_name="dynamodb", endpoint_url=DYNAMODB_ENDPOINT) as client:  # type: ignore
@@ -89,7 +89,7 @@ async def store(self, setup_dynamodb: None) -> DynamoDBStore:
             table_name=DYNAMODB_TEST_TABLE,
             endpoint_url=DYNAMODB_ENDPOINT,
             aws_access_key_id="test",
-            aws_secret_access_key="test",  # noqa: S106
+            aws_secret_access_key="test",
             region_name="us-east-1",
         )
 
@@ -98,7 +98,7 @@ async def store(self, setup_dynamodb: None) -> DynamoDBStore:
 
         session = aioboto3.Session(
             aws_access_key_id="test",
-            aws_secret_access_key="test",  # noqa: S106
+            aws_secret_access_key="test",
             region_name="us-east-1",
         )
         async with session.client(service_name="dynamodb", endpoint_url=DYNAMODB_ENDPOINT) as client:  # type: ignore
diff --git a/key-value/key-value-aio/tests/stores/elasticsearch/test_elasticsearch.py b/key-value/key-value-aio/tests/stores/elasticsearch/test_elasticsearch.py
index d47d6893..02048f91 100644
--- a/key-value/key-value-aio/tests/stores/elasticsearch/test_elasticsearch.py
+++ b/key-value/key-value-aio/tests/stores/elasticsearch/test_elasticsearch.py
@@ -1,3 +1,4 @@
+import logging
 from collections.abc import AsyncGenerator
 from datetime import datetime, timedelta, timezone
 
@@ -32,6 +33,8 @@
     "9.2.0",  # Released Oct 2025
 ]
 
+logger = logging.getLogger(__name__)
+
 
 def get_elasticsearch_client() -> AsyncElasticsearch:
     return AsyncElasticsearch(hosts=[ES_URL])
@@ -41,7 +44,12 @@ async def ping_elasticsearch() -> bool:
     es_client: AsyncElasticsearch = get_elasticsearch_client()
 
     async with es_client:
-        return await es_client.ping()
+        if await es_client.ping():
+            logger.info("Elasticsearch pinged, wait for yellow status")
+            await es_client.cluster.health(wait_for_status="yellow", timeout="10s")
+            logger.info("Elasticsearch is ready")
+            return True
+        return False
 
 
 async def cleanup_elasticsearch_indices(elasticsearch_client: AsyncElasticsearch):
diff --git a/key-value/key-value-aio/tests/stores/s3/__init__.py b/key-value/key-value-aio/tests/stores/s3/__init__.py
new file mode 100644
index 00000000..d1936388
--- /dev/null
+++ b/key-value/key-value-aio/tests/stores/s3/__init__.py
@@ 
-0,0 +1 @@ +"""Tests for S3Store.""" diff --git a/key-value/key-value-aio/tests/stores/s3/test_s3.py b/key-value/key-value-aio/tests/stores/s3/test_s3.py new file mode 100644 index 00000000..23035e75 --- /dev/null +++ b/key-value/key-value-aio/tests/stores/s3/test_s3.py @@ -0,0 +1,109 @@ +import contextlib +from collections.abc import AsyncGenerator + +import pytest +from key_value.shared.stores.wait import async_wait_for_true +from typing_extensions import override + +from key_value.aio.stores.base import BaseStore +from key_value.aio.stores.s3 import S3Store +from tests.conftest import docker_container, should_skip_docker_tests +from tests.stores.base import BaseStoreTests, ContextManagerStoreTestMixin + +# S3 test configuration (using LocalStack) +S3_HOST = "localhost" +S3_HOST_PORT = 4566 +S3_ENDPOINT = f"http://{S3_HOST}:{S3_HOST_PORT}" +S3_TEST_BUCKET = "kv-store-test" + +WAIT_FOR_S3_TIMEOUT = 30 + +# LocalStack versions to test +LOCALSTACK_VERSIONS_TO_TEST = [ + "4.0.3", # Latest stable version +] + +LOCALSTACK_CONTAINER_PORT = 4566 + + +async def ping_s3() -> bool: + """Check if LocalStack S3 is running.""" + try: + import aioboto3 + + session = aioboto3.Session( + aws_access_key_id="test", + aws_secret_access_key="test", + region_name="us-east-1", + ) + async with session.client(service_name="s3", endpoint_url=S3_ENDPOINT) as client: # type: ignore + await client.list_buckets() # type: ignore + except Exception: + return False + else: + return True + + +class S3FailedToStartError(Exception): + pass + + +@pytest.mark.skipif(should_skip_docker_tests(), reason="Docker is not available") +class TestS3Store(ContextManagerStoreTestMixin, BaseStoreTests): + @pytest.fixture(scope="session", params=LOCALSTACK_VERSIONS_TO_TEST) + async def setup_s3(self, request: pytest.FixtureRequest) -> AsyncGenerator[None, None]: + version = request.param + + # LocalStack container for S3 + with docker_container( + f"s3-test-{version}", + f"localstack/localstack:{version}", + {str(LOCALSTACK_CONTAINER_PORT): S3_HOST_PORT}, + environment={"SERVICES": "s3"}, + ): + if not await async_wait_for_true(bool_fn=ping_s3, tries=WAIT_FOR_S3_TIMEOUT, wait_time=2): + msg = f"LocalStack S3 {version} failed to start" + raise S3FailedToStartError(msg) + + yield + + @override + @pytest.fixture + async def store(self, setup_s3: None) -> S3Store: + from key_value.aio.stores.s3 import S3CollectionSanitizationStrategy, S3KeySanitizationStrategy + + store = S3Store( + bucket_name=S3_TEST_BUCKET, + endpoint_url=S3_ENDPOINT, + aws_access_key_id="test", + aws_secret_access_key="test", + region_name="us-east-1", + # Use sanitization strategies for tests to handle long collection/key names + collection_sanitization_strategy=S3CollectionSanitizationStrategy(), + key_sanitization_strategy=S3KeySanitizationStrategy(), + ) + + # Clean up test bucket if it exists + import aioboto3 + + session = aioboto3.Session( + aws_access_key_id="test", + aws_secret_access_key="test", + region_name="us-east-1", + ) + async with session.client(service_name="s3", endpoint_url=S3_ENDPOINT) as client: # type: ignore + with contextlib.suppress(Exception): + # Delete all objects in the bucket + response = await client.list_objects_v2(Bucket=S3_TEST_BUCKET) # type: ignore + if "Contents" in response: + for obj in response["Contents"]: # type: ignore + await client.delete_object(Bucket=S3_TEST_BUCKET, Key=obj["Key"]) # type: ignore + + # Delete the bucket + await client.delete_bucket(Bucket=S3_TEST_BUCKET) # type: ignore + + return store + + 
@pytest.mark.skip(reason="Distributed Caches are unbounded") + @override + async def test_not_unbounded(self, store: BaseStore): ... diff --git a/key-value/key-value-aio/tests/stores/vault/test_vault.py b/key-value/key-value-aio/tests/stores/vault/test_vault.py index e9704641..b7e25ec0 100644 --- a/key-value/key-value-aio/tests/stores/vault/test_vault.py +++ b/key-value/key-value-aio/tests/stores/vault/test_vault.py @@ -13,7 +13,7 @@ # Vault test configuration VAULT_HOST = "localhost" VAULT_PORT = 8200 -VAULT_TOKEN = "dev-root-token" # noqa: S105 +VAULT_TOKEN = "dev-root-token" VAULT_MOUNT_POINT = "secret" VAULT_CONTAINER_PORT = 8200 diff --git a/key-value/key-value-shared/src/key_value/shared/utils/sanitize.py b/key-value/key-value-shared/src/key_value/shared/utils/sanitize.py index ce1e5df3..91a684a4 100644 --- a/key-value/key-value-shared/src/key_value/shared/utils/sanitize.py +++ b/key-value/key-value-shared/src/key_value/shared/utils/sanitize.py @@ -61,6 +61,37 @@ def sanitize_characters_in_string(value: str, allowed_characters: str, replace_w return new_value +def _truncate_to_bytes(value: str, max_bytes: int, encoding: str = "utf-8") -> str: + """Truncate a string to fit within max_bytes when encoded, without splitting multi-byte characters. + + Args: + value: The string to truncate. + max_bytes: The maximum number of bytes. + encoding: The encoding to use (default: utf-8). + + Returns: + The truncated string that fits within max_bytes. + """ + encoded = value.encode(encoding) + if len(encoded) <= max_bytes: + return value + + # Binary search to find the longest substring that fits + left, right = 0, len(value) + result = "" + + while left <= right: + mid = (left + right) // 2 + candidate = value[:mid] + if len(candidate.encode(encoding)) <= max_bytes: + result = candidate + left = mid + 1 + else: + right = mid - 1 + + return result + + @bear_enforce def sanitize_string( value: str, @@ -70,6 +101,7 @@ def sanitize_string( hash_fragment_separator: str = DEFAULT_HASH_FRAGMENT_SEPARATOR, hash_fragment_mode: HashFragmentMode = HashFragmentMode.ONLY_IF_CHANGED, hash_fragment_length: int = DEFAULT_HASH_FRAGMENT_SIZE, + length_is_bytes: bool = False, ) -> str: """Sanitize the value, replacing characters and optionally adding a fragment a hash of the value if requested. @@ -81,9 +113,10 @@ def sanitize_string( Args: value: The value to sanitize. allowed_characters: The allowed characters in the value. - max_length: The maximum length of the value (with the hash fragment added). + max_length: The maximum length of the value (with hash fragment). Interpreted as bytes if length_is_bytes is True. hash_fragment_separator: The separator to add between the value and the hash fragment. hash_fragment_mode: The mode to add the hash fragment. + length_is_bytes: If True, max_length is interpreted as bytes instead of characters. 
""" if max_length < MINIMUM_MAX_LENGTH: msg = f"max_length must be greater than or equal to {MINIMUM_MAX_LENGTH}" @@ -106,8 +139,7 @@ def sanitize_string( if hash_fragment_mode == HashFragmentMode.ALWAYS: actual_max_length = max_length - hash_fragment_size_required - - sanitized_value = sanitized_value[:actual_max_length] + sanitized_value = _truncate_to_bytes(sanitized_value, actual_max_length) if length_is_bytes else sanitized_value[:actual_max_length] if not sanitized_value: return hash_fragment @@ -115,14 +147,13 @@ def sanitize_string( return sanitized_value + hash_fragment_separator + hash_fragment if hash_fragment_mode == HashFragmentMode.ONLY_IF_CHANGED: - sanitized_value = sanitized_value[:max_length] + sanitized_value = _truncate_to_bytes(sanitized_value, max_length) if length_is_bytes else sanitized_value[:max_length] if value == sanitized_value: return value actual_max_length = max_length - hash_fragment_size_required - - sanitized_value = sanitized_value[:actual_max_length] + sanitized_value = _truncate_to_bytes(sanitized_value, actual_max_length) if length_is_bytes else sanitized_value[:actual_max_length] if not sanitized_value: return hash_fragment @@ -133,18 +164,19 @@ def sanitize_string( msg = "Entire value was sanitized and hash_fragment_mode is HashFragmentMode.NEVER" raise ValueError(msg) - return sanitized_value + return _truncate_to_bytes(sanitized_value, max_length) if length_is_bytes else sanitized_value @bear_enforce -def hash_excess_length(value: str, max_length: int) -> str: +def hash_excess_length(value: str, max_length: int, length_is_bytes: bool = False) -> str: """Hash part of the value if it exceeds the maximum length. This operation will truncate the value to the maximum length minus 8 characters and will swap the last 8 characters with the first 8 characters of the generated hash. Args: value: The value to hash. - max_length: The maximum length of the value. Must be greater than 32. + max_length: The maximum length of the value. Must be greater than 16. If length_is_bytes is True, this is interpreted as bytes. + length_is_bytes: If True, max_length is interpreted as bytes instead of characters. Returns: The hashed value if the value exceeds the maximum length, otherwise the original value. @@ -153,10 +185,13 @@ def hash_excess_length(value: str, max_length: int) -> str: msg = f"max_length must be greater than {MINIMUM_MAX_LENGTH}" raise ValueError(msg) - if len(value) <= max_length: + # Check if truncation is needed + current_length = len(value.encode("utf-8")) if length_is_bytes else len(value) + if current_length <= max_length: return value - truncated_value = value[: max_length - 8] + # Truncate to max_length - 8 to make room for hash + truncated_value = _truncate_to_bytes(value, max_length - 8) if length_is_bytes else value[: max_length - 8] hash_of_value = hashlib.sha256(value.encode()).hexdigest() first_eight_of_hash = hash_of_value[:8] diff --git a/key-value/key-value-sync/tests/code_gen/stores/base.py b/key-value/key-value-sync/tests/code_gen/stores/base.py index b78e279d..9c6303b1 100644 --- a/key-value/key-value-sync/tests/code_gen/stores/base.py +++ b/key-value/key-value-sync/tests/code_gen/stores/base.py @@ -27,7 +27,7 @@ def eventually_consistent(self) -> None: # noqa: B027 @abstractmethod def store(self) -> BaseStore | Generator[BaseStore, None, None]: ... 
-    @pytest.mark.timeout(60)
+    @pytest.mark.timeout(90)
     def test_store(self, store: BaseStore):
         """Tests that the store is a valid KeyValueProtocol."""
         assert isinstance(store, KeyValueProtocol) is True
diff --git a/key-value/key-value-sync/tests/code_gen/stores/elasticsearch/test_elasticsearch.py b/key-value/key-value-sync/tests/code_gen/stores/elasticsearch/test_elasticsearch.py
index 2bdf1274..b634263f 100644
--- a/key-value/key-value-sync/tests/code_gen/stores/elasticsearch/test_elasticsearch.py
+++ b/key-value/key-value-sync/tests/code_gen/stores/elasticsearch/test_elasticsearch.py
@@ -1,6 +1,7 @@
 # WARNING: this file is auto-generated by 'build_sync_library.py'
 # from the original file 'test_elasticsearch.py'
 # DO NOT CHANGE! Change the original file instead.
+import logging
 from collections.abc import Generator
 from datetime import datetime, timedelta, timezone
 
@@ -33,6 +34,8 @@
 # Released Oct 2025
 ELASTICSEARCH_VERSIONS_TO_TEST = ["9.0.0", "9.2.0"]
 
+logger = logging.getLogger(__name__)
+
 
 def get_elasticsearch_client() -> Elasticsearch:
     return Elasticsearch(hosts=[ES_URL])
@@ -42,7 +45,12 @@ def ping_elasticsearch() -> bool:
     es_client: Elasticsearch = get_elasticsearch_client()
 
     with es_client:
-        return es_client.ping()
+        if es_client.ping():
+            logger.info("Elasticsearch pinged, wait for yellow status")
+            es_client.cluster.health(wait_for_status="yellow", timeout="10s")
+            logger.info("Elasticsearch is ready")
+            return True
+        return False
 
 
 def cleanup_elasticsearch_indices(elasticsearch_client: Elasticsearch):
diff --git a/key-value/key-value-sync/tests/code_gen/stores/vault/test_vault.py b/key-value/key-value-sync/tests/code_gen/stores/vault/test_vault.py
index 6eddf1f1..d0cbc6d3 100644
--- a/key-value/key-value-sync/tests/code_gen/stores/vault/test_vault.py
+++ b/key-value/key-value-sync/tests/code_gen/stores/vault/test_vault.py
@@ -14,7 +14,7 @@
 # Vault test configuration
 VAULT_HOST = "localhost"
 VAULT_PORT = 8200
-VAULT_TOKEN = "dev-root-token"  # noqa: S105
+VAULT_TOKEN = "dev-root-token"
 VAULT_MOUNT_POINT = "secret"
 
 VAULT_CONTAINER_PORT = 8200
diff --git a/pyproject.toml b/pyproject.toml
index ed23a84a..a947d4e3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -64,6 +64,8 @@ line-length = 140
 [tool.ruff.lint.extend-per-file-ignores]
 "**/tests/*.py" = [
     "S101", # Ignore asserts
+    "S105", # Ignore hardcoded password string (test credentials)
+    "S106", # Ignore hardcoded password function argument (test credentials)
     "DTZ005", # Ignore datetime.UTC
     "PLR2004", # Ignore magic values
     "E501", # Ignore line length
diff --git a/scripts/build_sync_library.py b/scripts/build_sync_library.py
index c79a870b..1263fdc5 100644
--- a/scripts/build_sync_library.py
+++ b/scripts/build_sync_library.py
@@ -56,6 +56,8 @@
     "key-value/key-value-aio/tests/stores/dynamodb",
     "key-value/key-value-aio/src/key_value/aio/stores/memcached",
     "key-value/key-value-aio/tests/stores/memcached",
+    "key-value/key-value-aio/src/key_value/aio/stores/s3",
+    "key-value/key-value-aio/tests/stores/s3",
     "key-value/key-value-aio/src/key_value/aio/wrappers/timeout",
     "key-value/key-value-aio/tests/wrappers/timeout",
 ]
diff --git a/uv.lock b/uv.lock
index bea6629b..e1355868 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1743,6 +1743,10 @@ redis = [
 rocksdb = [
     { name = "rocksdict" },
 ]
+s3 = [
+    { name = "aioboto3" },
+    { name = "types-aiobotocore-s3" },
+]
 valkey = [
     { name = "valkey-glide" },
 ]
@@ -1757,13 +1761,14 @@ wrappers-encryption = [
 
 [package.dev-dependencies]
 dev = [
     { name = "py-key-value", extra = ["dev"] },
-    { name = "py-key-value-aio", 
extra = ["disk", "dynamodb", "elasticsearch", "keyring", "memcached", "memory", "mongodb", "pydantic", "redis", "rocksdb", "vault", "wrappers-encryption"] }, + { name = "py-key-value-aio", extra = ["disk", "dynamodb", "elasticsearch", "keyring", "memcached", "memory", "mongodb", "pydantic", "redis", "rocksdb", "s3", "vault", "wrappers-encryption"] }, { name = "py-key-value-aio", extra = ["valkey"], marker = "sys_platform != 'win32'" }, ] [package.metadata] requires-dist = [ { name = "aioboto3", marker = "extra == 'dynamodb'", specifier = ">=13.3.0" }, + { name = "aioboto3", marker = "extra == 's3'", specifier = ">=13.3.0" }, { name = "aiohttp", marker = "extra == 'elasticsearch'", specifier = ">=3.12" }, { name = "aiomcache", marker = "extra == 'memcached'", specifier = ">=0.8.0" }, { name = "beartype", specifier = ">=0.20.0" }, @@ -1783,16 +1788,17 @@ requires-dist = [ { name = "rocksdict", marker = "python_full_version >= '3.12' and extra == 'rocksdb'", specifier = ">=0.3.24" }, { name = "rocksdict", marker = "python_full_version < '3.12' and extra == 'rocksdb'", specifier = ">=0.3.2" }, { name = "types-aiobotocore-dynamodb", marker = "extra == 'dynamodb'", specifier = ">=2.16.0" }, + { name = "types-aiobotocore-s3", marker = "extra == 's3'", specifier = ">=2.16.0" }, { name = "types-hvac", marker = "extra == 'vault'", specifier = ">=2.3.0" }, { name = "valkey-glide", marker = "extra == 'valkey'", specifier = ">=2.1.0" }, ] -provides-extras = ["memory", "disk", "redis", "mongodb", "valkey", "vault", "memcached", "elasticsearch", "dynamodb", "keyring", "keyring-linux", "pydantic", "rocksdb", "wrappers-encryption"] +provides-extras = ["memory", "disk", "redis", "mongodb", "valkey", "vault", "memcached", "elasticsearch", "dynamodb", "s3", "keyring", "keyring-linux", "pydantic", "rocksdb", "wrappers-encryption"] [package.metadata.requires-dev] dev = [ { name = "py-key-value", extras = ["dev"], editable = "." 
}, { name = "py-key-value-aio", extras = ["keyring"] }, - { name = "py-key-value-aio", extras = ["memory", "disk", "redis", "elasticsearch", "memcached", "mongodb", "vault", "dynamodb", "rocksdb"] }, + { name = "py-key-value-aio", extras = ["memory", "disk", "redis", "elasticsearch", "memcached", "mongodb", "vault", "dynamodb", "s3", "rocksdb"] }, { name = "py-key-value-aio", extras = ["pydantic"] }, { name = "py-key-value-aio", extras = ["valkey"], marker = "sys_platform != 'win32'" }, { name = "py-key-value-aio", extras = ["wrappers-encryption"] }, @@ -2617,6 +2623,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ca/4f/05d80aa8b5a95b82ddb89547c3a037b9460702286c66ca6e0fbb8fa2ce86/types_aiobotocore_dynamodb-2.25.0-py3-none-any.whl", hash = "sha256:de791dfcef90eb3431c09b63419301f9ff824d82970623e149a427d5fd325430", size = 57971, upload-time = "2025-10-11T01:27:39.639Z" }, ] +[[package]] +name = "types-aiobotocore-s3" +version = "2.25.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions", marker = "python_full_version < '3.12'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/17/c7/d69fd0aa19b61ae67357b4d4aa546be7a6e0b38cde5d0c0704208e00791a/types_aiobotocore_s3-2.25.0.tar.gz", hash = "sha256:3885d2d975b02490c4a59ab0d9c607dec2b7d650189f19a2b6a5e0f81a07e56d", size = 76334, upload-time = "2025-10-11T01:32:43.509Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/43/42/0606f39a94b828f53db6741a5803626766379fc653c43fa98d1a3da28fb8/types_aiobotocore_s3-2.25.0-py3-none-any.whl", hash = "sha256:637b8b3eabd71a8083e835adb696e08b8bab75fb2e3f1ef3020334659abdf698", size = 83849, upload-time = "2025-10-11T01:32:42.437Z" }, +] + [[package]] name = "types-hvac" version = "2.3.0.20250914"
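Below is a minimal end-to-end usage sketch for the S3Store introduced in this diff. It is grounded in the class docstring examples above and assumes `pip install py-key-value-aio[s3]` plus a reachable S3 endpoint; the LocalStack URL, bucket name, and `test` credentials are illustrative, and the `delete` call assumes the store exposes the usual async key-value protocol alongside `put`/`get`.

```python
# Usage sketch for the new S3Store (illustrative endpoint and credentials).
import asyncio

from key_value.aio.stores.s3 import (
    S3CollectionSanitizationStrategy,
    S3KeySanitizationStrategy,
    S3Store,
)


async def main() -> None:
    async with S3Store(
        bucket_name="kv-store-demo",
        endpoint_url="http://localhost:4566",  # LocalStack; omit to use real AWS
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
        # Hash over-long names so the {collection}/{key} object path
        # stays within S3's 1024-byte key limit
        collection_sanitization_strategy=S3CollectionSanitizationStrategy(),
        key_sanitization_strategy=S3KeySanitizationStrategy(),
    ) as store:
        # TTL is stored in object metadata and enforced client-side on get()
        await store.put(key="user:123", value={"name": "Alice"}, ttl=3600)
        print(await store.get(key="user:123"))  # {'name': 'Alice'} until expiry
        await store.delete(key="user:123")  # assumed protocol method


asyncio.run(main())
```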
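The sanitize.py changes make length limits byte-aware so multi-byte UTF-8 characters are never split. The following self-contained sketch mirrors how `_truncate_to_bytes` and `hash_excess_length(..., length_is_bytes=True)` appear to combine per the diff and its docstrings (truncate to `max_length - 8` bytes, then append the first 8 hex characters of the SHA-256 of the full value); `truncate_to_bytes` and `hash_excess_bytes` here are illustrative stand-ins, not the library functions.

```python
# Sketch of the byte-aware truncate-and-hash scheme; mirrors the diff, not library code.
import hashlib


def truncate_to_bytes(value: str, max_bytes: int) -> str:
    """Longest prefix whose UTF-8 encoding fits in max_bytes, via binary search."""
    if len(value.encode("utf-8")) <= max_bytes:
        return value
    lo, hi, result = 0, len(value), ""
    while lo <= hi:
        mid = (lo + hi) // 2
        if len(value[:mid].encode("utf-8")) <= max_bytes:
            result, lo = value[:mid], mid + 1  # prefix fits; try a longer one
        else:
            hi = mid - 1
    return result


def hash_excess_bytes(value: str, max_bytes: int) -> str:
    """Return value unchanged if it fits; otherwise truncate to max_bytes - 8 bytes
    and append the first 8 hex chars of the SHA-256 of the full original value."""
    if len(value.encode("utf-8")) <= max_bytes:
        return value
    prefix = truncate_to_bytes(value, max_bytes - 8)
    return prefix + hashlib.sha256(value.encode()).hexdigest()[:8]


# With the 500-byte default used by S3KeySanitizationStrategy, a 600-byte key
# collapses to a 492-byte prefix plus an 8-character hash fragment.
sanitized = hash_excess_bytes("k" * 600, 500)
assert len(sanitized.encode("utf-8")) == 500
```

Distinct over-long keys remain distinct with high probability because the hash fragment is derived from the full original value, while keys that already fit pass through untouched.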