From 8e0e4878ed794b3bbdb6b86f737931be64ca91cf Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Mon, 9 Feb 2026 13:47:33 -0800 Subject: [PATCH] Add sample for Nexus cancellation --- nexus_cancel/README.md | 43 +++++++++ nexus_cancel/__init__.py | 0 nexus_cancel/caller/__init__.py | 0 nexus_cancel/caller/starter.py | 42 +++++++++ nexus_cancel/caller/worker.py | 37 ++++++++ nexus_cancel/caller/workflows.py | 116 ++++++++++++++++++++++++ nexus_cancel/handler/__init__.py | 0 nexus_cancel/handler/service_handler.py | 54 +++++++++++ nexus_cancel/handler/worker.py | 43 +++++++++ nexus_cancel/handler/workflows.py | 49 ++++++++++ nexus_cancel/service.py | 11 +++ 11 files changed, 395 insertions(+) create mode 100644 nexus_cancel/README.md create mode 100644 nexus_cancel/__init__.py create mode 100644 nexus_cancel/caller/__init__.py create mode 100644 nexus_cancel/caller/starter.py create mode 100644 nexus_cancel/caller/worker.py create mode 100644 nexus_cancel/caller/workflows.py create mode 100644 nexus_cancel/handler/__init__.py create mode 100644 nexus_cancel/handler/service_handler.py create mode 100644 nexus_cancel/handler/worker.py create mode 100644 nexus_cancel/handler/workflows.py create mode 100644 nexus_cancel/service.py diff --git a/nexus_cancel/README.md b/nexus_cancel/README.md new file mode 100644 index 00000000..dc9fc6dc --- /dev/null +++ b/nexus_cancel/README.md @@ -0,0 +1,43 @@ +# Nexus Cancellation + +This sample shows how to cancel a Nexus operation from a caller workflow and specify a cancellation type. In this sample we show using the `WAIT_REQUESTED` cancellation type, which allows the caller to return after the handler workflow has received the request to be cancelled, but does not wait for the handler workflow to finish processing the cancellation request. + +To run this sample, set up your environment following the instructions in the main [Nexus Sample](../hello_nexus/README.md). + +Next, in separate terminal windows: + +## Nexus Handler Worker + +```bash +uv run nexus_cancel/handler/worker.py +``` + +## Nexus Caller Worker + +```bash +uv run nexus_cancel/caller/worker.py +``` + +## Start Caller Workflow + +```bash +uv run nexus_cancel/caller/starter.py +``` + +## Expected Output + +On the caller side, you should see: +``` +Started workflow workflowId: hello-caller- runId: +Workflow result: Hello Nexus-X 👋 +``` + +On the handler side, you should see multiple log messages: +``` +HelloHandlerWorkflow was cancelled successfully. +HelloHandlerWorkflow was cancelled successfully. +HelloHandlerWorkflow was cancelled successfully. +HelloHandlerWorkflow was cancelled successfully. +``` + +Notice the timing: the caller workflow returns before all handler workflows have completed their cancellation cleanup. This is because of the use of `WAIT_REQUESTED` as the cancellation type in the Nexus operation. This means the caller didn't have to wait for the handler workflows to finish, but still guarantees the handler workflows will receive the cancellation request. diff --git a/nexus_cancel/__init__.py b/nexus_cancel/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_cancel/caller/__init__.py b/nexus_cancel/caller/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_cancel/caller/starter.py b/nexus_cancel/caller/starter.py new file mode 100644 index 00000000..26fd65b9 --- /dev/null +++ b/nexus_cancel/caller/starter.py @@ -0,0 +1,42 @@ +""" +Starter script to execute the caller workflow that demonstrates Nexus cancellation. +""" + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig + +from nexus_cancel.caller.workflows import HelloCallerWorkflow + +NAMESPACE = "my-caller-namespace" +TASK_QUEUE = "my-caller-workflow-task-queue" + + +async def main(): + """Execute the caller workflow.""" + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + config.setdefault("namespace", NAMESPACE) + client = await Client.connect(**config) + + workflow_id = f"hello-caller-{uuid.uuid4()}" + + # Start the workflow + handle = await client.start_workflow( + HelloCallerWorkflow.run, + "Nexus", + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + print(f"Started workflow workflowId: {handle.id} runId: {handle.result_run_id}") + + # Wait for result + result = await handle.result() + print(f"Workflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/nexus_cancel/caller/worker.py b/nexus_cancel/caller/worker.py new file mode 100644 index 00000000..223fc7e0 --- /dev/null +++ b/nexus_cancel/caller/worker.py @@ -0,0 +1,37 @@ +""" +Worker for the caller namespace that executes workflows calling Nexus operations. +""" + +import asyncio + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from nexus_cancel.caller.workflows import HelloCallerWorkflow + +NAMESPACE = "my-caller-namespace" +TASK_QUEUE = "my-caller-workflow-task-queue" + + +async def main(): + """Start the caller worker.""" + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + config.setdefault("namespace", NAMESPACE) + client = await Client.connect(**config) + + # Start worker with the caller workflow + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[HelloCallerWorkflow], + ) + + print(f"Starting caller worker on namespace '{NAMESPACE}', task queue '{TASK_QUEUE}'") + print("Worker is ready to execute caller workflows...") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/nexus_cancel/caller/workflows.py b/nexus_cancel/caller/workflows.py new file mode 100644 index 00000000..f990c2a2 --- /dev/null +++ b/nexus_cancel/caller/workflows.py @@ -0,0 +1,116 @@ +""" +This workflow demonstrates how to cancel Nexus operations using cancellation scopes. + +This sample shows how to use the WAIT_REQUESTED cancellation type, which allows +the caller to return after the handler workflow has received the cancellation request, +but does not wait for the handler workflow to finish processing the cancellation. +""" + +import asyncio +from datetime import timedelta + +from temporalio import workflow +from temporalio.exceptions import CancelledError, NexusOperationError + +with workflow.unsafe.imports_passed_through(): + from hello_nexus.service import MyInput, MyNexusService, MyOutput + +NEXUS_ENDPOINT = "my-nexus-endpoint-name" + + +@workflow.defn +class HelloCallerWorkflow: + """ + A workflow that calls multiple Nexus operations concurrently and cancels them + after the first one completes. + + This demonstrates the WAIT_REQUESTED cancellation type, which ensures the handler + receives the cancellation request but doesn't wait for it to complete. + """ + + def __init__(self): + self.nexus_client = workflow.create_nexus_client( + service=MyNexusService, + endpoint=NEXUS_ENDPOINT, + # Set the cancellation type to WAIT_REQUESTED. This means that the caller + # will wait for the cancellation request to be received by the handler before + # proceeding with the cancellation. + # + # By default, the caller would wait until the operation is completed. + operation_options=workflow.NexusOperationOptions( + schedule_to_close_timeout=timedelta(seconds=10), + cancellation_type=workflow.NexusOperationCancellationType.WAIT_REQUESTED, + ), + ) + + @workflow.run + async def run(self, message: str) -> str: + """ + Execute multiple Nexus operations concurrently and return the first result. + + Args: + message: The message to pass to the Nexus operations + + Returns: + The result from the first completed operation + """ + # Names to call the operation with concurrently + names = ["Nexus-1", "Nexus-2", "Nexus-3", "Nexus-4", "Nexus-5"] + + # Create a list to store operation tasks + tasks = [] + + # Create our cancellation scope. Within this scope we call the nexus operation + # asynchronously for each name. + async def start_operations(): + for name in names: + # Start each operation asynchronously + handle = await self.nexus_client.start_operation( + MyNexusService.my_workflow_run_operation, + MyInput(name), + ) + # Create a task that waits for the operation result + tasks.append(asyncio.create_task(handle)) + + # Execute all nexus operations within a try block so we can cancel them + try: + # Start all operations + await start_operations() + + # Wait for the first operation to complete + workflow.logger.info(f"Started {len(tasks)} operations, waiting for first to complete...") + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + + # Get the result from the first completed operation + result = await done.pop() + workflow.logger.info(f"First operation completed with: {result.message}") + + # Cancel all remaining operations + workflow.logger.info(f"Cancelling {len(pending)} remaining operations...") + for task in pending: + task.cancel() + + # Wait for all operations to receive cancellation requests before proceeding + # Note: Once the workflow completes any pending cancellation requests are + # dropped by the server. In general, it is a good practice to wait for all + # cancellation requests to be processed before completing the workflow. + for task in pending: + try: + await task + except (NexusOperationError, CancelledError) as e: + # If the operation was cancelled, we can ignore the failure + if isinstance(e, NexusOperationError) and isinstance( + e.__cause__, CancelledError + ): + workflow.logger.info("Operation was cancelled") + continue + if isinstance(e, CancelledError): + workflow.logger.info("Operation was cancelled") + continue + raise e + + return result.message + + except Exception as e: + workflow.logger.error(f"Error during operation execution: {e}") + raise diff --git a/nexus_cancel/handler/__init__.py b/nexus_cancel/handler/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_cancel/handler/service_handler.py b/nexus_cancel/handler/service_handler.py new file mode 100644 index 00000000..efb91c0a --- /dev/null +++ b/nexus_cancel/handler/service_handler.py @@ -0,0 +1,54 @@ +""" +This file demonstrates how to implement a Nexus service handler with a +workflow-backed operation that can be cancelled. + +This sample reuses the service definition from hello_nexus but uses a custom +workflow implementation that demonstrates cancellation handling. +""" + +from __future__ import annotations + +import uuid + +import nexusrpc +from temporalio import nexus + +from hello_nexus.service import MyInput, MyNexusService, MyOutput +from nexus_cancel.handler.workflows import HelloHandlerWorkflow + + +@nexusrpc.handler.service_handler(service=MyNexusService) +class MyNexusServiceHandler: + """ + Handler for MyNexusService that demonstrates cancellation. + + This handler implements the workflow run operation using a workflow that + handles cancellation gracefully. + """ + + @nexus.workflow_run_operation + async def my_workflow_run_operation( + self, ctx: nexus.WorkflowRunOperationContext, input: MyInput + ) -> nexus.WorkflowHandle[MyOutput]: + """ + Start a workflow that can be cancelled. + + The workflow will receive an asyncio.CancelledError when the caller + requests cancellation. + """ + # Use the request ID as the workflow ID for idempotency + return await ctx.start_workflow( + HelloHandlerWorkflow.run, + input, + id=f"hello-handler-{ctx.request_id}", + ) + + # Note: In a real implementation, you would also implement my_sync_operation + # from the service. For this cancellation demo, we only implement the + # workflow run operation which can be cancelled. + @nexusrpc.handler.sync_operation + async def my_sync_operation( + self, ctx: nexusrpc.handler.StartOperationContext, input: MyInput + ) -> MyOutput: + """Sync operation that cannot be cancelled.""" + return MyOutput(message=f"Hello {input.name} from sync operation!") diff --git a/nexus_cancel/handler/worker.py b/nexus_cancel/handler/worker.py new file mode 100644 index 00000000..ef891412 --- /dev/null +++ b/nexus_cancel/handler/worker.py @@ -0,0 +1,43 @@ +""" +Worker for the handler namespace that processes Nexus operations and workflows. +""" + +import asyncio + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from nexus_cancel.handler.service_handler import MyNexusServiceHandler +from nexus_cancel.handler.workflows import HelloHandlerWorkflow + +NAMESPACE = "my-target-namespace" +TASK_QUEUE = "my-handler-task-queue" + + +async def main(): + """Start the handler worker.""" + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + config.setdefault("namespace", NAMESPACE) + client = await Client.connect(**config) + + # Create the service handler + service_handler = MyNexusServiceHandler() + + # Start worker with both workflows and Nexus service + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[HelloHandlerWorkflow], + # The nexus_services parameter registers the Nexus service handler + nexus_services=[service_handler], + ) + + print(f"Starting handler worker on namespace '{NAMESPACE}', task queue '{TASK_QUEUE}'") + print("Worker is ready to process Nexus operations and workflows...") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/nexus_cancel/handler/workflows.py b/nexus_cancel/handler/workflows.py new file mode 100644 index 00000000..5bad6734 --- /dev/null +++ b/nexus_cancel/handler/workflows.py @@ -0,0 +1,49 @@ +""" +This workflow is started by the hello Nexus operation. +It demonstrates how to handle cancellation from the caller workflow. +""" + +import asyncio + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from hello_nexus.service import MyInput, MyOutput + + +@workflow.defn +class HelloHandlerWorkflow: + """ + A workflow that handles the hello operation and responds to cancellation. + + This workflow simulates work by sleeping for a random duration, then handles + cancellation gracefully if requested. + """ + + @workflow.run + async def run(self, input: MyInput) -> MyOutput: + try: + # Sleep for a random duration to simulate some work (0-5 seconds) + random_seconds = workflow.random().randint(0, 5) + workflow.logger.info(f"Working for {random_seconds} seconds...") + await asyncio.sleep(random_seconds) + + # Return the greeting message + return MyOutput(message=f"Hello {input.name} 👋") + + except asyncio.CancelledError: + # Simulate some cleanup work after cancellation is requested + # Use a shield to prevent this cleanup from being cancelled + workflow.logger.info("Received cancellation request, performing cleanup...") + + try: + cleanup_seconds = workflow.random().randint(0, 5) + # Shield this sleep from cancellation to simulate cleanup work + await asyncio.shield(asyncio.sleep(cleanup_seconds)) + workflow.logger.info("HelloHandlerWorkflow was cancelled successfully.") + except asyncio.CancelledError: + # Even if shield is cancelled, log completion + workflow.logger.info("HelloHandlerWorkflow was cancelled successfully.") + + # Re-raise the cancellation + raise diff --git a/nexus_cancel/service.py b/nexus_cancel/service.py new file mode 100644 index 00000000..6ee83dd6 --- /dev/null +++ b/nexus_cancel/service.py @@ -0,0 +1,11 @@ +""" +This file re-exports the Nexus service from the hello_nexus sample. + +The nexus_cancel sample demonstrates cancellation using the same service +definition as hello_nexus to show how to cancel operations. +""" + +# Re-export the service from hello_nexus +from hello_nexus.service import MyInput, MyNexusService, MyOutput + +__all__ = ["MyInput", "MyNexusService", "MyOutput"]