From f03e731dd8071625d2227cd36208351210768d85 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Fri, 26 Sep 2025 17:55:48 +0200 Subject: [PATCH] [Fix #822] TaskExecutorFactory to accept WorkflowDefinition Signed-off-by: fjtirado --- .../func/JavaConsumerCallExecutor.java | 10 +--- .../func/JavaForExecutorBuilder.java | 12 ++--- .../func/JavaFunctionCallExecutor.java | 10 +--- .../func/JavaListenExecutorBuilder.java | 12 ++--- .../func/JavaLoopFunctionCallExecutor.java | 10 +--- .../JavaLoopFunctionIndexCallExecutor.java | 10 +--- .../func/JavaSwitchExecutorBuilder.java | 12 ++--- .../func/JavaTaskExecutorFactory.java | 21 +++------ .../impl/WorkflowDefinition.java | 12 +++-- .../impl/executors/AbstractTaskExecutor.java | 35 +++++++------- .../impl/executors/CallTaskExecutor.java | 12 ++--- .../impl/executors/CallableTask.java | 7 +-- .../executors/DefaultTaskExecutorFactory.java | 47 +++++-------------- .../impl/executors/DoExecutor.java | 16 ++----- .../impl/executors/EmitExecutor.java | 11 ++--- .../impl/executors/ForExecutor.java | 16 ++----- .../impl/executors/ForkExecutor.java | 15 ++---- .../impl/executors/ListenExecutor.java | 16 ++----- .../impl/executors/RaiseExecutor.java | 11 ++--- .../impl/executors/RegularTaskExecutor.java | 12 ++--- .../impl/executors/SetExecutor.java | 12 ++--- .../impl/executors/SwitchExecutor.java | 12 ++--- .../impl/executors/TaskExecutorFactory.java | 10 +--- .../impl/executors/TaskExecutorHelper.java | 30 ++++-------- .../impl/executors/TryExecutor.java | 17 ++----- .../impl/executors/WaitExecutor.java | 12 ++--- .../impl/executors/http/HttpExecutor.java | 15 +++--- 27 files changed, 122 insertions(+), 293 deletions(-) diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaConsumerCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaConsumerCallExecutor.java index 4c1abce7..b5a19546 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaConsumerCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaConsumerCallExecutor.java @@ -16,14 +16,12 @@ package io.serverlessworkflow.impl.executors.func; import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.api.types.func.CallJava; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.executors.CallableTask; -import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -31,11 +29,7 @@ public class JavaConsumerCallExecutor implements CallableTask task, - Workflow workflow, - WorkflowApplication application, - ResourceLoader loader) { + public void init(CallJava.CallJavaFunction task, WorkflowDefinition definition) { function = task.function(); inputClass = task.inputClass(); } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaListenExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaListenExecutorBuilder.java index 295610bb..57393976 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaListenExecutorBuilder.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaListenExecutorBuilder.java @@ -19,24 +19,18 @@ import io.serverlessworkflow.api.types.ListenTask; import io.serverlessworkflow.api.types.Until; -import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.api.types.func.UntilPredicate; -import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowPredicate; import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder; import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; -import io.serverlessworkflow.impl.resources.ResourceLoader; public class JavaListenExecutorBuilder extends ListenExecutorBuilder { protected JavaListenExecutorBuilder( - WorkflowMutablePosition position, - ListenTask task, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader) { - super(position, task, workflow, application, resourceLoader); + WorkflowMutablePosition position, ListenTask task, WorkflowDefinition definition) { + super(position, task, definition); } @Override diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java index 6eaba85b..9c6c11df 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java @@ -18,16 +18,14 @@ import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.safeObject; import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.api.types.func.CallJava; import io.serverlessworkflow.api.types.func.LoopFunction; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowModelFactory; import io.serverlessworkflow.impl.executors.CallableTask; -import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.concurrent.CompletableFuture; public class JavaLoopFunctionCallExecutor implements CallableTask { @@ -35,11 +33,7 @@ public class JavaLoopFunctionCallExecutor implements CallableTask getTaskExecutor( - WorkflowMutablePosition position, - Task task, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader) { + WorkflowMutablePosition position, Task task, WorkflowDefinition definition) { if (task.getForTask() != null) { - return new JavaForExecutorBuilder( - position, task.getForTask(), workflow, application, resourceLoader); + return new JavaForExecutorBuilder(position, task.getForTask(), definition); } else if (task.getSwitchTask() != null) { - return new JavaSwitchExecutorBuilder( - position, task.getSwitchTask(), workflow, application, resourceLoader); + return new JavaSwitchExecutorBuilder(position, task.getSwitchTask(), definition); } else if (task.getListenTask() != null) { - return new JavaListenExecutorBuilder( - position, task.getListenTask(), workflow, application, resourceLoader); + return new JavaListenExecutorBuilder(position, task.getListenTask(), definition); } else { - return super.getTaskExecutor(position, task, workflow, application, resourceLoader); + return super.getTaskExecutor(position, task, definition); } } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 92b01ed9..e0805c39 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -36,11 +36,13 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData private Optional outputFilter = Optional.empty(); private final WorkflowApplication application; private final TaskExecutor taskExecutor; + private final ResourceLoader resourceLoader; private WorkflowDefinition( WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) { this.workflow = workflow; this.application = application; + this.resourceLoader = resourceLoader; if (workflow.getInput() != null) { Input input = workflow.getInput(); this.inputSchemaValidator = @@ -55,11 +57,7 @@ private WorkflowDefinition( } this.taskExecutor = TaskExecutorHelper.createExecutorList( - application.positionFactory().get(), - workflow.getDo(), - workflow, - application, - resourceLoader); + application.positionFactory().get(), workflow.getDo(), this); } static WorkflowDefinition of(WorkflowApplication application, Workflow workflow) { @@ -105,6 +103,10 @@ public WorkflowApplication application() { return application; } + public ResourceLoader resourceLoader() { + return resourceLoader; + } + @Override public void close() { // TODO close resourcers hold for uncompleted process instances, if any diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index f7f78bc5..ba3cae0d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -28,6 +28,7 @@ import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowFilter; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowMutablePosition; @@ -81,17 +82,13 @@ public abstract static class AbstractTaskExecutorBuilder< private V instance; protected AbstractTaskExecutorBuilder( - WorkflowMutablePosition position, - T task, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader) { - this.workflow = workflow; + WorkflowMutablePosition position, T task, WorkflowDefinition definition) { + this.workflow = definition.workflow(); this.taskName = position.last().toString(); this.position = position; this.task = task; - this.application = application; - this.resourceLoader = resourceLoader; + this.application = definition.application(); + this.resourceLoader = definition.resourceLoader(); if (task.getInput() != null) { Input input = task.getInput(); this.inputProcessor = buildWorkflowFilter(application, input.getFrom()); @@ -174,16 +171,18 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder builder) { protected final CompletableFuture executeNext( CompletableFuture future, WorkflowContext workflow) { - return future.thenCompose( - t -> { - TransitionInfo transition = t.transition(); - if (transition.isEndNode()) { - workflow.instance().status(WorkflowStatus.COMPLETED); - } else if (transition.next() != null) { - return transition.next().apply(workflow, t.parent(), t.output()); - } - return CompletableFuture.completedFuture(t); - }); + return future.thenCompose(t -> executeNext(workflow, t)); + } + + private CompletableFuture executeNext( + WorkflowContext workflow, TaskContext taskContext) { + TransitionInfo transition = taskContext.transition(); + if (transition.isEndNode()) { + workflow.instance().status(WorkflowStatus.COMPLETED); + } else if (transition.next() != null) { + return transition.next().apply(workflow, taskContext.parent(), taskContext.output()); + } + return CompletableFuture.completedFuture(taskContext); } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java index 60733221..99d2ec79 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java @@ -16,13 +16,11 @@ package io.serverlessworkflow.impl.executors; import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowMutablePosition; -import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.concurrent.CompletableFuture; public class CallTaskExecutor extends RegularTaskExecutor { @@ -36,13 +34,11 @@ public static class CallTaskExecutorBuilder protected CallTaskExecutorBuilder( WorkflowMutablePosition position, T task, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader, + WorkflowDefinition definition, CallableTask callable) { - super(position, task, workflow, application, resourceLoader); + super(position, task, definition); this.callable = callable; - callable.init(task, workflow, application, resourceLoader); + callable.init(task, definition); } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallableTask.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallableTask.java index 6cc52922..c6f7be0d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallableTask.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallableTask.java @@ -16,17 +16,14 @@ package io.serverlessworkflow.impl.executors; import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.concurrent.CompletableFuture; public interface CallableTask { - default void init( - T task, Workflow workflow, WorkflowApplication application, ResourceLoader loader) {} + default void init(T task, WorkflowDefinition definition) {} CompletableFuture apply( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java index a1ea9a1d..f8f7ccb0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java @@ -18,8 +18,7 @@ import io.serverlessworkflow.api.types.CallTask; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.Workflow; -import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.executors.CallTaskExecutor.CallTaskExecutorBuilder; import io.serverlessworkflow.impl.executors.DoExecutor.DoExecutorBuilder; @@ -32,7 +31,6 @@ import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder; import io.serverlessworkflow.impl.executors.TryExecutor.TryExecutorBuilder; import io.serverlessworkflow.impl.executors.WaitExecutor.WaitExecutorBuilder; -import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.ServiceLoader; import java.util.ServiceLoader.Provider; @@ -50,53 +48,34 @@ protected DefaultTaskExecutorFactory() {} @Override public TaskExecutorBuilder getTaskExecutor( - WorkflowMutablePosition position, - Task task, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader) { + WorkflowMutablePosition position, Task task, WorkflowDefinition definition) { if (task.getCallTask() != null) { CallTask callTask = task.getCallTask(); TaskBase taskBase = (TaskBase) callTask.get(); if (taskBase != null) { return new CallTaskExecutorBuilder( - position, - taskBase, - workflow, - application, - resourceLoader, - findCallTask(taskBase.getClass())); + position, taskBase, definition, findCallTask(taskBase.getClass())); } } else if (task.getSwitchTask() != null) { - return new SwitchExecutorBuilder( - position, task.getSwitchTask(), workflow, application, resourceLoader); + return new SwitchExecutorBuilder(position, task.getSwitchTask(), definition); } else if (task.getDoTask() != null) { - return new DoExecutorBuilder( - position, task.getDoTask(), workflow, application, resourceLoader); + return new DoExecutorBuilder(position, task.getDoTask(), definition); } else if (task.getSetTask() != null) { - return new SetExecutorBuilder( - position, task.getSetTask(), workflow, application, resourceLoader); + return new SetExecutorBuilder(position, task.getSetTask(), definition); } else if (task.getForTask() != null) { - return new ForExecutorBuilder( - position, task.getForTask(), workflow, application, resourceLoader); + return new ForExecutorBuilder(position, task.getForTask(), definition); } else if (task.getRaiseTask() != null) { - return new RaiseExecutorBuilder( - position, task.getRaiseTask(), workflow, application, resourceLoader); + return new RaiseExecutorBuilder(position, task.getRaiseTask(), definition); } else if (task.getTryTask() != null) { - return new TryExecutorBuilder( - position, task.getTryTask(), workflow, application, resourceLoader); + return new TryExecutorBuilder(position, task.getTryTask(), definition); } else if (task.getForkTask() != null) { - return new ForkExecutorBuilder( - position, task.getForkTask(), workflow, application, resourceLoader); + return new ForkExecutorBuilder(position, task.getForkTask(), definition); } else if (task.getWaitTask() != null) { - return new WaitExecutorBuilder( - position, task.getWaitTask(), workflow, application, resourceLoader); + return new WaitExecutorBuilder(position, task.getWaitTask(), definition); } else if (task.getListenTask() != null) { - return new ListenExecutorBuilder( - position, task.getListenTask(), workflow, application, resourceLoader); + return new ListenExecutorBuilder(position, task.getListenTask(), definition); } else if (task.getEmitTask() != null) { - return new EmitExecutorBuilder( - position, task.getEmitTask(), workflow, application, resourceLoader); + return new EmitExecutorBuilder(position, task.getEmitTask(), definition); } throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet"); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java index 23f06e1f..c5a1b95f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java @@ -16,13 +16,11 @@ package io.serverlessworkflow.impl.executors; import io.serverlessworkflow.api.types.DoTask; -import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowMutablePosition; -import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -34,15 +32,9 @@ public static class DoExecutorBuilder extends RegularTaskExecutorBuilder private TaskExecutor taskExecutor; protected DoExecutorBuilder( - WorkflowMutablePosition position, - DoTask task, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader) { - super(position, task, workflow, application, resourceLoader); - taskExecutor = - TaskExecutorHelper.createExecutorList( - position, task.getDo(), workflow, application, resourceLoader); + WorkflowMutablePosition position, DoTask task, WorkflowDefinition definition) { + super(position, task, definition); + taskExecutor = TaskExecutorHelper.createExecutorList(position, task.getDo(), definition); } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java index 5c9d10ba..0dad582c 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java @@ -24,10 +24,10 @@ import io.serverlessworkflow.api.types.EventProperties; import io.serverlessworkflow.api.types.EventSource; import io.serverlessworkflow.api.types.EventTime; -import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowUtils; @@ -35,7 +35,6 @@ import io.serverlessworkflow.impl.events.CloudEventUtils; import io.serverlessworkflow.impl.events.EventPublisher; import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; -import io.serverlessworkflow.impl.resources.ResourceLoader; import java.net.URI; import java.time.OffsetDateTime; import java.util.Collection; @@ -52,12 +51,8 @@ public static class EmitExecutorBuilder extends RegularTaskExecutorBuilder taskExecutor; protected ForExecutorBuilder( - WorkflowMutablePosition position, - ForTask task, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader) { - super(position, task, workflow, application, resourceLoader); + WorkflowMutablePosition position, ForTask task, WorkflowDefinition definition) { + super(position, task, definition); this.collectionExpr = buildCollectionFilter(); this.whileExpr = buildWhileFilter(); - this.taskExecutor = - TaskExecutorHelper.createExecutorList( - position, task.getDo(), workflow, application, resourceLoader); + this.taskExecutor = TaskExecutorHelper.createExecutorList(position, task.getDo(), definition); } protected Optional buildWhileFilter() { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java index 75873e63..44acdd67 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java @@ -17,14 +17,12 @@ import io.serverlessworkflow.api.types.ForkTask; import io.serverlessworkflow.api.types.ForkTaskConfiguration; -import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.executors.RegularTaskExecutor.RegularTaskExecutorBuilder; -import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -48,16 +46,11 @@ public static class ForkExecutorBuilder extends RegularTaskExecutorBuilder detailFilter; protected RaiseExecutorBuilder( - WorkflowMutablePosition position, - RaiseTask task, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader) { - super(position, task, workflow, application, resourceLoader); + WorkflowMutablePosition position, RaiseTask task, WorkflowDefinition definition) { + super(position, task, definition); RaiseTaskError raiseError = task.getRaise().getError(); Error error = raiseError.getRaiseErrorDefinition() != null diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java index 30744e71..9213dc3f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java @@ -16,13 +16,11 @@ package io.serverlessworkflow.impl.executors; import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowMutablePosition; -import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -40,12 +38,8 @@ public abstract static class RegularTaskExecutorBuilder private TransitionInfoBuilder transition; protected RegularTaskExecutorBuilder( - WorkflowMutablePosition position, - T task, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader) { - super(position, task, workflow, application, resourceLoader); + WorkflowMutablePosition position, T task, WorkflowDefinition definition) { + super(position, task, definition); } public void connect(Map> connections) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java index fda5b6f9..5394c8e5 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java @@ -18,15 +18,13 @@ import io.serverlessworkflow.api.types.Set; import io.serverlessworkflow.api.types.SetTask; import io.serverlessworkflow.api.types.SetTaskConfiguration; -import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowFilter; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowUtils; -import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.concurrent.CompletableFuture; public class SetExecutor extends RegularTaskExecutor { @@ -38,12 +36,8 @@ public static class SetExecutorBuilder extends RegularTaskExecutorBuilder getTaskExecutor( - WorkflowMutablePosition position, - Task task, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader); + WorkflowMutablePosition position, Task task, WorkflowDefinition definition); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java index 32516e5b..c5b26e0d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java @@ -16,14 +16,12 @@ package io.serverlessworkflow.impl.executors; import io.serverlessworkflow.api.types.TaskItem; -import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -62,11 +60,9 @@ public static boolean isActive(WorkflowStatus status) { public static TaskExecutor createExecutorList( WorkflowMutablePosition position, List taskItems, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader) { + WorkflowDefinition workflowDefinition) { Map> executors = - createExecutorBuilderList(position, taskItems, workflow, application, resourceLoader, "do"); + createExecutorBuilderList(position, taskItems, workflowDefinition, "do"); executors.values().forEach(t -> t.connect(executors)); Iterator> iter = executors.values().iterator(); TaskExecutor first = iter.next().build(); @@ -77,34 +73,24 @@ public static TaskExecutor createExecutorList( } public static Map> createBranchList( - WorkflowMutablePosition position, - List taskItems, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader) { - return createExecutorBuilderList( - position, taskItems, workflow, application, resourceLoader, "branch") - .entrySet() - .stream() + WorkflowMutablePosition position, List taskItems, WorkflowDefinition definition) { + return createExecutorBuilderList(position, taskItems, definition, "branch").entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build())); } private static Map> createExecutorBuilderList( WorkflowMutablePosition position, List taskItems, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader, + WorkflowDefinition definition, String containerName) { - TaskExecutorFactory taskFactory = application.taskFactory(); + TaskExecutorFactory taskFactory = definition.application().taskFactory(); Map> executors = new LinkedHashMap<>(); position.addProperty(containerName); int index = 0; for (TaskItem item : taskItems) { position.addIndex(index++).addProperty(item.getName()); TaskExecutorBuilder taskExecutorBuilder = - taskFactory.getTaskExecutor( - position.copy(), item.getTask(), workflow, application, resourceLoader); + taskFactory.getTaskExecutor(position.copy(), item.getTask(), definition); executors.put(item.getName(), taskExecutorBuilder); position.back().back(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java index 5d303aff..d7d7cf0b 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java @@ -20,17 +20,15 @@ import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.api.types.TryTask; import io.serverlessworkflow.api.types.TryTaskCatch; -import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowError; import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowPredicate; import io.serverlessworkflow.impl.WorkflowUtils; -import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -54,25 +52,20 @@ public static class TryExecutorBuilder extends RegularTaskExecutorBuilder> catchTaskExecutor; protected TryExecutorBuilder( - WorkflowMutablePosition position, - TryTask task, - Workflow workflow, - WorkflowApplication application, - ResourceLoader resourceLoader) { - super(position, task, workflow, application, resourceLoader); + WorkflowMutablePosition position, TryTask task, WorkflowDefinition definition) { + super(position, task, definition); TryTaskCatch catchInfo = task.getCatch(); this.errorFilter = buildErrorFilter(catchInfo.getErrors()); this.whenFilter = WorkflowUtils.optionalPredicate(application, catchInfo.getWhen()); this.exceptFilter = WorkflowUtils.optionalPredicate(application, catchInfo.getExceptWhen()); this.taskExecutor = - TaskExecutorHelper.createExecutorList( - position, task.getTry(), workflow, application, resourceLoader); + TaskExecutorHelper.createExecutorList(position, task.getTry(), definition); List catchTask = task.getCatch().getDo(); this.catchTaskExecutor = catchTask != null && !catchTask.isEmpty() ? Optional.of( TaskExecutorHelper.createExecutorList( - position, task.getCatch().getDo(), workflow, application, resourceLoader)) + position, task.getCatch().getDo(), definition)) : Optional.empty(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java index afd1e890..d649f47e 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java @@ -17,15 +17,13 @@ import io.serverlessworkflow.api.types.DurationInline; import io.serverlessworkflow.api.types.WaitTask; -import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowMutableInstance; import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.resources.ResourceLoader; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -38,12 +36,8 @@ public static class WaitExecutorBuilder extends RegularTaskExecutorBuilder