Skip to content

Commit

Permalink
WIP: cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Jul 5, 2024
1 parent 6eb176a commit 94d0d38
Showing 1 changed file with 49 additions and 0 deletions.
49 changes: 49 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5632,6 +5632,55 @@ async def finish(self):
self.workflow_may_exit = True


@workflow.defn
class HandlerCoroutinesUseLockWithCancellationWorkflow(CoroutinesUseLockWorkflow):
def __init__(self) -> None:
self.non_terminating_operation_has_started = False

@workflow.run
async def run(self) -> str:
await workflow.wait_condition(lambda: False)
return "unreachable"

@workflow.update
async def wait_until_non_terminating_operation_has_started(self) -> None:
await workflow.wait_condition(
lambda: self.non_terminating_operation_has_started
)

@workflow.update
async def non_terminating_operation(self) -> str:
self.non_terminating_operation_has_started = True
await workflow.wait_condition(lambda: False)
return "unreachable"


async def test_workflow_coroutines_can_use_lock_with_cancellation(client: Client):
async with new_worker(
client, HandlerCoroutinesUseLockWithCancellationWorkflow
) as worker:
wf_handle = await client.start_workflow(
HandlerCoroutinesUseLockWithCancellationWorkflow.run,
id=str(uuid.uuid4()),
task_queue=worker.task_queue,
)
# Asynchronously run an update that will never complete
non_terminating_update_task = asyncio.create_task(
wf_handle.execute_update(
HandlerCoroutinesUseLockWithCancellationWorkflow.non_terminating_operation
)
)
print("wait until handler started...")
# Wait until we know the update handler has started executing
await wf_handle.execute_update(
HandlerCoroutinesUseLockWithCancellationWorkflow.wait_until_non_terminating_operation_has_started
)
print("cancel the workflow")
await wf_handle.cancel()
print("await non_terminating_update_task...")
await non_terminating_update_task


async def _do_workflow_coroutines_lock_or_semaphore_test(
client: Client,
params: UseLockOrSemaphoreWorkflowParameters,
Expand Down

0 comments on commit 94d0d38

Please sign in to comment.