19 changes: 18 additions & 1 deletion docs/features.md
@@ -148,4 +148,21 @@ Orchestrations can be suspended using the `suspend_orchestration` client API and

### Retry policies

Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
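
As an illustration of that paragraph (an editor's sketch, not part of this diff), a retry policy might be attached to an activity call roughly as below; the `RetryPolicy` keyword arguments and the `retry_policy` parameter on `call_activity` are assumptions about the durabletask API rather than anything shown in this PR.

```python
from datetime import timedelta

from durabletask import task


def flaky_activity(ctx: task.ActivityContext, name: str) -> str:
    # Placeholder activity that might fail transiently (e.g. a network call).
    return f"Hello, {name}!"


def unreliable_orchestrator(ctx: task.OrchestrationContext, name: str):
    # Assumed API: retry the activity up to 5 times, starting with a 5-second
    # delay and doubling the delay after each failed attempt.
    retry_policy = task.RetryPolicy(
        first_retry_interval=timedelta(seconds=5),
        max_number_of_attempts=5,
        backoff_coefficient=2.0,
    )
    result = yield ctx.call_activity(flaky_activity, input=name, retry_policy=retry_policy)
    return result
```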

### Logging configuration

Both the `TaskHubGrpcWorker` and `TaskHubGrpcClient` (as well as `DurableTaskSchedulerWorker` and `DurableTaskSchedulerClient` for durabletask-azuremanaged) accept `log_handler` and `log_formatter` objects from the standard `logging` module. These can be used to customize the verbosity, output location, and format of the logs emitted by these sources.

For example, to write logs at level `DEBUG` to a file called `durable.log`, you could use the following:

```python
import logging

log_handler = logging.FileHandler('durable.log', encoding='utf-8')
log_handler.setLevel(logging.DEBUG)

with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
                                taskhub=taskhub_name, token_credential=credential,
                                log_handler=log_handler) as w:
    # Register orchestrators/activities here, then start the worker.
    w.start()
```

**NOTE**
The worker and client emit many logs at the `DEBUG` level that are useful for understanding orchestration flow and diagnosing issues with Durable applications. Before submitting issues, please attempt to reproduce the issue with debug logging enabled.
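
Going one step beyond the snippet above, here is a minimal sketch that attaches a formatter as well; the endpoint and task hub values are placeholders, `DefaultAzureCredential` is just one possible `TokenCredential`, and the registration/`start()` calls assume the usual `TaskHubGrpcWorker` usage pattern rather than anything introduced by this PR.

```python
import logging

from azure.identity import DefaultAzureCredential
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker

# Send DEBUG-level worker logs to a file, with timestamps and logger names.
log_handler = logging.FileHandler('durable.log', encoding='utf-8')
log_handler.setLevel(logging.DEBUG)
log_formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')

credential = DefaultAzureCredential()  # placeholder credential for illustration

with DurableTaskSchedulerWorker(host_address="<your-endpoint>", secure_channel=True,
                                taskhub="<your-taskhub>", token_credential=credential,
                                log_handler=log_handler, log_formatter=log_formatter) as w:
    # Register orchestrators/activities here, then start the worker.
    w.start()
```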
8 changes: 7 additions & 1 deletion durabletask-azuremanaged/durabletask/azuremanaged/client.py
@@ -1,6 +1,8 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import logging

from typing import Optional

from azure.core.credentials import TokenCredential
@@ -18,7 +20,9 @@ def __init__(self, *,
taskhub: str,
token_credential: Optional[TokenCredential],
secure_channel: bool = True,
default_version: Optional[str] = None):
default_version: Optional[str] = None,
log_handler: Optional[logging.Handler] = None,
log_formatter: Optional[logging.Formatter] = None):

if not taskhub:
raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")
@@ -31,5 +35,7 @@ def __init__(self, *,
host_address=host_address,
secure_channel=secure_channel,
metadata=None,
log_handler=log_handler,
log_formatter=log_formatter,
interceptors=interceptors,
default_version=default_version)
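
The client side accepts the same two objects; a hedged sketch follows, where the endpoint, task hub, credential, orchestrator name, and the schedule/wait calls are illustrative assumptions rather than part of this diff.

```python
import logging

from azure.identity import DefaultAzureCredential
from durabletask.azuremanaged.client import DurableTaskSchedulerClient

# Reuse one handler/formatter pair so client logs land in the same file as worker logs.
log_handler = logging.FileHandler('durable.log', encoding='utf-8')
log_handler.setLevel(logging.DEBUG)
log_formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')

c = DurableTaskSchedulerClient(host_address="<your-endpoint>", secure_channel=True,
                               taskhub="<your-taskhub>",
                               token_credential=DefaultAzureCredential(),
                               log_handler=log_handler, log_formatter=log_formatter)

# Assumed client calls for illustration: schedule an orchestration and wait for it.
instance_id = c.schedule_new_orchestration("my_orchestrator", input="Hello")
state = c.wait_for_orchestration_completion(instance_id, timeout=60)
```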
11 changes: 10 additions & 1 deletion durabletask-azuremanaged/durabletask/azuremanaged/worker.py
@@ -1,6 +1,8 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import logging

from typing import Optional

from azure.core.credentials import TokenCredential
@@ -28,6 +30,8 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
concurrency_options (Optional[ConcurrencyOptions], optional): Configuration
for controlling worker concurrency limits. If None, default concurrency
settings will be used.
log_handler (Optional[logging.Handler], optional): Custom logging handler for worker logs.
log_formatter (Optional[logging.Formatter], optional): Custom log formatter for worker logs.

Raises:
ValueError: If taskhub is empty or None.
@@ -52,12 +56,15 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
parameter is set to None since authentication is handled by the
DTS interceptor.
"""

def __init__(self, *,
host_address: str,
taskhub: str,
token_credential: Optional[TokenCredential],
secure_channel: bool = True,
concurrency_options: Optional[ConcurrencyOptions] = None):
concurrency_options: Optional[ConcurrencyOptions] = None,
log_handler: Optional[logging.Handler] = None,
log_formatter: Optional[logging.Formatter] = None):

if not taskhub:
raise ValueError("The taskhub value cannot be empty.")
@@ -70,5 +77,7 @@ def __init__(self, *,
host_address=host_address,
secure_channel=secure_channel,
metadata=None,
log_handler=log_handler,
log_formatter=log_formatter,
interceptors=interceptors,
concurrency_options=concurrency_options)
20 changes: 15 additions & 5 deletions durabletask/worker.py
@@ -246,7 +246,7 @@ class TaskHubGrpcWorker:
Defaults to the value from environment variables or localhost.
metadata (Optional[list[tuple[str, str]]], optional): gRPC metadata to include with
requests. Used for authentication and routing. Defaults to None.
log_handler (optional): Custom logging handler for worker logs. Defaults to None.
log_handler (Optional[logging.Handler], optional): Custom logging handler for worker logs. Defaults to None.
log_formatter (Optional[logging.Formatter], optional): Custom log formatter.
Defaults to None.
secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS).
@@ -314,7 +314,7 @@ def __init__(
*,
host_address: Optional[str] = None,
metadata: Optional[list[tuple[str, str]]] = None,
log_handler=None,
log_handler: Optional[logging.Handler] = None,
log_formatter: Optional[logging.Formatter] = None,
secure_channel: bool = False,
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
@@ -1236,13 +1236,21 @@ def execute(
old_events: Sequence[pb.HistoryEvent],
new_events: Sequence[pb.HistoryEvent],
) -> ExecutionResults:
orchestration_name = "<unknown>"
orchestration_started_events = [e for e in old_events if e.HasField("executionStarted")]
if len(orchestration_started_events) >= 1:
orchestration_name = orchestration_started_events[0].executionStarted.name
Comment on lines +1239 to +1242 (Contributor Author):

This change is QoL; I'm not sure it is worth the time to include, given it involves another iteration over old_events.
In durabletask-dotnet, they use this logic:
https://github.com/Azure/durabletask/blob/8576bc5777119c673857b87c1e85992a1111d437/src/DurableTask.Core/OrchestrationRuntimeState.cs#L148
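
As an aside on the reviewer's concern about the extra iteration (an editor's sketch, not part of the diff), the same lookup could stop at the first matching event with a single `next()` call over `old_events`:

```python
# Hypothetical single-pass alternative: stop at the first executionStarted
# event instead of materializing the full list first.
orchestration_name = next(
    (e.executionStarted.name for e in old_events if e.HasField("executionStarted")),
    "<unknown>",
)
```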


self._logger.debug(
f"{instance_id}: Beginning replay for orchestrator {orchestration_name}..."
)

self._entity_state = OrchestrationEntityContext(instance_id)

if not new_events:
raise task.OrchestrationStateError(
"The new history event list must have at least one event in it."
)

ctx = _RuntimeOrchestrationContext(instance_id, self._registry, self._entity_state)
try:
# Rebuild local state by replaying old history into the orchestrator function
@@ -1274,13 +1282,15 @@ def execute(

except Exception as ex:
# Unhandled exceptions fail the orchestration
self._logger.debug(f"{instance_id}: Orchestration {orchestration_name} failed")
ctx.set_failed(ex)

if not ctx._is_complete:
task_count = len(ctx._pending_tasks)
event_count = len(ctx._pending_events)
self._logger.info(
f"{instance_id}: Orchestrator yielded with {task_count} task(s) and {event_count} event(s) outstanding."
f"{instance_id}: Orchestrator {orchestration_name} yielded with {task_count} task(s) "
f"and {event_count} event(s) outstanding."
)
elif (
ctx._completion_status and ctx._completion_status is not pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW
@@ -1289,7 +1299,7 @@
ctx._completion_status
)
self._logger.info(
f"{instance_id}: Orchestration completed with status: {completion_status_str}"
f"{instance_id}: Orchestration {orchestration_name} completed with status: {completion_status_str}"
)

actions = ctx.get_actions()