Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
arguments: build
# TODO: Move the sidecar into a central image repository
- name: Initialize Durable Task Sidecar
run: docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d cgillum/durabletask-sidecar:latest start --backend Emulator
run: docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d kaibocai/durabletask-sidecar:latest start --backend Emulator
- name: Integration Tests with Gradle
uses: gradle/gradle-build-action@bc3340afc5e3cc44f2321809ac090d731c13c514
with:
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## v1.1.0

### Updates
* Support Suspend and Resume Client APIs ([#104](https://github.com/microsoft/durabletask-java/issues/104))
* Fix the potential NPE issue of `DurableTaskClient terminate` method ([#104](https://github.com/microsoft/durabletask-java/issues/104))


## v1.0.0

### New
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,4 +281,34 @@ public abstract OrchestrationMetadata waitForInstanceCompletion(
* @return the result of the purge operation, including the number of purged orchestration instances (0 or 1)
*/
public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException;

/**
* Suspends a running orchestration instance.
* @param instanceId the ID of the orchestration instance to suspend
*/
public void suspendInstance (String instanceId) {
this.suspendInstance(instanceId, null);
}

/**
* Resumes a running orchestration instance.
* @param instanceId the ID of the orchestration instance to resume
*/
public void resumeInstance(String instanceId) {
this.resumeInstance(instanceId, null);
}

/**
* Suspends a running orchestration instance.
* @param instanceId the ID of the orchestration instance to suspend
* @param reason the reason for suspending the orchestration instance
*/
public abstract void suspendInstance(String instanceId, @Nullable String reason);

/**
* Resumes a running orchestration instance.
* @param instanceId the ID of the orchestration instance to resume
* @param reason the reason for resuming the orchestration instance
*/
public abstract void resumeInstance(String instanceId, @Nullable String reason);
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ public void terminate(String instanceId, @Nullable Object output) {
"Terminating instance %s and setting output to: %s",
instanceId,
serializeOutput != null ? serializeOutput : "(null)"));
TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId).setOutput(StringValue.of(serializeOutput));
TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId);
if (serializeOutput != null){
builder.setOutput(StringValue.of(serializeOutput));
}
this.sidecarClient.terminateInstance(builder.build());
}

Expand Down Expand Up @@ -280,6 +283,26 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t
}
}

@Override
public void suspendInstance(String instanceId, @Nullable String reason) {
SuspendRequest.Builder suspendRequestBuilder = SuspendRequest.newBuilder();
suspendRequestBuilder.setInstanceId(instanceId);
if (reason != null) {
suspendRequestBuilder.setReason(StringValue.of(reason));
}
this.sidecarClient.suspendInstance(suspendRequestBuilder.build());
}

@Override
public void resumeInstance(String instanceId, @Nullable String reason) {
ResumeRequest.Builder resumeRequestBuilder = ResumeRequest.newBuilder();
resumeRequestBuilder.setInstanceId(instanceId);
if (reason != null) {
resumeRequestBuilder.setReason(StringValue.of(reason));
}
this.sidecarClient.resumeInstance(resumeRequestBuilder.build());
}

