Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions src/momento/internal/aio/_retry_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import logging
from datetime import datetime, timedelta
from typing import Callable

import grpc
Expand Down Expand Up @@ -34,20 +35,44 @@ async def intercept_unary_unary(
client_call_details: grpc.aio._interceptor.ClientCallDetails,
request: grpc.aio._typing.RequestType,
) -> grpc.aio._call.UnaryUnaryCall | grpc.aio._typing.ResponseType:
call = None
attempt_number = 1
# The overall deadline is calculated from the timeout set on the client call details.
# That value is set in our gRPC configurations and, while typed as optional, will never be None here.
overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout or 0.0)
# variable to capture the penultimate call to a deadline-aware retry strategy, which
# will hold the call object before a terminal DEADLINE_EXCEEDED response is returned
last_call = None

while True:
if attempt_number > 1:
retry_deadline = self._retry_strategy.calculate_retry_deadline(overall_deadline)
if retry_deadline is not None:
client_call_details = grpc.aio._interceptor.ClientCallDetails(
client_call_details.method,
retry_deadline,
client_call_details.metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
)
last_call = call

call = await continuation(client_call_details, request)
response_code = await call.code()

if response_code == grpc.StatusCode.OK:
return call

retryTime = self._retry_strategy.determine_when_to_retry(
RetryableProps(response_code, client_call_details.method.decode("utf-8"), attempt_number)
# Note: the async interceptor gets `client_call_details.method` as a binary string that needs to be decoded
# but the sync interceptor gets it as a string.
RetryableProps(
response_code, client_call_details.method.decode("utf-8"), attempt_number, overall_deadline
)
)

if retryTime is None:
return call
return last_call or call

attempt_number += 1
await asyncio.sleep(retryTime)
27 changes: 25 additions & 2 deletions src/momento/internal/synchronous/_retry_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import time
from datetime import datetime, timedelta
from typing import Callable, TypeVar

import grpc
Expand Down Expand Up @@ -35,20 +36,42 @@ def intercept_unary_unary(
client_call_details: grpc.ClientCallDetails,
request: RequestType,
) -> InterceptorCall | ResponseType:
call = None
attempt_number = 1
# The overall deadline is calculated from the timeout set on the client call details.
# That value is set in our gRPC configurations and, while typed as optional, will never be None here.
overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout or 0.0)
# variable to capture the penultimate call to a deadline-aware retry strategy, which
# will hold the call object before a terminal DEADLINE_EXCEEDED response is returned
last_call = None

while True:
if attempt_number > 1:
retry_deadline = self._retry_strategy.calculate_retry_deadline(overall_deadline)
if retry_deadline is not None:
client_call_details = grpc.aio._interceptor.ClientCallDetails(
client_call_details.method,
retry_deadline,
client_call_details.metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
)
last_call = call

call = continuation(client_call_details, request)
response_code = call.code() # type: ignore[attr-defined] # noqa: F401

if response_code == grpc.StatusCode.OK:
return call

retryTime = self._retry_strategy.determine_when_to_retry(
RetryableProps(response_code, client_call_details.method, attempt_number)
# Note: the async interceptor gets `client_call_details.method` as a binary string that needs to be decoded
# but the sync interceptor gets it as a string.
RetryableProps(response_code, client_call_details.method, attempt_number, overall_deadline)
)

if retryTime is None:
return call
return last_call or call

attempt_number += 1
time.sleep(retryTime)
97 changes: 97 additions & 0 deletions src/momento/retry/fixed_timeout_retry_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import logging
import random
from datetime import datetime, timedelta
from typing import Optional

import grpc

from .default_eligibility_strategy import DefaultEligibilityStrategy
from .eligibility_strategy import EligibilityStrategy
from .retry_strategy import RetryStrategy
from .retryable_props import RetryableProps

logger = logging.getLogger("fixed-timeout-retry-strategy")


class FixedTimeoutRetryStrategy(RetryStrategy):
def __init__(
self,
*,
retry_timeout_millis: int,
retry_delay_interval_millis: int,
eligibility_strategy: DefaultEligibilityStrategy = DefaultEligibilityStrategy(),
):
self._eligibility_strategy: EligibilityStrategy = eligibility_strategy
self._retry_timeout_millis: int = retry_timeout_millis
self._retry_delay_interval_millis: int = retry_delay_interval_millis

def determine_when_to_retry(self, props: RetryableProps) -> Optional[float]:
"""Determines whether a grpc call can be retried and how long to wait before that retry.

Args:
props (RetryableProps): Information about the grpc call, its last invocation, and how many times the call
has been made.

:Returns
The time in seconds before the next retry should occur or None if no retry should be attempted.
"""
logger.debug(
"Determining whether request is eligible for retry; status code: %s, request type: %s, attemptNumber: %d",
props.grpc_status, # type: ignore[misc]
props.grpc_method,
props.attempt_number,
)

if props.overall_deadline is None:
logger.debug("Overall deadline is None; not retrying.")
return None

# If a retry attempt's timeout has passed but the client's overall timeout has not yet passed,
# we should reset the deadline and retry.
if (
props.attempt_number > 0
and props.grpc_status == grpc.StatusCode.DEADLINE_EXCEEDED # type: ignore[misc]
and props.overall_deadline > datetime.now()
):
return self.get_jitter_in_millis(props)

if self._eligibility_strategy.is_eligible_for_retry(props) is False:
logger.debug(
"Request path: %s; retryable status code: %s. Request is not retryable.",
props.grpc_method,
props.grpc_status, # type: ignore[misc]
)
return None

return self.get_jitter_in_millis(props)

def get_jitter_in_millis(self, props: RetryableProps) -> float:
timeout_with_jitter = self.add_jitter(self._retry_delay_interval_millis)
logger.debug(
"Determined request is retryable; retrying after %d ms: [method: %s, status: %s, attempt: %d]",
timeout_with_jitter,
props.grpc_method,
props.grpc_status, # type: ignore[misc]
props.attempt_number,
)
return timeout_with_jitter / 1000.0

def add_jitter(self, base_delay: int) -> int:
return int((0.2 * random.random() + 0.9) * float(base_delay))

def calculate_retry_deadline(self, overall_deadline: datetime) -> Optional[float]:
"""Calculates the deadline for a retry attempt using the retry timeout, but clips it to the overall deadline if the overall deadline is sooner.

Args:
overall_deadline (datetime): The overall deadline for the operation.

Returns:
float: The calculated retry deadline.
"""
logger.debug(
f"Calculating retry deadline:\nnow: {datetime.now()}\noverall deadline: {overall_deadline}\n"
+ f"retry timeout millis: {self._retry_timeout_millis}"
)
if datetime.now() + timedelta(milliseconds=self._retry_timeout_millis) > overall_deadline:
return (overall_deadline - datetime.now()).total_seconds() * 1000
return self._retry_timeout_millis
5 changes: 5 additions & 0 deletions src/momento/retry/retry_strategy.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional

from .retryable_props import RetryableProps
Expand All @@ -8,3 +9,7 @@ class RetryStrategy(ABC):
@abstractmethod
def determine_when_to_retry(self, props: RetryableProps) -> Optional[float]:
pass

# Currently used only by the FixedTimeoutRetryStrategy
def calculate_retry_deadline(self, overall_deadline: datetime) -> Optional[float]:
return None
3 changes: 3 additions & 0 deletions src/momento/retry/retryable_props.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import grpc

Expand All @@ -9,3 +11,4 @@ class RetryableProps:
grpc_status: grpc.StatusCode
grpc_method: str
attempt_number: int
overall_deadline: Optional[datetime] = None
Loading