diff --git a/python/restate/__init__.py b/python/restate/__init__.py index e952d0b..25eec36 100644 --- a/python/restate/__init__.py +++ b/python/restate/__init__.py @@ -19,6 +19,7 @@ # types from .context import Context, ObjectContext, ObjectSharedContext from .context import WorkflowContext, WorkflowSharedContext +from .retry_policy import InvocationRetryPolicy # pylint: disable=line-too-long from .context import DurablePromise, RestateDurableFuture, RestateDurableCallFuture, RestateDurableSleepFuture, SendHandle, RunOptions from .exceptions import TerminalError @@ -65,5 +66,6 @@ def test_harness(app, # type: ignore "wait_completed", "select", "logging", - "RestateLoggingFilter" + "RestateLoggingFilter", + "InvocationRetryPolicy" ] diff --git a/python/restate/discovery.py b/python/restate/discovery.py index b64b6bc..bcae0c4 100644 --- a/python/restate/discovery.py +++ b/python/restate/discovery.py @@ -60,8 +60,26 @@ def __init__(self, setContentTypeIfEmpty: bool, contentType: Optional[str] = Non self.jsonSchema = jsonSchema class Handler: - # pylint: disable=R0902 - def __init__(self, name: str, ty: Optional[ServiceHandlerType] = None, input: Optional[InputPayload | Dict[str, str]] = None, output: Optional[OutputPayload] = None, description: Optional[str] = None, metadata: Optional[Dict[str, str]] = None, inactivityTimeout: Optional[int] = None, abortTimeout: Optional[int] = None, journalRetention: Optional[int] = None, idempotencyRetention: Optional[int] = None, workflowCompletionRetention: Optional[int] = None, enableLazyState: Optional[bool] = None, ingressPrivate: Optional[bool] = None): + # pylint: disable=R0902,R0914 + def __init__(self, + name: str, + ty: Optional[ServiceHandlerType] = None, + input: Optional[InputPayload | Dict[str, str]] = None, + output: Optional[OutputPayload] = None, + description: Optional[str] = None, + metadata: Optional[Dict[str, str]] = None, + inactivityTimeout: Optional[int] = None, + abortTimeout: Optional[int] = None, + journalRetention: Optional[int] = None, + idempotencyRetention: Optional[int] = None, + workflowCompletionRetention: Optional[int] = None, + enableLazyState: Optional[bool] = None, + ingressPrivate: Optional[bool] = None, + retryPolicyInitialInterval: Optional[int] = None, + retryPolicyMaxInterval: Optional[int] = None, + retryPolicyMaxAttempts: Optional[int] = None, + retryPolicyExponentiationFactor: Optional[float] = None, + retryPolicyOnMaxAttempts: Optional[str] = None): self.name = name self.ty = ty self.input = input @@ -75,10 +93,31 @@ def __init__(self, name: str, ty: Optional[ServiceHandlerType] = None, input: Op self.workflowCompletionRetention = workflowCompletionRetention self.enableLazyState = enableLazyState self.ingressPrivate = ingressPrivate + self.retryPolicyInitialInterval = retryPolicyInitialInterval + self.retryPolicyMaxInterval = retryPolicyMaxInterval + self.retryPolicyMaxAttempts = retryPolicyMaxAttempts + self.retryPolicyExponentiationFactor = retryPolicyExponentiationFactor + self.retryPolicyOnMaxAttempts = retryPolicyOnMaxAttempts class Service: - # pylint: disable=R0902 - def __init__(self, name: str, ty: ServiceType, handlers: List[Handler], description: Optional[str] = None, metadata: Optional[Dict[str, str]] = None, inactivityTimeout: Optional[int] = None, abortTimeout: Optional[int] = None, journalRetention: Optional[int] = None, idempotencyRetention: Optional[int] = None, enableLazyState: Optional[bool] = None, ingressPrivate: Optional[bool] = None): + # pylint: disable=R0902,R0914 + def __init__(self, + name: str, + ty: ServiceType, + handlers: List[Handler], + description: Optional[str] = None, + metadata: Optional[Dict[str, str]] = None, + inactivityTimeout: Optional[int] = None, + abortTimeout: Optional[int] = None, + journalRetention: Optional[int] = None, + idempotencyRetention: Optional[int] = None, + enableLazyState: Optional[bool] = None, + ingressPrivate: Optional[bool] = None, + retryPolicyInitialInterval: Optional[int] = None, + retryPolicyMaxInterval: Optional[int] = None, + retryPolicyMaxAttempts: Optional[int] = None, + retryPolicyExponentiationFactor: Optional[float] = None, + retryPolicyOnMaxAttempts: Optional[str] = None): self.name = name self.ty = ty self.handlers = handlers @@ -90,6 +129,11 @@ def __init__(self, name: str, ty: ServiceType, handlers: List[Handler], descript self.idempotencyRetention = idempotencyRetention self.enableLazyState = enableLazyState self.ingressPrivate = ingressPrivate + self.retryPolicyInitialInterval = retryPolicyInitialInterval + self.retryPolicyMaxInterval = retryPolicyMaxInterval + self.retryPolicyMaxAttempts = retryPolicyMaxAttempts + self.retryPolicyExponentiationFactor = retryPolicyExponentiationFactor + self.retryPolicyOnMaxAttempts = retryPolicyOnMaxAttempts class Endpoint: def __init__(self, protocolMode: ProtocolMode, minProtocolVersion: int, maxProtocolVersion: int, services: List[Service]): @@ -163,7 +207,7 @@ def json_schema_from_type_hint(type_hint: Optional[TypeHint[Any]]) -> Any: return type_hint_to_json_schema(type_hint.annotation) -# pylint: disable=R0912 +# pylint: disable=R0912,R0915 def compute_discovery_json(endpoint: RestateEndpoint, version: int, discovered_as: typing.Literal["bidi", "request_response"]) -> str: @@ -174,6 +218,31 @@ def compute_discovery_json(endpoint: RestateEndpoint, ep = compute_discovery(endpoint, discovered_as) # Validate that new discovery fields aren't used with older protocol versions + if version <= 3: + for service in ep.services: + if service.retryPolicyInitialInterval is not None: + raise ValueError("retryPolicyInitialInterval is only supported in discovery protocol version 4") + if service.retryPolicyMaxInterval is not None: + raise ValueError("retryPolicyMaxInterval is only supported in discovery protocol version 4") + if service.retryPolicyMaxAttempts is not None: + raise ValueError("retryPolicyMaxAttempts is only supported in discovery protocol version 4") + if service.retryPolicyExponentiationFactor is not None: + raise ValueError("retryPolicyExponentiationFactor is only supported in discovery protocol version 4") + if service.retryPolicyOnMaxAttempts is not None: + raise ValueError("retryPolicyOnMaxAttempts is only supported in discovery protocol version 4") + + for handler in service.handlers: + if handler.retryPolicyInitialInterval is not None: + raise ValueError("retryPolicyInitialInterval is only supported in discovery protocol version 4") + if handler.retryPolicyMaxInterval is not None: + raise ValueError("retryPolicyMaxInterval is only supported in discovery protocol version 4") + if handler.retryPolicyMaxAttempts is not None: + raise ValueError("retryPolicyMaxAttempts is only supported in discovery protocol version 4") + if handler.retryPolicyExponentiationFactor is not None: + raise ValueError("retryPolicyExponentiationFactor is only supported in discovery protocol version 4") + if handler.retryPolicyOnMaxAttempts is not None: + raise ValueError("retryPolicyOnMaxAttempts is only supported in discovery protocol version 4") + if version <= 2: # Check for new discovery fields in version 3 that shouldn't be used in version 2 or lower for service in ep.services: @@ -253,21 +322,31 @@ def compute_discovery(endpoint: RestateEndpoint, discovered_as : typing.Literal[ idempotencyRetention=int(handler.idempotency_retention.total_seconds() * 1000) if handler.idempotency_retention else None, workflowCompletionRetention=int(handler.workflow_retention.total_seconds() * 1000) if handler.workflow_retention else None, enableLazyState=handler.enable_lazy_state, - ingressPrivate=handler.ingress_private)) + ingressPrivate=handler.ingress_private, + retryPolicyInitialInterval=int(handler.invocation_retry_policy.initial_interval.total_seconds() * 1000) if handler.invocation_retry_policy and handler.invocation_retry_policy.initial_interval else None, + retryPolicyMaxInterval=int(handler.invocation_retry_policy.max_interval.total_seconds() * 1000) if handler.invocation_retry_policy and handler.invocation_retry_policy.max_interval else None, + retryPolicyMaxAttempts=int(handler.invocation_retry_policy.max_attempts) if handler.invocation_retry_policy and handler.invocation_retry_policy.max_attempts is not None else None, + retryPolicyExponentiationFactor=float(handler.invocation_retry_policy.exponentiation_factor) if handler.invocation_retry_policy and handler.invocation_retry_policy.exponentiation_factor is not None else None, + retryPolicyOnMaxAttempts=(handler.invocation_retry_policy.on_max_attempts.upper() if handler.invocation_retry_policy and handler.invocation_retry_policy.on_max_attempts is not None else None))) # add the service description = service.service_tag.description metadata = service.service_tag.metadata services.append(Service(name=service.name, - ty=service_type, - handlers=service_handlers, - description=description, - metadata=metadata, - inactivityTimeout=int(service.inactivity_timeout.total_seconds() * 1000) if service.inactivity_timeout else None, - abortTimeout=int(service.abort_timeout.total_seconds() * 1000) if service.abort_timeout else None, - journalRetention=int(service.journal_retention.total_seconds() * 1000) if service.journal_retention else None, - idempotencyRetention=int(service.idempotency_retention.total_seconds() * 1000) if service.idempotency_retention else None, - enableLazyState=service.enable_lazy_state if hasattr(service, 'enable_lazy_state') else None, - ingressPrivate=service.ingress_private)) + ty=service_type, + handlers=service_handlers, + description=description, + metadata=metadata, + inactivityTimeout=int(service.inactivity_timeout.total_seconds() * 1000) if service.inactivity_timeout else None, + abortTimeout=int(service.abort_timeout.total_seconds() * 1000) if service.abort_timeout else None, + journalRetention=int(service.journal_retention.total_seconds() * 1000) if service.journal_retention else None, + idempotencyRetention=int(service.idempotency_retention.total_seconds() * 1000) if service.idempotency_retention else None, + enableLazyState=service.enable_lazy_state if hasattr(service, 'enable_lazy_state') else None, + ingressPrivate=service.ingress_private, + retryPolicyInitialInterval=int(service.invocation_retry_policy.initial_interval.total_seconds() * 1000) if service.invocation_retry_policy and service.invocation_retry_policy.initial_interval else None, + retryPolicyMaxInterval=int(service.invocation_retry_policy.max_interval.total_seconds() * 1000) if service.invocation_retry_policy and service.invocation_retry_policy.max_interval else None, + retryPolicyMaxAttempts=int(service.invocation_retry_policy.max_attempts) if service.invocation_retry_policy and service.invocation_retry_policy.max_attempts is not None else None, + retryPolicyExponentiationFactor=float(service.invocation_retry_policy.exponentiation_factor) if service.invocation_retry_policy and service.invocation_retry_policy.exponentiation_factor is not None else None, + retryPolicyOnMaxAttempts=(service.invocation_retry_policy.on_max_attempts.upper() if service.invocation_retry_policy and service.invocation_retry_policy.on_max_attempts is not None else None))) if endpoint.protocol: protocol_mode = PROTOCOL_MODES[endpoint.protocol] diff --git a/python/restate/handler.py b/python/restate/handler.py index 19e6da9..481c160 100644 --- a/python/restate/handler.py +++ b/python/restate/handler.py @@ -20,6 +20,8 @@ from inspect import Signature from typing import Any, Callable, Awaitable, Dict, Generic, Literal, Optional, TypeVar +from restate.retry_policy import InvocationRetryPolicy + from restate.context import HandlerType from restate.exceptions import TerminalError from restate.serde import DefaultSerde, PydanticJsonSerde, Serde, is_pydantic @@ -122,6 +124,7 @@ class Handler(Generic[I, O]): workflow_retention: Workflow completion retention duration. enable_lazy_state: If true, lazy state is enabled. ingress_private: If true, the handler cannot be invoked from the HTTP nor Kafka ingress. + invocation_retry_policy: Optional retry policy configuration applied to this handler. """ service_tag: ServiceTag handler_io: HandlerIO[I, O] @@ -138,6 +141,7 @@ class Handler(Generic[I, O]): workflow_retention: Optional[timedelta] = None enable_lazy_state: Optional[bool] = None ingress_private: Optional[bool] = None + invocation_retry_policy: Optional[InvocationRetryPolicy] = None # disable too many arguments warning @@ -157,7 +161,8 @@ def make_handler(service_tag: ServiceTag, idempotency_retention: Optional[timedelta] = None, workflow_retention: Optional[timedelta] = None, enable_lazy_state: Optional[bool] = None, - ingress_private: Optional[bool] = None) -> Handler[I, O]: + ingress_private: Optional[bool] = None, + invocation_retry_policy: Optional[InvocationRetryPolicy] = None) -> Handler[I, O]: """ Factory function to create a handler. @@ -177,6 +182,8 @@ def make_handler(service_tag: ServiceTag, workflow_retention: Workflow completion retention duration. enable_lazy_state: If true, lazy state is enabled. ingress_private: If true, the handler cannot be invoked from the HTTP nor Kafka ingress. + invocation_retry_policy: Retry policy used by Restate when invoking this handler. + NOTE: only supported with restate-server >= 1.5. """ # try to deduce the handler name handler_name = name @@ -205,7 +212,8 @@ def make_handler(service_tag: ServiceTag, idempotency_retention=idempotency_retention, workflow_retention=workflow_retention, enable_lazy_state=enable_lazy_state, - ingress_private=ingress_private) + ingress_private=ingress_private, + invocation_retry_policy=invocation_retry_policy) vars(wrapped)[RESTATE_UNIQUE_HANDLER_SYMBOL] = handler return handler diff --git a/python/restate/object.py b/python/restate/object.py index f045e19..0a926a5 100644 --- a/python/restate/object.py +++ b/python/restate/object.py @@ -20,6 +20,7 @@ from datetime import timedelta from restate.serde import Serde, DefaultSerde +from restate.retry_policy import InvocationRetryPolicy from restate.handler import Handler, HandlerIO, ServiceTag, make_handler I = typing.TypeVar('I') @@ -68,6 +69,7 @@ class VirtualObject: HTTP and Kafka ingress, but only from other services. NOTE: You can set this field only if you register this service against restate-server >= 1.4, otherwise the service discovery will fail. + invocation_retry_policy (InvocationRetryPolicy, optional): Retry policy applied for all invocations to this virtual object. """ handlers: typing.Dict[str, Handler[typing.Any, typing.Any]] @@ -80,7 +82,8 @@ def __init__(self, name, journal_retention: typing.Optional[timedelta] = None, idempotency_retention: typing.Optional[timedelta] = None, enable_lazy_state: typing.Optional[bool] = None, - ingress_private: typing.Optional[bool] = None): + ingress_private: typing.Optional[bool] = None, + invocation_retry_policy: typing.Optional[InvocationRetryPolicy] = None): self.service_tag = ServiceTag("object", name, description, metadata) self.handlers = {} self.inactivity_timeout = inactivity_timeout @@ -89,6 +92,7 @@ def __init__(self, name, self.idempotency_retention = idempotency_retention self.enable_lazy_state = enable_lazy_state self.ingress_private = ingress_private + self.invocation_retry_policy = invocation_retry_policy @property def name(self): @@ -111,7 +115,8 @@ def handler(self, journal_retention: typing.Optional[timedelta] = None, idempotency_retention: typing.Optional[timedelta] = None, enable_lazy_state: typing.Optional[bool] = None, - ingress_private: typing.Optional[bool] = None) -> typing.Callable: + ingress_private: typing.Optional[bool] = None, + invocation_retry_policy: typing.Optional[InvocationRetryPolicy] = None) -> typing.Callable: """ Decorator for defining a handler function. @@ -151,6 +156,7 @@ def handler(self, but only from other services. NOTE: You can set this field only if you register this service against restate-server >= 1.4, otherwise the service discovery will fail. + invocation_retry_policy (InvocationRetryPolicy, optional): Retry policy applied for all invocations to this handler. Returns: Callable: The decorated function. @@ -174,7 +180,7 @@ def wrapped(*args, **kwargs): signature = inspect.signature(fn, eval_str=True) handler = make_handler(self.service_tag, handler_io, name, kind, wrapped, signature, inspect.getdoc(fn), metadata, inactivity_timeout, abort_timeout, journal_retention, idempotency_retention, - None, enable_lazy_state, ingress_private) + None, enable_lazy_state, ingress_private, invocation_retry_policy) self.handlers[handler.name] = handler return wrapped diff --git a/python/restate/retry_policy.py b/python/restate/retry_policy.py new file mode 100644 index 0000000..09e3022 --- /dev/null +++ b/python/restate/retry_policy.py @@ -0,0 +1,39 @@ +# Copyright (c) 2025 - Restate Software, Inc., Restate GmbH +# +# This file is part of the Restate SDK for Python, +# which is released under the MIT license. +# +# You can find a copy of the license in file LICENSE in the root +# directory of this repository or package, or at +# https://github.com/restatedev/sdk-typescript/blob/main/LICENSE +""" +Retry policy configuration for handler invocations exposed in the discovery manifest (protocol v4+). + +Note: You can set these fields only if you register this service against restate-server >= 1.5 +and discovery protocol v4. Otherwise, service discovery will fail. +""" +from __future__ import annotations + +from dataclasses import dataclass +from datetime import timedelta +from typing import Optional, Literal + +@dataclass +class InvocationRetryPolicy: + """ + Retry policy used by Restate when retrying failed handler invocations. + + Fields: + - initial_interval: Initial delay before the first retry attempt. + - exponentiation_factor: Exponential backoff multiplier used to compute the next retry delay. + - max_interval: Upper bound for any computed retry delay. + - max_attempts: Maximum number of attempts before giving up retrying. + The initial call counts as the first attempt. + - on_max_attempts: Behavior when reaching max attempts (pause or kill). + """ + + initial_interval: Optional[timedelta] = None + exponentiation_factor: Optional[float] = None + max_interval: Optional[timedelta] = None + max_attempts: Optional[int] = None + on_max_attempts: Optional[Literal["pause", "kill"]] = None diff --git a/python/restate/server.py b/python/restate/server.py index e0da1ad..cebaebb 100644 --- a/python/restate/server.py +++ b/python/restate/server.py @@ -61,7 +61,9 @@ async def send_discovery(scope: Scope, send: Send, endpoint: Endpoint): # Negotiate discovery protocol version version = 2 if accept_header: - if "application/vnd.restate.endpointmanifest.v3+json" in accept_header: + if "application/vnd.restate.endpointmanifest.v4+json" in accept_header: + version = 4 + elif "application/vnd.restate.endpointmanifest.v3+json" in accept_header: version = 3 elif "application/vnd.restate.endpointmanifest.v2+json" in accept_header: version = 2 diff --git a/python/restate/service.py b/python/restate/service.py index 76ba1f6..28b7f7c 100644 --- a/python/restate/service.py +++ b/python/restate/service.py @@ -21,6 +21,7 @@ from datetime import timedelta from restate.serde import Serde, DefaultSerde +from restate.retry_policy import InvocationRetryPolicy from .handler import Handler, HandlerIO, ServiceTag, make_handler I = typing.TypeVar('I') @@ -31,7 +32,7 @@ # pylint: disable=R0913 # disable line too long warning -# pylint: disable=C0301 +# pylint: disable=C0301,R0902 class Service: """ @@ -65,6 +66,7 @@ class Service: HTTP and Kafka ingress, but only from other services. NOTE: You can set this field only if you register this service against restate-server >= 1.4, otherwise the service discovery will fail. + invocation_retry_policy (InvocationRetryPolicy, optional): Retry policy applied for all invocations to this service. """ def __init__(self, name: str, @@ -74,7 +76,8 @@ def __init__(self, name: str, abort_timeout: typing.Optional[timedelta] = None, journal_retention: typing.Optional[timedelta] = None, idempotency_retention: typing.Optional[timedelta] = None, - ingress_private: typing.Optional[bool] = None) -> None: + ingress_private: typing.Optional[bool] = None, + invocation_retry_policy: typing.Optional[InvocationRetryPolicy] = None) -> None: self.service_tag = ServiceTag("service", name, description, metadata) self.handlers: typing.Dict[str, Handler] = {} self.inactivity_timeout = inactivity_timeout @@ -82,6 +85,7 @@ def __init__(self, name: str, self.journal_retention = journal_retention self.idempotency_retention = idempotency_retention self.ingress_private = ingress_private + self.invocation_retry_policy = invocation_retry_policy @property def name(self): @@ -101,7 +105,8 @@ def handler(self, abort_timeout: typing.Optional[timedelta] = None, journal_retention: typing.Optional[timedelta] = None, idempotency_retention: typing.Optional[timedelta] = None, - ingress_private: typing.Optional[bool] = None) -> typing.Callable: + ingress_private: typing.Optional[bool] = None, + invocation_retry_policy: typing.Optional[InvocationRetryPolicy] = None) -> typing.Callable: """ Decorator for defining a handler function. @@ -138,6 +143,7 @@ def handler(self, but only from other services. NOTE: You can set this field only if you register this service against restate-server >= 1.4, otherwise the service discovery will fail. + invocation_retry_policy (InvocationRetryPolicy, optional): Retry policy applied for all invocations to this handler. Returns: Callable: The decorated function. @@ -160,7 +166,7 @@ def wrapped(*args, **kwargs): signature = inspect.signature(fn, eval_str=True) handler = make_handler(self.service_tag, handler_io, name, None, wrapped, signature, inspect.getdoc(fn), metadata, inactivity_timeout, abort_timeout, journal_retention, idempotency_retention, - None, None, ingress_private) + None, None, ingress_private, invocation_retry_policy) self.handlers[handler.name] = handler return wrapped diff --git a/python/restate/workflow.py b/python/restate/workflow.py index 8e221ec..b937879 100644 --- a/python/restate/workflow.py +++ b/python/restate/workflow.py @@ -20,6 +20,7 @@ import typing from datetime import timedelta +from restate.retry_policy import InvocationRetryPolicy from restate.serde import DefaultSerde, Serde from restate.handler import Handler, HandlerIO, ServiceTag, make_handler @@ -72,6 +73,7 @@ class Workflow: HTTP and Kafka ingress, but only from other services. NOTE: You can set this field only if you register this service against restate-server >= 1.4, otherwise the service discovery will fail. + invocation_retry_policy (InvocationRetryPolicy, optional): Retry policy applied for all invocations to this workflow. """ handlers: typing.Dict[str, Handler[typing.Any, typing.Any]] @@ -85,7 +87,8 @@ def __init__(self, journal_retention: typing.Optional[timedelta] = None, idempotency_retention: typing.Optional[timedelta] = None, enable_lazy_state: typing.Optional[bool] = None, - ingress_private: typing.Optional[bool] = None): + ingress_private: typing.Optional[bool] = None, + invocation_retry_policy: typing.Optional[InvocationRetryPolicy] = None): self.service_tag = ServiceTag("workflow", name, description, metadata) self.handlers = {} self.inactivity_timeout = inactivity_timeout @@ -94,6 +97,7 @@ def __init__(self, self.idempotency_retention = idempotency_retention self.enable_lazy_state = enable_lazy_state self.ingress_private = ingress_private + self.invocation_retry_policy = invocation_retry_policy @property def name(self): @@ -114,7 +118,8 @@ def main(self, journal_retention: typing.Optional[timedelta] = None, workflow_retention: typing.Optional[timedelta] = None, enable_lazy_state: typing.Optional[bool] = None, - ingress_private: typing.Optional[bool] = None) -> typing.Callable: # type: ignore + ingress_private: typing.Optional[bool] = None, + invocation_retry_policy: typing.Optional[InvocationRetryPolicy] = None) -> typing.Callable: # type: ignore """ Mark this handler as a workflow entry point. @@ -153,6 +158,7 @@ def main(self, but only from other services. NOTE: You can set this field only if you register this service against restate-server >= 1.4, otherwise the service discovery will fail. + invocation_retry_policy (InvocationRetryPolicy, optional): Retry policy applied for all invocations to this workflow. """ return self._add_handler(name, kind="workflow", @@ -167,7 +173,8 @@ def main(self, idempotency_retention=None, workflow_retention=workflow_retention, enable_lazy_state=enable_lazy_state, - ingress_private=ingress_private) + ingress_private=ingress_private, + invocation_retry_policy=invocation_retry_policy) def handler(self, name: typing.Optional[str] = None, @@ -181,7 +188,8 @@ def handler(self, journal_retention: typing.Optional[timedelta] = None, idempotency_retention: typing.Optional[timedelta] = None, enable_lazy_state: typing.Optional[bool] = None, - ingress_private: typing.Optional[bool] = None) -> typing.Callable: + ingress_private: typing.Optional[bool] = None, + invocation_retry_policy: typing.Optional[InvocationRetryPolicy] = None) -> typing.Callable: """ Decorator for defining a handler function. @@ -220,10 +228,11 @@ def handler(self, but only from other services. NOTE: You can set this field only if you register this service against restate-server >= 1.4, otherwise the service discovery will fail. + invocation_retry_policy (InvocationRetryPolicy, optional): Retry policy applied for all invocations to this handler. """ return self._add_handler(name, "shared", accept, content_type, input_serde, output_serde, metadata, inactivity_timeout, abort_timeout, journal_retention, idempotency_retention, - None, enable_lazy_state, ingress_private) + None, enable_lazy_state, ingress_private, invocation_retry_policy) # pylint: disable=R0914 def _add_handler(self, @@ -240,7 +249,8 @@ def _add_handler(self, idempotency_retention: typing.Optional[timedelta] = None, workflow_retention: typing.Optional[timedelta] = None, enable_lazy_state: typing.Optional[bool] = None, - ingress_private: typing.Optional[bool] = None) -> typing.Callable: # type: ignore + ingress_private: typing.Optional[bool] = None, + invocation_retry_policy: typing.Optional["InvocationRetryPolicy"] = None) -> typing.Callable: # type: ignore """ Decorator for defining a handler function. @@ -283,6 +293,7 @@ def _add_handler(self, but only from other services. NOTE: You can set this field only if you register this service against restate-server >= 1.4, otherwise the service discovery will fail. + invocation_retry_policy (InvocationRetryPolicy, optional): Retry policy applied for all invocations to this handler. Returns: Callable: The decorated function. @@ -319,7 +330,8 @@ def wrapped(*args, **kwargs): idempotency_retention=idempotency_retention, workflow_retention=workflow_retention, enable_lazy_state=enable_lazy_state, - ingress_private=ingress_private) + ingress_private=ingress_private, + invocation_retry_policy=invocation_retry_policy) self.handlers[handler.name] = handler return wrapped