diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/LifecycleEventsUtils.java similarity index 51% rename from impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java rename to impl/core/src/main/java/io/serverlessworkflow/impl/LifecycleEventsUtils.java index 220ec5568..0cb97c1c1 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/LifecycleEventsUtils.java @@ -13,9 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.lifecycle; +package io.serverlessworkflow.impl; -import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import org.slf4j.Logger; @@ -30,17 +31,25 @@ private LifecycleEventsUtils() {} public static CompletableFuture publishEvent( WorkflowContext workflowContext, Function> function) { - return CompletableFuture.allOf( - workflowContext.definition().application().listeners().stream() - .map( - v -> - function - .apply(v) - .exceptionally( - ex -> { - logger.error("Error while executing listener", ex); - return null; - })) - .toArray(CompletableFuture[]::new)); + CompletableFuture result = CompletableFuture.completedFuture(null); + for (Collection listeners : + workflowContext.definition().application().listenersByPriority()) { + result = + result.thenCompose( + __ -> + CompletableFuture.allOf( + listeners.stream() + .map( + v -> + function + .apply(v) + .exceptionally( + ex -> { + logger.error("Error while executing listener", ex); + return null; + })) + .toArray(CompletableFuture[]::new))); + } + return result; } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index 25d2f2736..4c256f7e1 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -38,6 +38,7 @@ public class TaskContext implements TaskContextData { private Instant completedAt; private TransitionInfo transition; private short retryAttempt; + private int iteration; private AuthorizationDescriptor authorization; public TaskContext( @@ -73,11 +74,6 @@ private TaskContext( parentContext.map(p -> new HashMap<>(p.contextVariables)).orElseGet(HashMap::new); } - public TaskContext copy() { - return new TaskContext( - rawInput, parentContext, taskName, task, position, startedAt, input, output, rawOutput); - } - public void input(WorkflowModel input) { this.input = input; this.rawOutput = input; @@ -174,6 +170,7 @@ public boolean isCompleted() { return completedAt != null; } + @Override public short retryAttempt() { return retryAttempt; } @@ -186,6 +183,15 @@ public boolean isRetrying() { return retryAttempt > 0; } + @Override + public int iteration() { + return iteration; + } + + public void iteration(int iteration) { + this.iteration = iteration; + } + @Override public String toString() { return "TaskContext [position=" diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java index cbe70fc55..2e735f63c 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java @@ -37,4 +37,8 @@ public interface TaskContextData { String taskName(); Instant completedAt(); + + int iteration(); + + short retryAttempt(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 5139d6fb8..703e0397f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -51,6 +51,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -69,7 +70,7 @@ public class WorkflowApplication implements AutoCloseable { private final ResourceLoaderFactory resourceLoaderFactory; private final SchemaValidatorFactory schemaValidatorFactory; private final WorkflowInstanceIdFactory idFactory; - private final Collection listeners; + private final List> listenersByPriority; private final Map definitions; private final WorkflowPositionFactory positionFactory; private final ExecutorServiceFactory executorFactory; @@ -99,7 +100,7 @@ private WorkflowApplication(Builder builder) { this.idFactory = builder.idFactory; this.runtimeDescriptorFactory = builder.descriptorFactory; this.executorFactory = builder.executorFactory; - this.listeners = new LinkedHashSet<>(builder.listeners); + this.listenersByPriority = groupByPriority(new LinkedHashSet<>(builder.listeners)); this.definitions = new ConcurrentHashMap<>(); this.eventConsumer = builder.eventConsumer; this.eventPublishers = builder.eventPublishers; @@ -140,7 +141,7 @@ public ResourceLoaderFactory resourceLoaderFactory() { } public Collection listeners() { - return listeners; + return this.listenersByPriority.stream().flatMap(x -> x.stream()).toList(); } public Collection eventPublishers() { @@ -151,6 +152,36 @@ public WorkflowInstanceIdFactory idFactory() { return idFactory; } + List> listenersByPriority() { + return listenersByPriority; + } + + private static List> groupByPriority( + Collection listeners) { + if (listeners.isEmpty()) { + return List.of(); + } + List> result = new ArrayList<>(); + Iterator iter = listeners.iterator(); + List currentList = new ArrayList<>(); + WorkflowExecutionCompletableListener currentListener = iter.next(); + int currentPriority = currentListener.priority(); + currentList.add(currentListener); + while (iter.hasNext()) { + currentListener = iter.next(); + if (currentListener.priority() != currentPriority) { + result.add(currentList); + currentList = new ArrayList<>(); + currentPriority = currentListener.priority(); + } + currentList.add(currentListener); + } + if (!currentList.isEmpty()) { + result.add(currentList); + } + return result; + } + public static class Builder { private static final class EmptySchemaValidatorHolder { @@ -423,10 +454,15 @@ public void close() { } definitions.clear(); - for (WorkflowExecutionCompletableListener listener : listeners) { - safeClose(listener); + if (!listenersByPriority.isEmpty()) { + for (Collection listeners : listenersByPriority) { + for (WorkflowExecutionCompletableListener listener : listeners) { + safeClose(listener); + } + listeners.clear(); + } + listenersByPriority.clear(); } - listeners.clear(); } public WorkflowPositionFactory positionFactory() { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index cfd380523..8f67e83c2 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -15,7 +15,7 @@ */ package io.serverlessworkflow.impl; -import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent; +import static io.serverlessworkflow.impl.LifecycleEventsUtils.publishEvent; import io.serverlessworkflow.impl.executors.TaskExecutorHelper; import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; @@ -53,6 +53,8 @@ public class WorkflowMutableInstance implements WorkflowInstance { protected final Map additionalObjects = new ConcurrentHashMap<>(); + protected final Map iterationsMap = new ConcurrentHashMap<>(); + private Lock statusLock = new ReentrantLock(); private Map, TaskContext> suspended; @@ -164,6 +166,10 @@ public WorkflowModel input() { return input; } + public int incIteration(WorkflowPosition position) { + return iterationsMap.compute(position.jsonPointer(), (k, v) -> v == null ? 1 : v + 1); + } + @Override public WorkflowStatus status() { return status.get(); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index cd8590521..591b3f503 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -15,9 +15,9 @@ */ package io.serverlessworkflow.impl.executors; +import static io.serverlessworkflow.impl.LifecycleEventsUtils.publishEvent; import static io.serverlessworkflow.impl.WorkflowUtils.buildWorkflowFilter; import static io.serverlessworkflow.impl.WorkflowUtils.getSchemaValidator; -import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent; import io.serverlessworkflow.api.types.Export; import io.serverlessworkflow.api.types.FlowDirective; @@ -210,6 +210,7 @@ public CompletableFuture apply( } else if (taskContext.isCompleted()) { return executeNext(completable, workflowContext); } else if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) { + taskContext.iteration(workflowContext.instance().incIteration(position)); completable = completable .thenCompose(workflowContext.instance()::suspendedCheck) diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java index c08ef7ec0..b4eb9760a 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java @@ -23,5 +23,16 @@ public record CompletedTaskInfo( WorkflowModel model, WorkflowModel context, Boolean isEndNode, - String nextPosition) - implements PersistenceTaskInfo {} + String nextPosition, + int iteration) + implements PersistenceTaskInfo { + + public CompletedTaskInfo( + Instant instant, + WorkflowModel model, + WorkflowModel context, + Boolean isEndNode, + String nextPosition) { + this(instant, model, context, isEndNode, nextPosition, 1); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java index 47bc65bc3..dc8737b51 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java @@ -38,6 +38,13 @@ public static WorkflowInstance of(WorkflowDefinition definition, PersistenceWork private WorkflowPersistenceInstance(WorkflowDefinition definition, PersistenceWorkflowInfo info) { super(definition, info.id(), info.input()); this.info = info; + info.tasks() + .forEach( + (k, v) -> { + if (v instanceof CompletedTaskInfo task) { + iterationsMap.put(k, task.iteration()); + } + }); this.startedAt = info.startedAt(); } @@ -54,7 +61,13 @@ public CompletableFuture start() { @Override public void restoreContext(WorkflowContext workflow, TaskContext context) { + if (info.tasks().isEmpty()) { + return; + } PersistenceTaskInfo taskInfo = info.tasks().remove(context.position().jsonPointer()); + if (taskInfo == null) { + return; + } if (taskInfo instanceof CompletedTaskInfo completedTaskInfo) { context.output(completedTaskInfo.model()); context.completedAt(completedTaskInfo.instant()); diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java index 2df8086cf..3435550a8 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java @@ -39,6 +39,7 @@ public abstract class BytesMapInstanceTransaction private static final byte VERSION_0 = 0; private static final byte VERSION_1 = 1; + private static final byte VERSION_2 = 2; private final WorkflowBufferFactory factory; @@ -50,7 +51,7 @@ protected BytesMapInstanceTransaction(WorkflowBufferFactory factory) { protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskContext taskContext) { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(VERSION_1); + writer.writeByte(VERSION_2); writer.writeEnum(TaskStatus.COMPLETED); writer.writeInstant(taskContext.completedAt()); writeModel(writer, taskContext.output()); @@ -64,6 +65,7 @@ protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskCont writer.writeBoolean(true); writer.writeString(next.position().jsonPointer()); } + writer.writeInt(taskContext.iteration()); } return bytes.toByteArray(); } @@ -119,23 +121,42 @@ protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { byte version = buffer.readByte(); switch (version) { case VERSION_0: - default: return readVersion0(buffer); case VERSION_1: return readVersion1(buffer); + case VERSION_2: + return readVersion2(buffer); } + throw new UnsupportedOperationException("Unknown version " + version); + } + } + + private PersistenceTaskInfo readVersion2(WorkflowInputBuffer buffer) { + TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); + switch (taskStatus) { + case COMPLETED: + return new CompletedTaskInfo( + buffer.readInstant(), + (WorkflowModel) buffer.readObject(), + (WorkflowModel) buffer.readObject(), + buffer.readBoolean(), + buffer.readBoolean() ? buffer.readString() : null, + buffer.readInt()); + case RETRIED: + return new RetriedTaskInfo(buffer.readShort()); } + throw new UnsupportedOperationException("Unknown status " + taskStatus); } private PersistenceTaskInfo readVersion1(WorkflowInputBuffer buffer) { TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); switch (taskStatus) { case COMPLETED: - default: return readVersion0(buffer); case RETRIED: return new RetriedTaskInfo(buffer.readShort()); } + throw new UnsupportedOperationException("Unknown status " + taskStatus); } private PersistenceTaskInfo readVersion0(WorkflowInputBuffer buffer) { diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java index 05baba827..7e120e735 100644 --- a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java @@ -89,6 +89,7 @@ protected TaskContextData completedTaskContext( when(taskContext.completedAt()).thenReturn(Instant.now()); when(taskContext.output()).thenReturn(app.modelFactory().from(model)); when(taskContext.transition()).thenReturn(new TransitionInfo(null, true)); + when(taskContext.iteration()).thenReturn(2); return taskContext; } @@ -172,6 +173,7 @@ void testWorkflowInstance() throws InterruptedException { ArgumentCaptor transition = ArgumentCaptor.forClass(TransitionInfo.class); verify(updateTContext).transition(transition.capture()); assertThat(transition.getValue().isEndNode()).isTrue(); + assertThat(instance.incIteration(position2)).isEqualTo(3); // workflow completed handlers.writer().completed(workflowContext).join(); diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java index 30736f1f3..a28f7a895 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java @@ -62,7 +62,7 @@ void testAllEvent() throws IOException, InterruptedException, ExecutionException appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito")))); await() .pollDelay(Duration.ofMillis(20)) - .atMost(Duration.ofMillis(600)) + .atMost(Duration.ofMillis(980)) .until( () -> instances.stream().filter(i -> i.status() == WorkflowStatus.COMPLETED).count() @@ -80,7 +80,7 @@ void testOneEvent() throws IOException, InterruptedException, ExecutionException Collection instances = definition.scheduledInstances(); await() .pollDelay(Duration.ofMillis(20)) - .atMost(Duration.ofMillis(600)) + .atMost(Duration.ofMillis(980)) .until( () -> instances.stream().filter(i -> i.status() == WorkflowStatus.COMPLETED).count()