Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions .github/workflows/momento-local-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
name: Momento Local tests

on:
pull_request:
branches: [main]

jobs:
local-tests:
strategy:
matrix:
os: [ubuntu-24.04]
python-version: ["3.13"]
runs-on: ${{ matrix.os }}

env:
TEST_API_KEY: ${{ secrets.ALPHA_TEST_AUTH_TOKEN }}
TEST_CACHE_NAME: python-integration-test-${{ matrix.python-version }}-${{ matrix.new-python-protobuf }}-${{ github.sha }}

steps:
- uses: actions/checkout@v4

- name: Setup Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Install and configure Poetry
uses: snok/install-poetry@v1
with:
version: 1.3.1
virtualenvs-in-project: true

- name: Install dependencies
run: poetry install

- name: Start Momento Local
run: |
docker run --cap-add=NET_ADMIN --rm -d -p 8080:8080 -p 9090:9090 gomomento/momento-local --enable-test-admin

- name: Run tests
run: poetry run pytest -p no:sugar -q -m local
2 changes: 1 addition & 1 deletion .github/workflows/on-pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
run: poetry run ruff format --check --diff src tests

- name: Run tests
run: poetry run pytest -p no:sugar -q
run: poetry run pytest -p no:sugar -q -m "not local"

test-examples:
runs-on: ubuntu-24.04
Expand Down
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ gen-sync: do-gen-sync format lint
.PHONY: test
## Run unit and integration tests with pytest
test:
@poetry run pytest
@poetry run pytest -m "not local"

.PHONY: test-local
## Run the integration tests that require Momento Local
test-local:
@poetry run pytest -m local
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎗️ this only runs the momento-local specific tests. In the future we could run the entire test suite against Momento local.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Part of my dilemma when choosing a name for this was that we'll have tests like these that always run against momento local, and the standard integration tests that could run with it. I wasn't sure whether we should separate those. We can solve that problem in the future, I suppose.


.PHONY: precommit
## Run format, lint, and test as a step before committing.
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ log_level = "ERROR"
log_cli = true
log_cli_format = "%(asctime)s [%(levelname)s] %(message)s"
log_cli_date_format = "%Y-%m-%d %H:%M:%S.%f"
markers = [
"local: tests that require Momento Local",
]
Comment on lines +62 to +64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!


[tool.mypy]
python_version = "3.7"
Expand Down
2 changes: 1 addition & 1 deletion src/momento/retry/fixed_timeout_retry_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def determine_when_to_retry(self, props: RetryableProps) -> Optional[float]:
# If a retry attempt's timeout has passed but the client's overall timeout has not yet passed,
# we should reset the deadline and retry.
if (
props.attempt_number > 0
props.attempt_number > 0 # type: ignore[misc]
and props.grpc_status == grpc.StatusCode.DEADLINE_EXCEEDED # type: ignore[misc]
and props.overall_deadline > datetime.now()
):
Expand Down
53 changes: 51 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import asyncio
import os
import random
from contextlib import asynccontextmanager, contextmanager
from datetime import timedelta
from typing import AsyncIterator, Callable, Iterator, List, Optional, Union, cast
from typing import AsyncGenerator, AsyncIterator, Callable, Iterator, List, Optional, Union, cast

import pytest
import pytest_asyncio
Expand Down Expand Up @@ -41,6 +42,8 @@
TTopicName,
)

