From 2dba11975bc2ebcfb900380b1b4a10330bebe150 Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Thu, 10 Apr 2025 16:36:07 -0700 Subject: [PATCH 1/6] chore: adding fixed timeout retry strategy --- .../internal/aio/_retry_interceptor.py | 26 +++++- .../retry/fixed_timeout_retry_strategy.py | 92 +++++++++++++++++++ src/momento/retry/retry_strategy.py | 13 +++ src/momento/retry/retryable_props.py | 2 + tests/momento/temp/__init__.py | 0 tests/momento/temp/test_get.py | 90 ++++++++++++++++++ 6 files changed, 220 insertions(+), 3 deletions(-) create mode 100644 src/momento/retry/fixed_timeout_retry_strategy.py create mode 100644 tests/momento/temp/__init__.py create mode 100644 tests/momento/temp/test_get.py diff --git a/src/momento/internal/aio/_retry_interceptor.py b/src/momento/internal/aio/_retry_interceptor.py index 8daca89e..2c12e43a 100644 --- a/src/momento/internal/aio/_retry_interceptor.py +++ b/src/momento/internal/aio/_retry_interceptor.py @@ -2,8 +2,10 @@ import asyncio import logging +from datetime import date, datetime, timedelta +import time from typing import Callable - +from pprint import pprint import grpc from momento.retry import RetryableProps, RetryStrategy @@ -22,8 +24,9 @@ class RetryInterceptor(grpc.aio.UnaryUnaryClientInterceptor): - def __init__(self, retry_strategy: RetryStrategy): + def __init__(self, retry_strategy: RetryStrategy, client_timeout: timedelta): self._retry_strategy = retry_strategy + self._client_timeout = client_timeout async def intercept_unary_unary( self, @@ -35,7 +38,22 @@ async def intercept_unary_unary( request: grpc.aio._typing.RequestType, ) -> grpc.aio._call.UnaryUnaryCall | grpc.aio._typing.ResponseType: attempt_number = 1 + overall_deadline = datetime.now() + self._client_timeout + while True: + if attempt_number > 1: + retry_deadline = self._retry_strategy.calculate_retry_deadline( + overall_deadline + ) + if retry_deadline is not None: + client_call_details = grpc.aio._interceptor.ClientCallDetails( + client_call_details.method, + retry_deadline, + client_call_details.metadata, + client_call_details.credentials, + client_call_details.wait_for_ready + ) + call = await continuation(client_call_details, request) response_code = await call.code() @@ -43,7 +61,9 @@ async def intercept_unary_unary( return call retryTime = self._retry_strategy.determine_when_to_retry( - RetryableProps(response_code, client_call_details.method.decode("utf-8"), attempt_number) + RetryableProps( + response_code, client_call_details.method.decode("utf-8"), attempt_number, overall_deadline + ) ) if retryTime is None: diff --git a/src/momento/retry/fixed_timeout_retry_strategy.py b/src/momento/retry/fixed_timeout_retry_strategy.py new file mode 100644 index 00000000..3dedf3cb --- /dev/null +++ b/src/momento/retry/fixed_timeout_retry_strategy.py @@ -0,0 +1,92 @@ +from datetime import datetime, timedelta +import logging +import random +from typing import Optional + +import grpc + +from .default_eligibility_strategy import DefaultEligibilityStrategy +from .eligibility_strategy import EligibilityStrategy +from .retry_strategy import RetryStrategy +from .retryable_props import RetryableProps + +logger = logging.getLogger("fixed-timeout-retry-strategy") + +class FixedTimeoutRetryStrategy(RetryStrategy): + def __init__( + self, *, retry_timeout_millis: int, retry_delay_interval_millis: int, + eligibility_strategy: DefaultEligibilityStrategy = DefaultEligibilityStrategy() + ): + self._eligibility_strategy: EligibilityStrategy = eligibility_strategy + self._retry_timeout_millis: int = retry_timeout_millis + self._retry_delay_interval_millis: int = retry_delay_interval_millis + + def determine_when_to_retry(self, props: RetryableProps) -> Optional[float]: + """Determines whether a grpc call can be retried and how long to wait before that retry. + + Args: + props (RetryableProps): Information about the grpc call, its last invocation, and how many times the call + has been made. + + :Returns + The time in seconds before the next retry should occur or None if no retry should be attempted. + """ + logger.debug( + "Determining whether request is eligible for retry; status code: %s, request type: %s, attemptNumber: %d", + props.grpc_status, # type: ignore[misc] + props.grpc_method, + props.attempt_number, + ) + + # If a retry attempt's timeout has passed but the client's overall timeout has not yet passed, + # we should reset the deadline and retry. + if ( + props.attempt_number > 0 and + props.grpc_status == grpc.StatusCode.DEADLINE_EXCEEDED and + props.overall_deadline > datetime.now() + ): + return self.get_jitter_in_millis(props) + + if self._eligibility_strategy.is_eligible_for_retry(props) is False: + logger.debug( + "Request path: %s; retryable status code: %s. Request is not retryable.", + props.grpc_method, + props.grpc_status, # type: ignore[misc] + ) + return None + + return self.get_jitter_in_millis(props) + + def get_jitter_in_millis(self, props: RetryableProps) -> float: + timeout_with_jitter = self.add_jitter( + self._retry_delay_interval_millis + ) + logger.debug("Determined request is retryable; retrying after %d ms: [method: %s, status: %s, attempt: %d]", + timeout_with_jitter, + props.grpc_method, + props.grpc_status, + props.attempt_number + ) + return timeout_with_jitter / 1000.0 + + def add_jitter(self, base_delay: int) -> int: + return int((0.2*random.random() + 0.9) * float(base_delay)) + + def calculate_retry_deadline(self, overall_deadline: datetime) -> Optional[float]: + """Calculates the deadline for a retry attempt using the retry timeout, but clips it to the overall + deadline if the overall deadline is sooner. + + Args: + overall_deadline (datetime): The overall deadline for the operation. + + Returns: + float: The calculated retry deadline. + """ + logger.debug( + f'Calculating retry deadline:\nnow: {datetime.now()}\noverall deadline: {overall_deadline}\n' + + f'retry timeout millis: {self._retry_timeout_millis}' + ) + if datetime.now() + timedelta(milliseconds=self._retry_timeout_millis) > overall_deadline: + return (overall_deadline - datetime.now()).total_seconds() * 1000 + return self._retry_timeout_millis + \ No newline at end of file diff --git a/src/momento/retry/retry_strategy.py b/src/momento/retry/retry_strategy.py index 26b51c94..f5745ebc 100644 --- a/src/momento/retry/retry_strategy.py +++ b/src/momento/retry/retry_strategy.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from datetime import datetime from typing import Optional from .retryable_props import RetryableProps @@ -8,3 +9,15 @@ class RetryStrategy(ABC): @abstractmethod def determine_when_to_retry(self, props: RetryableProps) -> Optional[float]: pass + + def calculate_retry_deadline(overall_deadline: datetime) -> Optional[float]: + """Calculates the deadline for a retry attempt using the retry timeout, but clips it to the overall + deadline if the overall deadline is sooner. + + Args: + overall_deadline (float): The overall deadline for the operation. + + Returns: + float: The calculated retry deadline. + """ + return None diff --git a/src/momento/retry/retryable_props.py b/src/momento/retry/retryable_props.py index 3832f193..f9c54f19 100644 --- a/src/momento/retry/retryable_props.py +++ b/src/momento/retry/retryable_props.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from datetime import datetime import grpc @@ -9,3 +10,4 @@ class RetryableProps: grpc_status: grpc.StatusCode grpc_method: str attempt_number: int + overall_deadline: datetime = None diff --git a/tests/momento/temp/__init__.py b/tests/momento/temp/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/momento/temp/test_get.py b/tests/momento/temp/test_get.py new file mode 100644 index 00000000..722ef054 --- /dev/null +++ b/tests/momento/temp/test_get.py @@ -0,0 +1,90 @@ + +from datetime import timedelta +from pprint import pprint +from momento.auth.credential_provider import CredentialProvider +from momento.cache_client_async import CacheClientAsync +from momento.config.configurations import Configurations +from momento.errors.error_details import MomentoErrorCode +from momento.responses.control.cache.list import ListCaches +from momento.responses.data.scalar.get import CacheGet +from momento.responses.data.scalar.set import CacheSet +from momento.retry.fixed_timeout_retry_strategy import FixedTimeoutRetryStrategy +from momento.retry.retry_strategy import RetryStrategy + + +async def create_cache(client: CacheClientAsync) -> None: + print("Creating cache:") + create_cache_response = await client.create_cache("cache") + match create_cache_response: + case CacheSet.Success() as success: + print(f"Cache created successfully: {success.cache_name!r}") + case CacheSet.Error() as error: + print(f"Error creating cache: {error.message}") + case _: + print("Unreachable") + print("") + +async def list_caches(client) -> None: + print("Listing caches:") + list_caches_response = await client.list_caches() + match list_caches_response: + case ListCaches.Success() as success: + for cache_info in success.caches: + print(f"- {cache_info.name!r}") + case ListCaches.Error() as error: + print(f"Error listing caches: {error.message}") + case _: + print("Unreachable") + print("") + +async def set_key(client: CacheClientAsync) -> None: + cache_name = "cache" + set_resp = await client.set(cache_name, "key", "value") + pprint(set_resp) + if isinstance(set_resp, CacheSet.Error): + pprint(set_resp) + raise set_resp.inner_exception.with_traceback(None) + elif isinstance(set_resp, CacheSet.Success): + print("=======SUCCESS=======") + pprint(set_resp) + +def describe_get() -> None: + async def it_should_return_error() -> None: + credential_provider = CredentialProvider.from_environment_variable("MOMENTO_API_KEY") + ttl = timedelta(seconds=600) + config = { + 'configuration': Configurations.Laptop.v1().with_retry_strategy( + FixedTimeoutRetryStrategy(retry_timeout_millis=1000, retry_delay_interval_millis=100) + ).with_client_timeout(timedelta(seconds=3)), + 'credential_provider': credential_provider, + 'default_ttl': ttl, + } + client = await CacheClientAsync.create(**config) + + # These are here mostly to make sure control plane calls are working + # and that the cache is created with the expected key set. + + await create_cache(client) + await list_caches(client) + await set_key(client) + + ################ + # To test the retry strategy, use a cache name for a cache that does not exist + # and uncomment the NOT_FOUIND status code in the default_eligibility_strategy.py file. + ################ + + cache_name = "cachexyz" + get_resp = await client.get(cache_name, "key") + pprint(get_resp) + if isinstance(get_resp, CacheGet.Error): + pprint(get_resp) + assert get_resp.error_code == MomentoErrorCode.NOT_FOUND_ERROR + elif isinstance(get_resp, CacheGet.Miss): + print("=======MISS=======") + pprint(get_resp) + assert True + elif isinstance(get_resp, CacheGet.Hit): + print("=======HIT=======") + pprint(get_resp.value_string) + assert get_resp.value_string == "value" + assert False From c8d8732d2745e3158d5c2740c851d410ee705f1f Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Fri, 11 Apr 2025 16:36:32 -0700 Subject: [PATCH 2/6] chore: grab client timeout out of client call details --- src/momento/internal/aio/_retry_interceptor.py | 13 +++++++++---- src/momento/retry/retry_strategy.py | 12 ++---------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/momento/internal/aio/_retry_interceptor.py b/src/momento/internal/aio/_retry_interceptor.py index 2c12e43a..53b5627a 100644 --- a/src/momento/internal/aio/_retry_interceptor.py +++ b/src/momento/internal/aio/_retry_interceptor.py @@ -24,9 +24,8 @@ class RetryInterceptor(grpc.aio.UnaryUnaryClientInterceptor): - def __init__(self, retry_strategy: RetryStrategy, client_timeout: timedelta): + def __init__(self, retry_strategy: RetryStrategy): self._retry_strategy = retry_strategy - self._client_timeout = client_timeout async def intercept_unary_unary( self, @@ -38,13 +37,18 @@ async def intercept_unary_unary( request: grpc.aio._typing.RequestType, ) -> grpc.aio._call.UnaryUnaryCall | grpc.aio._typing.ResponseType: attempt_number = 1 - overall_deadline = datetime.now() + self._client_timeout + # the overall deadline is the timeout set on the client call details + overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout) + # variable to capture the penultimate call to a deadline-aware retry strategy, which + # will hold the call object before a terminal DEADLINE_EXCEEDED response is returned + last_call = None while True: if attempt_number > 1: retry_deadline = self._retry_strategy.calculate_retry_deadline( overall_deadline ) + print("===> retry deadline", retry_deadline) if retry_deadline is not None: client_call_details = grpc.aio._interceptor.ClientCallDetails( client_call_details.method, @@ -53,6 +57,7 @@ async def intercept_unary_unary( client_call_details.credentials, client_call_details.wait_for_ready ) + last_call = call call = await continuation(client_call_details, request) response_code = await call.code() @@ -67,7 +72,7 @@ async def intercept_unary_unary( ) if retryTime is None: - return call + return last_call or call attempt_number += 1 await asyncio.sleep(retryTime) diff --git a/src/momento/retry/retry_strategy.py b/src/momento/retry/retry_strategy.py index f5745ebc..ea7ea004 100644 --- a/src/momento/retry/retry_strategy.py +++ b/src/momento/retry/retry_strategy.py @@ -10,14 +10,6 @@ class RetryStrategy(ABC): def determine_when_to_retry(self, props: RetryableProps) -> Optional[float]: pass - def calculate_retry_deadline(overall_deadline: datetime) -> Optional[float]: - """Calculates the deadline for a retry attempt using the retry timeout, but clips it to the overall - deadline if the overall deadline is sooner. - - Args: - overall_deadline (float): The overall deadline for the operation. - - Returns: - float: The calculated retry deadline. - """ + # Currently used only by the FixedTimeoutRetryStrategy + def calculate_retry_deadline(self, overall_deadline: datetime) -> Optional[float]: return None From 4f606e2e2afe1190ccee874928adf8768eae42a1 Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Fri, 11 Apr 2025 16:44:55 -0700 Subject: [PATCH 3/6] chore: update synch retry interceptor --- .../internal/aio/_retry_interceptor.py | 6 +- .../synchronous/_retry_interceptor.py | 27 +++++- .../retry/fixed_timeout_retry_strategy.py | 3 +- tests/momento/temp/__init__.py | 0 tests/momento/temp/test_get.py | 90 ------------------- 5 files changed, 28 insertions(+), 98 deletions(-) delete mode 100644 tests/momento/temp/__init__.py delete mode 100644 tests/momento/temp/test_get.py diff --git a/src/momento/internal/aio/_retry_interceptor.py b/src/momento/internal/aio/_retry_interceptor.py index 53b5627a..b016e656 100644 --- a/src/momento/internal/aio/_retry_interceptor.py +++ b/src/momento/internal/aio/_retry_interceptor.py @@ -5,7 +5,6 @@ from datetime import date, datetime, timedelta import time from typing import Callable -from pprint import pprint import grpc from momento.retry import RetryableProps, RetryStrategy @@ -37,8 +36,8 @@ async def intercept_unary_unary( request: grpc.aio._typing.RequestType, ) -> grpc.aio._call.UnaryUnaryCall | grpc.aio._typing.ResponseType: attempt_number = 1 - # the overall deadline is the timeout set on the client call details - overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout) + # the overall deadline is calculated from the timeout set on the client call details + overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout or 0.0) # variable to capture the penultimate call to a deadline-aware retry strategy, which # will hold the call object before a terminal DEADLINE_EXCEEDED response is returned last_call = None @@ -48,7 +47,6 @@ async def intercept_unary_unary( retry_deadline = self._retry_strategy.calculate_retry_deadline( overall_deadline ) - print("===> retry deadline", retry_deadline) if retry_deadline is not None: client_call_details = grpc.aio._interceptor.ClientCallDetails( client_call_details.method, diff --git a/src/momento/internal/synchronous/_retry_interceptor.py b/src/momento/internal/synchronous/_retry_interceptor.py index 7f58c866..05a1750c 100644 --- a/src/momento/internal/synchronous/_retry_interceptor.py +++ b/src/momento/internal/synchronous/_retry_interceptor.py @@ -1,5 +1,6 @@ from __future__ import annotations +from datetime import datetime, timedelta import logging import time from typing import Callable, TypeVar @@ -36,7 +37,27 @@ def intercept_unary_unary( request: RequestType, ) -> InterceptorCall | ResponseType: attempt_number = 1 + # the overall deadline is calculated from the timeout set on the client call details + overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout or 0.0) + # variable to capture the penultimate call to a deadline-aware retry strategy, which + # will hold the call object before a terminal DEADLINE_EXCEEDED response is returned + last_call = None + while True: + if attempt_number > 1: + retry_deadline = self._retry_strategy.calculate_retry_deadline( + overall_deadline + ) + if retry_deadline is not None: + client_call_details = grpc.aio._interceptor.ClientCallDetails( + client_call_details.method, + retry_deadline, + client_call_details.metadata, + client_call_details.credentials, + client_call_details.wait_for_ready + ) + last_call = call + call = continuation(client_call_details, request) response_code = call.code() # type: ignore[attr-defined] # noqa: F401 @@ -44,11 +65,13 @@ def intercept_unary_unary( return call retryTime = self._retry_strategy.determine_when_to_retry( - RetryableProps(response_code, client_call_details.method, attempt_number) + RetryableProps( + response_code, client_call_details.method.decode("utf-8"), attempt_number, overall_deadline + ) ) if retryTime is None: - return call + return last_call or call attempt_number += 1 time.sleep(retryTime) diff --git a/src/momento/retry/fixed_timeout_retry_strategy.py b/src/momento/retry/fixed_timeout_retry_strategy.py index 3dedf3cb..fe5b586b 100644 --- a/src/momento/retry/fixed_timeout_retry_strategy.py +++ b/src/momento/retry/fixed_timeout_retry_strategy.py @@ -39,7 +39,7 @@ def determine_when_to_retry(self, props: RetryableProps) -> Optional[float]: ) # If a retry attempt's timeout has passed but the client's overall timeout has not yet passed, - # we should reset the deadline and retry. + # we should reset the deadline and retry. if ( props.attempt_number > 0 and props.grpc_status == grpc.StatusCode.DEADLINE_EXCEEDED and @@ -89,4 +89,3 @@ def calculate_retry_deadline(self, overall_deadline: datetime) -> Optional[float if datetime.now() + timedelta(milliseconds=self._retry_timeout_millis) > overall_deadline: return (overall_deadline - datetime.now()).total_seconds() * 1000 return self._retry_timeout_millis - \ No newline at end of file diff --git a/tests/momento/temp/__init__.py b/tests/momento/temp/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/momento/temp/test_get.py b/tests/momento/temp/test_get.py deleted file mode 100644 index 722ef054..00000000 --- a/tests/momento/temp/test_get.py +++ /dev/null @@ -1,90 +0,0 @@ - -from datetime import timedelta -from pprint import pprint -from momento.auth.credential_provider import CredentialProvider -from momento.cache_client_async import CacheClientAsync -from momento.config.configurations import Configurations -from momento.errors.error_details import MomentoErrorCode -from momento.responses.control.cache.list import ListCaches -from momento.responses.data.scalar.get import CacheGet -from momento.responses.data.scalar.set import CacheSet -from momento.retry.fixed_timeout_retry_strategy import FixedTimeoutRetryStrategy -from momento.retry.retry_strategy import RetryStrategy - - -async def create_cache(client: CacheClientAsync) -> None: - print("Creating cache:") - create_cache_response = await client.create_cache("cache") - match create_cache_response: - case CacheSet.Success() as success: - print(f"Cache created successfully: {success.cache_name!r}") - case CacheSet.Error() as error: - print(f"Error creating cache: {error.message}") - case _: - print("Unreachable") - print("") - -async def list_caches(client) -> None: - print("Listing caches:") - list_caches_response = await client.list_caches() - match list_caches_response: - case ListCaches.Success() as success: - for cache_info in success.caches: - print(f"- {cache_info.name!r}") - case ListCaches.Error() as error: - print(f"Error listing caches: {error.message}") - case _: - print("Unreachable") - print("") - -async def set_key(client: CacheClientAsync) -> None: - cache_name = "cache" - set_resp = await client.set(cache_name, "key", "value") - pprint(set_resp) - if isinstance(set_resp, CacheSet.Error): - pprint(set_resp) - raise set_resp.inner_exception.with_traceback(None) - elif isinstance(set_resp, CacheSet.Success): - print("=======SUCCESS=======") - pprint(set_resp) - -def describe_get() -> None: - async def it_should_return_error() -> None: - credential_provider = CredentialProvider.from_environment_variable("MOMENTO_API_KEY") - ttl = timedelta(seconds=600) - config = { - 'configuration': Configurations.Laptop.v1().with_retry_strategy( - FixedTimeoutRetryStrategy(retry_timeout_millis=1000, retry_delay_interval_millis=100) - ).with_client_timeout(timedelta(seconds=3)), - 'credential_provider': credential_provider, - 'default_ttl': ttl, - } - client = await CacheClientAsync.create(**config) - - # These are here mostly to make sure control plane calls are working - # and that the cache is created with the expected key set. - - await create_cache(client) - await list_caches(client) - await set_key(client) - - ################ - # To test the retry strategy, use a cache name for a cache that does not exist - # and uncomment the NOT_FOUIND status code in the default_eligibility_strategy.py file. - ################ - - cache_name = "cachexyz" - get_resp = await client.get(cache_name, "key") - pprint(get_resp) - if isinstance(get_resp, CacheGet.Error): - pprint(get_resp) - assert get_resp.error_code == MomentoErrorCode.NOT_FOUND_ERROR - elif isinstance(get_resp, CacheGet.Miss): - print("=======MISS=======") - pprint(get_resp) - assert True - elif isinstance(get_resp, CacheGet.Hit): - print("=======HIT=======") - pprint(get_resp.value_string) - assert get_resp.value_string == "value" - assert False From dd2e23b1d366208bc5ded9c69f5cd9e88aa82a01 Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Fri, 11 Apr 2025 17:05:23 -0700 Subject: [PATCH 4/6] chore: linting --- .../internal/aio/_retry_interceptor.py | 11 +++-- .../synchronous/_retry_interceptor.py | 11 +++-- .../retry/fixed_timeout_retry_strategy.py | 42 +++++++++++-------- src/momento/retry/retryable_props.py | 3 +- 4 files changed, 36 insertions(+), 31 deletions(-) diff --git a/src/momento/internal/aio/_retry_interceptor.py b/src/momento/internal/aio/_retry_interceptor.py index b016e656..f4eb08d6 100644 --- a/src/momento/internal/aio/_retry_interceptor.py +++ b/src/momento/internal/aio/_retry_interceptor.py @@ -2,9 +2,9 @@ import asyncio import logging -from datetime import date, datetime, timedelta -import time +from datetime import datetime, timedelta from typing import Callable + import grpc from momento.retry import RetryableProps, RetryStrategy @@ -35,6 +35,7 @@ async def intercept_unary_unary( client_call_details: grpc.aio._interceptor.ClientCallDetails, request: grpc.aio._typing.RequestType, ) -> grpc.aio._call.UnaryUnaryCall | grpc.aio._typing.ResponseType: + call = None attempt_number = 1 # the overall deadline is calculated from the timeout set on the client call details overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout or 0.0) @@ -44,16 +45,14 @@ async def intercept_unary_unary( while True: if attempt_number > 1: - retry_deadline = self._retry_strategy.calculate_retry_deadline( - overall_deadline - ) + retry_deadline = self._retry_strategy.calculate_retry_deadline(overall_deadline) if retry_deadline is not None: client_call_details = grpc.aio._interceptor.ClientCallDetails( client_call_details.method, retry_deadline, client_call_details.metadata, client_call_details.credentials, - client_call_details.wait_for_ready + client_call_details.wait_for_ready, ) last_call = call diff --git a/src/momento/internal/synchronous/_retry_interceptor.py b/src/momento/internal/synchronous/_retry_interceptor.py index 05a1750c..30093bbe 100644 --- a/src/momento/internal/synchronous/_retry_interceptor.py +++ b/src/momento/internal/synchronous/_retry_interceptor.py @@ -1,8 +1,8 @@ from __future__ import annotations -from datetime import datetime, timedelta import logging import time +from datetime import datetime, timedelta from typing import Callable, TypeVar import grpc @@ -36,25 +36,24 @@ def intercept_unary_unary( client_call_details: grpc.ClientCallDetails, request: RequestType, ) -> InterceptorCall | ResponseType: + call = None attempt_number = 1 # the overall deadline is calculated from the timeout set on the client call details overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout or 0.0) # variable to capture the penultimate call to a deadline-aware retry strategy, which # will hold the call object before a terminal DEADLINE_EXCEEDED response is returned last_call = None - + while True: if attempt_number > 1: - retry_deadline = self._retry_strategy.calculate_retry_deadline( - overall_deadline - ) + retry_deadline = self._retry_strategy.calculate_retry_deadline(overall_deadline) if retry_deadline is not None: client_call_details = grpc.aio._interceptor.ClientCallDetails( client_call_details.method, retry_deadline, client_call_details.metadata, client_call_details.credentials, - client_call_details.wait_for_ready + client_call_details.wait_for_ready, ) last_call = call diff --git a/src/momento/retry/fixed_timeout_retry_strategy.py b/src/momento/retry/fixed_timeout_retry_strategy.py index fe5b586b..52b465ca 100644 --- a/src/momento/retry/fixed_timeout_retry_strategy.py +++ b/src/momento/retry/fixed_timeout_retry_strategy.py @@ -1,6 +1,6 @@ -from datetime import datetime, timedelta import logging import random +from datetime import datetime, timedelta from typing import Optional import grpc @@ -12,10 +12,14 @@ logger = logging.getLogger("fixed-timeout-retry-strategy") + class FixedTimeoutRetryStrategy(RetryStrategy): def __init__( - self, *, retry_timeout_millis: int, retry_delay_interval_millis: int, - eligibility_strategy: DefaultEligibilityStrategy = DefaultEligibilityStrategy() + self, + *, + retry_timeout_millis: int, + retry_delay_interval_millis: int, + eligibility_strategy: DefaultEligibilityStrategy = DefaultEligibilityStrategy(), ): self._eligibility_strategy: EligibilityStrategy = eligibility_strategy self._retry_timeout_millis: int = retry_timeout_millis @@ -38,12 +42,16 @@ def determine_when_to_retry(self, props: RetryableProps) -> Optional[float]: props.attempt_number, ) + if props.overall_deadline is None: + logger.debug("Overall deadline is None; not retrying.") + return None + # If a retry attempt's timeout has passed but the client's overall timeout has not yet passed, # we should reset the deadline and retry. if ( - props.attempt_number > 0 and - props.grpc_status == grpc.StatusCode.DEADLINE_EXCEEDED and - props.overall_deadline > datetime.now() + props.attempt_number > 0 + and props.grpc_status == grpc.StatusCode.DEADLINE_EXCEEDED # type: ignore[misc] + and props.overall_deadline > datetime.now() ): return self.get_jitter_in_millis(props) @@ -54,27 +62,25 @@ def determine_when_to_retry(self, props: RetryableProps) -> Optional[float]: props.grpc_status, # type: ignore[misc] ) return None - + return self.get_jitter_in_millis(props) def get_jitter_in_millis(self, props: RetryableProps) -> float: - timeout_with_jitter = self.add_jitter( - self._retry_delay_interval_millis - ) - logger.debug("Determined request is retryable; retrying after %d ms: [method: %s, status: %s, attempt: %d]", + timeout_with_jitter = self.add_jitter(self._retry_delay_interval_millis) + logger.debug( + "Determined request is retryable; retrying after %d ms: [method: %s, status: %s, attempt: %d]", timeout_with_jitter, props.grpc_method, - props.grpc_status, - props.attempt_number + props.grpc_status, # type: ignore[misc] + props.attempt_number, ) return timeout_with_jitter / 1000.0 def add_jitter(self, base_delay: int) -> int: - return int((0.2*random.random() + 0.9) * float(base_delay)) + return int((0.2 * random.random() + 0.9) * float(base_delay)) def calculate_retry_deadline(self, overall_deadline: datetime) -> Optional[float]: - """Calculates the deadline for a retry attempt using the retry timeout, but clips it to the overall - deadline if the overall deadline is sooner. + """Calculates the deadline for a retry attempt using the retry timeout, but clips it to the overall deadline if the overall deadline is sooner. Args: overall_deadline (datetime): The overall deadline for the operation. @@ -83,8 +89,8 @@ def calculate_retry_deadline(self, overall_deadline: datetime) -> Optional[float float: The calculated retry deadline. """ logger.debug( - f'Calculating retry deadline:\nnow: {datetime.now()}\noverall deadline: {overall_deadline}\n' + - f'retry timeout millis: {self._retry_timeout_millis}' + f"Calculating retry deadline:\nnow: {datetime.now()}\noverall deadline: {overall_deadline}\n" + + f"retry timeout millis: {self._retry_timeout_millis}" ) if datetime.now() + timedelta(milliseconds=self._retry_timeout_millis) > overall_deadline: return (overall_deadline - datetime.now()).total_seconds() * 1000 diff --git a/src/momento/retry/retryable_props.py b/src/momento/retry/retryable_props.py index f9c54f19..55c0b208 100644 --- a/src/momento/retry/retryable_props.py +++ b/src/momento/retry/retryable_props.py @@ -1,5 +1,6 @@ from dataclasses import dataclass from datetime import datetime +from typing import Optional import grpc @@ -10,4 +11,4 @@ class RetryableProps: grpc_status: grpc.StatusCode grpc_method: str attempt_number: int - overall_deadline: datetime = None + overall_deadline: Optional[datetime] = None From 948841f785143075c0c287c203bca8dae1b2a29c Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Tue, 15 Apr 2025 11:11:43 -0700 Subject: [PATCH 5/6] chore: remove unnecessary decoding logic and leave a comment --- src/momento/internal/aio/_retry_interceptor.py | 2 ++ src/momento/internal/synchronous/_retry_interceptor.py | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/momento/internal/aio/_retry_interceptor.py b/src/momento/internal/aio/_retry_interceptor.py index f4eb08d6..cd407685 100644 --- a/src/momento/internal/aio/_retry_interceptor.py +++ b/src/momento/internal/aio/_retry_interceptor.py @@ -63,6 +63,8 @@ async def intercept_unary_unary( return call retryTime = self._retry_strategy.determine_when_to_retry( + # Note: the async interceptor gets `client_call_details.method` as a binary string that needs to be decoded + # but the sync interceptor gets it as a string. RetryableProps( response_code, client_call_details.method.decode("utf-8"), attempt_number, overall_deadline ) diff --git a/src/momento/internal/synchronous/_retry_interceptor.py b/src/momento/internal/synchronous/_retry_interceptor.py index 30093bbe..0540f310 100644 --- a/src/momento/internal/synchronous/_retry_interceptor.py +++ b/src/momento/internal/synchronous/_retry_interceptor.py @@ -64,9 +64,9 @@ def intercept_unary_unary( return call retryTime = self._retry_strategy.determine_when_to_retry( - RetryableProps( - response_code, client_call_details.method.decode("utf-8"), attempt_number, overall_deadline - ) + # Note: the async interceptor gets `client_call_details.method` as a binary string that needs to be decoded + # but the sync interceptor gets it as a string. + RetryableProps(response_code, client_call_details.method, attempt_number, overall_deadline) ) if retryTime is None: From 4af3f4dcc0449ca2be807dcf1f981bcd05e67a0e Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Fri, 18 Apr 2025 15:56:07 -0700 Subject: [PATCH 6/6] chore: add comment about client call details value --- src/momento/internal/aio/_retry_interceptor.py | 3 ++- src/momento/internal/synchronous/_retry_interceptor.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/momento/internal/aio/_retry_interceptor.py b/src/momento/internal/aio/_retry_interceptor.py index cd407685..beef0398 100644 --- a/src/momento/internal/aio/_retry_interceptor.py +++ b/src/momento/internal/aio/_retry_interceptor.py @@ -37,7 +37,8 @@ async def intercept_unary_unary( ) -> grpc.aio._call.UnaryUnaryCall | grpc.aio._typing.ResponseType: call = None attempt_number = 1 - # the overall deadline is calculated from the timeout set on the client call details + # The overall deadline is calculated from the timeout set on the client call details. + # That value is set in our gRPC configurations and, while typed as optional, will never be None here. overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout or 0.0) # variable to capture the penultimate call to a deadline-aware retry strategy, which # will hold the call object before a terminal DEADLINE_EXCEEDED response is returned diff --git a/src/momento/internal/synchronous/_retry_interceptor.py b/src/momento/internal/synchronous/_retry_interceptor.py index 0540f310..4b4d5cc8 100644 --- a/src/momento/internal/synchronous/_retry_interceptor.py +++ b/src/momento/internal/synchronous/_retry_interceptor.py @@ -38,7 +38,8 @@ def intercept_unary_unary( ) -> InterceptorCall | ResponseType: call = None attempt_number = 1 - # the overall deadline is calculated from the timeout set on the client call details + # The overall deadline is calculated from the timeout set on the client call details. + # That value is set in our gRPC configurations and, while typed as optional, will never be None here. overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout or 0.0) # variable to capture the penultimate call to a deadline-aware retry strategy, which # will hold the call object before a terminal DEADLINE_EXCEEDED response is returned