diff --git a/CHANGELOG.md b/CHANGELOG.md index d0cf0bd6..7866f1ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ * Updated package version to v1.0.0 - to be updated * update DataConverterException with detail error message ([#78](https://github.com/microsoft/durabletask-java/issues/78)) * update OrchestratorBlockedEvent and TaskFailedException to be unchecked exceptions ([#88](https://github.com/microsoft/durabletask-java/issues/88)) +* updated PurgeInstances to take a timeout parameter and throw TimeoutException ([#37](https://github.com/microsoft/durabletask-java/issues/37)) ### Breaking changes diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java index a5cc69cb..597c90fd 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java @@ -262,7 +262,6 @@ public abstract OrchestrationMetadata waitForInstanceCompletion( */ public abstract PurgeResult purgeInstance(String instanceId); - // TODO, https://github.com/microsoft/durabletask-java/issues/37, add a timeout parameter /** * Purges orchestration instance metadata from the durable store using a filter that determines which instances to * purge data for. @@ -277,7 +276,9 @@ public abstract OrchestrationMetadata waitForInstanceCompletion( * into multiple method calls over a period of time and have them cover smaller time windows. * * @param purgeInstanceCriteria orchestration instance filter criteria used to determine which instances to purge + * @throws TimeoutException when purging instances is not completed within the specified amount of time. + * The default timeout for purging instances is 10 minutes * @return the result of the purge operation, including the number of purged orchestration instances (0 or 1) */ - public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria); + public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException; } \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 43ce6998..78ed2800 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -252,13 +252,32 @@ public PurgeResult purgeInstance(String instanceId) { } @Override - public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) { + public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException { PurgeInstanceFilter.Builder builder = PurgeInstanceFilter.newBuilder(); builder.setCreatedTimeFrom(DataConverter.getTimestampFromInstant(purgeInstanceCriteria.getCreatedTimeFrom())); Optional.ofNullable(purgeInstanceCriteria.getCreatedTimeTo()).ifPresent(createdTimeTo -> builder.setCreatedTimeTo(DataConverter.getTimestampFromInstant(createdTimeTo))); purgeInstanceCriteria.getRuntimeStatusList().forEach(runtimeStatus -> Optional.ofNullable(runtimeStatus).ifPresent(status -> builder.addRuntimeStatus(OrchestrationRuntimeStatus.toProtobuf(status)))); - PurgeInstancesResponse response = this.sidecarClient.purgeInstances(PurgeInstancesRequest.newBuilder().setPurgeInstanceFilter(builder).build()); - return toPurgeResult(response); + + Duration timeout = purgeInstanceCriteria.getTimeout(); + if (timeout == null || timeout.isNegative() || timeout.isZero()) { + timeout = Duration.ofMinutes(4); + } + + TaskHubSidecarServiceBlockingStub grpcClient = this.sidecarClient.withDeadlineAfter( + timeout.toMillis(), + TimeUnit.MILLISECONDS); + + PurgeInstancesResponse response; + try { + response = grpcClient.purgeInstances(PurgeInstancesRequest.newBuilder().setPurgeInstanceFilter(builder).build()); + return toPurgeResult(response); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) { + String timeOutException = String.format("Purge instances timeout duration of %s reached.", timeout); + throw new TimeoutException(timeOutException); + } + throw e; + } } private PurgeResult toPurgeResult(PurgeInstancesResponse response){ diff --git a/client/src/main/java/com/microsoft/durabletask/PurgeInstanceCriteria.java b/client/src/main/java/com/microsoft/durabletask/PurgeInstanceCriteria.java index df078722..8a5f2695 100644 --- a/client/src/main/java/com/microsoft/durabletask/PurgeInstanceCriteria.java +++ b/client/src/main/java/com/microsoft/durabletask/PurgeInstanceCriteria.java @@ -3,6 +3,7 @@ package com.microsoft.durabletask; import javax.annotation.Nullable; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -15,6 +16,7 @@ public final class PurgeInstanceCriteria { private Instant createdTimeFrom; private Instant createdTimeTo; private List runtimeStatusList = new ArrayList<>(); + private Duration timeout; /** * Creates a new, default instance of the {@code PurgeInstanceCriteria} class. @@ -58,6 +60,17 @@ public PurgeInstanceCriteria setRuntimeStatusList(List getRuntimeStatusList() { return this.runtimeStatusList; } + + /** + * Gets the configured timeout duration or {@code null} if none was configured. + * @return the configured timeout + */ + @Nullable + public Duration getTimeout() { + return this.timeout; + } + } \ No newline at end of file diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index a6c9a483..3996f412 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -790,6 +790,63 @@ void purgeInstanceFilter() throws TimeoutException { } } + @Test + void purgeInstanceFilterTimeout() throws TimeoutException { + final String orchestratorName = "PurgeInstance"; + final String plusOne = "PlusOne"; + final String plusTwo = "PlusTwo"; + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + int value = ctx.getInput(int.class); + value = ctx.callActivity(plusOne, value, int.class).await(); + ctx.complete(value); + }) + .addActivity(plusOne, ctx -> ctx.getInput(int.class) + 1) + .addOrchestrator(plusOne, ctx -> { + int value = ctx.getInput(int.class); + value = ctx.callActivity(plusOne, value, int.class).await(); + ctx.complete(value); + }) + .addOrchestrator(plusTwo, ctx -> { + int value = ctx.getInput(int.class); + value = ctx.callActivity(plusTwo, value, int.class).await(); + ctx.complete(value); + }) + .addActivity(plusTwo, ctx -> ctx.getInput(int.class) + 2) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + client.createTaskHub(true); + Instant startTime = Instant.now(); + + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 0); + OrchestrationMetadata metadata = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(metadata); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus()); + assertEquals(1, metadata.readOutputAs(int.class)); + + String instanceId1 = client.scheduleNewOrchestrationInstance(plusOne, 0); + metadata = client.waitForInstanceCompletion(instanceId1, defaultTimeout, true); + assertNotNull(metadata); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus()); + assertEquals(1, metadata.readOutputAs(int.class)); + + String instanceId2 = client.scheduleNewOrchestrationInstance(plusTwo, 10); + metadata = client.waitForInstanceCompletion(instanceId2, defaultTimeout, true); + assertNotNull(metadata); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus()); + assertEquals(12, metadata.readOutputAs(int.class)); + + PurgeInstanceCriteria criteria = new PurgeInstanceCriteria(); + criteria.setCreatedTimeFrom(startTime); + criteria.setTimeout(Duration.ofNanos(1)); + + assertThrows(TimeoutException.class, () -> client.purgeInstances(criteria)); + } + } + @Test() void waitForInstanceStartThrowsException() { final String orchestratorName = "orchestratorName";