Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion python/restate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# types
from .context import Context, ObjectContext, ObjectSharedContext
from .context import WorkflowContext, WorkflowSharedContext
from .retry_policy import InvocationRetryPolicy
# pylint: disable=line-too-long
from .context import DurablePromise, RestateDurableFuture, RestateDurableCallFuture, RestateDurableSleepFuture, SendHandle, RunOptions
from .exceptions import TerminalError
Expand Down Expand Up @@ -65,5 +66,6 @@ def test_harness(app, # type: ignore
"wait_completed",
"select",
"logging",
"RestateLoggingFilter"
"RestateLoggingFilter",
"InvocationRetryPolicy"
]
111 changes: 95 additions & 16 deletions python/restate/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,26 @@ def __init__(self, setContentTypeIfEmpty: bool, contentType: Optional[str] = Non
self.jsonSchema = jsonSchema

class Handler:
# pylint: disable=R0902
def __init__(self, name: str, ty: Optional[ServiceHandlerType] = None, input: Optional[InputPayload | Dict[str, str]] = None, output: Optional[OutputPayload] = None, description: Optional[str] = None, metadata: Optional[Dict[str, str]] = None, inactivityTimeout: Optional[int] = None, abortTimeout: Optional[int] = None, journalRetention: Optional[int] = None, idempotencyRetention: Optional[int] = None, workflowCompletionRetention: Optional[int] = None, enableLazyState: Optional[bool] = None, ingressPrivate: Optional[bool] = None):
# pylint: disable=R0902,R0914
def __init__(self,
name: str,
ty: Optional[ServiceHandlerType] = None,
input: Optional[InputPayload | Dict[str, str]] = None,
output: Optional[OutputPayload] = None,
description: Optional[str] = None,
metadata: Optional[Dict[str, str]] = None,
inactivityTimeout: Optional[int] = None,
abortTimeout: Optional[int] = None,
journalRetention: Optional[int] = None,
idempotencyRetention: Optional[int] = None,
workflowCompletionRetention: Optional[int] = None,
enableLazyState: Optional[bool] = None,
ingressPrivate: Optional[bool] = None,
retryPolicyInitialInterval: Optional[int] = None,
retryPolicyMaxInterval: Optional[int] = None,
retryPolicyMaxAttempts: Optional[int] = None,
retryPolicyExponentiationFactor: Optional[float] = None,
retryPolicyOnMaxAttempts: Optional[str] = None):
self.name = name
self.ty = ty
self.input = input
Expand All @@ -75,10 +93,31 @@ def __init__(self, name: str, ty: Optional[ServiceHandlerType] = None, input: Op
self.workflowCompletionRetention = workflowCompletionRetention
self.enableLazyState = enableLazyState
self.ingressPrivate = ingressPrivate
self.retryPolicyInitialInterval = retryPolicyInitialInterval
self.retryPolicyMaxInterval = retryPolicyMaxInterval
self.retryPolicyMaxAttempts = retryPolicyMaxAttempts
self.retryPolicyExponentiationFactor = retryPolicyExponentiationFactor
self.retryPolicyOnMaxAttempts = retryPolicyOnMaxAttempts

class Service:
# pylint: disable=R0902
def __init__(self, name: str, ty: ServiceType, handlers: List[Handler], description: Optional[str] = None, metadata: Optional[Dict[str, str]] = None, inactivityTimeout: Optional[int] = None, abortTimeout: Optional[int] = None, journalRetention: Optional[int] = None, idempotencyRetention: Optional[int] = None, enableLazyState: Optional[bool] = None, ingressPrivate: Optional[bool] = None):
# pylint: disable=R0902,R0914
def __init__(self,
name: str,
ty: ServiceType,
handlers: List[Handler],
description: Optional[str] = None,
metadata: Optional[Dict[str, str]] = None,
inactivityTimeout: Optional[int] = None,
abortTimeout: Optional[int] = None,
journalRetention: Optional[int] = None,
idempotencyRetention: Optional[int] = None,
enableLazyState: Optional[bool] = None,
ingressPrivate: Optional[bool] = None,
retryPolicyInitialInterval: Optional[int] = None,
retryPolicyMaxInterval: Optional[int] = None,
retryPolicyMaxAttempts: Optional[int] = None,
retryPolicyExponentiationFactor: Optional[float] = None,
retryPolicyOnMaxAttempts: Optional[str] = None):
self.name = name
self.ty = ty
self.handlers = handlers
Expand All @@ -90,6 +129,11 @@ def __init__(self, name: str, ty: ServiceType, handlers: List[Handler], descript
self.idempotencyRetention = idempotencyRetention
self.enableLazyState = enableLazyState
self.ingressPrivate = ingressPrivate
self.retryPolicyInitialInterval = retryPolicyInitialInterval
self.retryPolicyMaxInterval = retryPolicyMaxInterval
self.retryPolicyMaxAttempts = retryPolicyMaxAttempts
self.retryPolicyExponentiationFactor = retryPolicyExponentiationFactor
self.retryPolicyOnMaxAttempts = retryPolicyOnMaxAttempts

class Endpoint:
def __init__(self, protocolMode: ProtocolMode, minProtocolVersion: int, maxProtocolVersion: int, services: List[Service]):
Expand Down Expand Up @@ -163,7 +207,7 @@ def json_schema_from_type_hint(type_hint: Optional[TypeHint[Any]]) -> Any:
return type_hint_to_json_schema(type_hint.annotation)


# pylint: disable=R0912
# pylint: disable=R0912,R0915
def compute_discovery_json(endpoint: RestateEndpoint,
version: int,
discovered_as: typing.Literal["bidi", "request_response"]) -> str:
Expand All @@ -174,6 +218,31 @@ def compute_discovery_json(endpoint: RestateEndpoint,
ep = compute_discovery(endpoint, discovered_as)

# Validate that new discovery fields aren't used with older protocol versions
if version <= 3:
for service in ep.services:
if service.retryPolicyInitialInterval is not None:
raise ValueError("retryPolicyInitialInterval is only supported in discovery protocol version 4")
if service.retryPolicyMaxInterval is not None:
raise ValueError("retryPolicyMaxInterval is only supported in discovery protocol version 4")
if service.retryPolicyMaxAttempts is not None:
raise ValueError("retryPolicyMaxAttempts is only supported in discovery protocol version 4")
if service.retryPolicyExponentiationFactor is not None:
raise ValueError("retryPolicyExponentiationFactor is only supported in discovery protocol version 4")
if service.retryPolicyOnMaxAttempts is not None:
raise ValueError("retryPolicyOnMaxAttempts is only supported in discovery protocol version 4")

for handler in service.handlers:
if handler.retryPolicyInitialInterval is not None:
raise ValueError("retryPolicyInitialInterval is only supported in discovery protocol version 4")
if handler.retryPolicyMaxInterval is not None:
raise ValueError("retryPolicyMaxInterval is only supported in discovery protocol version 4")
if handler.retryPolicyMaxAttempts is not None:
raise ValueError("retryPolicyMaxAttempts is only supported in discovery protocol version 4")
if handler.retryPolicyExponentiationFactor is not None:
raise ValueError("retryPolicyExponentiationFactor is only supported in discovery protocol version 4")
if handler.retryPolicyOnMaxAttempts is not None:
raise ValueError("retryPolicyOnMaxAttempts is only supported in discovery protocol version 4")

if version <= 2:
# Check for new discovery fields in version 3 that shouldn't be used in version 2 or lower
for service in ep.services:
Expand Down Expand Up @@ -253,21 +322,31 @@ def compute_discovery(endpoint: RestateEndpoint, discovered_as : typing.Literal[
idempotencyRetention=int(handler.idempotency_retention.total_seconds() * 1000) if handler.idempotency_retention else None,
workflowCompletionRetention=int(handler.workflow_retention.total_seconds() * 1000) if handler.workflow_retention else None,
enableLazyState=handler.enable_lazy_state,
ingressPrivate=handler.ingress_private))
ingressPrivate=handler.ingress_private,
retryPolicyInitialInterval=int(handler.invocation_retry_policy.initial_interval.total_seconds() * 1000) if handler.invocation_retry_policy and handler.invocation_retry_policy.initial_interval else None,
retryPolicyMaxInterval=int(handler.invocation_retry_policy.max_interval.total_seconds() * 1000) if handler.invocation_retry_policy and handler.invocation_retry_policy.max_interval else None,
retryPolicyMaxAttempts=int(handler.invocation_retry_policy.max_attempts) if handler.invocation_retry_policy and handler.invocation_retry_policy.max_attempts is not None else None,
retryPolicyExponentiationFactor=float(handler.invocation_retry_policy.exponentiation_factor) if handler.invocation_retry_policy and handler.invocation_retry_policy.exponentiation_factor is not None else None,
retryPolicyOnMaxAttempts=(handler.invocation_retry_policy.on_max_attempts.upper() if handler.invocation_retry_policy and handler.invocation_retry_policy.on_max_attempts is not None else None)))
# add the service
description = service.service_tag.description
metadata = service.service_tag.metadata
services.append(Service(name=service.name,
ty=service_type,
handlers=service_handlers,
description=description,
metadata=metadata,
inactivityTimeout=int(service.inactivity_timeout.total_seconds() * 1000) if service.inactivity_timeout else None,
abortTimeout=int(service.abort_timeout.total_seconds() * 1000) if service.abort_timeout else None,
journalRetention=int(service.journal_retention.total_seconds() * 1000) if service.journal_retention else None,
idempotencyRetention=int(service.idempotency_retention.total_seconds() * 1000) if service.idempotency_retention else None,
enableLazyState=service.enable_lazy_state if hasattr(service, 'enable_lazy_state') else None,
ingressPrivate=service.ingress_private))
ty=service_type,
handlers=service_handlers,
description=description,
metadata=metadata,
inactivityTimeout=int(service.inactivity_timeout.total_seconds() * 1000) if service.inactivity_timeout else None,
abortTimeout=int(service.abort_timeout.total_seconds() * 1000) if service.abort_timeout else None,
journalRetention=int(service.journal_retention.total_seconds() * 1000) if service.journal_retention else None,
idempotencyRetention=int(service.idempotency_retention.total_seconds() * 1000) if service.idempotency_retention else None,
enableLazyState=service.enable_lazy_state if hasattr(service, 'enable_lazy_state') else None,
ingressPrivate=service.ingress_private,
retryPolicyInitialInterval=int(service.invocation_retry_policy.initial_interval.total_seconds() * 1000) if service.invocation_retry_policy and service.invocation_retry_policy.initial_interval else None,
retryPolicyMaxInterval=int(service.invocation_retry_policy.max_interval.total_seconds() * 1000) if service.invocation_retry_policy and service.invocation_retry_policy.max_interval else None,
retryPolicyMaxAttempts=int(service.invocation_retry_policy.max_attempts) if service.invocation_retry_policy and service.invocation_retry_policy.max_attempts is not None else None,
retryPolicyExponentiationFactor=float(service.invocation_retry_policy.exponentiation_factor) if service.invocation_retry_policy and service.invocation_retry_policy.exponentiation_factor is not None else None,
retryPolicyOnMaxAttempts=(service.invocation_retry_policy.on_max_attempts.upper() if service.invocation_retry_policy and service.invocation_retry_policy.on_max_attempts is not None else None)))

