Skip to content

Commit

Permalink
WIP: cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Jul 9, 2024
1 parent 6eb176a commit 2e68dad
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
2 changes: 0 additions & 2 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,6 @@ def activate(
continue
i += 1

if seen_completion:
self._warn_if_unfinished_handlers()
return self._current_completion

def _apply(
Expand Down
47 changes: 47 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5632,6 +5632,53 @@ async def finish(self):
self.workflow_may_exit = True


@workflow.defn
class UpdateCancellationWorkflow(CoroutinesUseLockWorkflow):
def __init__(self) -> None:
self.non_terminating_operation_has_started = False

@workflow.run
async def run(self) -> str:
await workflow.wait_condition(lambda: False)
return "unreachable"

@workflow.update
async def wait_until_non_terminating_operation_has_started(self) -> None:
await workflow.wait_condition(
lambda: self.non_terminating_operation_has_started
)

@workflow.update
async def non_terminating_operation(self) -> str:
self.non_terminating_operation_has_started = True
await workflow.wait_condition(lambda: False)
return "unreachable"


async def test_update_cancellation(client: Client):
async with new_worker(client, UpdateCancellationWorkflow) as worker:
wf_handle = await client.start_workflow(
UpdateCancellationWorkflow.run,
id=str(uuid.uuid4()),
task_queue=worker.task_queue,
)
# Asynchronously run an update that will never complete
non_terminating_update_task = asyncio.create_task(
wf_handle.execute_update(
UpdateCancellationWorkflow.non_terminating_operation
)
)
print("wait until handler started...")
# Wait until we know the update handler has started executing
await wf_handle.execute_update(
UpdateCancellationWorkflow.wait_until_non_terminating_operation_has_started
)
print("cancel the workflow")
await wf_handle.cancel()
print("await non_terminating_update_task...")
await non_terminating_update_task


async def _do_workflow_coroutines_lock_or_semaphore_test(
client: Client,
params: UseLockOrSemaphoreWorkflowParameters,
Expand Down

0 comments on commit 2e68dad

Please sign in to comment.