Skip to content

Commit 5526874

Browse files
Retry Policy (#126)
1 parent 8b33897 commit 5526874

File tree

8 files changed

+188
-34
lines changed

8 files changed

+188
-34
lines changed

python/restate/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
# types
2020
from .context import Context, ObjectContext, ObjectSharedContext
2121
from .context import WorkflowContext, WorkflowSharedContext
22+
from .retry_policy import InvocationRetryPolicy
2223
# pylint: disable=line-too-long
2324
from .context import DurablePromise, RestateDurableFuture, RestateDurableCallFuture, RestateDurableSleepFuture, SendHandle, RunOptions
2425
from .exceptions import TerminalError
@@ -65,5 +66,6 @@ def test_harness(app, # type: ignore
6566
"wait_completed",
6667
"select",
6768
"logging",
68-
"RestateLoggingFilter"
69+
"RestateLoggingFilter",
70+
"InvocationRetryPolicy"
6971
]

python/restate/discovery.py

Lines changed: 95 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,26 @@ def __init__(self, setContentTypeIfEmpty: bool, contentType: Optional[str] = Non
6060
self.jsonSchema = jsonSchema
6161

6262
class Handler:
63-
# pylint: disable=R0902
64-
def __init__(self, name: str, ty: Optional[ServiceHandlerType] = None, input: Optional[InputPayload | Dict[str, str]] = None, output: Optional[OutputPayload] = None, description: Optional[str] = None, metadata: Optional[Dict[str, str]] = None, inactivityTimeout: Optional[int] = None, abortTimeout: Optional[int] = None, journalRetention: Optional[int] = None, idempotencyRetention: Optional[int] = None, workflowCompletionRetention: Optional[int] = None, enableLazyState: Optional[bool] = None, ingressPrivate: Optional[bool] = None):
63+
# pylint: disable=R0902,R0914
64+
def __init__(self,
65+
name: str,
66+
ty: Optional[ServiceHandlerType] = None,
67+
input: Optional[InputPayload | Dict[str, str]] = None,
68+
output: Optional[OutputPayload] = None,
69+
description: Optional[str] = None,
70+
metadata: Optional[Dict[str, str]] = None,
71+
inactivityTimeout: Optional[int] = None,
72+
abortTimeout: Optional[int] = None,
73+
journalRetention: Optional[int] = None,
74+
idempotencyRetention: Optional[int] = None,
75+
workflowCompletionRetention: Optional[int] = None,
76+
enableLazyState: Optional[bool] = None,
77+
ingressPrivate: Optional[bool] = None,
78+
retryPolicyInitialInterval: Optional[int] = None,
79+
retryPolicyMaxInterval: Optional[int] = None,
80+
retryPolicyMaxAttempts: Optional[int] = None,
81+
retryPolicyExponentiationFactor: Optional[float] = None,
82+
retryPolicyOnMaxAttempts: Optional[str] = None):
6583
self.name = name
6684
self.ty = ty
6785
self.input = input
@@ -75,10 +93,31 @@ def __init__(self, name: str, ty: Optional[ServiceHandlerType] = None, input: Op
7593
self.workflowCompletionRetention = workflowCompletionRetention
7694
self.enableLazyState = enableLazyState
7795
self.ingressPrivate = ingressPrivate
96+
self.retryPolicyInitialInterval = retryPolicyInitialInterval
97+
self.retryPolicyMaxInterval = retryPolicyMaxInterval
98+
self.retryPolicyMaxAttempts = retryPolicyMaxAttempts
99+
self.retryPolicyExponentiationFactor = retryPolicyExponentiationFactor
100+
self.retryPolicyOnMaxAttempts = retryPolicyOnMaxAttempts
78101

79102
class Service:
80-
# pylint: disable=R0902
81-
def __init__(self, name: str, ty: ServiceType, handlers: List[Handler], description: Optional[str] = None, metadata: Optional[Dict[str, str]] = None, inactivityTimeout: Optional[int] = None, abortTimeout: Optional[int] = None, journalRetention: Optional[int] = None, idempotencyRetention: Optional[int] = None, enableLazyState: Optional[bool] = None, ingressPrivate: Optional[bool] = None):
103+
# pylint: disable=R0902,R0914
104+
def __init__(self,
105+
name: str,
106+
ty: ServiceType,
107+
handlers: List[Handler],
108+
description: Optional[str] = None,
109+
metadata: Optional[Dict[str, str]] = None,
110+
inactivityTimeout: Optional[int] = None,
111+
abortTimeout: Optional[int] = None,
112+
journalRetention: Optional[int] = None,
113+
idempotencyRetention: Optional[int] = None,
114+
enableLazyState: Optional[bool] = None,
115+
ingressPrivate: Optional[bool] = None,
116+
retryPolicyInitialInterval: Optional[int] = None,
117+
retryPolicyMaxInterval: Optional[int] = None,
118+
retryPolicyMaxAttempts: Optional[int] = None,
119+
retryPolicyExponentiationFactor: Optional[float] = None,
120+
retryPolicyOnMaxAttempts: Optional[str] = None):
82121
self.name = name
83122
self.ty = ty
84123
self.handlers = handlers
@@ -90,6 +129,11 @@ def __init__(self, name: str, ty: ServiceType, handlers: List[Handler], descript
90129
self.idempotencyRetention = idempotencyRetention
91130
self.enableLazyState = enableLazyState
92131
self.ingressPrivate = ingressPrivate
132+
self.retryPolicyInitialInterval = retryPolicyInitialInterval
133+
self.retryPolicyMaxInterval = retryPolicyMaxInterval
134+
self.retryPolicyMaxAttempts = retryPolicyMaxAttempts
135+
self.retryPolicyExponentiationFactor = retryPolicyExponentiationFactor
136+
self.retryPolicyOnMaxAttempts = retryPolicyOnMaxAttempts
93137

94138
class Endpoint:
95139
def __init__(self, protocolMode: ProtocolMode, minProtocolVersion: int, maxProtocolVersion: int, services: List[Service]):
@@ -163,7 +207,7 @@ def json_schema_from_type_hint(type_hint: Optional[TypeHint[Any]]) -> Any:
163207
return type_hint_to_json_schema(type_hint.annotation)
164208

165209

166-
# pylint: disable=R0912
210+
# pylint: disable=R0912,R0915
167211
def compute_discovery_json(endpoint: RestateEndpoint,
168212
version: int,
169213
discovered_as: typing.Literal["bidi", "request_response"]) -> str:
@@ -174,6 +218,31 @@ def compute_discovery_json(endpoint: RestateEndpoint,
174218
ep = compute_discovery(endpoint, discovered_as)
175219

176220
# Validate that new discovery fields aren't used with older protocol versions
221+
if version <= 3:
222+
for service in ep.services:
223+
if service.retryPolicyInitialInterval is not None:
224+
raise ValueError("retryPolicyInitialInterval is only supported in discovery protocol version 4")
225+
if service.retryPolicyMaxInterval is not None:
226+
raise ValueError("retryPolicyMaxInterval is only supported in discovery protocol version 4")
227+
if service.retryPolicyMaxAttempts is not None:
228+
raise ValueError("retryPolicyMaxAttempts is only supported in discovery protocol version 4")
229+
if service.retryPolicyExponentiationFactor is not None:
230+
raise ValueError("retryPolicyExponentiationFactor is only supported in discovery protocol version 4")
231+
if service.retryPolicyOnMaxAttempts is not None:
232+
raise ValueError("retryPolicyOnMaxAttempts is only supported in discovery protocol version 4")
233+
234+
for handler in service.handlers:
235+
if handler.retryPolicyInitialInterval is not None:
236+
raise ValueError("retryPolicyInitialInterval is only supported in discovery protocol version 4")
237+
if handler.retryPolicyMaxInterval is not None:
238+
raise ValueError("retryPolicyMaxInterval is only supported in discovery protocol version 4")
239+
if handler.retryPolicyMaxAttempts is not None:
240+
raise ValueError("retryPolicyMaxAttempts is only supported in discovery protocol version 4")
241+
if handler.retryPolicyExponentiationFactor is not None:
242+
raise ValueError("retryPolicyExponentiationFactor is only supported in discovery protocol version 4")
243+
if handler.retryPolicyOnMaxAttempts is not None:
244+
raise ValueError("retryPolicyOnMaxAttempts is only supported in discovery protocol version 4")
245+
177246
if version <= 2:
178247
# Check for new discovery fields in version 3 that shouldn't be used in version 2 or lower
179248
for service in ep.services:
@@ -253,21 +322,31 @@ def compute_discovery(endpoint: RestateEndpoint, discovered_as : typing.Literal[
253322
idempotencyRetention=int(handler.idempotency_retention.total_seconds() * 1000) if handler.idempotency_retention else None,
254323
workflowCompletionRetention=int(handler.workflow_retention.total_seconds() * 1000) if handler.workflow_retention else None,
255324
enableLazyState=handler.enable_lazy_state,
256-
ingressPrivate=handler.ingress_private))
325+
ingressPrivate=handler.ingress_private,
326+
retryPolicyInitialInterval=int(handler.invocation_retry_policy.initial_interval.total_seconds() * 1000) if handler.invocation_retry_policy and handler.invocation_retry_policy.initial_interval else None,
327+
retryPolicyMaxInterval=int(handler.invocation_retry_policy.max_interval.total_seconds() * 1000) if handler.invocation_retry_policy and handler.invocation_retry_policy.max_interval else None,
328+
retryPolicyMaxAttempts=int(handler.invocation_retry_policy.max_attempts) if handler.invocation_retry_policy and handler.invocation_retry_policy.max_attempts is not None else None,
329+
retryPolicyExponentiationFactor=float(handler.invocation_retry_policy.exponentiation_factor) if handler.invocation_retry_policy and handler.invocation_retry_policy.exponentiation_factor is not None else None,
330+
retryPolicyOnMaxAttempts=(handler.invocation_retry_policy.on_max_attempts.upper() if handler.invocation_retry_policy and handler.invocation_retry_policy.on_max_attempts is not None else None)))
257331
# add the service
258332
description = service.service_tag.description
259333
metadata = service.service_tag.metadata
260334
services.append(Service(name=service.name,
261-
ty=service_type,
262-
handlers=service_handlers,
263-
description=description,
264-
metadata=metadata,
265-
inactivityTimeout=int(service.inactivity_timeout.total_seconds() * 1000) if service.inactivity_timeout else None,
266-
abortTimeout=int(service.abort_timeout.total_seconds() * 1000) if service.abort_timeout else None,
267-
journalRetention=int(service.journal_retention.total_seconds() * 1000) if service.journal_retention else None,
268-
idempotencyRetention=int(service.idempotency_retention.total_seconds() * 1000) if service.idempotency_retention else None,
269-
enableLazyState=service.enable_lazy_state if hasattr(service, 'enable_lazy_state') else None,
270-
ingressPrivate=service.ingress_private))
335+
ty=service_type,
336+
handlers=service_handlers,
337+
description=description,
338+
metadata=metadata,
339+
inactivityTimeout=int(service.inactivity_timeout.total_seconds() * 1000) if service.inactivity_timeout else None,
340+
abortTimeout=int(service.abort_timeout.total_seconds() * 1000) if service.abort_timeout else None,
341+
journalRetention=int(service.journal_retention.total_seconds() * 1000) if service.journal_retention else None,
342+
idempotencyRetention=int(service.idempotency_retention.total_seconds() * 1000) if service.idempotency_retention else None,
343+
enableLazyState=service.enable_lazy_state if hasattr(service, 'enable_lazy_state') else None,
344+
ingressPrivate=service.ingress_private,
345+
retryPolicyInitialInterval=int(service.invocation_retry_policy.initial_interval.total_seconds() * 1000) if service.invocation_retry_policy and service.invocation_retry_policy.initial_interval else None,
346+
retryPolicyMaxInterval=int(service.invocation_retry_policy.max_interval.total_seconds() * 1000) if service.invocation_retry_policy and service.invocation_retry_policy.max_interval else None,
347+
retryPolicyMaxAttempts=int(service.invocation_retry_policy.max_attempts) if service.invocation_retry_policy and service.invocation_retry_policy.max_attempts is not None else None,
348+
retryPolicyExponentiationFactor=float(service.invocation_retry_policy.exponentiation_factor) if service.invocation_retry_policy and service.invocation_retry_policy.exponentiation_factor is not None else None,
349+
retryPolicyOnMaxAttempts=(service.invocation_retry_policy.on_max_attempts.upper() if service.invocation_retry_policy and service.invocation_retry_policy.on_max_attempts is not None else None)))
271350

272351
if endpoint.protocol:
273352
protocol_mode = PROTOCOL_MODES[endpoint.protocol]

python/restate/handler.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
from inspect import Signature
2121
from typing import Any, Callable, Awaitable, Dict, Generic, Literal, Optional, TypeVar
2222

23+
from restate.retry_policy import InvocationRetryPolicy
24+
2325
from restate.context import HandlerType
2426
from restate.exceptions import TerminalError
2527
from restate.serde import DefaultSerde, PydanticJsonSerde, Serde, is_pydantic
@@ -122,6 +124,7 @@ class Handler(Generic[I, O]):
122124
workflow_retention: Workflow completion retention duration.
123125
enable_lazy_state: If true, lazy state is enabled.
124126
ingress_private: If true, the handler cannot be invoked from the HTTP nor Kafka ingress.
127+
invocation_retry_policy: Optional retry policy configuration applied to this handler.
125128
"""
126129
service_tag: ServiceTag
127130
handler_io: HandlerIO[I, O]
@@ -138,6 +141,7 @@ class Handler(Generic[I, O]):
138141
workflow_retention: Optional[timedelta] = None
139142
enable_lazy_state: Optional[bool] = None
140143
ingress_private: Optional[bool] = None
144+
invocation_retry_policy: Optional[InvocationRetryPolicy] = None
141145

142146

143147
# disable too many arguments warning
@@ -157,7 +161,8 @@ def make_handler(service_tag: ServiceTag,
157161
idempotency_retention: Optional[timedelta] = None,
158162
workflow_retention: Optional[timedelta] = None,
159163
enable_lazy_state: Optional[bool] = None,
160-
ingress_private: Optional[bool] = None) -> Handler[I, O]:
164+
ingress_private: Optional[bool] = None,
165+
invocation_retry_policy: Optional[InvocationRetryPolicy] = None) -> Handler[I, O]:
161166
"""
162167
Factory function to create a handler.
163168
@@ -177,6 +182,8 @@ def make_handler(service_tag: ServiceTag,
177182
workflow_retention: Workflow completion retention duration.
178183
enable_lazy_state: If true, lazy state is enabled.
179184
ingress_private: If true, the handler cannot be invoked from the HTTP nor Kafka ingress.
185+
invocation_retry_policy: Retry policy used by Restate when invoking this handler.
186+
NOTE: only supported with restate-server >= 1.5.
180187
"""
181188
# try to deduce the handler name
182189
handler_name = name
@@ -205,7 +212,8 @@ def make_handler(service_tag: ServiceTag,
205212
idempotency_retention=idempotency_retention,
206213
workflow_retention=workflow_retention,
207214
enable_lazy_state=enable_lazy_state,
208-
ingress_private=ingress_private)
215+
ingress_private=ingress_private,
216+
invocation_retry_policy=invocation_retry_policy)
209217

210218
vars(wrapped)[RESTATE_UNIQUE_HANDLER_SYMBOL] = handler
211219
return handler

python/restate/object.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from datetime import timedelta
2121

2222
from restate.serde import Serde, DefaultSerde
23+
from restate.retry_policy import InvocationRetryPolicy
2324
from restate.handler import Handler, HandlerIO, ServiceTag, make_handler
2425

2526
I = typing.TypeVar('I')
@@ -68,6 +69,7 @@ class VirtualObject:
6869
HTTP and Kafka ingress, but only from other services.
6970
NOTE: You can set this field only if you register this service against restate-server >= 1.4,
7071
otherwise the service discovery will fail.
72+
invocation_retry_policy (InvocationRetryPolicy, optional): Retry policy applied for all invocations to this virtual object.
7173
"""
7274

7375
handlers: typing.Dict[str, Handler[typing.Any, typing.Any]]
@@ -80,7 +82,8 @@ def __init__(self, name,
8082
journal_retention: typing.Optional[timedelta] = None,
8183
idempotency_retention: typing.Optional[timedelta] = None,
8284
enable_lazy_state: typing.Optional[bool] = None,
83-
ingress_private: typing.Optional[bool] = None):
85+
ingress_private: typing.Optional[bool] = None,
86+
invocation_retry_policy: typing.Optional[InvocationRetryPolicy] = None):
8487
self.service_tag = ServiceTag("object", name, description, metadata)
8588
self.handlers = {}
8689
self.inactivity_timeout = inactivity_timeout
@@ -89,6 +92,7 @@ def __init__(self, name,
8992
self.idempotency_retention = idempotency_retention
9093
self.enable_lazy_state = enable_lazy_state
9194
self.ingress_private = ingress_private
95+
self.invocation_retry_policy = invocation_retry_policy
9296

9397
@property
9498
def name(self):
@@ -111,7 +115,8 @@ def handler(self,
111115
journal_retention: typing.Optional[timedelta] = None,
112116
idempotency_retention: typing.Optional[timedelta] = None,
113117
enable_lazy_state: typing.Optional[bool] = None,
114-
ingress_private: typing.Optional[bool] = None) -> typing.Callable:
118+
ingress_private: typing.Optional[bool] = None,
119+
invocation_retry_policy: typing.Optional[InvocationRetryPolicy] = None) -> typing.Callable:
115120
"""
116121
Decorator for defining a handler function.
117122
@@ -151,6 +156,7 @@ def handler(self,
151156
but only from other services.
152157
NOTE: You can set this field only if you register this service against restate-server >= 1.4,
153158
otherwise the service discovery will fail.
159+
invocation_retry_policy (InvocationRetryPolicy, optional): Retry policy applied for all invocations to this handler.
154160
155161
Returns:
156162
Callable: The decorated function.
@@ -174,7 +180,7 @@ def wrapped(*args, **kwargs):
174180
signature = inspect.signature(fn, eval_str=True)
175181
handler = make_handler(self.service_tag, handler_io, name, kind, wrapped, signature, inspect.getdoc(fn), metadata,
176182
inactivity_timeout, abort_timeout, journal_retention, idempotency_retention,
177-
None, enable_lazy_state, ingress_private)
183+
None, enable_lazy_state, ingress_private, invocation_retry_policy)
178184
self.handlers[handler.name] = handler
179185
return wrapped
180186

0 commit comments

Comments
 (0)