diff --git a/temporalio/worker/_interceptor.py b/temporalio/worker/_interceptor.py index 7119b0665..a60ebb1d5 100644 --- a/temporalio/worker/_interceptor.py +++ b/temporalio/worker/_interceptor.py @@ -295,12 +295,13 @@ class StartNexusOperationInput(Generic[InputT, OutputT]): endpoint: str service: str - operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]] + operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any] input: InputT - schedule_to_close_timeout: Optional[timedelta] + schedule_to_close_timeout: timedelta | None cancellation_type: temporalio.workflow.NexusOperationCancellationType - headers: Optional[Mapping[str, str]] - output_type: Optional[Type[OutputT]] = None + headers: Mapping[str, str] | None + summary: str | None + output_type: Type[OutputT] | None = None def __post_init__(self) -> None: """Initialize operation-specific attributes after dataclass creation.""" diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 10fbbcc72..bfb49f6d7 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1581,12 +1581,13 @@ async def workflow_start_nexus_operation( self, endpoint: str, service: str, - operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]], + operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any], input: Any, - output_type: Optional[Type[OutputT]], - schedule_to_close_timeout: Optional[timedelta], + output_type: Type[OutputT] | None, + schedule_to_close_timeout: timedelta | None, cancellation_type: temporalio.workflow.NexusOperationCancellationType, - headers: Optional[Mapping[str, str]], + headers: Mapping[str, str] | None, + summary: str | None, ) -> temporalio.workflow.NexusOperationHandle[OutputT]: # start_nexus_operation return await self._outbound.start_nexus_operation( @@ -1599,6 +1600,7 @@ async def workflow_start_nexus_operation( schedule_to_close_timeout=schedule_to_close_timeout, cancellation_type=cancellation_type, headers=headers, + summary=summary, ) ) @@ -3330,6 +3332,11 @@ def _apply_schedule_command(self) -> None: for key, val in self._input.headers.items(): v.nexus_header[key] = val + if self._input.summary: + command.user_metadata.summary.CopyFrom( + self._payload_converter.to_payload(self._input.summary) + ) + def _apply_cancel_command( self, command: temporalio.bridge.proto.workflow_commands.WorkflowCommand, diff --git a/temporalio/workflow.py b/temporalio/workflow.py index e84bb40ee..506769c63 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -858,12 +858,13 @@ async def workflow_start_nexus_operation( self, endpoint: str, service: str, - operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]], + operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any], input: Any, - output_type: Optional[Type[OutputT]], - schedule_to_close_timeout: Optional[timedelta], + output_type: Type[OutputT] | None, + schedule_to_close_timeout: timedelta | None, cancellation_type: temporalio.workflow.NexusOperationCancellationType, - headers: Optional[Mapping[str, str]], + headers: Mapping[str, str] | None, + summary: str | None, ) -> NexusOperationHandle[OutputT]: ... @abstractmethod @@ -5346,10 +5347,11 @@ async def start_operation( operation: nexusrpc.Operation[InputT, OutputT], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> NexusOperationHandle[OutputT]: ... # Overload for string operation name @@ -5360,10 +5362,11 @@ async def start_operation( operation: str, input: Any, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> NexusOperationHandle[OutputT]: ... # Overload for workflow_run_operation methods @@ -5377,10 +5380,11 @@ async def start_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> NexusOperationHandle[OutputT]: ... # Overload for sync_operation methods (async def) @@ -5394,10 +5398,11 @@ async def start_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> NexusOperationHandle[OutputT]: ... # Overload for sync_operation methods (def) @@ -5411,10 +5416,11 @@ async def start_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> NexusOperationHandle[OutputT]: ... # Overload for operation_handler @@ -5427,10 +5433,11 @@ async def start_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> NexusOperationHandle[OutputT]: ... @abstractmethod @@ -5439,10 +5446,11 @@ async def start_operation( operation: Any, input: Any, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> Any: """Start a Nexus operation and return its handle. @@ -5469,10 +5477,11 @@ async def execute_operation( operation: nexusrpc.Operation[InputT, OutputT], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> OutputT: ... # Overload for string operation name @@ -5483,10 +5492,11 @@ async def execute_operation( operation: str, input: Any, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> OutputT: ... # Overload for workflow_run_operation methods @@ -5500,10 +5510,11 @@ async def execute_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> OutputT: ... # TODO(nexus-preview): in practice, both these overloads match an async def sync @@ -5520,10 +5531,11 @@ async def execute_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> OutputT: ... # Overload for sync_operation methods (def) @@ -5537,10 +5549,11 @@ async def execute_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> OutputT: ... # Overload for operation_handler @@ -5554,10 +5567,11 @@ async def execute_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> OutputT: ... @abstractmethod @@ -5566,10 +5580,11 @@ async def execute_operation( operation: Any, input: Any, *, - output_type: Optional[Type[OutputT]] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> Any: """Execute a Nexus operation and return its result. @@ -5618,10 +5633,11 @@ async def start_operation( operation: Any, input: Any, *, - output_type: Optional[Type] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> Any: return ( await temporalio.workflow._Runtime.current().workflow_start_nexus_operation( @@ -5633,6 +5649,7 @@ async def start_operation( schedule_to_close_timeout=schedule_to_close_timeout, cancellation_type=cancellation_type, headers=headers, + summary=summary, ) ) @@ -5641,10 +5658,11 @@ async def execute_operation( operation: Any, input: Any, *, - output_type: Optional[Type] = None, - schedule_to_close_timeout: Optional[timedelta] = None, + output_type: Type[OutputT] | None = None, + schedule_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, - headers: Optional[Mapping[str, str]] = None, + headers: Mapping[str, str] | None = None, + summary: str | None = None, ) -> Any: handle = await self.start_operation( operation, @@ -5653,6 +5671,7 @@ async def execute_operation( schedule_to_close_timeout=schedule_to_close_timeout, cancellation_type=cancellation_type, headers=headers, + summary=summary, ) return await handle diff --git a/tests/nexus/test_workflow_caller.py b/tests/nexus/test_workflow_caller.py index d53c178fa..0c0dd988a 100644 --- a/tests/nexus/test_workflow_caller.py +++ b/tests/nexus/test_workflow_caller.py @@ -34,10 +34,8 @@ WorkflowHandle, ) from temporalio.common import WorkflowIDConflictPolicy -from temporalio.exceptions import ( - CancelledError, - NexusOperationError, -) +from temporalio.converter import PayloadConverter +from temporalio.exceptions import ApplicationError, CancelledError, NexusOperationError from temporalio.nexus import WorkflowRunOperationContext, workflow_run_operation from temporalio.service import RPCError, RPCStatusCode from temporalio.testing import WorkflowEnvironment @@ -1061,6 +1059,73 @@ async def test_workflow_run_operation_can_execute_workflow_before_starting_backi assert result == "result-1-result-2" +@service_handler +class SimpleSyncService: + @sync_operation + async def sync_op(self, ctx: StartOperationContext, input: str) -> str: + return input + + +@workflow.defn +class ExecuteNexusOperationWithSummaryWorkflow: + @workflow.run + async def run(self, input: str, task_queue: str) -> str: + nexus_client = workflow.create_nexus_client( + service=SimpleSyncService, + endpoint=make_nexus_endpoint_name(task_queue), + ) + + op_result = await nexus_client.execute_operation( + SimpleSyncService.sync_op, input, summary="nexus operation summary" + ) + + if op_result != input: + raise ApplicationError("expected nexus operation to echo input") + + return op_result + + +async def test_nexus_operation_summary( + client: Client, + env: WorkflowEnvironment, +): + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + + task_queue = f"task-queue-{uuid.uuid4()}" + async with Worker( + client, + workflows=[ExecuteNexusOperationWithSummaryWorkflow], + nexus_service_handlers=[ + SimpleSyncService(), + ], + task_queue=task_queue, + ): + await create_nexus_endpoint(task_queue, client) + wf_id = f"wf-{uuid.uuid4()}" + handle = await client.start_workflow( + ExecuteNexusOperationWithSummaryWorkflow.run, + args=("success", task_queue), + id=wf_id, + task_queue=task_queue, + ) + result = await handle.result() + assert result == "success" + + history = await handle.fetch_history() + + nexus_events = [ + event + for event in history.events + if event.HasField("nexus_operation_scheduled_event_attributes") + ] + assert len(nexus_events) == 1 + summary_value = PayloadConverter.default.from_payload( + nexus_events[0].user_metadata.summary + ) + assert summary_value == "nexus operation summary" + + # TODO(nexus-prerelease): test invalid service interface implementations # TODO(nexus-prerelease): test caller passing output_type