-
Notifications
You must be signed in to change notification settings - Fork 15
Implement missing send_event capability for orchestration context #52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 7 commits
8f1445c
c2684fd
22d1459
cd9d951
00b4957
7ba38ce
ccfbefd
58d88c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -852,6 +852,22 @@ def wait_for_external_event(self, name: str) -> task.Task: | |
task_list.append(external_event_task) | ||
return external_event_task | ||
|
||
def send_event(self, instance_id: str, event_name: str, *, | ||
data: Optional[Any] = None) -> task.Task: | ||
# Validate inputs similar to .NET implementation | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's remove this comment. The fact that a .NET implementation does basic input validation is not noteworthy, and might give the wrong impression to developers who read this code and aren't necessarily interested in .NET. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed the .NET implementation comment as requested. Commit 4ca1234. |
||
if not instance_id: | ||
raise ValueError("instance_id cannot be None or empty") | ||
if not event_name: | ||
raise ValueError("event_name cannot be None or empty") | ||
|
||
id = self.next_sequence_number() | ||
encoded_data = shared.to_json(data) if data is not None else None | ||
action = ph.new_send_event_action(id, instance_id, event_name, encoded_data) | ||
self._pending_actions[id] = action | ||
send_event_task = task.CompletableTask() | ||
self._pending_tasks[id] = send_event_task | ||
return send_event_task | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sending an event is a one-way operation and is therefore not awaitable. There's no need to create a task for this, and no need to return anything from this method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed send_event to not create or return any task. It's now a one-way operation returning None. Commit 4ca1234. |
||
|
||
def continue_as_new(self, new_input, *, save_events: bool = False) -> None: | ||
if self._is_complete: | ||
return | ||
|
@@ -1188,6 +1204,17 @@ def process_event( | |
self._logger.info( | ||
f"{ctx.instance_id}: Event '{event_name}' has been buffered as there are no tasks waiting for it." | ||
) | ||
elif event.HasField("eventSent"): | ||
# This history event confirms that the event was successfully sent. | ||
# Complete the corresponding send_event task. | ||
event_id = event.eventId | ||
send_event_task = ctx._pending_tasks.pop(event_id, None) | ||
if send_event_task: | ||
# For send_event, we don't return any meaningful result, just completion | ||
send_event_task.complete(None) | ||
ctx.resume() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to my other comment, sending an event is not an awaitable operation, so we should not have any tasks associated with sending events. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed the eventSent handling and all task-related logic for send_event operations. Commit 4ca1234. |
||
# Also remove the corresponding action from pending actions | ||
ctx._pending_actions.pop(event_id, None) | ||
elif event.HasField("executionSuspended"): | ||
if not self._is_suspended and not ctx.is_replaying: | ||
self._logger.info(f"{ctx.instance_id}: Execution suspended.") | ||
|
@@ -1304,8 +1331,8 @@ def _get_method_name_for_action(action: pb.OrchestratorAction) -> str: | |
return task.get_name(task.OrchestrationContext.create_timer) | ||
elif action_type == "createSubOrchestration": | ||
return task.get_name(task.OrchestrationContext.call_sub_orchestrator) | ||
# elif action_type == "sendEvent": | ||
# return task.get_name(task.OrchestrationContext.send_event) | ||
elif action_type == "sendEvent": | ||
return task.get_name(task.OrchestrationContext.send_event) | ||
else: | ||
raise NotImplementedError(f"Action type '{action_type}' not supported!") | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove the return value from this method definition. This method should return
None
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated send_event abstract method to return None instead of Task. Commit 4ca1234.