From 3bc8db7b9c5a5568bbf472607d06e1f9235caa26 Mon Sep 17 00:00:00 2001 From: Shreyas Gopalakrishna Date: Wed, 12 Oct 2022 15:50:29 -0500 Subject: [PATCH 1/3] Updated PurgeInstances to throw TimeoutException --- .../durabletask/DurableTaskClient.java | 5 +- .../durabletask/DurableTaskGrpcClient.java | 24 +++++++- .../durabletask/PurgeInstanceCriteria.java | 23 ++++++++ .../durabletask/IntegrationTests.java | 57 +++++++++++++++++++ 4 files changed, 104 insertions(+), 5 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java index a5cc69cb..597c90fd 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java @@ -262,7 +262,6 @@ public abstract OrchestrationMetadata waitForInstanceCompletion( */ public abstract PurgeResult purgeInstance(String instanceId); - // TODO, https://github.com/microsoft/durabletask-java/issues/37, add a timeout parameter /** * Purges orchestration instance metadata from the durable store using a filter that determines which instances to * purge data for. @@ -277,7 +276,9 @@ public abstract OrchestrationMetadata waitForInstanceCompletion( * into multiple method calls over a period of time and have them cover smaller time windows. * * @param purgeInstanceCriteria orchestration instance filter criteria used to determine which instances to purge + * @throws TimeoutException when purging instances is not completed within the specified amount of time. + * The default timeout for purging instances is 10 minutes * @return the result of the purge operation, including the number of purged orchestration instances (0 or 1) */ - public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria); + public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException; } \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 43ce6998..9dd5b8bb 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -252,13 +252,31 @@ public PurgeResult purgeInstance(String instanceId) { } @Override - public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) { + public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException { PurgeInstanceFilter.Builder builder = PurgeInstanceFilter.newBuilder(); builder.setCreatedTimeFrom(DataConverter.getTimestampFromInstant(purgeInstanceCriteria.getCreatedTimeFrom())); Optional.ofNullable(purgeInstanceCriteria.getCreatedTimeTo()).ifPresent(createdTimeTo -> builder.setCreatedTimeTo(DataConverter.getTimestampFromInstant(createdTimeTo))); purgeInstanceCriteria.getRuntimeStatusList().forEach(runtimeStatus -> Optional.ofNullable(runtimeStatus).ifPresent(status -> builder.addRuntimeStatus(OrchestrationRuntimeStatus.toProtobuf(status)))); - PurgeInstancesResponse response = this.sidecarClient.purgeInstances(PurgeInstancesRequest.newBuilder().setPurgeInstanceFilter(builder).build()); - return toPurgeResult(response); + + Duration timeout = purgeInstanceCriteria.getTimeout(); + if (timeout == null || timeout.isNegative() || timeout.isZero()) { + timeout = Duration.ofMinutes(10); + } + + TaskHubSidecarServiceBlockingStub grpcClient = this.sidecarClient.withDeadlineAfter( + timeout.toMillis(), + TimeUnit.MILLISECONDS); + + PurgeInstancesResponse response; + try { + response = grpcClient.purgeInstances(PurgeInstancesRequest.newBuilder().setPurgeInstanceFilter(builder).build()); + return toPurgeResult(response); + } catch (StatusRuntimeException e) { + if(e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED){ + throw new TimeoutException("Purge instances completion timeout reached."); + } + throw e; + } } private PurgeResult toPurgeResult(PurgeInstancesResponse response){ diff --git a/client/src/main/java/com/microsoft/durabletask/PurgeInstanceCriteria.java b/client/src/main/java/com/microsoft/durabletask/PurgeInstanceCriteria.java index df078722..2f24e915 100644 --- a/client/src/main/java/com/microsoft/durabletask/PurgeInstanceCriteria.java +++ b/client/src/main/java/com/microsoft/durabletask/PurgeInstanceCriteria.java @@ -3,6 +3,7 @@ package com.microsoft.durabletask; import javax.annotation.Nullable; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -15,6 +16,7 @@ public final class PurgeInstanceCriteria { private Instant createdTimeFrom; private Instant createdTimeTo; private List runtimeStatusList = new ArrayList<>(); + private Duration timeout; /** * Creates a new, default instance of the {@code PurgeInstanceCriteria} class. @@ -58,6 +60,17 @@ public PurgeInstanceCriteria setRuntimeStatusList(List getRuntimeStatusList() { return this.runtimeStatusList; } + + /** + * Gets the configured timeout duration or {@code null} if none was configured. + * @return the configured timeout + */ + @Nullable + public Duration getTimeout() { + return this.timeout; + } + } \ No newline at end of file diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index a6c9a483..3996f412 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -790,6 +790,63 @@ void purgeInstanceFilter() throws TimeoutException { } } + @Test + void purgeInstanceFilterTimeout() throws TimeoutException { + final String orchestratorName = "PurgeInstance"; + final String plusOne = "PlusOne"; + final String plusTwo = "PlusTwo"; + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + int value = ctx.getInput(int.class); + value = ctx.callActivity(plusOne, value, int.class).await(); + ctx.complete(value); + }) + .addActivity(plusOne, ctx -> ctx.getInput(int.class) + 1) + .addOrchestrator(plusOne, ctx -> { + int value = ctx.getInput(int.class); + value = ctx.callActivity(plusOne, value, int.class).await(); + ctx.complete(value); + }) + .addOrchestrator(plusTwo, ctx -> { + int value = ctx.getInput(int.class); + value = ctx.callActivity(plusTwo, value, int.class).await(); + ctx.complete(value); + }) + .addActivity(plusTwo, ctx -> ctx.getInput(int.class) + 2) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + client.createTaskHub(true); + Instant startTime = Instant.now(); + + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 0); + OrchestrationMetadata metadata = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(metadata); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus()); + assertEquals(1, metadata.readOutputAs(int.class)); + + String instanceId1 = client.scheduleNewOrchestrationInstance(plusOne, 0); + metadata = client.waitForInstanceCompletion(instanceId1, defaultTimeout, true); + assertNotNull(metadata); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus()); + assertEquals(1, metadata.readOutputAs(int.class)); + + String instanceId2 = client.scheduleNewOrchestrationInstance(plusTwo, 10); + metadata = client.waitForInstanceCompletion(instanceId2, defaultTimeout, true); + assertNotNull(metadata); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus()); + assertEquals(12, metadata.readOutputAs(int.class)); + + PurgeInstanceCriteria criteria = new PurgeInstanceCriteria(); + criteria.setCreatedTimeFrom(startTime); + criteria.setTimeout(Duration.ofNanos(1)); + + assertThrows(TimeoutException.class, () -> client.purgeInstances(criteria)); + } + } + @Test() void waitForInstanceStartThrowsException() { final String orchestratorName = "orchestratorName"; From bf524f96791f0aa9c30da3afa3aee518614c8741 Mon Sep 17 00:00:00 2001 From: Shreyas Gopalakrishna Date: Wed, 12 Oct 2022 16:21:39 -0500 Subject: [PATCH 2/3] Updated changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d0cf0bd6..7866f1ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ * Updated package version to v1.0.0 - to be updated * update DataConverterException with detail error message ([#78](https://github.com/microsoft/durabletask-java/issues/78)) * update OrchestratorBlockedEvent and TaskFailedException to be unchecked exceptions ([#88](https://github.com/microsoft/durabletask-java/issues/88)) +* updated PurgeInstances to take a timeout parameter and throw TimeoutException ([#37](https://github.com/microsoft/durabletask-java/issues/37)) ### Breaking changes From 5520244399ba65924a8d5e392cee4a22d809e147 Mon Sep 17 00:00:00 2001 From: Shreyas Gopalakrishna Date: Wed, 12 Oct 2022 17:43:05 -0500 Subject: [PATCH 3/3] Updated timeout and exception message --- .../com/microsoft/durabletask/DurableTaskGrpcClient.java | 7 ++++--- .../com/microsoft/durabletask/PurgeInstanceCriteria.java | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 9dd5b8bb..78ed2800 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -260,7 +260,7 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t Duration timeout = purgeInstanceCriteria.getTimeout(); if (timeout == null || timeout.isNegative() || timeout.isZero()) { - timeout = Duration.ofMinutes(10); + timeout = Duration.ofMinutes(4); } TaskHubSidecarServiceBlockingStub grpcClient = this.sidecarClient.withDeadlineAfter( @@ -272,8 +272,9 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t response = grpcClient.purgeInstances(PurgeInstancesRequest.newBuilder().setPurgeInstanceFilter(builder).build()); return toPurgeResult(response); } catch (StatusRuntimeException e) { - if(e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED){ - throw new TimeoutException("Purge instances completion timeout reached."); + if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) { + String timeOutException = String.format("Purge instances timeout duration of %s reached.", timeout); + throw new TimeoutException(timeOutException); } throw e; } diff --git a/client/src/main/java/com/microsoft/durabletask/PurgeInstanceCriteria.java b/client/src/main/java/com/microsoft/durabletask/PurgeInstanceCriteria.java index 2f24e915..8a5f2695 100644 --- a/client/src/main/java/com/microsoft/durabletask/PurgeInstanceCriteria.java +++ b/client/src/main/java/com/microsoft/durabletask/PurgeInstanceCriteria.java @@ -61,7 +61,7 @@ public PurgeInstanceCriteria setRuntimeStatusList(List