Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,20 @@
package io.serverlessworkflow.impl.executors.func;

import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class JavaConsumerCallExecutor implements CallableTask<CallJava.CallJavaConsumer> {

private Consumer consumer;

public void init(
CallJava.CallJavaConsumer task,
Workflow workflow,
WorkflowApplication application,
ResourceLoader loader) {
public void init(CallJava.CallJavaConsumer task, WorkflowDefinition definition) {
consumer = task.consumer();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,23 @@
import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.safeObject;

import io.serverlessworkflow.api.types.ForTask;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.api.types.func.ForTaskFunction;
import io.serverlessworkflow.api.types.func.LoopPredicateIndex;
import io.serverlessworkflow.api.types.func.TypedFunction;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.WorkflowPredicate;
import io.serverlessworkflow.impl.WorkflowValueResolver;
import io.serverlessworkflow.impl.executors.ForExecutor.ForExecutorBuilder;
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import java.util.Collection;
import java.util.Optional;

public class JavaForExecutorBuilder extends ForExecutorBuilder {

protected JavaForExecutorBuilder(
WorkflowMutablePosition position,
ForTask task,
Workflow workflow,
WorkflowApplication application,
ResourceLoader resourceLoader) {
super(position, task, workflow, application, resourceLoader);
WorkflowMutablePosition position, ForTask task, WorkflowDefinition definition) {
super(position, task, definition);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@
package io.serverlessworkflow.impl.executors.func;

import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowModelFactory;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
Expand All @@ -39,11 +37,7 @@ static String fromInt(Integer integer) {
return Integer.toString(integer);
}

public void init(
CallJava.CallJavaFunction<T, V> task,
Workflow workflow,
WorkflowApplication application,
ResourceLoader loader) {
public void init(CallJava.CallJavaFunction<T, V> task, WorkflowDefinition definition) {
function = task.function();
inputClass = task.inputClass();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,18 @@

import io.serverlessworkflow.api.types.ListenTask;
import io.serverlessworkflow.api.types.Until;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.api.types.func.UntilPredicate;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.WorkflowPredicate;
import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder;
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
import io.serverlessworkflow.impl.resources.ResourceLoader;

public class JavaListenExecutorBuilder extends ListenExecutorBuilder {

protected JavaListenExecutorBuilder(
WorkflowMutablePosition position,
ListenTask task,
Workflow workflow,
WorkflowApplication application,
ResourceLoader resourceLoader) {
super(position, task, workflow, application, resourceLoader);
WorkflowMutablePosition position, ListenTask task, WorkflowDefinition definition) {
super(position, task, definition);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,22 @@
import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.safeObject;

import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.api.types.func.LoopFunction;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowModelFactory;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import java.util.concurrent.CompletableFuture;

public class JavaLoopFunctionCallExecutor implements CallableTask<CallJava.CallJavaLoopFunction> {

private LoopFunction function;
private String varName;

public void init(
CallJava.CallJavaLoopFunction task,
Workflow workflow,
WorkflowApplication application,
ResourceLoader loader) {
public void init(CallJava.CallJavaLoopFunction task, WorkflowDefinition definition) {
function = task.function();
varName = task.varName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.safeObject;

import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.api.types.func.LoopFunctionIndex;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowModelFactory;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import java.util.concurrent.CompletableFuture;

public class JavaLoopFunctionIndexCallExecutor
Expand All @@ -37,11 +35,7 @@ public class JavaLoopFunctionIndexCallExecutor
private String varName;
private String indexName;

public void init(
CallJava.CallJavaLoopFunctionIndex task,
Workflow workflow,
WorkflowApplication application,
ResourceLoader loader) {
public void init(CallJava.CallJavaLoopFunctionIndex task, WorkflowDefinition definition) {
function = task.function();
varName = task.varName();
indexName = task.indexName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,19 @@

import io.serverlessworkflow.api.types.SwitchCase;
import io.serverlessworkflow.api.types.SwitchTask;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.api.types.func.SwitchCaseFunction;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.WorkflowPredicate;
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import java.util.Optional;

public class JavaSwitchExecutorBuilder extends SwitchExecutorBuilder {

protected JavaSwitchExecutorBuilder(
WorkflowMutablePosition position,
SwitchTask task,
Workflow workflow,
WorkflowApplication application,
ResourceLoader resourceLoader) {
super(position, task, workflow, application, resourceLoader);
WorkflowMutablePosition position, SwitchTask task, WorkflowDefinition definition) {
super(position, task, definition);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,23 @@

import io.serverlessworkflow.api.types.Task;
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory;
import io.serverlessworkflow.impl.executors.TaskExecutorBuilder;
import io.serverlessworkflow.impl.resources.ResourceLoader;

public class JavaTaskExecutorFactory extends DefaultTaskExecutorFactory {

public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
WorkflowMutablePosition position,
Task task,
Workflow workflow,
WorkflowApplication application,
ResourceLoader resourceLoader) {
WorkflowMutablePosition position, Task task, WorkflowDefinition definition) {
if (task.getForTask() != null) {
return new JavaForExecutorBuilder(
position, task.getForTask(), workflow, application, resourceLoader);
return new JavaForExecutorBuilder(position, task.getForTask(), definition);
} else if (task.getSwitchTask() != null) {
return new JavaSwitchExecutorBuilder(
position, task.getSwitchTask(), workflow, application, resourceLoader);
return new JavaSwitchExecutorBuilder(position, task.getSwitchTask(), definition);
} else if (task.getListenTask() != null) {
return new JavaListenExecutorBuilder(
position, task.getListenTask(), workflow, application, resourceLoader);
return new JavaListenExecutorBuilder(position, task.getListenTask(), definition);
} else {
return super.getTaskExecutor(position, task, workflow, application, resourceLoader);
return super.getTaskExecutor(position, task, definition);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
private Optional<WorkflowFilter> outputFilter = Optional.empty();
private final WorkflowApplication application;
private final TaskExecutor<?> taskExecutor;
private final ResourceLoader resourceLoader;

private WorkflowDefinition(
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
this.workflow = workflow;
this.application = application;
this.resourceLoader = resourceLoader;
if (workflow.getInput() != null) {
Input input = workflow.getInput();
this.inputSchemaValidator =
Expand All @@ -55,11 +57,7 @@ private WorkflowDefinition(
}
this.taskExecutor =
TaskExecutorHelper.createExecutorList(
application.positionFactory().get(),
workflow.getDo(),
workflow,
application,
resourceLoader);
application.positionFactory().get(), workflow.getDo(), this);
}

static WorkflowDefinition of(WorkflowApplication application, Workflow workflow) {
Expand Down Expand Up @@ -105,6 +103,10 @@ public WorkflowApplication application() {
return application;
}

public ResourceLoader resourceLoader() {
return resourceLoader;
}

@Override
public void close() {
// TODO close resourcers hold for uncompleted process instances, if any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowFilter;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
Expand Down Expand Up @@ -81,17 +82,13 @@ public abstract static class AbstractTaskExecutorBuilder<
private V instance;

protected AbstractTaskExecutorBuilder(
WorkflowMutablePosition position,
T task,
Workflow workflow,
WorkflowApplication application,
ResourceLoader resourceLoader) {
this.workflow = workflow;
WorkflowMutablePosition position, T task, WorkflowDefinition definition) {
this.workflow = definition.workflow();
this.taskName = position.last().toString();
this.position = position;
this.task = task;
this.application = application;
this.resourceLoader = resourceLoader;
this.application = definition.application();
this.resourceLoader = definition.resourceLoader();
if (task.getInput() != null) {
Input input = task.getInput();
this.inputProcessor = buildWorkflowFilter(application, input.getFrom());
Expand Down Expand Up @@ -174,16 +171,18 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder<T, ?> builder) {

protected final CompletableFuture<TaskContext> executeNext(
CompletableFuture<TaskContext> future, WorkflowContext workflow) {
return future.thenCompose(
t -> {
TransitionInfo transition = t.transition();
if (transition.isEndNode()) {
workflow.instance().status(WorkflowStatus.COMPLETED);
} else if (transition.next() != null) {
return transition.next().apply(workflow, t.parent(), t.output());
}
return CompletableFuture.completedFuture(t);
});
return future.thenCompose(t -> executeNext(workflow, t));
}

private CompletableFuture<TaskContext> executeNext(
WorkflowContext workflow, TaskContext taskContext) {
TransitionInfo transition = taskContext.transition();
if (transition.isEndNode()) {
workflow.instance().status(WorkflowStatus.COMPLETED);
} else if (transition.next() != null) {
return transition.next().apply(workflow, taskContext.parent(), taskContext.output());
}
return CompletableFuture.completedFuture(taskContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@
package io.serverlessworkflow.impl.executors;

import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import java.util.concurrent.CompletableFuture;

public class CallTaskExecutor<T extends TaskBase> extends RegularTaskExecutor<T> {
Expand All @@ -36,13 +34,11 @@ public static class CallTaskExecutorBuilder<T extends TaskBase>
protected CallTaskExecutorBuilder(
WorkflowMutablePosition position,
T task,
Workflow workflow,
WorkflowApplication application,
ResourceLoader resourceLoader,
WorkflowDefinition definition,
CallableTask<T> callable) {
super(position, task, workflow, application, resourceLoader);
super(position, task, definition);
this.callable = callable;
callable.init(task, workflow, application, resourceLoader);
callable.init(task, definition);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@
package io.serverlessworkflow.impl.executors;

import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import java.util.concurrent.CompletableFuture;

public interface CallableTask<T extends TaskBase> {
default void init(
T task, Workflow workflow, WorkflowApplication application, ResourceLoader loader) {}
default void init(T task, WorkflowDefinition definition) {}

CompletableFuture<WorkflowModel> apply(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input);
Expand Down
Loading