From 91cd7b95913c44ad27bc0738fbcbfed1e5ec24c4 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Thu, 23 Oct 2025 12:14:43 +0200 Subject: [PATCH] [Fix #899] Add RunTask infrastructure Fix https://github.com/serverlessworkflow/sdk-java/issues/899 Signed-off-by: fjtirado --- .../executors/DefaultTaskExecutorFactory.java | 3 + .../impl/executors/RunTaskExecutor.java | 71 +++++++++++++++++++ .../impl/executors/RunWorkflowExecutor.java | 60 ++++++++++++++++ .../impl/executors/RunnableTask.java | 32 +++++++++ ...erlessworkflow.impl.executors.RunnableTask | 1 + 5 files changed, 167 insertions(+) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunTaskExecutor.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutor.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTask.java create mode 100644 impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java index f8f7ccb0..dedbf30d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java @@ -27,6 +27,7 @@ import io.serverlessworkflow.impl.executors.ForkExecutor.ForkExecutorBuilder; import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder; import io.serverlessworkflow.impl.executors.RaiseExecutor.RaiseExecutorBuilder; +import io.serverlessworkflow.impl.executors.RunTaskExecutor.RunTaskExecutorBuilder; import io.serverlessworkflow.impl.executors.SetExecutor.SetExecutorBuilder; import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder; import io.serverlessworkflow.impl.executors.TryExecutor.TryExecutorBuilder; @@ -76,6 +77,8 @@ public TaskExecutorBuilder getTaskExecutor( return new ListenExecutorBuilder(position, task.getListenTask(), definition); } else if (task.getEmitTask() != null) { return new EmitExecutorBuilder(position, task.getEmitTask(), definition); + } else if (task.getRunTask() != null) { + return new RunTaskExecutorBuilder(position, task.getRunTask(), definition); } throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet"); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunTaskExecutor.java new file mode 100644 index 00000000..5eeedddc --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunTaskExecutor.java @@ -0,0 +1,71 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.RunTask; +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import java.util.ServiceLoader; +import java.util.ServiceLoader.Provider; +import java.util.concurrent.CompletableFuture; + +public class RunTaskExecutor extends RegularTaskExecutor { + + private final RunnableTask runnable; + + private static final ServiceLoader runnables = + ServiceLoader.load(RunnableTask.class); + + public static class RunTaskExecutorBuilder extends RegularTaskExecutorBuilder { + private RunnableTask runnable; + + protected RunTaskExecutorBuilder( + WorkflowMutablePosition position, RunTask task, WorkflowDefinition definition) { + super(position, task, definition); + RunTaskConfiguration config = task.getRun().get(); + this.runnable = + runnables.stream() + .map(Provider::get) + .filter(r -> r.accept(config.getClass())) + .findFirst() + .orElseThrow( + () -> + new UnsupportedOperationException( + "No runnable found for operation " + config.getClass())); + runnable.init(config, definition); + } + + @Override + public RunTaskExecutor buildInstance() { + return new RunTaskExecutor(this); + } + } + + protected RunTaskExecutor(RunTaskExecutorBuilder builder) { + super(builder); + this.runnable = builder.runnable; + } + + @Override + protected CompletableFuture internalExecute( + WorkflowContext workflow, TaskContext taskContext) { + return runnable.apply(workflow, taskContext, taskContext.input()); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutor.java new file mode 100644 index 00000000..66cf27e5 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutor.java @@ -0,0 +1,60 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.api.types.RunWorkflow; +import io.serverlessworkflow.api.types.SubflowConfiguration; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public class RunWorkflowExecutor implements RunnableTask { + + private WorkflowDefinitionId workflowDefinitionId; + private Map additionalParameters; + + public void init(RunWorkflow taskConfiguration, WorkflowDefinition definition) { + SubflowConfiguration workflowConfig = taskConfiguration.getWorkflow(); + this.workflowDefinitionId = + new WorkflowDefinitionId( + workflowConfig.getNamespace(), workflowConfig.getName(), workflowConfig.getVersion()); + this.additionalParameters = workflowConfig.getInput().getAdditionalProperties(); + } + + @Override + public CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + WorkflowDefinition definition = + workflowContext.definition().application().workflowDefinitions().get(workflowDefinitionId); + if (definition != null) { + // TODO add additional parameters + return definition.instance(input).start(); + } else { + throw new IllegalArgumentException( + "Workflow definition for " + workflowDefinitionId + " has not been found"); + } + } + + @Override + public boolean accept(Class clazz) { + return RunWorkflow.class.equals(clazz); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTask.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTask.java new file mode 100644 index 00000000..d6bf032e --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTask.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.concurrent.CompletableFuture; + +public interface RunnableTask { + default void init(T taskConfiguration, WorkflowDefinition definition) {} + + CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input); + + boolean accept(Class clazz); +} diff --git a/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask b/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask new file mode 100644 index 00000000..a26105df --- /dev/null +++ b/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask @@ -0,0 +1 @@ +io.serverlessworkflow.impl.executors.RunWorkflowExecutor \ No newline at end of file