Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.lifecycle;
package io.serverlessworkflow.impl;

import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.slf4j.Logger;
Expand All @@ -30,17 +31,25 @@ private LifecycleEventsUtils() {}
public static CompletableFuture<?> publishEvent(
WorkflowContext workflowContext,
Function<WorkflowExecutionCompletableListener, CompletableFuture<?>> function) {
return CompletableFuture.allOf(
workflowContext.definition().application().listeners().stream()
.map(
v ->
function
.apply(v)
.exceptionally(
ex -> {
logger.error("Error while executing listener", ex);
return null;
}))
.toArray(CompletableFuture[]::new));
CompletableFuture<?> result = CompletableFuture.completedFuture(null);
for (Collection<WorkflowExecutionCompletableListener> listeners :
workflowContext.definition().application().listenersByPriority()) {
result =
result.thenCompose(
__ ->
CompletableFuture.allOf(
listeners.stream()
.map(
v ->
function
.apply(v)
.exceptionally(
ex -> {
logger.error("Error while executing listener", ex);
return null;
}))
.toArray(CompletableFuture[]::new)));
}
return result;
Comment on lines +34 to +53
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized that listeners with different priorities should not be executed concurrently (it defeats the purpose of priorities): listeners with higher priority should complete before listeners with lower priority start executing.
Since the list of listeners is not mutable once the application has been created, they are grouped by priority at application creation time, so there is no need to regroup them on every change.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class TaskContext implements TaskContextData {
private Instant completedAt;
private TransitionInfo transition;
private short retryAttempt;
private int iteration;
private AuthorizationDescriptor authorization;

public TaskContext(
Expand Down Expand Up @@ -73,11 +74,6 @@ private TaskContext(
parentContext.map(p -> new HashMap<>(p.contextVariables)).orElseGet(HashMap::new);
}

Comment thread
fjtirado marked this conversation as resolved.
public TaskContext copy() {
return new TaskContext(
rawInput, parentContext, taskName, task, position, startedAt, input, output, rawOutput);
}

public void input(WorkflowModel input) {
this.input = input;
this.rawOutput = input;
Expand Down Expand Up @@ -174,6 +170,7 @@ public boolean isCompleted() {
return completedAt != null;
}

@Override
public short retryAttempt() {
return retryAttempt;
}
Expand All @@ -186,6 +183,15 @@ public boolean isRetrying() {
return retryAttempt > 0;
}

/**
 * Returns the number of times this task has been executed. The counter is 1-based: the first
 * execution reports 1 (see WorkflowMutableInstance#incIteration); 0 means not yet executed.
 */
@Override
public int iteration() {
return iteration;
}

/**
 * Sets the iteration counter; invoked by the task executor with the value produced by the
 * workflow instance's incIteration before the task body runs.
 */
public void iteration(int iteration) {
this.iteration = iteration;
}

@Override
public String toString() {
return "TaskContext [position="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ public interface TaskContextData {
String taskName();

Instant completedAt();

int iteration();

short retryAttempt();
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a leftover from the previous PR; retryAttempt makes sense at the interface level.

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -69,7 +70,7 @@ public class WorkflowApplication implements AutoCloseable {
private final ResourceLoaderFactory resourceLoaderFactory;
private final SchemaValidatorFactory schemaValidatorFactory;
private final WorkflowInstanceIdFactory idFactory;
private final Collection<WorkflowExecutionCompletableListener> listeners;
private final List<Collection<WorkflowExecutionCompletableListener>> listenersByPriority;
private final Map<WorkflowDefinitionId, WorkflowDefinition> definitions;
private final WorkflowPositionFactory positionFactory;
private final ExecutorServiceFactory executorFactory;
Expand Down Expand Up @@ -99,7 +100,7 @@ private WorkflowApplication(Builder builder) {
this.idFactory = builder.idFactory;
this.runtimeDescriptorFactory = builder.descriptorFactory;
this.executorFactory = builder.executorFactory;
this.listeners = new LinkedHashSet<>(builder.listeners);
this.listenersByPriority = groupByPriority(new LinkedHashSet<>(builder.listeners));
this.definitions = new ConcurrentHashMap<>();
this.eventConsumer = builder.eventConsumer;
this.eventPublishers = builder.eventPublishers;
Expand Down Expand Up @@ -140,7 +141,7 @@ public ResourceLoaderFactory resourceLoaderFactory() {
}

/**
 * Returns all registered listeners flattened into a single collection, preserving the order of
 * the priority groups in the backing list (insertion order within each group).
 *
 * @return an unmodifiable snapshot of all listeners
 */
public Collection<WorkflowExecutionCompletableListener> listeners() {
  // Method reference is the idiomatic form of x -> x.stream(); behavior is unchanged.
  return listenersByPriority.stream().flatMap(Collection::stream).toList();
}

public Collection<EventPublisher> eventPublishers() {
Expand All @@ -151,6 +152,36 @@ public WorkflowInstanceIdFactory idFactory() {
return idFactory;
}

List<Collection<WorkflowExecutionCompletableListener>> listenersByPriority() {
return listenersByPriority;
Comment thread
fjtirado marked this conversation as resolved.
}

private static List<Collection<WorkflowExecutionCompletableListener>> groupByPriority(
Collection<WorkflowExecutionCompletableListener> listeners) {
if (listeners.isEmpty()) {
return List.of();
Comment thread
fjtirado marked this conversation as resolved.
}
List<Collection<WorkflowExecutionCompletableListener>> result = new ArrayList<>();
Iterator<WorkflowExecutionCompletableListener> iter = listeners.iterator();
List<WorkflowExecutionCompletableListener> currentList = new ArrayList<>();
WorkflowExecutionCompletableListener currentListener = iter.next();
int currentPriority = currentListener.priority();
currentList.add(currentListener);
while (iter.hasNext()) {
currentListener = iter.next();
if (currentListener.priority() != currentPriority) {
result.add(currentList);
currentList = new ArrayList<>();
currentPriority = currentListener.priority();
}
currentList.add(currentListener);
}
if (!currentList.isEmpty()) {
result.add(currentList);
}
return result;
Comment thread
fjtirado marked this conversation as resolved.
Comment on lines +160 to +182
Copy link
Copy Markdown
Collaborator Author

@fjtirado fjtirado Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This groups listeners by priority.
I'm using a list rather than a map, since we do not really care about the actual priority number.
Since the source list is already sorted by priority, this old and simple algorithm works.
It is done here because the list of listeners is not mutable once the application has started; this avoids repeating the grouping on every modification, which would negatively affect performance.

}

public static class Builder {

private static final class EmptySchemaValidatorHolder {
Expand Down Expand Up @@ -423,10 +454,15 @@ public void close() {
}
definitions.clear();

for (WorkflowExecutionCompletableListener listener : listeners) {
safeClose(listener);
if (!listenersByPriority.isEmpty()) {
for (Collection<WorkflowExecutionCompletableListener> listeners : listenersByPriority) {
for (WorkflowExecutionCompletableListener listener : listeners) {
safeClose(listener);
}
listeners.clear();
}
listenersByPriority.clear();
}
listeners.clear();
}

public WorkflowPositionFactory positionFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.serverlessworkflow.impl;

import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent;
import static io.serverlessworkflow.impl.LifecycleEventsUtils.publishEvent;

import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
Expand Down Expand Up @@ -53,6 +53,8 @@ public class WorkflowMutableInstance implements WorkflowInstance {

protected final Map<String, Object> additionalObjects = new ConcurrentHashMap<>();

protected final Map<String, Integer> iterationsMap = new ConcurrentHashMap<>();

private Lock statusLock = new ReentrantLock();
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;

Expand Down Expand Up @@ -164,6 +166,10 @@ public WorkflowModel input() {
return input;
}

/**
 * Atomically increments and returns the execution count for the task at the given position,
 * keyed by its JSON pointer. The first call for a position returns 1.
 *
 * @param position the task position whose counter is incremented
 * @return the updated (post-increment) iteration count
 */
public int incIteration(WorkflowPosition position) {
  // merge(k, 1, Integer::sum) is the idiomatic form of the hand-rolled
  // compute(k, (k, v) -> v == null ? 1 : v + 1) and is equally atomic on ConcurrentHashMap.
  return iterationsMap.merge(position.jsonPointer(), 1, Integer::sum);
}

@Override
public WorkflowStatus status() {
return status.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package io.serverlessworkflow.impl.executors;

import static io.serverlessworkflow.impl.LifecycleEventsUtils.publishEvent;
import static io.serverlessworkflow.impl.WorkflowUtils.buildWorkflowFilter;
import static io.serverlessworkflow.impl.WorkflowUtils.getSchemaValidator;
import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent;

import io.serverlessworkflow.api.types.Export;
import io.serverlessworkflow.api.types.FlowDirective;
Expand Down Expand Up @@ -210,6 +210,7 @@ public CompletableFuture<TaskContext> apply(
} else if (taskContext.isCompleted()) {
return executeNext(completable, workflowContext);
} else if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
taskContext.iteration(workflowContext.instance().incIteration(position));
completable =
completable
.thenCompose(workflowContext.instance()::suspendedCheck)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,16 @@ public record CompletedTaskInfo(
WorkflowModel model,
WorkflowModel context,
Boolean isEndNode,
String nextPosition)
implements PersistenceTaskInfo {}
String nextPosition,
int iteration)
implements PersistenceTaskInfo {

/**
 * Convenience constructor for callers that predate iteration tracking; defaults the iteration
 * counter to 1 — presumably because a task persisted as completed ran at least once (TODO
 * confirm against the version-0/1 deserialization paths).
 */
public CompletedTaskInfo(
Instant instant,
WorkflowModel model,
WorkflowModel context,
Boolean isEndNode,
String nextPosition) {
this(instant, model, context, isEndNode, nextPosition, 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ public static WorkflowInstance of(WorkflowDefinition definition, PersistenceWork
/**
 * Restores a persisted workflow instance: seeds the per-task iteration counters from the
 * persisted snapshot (keyed by the task position's JSON pointer) and restores the original
 * start timestamp.
 */
private WorkflowPersistenceInstance(WorkflowDefinition definition, PersistenceWorkflowInfo info) {
super(definition, info.id(), info.input());
this.info = info;
// Only CompletedTaskInfo records carry an iteration counter; other persisted task kinds
// (e.g. retried tasks) are skipped here.
info.tasks()
.forEach(
(k, v) -> {
if (v instanceof CompletedTaskInfo task) {
iterationsMap.put(k, task.iteration());
}
});
this.startedAt = info.startedAt();
}

Expand All @@ -54,7 +61,13 @@ public CompletableFuture<WorkflowModel> start() {

@Override
public void restoreContext(WorkflowContext workflow, TaskContext context) {
if (info.tasks().isEmpty()) {
return;
}
Comment on lines +64 to +66
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an optimization I forgot to add previously: once all persisted tasks have been processed, there is no point in performing a remove and two instanceof checks, as in

PersistenceTaskInfo taskInfo = info.tasks().remove(context.position().jsonPointer());
if (taskInfo == null) {
return;
}
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: there is no point in performing two instanceof checks if the task is not in the map.

if (taskInfo instanceof CompletedTaskInfo completedTaskInfo) {
Comment thread
fjtirado marked this conversation as resolved.
context.output(completedTaskInfo.model());
Comment thread
fjtirado marked this conversation as resolved.
context.completedAt(completedTaskInfo.instant());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public abstract class BytesMapInstanceTransaction

private static final byte VERSION_0 = 0;
private static final byte VERSION_1 = 1;
private static final byte VERSION_2 = 2;

private final WorkflowBufferFactory factory;

Expand All @@ -50,7 +51,7 @@ protected BytesMapInstanceTransaction(WorkflowBufferFactory factory) {
protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskContext taskContext) {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try (WorkflowOutputBuffer writer = factory.output(bytes)) {
writer.writeByte(VERSION_1);
writer.writeByte(VERSION_2);
writer.writeEnum(TaskStatus.COMPLETED);
writer.writeInstant(taskContext.completedAt());
writeModel(writer, taskContext.output());
Expand All @@ -64,6 +65,7 @@ protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskCont
writer.writeBoolean(true);
writer.writeString(next.position().jsonPointer());
}
writer.writeInt(taskContext.iteration());
}
return bytes.toByteArray();
}
Expand Down Expand Up @@ -119,23 +121,42 @@ protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) {
byte version = buffer.readByte();
switch (version) {
case VERSION_0:
default:
return readVersion0(buffer);
case VERSION_1:
return readVersion1(buffer);
case VERSION_2:
return readVersion2(buffer);
}
throw new UnsupportedOperationException("Unknown version " + version);
}
}

private PersistenceTaskInfo readVersion2(WorkflowInputBuffer buffer) {
TaskStatus taskStatus = buffer.readEnum(TaskStatus.class);
switch (taskStatus) {
case COMPLETED:
return new CompletedTaskInfo(
buffer.readInstant(),
(WorkflowModel) buffer.readObject(),
(WorkflowModel) buffer.readObject(),
buffer.readBoolean(),
buffer.readBoolean() ? buffer.readString() : null,
buffer.readInt());
case RETRIED:
return new RetriedTaskInfo(buffer.readShort());
Comment thread
fjtirado marked this conversation as resolved.
}
throw new UnsupportedOperationException("Unknown status " + taskStatus);
}
Comment thread
fjtirado marked this conversation as resolved.

/**
 * Deserializes a VERSION_1 task record. COMPLETED payloads are identical to version 0 (hence
 * the delegation); VERSION_1 only added the RETRIED status carrying the retry-attempt counter.
 *
 * <p>Fix: the original grouped {@code default:} with {@code case COMPLETED:}, which made the
 * trailing {@code throw} an unreachable statement (a compile-time error per JLS reachability
 * rules for switch statements with a default label) and would have silently routed unknown
 * statuses into readVersion0. Unknown statuses now fail fast, matching readVersion2.
 */
private PersistenceTaskInfo readVersion1(WorkflowInputBuffer buffer) {
  TaskStatus taskStatus = buffer.readEnum(TaskStatus.class);
  switch (taskStatus) {
    case COMPLETED:
      return readVersion0(buffer);
    case RETRIED:
      return new RetriedTaskInfo(buffer.readShort());
  }
  throw new UnsupportedOperationException("Unknown status " + taskStatus);
}

private PersistenceTaskInfo readVersion0(WorkflowInputBuffer buffer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ protected TaskContextData completedTaskContext(
when(taskContext.completedAt()).thenReturn(Instant.now());
when(taskContext.output()).thenReturn(app.modelFactory().from(model));
when(taskContext.transition()).thenReturn(new TransitionInfo(null, true));
when(taskContext.iteration()).thenReturn(2);
return taskContext;
}

Expand Down Expand Up @@ -172,6 +173,7 @@ void testWorkflowInstance() throws InterruptedException {
ArgumentCaptor<TransitionInfo> transition = ArgumentCaptor.forClass(TransitionInfo.class);
verify(updateTContext).transition(transition.capture());
assertThat(transition.getValue().isEndNode()).isTrue();
assertThat(instance.incIteration(position2)).isEqualTo(3);

// workflow completed
handlers.writer().completed(workflowContext).join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void testAllEvent() throws IOException, InterruptedException, ExecutionException
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito"))));
await()
.pollDelay(Duration.ofMillis(20))
.atMost(Duration.ofMillis(600))
.atMost(Duration.ofMillis(980))
.until(
() ->
instances.stream().filter(i -> i.status() == WorkflowStatus.COMPLETED).count()
Expand All @@ -80,7 +80,7 @@ void testOneEvent() throws IOException, InterruptedException, ExecutionException
Collection<WorkflowInstance> instances = definition.scheduledInstances();
await()
.pollDelay(Duration.ofMillis(20))
.atMost(Duration.ofMillis(600))
.atMost(Duration.ofMillis(980))
.until(
() ->
instances.stream().filter(i -> i.status() == WorkflowStatus.COMPLETED).count()
Expand Down
Loading