Skip to content

Commit 3f97f85

Browse files
committed
Revert "Implement suspend and resume client APIs (#104)"
This reverts commit c303a12.
1 parent 6ccb073 commit 3f97f85

File tree

7 files changed

+61
-219
lines changed

7 files changed

+61
-219
lines changed

.github/workflows/build-validation.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,4 @@ jobs:
9595
uses: actions/upload-artifact@v2
9696
with:
9797
name: Integration test report
98-
path: client/build/reports/tests/endToEndTest
98+
path: client/build/reports/tests/endToEndTest

CHANGELOG.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
## v1.1.0
22

33
### Updates
4-
* Support Suspend and Resume Client APIs ([#104](https://github.com/microsoft/durabletask-java/issues/104))
54
* Fix the potential NPE issue of `DurableTaskClient terminate` method ([#104](https://github.com/microsoft/durabletask-java/issues/104))
65
* Add waitForCompletionOrCreateCheckStatusResponse client API ([#115](https://github.com/microsoft/durabletask-java/pull/115))
76
* Support long timers by breaking up into smaller timers ([#114](https://github.com/microsoft/durabletask-java/issues/114))

client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -281,34 +281,4 @@ public abstract OrchestrationMetadata waitForInstanceCompletion(
281281
* @return the result of the purge operation, including the number of purged orchestration instances (0 or 1)
282282
*/
283283
public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException;
284-
285-
/**
286-
* Suspends a running orchestration instance.
287-
* @param instanceId the ID of the orchestration instance to suspend
288-
*/
289-
public void suspendInstance (String instanceId) {
290-
this.suspendInstance(instanceId, null);
291-
}
292-
293-
/**
294-
* Resumes a running orchestration instance.
295-
* @param instanceId the ID of the orchestration instance to resume
296-
*/
297-
public void resumeInstance(String instanceId) {
298-
this.resumeInstance(instanceId, null);
299-
}
300-
301-
/**
302-
* Suspends a running orchestration instance.
303-
* @param instanceId the ID of the orchestration instance to suspend
304-
* @param reason the reason for suspending the orchestration instance
305-
*/
306-
public abstract void suspendInstance(String instanceId, @Nullable String reason);
307-
308-
/**
309-
* Resumes a running orchestration instance.
310-
* @param instanceId the ID of the orchestration instance to resume
311-
* @param reason the reason for resuming the orchestration instance
312-
*/
313-
public abstract void resumeInstance(String instanceId, @Nullable String reason);
314284
}

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -204,10 +204,7 @@ public void terminate(String instanceId, @Nullable Object output) {
204204
"Terminating instance %s and setting output to: %s",
205205
instanceId,
206206
serializeOutput != null ? serializeOutput : "(null)"));
207-
TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId);
208-
if (serializeOutput != null){
209-
builder.setOutput(StringValue.of(serializeOutput));
210-
}
207+
TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId).setOutput(StringValue.of(serializeOutput));
211208
this.sidecarClient.terminateInstance(builder.build());
212209
}
213210

@@ -283,26 +280,6 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t
283280
}
284281
}
285282

286-
@Override
287-
public void suspendInstance(String instanceId, @Nullable String reason) {
288-
SuspendRequest.Builder suspendRequestBuilder = SuspendRequest.newBuilder();
289-
suspendRequestBuilder.setInstanceId(instanceId);
290-
if (reason != null) {
291-
suspendRequestBuilder.setReason(StringValue.of(reason));
292-
}
293-
this.sidecarClient.suspendInstance(suspendRequestBuilder.build());
294-
}
295-
296-
@Override
297-
public void resumeInstance(String instanceId, @Nullable String reason) {
298-
ResumeRequest.Builder resumeRequestBuilder = ResumeRequest.newBuilder();
299-
resumeRequestBuilder.setInstanceId(instanceId);
300-
if (reason != null) {
301-
resumeRequestBuilder.setReason(StringValue.of(reason));
302-
}
303-
this.sidecarClient.resumeInstance(resumeRequestBuilder.build());
304-
}
305-
306283
private PurgeResult toPurgeResult(PurgeInstancesResponse response){
307284
return new PurgeResult(response.getDeletedInstanceCount());
308285
}

client/src/main/java/com/microsoft/durabletask/OrchestrationRuntimeStatus.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,7 @@ public enum OrchestrationRuntimeStatus {
4646
/**
4747
* The orchestration was scheduled but hasn't started running.
4848
*/
49-
PENDING,
50-
51-
/**
52-
* The orchestration is in a suspended state.
53-
*/
54-
SUSPENDED;
49+
PENDING;
5550

5651
static OrchestrationRuntimeStatus fromProtobuf(OrchestrationStatus status) {
5752
switch (status) {
@@ -69,8 +64,6 @@ static OrchestrationRuntimeStatus fromProtobuf(OrchestrationStatus status) {
6964
return TERMINATED;
7065
case ORCHESTRATION_STATUS_PENDING:
7166
return PENDING;
72-
case ORCHESTRATION_STATUS_SUSPENDED:
73-
return SUSPENDED;
7467
default:
7568
throw new IllegalArgumentException(String.format("Unknown status value: %s", status));
7669
}
@@ -92,8 +85,6 @@ static OrchestrationStatus toProtobuf(OrchestrationRuntimeStatus status){
9285
return ORCHESTRATION_STATUS_TERMINATED;
9386
case PENDING:
9487
return ORCHESTRATION_STATUS_PENDING;
95-
case SUSPENDED:
96-
return ORCHESTRATION_STATUS_SUSPENDED;
9788
default:
9889
throw new IllegalArgumentException(String.format("Unknown status value: %s", status));
9990
}

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 58 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,13 @@ private class ContextImplTask implements TaskOrchestrationContext {
7070
private String instanceId;
7171
private Instant currentInstant;
7272
private boolean isComplete;
73-
private boolean isSuspended;
7473
private boolean isReplaying = true;
7574

7675
// LinkedHashMap to maintain insertion order when returning the list of pending actions
7776
private final LinkedHashMap<Integer, OrchestratorAction> pendingActions = new LinkedHashMap<>();
7877
private final HashMap<Integer, TaskRecord<?>> openTasks = new HashMap<>();
7978
private final LinkedHashMap<String, Queue<TaskRecord<?>>> outstandingEvents = new LinkedHashMap<>();
8079
private final LinkedList<HistoryEvent> unprocessedEvents = new LinkedList<>();
81-
private final Queue<HistoryEvent> eventsWhileSuspended = new ArrayDeque<>();
8280
private final DataConverter dataConverter = TaskOrchestrationExecutor.this.dataConverter;
8381
private final Duration maximumTimerInterval = TaskOrchestrationExecutor.this.maximumTimerInterval;
8482
private final Logger logger = TaskOrchestrationExecutor.this.logger;
@@ -87,6 +85,7 @@ private class ContextImplTask implements TaskOrchestrationContext {
8785
private boolean continuedAsNew;
8886
private Object continuedAsNewInput;
8987
private boolean preserveUnprocessedEvents;
88+
9089
private Object customStatus;
9190

9291
public ContextImplTask(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
@@ -530,23 +529,6 @@ private void handleEventRaised(HistoryEvent e) {
530529
task.complete(result);
531530
}
532531

533-
private void handleEventWhileSuspended (HistoryEvent historyEvent){
534-
if (historyEvent.getEventTypeCase() != HistoryEvent.EventTypeCase.EXECUTIONSUSPENDED) {
535-
eventsWhileSuspended.offer(historyEvent);
536-
}
537-
}
538-
539-
private void handleExecutionSuspended(HistoryEvent historyEvent) {
540-
this.isSuspended = true;
541-
}
542-
543-
private void handleExecutionResumed(HistoryEvent historyEvent) {
544-
this.isSuspended = false;
545-
while (!eventsWhileSuspended.isEmpty()) {
546-
this.processEvent(eventsWhileSuspended.poll());
547-
}
548-
}
549-
550532
public Task<Void> createTimer(Duration duration) {
551533
Helpers.throwIfOrchestratorComplete(this.isComplete);
552534
Helpers.throwIfArgumentNull(duration, "duration");
@@ -762,86 +744,75 @@ private boolean processNextEvent() {
762744
}
763745

764746
private void processEvent(HistoryEvent e) {
765-
boolean overrideSuspension = e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONRESUMED || e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONTERMINATED;
766-
if (this.isSuspended && !overrideSuspension) {
767-
this.handleEventWhileSuspended(e);
768-
} else {
769-
switch (e.getEventTypeCase()) {
770-
case ORCHESTRATORSTARTED:
771-
Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp());
772-
this.setCurrentInstant(instant);
773-
break;
774-
case ORCHESTRATORCOMPLETED:
775-
// No action
776-
break;
777-
case EXECUTIONSTARTED:
778-
ExecutionStartedEvent startedEvent = e.getExecutionStarted();
779-
String name = startedEvent.getName();
780-
this.setName(name);
781-
String instanceId = startedEvent.getOrchestrationInstance().getInstanceId();
782-
this.setInstanceId(instanceId);
783-
String input = startedEvent.getInput().getValue();
784-
this.setInput(input);
785-
TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
786-
if (factory == null) {
787-
// Try getting the default orchestrator
788-
factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*");
789-
}
790-
// TODO: Throw if the factory is null (orchestration by that name doesn't exist)
791-
TaskOrchestration orchestrator = factory.create();
792-
orchestrator.run(this);
793-
break;
747+
switch (e.getEventTypeCase()) {
748+
case ORCHESTRATORSTARTED:
749+
Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp());
750+
this.setCurrentInstant(instant);
751+
break;
752+
case ORCHESTRATORCOMPLETED:
753+
// No action
754+
break;
755+
case EXECUTIONSTARTED:
756+
ExecutionStartedEvent startedEvent = e.getExecutionStarted();
757+
String name = startedEvent.getName();
758+
this.setName(name);
759+
String instanceId = startedEvent.getOrchestrationInstance().getInstanceId();
760+
this.setInstanceId(instanceId);
761+
String input = startedEvent.getInput().getValue();
762+
this.setInput(input);
763+
TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
764+
if (factory == null) {
765+
// Try getting the default orchestrator
766+
factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*");
767+
}
768+
// TODO: Throw if the factory is null (orchestration by that name doesn't exist)
769+
TaskOrchestration orchestrator = factory.create();
770+
orchestrator.run(this);
771+
break;
794772
// case EXECUTIONCOMPLETED:
795773
// break;
796774
// case EXECUTIONFAILED:
797775
// break;
798-
case EXECUTIONTERMINATED:
799-
this.handleExecutionTerminated(e);
800-
break;
801-
case TASKSCHEDULED:
802-
this.handleTaskScheduled(e);
803-
break;
804-
case TASKCOMPLETED:
805-
this.handleTaskCompleted(e);
806-
break;
807-
case TASKFAILED:
808-
this.handleTaskFailed(e);
809-
break;
810-
case TIMERCREATED:
811-
this.handleTimerCreated(e);
812-
break;
813-
case TIMERFIRED:
814-
this.handleTimerFired(e);
815-
break;
816-
case SUBORCHESTRATIONINSTANCECREATED:
817-
this.handleSubOrchestrationCreated(e);
818-
break;
819-
case SUBORCHESTRATIONINSTANCECOMPLETED:
820-
this.handleSubOrchestrationCompleted(e);
821-
break;
822-
case SUBORCHESTRATIONINSTANCEFAILED:
823-
this.handleSubOrchestrationFailed(e);
824-
break;
776+
case EXECUTIONTERMINATED:
777+
this.handleExecutionTerminated(e);
778+
break;
779+
case TASKSCHEDULED:
780+
this.handleTaskScheduled(e);
781+
break;
782+
case TASKCOMPLETED:
783+
this.handleTaskCompleted(e);
784+
break;
785+
case TASKFAILED:
786+
this.handleTaskFailed(e);
787+
break;
788+
case TIMERCREATED:
789+
this.handleTimerCreated(e);
790+
break;
791+
case TIMERFIRED:
792+
this.handleTimerFired(e);
793+
break;
794+
case SUBORCHESTRATIONINSTANCECREATED:
795+
this.handleSubOrchestrationCreated(e);
796+
break;
797+
case SUBORCHESTRATIONINSTANCECOMPLETED:
798+
this.handleSubOrchestrationCompleted(e);
799+
break;
800+
case SUBORCHESTRATIONINSTANCEFAILED:
801+
this.handleSubOrchestrationFailed(e);
802+
break;
825803
// case EVENTSENT:
826804
// break;
827-
case EVENTRAISED:
828-
this.handleEventRaised(e);
829-
break;
805+
case EVENTRAISED:
806+
this.handleEventRaised(e);
807+
break;
830808
// case GENERICEVENT:
831809
// break;
832810
// case HISTORYSTATE:
833811
// break;
834812
// case EVENTTYPE_NOT_SET:
835813
// break;
836-
case EXECUTIONSUSPENDED:
837-
this.handleExecutionSuspended(e);
838-
break;
839-
case EXECUTIONRESUMED:
840-
this.handleExecutionResumed(e);
841-
break;
842-
default:
843-
throw new IllegalStateException("Don't know how to handle history type " + e.getEventTypeCase());
844-
}
814+
default:
815+
throw new IllegalStateException("Don't know how to handle history type " + e.getEventTypeCase());
845816
}
846817
}
847818

client/src/test/java/com/microsoft/durabletask/IntegrationTests.java

Lines changed: 0 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -408,72 +408,6 @@ void termination() throws TimeoutException {
408408
}
409409
}
410410

411-
@Test
412-
void suspendResumeOrchestration() throws TimeoutException, InterruptedException {
413-
final String orchestratorName = "suspend";
414-
final String eventName = "MyEvent";
415-
final String eventPayload = "testPayload";
416-
final Duration suspendTimeout = Duration.ofSeconds(5);
417-
418-
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
419-
.addOrchestrator(orchestratorName, ctx -> {
420-
String payload = ctx.waitForExternalEvent(eventName, String.class).await();
421-
ctx.complete(payload);
422-
})
423-
.buildAndStart();
424-
425-
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
426-
try (worker; client) {
427-
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
428-
client.suspendInstance(instanceId);
429-
OrchestrationMetadata instance = client.waitForInstanceStart(instanceId, defaultTimeout);
430-
assertNotNull(instance);
431-
assertEquals(OrchestrationRuntimeStatus.SUSPENDED, instance.getRuntimeStatus());
432-
433-
client.raiseEvent(instanceId, eventName, eventPayload);
434-
435-
assertThrows(
436-
TimeoutException.class,
437-
() -> client.waitForInstanceCompletion(instanceId, suspendTimeout, false),
438-
"Expected to throw TimeoutException, but it didn't"
439-
);
440-
441-
String resumeReason = "Resume for testing.";
442-
client.resumeInstance(instanceId, resumeReason);
443-
instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
444-
assertNotNull(instance);
445-
assertEquals(instanceId, instance.getInstanceId());
446-
assertEquals(eventPayload, instance.readOutputAs(String.class));
447-
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
448-
}
449-
}
450-
451-
@Test
452-
void terminateSuspendOrchestration() throws TimeoutException, InterruptedException {
453-
final String orchestratorName = "suspendResume";
454-
final String eventName = "MyEvent";
455-
final String eventPayload = "testPayload";
456-
457-
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
458-
.addOrchestrator(orchestratorName, ctx -> {
459-
String payload = ctx.waitForExternalEvent(eventName, String.class).await();
460-
ctx.complete(payload);
461-
})
462-
.buildAndStart();
463-
464-
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
465-
try (worker; client) {
466-
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
467-
String suspendReason = "Suspend for testing.";
468-
client.suspendInstance(instanceId, suspendReason);
469-
client.terminate(instanceId, null);
470-
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false);
471-
assertNotNull(instance);
472-
assertEquals(instanceId, instance.getInstanceId());
473-
assertEquals(OrchestrationRuntimeStatus.TERMINATED, instance.getRuntimeStatus());
474-
}
475-
}
476-
477411
@Test
478412
void activityFanOut() throws IOException, TimeoutException {
479413
final String orchestratorName = "ActivityFanOut";

0 commit comments

Comments
 (0)