Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions temporalio/worker/_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,13 @@ class StartNexusOperationInput(Generic[InputT, OutputT]):

endpoint: str
service: str
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]]
operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any]
input: InputT
schedule_to_close_timeout: Optional[timedelta]
schedule_to_close_timeout: timedelta | None
cancellation_type: temporalio.workflow.NexusOperationCancellationType
headers: Optional[Mapping[str, str]]
output_type: Optional[Type[OutputT]] = None
headers: Mapping[str, str] | None
summary: str | None
output_type: Type[OutputT] | None = None

def __post_init__(self) -> None:
"""Initialize operation-specific attributes after dataclass creation."""
Expand Down
15 changes: 11 additions & 4 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1581,12 +1581,13 @@ async def workflow_start_nexus_operation(
self,
endpoint: str,
service: str,
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any],
input: Any,
output_type: Optional[Type[OutputT]],
schedule_to_close_timeout: Optional[timedelta],
output_type: Type[OutputT] | None,
schedule_to_close_timeout: timedelta | None,
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
headers: Optional[Mapping[str, str]],
headers: Mapping[str, str] | None,
summary: str | None,
) -> temporalio.workflow.NexusOperationHandle[OutputT]:
# start_nexus_operation
return await self._outbound.start_nexus_operation(
Expand All @@ -1599,6 +1600,7 @@ async def workflow_start_nexus_operation(
schedule_to_close_timeout=schedule_to_close_timeout,
cancellation_type=cancellation_type,
headers=headers,
summary=summary,
)
)

Expand Down Expand Up @@ -3330,6 +3332,11 @@ def _apply_schedule_command(self) -> None:
for key, val in self._input.headers.items():
v.nexus_header[key] = val

if self._input.summary:
command.user_metadata.summary.CopyFrom(
self._payload_converter.to_payload(self._input.summary)
)

def _apply_cancel_command(
self,
command: temporalio.bridge.proto.workflow_commands.WorkflowCommand,
Expand Down
123 changes: 71 additions & 52 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,12 +858,13 @@ async def workflow_start_nexus_operation(
self,
endpoint: str,
service: str,
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any],
input: Any,
output_type: Optional[Type[OutputT]],
schedule_to_close_timeout: Optional[timedelta],
output_type: Type[OutputT] | None,
schedule_to_close_timeout: timedelta | None,
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
headers: Optional[Mapping[str, str]],
headers: Mapping[str, str] | None,
summary: str | None,
) -> NexusOperationHandle[OutputT]: ...

@abstractmethod
Expand Down Expand Up @@ -5346,10 +5347,11 @@ async def start_operation(
operation: nexusrpc.Operation[InputT, OutputT],
input: InputT,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> NexusOperationHandle[OutputT]: ...

# Overload for string operation name
Expand All @@ -5360,10 +5362,11 @@ async def start_operation(
operation: str,
input: Any,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> NexusOperationHandle[OutputT]: ...

# Overload for workflow_run_operation methods
Expand All @@ -5377,10 +5380,11 @@ async def start_operation(
],
input: InputT,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> NexusOperationHandle[OutputT]: ...

# Overload for sync_operation methods (async def)
Expand All @@ -5394,10 +5398,11 @@ async def start_operation(
],
input: InputT,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> NexusOperationHandle[OutputT]: ...

# Overload for sync_operation methods (def)
Expand All @@ -5411,10 +5416,11 @@ async def start_operation(
],
input: InputT,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> NexusOperationHandle[OutputT]: ...

# Overload for operation_handler
Expand All @@ -5427,10 +5433,11 @@ async def start_operation(
],
input: InputT,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> NexusOperationHandle[OutputT]: ...

@abstractmethod
Expand All @@ -5439,10 +5446,11 @@ async def start_operation(
operation: Any,
input: Any,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> Any:
"""Start a Nexus operation and return its handle.

Expand All @@ -5469,10 +5477,11 @@ async def execute_operation(
operation: nexusrpc.Operation[InputT, OutputT],
input: InputT,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> OutputT: ...

# Overload for string operation name
Expand All @@ -5483,10 +5492,11 @@ async def execute_operation(
operation: str,
input: Any,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> OutputT: ...

# Overload for workflow_run_operation methods
Expand All @@ -5500,10 +5510,11 @@ async def execute_operation(
],
input: InputT,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> OutputT: ...

# TODO(nexus-preview): in practice, both these overloads match an async def sync
Expand All @@ -5520,10 +5531,11 @@ async def execute_operation(
],
input: InputT,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> OutputT: ...

# Overload for sync_operation methods (def)
Expand All @@ -5537,10 +5549,11 @@ async def execute_operation(
],
input: InputT,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> OutputT: ...

# Overload for operation_handler
Expand All @@ -5554,10 +5567,11 @@ async def execute_operation(
],
input: InputT,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> OutputT: ...

@abstractmethod
Expand All @@ -5566,10 +5580,11 @@ async def execute_operation(
operation: Any,
input: Any,
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> Any:
"""Execute a Nexus operation and return its result.

Expand Down Expand Up @@ -5618,10 +5633,11 @@ async def start_operation(
operation: Any,
input: Any,
*,
output_type: Optional[Type] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> Any:
return (
await temporalio.workflow._Runtime.current().workflow_start_nexus_operation(
Expand All @@ -5633,6 +5649,7 @@ async def start_operation(
schedule_to_close_timeout=schedule_to_close_timeout,
cancellation_type=cancellation_type,
headers=headers,
summary=summary,
)
)

Expand All @@ -5641,10 +5658,11 @@ async def execute_operation(
operation: Any,
input: Any,
*,
output_type: Optional[Type] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
output_type: Type[OutputT] | None = None,
schedule_to_close_timeout: timedelta | None = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
headers: Mapping[str, str] | None = None,
summary: str | None = None,
) -> Any:
handle = await self.start_operation(
operation,
Expand All @@ -5653,6 +5671,7 @@ async def execute_operation(
schedule_to_close_timeout=schedule_to_close_timeout,
cancellation_type=cancellation_type,
headers=headers,
summary=summary,
)
return await handle

Expand Down
Loading