from tests.momento.local.momento_local_async_middleware import MomentoLocalAsyncMiddleware, MomentoLocalMiddlewareArgs
from tests.momento.local.momento_local_middleware import MomentoLocalMiddleware
from tests.utils import (
unique_test_cache_name,
uuid_bytes,
Expand All @@ -51,13 +54,17 @@
# Integration test data
#######################

TEST_CONFIGURATION = Configurations.Laptop.latest()
TEST_CONFIGURATION: Configuration = Configurations.Laptop.latest()
TEST_TOPIC_CONFIGURATION = TopicConfigurations.Default.latest().with_client_timeout(timedelta(seconds=10))
TEST_AUTH_CONFIGURATION = AuthConfigurations.Laptop.latest()


TEST_AUTH_PROVIDER = CredentialProvider.from_environment_variable("TEST_API_KEY")

MOMENTO_LOCAL_HOSTNAME = os.environ.get("MOMENTO_HOSTNAME", "127.0.0.1")
MOMENTO_LOCAL_PORT = int(os.environ.get("MOMENTO_PORT", "8080"))
TEST_LOCAL_AUTH_PROVIDER = CredentialProvider.for_momento_local(MOMENTO_LOCAL_HOSTNAME, MOMENTO_LOCAL_PORT)


TEST_CACHE_NAME: Optional[str] = os.getenv("TEST_CACHE_NAME")
if not TEST_CACHE_NAME:
Expand Down Expand Up @@ -354,6 +361,48 @@ async def auth_client_async() -> AsyncIterator[AuthClientAsync]:
yield _auth_client


@asynccontextmanager
async def client_async_local(
cache_name: str,
middleware_args: Optional[MomentoLocalMiddlewareArgs] = None,
config_fn: Optional[Callable[[Configuration], Configuration]] = None,
) -> AsyncGenerator[CacheClientAsync, None]:
config = TEST_CONFIGURATION

if config_fn:
config = config_fn(config)

if middleware_args:
config = config.add_middleware(MomentoLocalAsyncMiddleware(middleware_args))

client = await CacheClientAsync.create(config, TEST_LOCAL_AUTH_PROVIDER, DEFAULT_TTL_SECONDS)

await client.create_cache(cache_name)

yield client


@contextmanager
def client_local(
cache_name: str,
middleware_args: Optional[MomentoLocalMiddlewareArgs] = None,
config_fn: Optional[Callable[[Configuration], Configuration]] = None,
) -> Iterator[CacheClient]:
config = TEST_CONFIGURATION

if config_fn:
config = config_fn(config)

if middleware_args:
config = config.add_middleware(MomentoLocalMiddleware(middleware_args))

client = CacheClient.create(config, TEST_LOCAL_AUTH_PROVIDER, DEFAULT_TTL_SECONDS)

client.create_cache(cache_name)

yield client


TUniqueCacheName = Callable[[CacheClient], str]


Expand Down
Empty file added tests/momento/local/__init__.py
Empty file.
20 changes: 20 additions & 0 deletions tests/momento/local/momento_error_code_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from momento.errors import MomentoErrorCode

MOMENTO_ERROR_CODE_TO_METADATA = {
MomentoErrorCode.INVALID_ARGUMENT_ERROR: "invalid-argument",
MomentoErrorCode.UNKNOWN_SERVICE_ERROR: "unknown",
MomentoErrorCode.ALREADY_EXISTS_ERROR: "already-exists",
MomentoErrorCode.NOT_FOUND_ERROR: "not-found",
MomentoErrorCode.INTERNAL_SERVER_ERROR: "internal",
MomentoErrorCode.PERMISSION_ERROR: "permission-denied",
MomentoErrorCode.AUTHENTICATION_ERROR: "unauthenticated",
MomentoErrorCode.CANCELLED_ERROR: "cancelled",
MomentoErrorCode.LIMIT_EXCEEDED_ERROR: "resource-exhausted",
MomentoErrorCode.BAD_REQUEST_ERROR: "invalid-argument",
MomentoErrorCode.TIMEOUT_ERROR: "deadline-exceeded",
MomentoErrorCode.SERVER_UNAVAILABLE: "unavailable",
MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED: "resource-exhausted",
MomentoErrorCode.FAILED_PRECONDITION_ERROR: "failed-precondition",
MomentoErrorCode.UNKNOWN_ERROR: "unknown",
MomentoErrorCode.CONNECTION_ERROR: "unavailable",
}
110 changes: 110 additions & 0 deletions tests/momento/local/momento_local_async_middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import asyncio
from typing import List

from grpc.aio import Metadata
from momento import logs
from momento.config.middleware import MiddlewareMessage, MiddlewareRequestHandlerContext, MiddlewareStatus
from momento.config.middleware.aio import Middleware, MiddlewareMetadata, MiddlewareRequestHandler

from tests.momento.local.momento_error_code_metadata import MOMENTO_ERROR_CODE_TO_METADATA
from tests.momento.local.momento_local_middleware_args import MomentoLocalMiddlewareArgs
from tests.momento.local.momento_rpc_method import MomentoRpcMethod


class MomentoLocalAsyncMiddlewareRequestHandler(MiddlewareRequestHandler):
def __init__(self, args: MomentoLocalMiddlewareArgs):
self._args = args
self._cache_name = None
self._logger = logs.logger

async def on_request_metadata(self, metadata: MiddlewareMetadata) -> MiddlewareMetadata:
grpc_metadata = metadata.grpc_metadata

if grpc_metadata is not None:
self._set_grpc_metadata(grpc_metadata, "request-id", self._args.request_id)

if self._args.return_error is not None:
error = MOMENTO_ERROR_CODE_TO_METADATA[self._args.return_error]
if error is not None:
self._set_grpc_metadata(grpc_metadata, "return-error", error)

if self._args.error_rpc_list is not None:
rpcs = self._concatenate_rpcs(self._args.error_rpc_list)
self._set_grpc_metadata(grpc_metadata, "error-rpcs", rpcs)

if self._args.delay_rpc_list is not None:
rpcs = self._concatenate_rpcs(self._args.delay_rpc_list)
self._set_grpc_metadata(grpc_metadata, "delay-rpcs", rpcs)

if self._args.error_count is not None:
self._set_grpc_metadata(grpc_metadata, "error-count", str(self._args.error_count))

if self._args.delay_millis is not None:
self._set_grpc_metadata(grpc_metadata, "delay-ms", str(self._args.delay_millis))

if self._args.delay_count is not None:
self._set_grpc_metadata(grpc_metadata, "delay-count", str(self._args.delay_count))

if self._args.stream_error_rpc_list is not None:
rpcs = self._concatenate_rpcs(self._args.stream_error_rpc_list)
self._set_grpc_metadata(grpc_metadata, "stream-error-rpcs", rpcs)

if self._args.stream_error is not None:
error = MOMENTO_ERROR_CODE_TO_METADATA[self._args.stream_error]
if error is not None:
self._set_grpc_metadata(grpc_metadata, "stream-error", error)

if self._args.stream_error_message_limit is not None:
limit_str = str(self._args.stream_error_message_limit)
self._set_grpc_metadata(grpc_metadata, "stream-error-message-limit", limit_str)

cache_name = grpc_metadata.get("cache")
if cache_name is not None:
self._cache_name = cache_name
else:
self._logger.debug("No cache name found in metadata.")

return metadata

async def on_request_body(self, request: MiddlewareMessage) -> MiddlewareMessage:
request_type = request.constructor_name

if self._cache_name is not None:
if self._args.test_metrics_collector is not None: # type: ignore[unreachable]
rpc_method = MomentoRpcMethod.from_request_name(request_type)
if rpc_method:
self._args.test_metrics_collector.add_timestamp(
self._cache_name,
rpc_method,
int(asyncio.get_event_loop().time() * 1000), # Current time in milliseconds
)
else:
self._logger.debug("No cache name available. Timestamp will not be collected.")

return request

async def on_response_metadata(self, metadata: MiddlewareMetadata) -> MiddlewareMetadata:
return metadata

async def on_response_body(self, response: MiddlewareMessage) -> MiddlewareMessage:
return response

async def on_response_status(self, status: MiddlewareStatus) -> MiddlewareStatus:
return status

@staticmethod
def _set_grpc_metadata(metadata: Metadata, key: str, value: str) -> None:
if value is not None:
metadata[key] = value

@staticmethod
def _concatenate_rpcs(rpcs: List[MomentoRpcMethod]) -> str:
return " ".join(rpc.metadata for rpc in rpcs)


class MomentoLocalAsyncMiddleware(Middleware):
def __init__(self, args: MomentoLocalMiddlewareArgs):
self._args = args

async def on_new_request(self, context: MiddlewareRequestHandlerContext) -> MiddlewareRequestHandler:
return MomentoLocalAsyncMiddlewareRequestHandler(self._args)
59 changes: 59 additions & 0 deletions tests/momento/local/momento_local_metrics_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from collections import defaultdict
from typing import Dict, List

from tests.momento.local.momento_rpc_method import MomentoRpcMethod


class MomentoLocalMetricsCollector:
def __init__(self) -> None:
# Data structure to store timestamps: cacheName -> requestName -> [timestamps]
self.data: Dict[str, Dict[MomentoRpcMethod, List[int]]] = defaultdict(lambda: defaultdict(list))

def add_timestamp(self, cache_name: str, request_name: MomentoRpcMethod, timestamp: int) -> None:
"""Add a timestamp for a specific request and cache.

Args:
cache_name: The name of the cache
request_name: The name of the request (using MomentoRpcMethod enum)
timestamp: The timestamp to record in seconds since epoch
"""
self.data[cache_name][request_name].append(timestamp)

def get_total_retry_count(self, cache_name: str, request_name: MomentoRpcMethod) -> int:
"""Calculate the total retry count for a specific cache and request.

Args:
cache_name: The name of the cache
request_name: The name of the request (using MomentoRpcMethod enum)

Returns:
The total number of retries
"""
timestamps = self.data.get(cache_name, {}).get(request_name, [])
# Number of retries is one less than the number of timestamps
return max(0, len(timestamps) - 1)

def get_average_time_between_retries(self, cache_name: str, request_name: MomentoRpcMethod) -> float:
"""Calculate the average time between retries for a specific cache and request.

Args:
cache_name: The name of the cache
request_name: The name of the request (using MomentoRpcMethod enum)

Returns:
The average time in seconds, or 0.0 if there are no retries
"""
timestamps = self.data.get(cache_name, {}).get(request_name, [])
if len(timestamps) < 2:
return 0.0 # No retries occurred

total_interval = sum(timestamps[i] - timestamps[i - 1] for i in range(1, len(timestamps)))
return total_interval / (len(timestamps) - 1)

def get_all_metrics(self) -> Dict[str, Dict[MomentoRpcMethod, List[int]]]:
"""Retrieve all collected metrics for debugging or analysis.

Returns:
The complete data structure with all recorded metrics
"""
return self.data
Loading
Loading