Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
tags: Optional[dict[str, str]] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None) -> str:

name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
Expand All @@ -134,6 +135,7 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
version=wrappers_pb2.StringValue(value=""),
orchestrationIdReusePolicy=reuse_id_policy,
tags=tags,
)

self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
Expand Down
12 changes: 8 additions & 4 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ def new_orchestrator_started_event(timestamp: Optional[datetime] = None) -> pb.H
return pb.HistoryEvent(eventId=-1, timestamp=ts, orchestratorStarted=pb.OrchestratorStartedEvent())


def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None,
tags: Optional[dict[str, str]] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
executionStarted=pb.ExecutionStartedEvent(
name=name,
input=get_string_value(encoded_input),
orchestrationInstance=pb.OrchestrationInstance(instanceId=instance_id)))
orchestrationInstance=pb.OrchestrationInstance(instanceId=instance_id),
tags=tags))


def new_timer_created_event(timer_id: int, fire_at: datetime) -> pb.HistoryEvent:
Expand Down Expand Up @@ -178,10 +180,12 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction
return pb.OrchestratorAction(id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp))


def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str]) -> pb.OrchestratorAction:
def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str],
tags: Optional[dict[str, str]]) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, scheduleTask=pb.ScheduleTaskAction(
name=name,
input=get_string_value(encoded_input)
input=get_string_value(encoded_input),
tags=tags
))


Expand Down
5 changes: 4 additions & 1 deletion durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task:
@abstractmethod
def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
input: Optional[TInput] = None,
retry_policy: Optional[RetryPolicy] = None) -> Task[TOutput]:
retry_policy: Optional[RetryPolicy] = None,
tags: Optional[dict[str, str]] = None) -> Task[TOutput]:
"""Schedule an activity for execution.

Parameters
Expand All @@ -111,6 +112,8 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
The JSON-serializable input (or None) to pass to the activity.
retry_policy: Optional[RetryPolicy]
The retry policy to use for this activity call.
tags: Optional[dict[str, str]]
Optional tags to associate with the activity invocation.

Returns
-------
Expand Down
6 changes: 4 additions & 2 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,11 +752,12 @@ def call_activity(
*,
input: Optional[TInput] = None,
retry_policy: Optional[task.RetryPolicy] = None,
tags: Optional[dict[str, str]] = None,
) -> task.Task[TOutput]:
id = self.next_sequence_number()

self.call_activity_function_helper(
id, activity, input=input, retry_policy=retry_policy, is_sub_orch=False
id, activity, input=input, retry_policy=retry_policy, is_sub_orch=False, tags=tags
)
return self._pending_tasks.get(id, task.CompletableTask())

Expand Down Expand Up @@ -787,6 +788,7 @@ def call_activity_function_helper(
*,
input: Optional[TInput] = None,
retry_policy: Optional[task.RetryPolicy] = None,
tags: Optional[dict[str, str]] = None,
is_sub_orch: bool = False,
instance_id: Optional[str] = None,
fn_task: Optional[task.CompletableTask[TOutput]] = None,
Expand All @@ -806,7 +808,7 @@ def call_activity_function_helper(
if isinstance(activity_function, str)
else task.get_name(activity_function)
)
action = ph.new_schedule_task_action(id, name, encoded_input)
action = ph.new_schedule_task_action(id, name, encoded_input, tags)
else:
if instance_id is None:
# Create a deteministic instance ID based on the parent instance ID
Expand Down
6 changes: 3 additions & 3 deletions tests/durabletask/test_orchestration_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
w.start()

c = client.TaskHubGrpcClient()
id = c.schedule_new_orchestration(empty_orchestrator)
id = c.schedule_new_orchestration(empty_orchestrator, tags={'Tagged': 'true'})
state = c.wait_for_orchestration_completion(id, timeout=30)

assert invoked
Expand All @@ -52,7 +52,7 @@ def sequence(ctx: task.OrchestrationContext, start_val: int):
numbers = [start_val]
current = start_val
for _ in range(10):
current = yield ctx.call_activity(plus_one, input=current)
current = yield ctx.call_activity(plus_one, input=current, tags={'Activity': 'PlusOne'})
numbers.append(current)
return numbers

Expand All @@ -63,7 +63,7 @@ def sequence(ctx: task.OrchestrationContext, start_val: int):
w.start()

task_hub_client = client.TaskHubGrpcClient()
id = task_hub_client.schedule_new_orchestration(sequence, input=1)
id = task_hub_client.schedule_new_orchestration(sequence, input=1, tags={'Orchestration': 'Sequence'})
state = task_hub_client.wait_for_orchestration_completion(
id, timeout=30)

Expand Down
Loading