From 67fac134a9a16105648b7803c7a00642edcad049 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Wed, 19 Nov 2025 16:00:00 -0800 Subject: [PATCH 1/2] Add summary to nexus operations --- temporalio/worker/_interceptor.py | 5 +- temporalio/worker/_workflow_instance.py | 15 ++- temporalio/workflow.py | 123 ++++++++++++++---------- tests/nexus/test_workflow_caller.py | 73 +++++++++++++- 4 files changed, 154 insertions(+), 62 deletions(-) diff --git a/temporalio/worker/_interceptor.py b/temporalio/worker/_interceptor.py index 7119b0665..d521203c8 100644 --- a/temporalio/worker/_interceptor.py +++ b/temporalio/worker/_interceptor.py @@ -297,10 +297,11 @@ class StartNexusOperationInput(Generic[InputT, OutputT]): service: str operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]] input: InputT - schedule_to_close_timeout: Optional[timedelta] + schedule_to_close_timeout: timedelta | None cancellation_type: temporalio.workflow.NexusOperationCancellationType headers: Optional[Mapping[str, str]] - output_type: Optional[Type[OutputT]] = None + summary: str | None + output_type: Type[OutputT] | None = None def __post_init__(self) -> None: """Initialize operation-specific attributes after dataclass creation.""" diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 10fbbcc72..bfb49f6d7 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1581,12 +1581,13 @@ async def workflow_start_nexus_operation( self, endpoint: str, service: str, - operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]], + operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any], input: Any, - output_type: Optional[Type[OutputT]], - schedule_to_close_timeout: Optional[timedelta], + output_type: Type[OutputT] | None, + schedule_to_close_timeout: timedelta | None, cancellation_type: temporalio.workflow.NexusOperationCancellationType, - headers: Optional[Mapping[str, str]], + headers: Mapping[str, str] | None, + summary: str | None, ) -> temporalio.workflow.NexusOperationHandle[OutputT]: # start_nexus_operation return await self._outbound.start_nexus_operation( @@ -1599,6 +1600,7 @@ async def workflow_start_nexus_operation( schedule_to_close_timeout=schedule_to_close_timeout, cancellation_type=cancellation_type, headers=headers, + summary=summary, ) ) @@ -3330,6 +3332,11 @@ def _apply_schedule_command(self) -> None: for key, val in self._input.headers.items(): v.nexus_header[key] = val + if self._input.summary: + command.user_metadata.summary.CopyFrom( + self._payload_converter.to_payload(self._input.summary) + ) + def _apply_cancel_command( self, command: temporalio.bridge.proto.workflow_commands.WorkflowCommand, diff --git a/temporalio/workflow.py b/temporalio/workflow.py index e84bb40ee..506769c63 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -858,12 +858,13 @@ async def workflow_start_nexus_operation( self, endpoint: str, service: str, - operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]], + operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any], input: Any, - output_type: Optional[Type[OutputT]], - schedule_to_close_timeout: Optional[timedelta], + output_type: Type[OutputT] | None, + schedule_to_close_timeout: timedelta | None, cancellation_type: temporalio.workflow.NexusOperationCancellationType, - headers: Optional[Mapping[str, str]], + headers: Mapping[str, str] | None, + summary: str | None, ) -> NexusOperationHandle[OutputT]: ... @abstractmethod @@ -5346,10 +5347,11 @@ async def start_operation( operation: nexusrpc.Operation[InputT, OutputT], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> NexusOperationHandle[OutputT]: ... # Overload for string operation name @@ -5360,10 +5362,11 @@ async def start_operation( operation: str, input: Any, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> NexusOperationHandle[OutputT]: ... # Overload for workflow_run_operation methods @@ -5377,10 +5380,11 @@ async def start_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> NexusOperationHandle[OutputT]: ... # Overload for sync_operation methods (async def) @@ -5394,10 +5398,11 @@ async def start_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> NexusOperationHandle[OutputT]: ... # Overload for sync_operation methods (def) @@ -5411,10 +5416,11 @@ async def start_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> NexusOperationHandle[OutputT]: ... # Overload for operation_handler @@ -5427,10 +5433,11 @@ async def start_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> NexusOperationHandle[OutputT]: ... @abstractmethod @@ -5439,10 +5446,11 @@ async def start_operation( operation: Any, input: Any, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> Any: """Start a Nexus operation and return its handle. @@ -5469,10 +5477,11 @@ async def execute_operation( operation: nexusrpc.Operation[InputT, OutputT], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> OutputT: ... # Overload for string operation name @@ -5483,10 +5492,11 @@ async def execute_operation( operation: str, input: Any, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> OutputT: ... # Overload for workflow_run_operation methods @@ -5500,10 +5510,11 @@ async def execute_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> OutputT: ... # TODO(nexus-preview): in practice, both these overloads match an async def sync @@ -5520,10 +5531,11 @@ async def execute_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> OutputT: ... # Overload for sync_operation methods (def) @@ -5537,10 +5549,11 @@ async def execute_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> OutputT: ... # Overload for operation_handler @@ -5554,10 +5567,11 @@ async def execute_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> OutputT: ... @abstractmethod @@ -5566,10 +5580,11 @@ async def execute_operation( operation: Any, input: Any, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> Any: """Execute a Nexus operation and return its result. @@ -5618,10 +5633,11 @@ async def start_operation( operation: Any, input: Any, *, - output_type: Optional[Type] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> Any: return ( await temporalio.workflow._Runtime.current().workflow_start_nexus_operation( @@ -5633,6 +5649,7 @@ async def start_operation( schedule_to_close_timeout=schedule_to_close_timeout, cancellation_type=cancellation_type, headers=headers, + summary=summary, ) ) @@ -5641,10 +5658,11 @@ async def execute_operation( operation: Any, input: Any, *, - output_type: Optional[Type] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> Any: handle = await self.start_operation( operation, @@ -5653,6 +5671,7 @@ async def execute_operation( schedule_to_close_timeout=schedule_to_close_timeout, cancellation_type=cancellation_type, headers=headers, + summary=summary, ) return await handle diff --git a/tests/nexus/test_workflow_caller.py b/tests/nexus/test_workflow_caller.py index d53c178fa..421a3cc73 100644 --- a/tests/nexus/test_workflow_caller.py +++ b/tests/nexus/test_workflow_caller.py @@ -24,6 +24,7 @@ import temporalio.api.common.v1 import temporalio.api.enums.v1 import temporalio.api.history.v1 +from temporalio.converter import PayloadConverter import temporalio.nexus._operation_handlers from temporalio import nexus, workflow from temporalio.client import ( @@ -34,10 +35,7 @@ WorkflowHandle, ) from temporalio.common import WorkflowIDConflictPolicy -from temporalio.exceptions import ( - CancelledError, - NexusOperationError, -) +from temporalio.exceptions import CancelledError, NexusOperationError, ApplicationError from temporalio.nexus import WorkflowRunOperationContext, workflow_run_operation from temporalio.service import RPCError, RPCStatusCode from temporalio.testing import WorkflowEnvironment @@ -1061,6 +1059,73 @@ async def test_workflow_run_operation_can_execute_workflow_before_starting_backi assert result == "result-1-result-2" +@service_handler +class SimpleSyncService: + @sync_operation + async def sync_op(self, ctx: StartOperationContext, input: str) -> str: + return input + + +@workflow.defn +class ExecuteNexusOperationWithSummaryWorkflow: + @workflow.run + async def run(self, input: str, task_queue: str) -> str: + nexus_client = workflow.create_nexus_client( + service=SimpleSyncService, + endpoint=make_nexus_endpoint_name(task_queue), + ) + + op_result = await nexus_client.execute_operation( + SimpleSyncService.sync_op, input, summary="nexus operation summary" + ) + + if op_result != input: + raise ApplicationError("expected nexus operation to echo input") + + return op_result + + +async def test_nexus_operation_summary( + client: Client, + env: WorkflowEnvironment, +): + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + + task_queue = f"task-queue-{uuid.uuid4()}" + async with Worker( + client, + workflows=[ExecuteNexusOperationWithSummaryWorkflow], + nexus_service_handlers=[ + SimpleSyncService(), + ], + task_queue=task_queue, + ): + await create_nexus_endpoint(task_queue, client) + wf_id = f"wf-{uuid.uuid4()}" + handle = await client.start_workflow( + ExecuteNexusOperationWithSummaryWorkflow.run, + args=("success", task_queue), + id=wf_id, + task_queue=task_queue, + ) + result = await handle.result() + assert result == "success" + + history = await handle.fetch_history() + + nexus_events = [ + event + for event in history.events + if event.HasField("nexus_operation_scheduled_event_attributes") + ] + assert len(nexus_events) == 1 + summary_value = PayloadConverter.default.from_payload( + nexus_events[0].user_metadata.summary + ) + assert summary_value == "nexus operation summary" + + # TODO(nexus-prerelease): test invalid service interface implementations # TODO(nexus-prerelease): test caller passing output_type From bb7911062a9313181e36ae3a00474a8c516fc639 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Wed, 19 Nov 2025 16:25:54 -0800 Subject: [PATCH 2/2] Update to newer syntax for union where previously missed. Run formatter --- temporalio/worker/_interceptor.py | 4 ++-- tests/nexus/test_workflow_caller.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/temporalio/worker/_interceptor.py b/temporalio/worker/_interceptor.py index d521203c8..a60ebb1d5 100644 --- a/temporalio/worker/_interceptor.py +++ b/temporalio/worker/_interceptor.py @@ -295,11 +295,11 @@ class StartNexusOperationInput(Generic[InputT, OutputT]): endpoint: str service: str - operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]] + operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any] input: InputT schedule_to_close_timeout: timedelta | None cancellation_type: temporalio.workflow.NexusOperationCancellationType - headers: Optional[Mapping[str, str]] + headers: Mapping[str, str] | None summary: str | None output_type: Type[OutputT] | None = None diff --git a/tests/nexus/test_workflow_caller.py b/tests/nexus/test_workflow_caller.py index 421a3cc73..0c0dd988a 100644 --- a/tests/nexus/test_workflow_caller.py +++ b/tests/nexus/test_workflow_caller.py @@ -24,7 +24,6 @@ import temporalio.api.common.v1 import temporalio.api.enums.v1 import temporalio.api.history.v1 -from temporalio.converter import PayloadConverter import temporalio.nexus._operation_handlers from temporalio import nexus, workflow from temporalio.client import ( @@ -35,7 +34,8 @@ WorkflowHandle, ) from temporalio.common import WorkflowIDConflictPolicy -from temporalio.exceptions import CancelledError, NexusOperationError, ApplicationError +from temporalio.converter import PayloadConverter +from temporalio.exceptions import ApplicationError, CancelledError, NexusOperationError from temporalio.nexus import WorkflowRunOperationContext, workflow_run_operation from temporalio.service import RPCError, RPCStatusCode from temporalio.testing import WorkflowEnvironment