Skip to content

Commit 82107f9

Browse files
authored
Add tags for orchestrations and activities (#56)
1 parent a6d186d commit 82107f9

File tree

5 files changed

+21
-10
lines changed

5 files changed

+21
-10
lines changed

durabletask/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
123123
input: Optional[TInput] = None,
124124
instance_id: Optional[str] = None,
125125
start_at: Optional[datetime] = None,
126+
tags: Optional[dict[str, str]] = None,
126127
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None) -> str:
127128

128129
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
@@ -134,6 +135,7 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
134135
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
135136
version=wrappers_pb2.StringValue(value=""),
136137
orchestrationIdReusePolicy=reuse_id_policy,
138+
tags=tags,
137139
)
138140

139141
self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")

durabletask/internal/helpers.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@ def new_orchestrator_started_event(timestamp: Optional[datetime] = None) -> pb.H
1919
return pb.HistoryEvent(eventId=-1, timestamp=ts, orchestratorStarted=pb.OrchestratorStartedEvent())
2020

2121

22-
def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
22+
def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None,
23+
tags: Optional[dict[str, str]] = None) -> pb.HistoryEvent:
2324
return pb.HistoryEvent(
2425
eventId=-1,
2526
timestamp=timestamp_pb2.Timestamp(),
2627
executionStarted=pb.ExecutionStartedEvent(
2728
name=name,
2829
input=get_string_value(encoded_input),
29-
orchestrationInstance=pb.OrchestrationInstance(instanceId=instance_id)))
30+
orchestrationInstance=pb.OrchestrationInstance(instanceId=instance_id),
31+
tags=tags))
3032

3133

3234
def new_timer_created_event(timer_id: int, fire_at: datetime) -> pb.HistoryEvent:
@@ -178,10 +180,12 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction
178180
return pb.OrchestratorAction(id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp))
179181

180182

181-
def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str]) -> pb.OrchestratorAction:
183+
def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str],
184+
tags: Optional[dict[str, str]]) -> pb.OrchestratorAction:
182185
return pb.OrchestratorAction(id=id, scheduleTask=pb.ScheduleTaskAction(
183186
name=name,
184-
input=get_string_value(encoded_input)
187+
input=get_string_value(encoded_input),
188+
tags=tags
185189
))
186190

187191

durabletask/task.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task:
100100
@abstractmethod
101101
def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
102102
input: Optional[TInput] = None,
103-
retry_policy: Optional[RetryPolicy] = None) -> Task[TOutput]:
103+
retry_policy: Optional[RetryPolicy] = None,
104+
tags: Optional[dict[str, str]] = None) -> Task[TOutput]:
104105
"""Schedule an activity for execution.
105106
106107
Parameters
@@ -111,6 +112,8 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
111112
The JSON-serializable input (or None) to pass to the activity.
112113
retry_policy: Optional[RetryPolicy]
113114
The retry policy to use for this activity call.
115+
tags: Optional[dict[str, str]]
116+
Optional tags to associate with the activity invocation.
114117
115118
Returns
116119
-------

durabletask/worker.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -752,11 +752,12 @@ def call_activity(
752752
*,
753753
input: Optional[TInput] = None,
754754
retry_policy: Optional[task.RetryPolicy] = None,
755+
tags: Optional[dict[str, str]] = None,
755756
) -> task.Task[TOutput]:
756757
id = self.next_sequence_number()
757758

758759
self.call_activity_function_helper(
759-
id, activity, input=input, retry_policy=retry_policy, is_sub_orch=False
760+
id, activity, input=input, retry_policy=retry_policy, is_sub_orch=False, tags=tags
760761
)
761762
return self._pending_tasks.get(id, task.CompletableTask())
762763

@@ -787,6 +788,7 @@ def call_activity_function_helper(
787788
*,
788789
input: Optional[TInput] = None,
789790
retry_policy: Optional[task.RetryPolicy] = None,
791+
tags: Optional[dict[str, str]] = None,
790792
is_sub_orch: bool = False,
791793
instance_id: Optional[str] = None,
792794
fn_task: Optional[task.CompletableTask[TOutput]] = None,
@@ -806,7 +808,7 @@ def call_activity_function_helper(
806808
if isinstance(activity_function, str)
807809
else task.get_name(activity_function)
808810
)
809-
action = ph.new_schedule_task_action(id, name, encoded_input)
811+
action = ph.new_schedule_task_action(id, name, encoded_input, tags)
810812
else:
811813
if instance_id is None:
812814
# Create a deteministic instance ID based on the parent instance ID

tests/durabletask/test_orchestration_e2e.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
2929
w.start()
3030

3131
c = client.TaskHubGrpcClient()
32-
id = c.schedule_new_orchestration(empty_orchestrator)
32+
id = c.schedule_new_orchestration(empty_orchestrator, tags={'Tagged': 'true'})
3333
state = c.wait_for_orchestration_completion(id, timeout=30)
3434

3535
assert invoked
@@ -52,7 +52,7 @@ def sequence(ctx: task.OrchestrationContext, start_val: int):
5252
numbers = [start_val]
5353
current = start_val
5454
for _ in range(10):
55-
current = yield ctx.call_activity(plus_one, input=current)
55+
current = yield ctx.call_activity(plus_one, input=current, tags={'Activity': 'PlusOne'})
5656
numbers.append(current)
5757
return numbers
5858

@@ -63,7 +63,7 @@ def sequence(ctx: task.OrchestrationContext, start_val: int):
6363
w.start()
6464

6565
task_hub_client = client.TaskHubGrpcClient()
66-
id = task_hub_client.schedule_new_orchestration(sequence, input=1)
66+
id = task_hub_client.schedule_new_orchestration(sequence, input=1, tags={'Orchestration': 'Sequence'})
6767
state = task_hub_client.wait_for_orchestration_completion(
6868
id, timeout=30)
6969

0 commit comments

Comments
 (0)