From cd9e47e345718eb3c2bd2ccf09bf8793ec4abaed Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 6 Nov 2025 10:37:43 -0700 Subject: [PATCH 1/5] Add/increase execution logging --- durabletask/worker.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index 09f6559..215e9f7 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -1233,13 +1233,21 @@ def execute( old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent], ) -> ExecutionResults: + orchestration_name = "" + orchestration_started_events = [e for e in old_events if e.HasField("executionStarted")] + if len(orchestration_started_events) > 1: + orchestration_name = orchestration_started_events[0].executionStarted.name + + self._logger.info( + f"{instance_id}: Beginning replay for orchestrator {orchestration_name}..." + ) + self._entity_state = OrchestrationEntityContext(instance_id) if not new_events: raise task.OrchestrationStateError( "The new history event list must have at least one event in it." ) - ctx = _RuntimeOrchestrationContext(instance_id, self._registry, self._entity_state) try: # Rebuild local state by replaying old history into the orchestrator function @@ -1271,13 +1279,15 @@ def execute( except Exception as ex: # Unhandled exceptions fail the orchestration + self._logger.info(f"{instance_id}: Orchestration {orchestration_name} failed") ctx.set_failed(ex) if not ctx._is_complete: task_count = len(ctx._pending_tasks) event_count = len(ctx._pending_events) self._logger.info( - f"{instance_id}: Orchestrator yielded with {task_count} task(s) and {event_count} event(s) outstanding." + f"{instance_id}: Orchestrator {orchestration_name} yielded with {task_count} task(s) " + f"and {event_count} event(s) outstanding." ) elif ( ctx._completion_status and ctx._completion_status is not pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW @@ -1286,7 +1296,7 @@ def execute( ctx._completion_status ) self._logger.info( - f"{instance_id}: Orchestration completed with status: {completion_status_str}" + f"{instance_id}: Orchestration {orchestration_name} completed with status: {completion_status_str}" ) actions = ctx.get_actions() @@ -1754,7 +1764,7 @@ def execute( encoded_input: Optional[str], ) -> Optional[str]: """Executes an activity function and returns the serialized result, if any.""" - self._logger.debug( + self._logger.info( f"{orchestration_id}/{task_id}: Executing activity '{name}'..." ) fn = self._registry.get_activity(name) @@ -1773,7 +1783,7 @@ def execute( shared.to_json(activity_output) if activity_output is not None else None ) chars = len(encoded_output) if encoded_output else 0 - self._logger.debug( + self._logger.info( f"{orchestration_id}/{task_id}: Activity '{name}' completed successfully with {chars} char(s) of encoded output." ) return encoded_output @@ -1793,7 +1803,7 @@ def execute( encoded_input: Optional[str], ) -> Optional[str]: """Executes an entity function and returns the serialized result, if any.""" - self._logger.debug( + self._logger.info( f"{orchestration_id}: Executing entity '{entity_id}'..." ) fn = self._registry.get_entity(entity_id.entity) @@ -1827,7 +1837,7 @@ def execute( shared.to_json(entity_output) if entity_output is not None else None ) chars = len(encoded_output) if encoded_output else 0 - self._logger.debug( + self._logger.info( f"{orchestration_id}: Entity '{entity_id}' completed successfully with {chars} char(s) of encoded output." ) return encoded_output From 09bfe58401151496ca136aa318c86531ff573db3 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Thu, 6 Nov 2025 10:41:59 -0700 Subject: [PATCH 2/5] Update durabletask/worker.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- durabletask/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index 215e9f7..ce10909 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -1235,7 +1235,7 @@ def execute( ) -> ExecutionResults: orchestration_name = "" orchestration_started_events = [e for e in old_events if e.HasField("executionStarted")] - if len(orchestration_started_events) > 1: + if len(orchestration_started_events) >= 1: orchestration_name = orchestration_started_events[0].executionStarted.name self._logger.info( From 755bf41fabe4525d1f5ee507dd9af8f20fc267e3 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 6 Nov 2025 11:33:28 -0700 Subject: [PATCH 3/5] Changes - Set level to debug for execution logs - Allow logging config in durabletask-python - Update docs for logging --- docs/features.md | 19 ++++++++++++++++++- .../durabletask/azuremanaged/client.py | 8 +++++++- .../durabletask/azuremanaged/worker.py | 9 ++++++++- durabletask/worker.py | 14 +++++++------- 4 files changed, 40 insertions(+), 10 deletions(-) diff --git a/docs/features.md b/docs/features.md index daa727e..0ccac74 100644 --- a/docs/features.md +++ b/docs/features.md @@ -148,4 +148,21 @@ Orchestrations can be suspended using the `suspend_orchestration` client API and ### Retry policies -Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error. \ No newline at end of file +Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error. + +### Logging configuration + +Both the TaskHubGrpcWorker and TaskHubGrpcClient (as well as DurableTaskSchedulerWorker and DurableTaskSchedulerClient for durabletask-azuremanaged) accept a log_handler and log_formatter object from `logging`. These can be used to customize verbosity, output location, and format of logs emitted by these sources. + +For example, to output logs to a file called `worker.log` at level `DEBUG`, the following syntax might apply: + +```python +log_handler = logging.FileHandler('durable.log', encoding='utf-8') +log_handler.setLevel(logging.DEBUG) + +with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, + taskhub=taskhub_name, token_credential=credential, log_handler=log_handler) as w: +``` + +**NOTE** +The worker and client output many logs at the `DEBUG` level that will be useful when understanding orchestration flow and diagnosing issues with Durable applications. Before submitting issues, please attempt a repro of the issue with debug logging enabled. diff --git a/durabletask-azuremanaged/durabletask/azuremanaged/client.py b/durabletask-azuremanaged/durabletask/azuremanaged/client.py index ffc0a7e..2ea2ebb 100644 --- a/durabletask-azuremanaged/durabletask/azuremanaged/client.py +++ b/durabletask-azuremanaged/durabletask/azuremanaged/client.py @@ -1,6 +1,8 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +import logging + from typing import Optional from azure.core.credentials import TokenCredential @@ -18,7 +20,9 @@ def __init__(self, *, taskhub: str, token_credential: Optional[TokenCredential], secure_channel: bool = True, - default_version: Optional[str] = None): + default_version: Optional[str] = None, + log_handler=None, + log_formatter: Optional[logging.Formatter] = None): if not taskhub: raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub") @@ -31,5 +35,7 @@ def __init__(self, *, host_address=host_address, secure_channel=secure_channel, metadata=None, + log_handler=log_handler, + log_formatter=log_formatter, interceptors=interceptors, default_version=default_version) diff --git a/durabletask-azuremanaged/durabletask/azuremanaged/worker.py b/durabletask-azuremanaged/durabletask/azuremanaged/worker.py index 1135ae7..10b6335 100644 --- a/durabletask-azuremanaged/durabletask/azuremanaged/worker.py +++ b/durabletask-azuremanaged/durabletask/azuremanaged/worker.py @@ -1,6 +1,8 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +import logging + from typing import Optional from azure.core.credentials import TokenCredential @@ -52,12 +54,15 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker): parameter is set to None since authentication is handled by the DTS interceptor. """ + def __init__(self, *, host_address: str, taskhub: str, token_credential: Optional[TokenCredential], secure_channel: bool = True, - concurrency_options: Optional[ConcurrencyOptions] = None): + concurrency_options: Optional[ConcurrencyOptions] = None, + log_handler=None, + log_formatter: Optional[logging.Formatter] = None): if not taskhub: raise ValueError("The taskhub value cannot be empty.") @@ -70,5 +75,7 @@ def __init__(self, *, host_address=host_address, secure_channel=secure_channel, metadata=None, + log_handler=log_handler, + log_formatter=log_formatter, interceptors=interceptors, concurrency_options=concurrency_options) diff --git a/durabletask/worker.py b/durabletask/worker.py index ce10909..637d6fd 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -1238,7 +1238,7 @@ def execute( if len(orchestration_started_events) >= 1: orchestration_name = orchestration_started_events[0].executionStarted.name - self._logger.info( + self._logger.debug( f"{instance_id}: Beginning replay for orchestrator {orchestration_name}..." ) @@ -1279,7 +1279,7 @@ def execute( except Exception as ex: # Unhandled exceptions fail the orchestration - self._logger.info(f"{instance_id}: Orchestration {orchestration_name} failed") + self._logger.debug(f"{instance_id}: Orchestration {orchestration_name} failed") ctx.set_failed(ex) if not ctx._is_complete: @@ -1295,7 +1295,7 @@ def execute( completion_status_str = ph.get_orchestration_status_str( ctx._completion_status ) - self._logger.info( + self._logger.debug( f"{instance_id}: Orchestration {orchestration_name} completed with status: {completion_status_str}" ) @@ -1764,7 +1764,7 @@ def execute( encoded_input: Optional[str], ) -> Optional[str]: """Executes an activity function and returns the serialized result, if any.""" - self._logger.info( + self._logger.debug( f"{orchestration_id}/{task_id}: Executing activity '{name}'..." ) fn = self._registry.get_activity(name) @@ -1783,7 +1783,7 @@ def execute( shared.to_json(activity_output) if activity_output is not None else None ) chars = len(encoded_output) if encoded_output else 0 - self._logger.info( + self._logger.debug( f"{orchestration_id}/{task_id}: Activity '{name}' completed successfully with {chars} char(s) of encoded output." ) return encoded_output @@ -1803,7 +1803,7 @@ def execute( encoded_input: Optional[str], ) -> Optional[str]: """Executes an entity function and returns the serialized result, if any.""" - self._logger.info( + self._logger.debug( f"{orchestration_id}: Executing entity '{entity_id}'..." ) fn = self._registry.get_entity(entity_id.entity) @@ -1837,7 +1837,7 @@ def execute( shared.to_json(entity_output) if entity_output is not None else None ) chars = len(encoded_output) if encoded_output else 0 - self._logger.info( + self._logger.debug( f"{orchestration_id}: Entity '{entity_id}' completed successfully with {chars} char(s) of encoded output." ) return encoded_output From 124023696b10e8f9c718832da0068e4cbefc700c Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 6 Nov 2025 11:34:48 -0700 Subject: [PATCH 4/5] Revert one more log line --- durabletask/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index 637d6fd..8d299ba 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -1295,7 +1295,7 @@ def execute( completion_status_str = ph.get_orchestration_status_str( ctx._completion_status ) - self._logger.debug( + self._logger.info( f"{instance_id}: Orchestration {orchestration_name} completed with status: {completion_status_str}" ) From 7f44a0d697a9ca1a8a9a4a272107bb37f9e9ae67 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Mon, 10 Nov 2025 17:03:37 -0700 Subject: [PATCH 5/5] Type-hint log_handler --- durabletask-azuremanaged/durabletask/azuremanaged/client.py | 2 +- durabletask-azuremanaged/durabletask/azuremanaged/worker.py | 4 +++- durabletask/worker.py | 4 ++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/durabletask-azuremanaged/durabletask/azuremanaged/client.py b/durabletask-azuremanaged/durabletask/azuremanaged/client.py index 2ea2ebb..50612e0 100644 --- a/durabletask-azuremanaged/durabletask/azuremanaged/client.py +++ b/durabletask-azuremanaged/durabletask/azuremanaged/client.py @@ -21,7 +21,7 @@ def __init__(self, *, token_credential: Optional[TokenCredential], secure_channel: bool = True, default_version: Optional[str] = None, - log_handler=None, + log_handler: Optional[logging.Handler] = None, log_formatter: Optional[logging.Formatter] = None): if not taskhub: diff --git a/durabletask-azuremanaged/durabletask/azuremanaged/worker.py b/durabletask-azuremanaged/durabletask/azuremanaged/worker.py index 10b6335..4816223 100644 --- a/durabletask-azuremanaged/durabletask/azuremanaged/worker.py +++ b/durabletask-azuremanaged/durabletask/azuremanaged/worker.py @@ -30,6 +30,8 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker): concurrency_options (Optional[ConcurrencyOptions], optional): Configuration for controlling worker concurrency limits. If None, default concurrency settings will be used. + log_handler (Optional[logging.Handler], optional): Custom logging handler for worker logs. + log_formatter (Optional[logging.Formatter], optional): Custom log formatter for worker logs. Raises: ValueError: If taskhub is empty or None. @@ -61,7 +63,7 @@ def __init__(self, *, token_credential: Optional[TokenCredential], secure_channel: bool = True, concurrency_options: Optional[ConcurrencyOptions] = None, - log_handler=None, + log_handler: Optional[logging.Handler] = None, log_formatter: Optional[logging.Formatter] = None): if not taskhub: diff --git a/durabletask/worker.py b/durabletask/worker.py index 8d299ba..a63d7e4 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -246,7 +246,7 @@ class TaskHubGrpcWorker: Defaults to the value from environment variables or localhost. metadata (Optional[list[tuple[str, str]]], optional): gRPC metadata to include with requests. Used for authentication and routing. Defaults to None. - log_handler (optional): Custom logging handler for worker logs. Defaults to None. + log_handler (optional[logging.Handler]): Custom logging handler for worker logs. Defaults to None. log_formatter (Optional[logging.Formatter], optional): Custom log formatter. Defaults to None. secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS). @@ -314,7 +314,7 @@ def __init__( *, host_address: Optional[str] = None, metadata: Optional[list[tuple[str, str]]] = None, - log_handler=None, + log_handler: Optional[logging.Handler] = None, log_formatter: Optional[logging.Formatter] = None, secure_channel: bool = False, interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,