From da53b29f6abfe5d0c9013d795536d46c78434b18 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 15 Oct 2025 10:14:13 +0200 Subject: [PATCH 1/7] Expose all run retry options --- python/restate/context.py | 46 +++++++++++++++++++++++++++----- python/restate/server_context.py | 42 ++++++++++++++++++++--------- python/restate/vm.py | 14 ++++++++-- src/lib.rs | 28 ++++++++++++++----- 4 files changed, 102 insertions(+), 28 deletions(-) diff --git a/python/restate/context.py b/python/restate/context.py index 37ab74d..bbc6b90 100644 --- a/python/restate/context.py +++ b/python/restate/context.py @@ -20,6 +20,7 @@ from typing import Any, Awaitable, Callable, Dict, List, Optional, TypeVar, Union, Coroutine, overload, ParamSpec import typing from datetime import timedelta +import warnings from restate.serde import DefaultSerde, Serde @@ -31,6 +32,7 @@ HandlerType = Union[Callable[[Any, I], Awaitable[O]], Callable[[Any], Awaitable[O]]] RunAction = Union[Callable[..., Coroutine[Any, Any, T]], Callable[..., T]] +# pylint: disable=R0902 @dataclass class RunOptions(typing.Generic[T]): """ @@ -40,15 +42,45 @@ class RunOptions(typing.Generic[T]): serde: Serde[T] = DefaultSerde() """The serialization/deserialization mechanism. - if the default serde is used, a default serializer will be used based on the type. See also 'type_hint'.""" - max_attempts: Optional[int] = None - """The maximum number of retry attempts, including the initial attempt, to complete the action. - If None, the action will be retried indefinitely, until it succeeds. - Otherwise, the action will be retried until the maximum number of attempts is reached and then it will raise a TerminalError.""" - max_retry_duration: Optional[timedelta] = None - """The maximum duration for retrying. If None, the action will be retried indefinitely, until it succeeds. 
- Otherwise, the action will be retried until the maximum duration is reached and then it will raise a TerminalError.""" type_hint: Optional[typing.Type[T]] = None """The type hint of the return value of the action. This is used to pick the serializer. If None, the type hint will be inferred from the action's return type, or the provided serializer.""" + max_attempts: Optional[int] = None + """Max number of attempts (including the initial), before giving up. + + When giving up, `ctx.run` will throw a `TerminalError` wrapping the original error message.""" + max_duration: Optional[timedelta] = None + """Max duration of retries, before giving up. + + When giving up, `ctx.run` will throw a `TerminalError` wrapping the original error message.""" + initial_retry_interval: Optional[timedelta] = None + """Initial interval for the first retry attempt. + Retry interval will grow by a factor specified in `retry_interval_factor`. + + If any of the other retry related fields is specified, the default for this field is 50 milliseconds, otherwise restate will fallback to the overall invocation retry policy.""" + max_retry_interval: Optional[timedelta] = None + """Max interval between retries. + Retry interval will grow by a factor specified in `retry_interval_factor`. + + The default is 10 seconds.""" + retry_interval_factor: Optional[float] = None + """Exponentiation factor to use when computing the next retry delay. + + If any of the other retry related fields is specified, the default for this field is `2`, meaning retry interval will double at each attempt, otherwise restate will fallback to the overall invocation retry policy.""" + max_retry_duration: Optional[timedelta] = None + """Deprecated: Use max_duration instead. 
+ + Max duration of retries, before giving up.""" + + def __post_init__(self): + """Handle deprecated fields.""" + if self.max_retry_duration is not None: + warnings.warn( + "max_retry_duration is deprecated, use max_duration instead", + DeprecationWarning, + stacklevel=2 + ) + if self.max_duration is None: + self.max_duration = self.max_retry_duration # pylint: disable=R0903 class RestateDurableFuture(typing.Generic[T], Awaitable[T]): diff --git a/python/restate/server_context.py b/python/restate/server_context.py index d6dabee..f598880 100644 --- a/python/restate/server_context.py +++ b/python/restate/server_context.py @@ -542,9 +542,13 @@ async def create_run_coroutine(self, action: RunAction[T], serde: Serde[T], max_attempts: Optional[int] = None, - max_retry_duration: Optional[timedelta] = None, + max_duration: Optional[timedelta] = None, + initial_retry_interval: Optional[timedelta] = None, + max_retry_interval: Optional[timedelta] = None, + retry_interval_factor: Optional[float] = None, ): """Create a coroutine to poll the handle.""" + start = time.time() try: if inspect.iscoroutinefunction(action): action_result: T = await action() # type: ignore @@ -565,15 +569,20 @@ async def create_run_coroutine(self, raise e from None # pylint: disable=W0718 except Exception as e: - if max_attempts is None and max_retry_duration is None: - # no retry policy - # todo: log the error - self.vm.notify_error(repr(e), traceback.format_exc()) - else: - failure = Failure(code=500, message=str(e)) - max_duration_ms = None if max_retry_duration is None else int(max_retry_duration.total_seconds() * 1000) - config = RunRetryConfig(max_attempts=max_attempts, max_duration=max_duration_ms) - self.vm.propose_run_completion_transient(handle, failure=failure, attempt_duration_ms=1, config=config) + end = time.time() + attempt_duration = int((end - start) * 1000) + failure = Failure(code=500, message=str(e)) + max_duration_ms = None if max_duration is None else 
int(max_duration.total_seconds() * 1000) + initial_retry_interval_ms = None if initial_retry_interval is None else int(initial_retry_interval.total_seconds() * 1000) + max_retry_interval_ms = None if max_retry_interval is None else int(max_retry_interval.total_seconds() * 1000) + config = RunRetryConfig( + max_attempts=max_attempts, + max_duration=max_duration_ms, + initial_interval=initial_retry_interval_ms, + max_interval=max_retry_interval_ms, + interval_factor=retry_interval_factor + ) + self.vm.propose_run_completion_transient(handle, failure=failure, attempt_duration_ms=attempt_duration, config=config) # pylint: disable=W0236 # pylint: disable=R0914 def run(self, @@ -600,7 +609,7 @@ def run(self, else: # todo: we can also verify by looking at the signature that there are no missing parameters noargs_action = action # type: ignore - self.run_coros_to_execute[handle] = lambda : self.create_run_coroutine(handle, noargs_action, serde, max_attempts, max_retry_duration) + self.run_coros_to_execute[handle] = lambda : self.create_run_coroutine(handle, noargs_action, serde, max_attempts, max_retry_duration, None, None, None) return self.create_future(handle, serde) # type: ignore def run_typed( @@ -623,7 +632,16 @@ def run_typed( update_restate_context_is_replaying(self.vm) func = functools.partial(action, *args, **kwargs) - self.run_coros_to_execute[handle] = lambda : self.create_run_coroutine(handle, func, options.serde, options.max_attempts, options.max_retry_duration) + self.run_coros_to_execute[handle] = lambda : self.create_run_coroutine( + handle, + func, + options.serde, + options.max_attempts, + options.max_duration, + options.initial_retry_interval, + options.max_retry_interval, + options.retry_interval_factor + ) return self.create_future(handle, options.serde) def sleep(self, delta: timedelta) -> RestateDurableSleepFuture: diff --git a/python/restate/vm.py b/python/restate/vm.py index a001ceb..82fff0a 100644 --- a/python/restate/vm.py +++ 
b/python/restate/vm.py @@ -33,11 +33,15 @@ class Invocation: @dataclass class RunRetryConfig: """ - Expo Retry Configuration + Exponential Retry Configuration + + All duration/interval values are in milliseconds. """ initial_interval: typing.Optional[int] = None max_attempts: typing.Optional[int] = None max_duration: typing.Optional[int] = None + max_interval: typing.Optional[int] = None + interval_factor: typing.Optional[float] = None @dataclass class Failure: @@ -400,7 +404,13 @@ def propose_run_completion_transient(self, handle: int, failure: Failure, attemp This requires a retry policy to be provided. """ py_failure = PyFailure(failure.code, failure.message) - py_config = PyExponentialRetryConfig(config.initial_interval, config.max_attempts, config.max_duration) + py_config = PyExponentialRetryConfig( + config.initial_interval, + config.max_attempts, + config.max_duration, + config.max_interval, + config.interval_factor + ) try: handle = self.vm.propose_run_completion_failure_transient(handle, py_failure, attempt_duration_ms, py_config) # The VM decided not to retry, therefore we get back an handle that will be resolved diff --git a/src/lib.rs b/src/lib.rs index 992753d..eb45740 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -115,33 +115,47 @@ struct PyExponentialRetryConfig { max_attempts: Option, #[pyo3(get, set)] max_duration: Option, + #[pyo3(get, set)] + max_interval: Option, + #[pyo3(get, set)] + factor: Option, } #[pymethods] impl PyExponentialRetryConfig { - #[pyo3(signature = (initial_interval=None, max_attempts=None, max_duration=None))] + #[pyo3(signature = (initial_interval=None, max_attempts=None, max_duration=None, max_interval=None, factor=None))] #[new] fn new( initial_interval: Option, max_attempts: Option, max_duration: Option, + max_interval: Option, + factor: Option, ) -> Self { Self { initial_interval, max_attempts, max_duration, + max_interval, + factor, } } } impl From for RetryPolicy { fn from(value: PyExponentialRetryConfig) -> Self { - 
RetryPolicy::Exponential { - initial_interval: Duration::from_millis(value.initial_interval.unwrap_or(10)), - max_attempts: value.max_attempts, - max_duration: value.max_duration.map(Duration::from_millis), - factor: 2.0, - max_interval: None, + if value.initial_interval.is_some() || value.max_attempts.is_some() || value.max_duration.is_some() || value.max_interval.is_some() || value.factor.is_some() { + // If any of the values are set, then let's create the exponential retry policy + RetryPolicy::Exponential { + initial_interval: Duration::from_millis(value.initial_interval.unwrap_or(50)), + max_attempts: value.max_attempts, + max_duration: value.max_duration.map(Duration::from_millis), + factor: value.factor.unwrap_or(2.0) as f32, + max_interval: value.max_interval.map(Duration::from_millis).or_else(|| Some(Duration::from_secs(10))), + } + } else { + // Let's use retry policy infinite here, which will give back control to the invocation retry policy + RetryPolicy::Infinite } } } From 2a01d2c8fe80acb5ebe9c518468381ed6ffced83 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 15 Oct 2025 10:30:44 +0200 Subject: [PATCH 2/7] Remove post_init --- python/restate/context.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/python/restate/context.py b/python/restate/context.py index bbc6b90..fd51626 100644 --- a/python/restate/context.py +++ b/python/restate/context.py @@ -20,7 +20,6 @@ from typing import Any, Awaitable, Callable, Dict, List, Optional, TypeVar, Union, Coroutine, overload, ParamSpec import typing from datetime import timedelta -import warnings from restate.serde import DefaultSerde, Serde @@ -67,20 +66,7 @@ class RunOptions(typing.Generic[T]): If any of the other retry related fields is specified, the default for this field is `2`, meaning retry interval will double at each attempt, otherwise restate will fallback to the overall invocation retry policy.""" max_retry_duration: Optional[timedelta] = None - 
"""Deprecated: Use max_duration instead. - - Max duration of retries, before giving up.""" - - def __post_init__(self): - """Handle deprecated fields.""" - if self.max_retry_duration is not None: - warnings.warn( - "max_retry_duration is deprecated, use max_duration instead", - DeprecationWarning, - stacklevel=2 - ) - if self.max_duration is None: - self.max_duration = self.max_retry_duration + """Deprecated: Use max_duration instead.""" # pylint: disable=R0903 class RestateDurableFuture(typing.Generic[T], Awaitable[T]): From 657c2f9cd8d3aa8558feea90b22e9e28dedaa753 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 15 Oct 2025 15:04:55 +0200 Subject: [PATCH 3/7] Make retry happen instantaneously on errors. --- test-services/services/failing.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test-services/services/failing.py b/test-services/services/failing.py index 37711e8..9856d82 100644 --- a/test-services/services/failing.py +++ b/test-services/services/failing.py @@ -9,6 +9,8 @@ # https://github.com/restatedev/sdk-typescript/blob/main/LICENSE # """example.py""" +from datetime import timedelta + # pylint: disable=C0116 # pylint: disable=W0613 # pylint: disable=W0622 @@ -61,7 +63,7 @@ def side_effect(): return eventual_success_side_effects raise ValueError(f"Failed at attempt: {eventual_success_side_effects}") - options: RunOptions[int] = RunOptions(max_attempts=minimum_attempts + 1) + options: RunOptions[int] = RunOptions(max_attempts=minimum_attempts + 1, initial_retry_interval=timedelta(milliseconds=1), retry_interval_factor=1.0) return await ctx.run_typed("sideEffect", side_effect, options) eventual_failure_side_effects = 0 @@ -75,7 +77,7 @@ def side_effect(): raise ValueError(f"Failed at attempt: {eventual_failure_side_effects}") try: - options: RunOptions[int] = RunOptions(max_attempts=retry_policy_max_retry_count) + options: RunOptions[int] = RunOptions(max_attempts=retry_policy_max_retry_count, 
initial_retry_interval=timedelta(milliseconds=1), retry_interval_factor=1.0) await ctx.run_typed("sideEffect", side_effect, options) raise ValueError("Side effect did not fail.") except TerminalError as t: From 29f4868c2157ad4775cdbd2a2dd2dc3c5b73b808 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 15 Oct 2025 16:04:05 +0200 Subject: [PATCH 4/7] Fix incorrect try/catch --- python/restate/vm.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/python/restate/vm.py b/python/restate/vm.py index 82fff0a..ef8532f 100644 --- a/python/restate/vm.py +++ b/python/restate/vm.py @@ -398,7 +398,7 @@ def propose_run_completion_failure(self, handle: int, output: Failure) -> int: return self.vm.propose_run_completion_failure(handle, res) # pylint: disable=line-too-long - def propose_run_completion_transient(self, handle: int, failure: Failure, attempt_duration_ms: int, config: RunRetryConfig) -> int | None: + def propose_run_completion_transient(self, handle: int, failure: Failure, attempt_duration_ms: int, config: RunRetryConfig): """ Exit a side effect with a transient Error. This requires a retry policy to be provided. @@ -411,15 +411,7 @@ def propose_run_completion_transient(self, handle: int, failure: Failure, attemp config.max_interval, config.interval_factor ) - try: - handle = self.vm.propose_run_completion_failure_transient(handle, py_failure, attempt_duration_ms, py_config) - # The VM decided not to retry, therefore we get back an handle that will be resolved - # with a terminal failure. 
- return handle - # pylint: disable=bare-except - except: - # The VM decided to retry, therefore we tear down the current execution - return None + self.vm.propose_run_completion_failure_transient(handle, py_failure, attempt_duration_ms, py_config) def sys_end(self): """ From 24ae71c878bff29d8194633a8ebd87c632e0a099 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 15 Oct 2025 16:44:12 +0200 Subject: [PATCH 5/7] Move take_and_send_output inside the do progress loop, and enqueue the restate event always. --- python/restate/server_context.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/restate/server_context.py b/python/restate/server_context.py index f598880..d7fd30a 100644 --- a/python/restate/server_context.py +++ b/python/restate/server_context.py @@ -408,8 +408,8 @@ async def must_take_notification(self, handle): async def create_poll_or_cancel_coroutine(self, handles: typing.List[int]) -> None: """Create a coroutine to poll the handle.""" - await self.take_and_send_output() while True: + await self.take_and_send_output() do_progress_response = self.vm.do_progress(handles) if isinstance(do_progress_response, Exception): # We might need to write out something at this point. @@ -432,9 +432,10 @@ async def create_poll_or_cancel_coroutine(self, handles: typing.List[int]) -> No assert fn is not None async def wrapper(f): - await f() - await self.take_and_send_output() - await self.receive.enqueue_restate_event({ 'type' : 'restate.run_completed', 'data': None}) + try: + await f() + finally: + await self.receive.enqueue_restate_event({ 'type' : 'restate.run_completed', 'data': None}) task = asyncio.create_task(wrapper(fn)) self.tasks.add(task) From 58167943ba7f84a22fbf0c2233f1d2eb89ae4e9f Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 15 Oct 2025 16:46:01 +0200 Subject: [PATCH 6/7] Remove the now-redundant take_and_send_output calls before cancelling, and treat any BaseException returned by do_progress as a failure. 
--- python/restate/server_context.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/python/restate/server_context.py b/python/restate/server_context.py index d7fd30a..8469891 100644 --- a/python/restate/server_context.py +++ b/python/restate/server_context.py @@ -411,15 +411,11 @@ async def create_poll_or_cancel_coroutine(self, handles: typing.List[int]) -> No while True: await self.take_and_send_output() do_progress_response = self.vm.do_progress(handles) - if isinstance(do_progress_response, Exception): - # We might need to write out something at this point. - await self.take_and_send_output() + if isinstance(do_progress_response, BaseException): # Print this exception, might be relevant for the user traceback.print_exception(do_progress_response) await cancel_current_task() if isinstance(do_progress_response, Suspended): - # We might need to write out something at this point. - await self.take_and_send_output() await cancel_current_task() if isinstance(do_progress_response, DoProgressAnyCompleted): # One of the handles completed From 17cd6c8a13dd51d6447f23f23c8d29d0f77c4f5a Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 15 Oct 2025 17:01:25 +0200 Subject: [PATCH 7/7] Format --- src/lib.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index eb45740..a5da5f8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,14 +1,14 @@ -use std::fmt; use pyo3::create_exception; use pyo3::prelude::*; use pyo3::types::{PyBytes, PyNone, PyString}; +use restate_sdk_shared_core::fmt::{set_error_formatter, ErrorFormatter}; use restate_sdk_shared_core::{ CallHandle, CoreVM, DoProgressResponse, Error, Header, IdentityVerifier, Input, NonEmptyValue, NotificationHandle, ResponseHead, RetryPolicy, RunExitResult, TakeOutputResult, Target, TerminalFailure, VMOptions, Value, CANCEL_NOTIFICATION_HANDLE, VM, }; +use std::fmt; use std::time::{Duration, SystemTime}; -use 
restate_sdk_shared_core::fmt::{set_error_formatter, ErrorFormatter}; // Current crate version const CURRENT_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -144,18 +144,26 @@ impl PyExponentialRetryConfig { impl From for RetryPolicy { fn from(value: PyExponentialRetryConfig) -> Self { - if value.initial_interval.is_some() || value.max_attempts.is_some() || value.max_duration.is_some() || value.max_interval.is_some() || value.factor.is_some() { + if value.initial_interval.is_some() + || value.max_attempts.is_some() + || value.max_duration.is_some() + || value.max_interval.is_some() + || value.factor.is_some() + { // If any of the values are set, then let's create the exponential retry policy RetryPolicy::Exponential { initial_interval: Duration::from_millis(value.initial_interval.unwrap_or(50)), max_attempts: value.max_attempts, max_duration: value.max_duration.map(Duration::from_millis), factor: value.factor.unwrap_or(2.0) as f32, - max_interval: value.max_interval.map(Duration::from_millis).or_else(|| Some(Duration::from_secs(10))), + max_interval: value + .max_interval + .map(Duration::from_millis) + .or_else(|| Some(Duration::from_secs(10))), } } else { // Let's use retry policy infinite here, which will give back control to the invocation retry policy - RetryPolicy::Infinite + RetryPolicy::Infinite } } }