diff --git a/src/momento/internal/aio/_retry_interceptor.py b/src/momento/internal/aio/_retry_interceptor.py index 8daca89e..beef0398 100644 --- a/src/momento/internal/aio/_retry_interceptor.py +++ b/src/momento/internal/aio/_retry_interceptor.py @@ -2,6 +2,7 @@ import asyncio import logging +from datetime import datetime, timedelta from typing import Callable import grpc @@ -34,8 +35,28 @@ async def intercept_unary_unary( client_call_details: grpc.aio._interceptor.ClientCallDetails, request: grpc.aio._typing.RequestType, ) -> grpc.aio._call.UnaryUnaryCall | grpc.aio._typing.ResponseType: + call = None attempt_number = 1 + # The overall deadline is calculated from the timeout set on the client call details. + # That value is set in our gRPC configurations and, while typed as optional, will never be None here. + overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout or 0.0) + # variable to capture the penultimate call to a deadline-aware retry strategy, which + # will hold the call object before a terminal DEADLINE_EXCEEDED response is returned + last_call = None + while True: + if attempt_number > 1: + retry_deadline = self._retry_strategy.calculate_retry_deadline(overall_deadline) + if retry_deadline is not None: + client_call_details = grpc.aio._interceptor.ClientCallDetails( + client_call_details.method, + retry_deadline, + client_call_details.metadata, + client_call_details.credentials, + client_call_details.wait_for_ready, + ) + last_call = call + call = await continuation(client_call_details, request) response_code = await call.code() @@ -43,11 +64,15 @@ async def intercept_unary_unary( return call retryTime = self._retry_strategy.determine_when_to_retry( - RetryableProps(response_code, client_call_details.method.decode("utf-8"), attempt_number) + # Note: the async interceptor gets `client_call_details.method` as a binary string that needs to be decoded + # but the sync interceptor gets it as a string. + RetryableProps( + response_code, client_call_details.method.decode("utf-8"), attempt_number, overall_deadline + ) ) if retryTime is None: - return call + return last_call or call attempt_number += 1 await asyncio.sleep(retryTime) diff --git a/src/momento/internal/synchronous/_retry_interceptor.py b/src/momento/internal/synchronous/_retry_interceptor.py index 7f58c866..4b4d5cc8 100644 --- a/src/momento/internal/synchronous/_retry_interceptor.py +++ b/src/momento/internal/synchronous/_retry_interceptor.py @@ -2,6 +2,7 @@ import logging import time +from datetime import datetime, timedelta from typing import Callable, TypeVar import grpc @@ -35,8 +36,28 @@ def intercept_unary_unary( client_call_details: grpc.ClientCallDetails, request: RequestType, ) -> InterceptorCall | ResponseType: + call = None attempt_number = 1 + # The overall deadline is calculated from the timeout set on the client call details. + # That value is set in our gRPC configurations and, while typed as optional, will never be None here. + overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout or 0.0) + # variable to capture the penultimate call to a deadline-aware retry strategy, which + # will hold the call object before a terminal DEADLINE_EXCEEDED response is returned + last_call = None + while True: + if attempt_number > 1: + retry_deadline = self._retry_strategy.calculate_retry_deadline(overall_deadline) + if retry_deadline is not None: + client_call_details = grpc.aio._interceptor.ClientCallDetails( + client_call_details.method, + retry_deadline, + client_call_details.metadata, + client_call_details.credentials, + client_call_details.wait_for_ready, + ) + last_call = call + call = continuation(client_call_details, request) response_code = call.code() # type: ignore[attr-defined] # noqa: F401 @@ -44,11 +65,13 @@ def intercept_unary_unary( return call retryTime = self._retry_strategy.determine_when_to_retry( - RetryableProps(response_code, client_call_details.method, attempt_number) + # Note: the async interceptor gets `client_call_details.method` as a binary string that needs to be decoded + # but the sync interceptor gets it as a string. + RetryableProps(response_code, client_call_details.method, attempt_number, overall_deadline) ) if retryTime is None: - return call + return last_call or call attempt_number += 1 time.sleep(retryTime) diff --git a/src/momento/retry/fixed_timeout_retry_strategy.py b/src/momento/retry/fixed_timeout_retry_strategy.py new file mode 100644 index 00000000..52b465ca --- /dev/null +++ b/src/momento/retry/fixed_timeout_retry_strategy.py @@ -0,0 +1,97 @@ +import logging +import random +from datetime import datetime, timedelta +from typing import Optional + +import grpc + +from .default_eligibility_strategy import DefaultEligibilityStrategy +from .eligibility_strategy import EligibilityStrategy +from .retry_strategy import RetryStrategy +from .retryable_props import RetryableProps + +logger = logging.getLogger("fixed-timeout-retry-strategy") + + +class FixedTimeoutRetryStrategy(RetryStrategy): + def __init__( + self, + *, + retry_timeout_millis: int, + retry_delay_interval_millis: int, + eligibility_strategy: DefaultEligibilityStrategy = DefaultEligibilityStrategy(), + ): + self._eligibility_strategy: EligibilityStrategy = eligibility_strategy + self._retry_timeout_millis: int = retry_timeout_millis + self._retry_delay_interval_millis: int = retry_delay_interval_millis + + def determine_when_to_retry(self, props: RetryableProps) -> Optional[float]: + """Determines whether a grpc call can be retried and how long to wait before that retry. + + Args: + props (RetryableProps): Information about the grpc call, its last invocation, and how many times the call + has been made. + + :Returns + The time in seconds before the next retry should occur or None if no retry should be attempted. + """ + logger.debug( + "Determining whether request is eligible for retry; status code: %s, request type: %s, attemptNumber: %d", + props.grpc_status, # type: ignore[misc] + props.grpc_method, + props.attempt_number, + ) + + if props.overall_deadline is None: + logger.debug("Overall deadline is None; not retrying.") + return None + + # If a retry attempt's timeout has passed but the client's overall timeout has not yet passed, + # we should reset the deadline and retry. + if ( + props.attempt_number > 0 + and props.grpc_status == grpc.StatusCode.DEADLINE_EXCEEDED # type: ignore[misc] + and props.overall_deadline > datetime.now() + ): + return self.get_jitter_in_millis(props) + + if self._eligibility_strategy.is_eligible_for_retry(props) is False: + logger.debug( + "Request path: %s; retryable status code: %s. Request is not retryable.", + props.grpc_method, + props.grpc_status, # type: ignore[misc] + ) + return None + + return self.get_jitter_in_millis(props) + + def get_jitter_in_millis(self, props: RetryableProps) -> float: + timeout_with_jitter = self.add_jitter(self._retry_delay_interval_millis) + logger.debug( + "Determined request is retryable; retrying after %d ms: [method: %s, status: %s, attempt: %d]", + timeout_with_jitter, + props.grpc_method, + props.grpc_status, # type: ignore[misc] + props.attempt_number, + ) + return timeout_with_jitter / 1000.0 + + def add_jitter(self, base_delay: int) -> int: + return int((0.2 * random.random() + 0.9) * float(base_delay)) + + def calculate_retry_deadline(self, overall_deadline: datetime) -> Optional[float]: + """Calculates the deadline for a retry attempt using the retry timeout, but clips it to the overall deadline if the overall deadline is sooner. + + Args: + overall_deadline (datetime): The overall deadline for the operation. + + Returns: + float: The calculated retry deadline. + """ + logger.debug( + f"Calculating retry deadline:\nnow: {datetime.now()}\noverall deadline: {overall_deadline}\n" + + f"retry timeout millis: {self._retry_timeout_millis}" + ) + if datetime.now() + timedelta(milliseconds=self._retry_timeout_millis) > overall_deadline: + return (overall_deadline - datetime.now()).total_seconds() * 1000 + return self._retry_timeout_millis diff --git a/src/momento/retry/retry_strategy.py b/src/momento/retry/retry_strategy.py index 26b51c94..ea7ea004 100644 --- a/src/momento/retry/retry_strategy.py +++ b/src/momento/retry/retry_strategy.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from datetime import datetime from typing import Optional from .retryable_props import RetryableProps @@ -8,3 +9,7 @@ class RetryStrategy(ABC): @abstractmethod def determine_when_to_retry(self, props: RetryableProps) -> Optional[float]: pass + + # Currently used only by the FixedTimeoutRetryStrategy + def calculate_retry_deadline(self, overall_deadline: datetime) -> Optional[float]: + return None diff --git a/src/momento/retry/retryable_props.py b/src/momento/retry/retryable_props.py index 3832f193..55c0b208 100644 --- a/src/momento/retry/retryable_props.py +++ b/src/momento/retry/retryable_props.py @@ -1,4 +1,6 @@ from dataclasses import dataclass +from datetime import datetime +from typing import Optional import grpc @@ -9,3 +11,4 @@ class RetryableProps: grpc_status: grpc.StatusCode grpc_method: str attempt_number: int + overall_deadline: Optional[datetime] = None