From e49918797c703063951aa1c1f0336c8cff7064eb Mon Sep 17 00:00:00 2001 From: fjtirado Date: Thu, 23 Apr 2026 15:38:24 +0200 Subject: [PATCH 1/2] [Fix #1329] Adding iterations to TaskContext Signed-off-by: fjtirado --- .../serverlessworkflow/impl/TaskContext.java | 41 +++++++++++++++++-- .../impl/TaskContextData.java | 4 ++ .../impl/WorkflowDefinition.java | 2 + .../impl/executors/AbstractTaskExecutor.java | 14 ++++++- .../impl/persistence/CompletedTaskInfo.java | 15 ++++++- .../WorkflowPersistenceInstance.java | 1 + .../bigmap/BytesMapInstanceTransaction.java | 23 ++++++++++- .../test/AbstractHandlerPersistenceTest.java | 4 ++ .../impl/test/ScheduleEventConsumerTest.java | 4 +- 9 files changed, 98 insertions(+), 10 deletions(-) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index 25d2f2736..cb5d61c0e 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -31,6 +31,7 @@ public class TaskContext implements TaskContextData { private final String taskName; private final Map contextVariables; private final Optional parentContext; + private int iteration; private WorkflowModel input; private WorkflowModel output; @@ -45,8 +46,19 @@ public TaskContext( WorkflowPosition position, Optional parentContext, String taskName, - TaskBase task) { - this(input, parentContext, taskName, task, position, Instant.now(), input, input, input); + TaskBase task, + int iterations) { + this( + input, + parentContext, + taskName, + task, + position, + Instant.now(), + input, + input, + input, + iterations); } private TaskContext( @@ -58,7 +70,8 @@ private TaskContext( Instant startedAt, WorkflowModel input, WorkflowModel output, - WorkflowModel rawOutput) { + WorkflowModel rawOutput, + int iterations) { this.rawInput = rawInput; this.parentContext = parentContext; this.taskName = taskName; @@ -71,11 +84,21 @@ private TaskContext( this.retryAttempt = parentContext.map(TaskContext::retryAttempt).orElse((short) 0); this.contextVariables = parentContext.map(p -> new HashMap<>(p.contextVariables)).orElseGet(HashMap::new); + this.iteration = iterations; } public TaskContext copy() { return new TaskContext( - rawInput, parentContext, taskName, task, position, startedAt, input, output, rawOutput); + rawInput, + parentContext, + taskName, + task, + position, + startedAt, + input, + output, + rawOutput, + iteration); } public void input(WorkflowModel input) { @@ -174,6 +197,7 @@ public boolean isCompleted() { return completedAt != null; } + @Override public short retryAttempt() { return retryAttempt; } @@ -186,6 +210,15 @@ public boolean isRetrying() { return retryAttempt > 0; } + @Override + public int iteration() { + return iteration; + } + + public void iteration(int iteration) { + this.iteration = iteration; + } + @Override public String toString() { return "TaskContext [position=" diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java index cbe70fc55..2e735f63c 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java @@ -37,4 +37,8 @@ public interface TaskContextData { String taskName(); Instant completedAt(); + + int iteration(); + + short retryAttempt(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 770f41367..525ac5ed1 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -24,6 +24,7 @@ import io.serverlessworkflow.api.types.Schedule; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; +import io.serverlessworkflow.impl.executors.AbstractTaskExecutor; import io.serverlessworkflow.impl.executors.TaskExecutor; import io.serverlessworkflow.impl.executors.TaskExecutorHelper; import io.serverlessworkflow.impl.resources.ResourceLoader; @@ -181,6 +182,7 @@ public void addScheduledInstance(WorkflowInstance workflowInstance) { void removeInstance(WorkflowInstance instance) { activeInstances.remove(instance.id()); + executors.forEach((k, v) -> ((AbstractTaskExecutor) v).onInstanceCompleted(instance.id())); } void addInstance(WorkflowInstance instance) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index cd8590521..5aec62cf9 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -54,6 +54,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; public abstract class AbstractTaskExecutor implements TaskExecutor { @@ -61,6 +62,9 @@ public abstract class AbstractTaskExecutor implements TaskEx protected final T task; protected final String taskName; protected final WorkflowPosition position; + + private Map iterationsMap = new ConcurrentHashMap(); + private final Optional inputProcessor; private final Optional outputProcessor; private final Optional contextProcessor; @@ -202,8 +206,12 @@ private CompletableFuture executeNext( @Override public CompletableFuture apply( WorkflowContext workflowContext, Optional parentContext, WorkflowModel input) { - TaskContext taskContext = new TaskContext(input, position, parentContext, taskName, task); + String id = workflowContext.instanceData().id(); + TaskContext taskContext = + new TaskContext( + input, position, parentContext, taskName, task, iterationsMap.getOrDefault(id, 0) + 1); workflowContext.instance().restoreContext(workflowContext, taskContext); + iterationsMap.put(id, taskContext.iteration()); CompletableFuture completable = CompletableFuture.completedFuture(taskContext); if (!TaskExecutorHelper.isActive(workflowContext)) { return completable; @@ -310,6 +318,10 @@ public WorkflowPosition position() { return position; } + public void onInstanceCompleted(String instanceId) { + iterationsMap.remove(instanceId); + } + protected abstract TransitionInfo getSkipTransition(); protected abstract CompletableFuture execute( diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java index c08ef7ec0..b4eb9760a 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java @@ -23,5 +23,16 @@ public record CompletedTaskInfo( WorkflowModel model, WorkflowModel context, Boolean isEndNode, - String nextPosition) - implements PersistenceTaskInfo {} + String nextPosition, + int iteration) + implements PersistenceTaskInfo { + + public CompletedTaskInfo( + Instant instant, + WorkflowModel model, + WorkflowModel context, + Boolean isEndNode, + String nextPosition) { + this(instant, model, context, isEndNode, nextPosition, 1); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java index 47bc65bc3..4b248ea4e 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java @@ -64,6 +64,7 @@ public void restoreContext(WorkflowContext workflow, TaskContext context) { ? null : workflow.definition().taskExecutor(completedTaskInfo.nextPosition()), completedTaskInfo.isEndNode())); + context.iteration(completedTaskInfo.iteration()); workflow.context(completedTaskInfo.context()); } else if (taskInfo instanceof RetriedTaskInfo retriedTaskInfo) { if (context.retryAttempt() == 0) { diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java index 2df8086cf..b1cdd994b 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java @@ -39,6 +39,7 @@ public abstract class BytesMapInstanceTransaction private static final byte VERSION_0 = 0; private static final byte VERSION_1 = 1; + private static final byte VERSION_2 = 2; private final WorkflowBufferFactory factory; @@ -50,7 +51,7 @@ protected BytesMapInstanceTransaction(WorkflowBufferFactory factory) { protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskContext taskContext) { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(VERSION_1); + writer.writeByte(VERSION_2); writer.writeEnum(TaskStatus.COMPLETED); writer.writeInstant(taskContext.completedAt()); writeModel(writer, taskContext.output()); @@ -64,6 +65,7 @@ protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskCont writer.writeBoolean(true); writer.writeString(next.position().jsonPointer()); } + writer.writeInt(taskContext.iteration()); } return bytes.toByteArray(); } @@ -123,10 +125,29 @@ protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { return readVersion0(buffer); case VERSION_1: return readVersion1(buffer); + case VERSION_2: + return readVersion2(buffer); } } } + private PersistenceTaskInfo readVersion2(WorkflowInputBuffer buffer) { + TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); + switch (taskStatus) { + case COMPLETED: + default: + return new CompletedTaskInfo( + buffer.readInstant(), + (WorkflowModel) buffer.readObject(), + (WorkflowModel) buffer.readObject(), + buffer.readBoolean(), + buffer.readBoolean() ? buffer.readString() : null, + buffer.readInt()); + case RETRIED: + return new RetriedTaskInfo(buffer.readShort()); + } + } + private PersistenceTaskInfo readVersion1(WorkflowInputBuffer buffer) { TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); switch (taskStatus) { diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java index 05baba827..2a6549b1b 100644 --- a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java @@ -89,6 +89,7 @@ protected TaskContextData completedTaskContext( when(taskContext.completedAt()).thenReturn(Instant.now()); when(taskContext.output()).thenReturn(app.modelFactory().from(model)); when(taskContext.transition()).thenReturn(new TransitionInfo(null, true)); + when(taskContext.iteration()).thenReturn(2); return taskContext; } @@ -172,6 +173,9 @@ void testWorkflowInstance() throws InterruptedException { ArgumentCaptor transition = ArgumentCaptor.forClass(TransitionInfo.class); verify(updateTContext).transition(transition.capture()); assertThat(transition.getValue().isEndNode()).isTrue(); + ArgumentCaptor iteration = ArgumentCaptor.forClass(Integer.class); + verify(updateTContext).iteration(iteration.capture()); + assertThat(iteration.getValue()).isEqualTo(2); // workflow completed handlers.writer().completed(workflowContext).join(); diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java index 30736f1f3..a28f7a895 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java @@ -62,7 +62,7 @@ void testAllEvent() throws IOException, InterruptedException, ExecutionException appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito")))); await() .pollDelay(Duration.ofMillis(20)) - .atMost(Duration.ofMillis(600)) + .atMost(Duration.ofMillis(980)) .until( () -> instances.stream().filter(i -> i.status() == WorkflowStatus.COMPLETED).count() @@ -80,7 +80,7 @@ void testOneEvent() throws IOException, InterruptedException, ExecutionException Collection instances = definition.scheduledInstances(); await() .pollDelay(Duration.ofMillis(20)) - .atMost(Duration.ofMillis(600)) + .atMost(Duration.ofMillis(980)) .until( () -> instances.stream().filter(i -> i.status() == WorkflowStatus.COMPLETED).count() From 8fdb84d0ea1280c5f63e9700098f5a0f32059886 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Thu, 23 Apr 2026 18:27:41 +0200 Subject: [PATCH 2/2] [Fix #1329] Refined approach Signed-off-by: fjtirado --- .../{lifecycle => }/LifecycleEventsUtils.java | 37 ++++++++------ .../serverlessworkflow/impl/TaskContext.java | 35 ++------------ .../impl/WorkflowApplication.java | 48 ++++++++++++++++--- .../impl/WorkflowDefinition.java | 2 - .../impl/WorkflowMutableInstance.java | 8 +++- .../impl/executors/AbstractTaskExecutor.java | 17 ++----- .../WorkflowPersistenceInstance.java | 14 +++++- .../bigmap/BytesMapInstanceTransaction.java | 6 +-- .../test/AbstractHandlerPersistenceTest.java | 4 +- 9 files changed, 96 insertions(+), 75 deletions(-) rename impl/core/src/main/java/io/serverlessworkflow/impl/{lifecycle => }/LifecycleEventsUtils.java (51%) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/LifecycleEventsUtils.java similarity index 51% rename from impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java rename to impl/core/src/main/java/io/serverlessworkflow/impl/LifecycleEventsUtils.java index 220ec5568..0cb97c1c1 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/LifecycleEventsUtils.java @@ -13,9 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.lifecycle; +package io.serverlessworkflow.impl; -import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import org.slf4j.Logger; @@ -30,17 +31,25 @@ private LifecycleEventsUtils() {} public static CompletableFuture publishEvent( WorkflowContext workflowContext, Function> function) { - return CompletableFuture.allOf( - workflowContext.definition().application().listeners().stream() - .map( - v -> - function - .apply(v) - .exceptionally( - ex -> { - logger.error("Error while executing listener", ex); - return null; - })) - .toArray(CompletableFuture[]::new)); + CompletableFuture result = CompletableFuture.completedFuture(null); + for (Collection listeners : + workflowContext.definition().application().listenersByPriority()) { + result = + result.thenCompose( + __ -> + CompletableFuture.allOf( + listeners.stream() + .map( + v -> + function + .apply(v) + .exceptionally( + ex -> { + logger.error("Error while executing listener", ex); + return null; + })) + .toArray(CompletableFuture[]::new))); + } + return result; } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index cb5d61c0e..4c256f7e1 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -31,7 +31,6 @@ public class TaskContext implements TaskContextData { private final String taskName; private final Map contextVariables; private final Optional parentContext; - private int iteration; private WorkflowModel input; private WorkflowModel output; @@ -39,6 +38,7 @@ public class TaskContext implements TaskContextData { private Instant completedAt; private TransitionInfo transition; private short retryAttempt; + private int iteration; private AuthorizationDescriptor authorization; public TaskContext( @@ -46,19 +46,8 @@ public TaskContext( WorkflowPosition position, Optional parentContext, String taskName, - TaskBase task, - int iterations) { - this( - input, - parentContext, - taskName, - task, - position, - Instant.now(), - input, - input, - input, - iterations); + TaskBase task) { + this(input, parentContext, taskName, task, position, Instant.now(), input, input, input); } private TaskContext( @@ -70,8 +59,7 @@ private TaskContext( Instant startedAt, WorkflowModel input, WorkflowModel output, - WorkflowModel rawOutput, - int iterations) { + WorkflowModel rawOutput) { this.rawInput = rawInput; this.parentContext = parentContext; this.taskName = taskName; @@ -84,21 +72,6 @@ private TaskContext( this.retryAttempt = parentContext.map(TaskContext::retryAttempt).orElse((short) 0); this.contextVariables = parentContext.map(p -> new HashMap<>(p.contextVariables)).orElseGet(HashMap::new); - this.iteration = iterations; - } - - public TaskContext copy() { - return new TaskContext( - rawInput, - parentContext, - taskName, - task, - position, - startedAt, - input, - output, - rawOutput, - iteration); } public void input(WorkflowModel input) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 5139d6fb8..703e0397f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -51,6 +51,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -69,7 +70,7 @@ public class WorkflowApplication implements AutoCloseable { private final ResourceLoaderFactory resourceLoaderFactory; private final SchemaValidatorFactory schemaValidatorFactory; private final WorkflowInstanceIdFactory idFactory; - private final Collection listeners; + private final List> listenersByPriority; private final Map definitions; private final WorkflowPositionFactory positionFactory; private final ExecutorServiceFactory executorFactory; @@ -99,7 +100,7 @@ private WorkflowApplication(Builder builder) { this.idFactory = builder.idFactory; this.runtimeDescriptorFactory = builder.descriptorFactory; this.executorFactory = builder.executorFactory; - this.listeners = new LinkedHashSet<>(builder.listeners); + this.listenersByPriority = groupByPriority(new LinkedHashSet<>(builder.listeners)); this.definitions = new ConcurrentHashMap<>(); this.eventConsumer = builder.eventConsumer; this.eventPublishers = builder.eventPublishers; @@ -140,7 +141,7 @@ public ResourceLoaderFactory resourceLoaderFactory() { } public Collection listeners() { - return listeners; + return this.listenersByPriority.stream().flatMap(x -> x.stream()).toList(); } public Collection eventPublishers() { @@ -151,6 +152,36 @@ public WorkflowInstanceIdFactory idFactory() { return idFactory; } + List> listenersByPriority() { + return listenersByPriority; + } + + private static List> groupByPriority( + Collection listeners) { + if (listeners.isEmpty()) { + return List.of(); + } + List> result = new ArrayList<>(); + Iterator iter = listeners.iterator(); + List currentList = new ArrayList<>(); + WorkflowExecutionCompletableListener currentListener = iter.next(); + int currentPriority = currentListener.priority(); + currentList.add(currentListener); + while (iter.hasNext()) { + currentListener = iter.next(); + if (currentListener.priority() != currentPriority) { + result.add(currentList); + currentList = new ArrayList<>(); + currentPriority = currentListener.priority(); + } + currentList.add(currentListener); + } + if (!currentList.isEmpty()) { + result.add(currentList); + } + return result; + } + public static class Builder { private static final class EmptySchemaValidatorHolder { @@ -423,10 +454,15 @@ public void close() { } definitions.clear(); - for (WorkflowExecutionCompletableListener listener : listeners) { - safeClose(listener); + if (!listenersByPriority.isEmpty()) { + for (Collection listeners : listenersByPriority) { + for (WorkflowExecutionCompletableListener listener : listeners) { + safeClose(listener); + } + listeners.clear(); + } + listenersByPriority.clear(); } - listeners.clear(); } public WorkflowPositionFactory positionFactory() { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 525ac5ed1..770f41367 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -24,7 +24,6 @@ import io.serverlessworkflow.api.types.Schedule; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; -import io.serverlessworkflow.impl.executors.AbstractTaskExecutor; import io.serverlessworkflow.impl.executors.TaskExecutor; import io.serverlessworkflow.impl.executors.TaskExecutorHelper; import io.serverlessworkflow.impl.resources.ResourceLoader; @@ -182,7 +181,6 @@ public void addScheduledInstance(WorkflowInstance workflowInstance) { void removeInstance(WorkflowInstance instance) { activeInstances.remove(instance.id()); - executors.forEach((k, v) -> ((AbstractTaskExecutor) v).onInstanceCompleted(instance.id())); } void addInstance(WorkflowInstance instance) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index cfd380523..8f67e83c2 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -15,7 +15,7 @@ */ package io.serverlessworkflow.impl; -import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent; +import static io.serverlessworkflow.impl.LifecycleEventsUtils.publishEvent; import io.serverlessworkflow.impl.executors.TaskExecutorHelper; import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; @@ -53,6 +53,8 @@ public class WorkflowMutableInstance implements WorkflowInstance { protected final Map additionalObjects = new ConcurrentHashMap<>(); + protected final Map iterationsMap = new ConcurrentHashMap<>(); + private Lock statusLock = new ReentrantLock(); private Map, TaskContext> suspended; @@ -164,6 +166,10 @@ public WorkflowModel input() { return input; } + public int incIteration(WorkflowPosition position) { + return iterationsMap.compute(position.jsonPointer(), (k, v) -> v == null ? 1 : v + 1); + } + @Override public WorkflowStatus status() { return status.get(); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index 5aec62cf9..591b3f503 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -15,9 +15,9 @@ */ package io.serverlessworkflow.impl.executors; +import static io.serverlessworkflow.impl.LifecycleEventsUtils.publishEvent; import static io.serverlessworkflow.impl.WorkflowUtils.buildWorkflowFilter; import static io.serverlessworkflow.impl.WorkflowUtils.getSchemaValidator; -import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent; import io.serverlessworkflow.api.types.Export; import io.serverlessworkflow.api.types.FlowDirective; @@ -54,7 +54,6 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; public abstract class AbstractTaskExecutor implements TaskExecutor { @@ -62,9 +61,6 @@ public abstract class AbstractTaskExecutor implements TaskEx protected final T task; protected final String taskName; protected final WorkflowPosition position; - - private Map iterationsMap = new ConcurrentHashMap(); - private final Optional inputProcessor; private final Optional outputProcessor; private final Optional contextProcessor; @@ -206,18 +202,15 @@ private CompletableFuture executeNext( @Override public CompletableFuture apply( WorkflowContext workflowContext, Optional parentContext, WorkflowModel input) { - String id = workflowContext.instanceData().id(); - TaskContext taskContext = - new TaskContext( - input, position, parentContext, taskName, task, iterationsMap.getOrDefault(id, 0) + 1); + TaskContext taskContext = new TaskContext(input, position, parentContext, taskName, task); workflowContext.instance().restoreContext(workflowContext, taskContext); - iterationsMap.put(id, taskContext.iteration()); CompletableFuture completable = CompletableFuture.completedFuture(taskContext); if (!TaskExecutorHelper.isActive(workflowContext)) { return completable; } else if (taskContext.isCompleted()) { return executeNext(completable, workflowContext); } else if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) { + taskContext.iteration(workflowContext.instance().incIteration(position)); completable = completable .thenCompose(workflowContext.instance()::suspendedCheck) @@ -318,10 +311,6 @@ public WorkflowPosition position() { return position; } - public void onInstanceCompleted(String instanceId) { - iterationsMap.remove(instanceId); - } - protected abstract TransitionInfo getSkipTransition(); protected abstract CompletableFuture execute( diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java index 4b248ea4e..dc8737b51 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java @@ -38,6 +38,13 @@ public static WorkflowInstance of(WorkflowDefinition definition, PersistenceWork private WorkflowPersistenceInstance(WorkflowDefinition definition, PersistenceWorkflowInfo info) { super(definition, info.id(), info.input()); this.info = info; + info.tasks() + .forEach( + (k, v) -> { + if (v instanceof CompletedTaskInfo task) { + iterationsMap.put(k, task.iteration()); + } + }); this.startedAt = info.startedAt(); } @@ -54,7 +61,13 @@ public CompletableFuture start() { @Override public void restoreContext(WorkflowContext workflow, TaskContext context) { + if (info.tasks().isEmpty()) { + return; + } PersistenceTaskInfo taskInfo = info.tasks().remove(context.position().jsonPointer()); + if (taskInfo == null) { + return; + } if (taskInfo instanceof CompletedTaskInfo completedTaskInfo) { context.output(completedTaskInfo.model()); context.completedAt(completedTaskInfo.instant()); @@ -64,7 +77,6 @@ public void restoreContext(WorkflowContext workflow, TaskContext context) { ? null : workflow.definition().taskExecutor(completedTaskInfo.nextPosition()), completedTaskInfo.isEndNode())); - context.iteration(completedTaskInfo.iteration()); workflow.context(completedTaskInfo.context()); } else if (taskInfo instanceof RetriedTaskInfo retriedTaskInfo) { if (context.retryAttempt() == 0) { diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java index b1cdd994b..3435550a8 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java @@ -121,13 +121,13 @@ protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { byte version = buffer.readByte(); switch (version) { case VERSION_0: - default: return readVersion0(buffer); case VERSION_1: return readVersion1(buffer); case VERSION_2: return readVersion2(buffer); } + throw new UnsupportedOperationException("Unknown version " + version); } } @@ -135,7 +135,6 @@ private PersistenceTaskInfo readVersion2(WorkflowInputBuffer buffer) { TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); switch (taskStatus) { case COMPLETED: - default: return new CompletedTaskInfo( buffer.readInstant(), (WorkflowModel) buffer.readObject(), @@ -146,17 +145,18 @@ private PersistenceTaskInfo readVersion2(WorkflowInputBuffer buffer) { case RETRIED: return new RetriedTaskInfo(buffer.readShort()); } + throw new UnsupportedOperationException("Unknown status " + taskStatus); } private PersistenceTaskInfo readVersion1(WorkflowInputBuffer buffer) { TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); switch (taskStatus) { case COMPLETED: - default: return readVersion0(buffer); case RETRIED: return new RetriedTaskInfo(buffer.readShort()); } + throw new UnsupportedOperationException("Unknown status " + taskStatus); } private PersistenceTaskInfo readVersion0(WorkflowInputBuffer buffer) { diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java index 2a6549b1b..7e120e735 100644 --- a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java @@ -173,9 +173,7 @@ void testWorkflowInstance() throws InterruptedException { ArgumentCaptor transition = ArgumentCaptor.forClass(TransitionInfo.class); verify(updateTContext).transition(transition.capture()); assertThat(transition.getValue().isEndNode()).isTrue(); - ArgumentCaptor iteration = ArgumentCaptor.forClass(Integer.class); - verify(updateTContext).iteration(iteration.capture()); - assertThat(iteration.getValue()).isEqualTo(2); + assertThat(instance.incIteration(position2)).isEqualTo(3); // workflow completed handlers.writer().completed(workflowContext).join();