if endpoint.protocol:
protocol_mode = PROTOCOL_MODES[endpoint.protocol]
Expand Down
12 changes: 10 additions & 2 deletions python/restate/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from inspect import Signature
from typing import Any, Callable, Awaitable, Dict, Generic, Literal, Optional, TypeVar

from restate.retry_policy import InvocationRetryPolicy

from restate.context import HandlerType
from restate.exceptions import TerminalError
from restate.serde import DefaultSerde, PydanticJsonSerde, Serde, is_pydantic
Expand Down Expand Up @@ -122,6 +124,7 @@ class Handler(Generic[I, O]):
workflow_retention: Workflow completion retention duration.
enable_lazy_state: If true, lazy state is enabled.
ingress_private: If true, the handler cannot be invoked from the HTTP nor Kafka ingress.
invocation_retry_policy: Optional retry policy configuration applied to this handler.
"""
service_tag: ServiceTag
handler_io: HandlerIO[I, O]
Expand All @@ -138,6 +141,7 @@ class Handler(Generic[I, O]):
workflow_retention: Optional[timedelta] = None
enable_lazy_state: Optional[bool] = None
ingress_private: Optional[bool] = None
invocation_retry_policy: Optional[InvocationRetryPolicy] = None


# disable too many arguments warning
Expand All @@ -157,7 +161,8 @@ def make_handler(service_tag: ServiceTag,
idempotency_retention: Optional[timedelta] = None,
workflow_retention: Optional[timedelta] = None,
enable_lazy_state: Optional[bool] = None,
ingress_private: Optional[bool] = None) -> Handler[I, O]:
ingress_private: Optional[bool] = None,
invocation_retry_policy: Optional[InvocationRetryPolicy] = None) -> Handler[I, O]:
"""
Factory function to create a handler.

