Skip to content

Commit a1a4e6a

Browse files
authored
[Fix #803] Fixing suspend/resume with parallel (#815)
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 94f41da commit a1a4e6a

File tree

42 files changed

+422
-284
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+422
-284
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/AbstractExecutorServiceHolder.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,17 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.TimeUnit;
1920

2021
public abstract class AbstractExecutorServiceHolder implements ExecutorServiceFactory {
2122

2223
protected ExecutorService service;
2324

2425
@Override
25-
public void close() {
26+
public void close() throws InterruptedException {
2627
if (service != null && !service.isShutdown()) {
2728
service.shutdown();
29+
service.awaitTermination(2, TimeUnit.SECONDS);
2830
}
2931
}
3032
}

impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,4 @@
1818
import java.util.concurrent.ExecutorService;
1919
import java.util.function.Supplier;
2020

21-
public interface ExecutorServiceFactory extends Supplier<ExecutorService>, AutoCloseable {
22-
void close();
23-
}
21+
public interface ExecutorServiceFactory extends Supplier<ExecutorService>, AutoCloseable {}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,19 @@
1818
import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent;
1919

2020
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
21-
import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent;
22-
import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent;
2321
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
2422
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
2523
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
2624
import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent;
2725
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
2826
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
2927
import java.time.Instant;
28+
import java.util.Map;
3029
import java.util.Optional;
3130
import java.util.concurrent.CancellationException;
3231
import java.util.concurrent.CompletableFuture;
3332
import java.util.concurrent.CompletionException;
33+
import java.util.concurrent.ConcurrentHashMap;
3434
import java.util.concurrent.atomic.AtomicReference;
3535
import java.util.concurrent.locks.Lock;
3636
import java.util.concurrent.locks.ReentrantLock;
@@ -47,9 +47,7 @@ public class WorkflowMutableInstance implements WorkflowInstance {
4747
private volatile WorkflowModel output;
4848
private Lock statusLock = new ReentrantLock();
4949
private CompletableFuture<WorkflowModel> completableFuture;
50-
private CompletableFuture<TaskContext> suspended;
51-
private TaskContext suspendedTask;
52-
private CompletableFuture<TaskContext> cancelled;
50+
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;
5351

5452
WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) {
5553
this.id = definition.application().idFactory().get();
@@ -88,11 +86,7 @@ private void whenFailed(WorkflowModel result, Throwable ex) {
8886
}
8987

9088
private void handleException(Throwable ex) {
91-
if (ex instanceof CancellationException) {
92-
status.set(WorkflowStatus.CANCELLED);
93-
publishEvent(
94-
workflowContext, l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext)));
95-
} else {
89+
if (!(ex instanceof CancellationException)) {
9690
status.set(WorkflowStatus.FAULTED);
9791
publishEvent(
9892
workflowContext, l -> l.onWorkflowFailed(new WorkflowFailedEvent(workflowContext, ex)));
@@ -107,7 +101,7 @@ private WorkflowModel whenSuccess(WorkflowModel node) {
107101
.map(f -> f.apply(workflowContext, null, node))
108102
.orElse(node);
109103
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output));
110-
status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.COMPLETED);
104+
status.set(WorkflowStatus.COMPLETED);
111105
publishEvent(
112106
workflowContext, l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext)));
113107
return output;
@@ -174,9 +168,9 @@ public String toString() {
174168
public boolean suspend() {
175169
try {
176170
statusLock.lock();
177-
if (TaskExecutorHelper.isActive(status.get())) {
178-
suspended = new CompletableFuture<TaskContext>();
179-
workflowContext.instance().status(WorkflowStatus.SUSPENDED);
171+
if (TaskExecutorHelper.isActive(status.get()) && suspended == null) {
172+
suspended = new ConcurrentHashMap<>();
173+
status.set(WorkflowStatus.SUSPENDED);
180174
publishEvent(
181175
workflowContext,
182176
l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext)));
@@ -193,40 +187,27 @@ public boolean suspend() {
193187
public boolean resume() {
194188
try {
195189
statusLock.lock();
196-
if (suspended != null) {
197-
if (suspendedTask != null) {
198-
CompletableFuture<TaskContext> toBeCompleted = suspended;
199-
suspended = null;
200-
toBeCompleted.complete(suspendedTask);
201-
publishEvent(
202-
workflowContext,
203-
l -> l.onTaskResumed(new TaskResumedEvent(workflowContext, suspendedTask)));
204-
} else {
205-
suspended = null;
206-
}
190+
if (TaskExecutorHelper.isActive(status.get()) && suspended != null) {
207191
publishEvent(
208192
workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext)));
193+
suspended.forEach(
194+
(k, v) -> {
195+
k.complete(v);
196+
});
197+
suspended = null;
209198
return true;
210-
} else {
211-
return false;
212199
}
213200
} finally {
214201
statusLock.unlock();
215202
}
203+
return false;
216204
}
217205

218-
public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
206+
public CompletableFuture<TaskContext> cancelCheck(TaskContext t) {
219207
try {
220208
statusLock.lock();
221-
if (suspended != null) {
222-
suspendedTask = t;
223-
publishEvent(
224-
workflowContext, l -> l.onTaskSuspended(new TaskSuspendedEvent(workflowContext, t)));
225-
return suspended;
226-
}
227-
if (cancelled != null) {
228-
cancelled = new CompletableFuture<TaskContext>();
229-
workflowContext.instance().status(WorkflowStatus.CANCELLED);
209+
if (status.get() == WorkflowStatus.CANCELLED) {
210+
CompletableFuture<TaskContext> cancelled = new CompletableFuture<TaskContext>();
230211
cancelled.completeExceptionally(
231212
new CancellationException("Task " + t.taskName() + " has been cancelled"));
232213
return cancelled;
@@ -237,12 +218,31 @@ public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
237218
return CompletableFuture.completedFuture(t);
238219
}
239220

221+
public CompletableFuture<TaskContext> suspendedCheck(TaskContext t) {
222+
try {
223+
statusLock.lock();
224+
if (suspended != null) {
225+
CompletableFuture<TaskContext> suspendedTask = new CompletableFuture<TaskContext>();
226+
suspended.put(suspendedTask, t);
227+
return suspendedTask;
228+
} else if (TaskExecutorHelper.isActive(status.get())) {
229+
status.set(WorkflowStatus.RUNNING);
230+
}
231+
} finally {
232+
statusLock.unlock();
233+
}
234+
return CompletableFuture.completedFuture(t);
235+
}
236+
240237
@Override
241238
public boolean cancel() {
242239
try {
243240
statusLock.lock();
244241
if (TaskExecutorHelper.isActive(status.get())) {
245-
cancelled = new CompletableFuture<TaskContext>();
242+
status.set(WorkflowStatus.CANCELLED);
243+
publishEvent(
244+
workflowContext,
245+
l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext)));
246246
return true;
247247
} else {
248248
return false;

impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,4 @@ public interface EventConsumer<T extends EventRegistration, V extends EventRegis
3131
T register(V builder, Consumer<CloudEvent> consumer);
3232

3333
void unregister(T register);
34-
35-
void close();
3634
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/EventPublisher.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,4 @@
2020

2121
public interface EventPublisher extends AutoCloseable {
2222
CompletableFuture<Void> publish(CloudEvent event);
23-
24-
void close();
2523
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ protected void unregisterFromAll() {
8181
}
8282

8383
@Override
84-
public void close() {
84+
public void close() throws Exception {
8585
topicMap.clear();
8686
serviceFactory.close();
8787
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ public CompletableFuture<TaskContext> apply(
197197
if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
198198
return executeNext(
199199
completable
200+
.thenCompose(workflowContext.instance()::suspendedCheck)
200201
.thenApply(
201202
t -> {
202203
publishEvent(
@@ -208,16 +209,14 @@ public CompletableFuture<TaskContext> apply(
208209
return t;
209210
})
210211
.thenCompose(t -> execute(workflowContext, t))
211-
.thenCompose(t -> workflowContext.instance().completedChecks(t))
212+
.thenCompose(workflowContext.instance()::cancelCheck)
212213
.whenComplete(
213214
(t, e) -> {
214215
if (e != null) {
215216
handleException(
216217
workflowContext,
217218
taskContext,
218219
e instanceof CompletionException ? e.getCause() : e);
219-
} else {
220-
workflowContext.instance().status(WorkflowStatus.RUNNING);
221220
}
222221
})
223222
.thenApply(

impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ public static boolean isActive(WorkflowContext context) {
5454
}
5555

5656
public static boolean isActive(WorkflowStatus status) {
57-
return status == WorkflowStatus.RUNNING || status == WorkflowStatus.WAITING;
57+
return status == WorkflowStatus.RUNNING
58+
|| status == WorkflowStatus.WAITING
59+
|| status == WorkflowStatus.SUSPENDED;
5860
}
5961

6062
public static TaskExecutor<?> createExecutorList(
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.test;
17+
18+
import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import io.serverlessworkflow.api.types.Workflow;
22+
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowInstance;
24+
import io.serverlessworkflow.impl.WorkflowModel;
25+
import io.serverlessworkflow.impl.WorkflowStatus;
26+
import java.io.IOException;
27+
import java.util.Collection;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.ExecutionException;
32+
import org.junit.jupiter.api.AfterAll;
33+
import org.junit.jupiter.api.BeforeAll;
34+
import org.junit.jupiter.api.Test;
35+
36+
class ForkWaitTest {
37+
38+
private static WorkflowApplication appl;
39+
40+
@BeforeAll
41+
static void init() throws IOException {
42+
appl = WorkflowApplication.builder().build();
43+
}
44+
45+
@AfterAll
46+
static void tearDown() throws IOException {
47+
appl.close();
48+
}
49+
50+
@Test
51+
void testForkWait() throws IOException, InterruptedException, ExecutionException {
52+
assertModel(
53+
appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/fork-wait.yaml"))
54+
.instance(Map.of())
55+
.start()
56+
.join());
57+
}
58+
59+
@Test
60+
void testForkWaitWithSuspend() throws IOException, InterruptedException {
61+
Workflow workflow = readWorkflowFromClasspath("workflows-samples/fork-wait.yaml");
62+
WorkflowInstance instance = appl.workflowDefinition(workflow).instance(Map.of());
63+
CompletableFuture<WorkflowModel> future = instance.start();
64+
Thread.sleep(50);
65+
assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING);
66+
instance.suspend();
67+
assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED);
68+
Thread.sleep(200);
69+
instance.resume();
70+
WorkflowModel model = future.join();
71+
assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED);
72+
assertModel(model);
73+
}
74+
75+
private void assertModel(WorkflowModel current) {
76+
assertThat((Collection<Map<String, Object>>) current.asJavaObject())
77+
.containsExactlyInAnyOrderElementsOf(
78+
List.of(
79+
Map.of("helloBranch", Map.of("value", 1)),
80+
Map.of("byeBranch", Map.of("value", 2))));
81+
}
82+
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/HTTPWorkflowDefinitionTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Map;
2626
import java.util.stream.Stream;
2727
import org.assertj.core.api.Condition;
28+
import org.junit.jupiter.api.AfterAll;
2829
import org.junit.jupiter.api.BeforeAll;
2930
import org.junit.jupiter.params.ParameterizedTest;
3031
import org.junit.jupiter.params.provider.Arguments;
@@ -40,6 +41,11 @@ static void init() {
4041
appl = WorkflowApplication.builder().build();
4142
}
4243

44+
@AfterAll
45+
static void cleanup() {
46+
appl.close();
47+
}
48+
4349
@ParameterizedTest
4450
@MethodSource("provideParameters")
4551
void testWorkflowExecution(String fileName, Object input, Condition<Object> condition)

0 commit comments

Comments
 (0)