Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Updated package version to v1.0.0 - to be updated
* update DataConverterException with detail error message ([#78](https://github.com/microsoft/durabletask-java/issues/78))
* update OrchestratorBlockedEvent and TaskFailedException to be unchecked exceptions ([#88](https://github.com/microsoft/durabletask-java/issues/88))
* updated PurgeInstances to take a timeout parameter and throw TimeoutException ([#37](https://github.com/microsoft/durabletask-java/issues/37))

### Breaking changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ public abstract OrchestrationMetadata waitForInstanceCompletion(
*/
public abstract PurgeResult purgeInstance(String instanceId);

// TODO, https://github.com/microsoft/durabletask-java/issues/37, add a timeout parameter
/**
* Purges orchestration instance metadata from the durable store using a filter that determines which instances to
* purge data for.
Expand All @@ -277,7 +276,9 @@ public abstract OrchestrationMetadata waitForInstanceCompletion(
* into multiple method calls over a period of time and have them cover smaller time windows.
*
* @param purgeInstanceCriteria orchestration instance filter criteria used to determine which instances to purge
* @throws TimeoutException when purging instances is not completed within the specified amount of time.
* The default timeout for purging instances is 10 minutes
* @return the result of the purge operation, including the number of purged orchestration instances (0 or 1)
*/
public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria);
public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,32 @@ public PurgeResult purgeInstance(String instanceId) {
}

@Override
public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) {
public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException {
PurgeInstanceFilter.Builder builder = PurgeInstanceFilter.newBuilder();
builder.setCreatedTimeFrom(DataConverter.getTimestampFromInstant(purgeInstanceCriteria.getCreatedTimeFrom()));
Optional.ofNullable(purgeInstanceCriteria.getCreatedTimeTo()).ifPresent(createdTimeTo -> builder.setCreatedTimeTo(DataConverter.getTimestampFromInstant(createdTimeTo)));
purgeInstanceCriteria.getRuntimeStatusList().forEach(runtimeStatus -> Optional.ofNullable(runtimeStatus).ifPresent(status -> builder.addRuntimeStatus(OrchestrationRuntimeStatus.toProtobuf(status))));
PurgeInstancesResponse response = this.sidecarClient.purgeInstances(PurgeInstancesRequest.newBuilder().setPurgeInstanceFilter(builder).build());
return toPurgeResult(response);

Duration timeout = purgeInstanceCriteria.getTimeout();
if (timeout == null || timeout.isNegative() || timeout.isZero()) {
timeout = Duration.ofMinutes(4);
}

TaskHubSidecarServiceBlockingStub grpcClient = this.sidecarClient.withDeadlineAfter(
timeout.toMillis(),
TimeUnit.MILLISECONDS);

PurgeInstancesResponse response;
try {
response = grpcClient.purgeInstances(PurgeInstancesRequest.newBuilder().setPurgeInstanceFilter(builder).build());
return toPurgeResult(response);
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
String timeOutException = String.format("Purge instances timeout duration of %s reached.", timeout);
throw new TimeoutException(timeOutException);
}
throw e;
}
}

private PurgeResult toPurgeResult(PurgeInstancesResponse response){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.microsoft.durabletask;

import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -15,6 +16,7 @@ public final class PurgeInstanceCriteria {
private Instant createdTimeFrom;
private Instant createdTimeTo;
private List<OrchestrationRuntimeStatus> runtimeStatusList = new ArrayList<>();
private Duration timeout;

/**
* Creates a new, default instance of the {@code PurgeInstanceCriteria} class.
Expand Down Expand Up @@ -58,6 +60,17 @@ public PurgeInstanceCriteria setRuntimeStatusList(List<OrchestrationRuntimeStatu
return this;
}

/**
* Sets a timeout duration for the purge operation. Setting to {@code null} will reset the timeout to be the default value.
*
* @param timeout the amount of time to wait for the purge instance operation to complete
* @return this criteria object
*/
public PurgeInstanceCriteria setTimeout(Duration timeout) {
this.timeout = timeout;
return this;
}

/**
* Gets the configured minimum orchestration creation time or {@code null} if none was configured.
* @return the configured minimum orchestration creation time or {@code null} if none was configured
Expand All @@ -83,4 +96,14 @@ public Instant getCreatedTimeTo() {
public List<OrchestrationRuntimeStatus> getRuntimeStatusList() {
return this.runtimeStatusList;
}

/**
* Gets the configured timeout duration or {@code null} if none was configured.
* @return the configured timeout
*/
@Nullable
public Duration getTimeout() {
return this.timeout;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,63 @@ void purgeInstanceFilter() throws TimeoutException {
}
}

@Test
void purgeInstanceFilterTimeout() throws TimeoutException {
final String orchestratorName = "PurgeInstance";
final String plusOne = "PlusOne";
final String plusTwo = "PlusTwo";

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorName, ctx -> {
int value = ctx.getInput(int.class);
value = ctx.callActivity(plusOne, value, int.class).await();
ctx.complete(value);
})
.addActivity(plusOne, ctx -> ctx.getInput(int.class) + 1)
.addOrchestrator(plusOne, ctx -> {
int value = ctx.getInput(int.class);
value = ctx.callActivity(plusOne, value, int.class).await();
ctx.complete(value);
})
.addOrchestrator(plusTwo, ctx -> {
int value = ctx.getInput(int.class);
value = ctx.callActivity(plusTwo, value, int.class).await();
ctx.complete(value);
})
.addActivity(plusTwo, ctx -> ctx.getInput(int.class) + 2)
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
client.createTaskHub(true);
Instant startTime = Instant.now();

String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 0);
OrchestrationMetadata metadata = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(metadata);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus());
assertEquals(1, metadata.readOutputAs(int.class));

String instanceId1 = client.scheduleNewOrchestrationInstance(plusOne, 0);
metadata = client.waitForInstanceCompletion(instanceId1, defaultTimeout, true);
assertNotNull(metadata);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus());
assertEquals(1, metadata.readOutputAs(int.class));

String instanceId2 = client.scheduleNewOrchestrationInstance(plusTwo, 10);
metadata = client.waitForInstanceCompletion(instanceId2, defaultTimeout, true);
assertNotNull(metadata);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus());
assertEquals(12, metadata.readOutputAs(int.class));

PurgeInstanceCriteria criteria = new PurgeInstanceCriteria();
criteria.setCreatedTimeFrom(startTime);
criteria.setTimeout(Duration.ofNanos(1));

assertThrows(TimeoutException.class, () -> client.purgeInstances(criteria));
}
}

@Test()
void waitForInstanceStartThrowsException() {
final String orchestratorName = "orchestratorName";
Expand Down