Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.serverlessworkflow.impl.executors.ForkExecutor.ForkExecutorBuilder;
import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder;
import io.serverlessworkflow.impl.executors.RaiseExecutor.RaiseExecutorBuilder;
import io.serverlessworkflow.impl.executors.RunTaskExecutor.RunTaskExecutorBuilder;
import io.serverlessworkflow.impl.executors.SetExecutor.SetExecutorBuilder;
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
import io.serverlessworkflow.impl.executors.TryExecutor.TryExecutorBuilder;
Expand Down Expand Up @@ -76,6 +77,8 @@ public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
return new ListenExecutorBuilder(position, task.getListenTask(), definition);
} else if (task.getEmitTask() != null) {
return new EmitExecutorBuilder(position, task.getEmitTask(), definition);
} else if (task.getRunTask() != null) {
return new RunTaskExecutorBuilder(position, task.getRunTask(), definition);
}
throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors;

import io.serverlessworkflow.api.types.RunTask;
import io.serverlessworkflow.api.types.RunTaskConfiguration;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import java.util.ServiceLoader;
import java.util.ServiceLoader.Provider;
import java.util.concurrent.CompletableFuture;

public class RunTaskExecutor<T extends RunTaskConfiguration> extends RegularTaskExecutor<RunTask> {

private final RunnableTask<T> runnable;

private static final ServiceLoader<RunnableTask> runnables =
ServiceLoader.load(RunnableTask.class);

public static class RunTaskExecutorBuilder extends RegularTaskExecutorBuilder<RunTask> {
private RunnableTask runnable;

protected RunTaskExecutorBuilder(
WorkflowMutablePosition position, RunTask task, WorkflowDefinition definition) {
super(position, task, definition);
RunTaskConfiguration config = task.getRun().get();
this.runnable =
runnables.stream()
.map(Provider::get)
.filter(r -> r.accept(config.getClass()))
.findFirst()
.orElseThrow(
() ->
new UnsupportedOperationException(
"No runnable found for operation " + config.getClass()));
runnable.init(config, definition);
}

@Override
public RunTaskExecutor buildInstance() {
return new RunTaskExecutor(this);
}
}

protected RunTaskExecutor(RunTaskExecutorBuilder builder) {
super(builder);
this.runnable = builder.runnable;
}

@Override
protected CompletableFuture<WorkflowModel> internalExecute(
WorkflowContext workflow, TaskContext taskContext) {
return runnable.apply(workflow, taskContext, taskContext.input());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors;

import io.serverlessworkflow.api.types.RunTaskConfiguration;
import io.serverlessworkflow.api.types.RunWorkflow;
import io.serverlessworkflow.api.types.SubflowConfiguration;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowDefinitionId;
import io.serverlessworkflow.impl.WorkflowModel;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class RunWorkflowExecutor implements RunnableTask<RunWorkflow> {

private WorkflowDefinitionId workflowDefinitionId;
private Map<String, Object> additionalParameters;

public void init(RunWorkflow taskConfiguration, WorkflowDefinition definition) {
SubflowConfiguration workflowConfig = taskConfiguration.getWorkflow();
this.workflowDefinitionId =
new WorkflowDefinitionId(
workflowConfig.getNamespace(), workflowConfig.getName(), workflowConfig.getVersion());
this.additionalParameters = workflowConfig.getInput().getAdditionalProperties();
}

@Override
public CompletableFuture<WorkflowModel> apply(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
WorkflowDefinition definition =
workflowContext.definition().application().workflowDefinitions().get(workflowDefinitionId);
if (definition != null) {
// TODO add additional parameters
return definition.instance(input).start();
} else {
throw new IllegalArgumentException(
"Workflow definition for " + workflowDefinitionId + " has not been found");
}
}

@Override
public boolean accept(Class<? extends RunTaskConfiguration> clazz) {
return RunWorkflow.class.equals(clazz);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors;

import io.serverlessworkflow.api.types.RunTaskConfiguration;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowModel;
import java.util.concurrent.CompletableFuture;

public interface RunnableTask<T extends RunTaskConfiguration> {
default void init(T taskConfiguration, WorkflowDefinition definition) {}

CompletableFuture<WorkflowModel> apply(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input);

boolean accept(Class<? extends RunTaskConfiguration> clazz);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.serverlessworkflow.impl.executors.RunWorkflowExecutor