diff --git a/durabletask/client.py b/durabletask/client.py index 60e194f..277f39c 100644 --- a/durabletask/client.py +++ b/durabletask/client.py @@ -123,6 +123,7 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu input: Optional[TInput] = None, instance_id: Optional[str] = None, start_at: Optional[datetime] = None, + tags: Optional[dict[str, str]] = None, reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None) -> str: name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator) @@ -134,6 +135,7 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None, version=wrappers_pb2.StringValue(value=""), orchestrationIdReusePolicy=reuse_id_policy, + tags=tags, ) self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.") diff --git a/durabletask/internal/helpers.py b/durabletask/internal/helpers.py index 6b36586..29f29e0 100644 --- a/durabletask/internal/helpers.py +++ b/durabletask/internal/helpers.py @@ -19,14 +19,16 @@ def new_orchestrator_started_event(timestamp: Optional[datetime] = None) -> pb.H return pb.HistoryEvent(eventId=-1, timestamp=ts, orchestratorStarted=pb.OrchestratorStartedEvent()) -def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent: +def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None, + tags: Optional[dict[str, str]] = None) -> pb.HistoryEvent: return pb.HistoryEvent( eventId=-1, timestamp=timestamp_pb2.Timestamp(), executionStarted=pb.ExecutionStartedEvent( name=name, input=get_string_value(encoded_input), - orchestrationInstance=pb.OrchestrationInstance(instanceId=instance_id))) + orchestrationInstance=pb.OrchestrationInstance(instanceId=instance_id), + tags=tags)) def new_timer_created_event(timer_id: int, fire_at: datetime) -> pb.HistoryEvent: @@ -178,10 +180,12 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction return pb.OrchestratorAction(id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp)) -def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str]) -> pb.OrchestratorAction: +def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str], + tags: Optional[dict[str, str]]) -> pb.OrchestratorAction: return pb.OrchestratorAction(id=id, scheduleTask=pb.ScheduleTaskAction( name=name, - input=get_string_value(encoded_input) + input=get_string_value(encoded_input), + tags=tags )) diff --git a/durabletask/task.py b/durabletask/task.py index 9e8a08a..1424436 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -100,7 +100,8 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task: @abstractmethod def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *, input: Optional[TInput] = None, - retry_policy: Optional[RetryPolicy] = None) -> Task[TOutput]: + retry_policy: Optional[RetryPolicy] = None, + tags: Optional[dict[str, str]] = None) -> Task[TOutput]: """Schedule an activity for execution. Parameters @@ -111,6 +112,8 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *, The JSON-serializable input (or None) to pass to the activity. retry_policy: Optional[RetryPolicy] The retry policy to use for this activity call. + tags: Optional[dict[str, str]] + Optional tags to associate with the activity invocation. Returns ------- diff --git a/durabletask/worker.py b/durabletask/worker.py index b433a83..8a85070 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -752,11 +752,12 @@ def call_activity( *, input: Optional[TInput] = None, retry_policy: Optional[task.RetryPolicy] = None, + tags: Optional[dict[str, str]] = None, ) -> task.Task[TOutput]: id = self.next_sequence_number() self.call_activity_function_helper( - id, activity, input=input, retry_policy=retry_policy, is_sub_orch=False + id, activity, input=input, retry_policy=retry_policy, is_sub_orch=False, tags=tags ) return self._pending_tasks.get(id, task.CompletableTask()) @@ -787,6 +788,7 @@ def call_activity_function_helper( *, input: Optional[TInput] = None, retry_policy: Optional[task.RetryPolicy] = None, + tags: Optional[dict[str, str]] = None, is_sub_orch: bool = False, instance_id: Optional[str] = None, fn_task: Optional[task.CompletableTask[TOutput]] = None, @@ -806,7 +808,7 @@ def call_activity_function_helper( if isinstance(activity_function, str) else task.get_name(activity_function) ) - action = ph.new_schedule_task_action(id, name, encoded_input) + action = ph.new_schedule_task_action(id, name, encoded_input, tags) else: if instance_id is None: # Create a deteministic instance ID based on the parent instance ID diff --git a/tests/durabletask/test_orchestration_e2e.py b/tests/durabletask/test_orchestration_e2e.py index 3ccf782..63d2058 100644 --- a/tests/durabletask/test_orchestration_e2e.py +++ b/tests/durabletask/test_orchestration_e2e.py @@ -29,7 +29,7 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _): w.start() c = client.TaskHubGrpcClient() - id = c.schedule_new_orchestration(empty_orchestrator) + id = c.schedule_new_orchestration(empty_orchestrator, tags={'Tagged': 'true'}) state = c.wait_for_orchestration_completion(id, timeout=30) assert invoked @@ -52,7 +52,7 @@ def sequence(ctx: task.OrchestrationContext, start_val: int): numbers = [start_val] current = start_val for _ in range(10): - current = yield ctx.call_activity(plus_one, input=current) + current = yield ctx.call_activity(plus_one, input=current, tags={'Activity': 'PlusOne'}) numbers.append(current) return numbers @@ -63,7 +63,7 @@ def sequence(ctx: task.OrchestrationContext, start_val: int): w.start() task_hub_client = client.TaskHubGrpcClient() - id = task_hub_client.schedule_new_orchestration(sequence, input=1) + id = task_hub_client.schedule_new_orchestration(sequence, input=1, tags={'Orchestration': 'Sequence'}) state = task_hub_client.wait_for_orchestration_completion( id, timeout=30)