Conversation
| this.historyEventPlayer = new OrchestrationHistoryIterator(pastEvents, newEvents); | ||
| } | ||
|
|
||
| // private void initialize(){ |
There was a problem hiding this comment.
This method is never invoked, but still pass the test. I am wondering where I made the mistake. @cgillum @davidmrdavid please help.
There was a problem hiding this comment.
What were you expected this code would be needed for? I'm not sure if this is actually needed.
There was a problem hiding this comment.
The Python SDK has something like this here -https://github.com/Azure/azure-functions-durable-python/blob/2f679e9e6507e7c41195fc752c26e215c532e014/azure/durable_functions/models/TaskOrchestrationExecutor.py#L109
In essence, in Python the initialize method helps "clear" the orchestrator's state when a continue_as_new event is processed by the TaskOrchestrationExecutor.
However, in a later comment you clarify that the Continue_as_new is no longer an event type, which is news to me :) . So if this is already being handled in "the backend", then this is certainly not needed
| // break; | ||
| // case CONTINUEASNEW: | ||
| // break; | ||
| case CONTINUEASNEW: |
There was a problem hiding this comment.
when debugging the test, never see a event with this type coming. but the test still pass. @cgillum @davidmrdavid please help.
There was a problem hiding this comment.
This is expected. Only older versions of the Durable Task Framework actually use this event. Newer versions implement this handling directly in the host. We can remove it from the switch statement.
There was a problem hiding this comment.
I see, this is new to me. So is it fair to conclude that, moving forward, OOProc PLs don't need to handle ContinueAsNew history events at all?
There was a problem hiding this comment.
Yes, I think it's fair to ignore this moving forward.
cgillum
left a comment
There was a problem hiding this comment.
Glad to see you got this working so quickly! Some feedback:
|
|
||
| void continueAsNew(Object input, boolean preserveUnprocessedEvents ); | ||
|
|
||
| void sendEvent(OrchestratorService.OrchestrationInstance instance, String eventName, Object eventData); |
There was a problem hiding this comment.
Instead of exposing the gRPC types in the public API (OrchestratorService.*), let's just have the user pass in the orchestration instance ID as a String. Something like this:
void sendEvent(String instanceId, String eventName, Object eventData);
The internal implementation of sendEvent can then construct the OrchestrationInstance object.
Also, consider adding a default overload that passes null for eventData.
| // break; | ||
| // case CONTINUEASNEW: | ||
| // break; | ||
| case CONTINUEASNEW: |
There was a problem hiding this comment.
This is expected. Only older versions of the Durable Task Framework actually use this event. Newer versions implement this handling directly in the host. We can remove it from the switch statement.
| // Send all the buffered external events to ourself. | ||
| OrchestrationInstance.Builder OrchestrationInstanceBuilder = OrchestrationInstance.newBuilder().setInstanceId(this.instanceId); | ||
| for (EventRaisedEvent unprocessedEvent : unprocessedEvents) { | ||
| this.sendEvent(OrchestrationInstanceBuilder.build(), unprocessedEvent.getName(), unprocessedEvent.getInput()); |
There was a problem hiding this comment.
Per my other feedback let's replace OrchestrationInstanceBuilder.build() with this.instanceId.
| this.historyEventPlayer = new OrchestrationHistoryIterator(pastEvents, newEvents); | ||
| } | ||
|
|
||
| // private void initialize(){ |
There was a problem hiding this comment.
What were you expected this code would be needed for? I'm not sure if this is actually needed.
| } | ||
|
|
||
| @Override | ||
| public void continueAsNew(Object input, boolean preserveUnprocessedEvents ){ |
There was a problem hiding this comment.
nit: fix whitespace here
| public void continueAsNew(Object input, boolean preserveUnprocessedEvents ){ | |
| public void continueAsNew(Object input, boolean preserveUnprocessedEvents) { |
| private boolean isComplete; | ||
| private boolean hasContinueAsNew; | ||
|
|
||
|
|
There was a problem hiding this comment.
nit: let's remove this extra whitespace.
| static void throwIfArgumentNullOrWhiteSpace(String argValue, String argName) { | ||
| throwIfArgumentNull(argValue, argName); | ||
| if (argValue.trim().length() == 0){ | ||
| throw new IllegalArgumentException("Argument '" + argName + "' was WhiteSpace."); |
There was a problem hiding this comment.
I would rephrase this slightly:
| throw new IllegalArgumentException("Argument '" + argName + "' was WhiteSpace."); | |
| throw new IllegalArgumentException("The argument '" + argName + "' was empty or contained only whitespace."); |
| .build()); | ||
| this.hasContinueAsNew = true; | ||
|
|
||
| if (preserveUnprocessedEvents){ |
There was a problem hiding this comment.
Let's add another integration test that covers this case (unprocessed events). Basically, you should write an orchestration that listens for a single external event and then immediately does ctx.continueAsNew(n + 1). The client should then start an orchestration and, without waiting, immediately raise N external events to the orchestration. For example, 10 events. The client should then wait for the orchestration to complete and verify that the input (and/or output) on the OrchestrationMetadata object is 10, meaning that all 10 external events were processed.
davidmrdavid
left a comment
There was a problem hiding this comment.
I agree with Chris' suggestions. Also left some thoughts and questions, mostly for Chris as well :)
| //TODO: add logic for newVersion | ||
| CompleteOrchestrationAction.Builder completeOrchestrationActionBuilder = CompleteOrchestrationAction.newBuilder() | ||
| .setOrchestrationStatus(OrchestrationStatus.ORCHESTRATION_STATUS_CONTINUED_AS_NEW) | ||
| .setNewVersion(StringValue.of("")); |
There was a problem hiding this comment.
I keep on seeing this version stuff in the actions, which the old out-of-process model did not need. Is this a new requirement, @cgillum?
There was a problem hiding this comment.
The Durable Task Framework has a concept of versions of activities and orchestrations. However, we never exposed it to Durable Functions because I couldn't reason about how to leverage it effectively. I went ahead and carried it forward for the OOProc protocols since I want those to work with DTFx but even in the new SDKs we don't actually use them anywhere. At some point we'll want to spend time thinking about how to implement versioning, but we're not there quite yet.
| // break; | ||
| // case CONTINUEASNEW: | ||
| // break; | ||
| case CONTINUEASNEW: |
There was a problem hiding this comment.
I see, this is new to me. So is it fair to conclude that, moving forward, OOProc PLs don't need to handle ContinueAsNew history events at all?
8bda6dd to
9216d6e
Compare
|
@kaibocai please sync the latest changes from this PR so that you have my updates locally on your machine. It would be sad if you accidentally force-push and delete my changes. :) |
Support for "carryover" events (#25)
23bce59 to
1c2da39
Compare
This pr includes:
refer to #6