diff --git a/docs/features.md b/docs/features.md index daa727e..0ccac74 100644 --- a/docs/features.md +++ b/docs/features.md @@ -148,4 +148,21 @@ Orchestrations can be suspended using the `suspend_orchestration` client API and ### Retry policies -Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error. \ No newline at end of file +Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error. + +### Logging configuration + +Both the TaskHubGrpcWorker and TaskHubGrpcClient (as well as DurableTaskSchedulerWorker and DurableTaskSchedulerClient for durabletask-azuremanaged) accept log_handler and log_formatter objects from `logging`. These can be used to customize verbosity, output location, and format of logs emitted by these sources. + +For example, to output logs to a file called `durable.log` at level `DEBUG`, the following syntax can be used: + +```python +log_handler = logging.FileHandler('durable.log', encoding='utf-8') +log_handler.setLevel(logging.DEBUG) + +with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, + taskhub=taskhub_name, token_credential=credential, log_handler=log_handler) as w: +``` + +**NOTE** +The worker and client output many logs at the `DEBUG` level that will be useful when understanding orchestration flow and diagnosing issues with Durable applications. Before submitting issues, please attempt a repro of the issue with debug logging enabled. 
diff --git a/durabletask-azuremanaged/durabletask/azuremanaged/client.py b/durabletask-azuremanaged/durabletask/azuremanaged/client.py index ffc0a7e..50612e0 100644 --- a/durabletask-azuremanaged/durabletask/azuremanaged/client.py +++ b/durabletask-azuremanaged/durabletask/azuremanaged/client.py @@ -1,6 +1,8 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +import logging + from typing import Optional from azure.core.credentials import TokenCredential @@ -18,7 +20,9 @@ def __init__(self, *, taskhub: str, token_credential: Optional[TokenCredential], secure_channel: bool = True, - default_version: Optional[str] = None): + default_version: Optional[str] = None, + log_handler: Optional[logging.Handler] = None, + log_formatter: Optional[logging.Formatter] = None): if not taskhub: raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub") @@ -31,5 +35,7 @@ def __init__(self, *, host_address=host_address, secure_channel=secure_channel, metadata=None, + log_handler=log_handler, + log_formatter=log_formatter, interceptors=interceptors, default_version=default_version) diff --git a/durabletask-azuremanaged/durabletask/azuremanaged/worker.py b/durabletask-azuremanaged/durabletask/azuremanaged/worker.py index 1135ae7..4816223 100644 --- a/durabletask-azuremanaged/durabletask/azuremanaged/worker.py +++ b/durabletask-azuremanaged/durabletask/azuremanaged/worker.py @@ -1,6 +1,8 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +import logging + from typing import Optional from azure.core.credentials import TokenCredential @@ -28,6 +30,8 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker): concurrency_options (Optional[ConcurrencyOptions], optional): Configuration for controlling worker concurrency limits. If None, default concurrency settings will be used. + log_handler (Optional[logging.Handler], optional): Custom logging handler for worker logs. 
+ log_formatter (Optional[logging.Formatter], optional): Custom log formatter for worker logs. Raises: ValueError: If taskhub is empty or None. @@ -52,12 +56,15 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker): parameter is set to None since authentication is handled by the DTS interceptor. """ + def __init__(self, *, host_address: str, taskhub: str, token_credential: Optional[TokenCredential], secure_channel: bool = True, - concurrency_options: Optional[ConcurrencyOptions] = None): + concurrency_options: Optional[ConcurrencyOptions] = None, + log_handler: Optional[logging.Handler] = None, + log_formatter: Optional[logging.Formatter] = None): if not taskhub: raise ValueError("The taskhub value cannot be empty.") @@ -70,5 +77,7 @@ def __init__(self, *, host_address=host_address, secure_channel=secure_channel, metadata=None, + log_handler=log_handler, + log_formatter=log_formatter, interceptors=interceptors, concurrency_options=concurrency_options) diff --git a/durabletask/worker.py b/durabletask/worker.py index a244927..681e2df 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -246,7 +246,7 @@ class TaskHubGrpcWorker: Defaults to the value from environment variables or localhost. metadata (Optional[list[tuple[str, str]]], optional): gRPC metadata to include with requests. Used for authentication and routing. Defaults to None. - log_handler (optional): Custom logging handler for worker logs. Defaults to None. + log_handler (Optional[logging.Handler], optional): Custom logging handler for worker logs. Defaults to None. log_formatter (Optional[logging.Formatter], optional): Custom log formatter. Defaults to None. secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS). 
@@ -314,7 +314,7 @@ def __init__( *, host_address: Optional[str] = None, metadata: Optional[list[tuple[str, str]]] = None, - log_handler=None, + log_handler: Optional[logging.Handler] = None, log_formatter: Optional[logging.Formatter] = None, secure_channel: bool = False, interceptors: Optional[Sequence[shared.ClientInterceptor]] = None, @@ -1236,13 +1236,21 @@ def execute( old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent], ) -> ExecutionResults: + orchestration_name = "" + orchestration_started_events = [e for e in (*old_events, *new_events) if e.HasField("executionStarted")] + if orchestration_started_events: + orchestration_name = orchestration_started_events[0].executionStarted.name + + self._logger.debug( + f"{instance_id}: Beginning replay for orchestrator {orchestration_name}..." + ) + self._entity_state = OrchestrationEntityContext(instance_id) if not new_events: raise task.OrchestrationStateError( "The new history event list must have at least one event in it." ) - ctx = _RuntimeOrchestrationContext(instance_id, self._registry, self._entity_state) try: # Rebuild local state by replaying old history into the orchestrator function @@ -1274,13 +1282,15 @@ def execute( except Exception as ex: # Unhandled exceptions fail the orchestration + self._logger.debug(f"{instance_id}: Orchestration {orchestration_name} failed") ctx.set_failed(ex) if not ctx._is_complete: task_count = len(ctx._pending_tasks) event_count = len(ctx._pending_events) self._logger.info( - f"{instance_id}: Orchestrator yielded with {task_count} task(s) and {event_count} event(s) outstanding." + f"{instance_id}: Orchestrator {orchestration_name} yielded with {task_count} task(s) " + f"and {event_count} event(s) outstanding." 
) elif ( ctx._completion_status and ctx._completion_status is not pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW @@ -1289,7 +1299,7 @@ def execute( ctx._completion_status ) self._logger.info( - f"{instance_id}: Orchestration completed with status: {completion_status_str}" + f"{instance_id}: Orchestration {orchestration_name} completed with status: {completion_status_str}" ) actions = ctx.get_actions()