From cdddff467b2d66830b9f47e9ec0a1db7cb0e1bbf Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Fri, 16 Dec 2022 09:34:41 -0600 Subject: [PATCH 1/9] fix comments to align with official doc --- .../src/main/java/com/functions/AzureFunctions.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/samples-azure-functions/src/main/java/com/functions/AzureFunctions.java b/samples-azure-functions/src/main/java/com/functions/AzureFunctions.java index 58d4aee1..ad804370 100644 --- a/samples-azure-functions/src/main/java/com/functions/AzureFunctions.java +++ b/samples-azure-functions/src/main/java/com/functions/AzureFunctions.java @@ -31,12 +31,12 @@ public HttpResponseMessage startOrchestration( } /** - * This is the orchestrator function. The OrchestrationRunner.loadAndRun() static - * method is used to take the function input and execute the orchestrator logic. + * This is the orchestrator function, which can schedule activity functions, create durable timers, + * or wait for external events in a way that's completely fault-tolerant. */ @FunctionName("Cities") public String citiesOrchestrator( - @DurableOrchestrationTrigger(name = "taskOrchestrationContext") TaskOrchestrationContext ctx) { + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { String result = ""; result += ctx.callActivity("Capitalize", "Tokyo", String.class).await() + ", "; result += ctx.callActivity("Capitalize", "London", String.class).await() + ", "; From e8e343fdcf73cd12d2d2a0d288c95dbc87f4ff2f Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Fri, 20 Jan 2023 09:25:57 -0600 Subject: [PATCH 2/9] implement suspend and resume client apis --- .../durabletask/DurableTaskClient.java | 30 ++++ .../durabletask/DurableTaskGrpcClient.java | 18 +++ .../OrchestrationRuntimeStatus.java | 11 +- .../TaskOrchestrationExecutor.java | 145 +++++++++++------- submodules/durabletask-protobuf | 2 +- 5 files changed, 146 insertions(+), 60 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java index 597c90fd..a0b2ed52 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java @@ -281,4 +281,34 @@ public abstract OrchestrationMetadata waitForInstanceCompletion( * @return the result of the purge operation, including the number of purged orchestration instances (0 or 1) */ public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException; + + /** + * Suspends a running orchestration instance. + * @param instanceId the ID of the orchestration instance to suspend + */ + public void suspendInstance (String instanceId) { + this.suspendInstance(instanceId, null); + } + + /** + * Resumes a running orchestration instance. + * @param instanceId the ID of the orchestration instance to resume + */ + public void resumeInstance(String instanceId) { + this.resumeInstance(instanceId, null); + } + + /** + * Suspends a running orchestration instance. + * @param instanceId the ID of the orchestration instance to suspend + * @param reason the reason for suspending the orchestration instance + */ + public abstract void suspendInstance(String instanceId, String reason); + + /** + * Resumes a running orchestration instance. + * @param instanceId the ID of the orchestration instance to resume + * @param reason the reason for resuming the orchestration instance + */ + public abstract void resumeInstance(String instanceId, String reason); } \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index ea994766..0ee75568 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -280,6 +280,24 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t } } + @Override + public void suspendInstance(String instanceId, String reason) { + SuspendRequest suspendRequest = SuspendRequest.newBuilder() + .setInstanceId(instanceId) + .setReason(StringValue.of(reason)) + .build(); + this.sidecarClient.suspendInstance(suspendRequest); + } + + @Override + public void resumeInstance(String instanceId, String reason) { + ResumeRequest resumeRequest = ResumeRequest.newBuilder() + .setInstanceId(instanceId) + .setReason(StringValue.of(reason)) + .build(); + this.sidecarClient.resumeInstance(resumeRequest); + } + private PurgeResult toPurgeResult(PurgeInstancesResponse response){ return new PurgeResult(response.getDeletedInstanceCount()); } diff --git a/client/src/main/java/com/microsoft/durabletask/OrchestrationRuntimeStatus.java b/client/src/main/java/com/microsoft/durabletask/OrchestrationRuntimeStatus.java index 547ecfb5..a205ccc7 100644 --- a/client/src/main/java/com/microsoft/durabletask/OrchestrationRuntimeStatus.java +++ b/client/src/main/java/com/microsoft/durabletask/OrchestrationRuntimeStatus.java @@ -46,7 +46,12 @@ public enum OrchestrationRuntimeStatus { /** * The orchestration was scheduled but hasn't started running. */ - PENDING; + PENDING, + + /** + * The orchestration is in a suspended state. + */ + SUSPENDED; static OrchestrationRuntimeStatus fromProtobuf(OrchestrationStatus status) { switch (status) { @@ -64,6 +69,8 @@ static OrchestrationRuntimeStatus fromProtobuf(OrchestrationStatus status) { return TERMINATED; case ORCHESTRATION_STATUS_PENDING: return PENDING; + case ORCHESTRATION_STATUS_SUSPENDED: + return SUSPENDED; default: throw new IllegalArgumentException(String.format("Unknown status value: %s", status)); } @@ -85,6 +92,8 @@ static OrchestrationStatus toProtobuf(OrchestrationRuntimeStatus status){ return ORCHESTRATION_STATUS_TERMINATED; case PENDING: return ORCHESTRATION_STATUS_PENDING; + case SUSPENDED: + return ORCHESTRATION_STATUS_SUSPENDED; default: throw new IllegalArgumentException(String.format("Unknown status value: %s", status)); } diff --git a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java index 883b7d39..a797cc69 100644 --- a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java @@ -66,6 +66,7 @@ private class ContextImplTask implements TaskOrchestrationContext { private String instanceId; private Instant currentInstant; private boolean isComplete; + private boolean isSuspended; private boolean isReplaying = true; // LinkedHashMap to maintain insertion order when returning the list of pending actions @@ -73,6 +74,7 @@ private class ContextImplTask implements TaskOrchestrationContext { private final HashMap> openTasks = new HashMap<>(); private final LinkedHashMap>> outstandingEvents = new LinkedHashMap<>(); private final LinkedList unprocessedEvents = new LinkedList<>(); + private final Queue eventsWhileSuspended = new ArrayDeque<>(); private final DataConverter dataConverter = TaskOrchestrationExecutor.this.dataConverter; private final Logger logger = TaskOrchestrationExecutor.this.logger; private final OrchestrationHistoryIterator historyEventPlayer; @@ -80,7 +82,6 @@ private class ContextImplTask implements TaskOrchestrationContext { private boolean continuedAsNew; private Object continuedAsNewInput; private boolean preserveUnprocessedEvents; - private Object customStatus; public ContextImplTask(List pastEvents, List newEvents) { @@ -524,6 +525,23 @@ private void handleEventRaised(HistoryEvent e) { task.complete(result); } + private void handleEventWhileSuspended (HistoryEvent historyEvent){ + if (historyEvent.getEventTypeCase() != HistoryEvent.EventTypeCase.EXECUTIONSUSPENDED) { + eventsWhileSuspended.offer(historyEvent); + } + } + + private void handleExecutionSuspended(HistoryEvent historyEvent) { + this.isSuspended = true; + } + + private void handleExecutionResumed(HistoryEvent historyEvent) { + this.isSuspended = false; + while (!eventsWhileSuspended.isEmpty()) { + this.processEvent(eventsWhileSuspended.poll()); + } + } + public Task createTimer(Duration duration) { Helpers.throwIfOrchestratorComplete(this.isComplete); Helpers.throwIfArgumentNull(duration, "duration"); @@ -717,75 +735,86 @@ private boolean processNextEvent() { } private void processEvent(HistoryEvent e) { - switch (e.getEventTypeCase()) { - case ORCHESTRATORSTARTED: - Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp()); - this.setCurrentInstant(instant); - break; - case ORCHESTRATORCOMPLETED: - // No action - break; - case EXECUTIONSTARTED: - ExecutionStartedEvent startedEvent = e.getExecutionStarted(); - String name = startedEvent.getName(); - this.setName(name); - String instanceId = startedEvent.getOrchestrationInstance().getInstanceId(); - this.setInstanceId(instanceId); - String input = startedEvent.getInput().getValue(); - this.setInput(input); - TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name); - if (factory == null) { - // Try getting the default orchestrator - factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*"); - } - // TODO: Throw if the factory is null (orchestration by that name doesn't exist) - TaskOrchestration orchestrator = factory.create(); - orchestrator.run(this); - break; + boolean overrideSuspension = e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONRESUMED || e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONTERMINATED; + if (this.isSuspended && !overrideSuspension) { + this.handleEventWhileSuspended(e); + } else { + switch (e.getEventTypeCase()) { + case ORCHESTRATORSTARTED: + Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp()); + this.setCurrentInstant(instant); + break; + case ORCHESTRATORCOMPLETED: + // No action + break; + case EXECUTIONSTARTED: + ExecutionStartedEvent startedEvent = e.getExecutionStarted(); + String name = startedEvent.getName(); + this.setName(name); + String instanceId = startedEvent.getOrchestrationInstance().getInstanceId(); + this.setInstanceId(instanceId); + String input = startedEvent.getInput().getValue(); + this.setInput(input); + TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name); + if (factory == null) { + // Try getting the default orchestrator + factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*"); + } + // TODO: Throw if the factory is null (orchestration by that name doesn't exist) + TaskOrchestration orchestrator = factory.create(); + orchestrator.run(this); + break; // case EXECUTIONCOMPLETED: // break; // case EXECUTIONFAILED: // break; - case EXECUTIONTERMINATED: - this.handleExecutionTerminated(e); - break; - case TASKSCHEDULED: - this.handleTaskScheduled(e); - break; - case TASKCOMPLETED: - this.handleTaskCompleted(e); - break; - case TASKFAILED: - this.handleTaskFailed(e); - break; - case TIMERCREATED: - this.handleTimerCreated(e); - break; - case TIMERFIRED: - this.handleTimerFired(e); - break; - case SUBORCHESTRATIONINSTANCECREATED: - this.handleSubOrchestrationCreated(e); - break; - case SUBORCHESTRATIONINSTANCECOMPLETED: - this.handleSubOrchestrationCompleted(e); - break; - case SUBORCHESTRATIONINSTANCEFAILED: - this.handleSubOrchestrationFailed(e); - break; + case EXECUTIONTERMINATED: + this.handleExecutionTerminated(e); + break; + case TASKSCHEDULED: + this.handleTaskScheduled(e); + break; + case TASKCOMPLETED: + this.handleTaskCompleted(e); + break; + case TASKFAILED: + this.handleTaskFailed(e); + break; + case TIMERCREATED: + this.handleTimerCreated(e); + break; + case TIMERFIRED: + this.handleTimerFired(e); + break; + case SUBORCHESTRATIONINSTANCECREATED: + this.handleSubOrchestrationCreated(e); + break; + case SUBORCHESTRATIONINSTANCECOMPLETED: + this.handleSubOrchestrationCompleted(e); + break; + case SUBORCHESTRATIONINSTANCEFAILED: + this.handleSubOrchestrationFailed(e); + break; // case EVENTSENT: // break; - case EVENTRAISED: - this.handleEventRaised(e); - break; + case EVENTRAISED: + this.handleEventRaised(e); + break; // case GENERICEVENT: // break; // case HISTORYSTATE: // break; // case EVENTTYPE_NOT_SET: // break; - default: - throw new IllegalStateException("Don't know how to handle history type " + e.getEventTypeCase()); + case EXECUTIONSUSPENDED: + this.handleExecutionSuspended(e); + break; + case EXECUTIONRESUMED: + this.handleExecutionResumed(e); + break; + default: + throw new IllegalStateException("Don't know how to handle history type " + e.getEventTypeCase()); + } } } diff --git a/submodules/durabletask-protobuf b/submodules/durabletask-protobuf index c78f8553..7d682688 160000 --- a/submodules/durabletask-protobuf +++ b/submodules/durabletask-protobuf @@ -1 +1 @@ -Subproject commit c78f85538566fcde474ddb6d698872d1ea8d4fcf +Subproject commit 7d6826889eb9b104592ab1020c648517a155ba79 From 6b6314f1a7f245a3c784fef3eaba7f7f8097d86e Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Fri, 20 Jan 2023 11:31:19 -0600 Subject: [PATCH 3/9] add integration test --- .../durabletask/IntegrationTests.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 3996f412..09962c72 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -319,6 +319,28 @@ void termination() throws TimeoutException { } } + @Test + void suspendResumeOrchestration() throws TimeoutException { + final String orchestratorName = "suspendResume"; + final Duration delay = Duration.ofSeconds(3); + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> ctx.createTimer(delay).await()) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + String expectReason = "Suspend for testing."; + client.suspendInstance(instanceId, expectReason); + OrchestrationMetadata instance = client.waitForInstanceStart(instanceId, defaultTimeout); + assertNotNull(instance); + assertEquals(instanceId, instance.getInstanceId()); + assertEquals(OrchestrationRuntimeStatus.SUSPENDED, instance.getRuntimeStatus()); + assertEquals(expectReason, instance.getSerializedOutput()); + } + } + @Test void activityFanOut() throws IOException, TimeoutException { final String orchestratorName = "ActivityFanOut"; From 1daa2f6d31aa9cf65db86e16c512192b4d5335bf Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Fri, 20 Jan 2023 13:04:15 -0600 Subject: [PATCH 4/9] add integration tests --- .../durabletask/DurableTaskClient.java | 4 +-- .../durabletask/DurableTaskGrpcClient.java | 26 +++++++++-------- .../durabletask/IntegrationTests.java | 29 ++++++++++++++----- 3 files changed, 38 insertions(+), 21 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java index a0b2ed52..050ef4f7 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java @@ -303,12 +303,12 @@ public void resumeInstance(String instanceId) { * @param instanceId the ID of the orchestration instance to suspend * @param reason the reason for suspending the orchestration instance */ - public abstract void suspendInstance(String instanceId, String reason); + public abstract void suspendInstance(String instanceId, @Nullable String reason); /** * Resumes a running orchestration instance. * @param instanceId the ID of the orchestration instance to resume * @param reason the reason for resuming the orchestration instance */ - public abstract void resumeInstance(String instanceId, String reason); + public abstract void resumeInstance(String instanceId, @Nullable String reason); } \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 0ee75568..c8b13de4 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -281,21 +281,23 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t } @Override - public void suspendInstance(String instanceId, String reason) { - SuspendRequest suspendRequest = SuspendRequest.newBuilder() - .setInstanceId(instanceId) - .setReason(StringValue.of(reason)) - .build(); - this.sidecarClient.suspendInstance(suspendRequest); + public void suspendInstance(String instanceId, @Nullable String reason) { + SuspendRequest.Builder suspendRequestBuilder = SuspendRequest.newBuilder(); + suspendRequestBuilder.setInstanceId(instanceId); + if (reason != null) { + suspendRequestBuilder.setReason(StringValue.of(reason)); + } + this.sidecarClient.suspendInstance(suspendRequestBuilder.build()); } @Override - public void resumeInstance(String instanceId, String reason) { - ResumeRequest resumeRequest = ResumeRequest.newBuilder() - .setInstanceId(instanceId) - .setReason(StringValue.of(reason)) - .build(); - this.sidecarClient.resumeInstance(resumeRequest); + public void resumeInstance(String instanceId, @Nullable String reason) { + ResumeRequest.Builder resumeRequestBuilder = ResumeRequest.newBuilder(); + resumeRequestBuilder.setInstanceId(instanceId); + if (reason != null) { + resumeRequestBuilder.setReason(StringValue.of(reason)); + } + this.sidecarClient.resumeInstance(resumeRequestBuilder.build()); } private PurgeResult toPurgeResult(PurgeInstancesResponse response){ diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 09962c72..2a6d8d9d 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -320,24 +320,39 @@ void termination() throws TimeoutException { } @Test - void suspendResumeOrchestration() throws TimeoutException { - final String orchestratorName = "suspendResume"; - final Duration delay = Duration.ofSeconds(3); + void suspendResumeOrchestrationWithReason() throws TimeoutException, InterruptedException { + final String orchestratorName = "suspendResumeWithReason"; + final String eventName = "MyEvent"; + final String eventPayload = "testPayload"; DurableTaskGrpcWorker worker = this.createWorkerBuilder() - .addOrchestrator(orchestratorName, ctx -> ctx.createTimer(delay).await()) + .addOrchestrator(orchestratorName, ctx -> { + String payload = ctx.waitForExternalEvent(eventName, String.class).await(); + ctx.complete(payload); + }) .buildAndStart(); DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); try (worker; client) { String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - String expectReason = "Suspend for testing."; - client.suspendInstance(instanceId, expectReason); + String suspendReason = "Suspend for testing."; + client.suspendInstance(instanceId, suspendReason); OrchestrationMetadata instance = client.waitForInstanceStart(instanceId, defaultTimeout); assertNotNull(instance); assertEquals(instanceId, instance.getInstanceId()); assertEquals(OrchestrationRuntimeStatus.SUSPENDED, instance.getRuntimeStatus()); - assertEquals(expectReason, instance.getSerializedOutput()); + + client.raiseEvent(instanceId, eventName, eventPayload); + client.waitForInstanceStart(instanceId, defaultTimeout); + assertEquals(OrchestrationRuntimeStatus.SUSPENDED, instance.getRuntimeStatus()); + + String resumeReason = "Resume for testing."; + client.resumeInstance(instanceId, resumeReason); + instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(instanceId, instance.getInstanceId()); + assertEquals(eventPayload, instance.getSerializedOutput()); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); } } From f212e3087c2a6978edebe40e6227fd01d87bfb3b Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Fri, 20 Jan 2023 16:38:19 -0600 Subject: [PATCH 5/9] update integratio tests --- .../durabletask/IntegrationTests.java | 39 +++++++++++++++---- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 2a6d8d9d..28e44a4e 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -320,8 +320,8 @@ void termination() throws TimeoutException { } @Test - void suspendResumeOrchestrationWithReason() throws TimeoutException, InterruptedException { - final String orchestratorName = "suspendResumeWithReason"; + void suspendOrchestration() throws TimeoutException, InterruptedException { + final String orchestratorName = "suspend"; final String eventName = "MyEvent"; final String eventPayload = "testPayload"; @@ -335,23 +335,46 @@ void suspendResumeOrchestrationWithReason() throws TimeoutException, Interrupted DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); try (worker; client) { String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - String suspendReason = "Suspend for testing."; - client.suspendInstance(instanceId, suspendReason); + client.suspendInstance(instanceId); OrchestrationMetadata instance = client.waitForInstanceStart(instanceId, defaultTimeout); assertNotNull(instance); - assertEquals(instanceId, instance.getInstanceId()); assertEquals(OrchestrationRuntimeStatus.SUSPENDED, instance.getRuntimeStatus()); client.raiseEvent(instanceId, eventName, eventPayload); - client.waitForInstanceStart(instanceId, defaultTimeout); + instance = client.waitForInstanceStart(instanceId, defaultTimeout); + assertNotNull(instance); + assertEquals(instanceId, instance.getInstanceId()); assertEquals(OrchestrationRuntimeStatus.SUSPENDED, instance.getRuntimeStatus()); + } + } + + @Test + void suspendResumeOrchestration() throws TimeoutException, InterruptedException { + final String orchestratorName = "suspendResume"; + final String eventName = "MyEvent"; + final String eventPayload = "testPayload"; + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + String payload = ctx.waitForExternalEvent(eventName, String.class).await(); + ctx.complete(payload); + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + String suspendReason = "Suspend for testing."; + client.suspendInstance(instanceId, suspendReason); + + client.raiseEvent(instanceId, eventName, eventPayload); String resumeReason = "Resume for testing."; client.resumeInstance(instanceId, resumeReason); - instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); assertNotNull(instance); assertEquals(instanceId, instance.getInstanceId()); - assertEquals(eventPayload, instance.getSerializedOutput()); + assertEquals(eventPayload, instance.readOutputAs(String.class)); assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); } } From 390bd52d233e5233dcba83f9160fa675ecc5330b Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Fri, 20 Jan 2023 16:44:22 -0600 Subject: [PATCH 6/9] update CHANGELOG.md --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7da6413c..51a457a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## v1.1.0 + +### Updates +* Support Suspend and Resume Client APIs ([#104](https://github.com/microsoft/durabletask-java/issues/104)) + + ## v1.0.0 ### New From e3f59c4e75820db0aa10b961b96f2d91f07742db Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Wed, 1 Feb 2023 14:16:37 -0600 Subject: [PATCH 7/9] fix potential NPE of terminate method - update unit tests for suspend logics --- .../durabletask/DurableTaskGrpcClient.java | 5 +++- .../durabletask/IntegrationTests.java | 27 ++++++++++++------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index c8b13de4..e078aade 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -204,7 +204,10 @@ public void terminate(String instanceId, @Nullable Object output) { "Terminating instance %s and setting output to: %s", instanceId, serializeOutput != null ? serializeOutput : "(null)")); - TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId).setOutput(StringValue.of(serializeOutput)); + TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId); + if (serializeOutput != null){ + builder.setOutput(StringValue.of(serializeOutput)); + } this.sidecarClient.terminateInstance(builder.build()); } diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 28e44a4e..35448094 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -320,10 +320,11 @@ void termination() throws TimeoutException { } @Test - void suspendOrchestration() throws TimeoutException, InterruptedException { + void suspendResumeOrchestration() throws TimeoutException, InterruptedException { final String orchestratorName = "suspend"; final String eventName = "MyEvent"; final String eventPayload = "testPayload"; + final Duration suspendTimeout = Duration.ofSeconds(5); DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(orchestratorName, ctx -> { @@ -341,15 +342,25 @@ void suspendOrchestration() throws TimeoutException, InterruptedException { assertEquals(OrchestrationRuntimeStatus.SUSPENDED, instance.getRuntimeStatus()); client.raiseEvent(instanceId, eventName, eventPayload); - instance = client.waitForInstanceStart(instanceId, defaultTimeout); + + assertThrows( + TimeoutException.class, + () -> client.waitForInstanceCompletion(instanceId, suspendTimeout, false), + "Expected to throw TimeoutException, but it didn't" + ); + + String resumeReason = "Resume for testing."; + client.resumeInstance(instanceId, resumeReason); + instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); assertNotNull(instance); assertEquals(instanceId, instance.getInstanceId()); - assertEquals(OrchestrationRuntimeStatus.SUSPENDED, instance.getRuntimeStatus()); + assertEquals(eventPayload, instance.readOutputAs(String.class)); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); } } @Test - void suspendResumeOrchestration() throws TimeoutException, InterruptedException { + void terminateSuspendOrchestration() throws TimeoutException, InterruptedException { final String orchestratorName = "suspendResume"; final String eventName = "MyEvent"; final String eventPayload = "testPayload"; @@ -369,13 +380,11 @@ void suspendResumeOrchestration() throws TimeoutException, InterruptedException client.raiseEvent(instanceId, eventName, eventPayload); - String resumeReason = "Resume for testing."; - client.resumeInstance(instanceId, resumeReason); - OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + client.terminate(instanceId, null); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false); assertNotNull(instance); assertEquals(instanceId, instance.getInstanceId()); - assertEquals(eventPayload, instance.readOutputAs(String.class)); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals(OrchestrationRuntimeStatus.TERMINATED, instance.getRuntimeStatus()); } } From 9ece50bc38700e838362f9106b4a696f767a9d20 Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Wed, 1 Feb 2023 14:43:35 -0600 Subject: [PATCH 8/9] update release notes - minior refactor unit test --- CHANGELOG.md | 1 + .../test/java/com/microsoft/durabletask/IntegrationTests.java | 3 --- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51a457a6..8f76bfff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Updates * Support Suspend and Resume Client APIs ([#104](https://github.com/microsoft/durabletask-java/issues/104)) +* Fix the potential NPE issue of `DurableTaskClient terminate` method ([#104](https://github.com/microsoft/durabletask-java/issues/104)) ## v1.0.0 diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 35448094..68d9ae6f 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -377,9 +377,6 @@ void terminateSuspendOrchestration() throws TimeoutException, InterruptedExcepti String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); String suspendReason = "Suspend for testing."; client.suspendInstance(instanceId, suspendReason); - - client.raiseEvent(instanceId, eventName, eventPayload); - client.terminate(instanceId, null); OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false); assertNotNull(instance); From 740f68855b5f001ec329a360781e36cae974610c Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Wed, 1 Feb 2023 15:02:46 -0600 Subject: [PATCH 9/9] update sidecar image for testing --- .github/workflows/build-validation.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index df63c437..ee566f93 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -42,7 +42,7 @@ jobs: arguments: build # TODO: Move the sidecar into a central image repository - name: Initialize Durable Task Sidecar - run: docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d cgillum/durabletask-sidecar:latest start --backend Emulator + run: docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d kaibocai/durabletask-sidecar:latest start --backend Emulator - name: Integration Tests with Gradle uses: gradle/gradle-build-action@bc3340afc5e3cc44f2321809ac090d731c13c514 with: