From 3f97f85e108f6372d062618931d08c7ccffdc4ba Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Mon, 10 Apr 2023 13:12:46 -0500 Subject: [PATCH 1/2] Revert "Implement suspend and resume client APIs (#104)" This reverts commit c303a12cf7fd94415c1e0339b0767c407965a77b. --- .github/workflows/build-validation.yml | 2 +- CHANGELOG.md | 1 - .../durabletask/DurableTaskClient.java | 30 ---- .../durabletask/DurableTaskGrpcClient.java | 25 +-- .../OrchestrationRuntimeStatus.java | 11 +- .../TaskOrchestrationExecutor.java | 145 +++++++----------- .../durabletask/IntegrationTests.java | 66 -------- 7 files changed, 61 insertions(+), 219 deletions(-) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index 0ec556dc..3eaccff8 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -95,4 +95,4 @@ jobs: uses: actions/upload-artifact@v2 with: name: Integration test report - path: client/build/reports/tests/endToEndTest + path: client/build/reports/tests/endToEndTest \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 367b2b1a..5a00e8fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,6 @@ ## v1.1.0 ### Updates -* Support Suspend and Resume Client APIs ([#104](https://github.com/microsoft/durabletask-java/issues/104)) * Fix the potential NPE issue of `DurableTaskClient terminate` method ([#104](https://github.com/microsoft/durabletask-java/issues/104)) * Add waitForCompletionOrCreateCheckStatusResponse client API ([#115](https://github.com/microsoft/durabletask-java/pull/115)) * Support long timers by breaking up into smaller timers ([#114](https://github.com/microsoft/durabletask-java/issues/114)) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java index 050ef4f7..597c90fd 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java @@ -281,34 +281,4 @@ public abstract OrchestrationMetadata waitForInstanceCompletion( * @return the result of the purge operation, including the number of purged orchestration instances (0 or 1) */ public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException; - - /** - * Suspends a running orchestration instance. - * @param instanceId the ID of the orchestration instance to suspend - */ - public void suspendInstance (String instanceId) { - this.suspendInstance(instanceId, null); - } - - /** - * Resumes a running orchestration instance. - * @param instanceId the ID of the orchestration instance to resume - */ - public void resumeInstance(String instanceId) { - this.resumeInstance(instanceId, null); - } - - /** - * Suspends a running orchestration instance. - * @param instanceId the ID of the orchestration instance to suspend - * @param reason the reason for suspending the orchestration instance - */ - public abstract void suspendInstance(String instanceId, @Nullable String reason); - - /** - * Resumes a running orchestration instance. - * @param instanceId the ID of the orchestration instance to resume - * @param reason the reason for resuming the orchestration instance - */ - public abstract void resumeInstance(String instanceId, @Nullable String reason); } \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index e078aade..ea994766 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -204,10 +204,7 @@ public void terminate(String instanceId, @Nullable Object output) { "Terminating instance %s and setting output to: %s", instanceId, serializeOutput != null ? serializeOutput : "(null)")); - TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId); - if (serializeOutput != null){ - builder.setOutput(StringValue.of(serializeOutput)); - } + TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId).setOutput(StringValue.of(serializeOutput)); this.sidecarClient.terminateInstance(builder.build()); } @@ -283,26 +280,6 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t } } - @Override - public void suspendInstance(String instanceId, @Nullable String reason) { - SuspendRequest.Builder suspendRequestBuilder = SuspendRequest.newBuilder(); - suspendRequestBuilder.setInstanceId(instanceId); - if (reason != null) { - suspendRequestBuilder.setReason(StringValue.of(reason)); - } - this.sidecarClient.suspendInstance(suspendRequestBuilder.build()); - } - - @Override - public void resumeInstance(String instanceId, @Nullable String reason) { - ResumeRequest.Builder resumeRequestBuilder = ResumeRequest.newBuilder(); - resumeRequestBuilder.setInstanceId(instanceId); - if (reason != null) { - resumeRequestBuilder.setReason(StringValue.of(reason)); - } - this.sidecarClient.resumeInstance(resumeRequestBuilder.build()); - } - private PurgeResult toPurgeResult(PurgeInstancesResponse response){ return new PurgeResult(response.getDeletedInstanceCount()); } diff --git a/client/src/main/java/com/microsoft/durabletask/OrchestrationRuntimeStatus.java b/client/src/main/java/com/microsoft/durabletask/OrchestrationRuntimeStatus.java index a205ccc7..547ecfb5 100644 --- a/client/src/main/java/com/microsoft/durabletask/OrchestrationRuntimeStatus.java +++ b/client/src/main/java/com/microsoft/durabletask/OrchestrationRuntimeStatus.java @@ -46,12 +46,7 @@ public enum OrchestrationRuntimeStatus { /** * The orchestration was scheduled but hasn't started running. */ - PENDING, - - /** - * The orchestration is in a suspended state. - */ - SUSPENDED; + PENDING; static OrchestrationRuntimeStatus fromProtobuf(OrchestrationStatus status) { switch (status) { @@ -69,8 +64,6 @@ static OrchestrationRuntimeStatus fromProtobuf(OrchestrationStatus status) { return TERMINATED; case ORCHESTRATION_STATUS_PENDING: return PENDING; - case ORCHESTRATION_STATUS_SUSPENDED: - return SUSPENDED; default: throw new IllegalArgumentException(String.format("Unknown status value: %s", status)); } @@ -92,8 +85,6 @@ static OrchestrationStatus toProtobuf(OrchestrationRuntimeStatus status){ return ORCHESTRATION_STATUS_TERMINATED; case PENDING: return ORCHESTRATION_STATUS_PENDING; - case SUSPENDED: - return ORCHESTRATION_STATUS_SUSPENDED; default: throw new IllegalArgumentException(String.format("Unknown status value: %s", status)); } diff --git a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java index 9bf88992..f31532f2 100644 --- a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java @@ -70,7 +70,6 @@ private class ContextImplTask implements TaskOrchestrationContext { private String instanceId; private Instant currentInstant; private boolean isComplete; - private boolean isSuspended; private boolean isReplaying = true; // LinkedHashMap to maintain insertion order when returning the list of pending actions @@ -78,7 +77,6 @@ private class ContextImplTask implements TaskOrchestrationContext { private final HashMap> openTasks = new HashMap<>(); private final LinkedHashMap>> outstandingEvents = new LinkedHashMap<>(); private final LinkedList unprocessedEvents = new LinkedList<>(); - private final Queue eventsWhileSuspended = new ArrayDeque<>(); private final DataConverter dataConverter = TaskOrchestrationExecutor.this.dataConverter; private final Duration maximumTimerInterval = TaskOrchestrationExecutor.this.maximumTimerInterval; private final Logger logger = TaskOrchestrationExecutor.this.logger; @@ -87,6 +85,7 @@ private class ContextImplTask implements TaskOrchestrationContext { private boolean continuedAsNew; private Object continuedAsNewInput; private boolean preserveUnprocessedEvents; + private Object customStatus; public ContextImplTask(List pastEvents, List newEvents) { @@ -530,23 +529,6 @@ private void handleEventRaised(HistoryEvent e) { task.complete(result); } - private void handleEventWhileSuspended (HistoryEvent historyEvent){ - if (historyEvent.getEventTypeCase() != HistoryEvent.EventTypeCase.EXECUTIONSUSPENDED) { - eventsWhileSuspended.offer(historyEvent); - } - } - - private void handleExecutionSuspended(HistoryEvent historyEvent) { - this.isSuspended = true; - } - - private void handleExecutionResumed(HistoryEvent historyEvent) { - this.isSuspended = false; - while (!eventsWhileSuspended.isEmpty()) { - this.processEvent(eventsWhileSuspended.poll()); - } - } - public Task createTimer(Duration duration) { Helpers.throwIfOrchestratorComplete(this.isComplete); Helpers.throwIfArgumentNull(duration, "duration"); @@ -762,86 +744,75 @@ private boolean processNextEvent() { } private void processEvent(HistoryEvent e) { - boolean overrideSuspension = e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONRESUMED || e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONTERMINATED; - if (this.isSuspended && !overrideSuspension) { - this.handleEventWhileSuspended(e); - } else { - switch (e.getEventTypeCase()) { - case ORCHESTRATORSTARTED: - Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp()); - this.setCurrentInstant(instant); - break; - case ORCHESTRATORCOMPLETED: - // No action - break; - case EXECUTIONSTARTED: - ExecutionStartedEvent startedEvent = e.getExecutionStarted(); - String name = startedEvent.getName(); - this.setName(name); - String instanceId = startedEvent.getOrchestrationInstance().getInstanceId(); - this.setInstanceId(instanceId); - String input = startedEvent.getInput().getValue(); - this.setInput(input); - TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name); - if (factory == null) { - // Try getting the default orchestrator - factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*"); - } - // TODO: Throw if the factory is null (orchestration by that name doesn't exist) - TaskOrchestration orchestrator = factory.create(); - orchestrator.run(this); - break; + switch (e.getEventTypeCase()) { + case ORCHESTRATORSTARTED: + Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp()); + this.setCurrentInstant(instant); + break; + case ORCHESTRATORCOMPLETED: + // No action + break; + case EXECUTIONSTARTED: + ExecutionStartedEvent startedEvent = e.getExecutionStarted(); + String name = startedEvent.getName(); + this.setName(name); + String instanceId = startedEvent.getOrchestrationInstance().getInstanceId(); + this.setInstanceId(instanceId); + String input = startedEvent.getInput().getValue(); + this.setInput(input); + TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name); + if (factory == null) { + // Try getting the default orchestrator + factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*"); + } + // TODO: Throw if the factory is null (orchestration by that name doesn't exist) + TaskOrchestration orchestrator = factory.create(); + orchestrator.run(this); + break; // case EXECUTIONCOMPLETED: // break; // case EXECUTIONFAILED: // break; - case EXECUTIONTERMINATED: - this.handleExecutionTerminated(e); - break; - case TASKSCHEDULED: - this.handleTaskScheduled(e); - break; - case TASKCOMPLETED: - this.handleTaskCompleted(e); - break; - case TASKFAILED: - this.handleTaskFailed(e); - break; - case TIMERCREATED: - this.handleTimerCreated(e); - break; - case TIMERFIRED: - this.handleTimerFired(e); - break; - case SUBORCHESTRATIONINSTANCECREATED: - this.handleSubOrchestrationCreated(e); - break; - case SUBORCHESTRATIONINSTANCECOMPLETED: - this.handleSubOrchestrationCompleted(e); - break; - case SUBORCHESTRATIONINSTANCEFAILED: - this.handleSubOrchestrationFailed(e); - break; + case EXECUTIONTERMINATED: + this.handleExecutionTerminated(e); + break; + case TASKSCHEDULED: + this.handleTaskScheduled(e); + break; + case TASKCOMPLETED: + this.handleTaskCompleted(e); + break; + case TASKFAILED: + this.handleTaskFailed(e); + break; + case TIMERCREATED: + this.handleTimerCreated(e); + break; + case TIMERFIRED: + this.handleTimerFired(e); + break; + case SUBORCHESTRATIONINSTANCECREATED: + this.handleSubOrchestrationCreated(e); + break; + case SUBORCHESTRATIONINSTANCECOMPLETED: + this.handleSubOrchestrationCompleted(e); + break; + case SUBORCHESTRATIONINSTANCEFAILED: + this.handleSubOrchestrationFailed(e); + break; // case EVENTSENT: // break; - case EVENTRAISED: - this.handleEventRaised(e); - break; + case EVENTRAISED: + this.handleEventRaised(e); + break; // case GENERICEVENT: // break; // case HISTORYSTATE: // break; // case EVENTTYPE_NOT_SET: // break; - case EXECUTIONSUSPENDED: - this.handleExecutionSuspended(e); - break; - case EXECUTIONRESUMED: - this.handleExecutionResumed(e); - break; - default: - throw new IllegalStateException("Don't know how to handle history type " + e.getEventTypeCase()); - } + default: + throw new IllegalStateException("Don't know how to handle history type " + e.getEventTypeCase()); } } diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index af230169..f9992855 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -408,72 +408,6 @@ void termination() throws TimeoutException { } } - @Test - void suspendResumeOrchestration() throws TimeoutException, InterruptedException { - final String orchestratorName = "suspend"; - final String eventName = "MyEvent"; - final String eventPayload = "testPayload"; - final Duration suspendTimeout = Duration.ofSeconds(5); - - DurableTaskGrpcWorker worker = this.createWorkerBuilder() - .addOrchestrator(orchestratorName, ctx -> { - String payload = ctx.waitForExternalEvent(eventName, String.class).await(); - ctx.complete(payload); - }) - .buildAndStart(); - - DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - client.suspendInstance(instanceId); - OrchestrationMetadata instance = client.waitForInstanceStart(instanceId, defaultTimeout); - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.SUSPENDED, instance.getRuntimeStatus()); - - client.raiseEvent(instanceId, eventName, eventPayload); - - assertThrows( - TimeoutException.class, - () -> client.waitForInstanceCompletion(instanceId, suspendTimeout, false), - "Expected to throw TimeoutException, but it didn't" - ); - - String resumeReason = "Resume for testing."; - client.resumeInstance(instanceId, resumeReason); - instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); - assertNotNull(instance); - assertEquals(instanceId, instance.getInstanceId()); - assertEquals(eventPayload, instance.readOutputAs(String.class)); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - } - } - - @Test - void terminateSuspendOrchestration() throws TimeoutException, InterruptedException { - final String orchestratorName = "suspendResume"; - final String eventName = "MyEvent"; - final String eventPayload = "testPayload"; - - DurableTaskGrpcWorker worker = this.createWorkerBuilder() - .addOrchestrator(orchestratorName, ctx -> { - String payload = ctx.waitForExternalEvent(eventName, String.class).await(); - ctx.complete(payload); - }) - .buildAndStart(); - - DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - String suspendReason = "Suspend for testing."; - client.suspendInstance(instanceId, suspendReason); - client.terminate(instanceId, null); - OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false); - assertNotNull(instance); - assertEquals(instanceId, instance.getInstanceId()); - assertEquals(OrchestrationRuntimeStatus.TERMINATED, instance.getRuntimeStatus()); - } - } - @Test void activityFanOut() throws IOException, TimeoutException { final String orchestratorName = "ActivityFanOut"; From 03ed200caef94e10c55e9640a610423ee1d46406 Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Mon, 10 Apr 2023 13:43:56 -0500 Subject: [PATCH 2/2] fix NPE when terminate instance --- .../com/microsoft/durabletask/DurableTaskGrpcClient.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index ea994766..6f928050 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -204,7 +204,10 @@ public void terminate(String instanceId, @Nullable Object output) { "Terminating instance %s and setting output to: %s", instanceId, serializeOutput != null ? serializeOutput : "(null)")); - TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId).setOutput(StringValue.of(serializeOutput)); + TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId); + if (serializeOutput != null) { + builder.setOutput(StringValue.of(serializeOutput)); + } this.sidecarClient.terminateInstance(builder.build()); }