Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions temporalio/client/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ async def cancel_workflow(self, input: CancelWorkflowInput) -> None:
identity=self._client.identity,
request_id=str(uuid.uuid4()),
first_execution_run_id=input.first_execution_run_id or "",
reason=input.reason,
),
retry=True,
metadata=input.rpc_metadata,
Expand Down
1 change: 1 addition & 0 deletions temporalio/client/_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class CancelWorkflowInput:
id: str
run_id: str | None
first_execution_run_id: str | None
reason: str
rpc_metadata: Mapping[str, str | bytes]
rpc_timeout: timedelta | None

Expand Down
4 changes: 4 additions & 0 deletions temporalio/client/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ async def result(
async def cancel(
self,
*,
reason: str = "",
rpc_metadata: Mapping[str, str | bytes] = {},
rpc_timeout: timedelta | None = None,
) -> None:
Expand All @@ -334,6 +335,8 @@ async def cancel(
workflow ID even if it is unrelated to the started workflow.

Args:
reason: Reason recorded with the cancellation request. Available
inside the workflow via :py:func:`temporalio.workflow.cancellation_reason`.
rpc_metadata: Headers used on the RPC call. Keys here override
client-level RPC metadata keys.
rpc_timeout: Optional RPC deadline to set for the RPC call.
Expand All @@ -346,6 +349,7 @@ async def cancel(
id=self._id,
run_id=self._run_id,
first_execution_run_id=self._first_execution_run_id,
reason=reason,
rpc_metadata=rpc_metadata,
rpc_timeout=rpc_timeout,
)
Expand Down
45 changes: 27 additions & 18 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
)
self._primary_task: asyncio.Task[None] | None = None
self._time_ns = 0
self._cancel_requested = False
self._cancel_reason: str | None = None
self._deployment_version_for_current_task: None | (
temporalio.bridge.proto.common.WorkerDeploymentVersion
) = None
Expand Down Expand Up @@ -595,10 +595,9 @@ def _apply(
raise RuntimeError(f"Unrecognized job: {job.WhichOneof('variant')}")

def _apply_cancel_workflow(
self, _job: temporalio.bridge.proto.workflow_activation.CancelWorkflow
self, job: temporalio.bridge.proto.workflow_activation.CancelWorkflow
) -> None:
self._cancel_requested = True
# TODO(cretz): Details or cancel message or whatever?
self._cancel_reason = job.reason
Comment thread
tconley1428 marked this conversation as resolved.
if self._primary_task:
# The primary task may not have started yet and we want to give the
# workflow the ability to receive the cancellation, so we must defer
Expand Down Expand Up @@ -799,7 +798,6 @@ def _apply_remove_from_cache(
self, _job: temporalio.bridge.proto.workflow_activation.RemoveFromCache
) -> None:
self._deleting = True
self._cancel_requested = True
# We consider eviction to be under replay so that certain code like
# logging that avoids replaying doesn't run during eviction either
self._is_replaying = True
Expand Down Expand Up @@ -1189,6 +1187,9 @@ def workflow_continue_as_new(
)
)

def workflow_cancellation_reason(self) -> str | None:
return self._cancel_reason

def workflow_extern_functions(self) -> Mapping[str, Callable]:
return self._extern_functions

Expand Down Expand Up @@ -1987,10 +1988,12 @@ async def _outbound_start_child_workflow(
handle: _ChildWorkflowHandle

# Common code for handling cancel for start and run
def apply_child_cancel_error() -> None:
# Send a cancel request to the child
def apply_child_cancel_error(err: asyncio.CancelledError) -> None:
# Send a cancel request to the child, forwarding the msg passed to
# Task.cancel(msg) (if any) as the cancellation reason.
reason = err.args[0] if err.args and isinstance(err.args[0], str) else ""
cancel_command = self._add_command()
handle._apply_cancel_command(cancel_command)
handle._apply_cancel_command(cancel_command, reason=reason)
# If the cancel command is for external workflow, we
# have to add a seq and mark it pending
if cancel_command.HasField("request_cancel_external_workflow_execution"):
Expand All @@ -2013,8 +2016,8 @@ async def run_child() -> Any:
# We have to shield because we don't want the future itself
# to be cancelled
return await asyncio.shield(handle._result_fut)
except asyncio.CancelledError:
apply_child_cancel_error()
except asyncio.CancelledError as err:
apply_child_cancel_error(err)
# Clear the cancellation counter on Python 3.11+ so the
# next await does not immediately re-raise CancelledError
if (
Expand All @@ -2037,16 +2040,16 @@ async def run_child() -> Any:
# to be cancelled
await asyncio.shield(handle._start_fut)
return handle
except asyncio.CancelledError:
apply_child_cancel_error()
except asyncio.CancelledError as err:
apply_child_cancel_error(err)
# Clear the cancellation counter on Python 3.11+ so the
# next await does not immediately re-raise CancelledError
if (
sys.version_info >= (3, 11)
and (t := asyncio.current_task()) is not None
):
t.uncancel() # type: ignore[union-attr]
if self._cancel_requested:
if self._cancel_reason is not None or self._deleting:
raise

async def _outbound_start_nexus_operation(
Expand Down Expand Up @@ -2102,7 +2105,7 @@ async def operation_handle_fn() -> OutputT:
and (t := asyncio.current_task()) is not None
):
t.uncancel() # type: ignore[union-attr]
if self._cancel_requested:
if self._cancel_reason is not None or self._deleting:
raise

#### Miscellaneous helpers ####
Expand Down Expand Up @@ -2588,8 +2591,9 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
# cancel later on will show the workflow as cancelled. But this is
# a Temporal limitation in that cancellation is a state not an
# event.
if self._cancel_requested and temporalio.exceptions.is_cancelled_exception(
err
if (
self._cancel_reason is not None
and temporalio.exceptions.is_cancelled_exception(err)
):
self._add_command().cancel_workflow_execution.SetInParent()
elif self.workflow_is_failure_exception(err):
Expand Down Expand Up @@ -3381,8 +3385,12 @@ def _apply_start_command(self) -> None:
def _apply_cancel_command(
self,
command: temporalio.bridge.proto.workflow_commands.WorkflowCommand,
*,
reason: str = "",
) -> None:
command.cancel_child_workflow_execution.child_workflow_seq = self._seq
v = command.cancel_child_workflow_execution
v.child_workflow_seq = self._seq
v.reason = reason


class _ExternalWorkflowHandle(temporalio.workflow.ExternalWorkflowHandle[Any]):
Expand Down Expand Up @@ -3426,14 +3434,15 @@ async def signal(
)
)

async def cancel(self) -> None:
async def cancel(self, *, reason: str = "") -> None:
self._instance._assert_not_read_only("cancel external handle")
command = self._instance._add_command()
v = command.request_cancel_external_workflow_execution
v.workflow_execution.namespace = self._instance._info.namespace
v.workflow_execution.workflow_id = self._id
if self._run_id:
v.workflow_execution.run_id = self._run_id
v.reason = reason
await self._instance._cancel_external_workflow(command)


Expand Down
2 changes: 2 additions & 0 deletions temporalio/workflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
_current_update_info,
_Runtime,
_set_current_update_info,
cancellation_reason,
current_update_info,
deprecate_patch,
extern_functions,
Expand Down Expand Up @@ -196,6 +197,7 @@
"get_last_completion_result",
"get_last_failure",
"has_last_completion_result",
"cancellation_reason",
"in_workflow",
"info",
"instance",
Expand Down
25 changes: 25 additions & 0 deletions temporalio/workflow/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"ParentInfo",
"RootInfo",
"UpdateInfo",
"cancellation_reason",
"current_update_info",
"deprecate_patch",
"extern_functions",
Expand Down Expand Up @@ -295,6 +296,9 @@ def workflow_continue_as_new(
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None,
) -> NoReturn: ...

@abstractmethod
def workflow_cancellation_reason(self) -> str | None: ...

@abstractmethod
def workflow_extern_functions(self) -> Mapping[str, Callable]: ...

Expand Down Expand Up @@ -591,6 +595,27 @@ def in_workflow() -> bool:
return _Runtime.maybe_current() is not None


def cancellation_reason() -> str | None:
"""Reason the workflow was cancelled, or None if no external cancellation
request has been received.

A non-None value (including an empty string) indicates that the workflow
received an explicit cancellation request from the server. This can be used
when catching an :py:class:`asyncio.CancelledError` to distinguish a
workflow-level cancel from a cancel that originated from inner asyncio task
cancellation.

Note, this only reflects cancellation requested via the server; it is not
set for cache eviction or for cancels of inner tasks/scopes.

Returns:
The reason string sent with the workflow cancellation request (which
may be empty), or ``None`` if the workflow has not been cancelled via
an external request.
"""
return _Runtime.current().workflow_cancellation_reason()


def memo() -> Mapping[str, Any]:
"""Current workflow's memo values, converted without type hints.

Expand Down
6 changes: 5 additions & 1 deletion temporalio/workflow/_workflow_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,15 @@ async def signal(
"""
raise NotImplementedError

async def cancel(self) -> None:
async def cancel(self, *, reason: str = "") -> None: # pyright: ignore[reportUnusedParameter]
"""Send a cancellation request to this external workflow.

This will fail if the workflow cannot accept the request (e.g. if the
workflow is not found).

Args:
reason: Reason recorded with the cancellation request. Available in
the target workflow via :py:func:`cancellation_reason`.
"""
raise NotImplementedError

Expand Down
Loading
Loading