From 48cefcaec813e5562d6579593a0eeb8b229a08c4 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 2 Jun 2026 15:59:31 -0700 Subject: [PATCH 1/4] Add workflow cancellation reason --- temporalio/worker/_workflow_instance.py | 20 ++++---- temporalio/workflow/__init__.py | 2 + temporalio/workflow/_context.py | 25 +++++++++ tests/worker/test_workflow.py | 68 +++++++++++++++++++++++++ 4 files changed, 106 insertions(+), 9 deletions(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 16c3483d8..ed3913441 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -265,7 +265,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None: ) self._primary_task: asyncio.Task[None] | None = None self._time_ns = 0 - self._cancel_requested = False + self._cancel_reason: str | None = None self._deployment_version_for_current_task: None | ( temporalio.bridge.proto.common.WorkerDeploymentVersion ) = None @@ -595,10 +595,9 @@ def _apply( raise RuntimeError(f"Unrecognized job: {job.WhichOneof('variant')}") def _apply_cancel_workflow( - self, _job: temporalio.bridge.proto.workflow_activation.CancelWorkflow + self, job: temporalio.bridge.proto.workflow_activation.CancelWorkflow ) -> None: - self._cancel_requested = True - # TODO(cretz): Details or cancel message or whatever? + self._cancel_reason = job.reason if self._primary_task: # The primary task may not have started yet and we want to give the # workflow the ability to receive the cancellation, so we must defer @@ -799,7 +798,6 @@ def _apply_remove_from_cache( self, _job: temporalio.bridge.proto.workflow_activation.RemoveFromCache ) -> None: self._deleting = True - self._cancel_requested = True # We consider eviction to be under replay so that certain code like # logging that avoids replaying doesn't run during eviction either self._is_replaying = True @@ -1189,6 +1187,9 @@ def workflow_continue_as_new( ) ) + def workflow_cancellation_reason(self) -> str | None: + return self._cancel_reason + def workflow_extern_functions(self) -> Mapping[str, Callable]: return self._extern_functions @@ -2046,7 +2047,7 @@ async def run_child() -> Any: and (t := asyncio.current_task()) is not None ): t.uncancel() # type: ignore[union-attr] - if self._cancel_requested: + if self._cancel_reason is not None or self._deleting: raise async def _outbound_start_nexus_operation( @@ -2102,7 +2103,7 @@ async def operation_handle_fn() -> OutputT: and (t := asyncio.current_task()) is not None ): t.uncancel() # type: ignore[union-attr] - if self._cancel_requested: + if self._cancel_reason is not None or self._deleting: raise #### Miscellaneous helpers #### @@ -2588,8 +2589,9 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None: # cancel later on will show the workflow as cancelled. But this is # a Temporal limitation in that cancellation is a state not an # event. - if self._cancel_requested and temporalio.exceptions.is_cancelled_exception( - err + if ( + self._cancel_reason is not None + and temporalio.exceptions.is_cancelled_exception(err) ): self._add_command().cancel_workflow_execution.SetInParent() elif self.workflow_is_failure_exception(err): diff --git a/temporalio/workflow/__init__.py b/temporalio/workflow/__init__.py index ec74299c2..8b8b0fb6f 100644 --- a/temporalio/workflow/__init__.py +++ b/temporalio/workflow/__init__.py @@ -59,6 +59,7 @@ _current_update_info, _Runtime, _set_current_update_info, + cancellation_reason, current_update_info, deprecate_patch, extern_functions, @@ -196,6 +197,7 @@ "get_last_completion_result", "get_last_failure", "has_last_completion_result", + "cancellation_reason", "in_workflow", "info", "instance", diff --git a/temporalio/workflow/_context.py b/temporalio/workflow/_context.py index 5c3f22cc9..297a8bf30 100644 --- a/temporalio/workflow/_context.py +++ b/temporalio/workflow/_context.py @@ -36,6 +36,7 @@ "ParentInfo", "RootInfo", "UpdateInfo", + "cancellation_reason", "current_update_info", "deprecate_patch", "extern_functions", @@ -295,6 +296,9 @@ def workflow_continue_as_new( initial_versioning_behavior: ContinueAsNewVersioningBehavior | None, ) -> NoReturn: ... + @abstractmethod + def workflow_cancellation_reason(self) -> str | None: ... + @abstractmethod def workflow_extern_functions(self) -> Mapping[str, Callable]: ... @@ -591,6 +595,27 @@ def in_workflow() -> bool: return _Runtime.maybe_current() is not None +def cancellation_reason() -> str | None: + """Reason the workflow was cancelled, or None if no external cancellation + request has been received. + + A non-None value (including an empty string) indicates that the workflow + received an explicit cancellation request from the server. This can be used + when catching an :py:class:`asyncio.CancelledError` to distinguish a + workflow-level cancel from a cancel that originated from inner asyncio task + cancellation. + + Note, this only reflects cancellation requested via the server; it is not + set for cache eviction or for cancels of inner tasks/scopes. + + Returns: + The reason string sent with the workflow cancellation request (which + may be empty), or ``None`` if the workflow has not been cancelled via + an external request. + """ + return _Runtime.current().workflow_cancellation_reason() + + def memo() -> Mapping[str, Any]: """Current workflow's memo values, converted without type hints. diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index cb5bb8067..34334eeed 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -1093,6 +1093,74 @@ async def started() -> bool: assert (await handle.describe()).status == WorkflowExecutionStatus.CANCELED +@workflow.defn +class CancelReasonWorkflow: + def __init__(self) -> None: + self._started = False + # Reason observed when the inner task was cancelled (no external + # workflow cancel has happened yet at that point). + self._reason_inner: str | None = "unset" + # Reason observed in the outer CancelledError handler after the + # external workflow cancel has been delivered. + self._reason_outer: str | None = "unset" + + @workflow.run + async def run(self) -> NoReturn: + self._started = True + task = asyncio.create_task(asyncio.sleep(1000)) + try: + task.cancel() + await task + except asyncio.CancelledError: + self._reason_inner = workflow.cancellation_reason() + try: + await asyncio.sleep(1000) + except asyncio.CancelledError: + self._reason_outer = workflow.cancellation_reason() + raise + raise RuntimeError("unreachable") + + @workflow.query + def started(self) -> bool: + return self._started + + @workflow.query + def reason_inner(self) -> str | None: + return self._reason_inner + + @workflow.query + def reason_outer(self) -> str | None: + return self._reason_outer + + +async def test_workflow_cancellation_reason(client: Client): + async with new_worker(client, CancelReasonWorkflow) as worker: + handle = await client.start_workflow( + CancelReasonWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + + async def started() -> bool: + return await handle.query(CancelReasonWorkflow.started) + + await assert_eq_eventually(True, started) + # Before any external cancel, reason is None even though an inner task + # cancel has already been observed. + assert await handle.query(CancelReasonWorkflow.reason_inner) is None + + await handle.cancel() + with pytest.raises(WorkflowFailureError) as err: + await handle.result() + assert isinstance(err.value.cause, CancelledError) + + # After external cancel, reason is a string (empty since the Python + # client does not send one) — non-None is the load-bearing distinction. + outer = await handle.query(CancelReasonWorkflow.reason_outer) + assert outer is not None + assert isinstance(outer, str) + + @workflow.defn class TrapCancelWorkflow: @workflow.run From 04d6fe7fecba4044882ccb01bfce2f76378da0d1 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 2 Jun 2026 16:20:20 -0700 Subject: [PATCH 2/4] Add cancel reasons on caller side --- temporalio/client/_impl.py | 1 + temporalio/client/_interceptor.py | 1 + temporalio/client/_workflow.py | 4 ++ temporalio/worker/_workflow_instance.py | 25 ++++++--- temporalio/workflow/_workflow_ops.py | 6 +- tests/worker/test_workflow.py | 75 +++++++++++++++++++++++-- 6 files changed, 97 insertions(+), 15 deletions(-) diff --git a/temporalio/client/_impl.py b/temporalio/client/_impl.py index af221865a..0f3667b19 100644 --- a/temporalio/client/_impl.py +++ b/temporalio/client/_impl.py @@ -352,6 +352,7 @@ async def cancel_workflow(self, input: CancelWorkflowInput) -> None: identity=self._client.identity, request_id=str(uuid.uuid4()), first_execution_run_id=input.first_execution_run_id or "", + reason=input.reason, ), retry=True, metadata=input.rpc_metadata, diff --git a/temporalio/client/_interceptor.py b/temporalio/client/_interceptor.py index 587b802d0..0e780146d 100644 --- a/temporalio/client/_interceptor.py +++ b/temporalio/client/_interceptor.py @@ -111,6 +111,7 @@ class CancelWorkflowInput: id: str run_id: str | None first_execution_run_id: str | None + reason: str rpc_metadata: Mapping[str, str | bytes] rpc_timeout: timedelta | None diff --git a/temporalio/client/_workflow.py b/temporalio/client/_workflow.py index 22ac00d84..e82006580 100644 --- a/temporalio/client/_workflow.py +++ b/temporalio/client/_workflow.py @@ -318,6 +318,7 @@ async def result( async def cancel( self, *, + reason: str = "", rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: timedelta | None = None, ) -> None: @@ -334,6 +335,8 @@ async def cancel( workflow ID even if it is unrelated to the started workflow. Args: + reason: Reason recorded with the cancellation request. Available + inside the workflow via :py:func:`temporalio.workflow.cancellation_reason`. rpc_metadata: Headers used on the RPC call. Keys here override client-level RPC metadata keys. rpc_timeout: Optional RPC deadline to set for the RPC call. @@ -346,6 +349,7 @@ async def cancel( id=self._id, run_id=self._run_id, first_execution_run_id=self._first_execution_run_id, + reason=reason, rpc_metadata=rpc_metadata, rpc_timeout=rpc_timeout, ) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index ed3913441..76ccdb2e3 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1988,10 +1988,12 @@ async def _outbound_start_child_workflow( handle: _ChildWorkflowHandle # Common code for handling cancel for start and run - def apply_child_cancel_error() -> None: - # Send a cancel request to the child + def apply_child_cancel_error(err: asyncio.CancelledError) -> None: + # Send a cancel request to the child, forwarding the msg passed to + # Task.cancel(msg) (if any) as the cancellation reason. + reason = err.args[0] if err.args and isinstance(err.args[0], str) else "" cancel_command = self._add_command() - handle._apply_cancel_command(cancel_command) + handle._apply_cancel_command(cancel_command, reason=reason) # If the cancel command is for external workflow, we # have to add a seq and mark it pending if cancel_command.HasField("request_cancel_external_workflow_execution"): @@ -2014,8 +2016,8 @@ async def run_child() -> Any: # We have to shield because we don't want the future itself # to be cancelled return await asyncio.shield(handle._result_fut) - except asyncio.CancelledError: - apply_child_cancel_error() + except asyncio.CancelledError as err: + apply_child_cancel_error(err) # Clear the cancellation counter on Python 3.11+ so the # next await does not immediately re-raise CancelledError if ( @@ -2038,8 +2040,8 @@ async def run_child() -> Any: # to be cancelled await asyncio.shield(handle._start_fut) return handle - except asyncio.CancelledError: - apply_child_cancel_error() + except asyncio.CancelledError as err: + apply_child_cancel_error(err) # Clear the cancellation counter on Python 3.11+ so the # next await does not immediately re-raise CancelledError if ( @@ -3383,8 +3385,12 @@ def _apply_start_command(self) -> None: def _apply_cancel_command( self, command: temporalio.bridge.proto.workflow_commands.WorkflowCommand, + *, + reason: str = "", ) -> None: - command.cancel_child_workflow_execution.child_workflow_seq = self._seq + v = command.cancel_child_workflow_execution + v.child_workflow_seq = self._seq + v.reason = reason class _ExternalWorkflowHandle(temporalio.workflow.ExternalWorkflowHandle[Any]): @@ -3428,7 +3434,7 @@ async def signal( ) ) - async def cancel(self) -> None: + async def cancel(self, *, reason: str = "") -> None: self._instance._assert_not_read_only("cancel external handle") command = self._instance._add_command() v = command.request_cancel_external_workflow_execution @@ -3436,6 +3442,7 @@ async def cancel(self) -> None: v.workflow_execution.workflow_id = self._id if self._run_id: v.workflow_execution.run_id = self._run_id + v.reason = reason await self._instance._cancel_external_workflow(command) diff --git a/temporalio/workflow/_workflow_ops.py b/temporalio/workflow/_workflow_ops.py index 0cd22cb17..6eb8bd222 100644 --- a/temporalio/workflow/_workflow_ops.py +++ b/temporalio/workflow/_workflow_ops.py @@ -604,11 +604,15 @@ async def signal( """ raise NotImplementedError - async def cancel(self) -> None: + async def cancel(self, *, reason: str = "") -> None: """Send a cancellation request to this external workflow. This will fail if the workflow cannot accept the request (e.g. if the workflow is not found). + + Args: + reason: Reason recorded with the cancellation request. Available in + the target workflow via :py:func:`cancellation_reason`. """ raise NotImplementedError diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 34334eeed..513bae394 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -1149,16 +1149,81 @@ async def started() -> bool: # cancel has already been observed. assert await handle.query(CancelReasonWorkflow.reason_inner) is None - await handle.cancel() + await handle.cancel(reason="user-supplied reason") with pytest.raises(WorkflowFailureError) as err: await handle.result() assert isinstance(err.value.cause, CancelledError) - # After external cancel, reason is a string (empty since the Python - # client does not send one) — non-None is the load-bearing distinction. outer = await handle.query(CancelReasonWorkflow.reason_outer) - assert outer is not None - assert isinstance(outer, str) + assert outer == "user-supplied reason" + + +@workflow.defn +class CancelReasonReporter: + """Workflow that swallows a cancel and returns the observed reason.""" + + @workflow.run + async def run(self) -> str: + try: + await asyncio.sleep(1000) + except asyncio.CancelledError: + return workflow.cancellation_reason() or "" + raise RuntimeError("unreachable") + + +@workflow.defn +class ChildCancelReasonWorkflow: + @workflow.run + async def run(self, msg: str) -> str: + child = await workflow.start_child_workflow( + CancelReasonReporter.run, + id=f"{workflow.info().workflow_id}_child", + ) + child.cancel(msg) + return await child + + +async def test_workflow_child_cancel_reason(client: Client): + async with new_worker( + client, ChildCancelReasonWorkflow, CancelReasonReporter + ) as worker: + result = await client.execute_workflow( + ChildCancelReasonWorkflow.run, + "from-parent", + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + assert result == "from-parent" + + +@workflow.defn +class ExternalCancelReasonWorkflow: + @workflow.run + async def run(self, target_id: str) -> None: + await workflow.get_external_workflow_handle(target_id).cancel( + reason="from-external-caller" + ) + + +async def test_workflow_external_cancel_reason(client: Client): + async with new_worker( + client, ExternalCancelReasonWorkflow, CancelReasonReporter + ) as worker: + target_id = f"workflow-{uuid.uuid4()}" + target = await client.start_workflow( + CancelReasonReporter.run, + id=target_id, + task_queue=worker.task_queue, + ) + await client.execute_workflow( + ExternalCancelReasonWorkflow.run, + target_id, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + # Server wraps the user-supplied reason with metadata about the caller + # when one workflow cancels another, so check for substring. + assert "from-external-caller" in await target.result() @workflow.defn From 40a837135fe132d173e4f209da0dbe12f7a99a31 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 2 Jun 2026 16:24:21 -0700 Subject: [PATCH 3/4] Add test proving reason isn't none --- tests/worker/test_workflow.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 513bae394..4cd070cc8 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -1133,7 +1133,8 @@ def reason_outer(self) -> str | None: return self._reason_outer -async def test_workflow_cancellation_reason(client: Client): +@pytest.mark.parametrize("reason", ["user-supplied reason", ""]) +async def test_workflow_cancellation_reason(client: Client, reason: str): async with new_worker(client, CancelReasonWorkflow) as worker: handle = await client.start_workflow( CancelReasonWorkflow.run, @@ -1149,13 +1150,21 @@ async def started() -> bool: # cancel has already been observed. assert await handle.query(CancelReasonWorkflow.reason_inner) is None - await handle.cancel(reason="user-supplied reason") + # When reason is "", cancel without providing the kwarg at all to + # exercise the default path. + if reason: + await handle.cancel(reason=reason) + else: + await handle.cancel() with pytest.raises(WorkflowFailureError) as err: await handle.result() assert isinstance(err.value.cause, CancelledError) outer = await handle.query(CancelReasonWorkflow.reason_outer) - assert outer == "user-supplied reason" + # Load-bearing: a cancel with no reason still produces an empty string, + # not None — None means "no external cancel happened". + assert outer is not None + assert outer == reason @workflow.defn From e7b8721aff069083b31185579c6e8adc27a1c6eb Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 2 Jun 2026 16:29:13 -0700 Subject: [PATCH 4/4] Lint fix --- temporalio/workflow/_workflow_ops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/workflow/_workflow_ops.py b/temporalio/workflow/_workflow_ops.py index 6eb8bd222..b877be585 100644 --- a/temporalio/workflow/_workflow_ops.py +++ b/temporalio/workflow/_workflow_ops.py @@ -604,7 +604,7 @@ async def signal( """ raise NotImplementedError - async def cancel(self, *, reason: str = "") -> None: + async def cancel(self, *, reason: str = "") -> None: # pyright: ignore[reportUnusedParameter] """Send a cancellation request to this external workflow. This will fail if the workflow cannot accept the request (e.g. if the