Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@ building durable orchestrations. The repo contains two packages:
- Update `CHANGELOG.md` for core SDK changes and
`durabletask-azuremanaged/CHANGELOG.md` for provider changes.
- If a change affects both packages, update both changelogs.
- Include changelog entries for externally observable outcomes only, such as
new public APIs, behavior changes, bug fixes users can notice, breaking
changes, and new configuration capabilities.
- Do NOT document internal-only changes in changelogs, including CI/workflow
updates, test-only changes, refactors with no user-visible behavior change,
and implementation details that do not affect public behavior or API.
- When in doubt, write the changelog entry in terms of user impact (what users
can now do or what behavior changed), not implementation mechanism (how it
was implemented internally).

Examples:
- Include: "Added `get_orchestration_history()` to retrieve orchestration history from the client."
- Exclude: "Added internal helper functions to aggregate streamed history chunks."

## Language and Style

Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

ADDED

- Added `get_orchestration_history()` and `list_instance_ids()` to the sync and async gRPC clients.
- Added in-memory backend support for `StreamInstanceHistory` and `ListInstanceIds` so local orchestration tests can retrieve history and page terminal instance IDs by completion window.

## v1.4.0

ADDED
Expand Down
87 changes: 86 additions & 1 deletion durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, List, Optional, Sequence, TypeVar, Union
from typing import Any, Generic, List, Optional, Sequence, TypeVar, Union

import grpc
import grpc.aio

import durabletask.history as history
from durabletask.entities import EntityInstanceId
from durabletask.entities.entity_metadata import EntityMetadata
import durabletask.internal.helpers as helpers
import durabletask.internal.history_helpers as history_helpers
import durabletask.internal.orchestrator_service_pb2 as pb
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
import durabletask.internal.shared as shared
Expand All @@ -37,6 +39,7 @@

TInput = TypeVar('TInput')
TOutput = TypeVar('TOutput')
TItem = TypeVar('TItem')


class OrchestrationStatus(Enum):
Expand Down Expand Up @@ -99,6 +102,12 @@ class PurgeInstancesResult:
is_complete: bool


@dataclass
class Page(Generic[TItem]):
items: List[TItem]
continuation_token: Optional[str]


@dataclass
class CleanEntityStorageResult:
empty_entities_removed: int
Expand Down Expand Up @@ -218,6 +227,44 @@ def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = Tr
payload_helpers.deexternalize_payloads(res, self._payload_store)
return new_orchestration_state(req.instanceId, res)

def get_orchestration_history(self,
instance_id: str, *,
execution_id: Optional[str] = None,
for_work_item_processing: bool = False) -> List[history.HistoryEvent]:
req = pb.StreamInstanceHistoryRequest(
instanceId=instance_id,
executionId=helpers.get_string_value(execution_id),
forWorkItemProcessing=for_work_item_processing,
)
self._logger.info(f"Retrieving history for instance '{instance_id}'.")
stream = self._stub.StreamInstanceHistory(req)
return history_helpers.collect_history_events(stream, self._payload_store)

def list_instance_ids(self,
runtime_status: Optional[List[OrchestrationStatus]] = None,
completed_time_from: Optional[datetime] = None,
completed_time_to: Optional[datetime] = None,
page_size: Optional[int] = None,
continuation_token: Optional[str] = None) -> Page[str]:
req = pb.ListInstanceIdsRequest(
runtimeStatus=[status.value for status in runtime_status] if runtime_status else [],
completedTimeFrom=helpers.new_timestamp(completed_time_from) if completed_time_from else None,
completedTimeTo=helpers.new_timestamp(completed_time_to) if completed_time_to else None,
pageSize=page_size or 0,
lastInstanceKey=helpers.get_string_value(continuation_token),
)
Comment thread
andystaples marked this conversation as resolved.
self._logger.info(
"Listing terminal instance IDs with filters: "
f"runtime_status={[str(status) for status in runtime_status] if runtime_status else None}, "
f"completed_time_from={completed_time_from}, "
f"completed_time_to={completed_time_to}, "
f"page_size={page_size}, "
f"continuation_token={continuation_token}"
)
resp: pb.ListInstanceIdsResponse = self._stub.ListInstanceIds(req)
next_token = resp.lastInstanceKey.value if resp.HasField("lastInstanceKey") else None
return Page(items=list(resp.instanceIds), continuation_token=next_token)

def get_all_orchestration_states(self,
orchestration_query: Optional[OrchestrationQuery] = None
) -> List[OrchestrationState]:
Expand Down Expand Up @@ -502,6 +549,44 @@ async def get_orchestration_state(self, instance_id: str, *,
await payload_helpers.deexternalize_payloads_async(res, self._payload_store)
return new_orchestration_state(req.instanceId, res)

async def get_orchestration_history(self,
instance_id: str, *,
execution_id: Optional[str] = None,
for_work_item_processing: bool = False) -> List[history.HistoryEvent]:
req = pb.StreamInstanceHistoryRequest(
instanceId=instance_id,
executionId=helpers.get_string_value(execution_id),
forWorkItemProcessing=for_work_item_processing,
)
self._logger.info(f"Retrieving history for instance '{instance_id}'.")
stream = self._stub.StreamInstanceHistory(req)
return await history_helpers.collect_history_events_async(stream, self._payload_store)

async def list_instance_ids(self,
runtime_status: Optional[List[OrchestrationStatus]] = None,
completed_time_from: Optional[datetime] = None,
completed_time_to: Optional[datetime] = None,
page_size: Optional[int] = None,
continuation_token: Optional[str] = None) -> Page[str]:
req = pb.ListInstanceIdsRequest(
runtimeStatus=[status.value for status in runtime_status] if runtime_status else [],
completedTimeFrom=helpers.new_timestamp(completed_time_from) if completed_time_from else None,
completedTimeTo=helpers.new_timestamp(completed_time_to) if completed_time_to else None,
pageSize=page_size or 0,
lastInstanceKey=helpers.get_string_value(continuation_token),
)
self._logger.info(
"Listing terminal instance IDs with filters: "
f"runtime_status={[str(status) for status in runtime_status] if runtime_status else None}, "
f"completed_time_from={completed_time_from}, "
f"completed_time_to={completed_time_to}, "
f"page_size={page_size}, "
f"continuation_token={continuation_token}"
)
resp: pb.ListInstanceIdsResponse = await self._stub.ListInstanceIds(req)
next_token = resp.lastInstanceKey.value if resp.HasField("lastInstanceKey") else None
return Page(items=list(resp.instanceIds), continuation_token=next_token)

async def get_all_orchestration_states(self,
orchestration_query: Optional[OrchestrationQuery] = None
) -> List[OrchestrationState]:
Expand Down
Loading
Loading