From 1f11d5f885775328ea495125cd463ae36a421340 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 23 Oct 2025 14:23:22 -0600 Subject: [PATCH 1/8] Initial work - not complete --- durabletask/worker.py | 131 +++++++++++++++--- .../test_worker_concurrency_loop.py | 12 +- .../test_worker_concurrency_loop_async.py | 12 +- 3 files changed, 128 insertions(+), 27 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index 09f6559..de72285 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -346,7 +346,7 @@ def __init__( else: self._interceptors = None - self._async_worker_manager = _AsyncWorkerManager(self._concurrency_options) + self._async_worker_manager = _AsyncWorkerManager(self._concurrency_options, self._logger) @property def concurrency_options(self) -> ConcurrencyOptions: @@ -533,6 +533,7 @@ def stream_reader(): if work_item.HasField("orchestratorRequest"): self._async_worker_manager.submit_orchestration( self._execute_orchestrator, + self._cancel_orchestrator, work_item.orchestratorRequest, stub, work_item.completionToken, @@ -540,6 +541,7 @@ def stream_reader(): elif work_item.HasField("activityRequest"): self._async_worker_manager.submit_activity( self._execute_activity, + self._cancel_activity, work_item.activityRequest, stub, work_item.completionToken, @@ -547,6 +549,7 @@ def stream_reader(): elif work_item.HasField("entityRequest"): self._async_worker_manager.submit_entity_batch( self._execute_entity_batch, + self._cancel_entity_batch, work_item.entityRequest, stub, work_item.completionToken, @@ -554,6 +557,7 @@ def stream_reader(): elif work_item.HasField("entityRequestV2"): self._async_worker_manager.submit_entity_batch( self._execute_entity_batch, + self._cancel_entity_batch, work_item.entityRequestV2, stub, work_item.completionToken @@ -670,6 +674,19 @@ def _execute_orchestrator( f"Failed to deliver orchestrator response for '{req.instanceId}' to sidecar: {ex}" ) + def _cancel_orchestrator( + self, + req: pb.OrchestratorRequest, + stub: stubs.TaskHubSidecarServiceStub, + completionToken, + ): + stub.AbandonTaskOrchestratorWorkItem( + pb.AbandonOrchestrationTaskRequest( + completionToken=completionToken + ) + ) + self._logger.info(f"Cancelled orchestration task for invocation ID: {req.instanceId}") + def _execute_activity( self, req: pb.ActivityRequest, @@ -703,6 +720,19 @@ def _execute_activity( f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}" ) + def _cancel_activity( + self, + req: pb.ActivityRequest, + stub: stubs.TaskHubSidecarServiceStub, + completionToken, + ): + stub.AbandonTaskActivityWorkItem( + pb.AbandonActivityTaskRequest( + completionToken=completionToken + ) + ) + self._logger.info(f"Cancelled activity task for task ID: {req.taskId} on orchestration ID: {req.orchestrationInstance.instanceId}") + def _execute_entity_batch( self, req: Union[pb.EntityBatchRequest, pb.EntityRequest], @@ -771,6 +801,19 @@ def _execute_entity_batch( return batch_result + def _cancel_entity_batch( + self, + req: pb.EntityBatchRequest, + stub: stubs.TaskHubSidecarServiceStub, + completionToken, + ): + stub.AbandonTaskEntityWorkItem( + pb.AbandonEntityTaskRequest( + completionToken=completionToken + ) + ) + self._logger.info(f"Cancelled entity batch task for entity instance ID: {req.instanceId}") + class _RuntimeOrchestrationContext(task.OrchestrationContext): _generator: Optional[Generator[task.Task, Any, Any]] @@ -1368,7 +1411,7 @@ def process_event( timer_id = event.timerFired.timerId timer_task = ctx._pending_tasks.pop(timer_id, None) if not timer_task: - # TODO: Should this be an error? When would it ever happen? + # TODO: Should this be an error? would it ever happen? if not ctx._is_replaying: self._logger.warning( f"{ctx.instance_id}: Ignoring unexpected timerFired event with ID = {timer_id}." @@ -1920,8 +1963,10 @@ def _is_suspendable(event: pb.HistoryEvent) -> bool: class _AsyncWorkerManager: - def __init__(self, concurrency_options: ConcurrencyOptions): + def __init__(self, concurrency_options: ConcurrencyOptions, logger: logging.Logger): self.concurrency_options = concurrency_options + self._logger = logger + self.activity_semaphore = None self.orchestration_semaphore = None self.entity_semaphore = None @@ -2031,17 +2076,47 @@ async def run(self): ) # Start background consumers for each work type - if self.activity_queue is not None and self.orchestration_queue is not None \ - and self.entity_batch_queue is not None: - await asyncio.gather( - self._consume_queue(self.activity_queue, self.activity_semaphore), - self._consume_queue( - self.orchestration_queue, self.orchestration_semaphore - ), - self._consume_queue( - self.entity_batch_queue, self.entity_semaphore + try: + if self.activity_queue is not None and self.orchestration_queue is not None \ + and self.entity_batch_queue is not None: + await asyncio.gather( + self._consume_queue(self.activity_queue, self.activity_semaphore), + self._consume_queue( + self.orchestration_queue, self.orchestration_semaphore + ), + self._consume_queue( + self.entity_batch_queue, self.entity_semaphore + ) ) - ) + except Exception as queue_exception: + self._logger.error(f"Uncaught error in activity manager thread pool: {queue_exception}") + while self.activity_queue is not None and not self.activity_queue.empty(): + try: + func, cancellation_func, args, kwargs = self.activity_queue.get_nowait() + await self._run_func(cancellation_func, *args, **kwargs) + self._logger.error(f"Activity work item args: {args}, kwargs: {kwargs}") + except asyncio.QueueEmpty: + pass + except Exception as cancellation_exception: + self._logger.error(f"Uncaught error while cancelling activity work item: {cancellation_exception}") + while self.orchestration_queue is not None and not self.orchestration_queue.empty(): + try: + func, cancellation_func, args, kwargs = self.orchestration_queue.get_nowait() + await self._run_func(cancellation_func, *args, **kwargs) + self._logger.error(f"Orchestration work item args: {args}, kwargs: {kwargs}") + except asyncio.QueueEmpty: + pass + except Exception as cancellation_exception: + self._logger.error(f"Uncaught error while cancelling orchestration work item: {cancellation_exception}") + while self.entity_batch_queue is not None and not self.entity_batch_queue.empty(): + try: + func, cancellation_func, args, kwargs = self.entity_batch_queue.get_nowait() + await self._run_func(cancellation_func, *args, **kwargs) + self._logger.error(f"Entity batch work item args: {args}, kwargs: {kwargs}") + except asyncio.QueueEmpty: + pass + except Exception as cancellation_exception: + self._logger.error(f"Uncaught error while cancelling entity batch work item: {cancellation_exception}") async def _consume_queue(self, queue: asyncio.Queue, semaphore: asyncio.Semaphore): # List to track running tasks @@ -2061,7 +2136,7 @@ async def _consume_queue(self, queue: asyncio.Queue, semaphore: asyncio.Semaphor except asyncio.TimeoutError: continue - func, args, kwargs = work + func, cancellation_func, args, kwargs = work # Create a concurrent task for processing task = asyncio.create_task( self._process_work_item(semaphore, queue, func, args, kwargs) @@ -2092,8 +2167,8 @@ async def _run_func(self, func, *args, **kwargs): self.thread_pool, lambda: func(*args, **kwargs) ) - def submit_activity(self, func, *args, **kwargs): - work_item = (func, args, kwargs) + def submit_activity(self, func, cancellation_func, *args, **kwargs): + work_item = (func, cancellation_func, args, kwargs) self._ensure_queues_for_current_loop() if self.activity_queue is not None: self.activity_queue.put_nowait(work_item) @@ -2101,8 +2176,8 @@ def submit_activity(self, func, *args, **kwargs): # No event loop running, store in pending list self._pending_activity_work.append(work_item) - def submit_orchestration(self, func, *args, **kwargs): - work_item = (func, args, kwargs) + def submit_orchestration(self, func, cancellation_func, *args, **kwargs): + work_item = (func, cancellation_func, args, kwargs) self._ensure_queues_for_current_loop() if self.orchestration_queue is not None: self.orchestration_queue.put_nowait(work_item) @@ -2110,8 +2185,8 @@ def submit_orchestration(self, func, *args, **kwargs): # No event loop running, store in pending list self._pending_orchestration_work.append(work_item) - def submit_entity_batch(self, func, *args, **kwargs): - work_item = (func, args, kwargs) + def submit_entity_batch(self, func, cancellation_func, *args, **kwargs): + work_item = (func, cancellation_func, args, kwargs) self._ensure_queues_for_current_loop() if self.entity_batch_queue is not None: self.entity_batch_queue.put_nowait(work_item) @@ -2123,7 +2198,7 @@ def shutdown(self): self._shutdown = True self.thread_pool.shutdown(wait=True) - def reset_for_new_run(self): + async def reset_for_new_run(self): """Reset the manager state for a new run.""" self._shutdown = False # Clear any existing queues - they'll be recreated when needed @@ -2132,18 +2207,28 @@ def reset_for_new_run(self): # This ensures no items from previous runs remain try: while not self.activity_queue.empty(): - self.activity_queue.get_nowait() + func, cancellation_func, args, kwargs = self.activity_queue.get_nowait() + await self._run_func(cancellation_func, *args, **kwargs) except Exception: pass if self.orchestration_queue is not None: try: while not self.orchestration_queue.empty(): - self.orchestration_queue.get_nowait() + func, cancellation_func, args, kwargs = self.orchestration_queue.get_nowait() + await self._run_func(cancellation_func, *args, **kwargs) + except Exception: + pass + if self.entity_batch_queue is not None: + try: + while not self.entity_batch_queue.empty(): + func, cancellation_func, args, kwargs = self.entity_batch_queue.get_nowait() + await self._run_func(cancellation_func, *args, **kwargs) except Exception: pass # Clear pending work lists self._pending_activity_work.clear() self._pending_orchestration_work.clear() + self._pending_entity_batch_work.clear() # Export public API diff --git a/tests/durabletask/test_worker_concurrency_loop.py b/tests/durabletask/test_worker_concurrency_loop.py index de6753b..9a20f48 100644 --- a/tests/durabletask/test_worker_concurrency_loop.py +++ b/tests/durabletask/test_worker_concurrency_loop.py @@ -52,13 +52,21 @@ def dummy_orchestrator(req, stub, completionToken): time.sleep(0.1) stub.CompleteOrchestratorTask('ok') + def cancel_dummy_orchestrator(req, stub, completionToken): + pass + def dummy_activity(req, stub, completionToken): time.sleep(0.1) stub.CompleteActivityTask('ok') + def cancel_dummy_activity(req, stub, completionToken): + pass + # Patch the worker's _execute_orchestrator and _execute_activity worker._execute_orchestrator = dummy_orchestrator + worker._cancel_orchestrator = cancel_dummy_orchestrator worker._execute_activity = dummy_activity + worker._cancel_activity = cancel_dummy_activity orchestrator_requests = [DummyRequest('orchestrator', f'orch{i}') for i in range(3)] activity_requests = [DummyRequest('activity', f'act{i}') for i in range(4)] @@ -67,9 +75,9 @@ async def run_test(): # Start the worker manager's run loop in the background worker_task = asyncio.create_task(worker._async_worker_manager.run()) for req in orchestrator_requests: - worker._async_worker_manager.submit_orchestration(dummy_orchestrator, req, stub, DummyCompletionToken()) + worker._async_worker_manager.submit_orchestration(dummy_orchestrator, cancel_dummy_orchestrator, req, stub, DummyCompletionToken()) for req in activity_requests: - worker._async_worker_manager.submit_activity(dummy_activity, req, stub, DummyCompletionToken()) + worker._async_worker_manager.submit_activity(dummy_activity, cancel_dummy_activity, req, stub, DummyCompletionToken()) await asyncio.sleep(1.0) orchestrator_count = sum(1 for t, _ in stub.completed if t == 'orchestrator') activity_count = sum(1 for t, _ in stub.completed if t == 'activity') diff --git a/tests/durabletask/test_worker_concurrency_loop_async.py b/tests/durabletask/test_worker_concurrency_loop_async.py index c7ba238..0ef63e0 100644 --- a/tests/durabletask/test_worker_concurrency_loop_async.py +++ b/tests/durabletask/test_worker_concurrency_loop_async.py @@ -50,13 +50,21 @@ async def dummy_orchestrator(req, stub, completionToken): await asyncio.sleep(0.1) stub.CompleteOrchestratorTask('ok') + async def cancel_dummy_orchestrator(req, stub, completionToken): + pass + async def dummy_activity(req, stub, completionToken): await asyncio.sleep(0.1) stub.CompleteActivityTask('ok') + async def cancel_dummy_activity(req, stub, completionToken): + pass + # Patch the worker's _execute_orchestrator and _execute_activity grpc_worker._execute_orchestrator = dummy_orchestrator + grpc_worker._cancel_orchestrator = cancel_dummy_orchestrator grpc_worker._execute_activity = dummy_activity + grpc_worker._cancel_activity = cancel_dummy_activity orchestrator_requests = [DummyRequest('orchestrator', f'orch{i}') for i in range(3)] activity_requests = [DummyRequest('activity', f'act{i}') for i in range(4)] @@ -66,9 +74,9 @@ async def run_test(): stub.completed.clear() worker_task = asyncio.create_task(grpc_worker._async_worker_manager.run()) for req in orchestrator_requests: - grpc_worker._async_worker_manager.submit_orchestration(dummy_orchestrator, req, stub, DummyCompletionToken()) + grpc_worker._async_worker_manager.submit_orchestration(dummy_orchestrator, cancel_dummy_orchestrator, req, stub, DummyCompletionToken()) for req in activity_requests: - grpc_worker._async_worker_manager.submit_activity(dummy_activity, req, stub, DummyCompletionToken()) + grpc_worker._async_worker_manager.submit_activity(dummy_activity, cancel_dummy_activity, req, stub, DummyCompletionToken()) await asyncio.sleep(1.0) orchestrator_count = sum(1 for t, _ in stub.completed if t == 'orchestrator') activity_count = sum(1 for t, _ in stub.completed if t == 'activity') From e48f87c72ac760ee0fa024f892aeb8e0d717c96c Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 6 Nov 2025 13:59:10 -0700 Subject: [PATCH 2/8] Fix for crash in worker run loop --- durabletask/worker.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index de72285..e7d0986 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -2117,6 +2117,7 @@ async def run(self): pass except Exception as cancellation_exception: self._logger.error(f"Uncaught error while cancelling entity batch work item: {cancellation_exception}") + self.shutdown() async def _consume_queue(self, queue: asyncio.Queue, semaphore: asyncio.Semaphore): # List to track running tasks @@ -2139,16 +2140,18 @@ async def _consume_queue(self, queue: asyncio.Queue, semaphore: asyncio.Semaphor func, cancellation_func, args, kwargs = work # Create a concurrent task for processing task = asyncio.create_task( - self._process_work_item(semaphore, queue, func, args, kwargs) + self._process_work_item(semaphore, queue, func, cancellation_func, args, kwargs) ) running_tasks.add(task) async def _process_work_item( - self, semaphore: asyncio.Semaphore, queue: asyncio.Queue, func, args, kwargs + self, semaphore: asyncio.Semaphore, queue: asyncio.Queue, func, cancellation_func, args, kwargs ): async with semaphore: try: await self._run_func(func, *args, **kwargs) + except Exception as work_exception: + await self._run_func(cancellation_func, *args, **kwargs) finally: queue.task_done() @@ -2168,6 +2171,8 @@ async def _run_func(self, func, *args, **kwargs): ) def submit_activity(self, func, cancellation_func, *args, **kwargs): + if self._shutdown: + raise RuntimeError("Cannot submit new work items after shutdown has been initiated.") work_item = (func, cancellation_func, args, kwargs) self._ensure_queues_for_current_loop() if self.activity_queue is not None: @@ -2177,6 +2182,8 @@ def submit_activity(self, func, cancellation_func, *args, **kwargs): self._pending_activity_work.append(work_item) def submit_orchestration(self, func, cancellation_func, *args, **kwargs): + if self._shutdown: + raise RuntimeError("Cannot submit new work items after shutdown has been initiated.") work_item = (func, cancellation_func, args, kwargs) self._ensure_queues_for_current_loop() if self.orchestration_queue is not None: @@ -2186,6 +2193,8 @@ def submit_orchestration(self, func, cancellation_func, *args, **kwargs): self._pending_orchestration_work.append(work_item) def submit_entity_batch(self, func, cancellation_func, *args, **kwargs): + if self._shutdown: + raise RuntimeError("Cannot submit new work items after shutdown has been initiated.") work_item = (func, cancellation_func, args, kwargs) self._ensure_queues_for_current_loop() if self.entity_batch_queue is not None: From 57b7673160225248b3e8b6a2845a14bb4619d567 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 6 Nov 2025 14:00:59 -0700 Subject: [PATCH 3/8] Log clarity --- durabletask/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index e7d0986..c005627 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -2089,7 +2089,7 @@ async def run(self): ) ) except Exception as queue_exception: - self._logger.error(f"Uncaught error in activity manager thread pool: {queue_exception}") + self._logger.error(f"Shutting down worker - Uncaught error in activity manager thread pool: {queue_exception}") while self.activity_queue is not None and not self.activity_queue.empty(): try: func, cancellation_func, args, kwargs = self.activity_queue.get_nowait() From a79dd9d58add04e304c07df3170322a53eb284bf Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 6 Nov 2025 14:01:32 -0700 Subject: [PATCH 4/8] Revert comment --- durabletask/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index c005627..bffd6f2 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -1411,7 +1411,7 @@ def process_event( timer_id = event.timerFired.timerId timer_task = ctx._pending_tasks.pop(timer_id, None) if not timer_task: - # TODO: Should this be an error? would it ever happen? + # TODO: Should this be an error? When would it ever happen? if not ctx._is_replaying: self._logger.warning( f"{ctx.instance_id}: Ignoring unexpected timerFired event with ID = {timer_id}." From 693071f51f206807d975a8f6e4e92e0cca4c1748 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 6 Nov 2025 14:03:36 -0700 Subject: [PATCH 5/8] Linting fix --- durabletask/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index bffd6f2..d819d55 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -2150,7 +2150,7 @@ async def _process_work_item( async with semaphore: try: await self._run_func(func, *args, **kwargs) - except Exception as work_exception: + except Exception: await self._run_func(cancellation_func, *args, **kwargs) finally: queue.task_done() From c63be78195afe03cf218cbe3b787c69fb070662c Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 6 Nov 2025 14:09:39 -0700 Subject: [PATCH 6/8] Fix tests --- .../durabletask/test_worker_concurrency_loop.py | 9 +++++++-- .../test_worker_concurrency_loop_async.py | 16 ++++++++-------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/tests/durabletask/test_worker_concurrency_loop.py b/tests/durabletask/test_worker_concurrency_loop.py index 9a20f48..6fd1270 100644 --- a/tests/durabletask/test_worker_concurrency_loop.py +++ b/tests/durabletask/test_worker_concurrency_loop.py @@ -128,8 +128,8 @@ def fn(*args, **kwargs): # Submit more work than concurrency allows for i in range(5): - manager.submit_orchestration(make_work("orch", i)) - manager.submit_activity(make_work("act", i)) + manager.submit_orchestration(make_work("orch", i), lambda *a, **k: None) + manager.submit_activity(make_work("act", i), lambda *a, **k: None) # Run the manager loop in a thread (sync context) def run_manager(): @@ -139,6 +139,11 @@ def run_manager(): t.start() time.sleep(1.5) # Let work process manager.shutdown() + + # Ensure the queues have been started + if (manager.activity_queue is None or manager.orchestration_queue is None): + raise RuntimeError("Worker manager queues not initialized") + # Unblock the consumers by putting dummy items in the queues manager.activity_queue.put_nowait((lambda: None, (), {})) manager.orchestration_queue.put_nowait((lambda: None, (), {})) diff --git a/tests/durabletask/test_worker_concurrency_loop_async.py b/tests/durabletask/test_worker_concurrency_loop_async.py index 0ef63e0..21f5c6d 100644 --- a/tests/durabletask/test_worker_concurrency_loop_async.py +++ b/tests/durabletask/test_worker_concurrency_loop_async.py @@ -46,25 +46,25 @@ def test_worker_concurrency_loop_async(): grpc_worker = TaskHubGrpcWorker(concurrency_options=options) stub = DummyStub() - async def dummy_orchestrator(req, stub, completionToken): + async def dummy_orchestrator(self, req, stub, completionToken): await asyncio.sleep(0.1) stub.CompleteOrchestratorTask('ok') - async def cancel_dummy_orchestrator(req, stub, completionToken): + async def cancel_dummy_orchestrator(self, req, stub, completionToken): pass - async def dummy_activity(req, stub, completionToken): + async def dummy_activity(self, req, stub, completionToken): await asyncio.sleep(0.1) stub.CompleteActivityTask('ok') - async def cancel_dummy_activity(req, stub, completionToken): + async def cancel_dummy_activity(self, req, stub, completionToken): pass # Patch the worker's _execute_orchestrator and _execute_activity - grpc_worker._execute_orchestrator = dummy_orchestrator - grpc_worker._cancel_orchestrator = cancel_dummy_orchestrator - grpc_worker._execute_activity = dummy_activity - grpc_worker._cancel_activity = cancel_dummy_activity + grpc_worker._execute_orchestrator = dummy_orchestrator.__get__(grpc_worker, TaskHubGrpcWorker) + grpc_worker._cancel_orchestrator = cancel_dummy_orchestrator.__get__(grpc_worker, TaskHubGrpcWorker) + grpc_worker._execute_activity = dummy_activity.__get__(grpc_worker, TaskHubGrpcWorker) + grpc_worker._cancel_activity = cancel_dummy_activity.__get__(grpc_worker, TaskHubGrpcWorker) orchestrator_requests = [DummyRequest('orchestrator', f'orch{i}') for i in range(3)] activity_requests = [DummyRequest('activity', f'act{i}') for i in range(4)] From e4999371b37ef91f38362bd0999f154d8548afea Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 6 Nov 2025 14:31:01 -0700 Subject: [PATCH 7/8] Feedback, fix tests --- durabletask/worker.py | 24 +++++++++++-------- .../test_worker_concurrency_loop_async.py | 8 +++---- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index d819d55..dc6745f 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -803,7 +803,7 @@ def _execute_entity_batch( def _cancel_entity_batch( self, - req: pb.EntityBatchRequest, + req: Union[pb.EntityBatchRequest, pb.EntityRequest], stub: stubs.TaskHubSidecarServiceStub, completionToken, ): @@ -812,7 +812,7 @@ def _cancel_entity_batch( completionToken=completionToken ) ) - self._logger.info(f"Cancelled entity batch task for entity instance ID: {req.instanceId}") + self._logger.info(f"Cancelled entity batch task for instance ID: {req.instanceId}") class _RuntimeOrchestrationContext(task.OrchestrationContext): @@ -2089,13 +2089,14 @@ async def run(self): ) ) except Exception as queue_exception: - self._logger.error(f"Shutting down worker - Uncaught error in activity manager thread pool: {queue_exception}") + self._logger.error(f"Shutting down worker - Uncaught error in worker manager: {queue_exception}") while self.activity_queue is not None and not self.activity_queue.empty(): try: func, cancellation_func, args, kwargs = self.activity_queue.get_nowait() await self._run_func(cancellation_func, *args, **kwargs) self._logger.error(f"Activity work item args: {args}, kwargs: {kwargs}") except asyncio.QueueEmpty: + # Queue was empty, no cancellation needed pass except Exception as cancellation_exception: self._logger.error(f"Uncaught error while cancelling activity work item: {cancellation_exception}") @@ -2105,6 +2106,7 @@ async def run(self): await self._run_func(cancellation_func, *args, **kwargs) self._logger.error(f"Orchestration work item args: {args}, kwargs: {kwargs}") except asyncio.QueueEmpty: + # Queue was empty, no cancellation needed pass except Exception as cancellation_exception: self._logger.error(f"Uncaught error while cancelling orchestration work item: {cancellation_exception}") @@ -2114,6 +2116,7 @@ async def run(self): await self._run_func(cancellation_func, *args, **kwargs) self._logger.error(f"Entity batch work item args: {args}, kwargs: {kwargs}") except asyncio.QueueEmpty: + # Queue was empty, no cancellation needed pass except Exception as cancellation_exception: self._logger.error(f"Uncaught error while cancelling entity batch work item: {cancellation_exception}") @@ -2150,7 +2153,8 @@ async def _process_work_item( async with semaphore: try: await self._run_func(func, *args, **kwargs) - except Exception: + except Exception as work_exception: + self._logger.error(f"Uncaught error while processing work item, item will be abandoned: {work_exception}") await self._run_func(cancellation_func, *args, **kwargs) finally: queue.task_done() @@ -2218,22 +2222,22 @@ async def reset_for_new_run(self): while not self.activity_queue.empty(): func, cancellation_func, args, kwargs = self.activity_queue.get_nowait() await self._run_func(cancellation_func, *args, **kwargs) - except Exception: - pass + except Exception as reset_exception: + self._logger.warning(f"Error while clearing activity queue during reset: {reset_exception}") if self.orchestration_queue is not None: try: while not self.orchestration_queue.empty(): func, cancellation_func, args, kwargs = self.orchestration_queue.get_nowait() await self._run_func(cancellation_func, *args, **kwargs) - except Exception: - pass + except Exception as reset_exception: + self._logger.warning(f"Error while clearing orchestration queue during reset: {reset_exception}") if self.entity_batch_queue is not None: try: while not self.entity_batch_queue.empty(): func, cancellation_func, args, kwargs = self.entity_batch_queue.get_nowait() await self._run_func(cancellation_func, *args, **kwargs) - except Exception: - pass + except Exception as reset_exception: + self._logger.warning(f"Error while clearing entity queue during reset: {reset_exception}") # Clear pending work lists self._pending_activity_work.clear() self._pending_orchestration_work.clear() diff --git a/tests/durabletask/test_worker_concurrency_loop_async.py b/tests/durabletask/test_worker_concurrency_loop_async.py index 21f5c6d..c0f59c5 100644 --- a/tests/durabletask/test_worker_concurrency_loop_async.py +++ b/tests/durabletask/test_worker_concurrency_loop_async.py @@ -46,18 +46,18 @@ def test_worker_concurrency_loop_async(): grpc_worker = TaskHubGrpcWorker(concurrency_options=options) stub = DummyStub() - async def dummy_orchestrator(self, req, stub, completionToken): + async def dummy_orchestrator(req, stub, completionToken): await asyncio.sleep(0.1) stub.CompleteOrchestratorTask('ok') - async def cancel_dummy_orchestrator(self, req, stub, completionToken): + async def cancel_dummy_orchestrator(req, stub, completionToken): pass - async def dummy_activity(self, req, stub, completionToken): + async def dummy_activity(req, stub, completionToken): await asyncio.sleep(0.1) stub.CompleteActivityTask('ok') - async def cancel_dummy_activity(self, req, stub, completionToken): + async def cancel_dummy_activity(req, stub, completionToken): pass # Patch the worker's _execute_orchestrator and _execute_activity From 7d394aa1dc66bb249ef50c222a3fd7de4080417f Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 6 Nov 2025 14:40:30 -0700 Subject: [PATCH 8/8] Fix tests --- tests/durabletask/test_worker_concurrency_loop_async.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/durabletask/test_worker_concurrency_loop_async.py b/tests/durabletask/test_worker_concurrency_loop_async.py index c0f59c5..8482c20 100644 --- a/tests/durabletask/test_worker_concurrency_loop_async.py +++ b/tests/durabletask/test_worker_concurrency_loop_async.py @@ -73,6 +73,11 @@ async def run_test(): # Clear stub state before each run stub.completed.clear() worker_task = asyncio.create_task(grpc_worker._async_worker_manager.run()) + # Need to yield to that thread in order to let it start up on the second run + startup_attempts = 0 + while grpc_worker._async_worker_manager._shutdown and startup_attempts < 10: + await asyncio.sleep(0.1) + startup_attempts += 1 for req in orchestrator_requests: grpc_worker._async_worker_manager.submit_orchestration(dummy_orchestrator, cancel_dummy_orchestrator, req, stub, DummyCompletionToken()) for req in activity_requests: