Skip to content

Implement missing send_event capability for orchestration context #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
16 changes: 16 additions & 0 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ def new_event_raised_event(name: str, encoded_input: Optional[str] = None) -> pb
)


def new_event_sent_event(instance_id: str, name: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
eventSent=pb.EventSentEvent(instanceId=instance_id, name=name, input=get_string_value(encoded_input))
)


def new_suspend_event() -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
Expand Down Expand Up @@ -203,6 +211,14 @@ def new_create_sub_orchestration_action(
))


def new_send_event_action(id: int, instance_id: str, event_name: str, encoded_data: Optional[str]) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, sendEvent=pb.SendEventAction(
instance=pb.OrchestrationInstance(instanceId=instance_id),
name=event_name,
data=get_string_value(encoded_data)
))


def is_empty(v: wrappers_pb2.StringValue):
return v is None or v.value == ''

Expand Down
21 changes: 21 additions & 0 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,27 @@ def wait_for_external_event(self, name: str) -> Task:
"""
pass

@abstractmethod
def send_event(self, instance_id: str, event_name: str, *,
data: Optional[Any] = None) -> Task:
"""Send an event to another orchestration instance.

Parameters
----------
instance_id : str
The ID of the orchestration instance to send the event to.
event_name : str
The name of the event to send.
data : Optional[Any]
The optional JSON-serializable data to include with the event.

Returns
-------
Task
A Durable Task that completes when the event has been sent.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove the return value from this method definition. This method should return None.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated send_event abstract method to return None instead of Task. Commit 4ca1234.

"""
pass

@abstractmethod
def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
"""Continue the orchestration execution as a new instance.
Expand Down
31 changes: 29 additions & 2 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,22 @@ def wait_for_external_event(self, name: str) -> task.Task:
task_list.append(external_event_task)
return external_event_task

def send_event(self, instance_id: str, event_name: str, *,
data: Optional[Any] = None) -> task.Task:
# Validate inputs similar to .NET implementation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this comment. The fact that a .NET implementation does basic input validation is not noteworthy, and might give the wrong impression to developers who read this code and aren't necessarily interested in .NET.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the .NET implementation comment as requested. Commit 4ca1234.

if not instance_id:
raise ValueError("instance_id cannot be None or empty")
if not event_name:
raise ValueError("event_name cannot be None or empty")

id = self.next_sequence_number()
encoded_data = shared.to_json(data) if data is not None else None
action = ph.new_send_event_action(id, instance_id, event_name, encoded_data)
self._pending_actions[id] = action
send_event_task = task.CompletableTask()
self._pending_tasks[id] = send_event_task
return send_event_task
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending an event is a one-way operation and is therefore not awaitable. There's no need to create a task for this, and no need to return anything from this method.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed send_event to not create or return any task. It's now a one-way operation returning None. Commit 4ca1234.


def continue_as_new(self, new_input, *, save_events: bool = False) -> None:
if self._is_complete:
return
Expand Down Expand Up @@ -1188,6 +1204,17 @@ def process_event(
self._logger.info(
f"{ctx.instance_id}: Event '{event_name}' has been buffered as there are no tasks waiting for it."
)
elif event.HasField("eventSent"):
# This history event confirms that the event was successfully sent.
# Complete the corresponding send_event task.
event_id = event.eventId
send_event_task = ctx._pending_tasks.pop(event_id, None)
if send_event_task:
# For send_event, we don't return any meaningful result, just completion
send_event_task.complete(None)
ctx.resume()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to my other comment, sending an event is not an awaitable operation, so we should not have any tasks associated with sending events.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the eventSent handling and all task-related logic for send_event operations. Commit 4ca1234.

# Also remove the corresponding action from pending actions
ctx._pending_actions.pop(event_id, None)
elif event.HasField("executionSuspended"):
if not self._is_suspended and not ctx.is_replaying:
self._logger.info(f"{ctx.instance_id}: Execution suspended.")
Expand Down Expand Up @@ -1304,8 +1331,8 @@ def _get_method_name_for_action(action: pb.OrchestratorAction) -> str:
return task.get_name(task.OrchestrationContext.create_timer)
elif action_type == "createSubOrchestration":
return task.get_name(task.OrchestrationContext.call_sub_orchestrator)
# elif action_type == "sendEvent":
# return task.get_name(task.OrchestrationContext.send_event)
elif action_type == "sendEvent":
return task.get_name(task.OrchestrationContext.send_event)
else:
raise NotImplementedError(f"Action type '{action_type}' not supported!")

Expand Down
Loading
Loading