private PurgeResult toPurgeResult(PurgeInstancesResponse response){
return new PurgeResult(response.getDeletedInstanceCount());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ public enum OrchestrationRuntimeStatus {
/**
* The orchestration was scheduled but hasn't started running.
*/
PENDING;
PENDING,

/**
* The orchestration is in a suspended state.
*/
SUSPENDED;

static OrchestrationRuntimeStatus fromProtobuf(OrchestrationStatus status) {
switch (status) {
Expand All @@ -64,6 +69,8 @@ static OrchestrationRuntimeStatus fromProtobuf(OrchestrationStatus status) {
return TERMINATED;
case ORCHESTRATION_STATUS_PENDING:
return PENDING;
case ORCHESTRATION_STATUS_SUSPENDED:
return SUSPENDED;
default:
throw new IllegalArgumentException(String.format("Unknown status value: %s", status));
}
Expand All @@ -85,6 +92,8 @@ static OrchestrationStatus toProtobuf(OrchestrationRuntimeStatus status){
return ORCHESTRATION_STATUS_TERMINATED;
case PENDING:
return ORCHESTRATION_STATUS_PENDING;
case SUSPENDED:
return ORCHESTRATION_STATUS_SUSPENDED;
default:
throw new IllegalArgumentException(String.format("Unknown status value: %s", status));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,22 @@ private class ContextImplTask implements TaskOrchestrationContext {
private String instanceId;
private Instant currentInstant;
private boolean isComplete;
private boolean isSuspended;
private boolean isReplaying = true;

// LinkedHashMap to maintain insertion order when returning the list of pending actions
private final LinkedHashMap<Integer, OrchestratorAction> pendingActions = new LinkedHashMap<>();
private final HashMap<Integer, TaskRecord<?>> openTasks = new HashMap<>();
private final LinkedHashMap<String, Queue<TaskRecord<?>>> outstandingEvents = new LinkedHashMap<>();
private final LinkedList<HistoryEvent> unprocessedEvents = new LinkedList<>();
private final Queue<HistoryEvent> eventsWhileSuspended = new ArrayDeque<>();
private final DataConverter dataConverter = TaskOrchestrationExecutor.this.dataConverter;
private final Logger logger = TaskOrchestrationExecutor.this.logger;
private final OrchestrationHistoryIterator historyEventPlayer;
private int sequenceNumber;
private boolean continuedAsNew;
private Object continuedAsNewInput;
private boolean preserveUnprocessedEvents;

private Object customStatus;

public ContextImplTask(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
Expand Down Expand Up @@ -524,6 +525,23 @@ private void handleEventRaised(HistoryEvent e) {
task.complete(result);
}

private void handleEventWhileSuspended (HistoryEvent historyEvent){
if (historyEvent.getEventTypeCase() != HistoryEvent.EventTypeCase.EXECUTIONSUSPENDED) {
eventsWhileSuspended.offer(historyEvent);
}
}

private void handleExecutionSuspended(HistoryEvent historyEvent) {
this.isSuspended = true;
}

private void handleExecutionResumed(HistoryEvent historyEvent) {
this.isSuspended = false;
while (!eventsWhileSuspended.isEmpty()) {
this.processEvent(eventsWhileSuspended.poll());
}
}

public Task<Void> createTimer(Duration duration) {
Helpers.throwIfOrchestratorComplete(this.isComplete);
Helpers.throwIfArgumentNull(duration, "duration");
Expand Down Expand Up @@ -717,75 +735,86 @@ private boolean processNextEvent() {
}

private void processEvent(HistoryEvent e) {
switch (e.getEventTypeCase()) {
case ORCHESTRATORSTARTED:
Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp());
this.setCurrentInstant(instant);
break;
case ORCHESTRATORCOMPLETED:
// No action
break;
case EXECUTIONSTARTED:
ExecutionStartedEvent startedEvent = e.getExecutionStarted();
String name = startedEvent.getName();
this.setName(name);
String instanceId = startedEvent.getOrchestrationInstance().getInstanceId();
this.setInstanceId(instanceId);
String input = startedEvent.getInput().getValue();
this.setInput(input);
TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
if (factory == null) {
// Try getting the default orchestrator
factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*");
}
// TODO: Throw if the factory is null (orchestration by that name doesn't exist)
TaskOrchestration orchestrator = factory.create();
orchestrator.run(this);
break;
boolean overrideSuspension = e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONRESUMED || e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONTERMINATED;
if (this.isSuspended && !overrideSuspension) {
this.handleEventWhileSuspended(e);
} else {
switch (e.getEventTypeCase()) {
case ORCHESTRATORSTARTED:
Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp());
this.setCurrentInstant(instant);
break;
case ORCHESTRATORCOMPLETED:
// No action
break;
case EXECUTIONSTARTED:
ExecutionStartedEvent startedEvent = e.getExecutionStarted();
String name = startedEvent.getName();
this.setName(name);
String instanceId = startedEvent.getOrchestrationInstance().getInstanceId();
this.setInstanceId(instanceId);
String input = startedEvent.getInput().getValue();
this.setInput(input);
TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
if (factory == null) {
// Try getting the default orchestrator
factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*");
}
// TODO: Throw if the factory is null (orchestration by that name doesn't exist)
TaskOrchestration orchestrator = factory.create();
orchestrator.run(this);
break;
// case EXECUTIONCOMPLETED:
// break;
// case EXECUTIONFAILED:
// break;
case EXECUTIONTERMINATED:
this.handleExecutionTerminated(e);
break;
case TASKSCHEDULED:
this.handleTaskScheduled(e);
break;
case TASKCOMPLETED:
this.handleTaskCompleted(e);
break;
case TASKFAILED:
this.handleTaskFailed(e);
break;
case TIMERCREATED:
this.handleTimerCreated(e);
break;
case TIMERFIRED:
this.handleTimerFired(e);
break;
case SUBORCHESTRATIONINSTANCECREATED:
this.handleSubOrchestrationCreated(e);
break;
case SUBORCHESTRATIONINSTANCECOMPLETED:
this.handleSubOrchestrationCompleted(e);
break;
case SUBORCHESTRATIONINSTANCEFAILED:
this.handleSubOrchestrationFailed(e);
break;
case EXECUTIONTERMINATED:
this.handleExecutionTerminated(e);
break;
case TASKSCHEDULED:
this.handleTaskScheduled(e);
break;
case TASKCOMPLETED:
this.handleTaskCompleted(e);
break;
case TASKFAILED:
this.handleTaskFailed(e);
break;
case TIMERCREATED:
this.handleTimerCreated(e);
break;
case TIMERFIRED:
this.handleTimerFired(e);
break;
case SUBORCHESTRATIONINSTANCECREATED:
this.handleSubOrchestrationCreated(e);
break;
case SUBORCHESTRATIONINSTANCECOMPLETED:
this.handleSubOrchestrationCompleted(e);
break;
case SUBORCHESTRATIONINSTANCEFAILED:
this.handleSubOrchestrationFailed(e);
break;
// case EVENTSENT:
// break;
case EVENTRAISED:
this.handleEventRaised(e);
break;
case EVENTRAISED:
this.handleEventRaised(e);
break;
// case GENERICEVENT:
// break;
// case HISTORYSTATE:
// break;
// case EVENTTYPE_NOT_SET:
// break;
default:
throw new IllegalStateException("Don't know how to handle history type " + e.getEventTypeCase());
case EXECUTIONSUSPENDED:
this.handleExecutionSuspended(e);
break;
case EXECUTIONRESUMED:
this.handleExecutionResumed(e);
break;
default:
throw new IllegalStateException("Don't know how to handle history type " + e.getEventTypeCase());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,72 @@ void termination() throws TimeoutException {
}
}

@Test
void suspendResumeOrchestration() throws TimeoutException, InterruptedException {
final String orchestratorName = "suspend";
final String eventName = "MyEvent";
final String eventPayload = "testPayload";
final Duration suspendTimeout = Duration.ofSeconds(5);

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorName, ctx -> {
String payload = ctx.waitForExternalEvent(eventName, String.class).await();
ctx.complete(payload);
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
client.suspendInstance(instanceId);
OrchestrationMetadata instance = client.waitForInstanceStart(instanceId, defaultTimeout);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.SUSPENDED, instance.getRuntimeStatus());

client.raiseEvent(instanceId, eventName, eventPayload);

assertThrows(
TimeoutException.class,
() -> client.waitForInstanceCompletion(instanceId, suspendTimeout, false),
"Expected to throw TimeoutException, but it didn't"
);

String resumeReason = "Resume for testing.";
client.resumeInstance(instanceId, resumeReason);
instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(instanceId, instance.getInstanceId());
assertEquals(eventPayload, instance.readOutputAs(String.class));
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
}
}

@Test
void terminateSuspendOrchestration() throws TimeoutException, InterruptedException {
final String orchestratorName = "suspendResume";
final String eventName = "MyEvent";
final String eventPayload = "testPayload";

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorName, ctx -> {
String payload = ctx.waitForExternalEvent(eventName, String.class).await();
ctx.complete(payload);
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
String suspendReason = "Suspend for testing.";
client.suspendInstance(instanceId, suspendReason);
client.terminate(instanceId, null);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false);
assertNotNull(instance);
assertEquals(instanceId, instance.getInstanceId());
assertEquals(OrchestrationRuntimeStatus.TERMINATED, instance.getRuntimeStatus());
}
}

@Test
void activityFanOut() throws IOException, TimeoutException {
final String orchestratorName = "ActivityFanOut";
Expand Down
2 changes: 1 addition & 1 deletion submodules/durabletask-protobuf