Expand All @@ -177,6 +182,8 @@ def make_handler(service_tag: ServiceTag,
workflow_retention: Workflow completion retention duration.
enable_lazy_state: If true, lazy state is enabled.
ingress_private: If true, the handler cannot be invoked from the HTTP nor Kafka ingress.
invocation_retry_policy: Retry policy used by Restate when invoking this handler.
NOTE: only supported with restate-server >= 1.5.
"""
# try to deduce the handler name
handler_name = name
Expand Down Expand Up @@ -205,7 +212,8 @@ def make_handler(service_tag: ServiceTag,
idempotency_retention=idempotency_retention,
workflow_retention=workflow_retention,
enable_lazy_state=enable_lazy_state,
ingress_private=ingress_private)
ingress_private=ingress_private,
invocation_retry_policy=invocation_retry_policy)

vars(wrapped)[RESTATE_UNIQUE_HANDLER_SYMBOL] = handler
return handler
Expand Down
12 changes: 9 additions & 3 deletions python/restate/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from datetime import timedelta

from restate.serde import Serde, DefaultSerde
from restate.retry_policy import InvocationRetryPolicy
from restate.handler import Handler, HandlerIO, ServiceTag, make_handler

I = typing.TypeVar('I')
Expand Down Expand Up @@ -68,6 +69,7 @@ class VirtualObject:
HTTP and Kafka ingress, but only from other services.
NOTE: You can set this field only if you register this service against restate-server >= 1.4,
otherwise the service discovery will fail.
invocation_retry_policy (InvocationRetryPolicy, optional): Retry policy applied for all invocations to this virtual object.
"""

handlers: typing.Dict[str, Handler[typing.Any, typing.Any]]
Expand All @@ -80,7 +82,8 @@ def __init__(self, name,
journal_retention: typing.Optional[timedelta] = None,
idempotency_retention: typing.Optional[timedelta] = None,
enable_lazy_state: typing.Optional[bool] = None,
ingress_private: typing.Optional[bool] = None):
ingress_private: typing.Optional[bool] = None,
invocation_retry_policy: typing.Optional[InvocationRetryPolicy] = None):
self.service_tag = ServiceTag("object", name, description, metadata)
self.handlers = {}
self.inactivity_timeout = inactivity_timeout
Expand All @@ -89,6 +92,7 @@ def __init__(self, name,
self.idempotency_retention = idempotency_retention
self.enable_lazy_state = enable_lazy_state
self.ingress_private = ingress_private
self.invocation_retry_policy = invocation_retry_policy

@property
def name(self):
Expand All @@ -111,7 +115,8 @@ def handler(self,
journal_retention: typing.Optional[timedelta] = None,
idempotency_retention: typing.Optional[timedelta] = None,
enable_lazy_state: typing.Optional[bool] = None,
ingress_private: typing.Optional[bool] = None) -> typing.Callable:
ingress_private: typing.Optional[bool] = None,
invocation_retry_policy: typing.Optional[InvocationRetryPolicy] = None) -> typing.Callable:
"""
Decorator for defining a handler function.

Expand Down Expand Up @@ -151,6 +156,7 @@ def handler(self,
but only from other services.
NOTE: You can set this field only if you register this service against restate-server >= 1.4,
otherwise the service discovery will fail.
invocation_retry_policy (InvocationRetryPolicy, optional): Retry policy applied for all invocations to this handler.

Returns:
Callable: The decorated function.
Expand All @@ -174,7 +180,7 @@ def wrapped(*args, **kwargs):
signature = inspect.signature(fn, eval_str=True)
handler = make_handler(self.service_tag, handler_io, name, kind, wrapped, signature, inspect.getdoc(fn), metadata,
inactivity_timeout, abort_timeout, journal_retention, idempotency_retention,
None, enable_lazy_state, ingress_private)
None, enable_lazy_state, ingress_private, invocation_retry_policy)
self.handlers[handler.name] = handler
return wrapped

Expand Down
Loading