From 6fe63bd0d5d3f385846790d2babbbd91f4432ae9 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Tue, 2 Dec 2025 19:54:14 +0100 Subject: [PATCH 1/2] [Fix #936] Adding Python implementation Signed-off-by: fjtirado --- .../impl/WorkflowMutableInstance.java | 6 + .../impl/config/AbstractConfigManager.java | 17 +++ .../impl/config/ConfigManager.java | 3 + .../impl/scripts/AbstractScriptRunner.java | 79 +++++++++++++ impl/pom.xml | 12 ++ impl/python/pom.xml | 20 ++++ .../script/python/PythonScriptTaskRunner.java | 66 +++++++++++ ...rverlessworkflow.impl.scripts.ScriptRunner | 1 + .../script/js/JavaScriptScriptTaskRunner.java | 108 ++++-------------- impl/test/pom.xml | 4 + .../impl/test/CustomFunctionTest.java | 69 +++++++++++ .../impl/test/RetryTimeoutTest.java | 32 ------ .../impl/test/RunScriptJavaScriptTest.java | 26 ++--- ...call-custom-function-cataloged-global.yaml | 2 +- .../call-custom-function-cataloged.yaml | 2 +- 15 files changed, 311 insertions(+), 136 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/scripts/AbstractScriptRunner.java create mode 100644 impl/python/pom.xml create mode 100644 impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java create mode 100644 impl/python/src/main/resources/META-INF/services/io.serverlessworkflow.impl.scripts.ScriptRunner create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index a808650c..0b5e91d5 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -45,6 +45,8 @@ public class WorkflowMutableInstance implements WorkflowInstance { protected final WorkflowContext workflowContext; protected Instant startedAt; + protected final Map additionalObjects = new ConcurrentHashMap(); + protected AtomicReference> futureRef = new AtomicReference<>(); protected Instant completedAt; @@ -290,4 +292,8 @@ public T additionalObject(String key, Supplier supplier) { } public void restoreContext(WorkflowContext workflow, TaskContext context) {} + + public T additionalObject(String key, Supplier supplier) { + return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get()); + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/config/AbstractConfigManager.java b/impl/core/src/main/java/io/serverlessworkflow/impl/config/AbstractConfigManager.java index f12c341f..ddb7b5dd 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/config/AbstractConfigManager.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/config/AbstractConfigManager.java @@ -17,6 +17,9 @@ import java.time.Instant; import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Optional; public abstract class AbstractConfigManager implements ConfigManager { @@ -56,5 +59,19 @@ protected T convert(String value, Class propClass) { return propClass.cast(result); } + @Override + public Collection multiConfig(String propName, Class propClass) { + String multiValue = get(propName); + if (multiValue != null) { + Collection result = new ArrayList<>(); + for (String value : multiValue.split(",")) { + result.add(convert(value, propClass)); + } + return result; + } else { + return Collections.emptyList(); + } + } + protected abstract T convertComplex(String value, Class propClass); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java b/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java index 08f2fb0e..d8daa2ae 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java @@ -16,11 +16,14 @@ package io.serverlessworkflow.impl.config; import io.serverlessworkflow.impl.ServicePriority; +import java.util.Collection; import java.util.Optional; public interface ConfigManager extends ServicePriority { Optional config(String propName, Class propClass); + Collection multiConfig(String propName, Class propClass); + Iterable names(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/AbstractScriptRunner.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/AbstractScriptRunner.java new file mode 100644 index 00000000..37921aad --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/AbstractScriptRunner.java @@ -0,0 +1,79 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scripts; + +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.WorkflowException; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import io.serverlessworkflow.impl.executors.ProcessResult; +import java.io.ByteArrayOutputStream; + +public abstract class AbstractScriptRunner implements ScriptRunner { + + @Override + public WorkflowModel runScript( + ScriptContext scriptContext, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel input) { + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + try { + runScript(scriptContext, stdout, stderr, workflowContext, taskContext); + return scriptContext + .returnType() + .map( + type -> + modelFromOutput( + type, + workflowContext.definition().application().modelFactory(), + stdout, + stderr)) + .orElse(input); + } catch (Exception ex) { + throw new WorkflowException(WorkflowError.runtime(taskContext, ex).build()); + } + } + + protected abstract void runScript( + ScriptContext scriptContext, + ByteArrayOutputStream stdout, + ByteArrayOutputStream stderr, + WorkflowContext workflowContext, + TaskContext taskContext); + + protected WorkflowModel modelFromOutput( + RunTaskConfiguration.ProcessReturnType returnType, + WorkflowModelFactory modelFactory, + ByteArrayOutputStream stdout, + ByteArrayOutputStream stderr) { + return switch (returnType) { + case ALL -> modelFactory.fromAny(new ProcessResult(0, toString(stdout), toString(stderr))); + case NONE -> modelFactory.fromNull(); + case CODE -> modelFactory.from(0); + case STDOUT -> modelFactory.from(toString(stdout)); + case STDERR -> modelFactory.from(toString(stderr)); + }; + } + + private String toString(ByteArrayOutputStream stream) { + return stream.toString().trim(); + } +} diff --git a/impl/pom.xml b/impl/pom.xml index b052dd52..ef67f386 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -17,6 +17,7 @@ 9.2.1 3.7.0 25.0.1 + 4.2.0 @@ -142,6 +143,11 @@ serverlessworkflow-impl-script-js ${project.version} + + io.serverlessworkflow + serverlessworkflow-impl-script-python + ${project.version} + com.cronutils cron-utils @@ -168,6 +174,11 @@ polyglot ${version.org.graalvm.polyglot} + + black.ninia + jep + ${version.black.ninia} + @@ -187,5 +198,6 @@ container test script-js + python diff --git a/impl/python/pom.xml b/impl/python/pom.xml new file mode 100644 index 00000000..cb232423 --- /dev/null +++ b/impl/python/pom.xml @@ -0,0 +1,20 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-script-python + Serverless Workflow :: Impl :: Script Python + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + black.ninia + jep + + + \ No newline at end of file diff --git a/impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java b/impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java new file mode 100644 index 00000000..da6c25d6 --- /dev/null +++ b/impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java @@ -0,0 +1,66 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.script.python; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.config.ConfigManager; +import io.serverlessworkflow.impl.scripts.AbstractScriptRunner; +import io.serverlessworkflow.impl.scripts.ScriptContext; +import io.serverlessworkflow.impl.scripts.ScriptLanguageId; +import java.io.ByteArrayOutputStream; +import java.util.Collection; +import jep.Interpreter; +import jep.SharedInterpreter; + +public class PythonScriptTaskRunner extends AbstractScriptRunner { + + @Override + public ScriptLanguageId identifier() { + return ScriptLanguageId.PYTHON; + } + + private static final String PYTHON_SYS_PATH = "sys.path.append('%s')\n"; + private static final String SEARCH_PATH_PROPERTY = "io.serverlessworkflow.impl."; + + @Override + protected void runScript( + ScriptContext scriptContext, + ByteArrayOutputStream stdout, + ByteArrayOutputStream stderr, + WorkflowContext workflowContext, + TaskContext taskContext) { + Interpreter py = + workflowContext + .instance() + .additionalObject( + "pyInterpreter", + () -> interpreter(workflowContext.definition().application().configManager())); + scriptContext.args().forEach(py::set); + py.exec(scriptContext.code()); + } + + protected Interpreter interpreter(ConfigManager configManager) { + Interpreter py = new SharedInterpreter(); + Collection searchPaths = configManager.multiConfig(SEARCH_PATH_PROPERTY, String.class); + if (!searchPaths.isEmpty()) { + StringBuilder sb = new StringBuilder("import sys\n"); + searchPaths.forEach(path -> sb.append(String.format(PYTHON_SYS_PATH, path))); + py.exec(sb.toString()); + } + return py; + } +} diff --git a/impl/python/src/main/resources/META-INF/services/io.serverlessworkflow.impl.scripts.ScriptRunner b/impl/python/src/main/resources/META-INF/services/io.serverlessworkflow.impl.scripts.ScriptRunner new file mode 100644 index 00000000..347fa206 --- /dev/null +++ b/impl/python/src/main/resources/META-INF/services/io.serverlessworkflow.impl.scripts.ScriptRunner @@ -0,0 +1 @@ +io.serverlessworkflow.impl.executors.script.python.PythonScriptTaskRunner \ No newline at end of file diff --git a/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java b/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java index 5b986cd4..632c494e 100644 --- a/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java +++ b/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java @@ -15,23 +15,15 @@ */ package io.serverlessworkflow.impl.executors.script.js; -import io.serverlessworkflow.api.types.RunTaskConfiguration; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowError; -import io.serverlessworkflow.impl.WorkflowException; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowModelFactory; -import io.serverlessworkflow.impl.executors.ProcessResult; +import io.serverlessworkflow.impl.scripts.AbstractScriptRunner; import io.serverlessworkflow.impl.scripts.ScriptContext; import io.serverlessworkflow.impl.scripts.ScriptLanguageId; import io.serverlessworkflow.impl.scripts.ScriptRunner; import java.io.ByteArrayOutputStream; import java.util.Map; -import java.util.function.Supplier; import org.graalvm.polyglot.Context; -import org.graalvm.polyglot.PolyglotException; import org.graalvm.polyglot.Source; import org.graalvm.polyglot.Value; @@ -39,88 +31,13 @@ * JavaScript implementation of the {@link ScriptRunner} interface that executes JavaScript scripts * using GraalVM Polyglot API. */ -public class JavaScriptScriptTaskRunner implements ScriptRunner { +public class JavaScriptScriptTaskRunner extends AbstractScriptRunner { @Override public ScriptLanguageId identifier() { return ScriptLanguageId.JS; } - @Override - public WorkflowModel runScript( - ScriptContext script, - WorkflowContext workflowContext, - TaskContext taskContext, - WorkflowModel input) { - WorkflowApplication application = workflowContext.definition().application(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - try (Context ctx = - Context.newBuilder() - .err(stderr) - .out(stdout) - .useSystemExit(true) - .allowCreateProcess(false) - .option("engine.WarnInterpreterOnly", "false") - .build()) { - - script - .args() - .forEach( - (key, val) -> { - ctx.getBindings(identifier().getLang()).putMember(key, val); - }); - configureProcessEnv(ctx, script.envs()); - ctx.eval(Source.create(identifier().getLang(), script.code())); - return script - .returnType() - .map( - type -> - modelFromOutput( - type, application.modelFactory(), stdout, () -> stderr.toString())) - .orElse(input); - } catch (PolyglotException e) { - if (e.getExitStatus() != 0 || e.isSyntaxError()) { - throw new WorkflowException(WorkflowError.runtime(taskContext, e).build()); - } else { - return script - .returnType() - .map( - type -> - modelFromOutput( - type, application.modelFactory(), stdout, () -> buildStderr(e, stderr))) - .orElse(input); - } - } - } - - private WorkflowModel modelFromOutput( - RunTaskConfiguration.ProcessReturnType returnType, - WorkflowModelFactory modelFactory, - ByteArrayOutputStream stdout, - Supplier stderr) { - return switch (returnType) { - case ALL -> - modelFactory.fromAny(new ProcessResult(0, stdout.toString().trim(), stderr.get().trim())); - case NONE -> modelFactory.fromNull(); - case CODE -> modelFactory.from(0); - case STDOUT -> modelFactory.from(stdout.toString().trim()); - case STDERR -> modelFactory.from(stderr.get().trim()); - }; - } - - /* - * Gets the stderr message from the PolyglotException or the stderr stream. - * - * @param e the {@link PolyglotException} thrown during script execution - * @param stderr the stderr stream - * @return the stderr message - */ - private String buildStderr(PolyglotException e, ByteArrayOutputStream stderr) { - String err = stderr.toString(); - return err.isBlank() ? e.getMessage() : err.trim(); - } - /* * Configures the process.env object in the JavaScript context with the provided environment * variables. @@ -138,4 +55,25 @@ private void configureProcessEnv(Context context, Map envs) { } bindings.putMember("process", process); } + + @Override + protected void runScript( + ScriptContext scriptContext, + ByteArrayOutputStream stdout, + ByteArrayOutputStream stderr, + WorkflowContext workflowContext, + TaskContext taskContext) { + try (Context ctx = + Context.newBuilder() + .err(stderr) + .out(stdout) + .useSystemExit(true) + .allowCreateProcess(false) + .option("engine.WarnInterpreterOnly", "false") + .build()) { + scriptContext.args().forEach(ctx.getBindings(identifier().getLang())::putMember); + configureProcessEnv(ctx, scriptContext.envs()); + ctx.eval(Source.create(identifier().getLang(), scriptContext.code())); + } + } } diff --git a/impl/test/pom.xml b/impl/test/pom.xml index 444ae5c1..fd47aae9 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -37,6 +37,10 @@ io.serverlessworkflow serverlessworkflow-impl-script-js + + io.serverlessworkflow + serverlessworkflow-impl-script-python + org.glassfish.jersey.media jersey-media-json-jackson diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java new file mode 100644 index 00000000..387e83d6 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowException; +import java.io.IOException; +import java.util.Map; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class CustomFunctionTest { + + private static WorkflowApplication app; + + @BeforeAll + static void init() { + app = WorkflowApplication.builder().build(); + } + + @AfterAll + static void cleanup() { + app.close(); + } + + @Test + void testCustomFunction() { + assertThatThrownBy( + () -> + app.workflowDefinition( + readWorkflowFromClasspath( + "workflows-samples/call-custom-function-inline.yaml")) + .instance(Map.of()) + .start() + .join()) + .hasCauseInstanceOf(WorkflowException.class) + .extracting(w -> ((WorkflowException) w.getCause()).getWorkflowError().status()) + .isEqualTo(404); + } + + @ParameterizedTest + @ValueSource( + strings = { + "workflows-samples/call-custom-function-cataloged.yaml", + "workflows-samples/call-custom-function-cataloged-global.yaml" + }) + void testCustomCatalogFunction(String fileName) throws IOException { + app.workflowDefinition(readWorkflowFromClasspath(fileName)).instance(Map.of()).start().join(); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java index c60e5b73..2a217fc0 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java @@ -121,36 +121,4 @@ void testTimeout() throws IOException { .orElseThrow(); assertThat(result.get("message")).isEqualTo("Viva er Beti Balompie"); } - - @Test - void testCustomFunction() { - assertThatThrownBy( - () -> - app.workflowDefinition( - readWorkflowFromClasspath( - "workflows-samples/call-custom-function-inline.yaml")) - .instance(Map.of()) - .start() - .join()) - .hasCauseInstanceOf(WorkflowException.class) - .extracting(w -> ((WorkflowException) w.getCause()).getWorkflowError().status()) - .isEqualTo(404); - } - - @ParameterizedTest - @ValueSource( - strings = { - "workflows-samples/call-custom-function-cataloged.yaml", - "workflows-samples/call-custom-function-cataloged-global.yaml" - }) - void testCustomCatalogFunction(String fileName) throws IOException { - assertThatThrownBy( - () -> - app.workflowDefinition(readWorkflowFromClasspath(fileName)) - .instance(Map.of()) - .start() - .join()) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("No script runner implementation found for language PYTHON"); - } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunScriptJavaScriptTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunScriptJavaScriptTest.java index 24832ebf..7ec221c6 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunScriptJavaScriptTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunScriptJavaScriptTest.java @@ -15,11 +15,13 @@ */ package io.serverlessworkflow.impl.test; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.executors.ProcessResult; import java.io.IOException; import java.util.Map; import okhttp3.mockwebserver.MockResponse; @@ -126,13 +128,9 @@ void testFunctionThrowingError() throws IOException { WorkflowReader.readWorkflowFromClasspath( "workflows-samples/run-script/function-with-throw.yaml"); try (WorkflowApplication appl = WorkflowApplication.builder().build()) { - WorkflowModel model = appl.workflowDefinition(workflow).instance(Map.of()).start().join(); - - SoftAssertions.assertSoftly( - softly -> { - softly.assertThat(model.asText()).isPresent(); - softly.assertThat(model.asText().get()).isEqualTo("Error: This is a test error"); - }); + assertThatThrownBy(() -> appl.workflowDefinition(workflow).instance(Map.of()).start().join()) + .hasCauseInstanceOf(WorkflowException.class) + .hasMessageContaining("test error"); } } @@ -142,15 +140,9 @@ void testFunctionThrowingErrorAndReturnAll() throws IOException { WorkflowReader.readWorkflowFromClasspath( "workflows-samples/run-script/function-with-throw-all.yaml"); try (WorkflowApplication appl = WorkflowApplication.builder().build()) { - WorkflowModel model = appl.workflowDefinition(workflow).instance(Map.of()).start().join(); - - SoftAssertions.assertSoftly( - softly -> { - ProcessResult r = model.as(ProcessResult.class).orElseThrow(); - softly.assertThat(r.stderr()).isEqualTo("Error: This is a test error"); - softly.assertThat(r.stdout()).isEqualTo("logged before the 'throw' statement"); - softly.assertThat(r.code()).isEqualTo(0); - }); + assertThatThrownBy(() -> appl.workflowDefinition(workflow).instance(Map.of()).start().join()) + .hasCauseInstanceOf(WorkflowException.class) + .hasMessageContaining("test error"); } } diff --git a/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged-global.yaml b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged-global.yaml index 534465d4..8c8cd0b2 100644 --- a/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged-global.yaml +++ b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged-global.yaml @@ -6,7 +6,7 @@ document: use: catalogs: global: - endpoint: https://github.com/serverlessworkflow/catalog + endpoint: https://github.com/fjtirado/catalog do: - log: call: log:1.0.0@global diff --git a/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml index 43fb634e..84fc8365 100644 --- a/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml +++ b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml @@ -5,7 +5,7 @@ document: version: '0.1.0' do: - log: - call: https://raw.githubusercontent.com/serverlessworkflow/catalog/main/functions/log/1.0.0/function.yaml + call: https://raw.githubusercontent.com/fjtirado/catalog/refs/heads/main/functions/log/1.0.0/function.yaml with: message: Hello, world! level: information From cdf5ea5d0806f44de720661f50e689e99db3da17 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Wed, 3 Dec 2025 17:48:28 +0100 Subject: [PATCH 2/2] [Fix #936] Alternative approach Signed-off-by: fjtirado --- .../container/executors/ContainerRunner.java | 74 ++--------- .../executors/RunContainerExecutor.java | 51 ------- .../RunContainerExecutorBuilder.java | 64 +++++++++ ...rkflow.impl.executors.RunnableTaskBuilder} | 2 +- .../impl/WorkflowMutableInstance.java | 8 +- .../impl/config/ConfigManager.java | 5 +- .../impl/executors/RunScriptExecutor.java | 107 +++++---------- .../executors/RunScriptExecutorBuilder.java | 82 ++++++++++++ .../impl/executors/RunShellExecutor.java | 125 ++++-------------- .../executors/RunShellExecutorBuilder.java | 67 ++++++++++ .../impl/executors/RunTaskExecutor.java | 12 +- .../impl/executors/RunWorkflowExecutor.java | 39 +++--- .../executors/RunWorkflowExecutorBuilder.java | 40 ++++++ ...ableTask.java => RunnableTaskBuilder.java} | 13 +- .../impl/scripts/AbstractScriptRunner.java | 79 ----------- .../impl/scripts/ScriptContext.java | 3 +- .../impl/scripts/ScriptUtils.java | 92 +++++++++++++ .../impl/scripts/StreamSuppliers.java | 77 +++++++++++ ...erlessworkflow.impl.executors.RunnableTask | 3 - ...orkflow.impl.executors.RunnableTaskBuilder | 3 + impl/jackson/pom.xml | 4 +- impl/pom.xml | 6 - impl/python/pom.xml | 4 - .../script/python/PythonScriptTaskRunner.java | 59 ++++----- .../script/js/JavaScriptScriptTaskRunner.java | 21 ++- .../impl/test/CustomFunctionTest.java | 10 +- .../impl/test/RunShellExecutorTest.java | 2 +- ...call-custom-function-cataloged-global.yaml | 2 +- .../call-custom-function-cataloged.yaml | 2 +- 29 files changed, 588 insertions(+), 468 deletions(-) delete mode 100644 impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java create mode 100644 impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutorBuilder.java rename impl/container/src/main/resources/META-INF/services/{io.serverlessworkflow.impl.executors.RunnableTask => io.serverlessworkflow.impl.executors.RunnableTaskBuilder} (86%) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutorBuilder.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutorBuilder.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutorBuilder.java rename impl/core/src/main/java/io/serverlessworkflow/impl/executors/{RunnableTask.java => RunnableTaskBuilder.java} (65%) delete mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/scripts/AbstractScriptRunner.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptUtils.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/scripts/StreamSuppliers.java delete mode 100644 impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask create mode 100644 impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java index 5c14a3ac..d414c82d 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java @@ -28,24 +28,21 @@ import com.github.dockerjava.core.DockerClientImpl; import com.github.dockerjava.core.NameParser; import com.github.dockerjava.httpclient5.ApacheDockerHttpClient; -import io.serverlessworkflow.api.types.Container; -import io.serverlessworkflow.api.types.ContainerLifetime; -import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTask; import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; import java.util.Collection; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -class ContainerRunner { +class ContainerRunner implements CallableTask { private static final DefaultDockerClientConfig DEFAULT_CONFIG = DefaultDockerClientConfig.createDefaultConfigBuilder().build(); @@ -64,14 +61,19 @@ private static class DockerClientHolder { private final ContainerCleanupPolicy policy; private final String containerImage; - private ContainerRunner(ContainerRunnerBuilder builder) { - this.propertySetters = builder.propertySetters; - this.timeout = Optional.ofNullable(builder.timeout); - this.policy = builder.policy; - this.containerImage = builder.containerImage; + public ContainerRunner( + Collection propertySetters, + Optional> timeout, + ContainerCleanupPolicy policy, + String containerImage) { + this.propertySetters = propertySetters; + this.timeout = timeout; + this.policy = policy; + this.containerImage = containerImage; } - CompletableFuture start( + @Override + public CompletableFuture apply( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { return CompletableFuture.supplyAsync( () -> startSync(workflowContext, taskContext, input), @@ -215,52 +217,4 @@ private static RuntimeException mapExitCode(int exit) { private static RuntimeException failed(String message) { return new RuntimeException(message); } - - static ContainerRunnerBuilder builder() { - return new ContainerRunnerBuilder(); - } - - public static class ContainerRunnerBuilder { - private Container container; - private WorkflowDefinition definition; - private WorkflowValueResolver timeout; - private ContainerCleanupPolicy policy; - private String containerImage; - private Collection propertySetters = new ArrayList<>(); - - private ContainerRunnerBuilder() {} - - ContainerRunnerBuilder withContainer(Container container) { - this.container = container; - return this; - } - - public ContainerRunnerBuilder withWorkflowDefinition(WorkflowDefinition definition) { - this.definition = definition; - return this; - } - - ContainerRunner build() { - propertySetters.add(new NamePropertySetter(definition, container)); - propertySetters.add(new CommandPropertySetter(definition, container)); - propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container)); - propertySetters.add(new LifetimePropertySetter(container)); - propertySetters.add(new PortsPropertySetter(container)); - propertySetters.add(new VolumesPropertySetter(definition, container)); - - containerImage = container.getImage(); - if (containerImage == null || container.getImage().isBlank()) { - throw new IllegalArgumentException("Container image must be provided"); - } - ContainerLifetime lifetime = container.getLifetime(); - if (lifetime != null) { - policy = lifetime.getCleanup(); - TimeoutAfter afterTimeout = lifetime.getAfter(); - if (afterTimeout != null) - timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout); - } - - return new ContainerRunner(this); - } - } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java deleted file mode 100644 index adf7482b..00000000 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.container.executors; - -import io.serverlessworkflow.api.types.Container; -import io.serverlessworkflow.api.types.RunContainer; -import io.serverlessworkflow.api.types.RunTaskConfiguration; -import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.executors.RunnableTask; -import java.util.concurrent.CompletableFuture; - -public class RunContainerExecutor implements RunnableTask { - - private ContainerRunner containerRunner; - - public void init(RunContainer taskConfiguration, WorkflowDefinition definition) { - Container container = taskConfiguration.getContainer(); - containerRunner = - ContainerRunner.builder() - .withContainer(container) - .withWorkflowDefinition(definition) - .build(); - } - - @Override - public CompletableFuture apply( - WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - return containerRunner.start(workflowContext, taskContext, input); - } - - @Override - public boolean accept(Class clazz) { - return RunContainer.class.equals(clazz); - } -} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutorBuilder.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutorBuilder.java new file mode 100644 index 00000000..d58d5647 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutorBuilder.java @@ -0,0 +1,64 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.api.types.ContainerLifetime; +import io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy; +import io.serverlessworkflow.api.types.RunContainer; +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.RunnableTaskBuilder; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Optional; + +public class RunContainerExecutorBuilder implements RunnableTaskBuilder { + + @Override + public CallableTask build(RunContainer taskConfiguration, WorkflowDefinition definition) { + Collection propertySetters = new ArrayList<>(); + Container container = taskConfiguration.getContainer(); + propertySetters.add(new NamePropertySetter(definition, container)); + propertySetters.add(new CommandPropertySetter(definition, container)); + propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container)); + propertySetters.add(new LifetimePropertySetter(container)); + propertySetters.add(new PortsPropertySetter(container)); + propertySetters.add(new VolumesPropertySetter(definition, container)); + + ContainerCleanupPolicy policy = null; + WorkflowValueResolver timeout = null; + ContainerLifetime lifetime = container.getLifetime(); + if (lifetime != null) { + policy = lifetime.getCleanup(); + TimeoutAfter afterTimeout = lifetime.getAfter(); + if (afterTimeout != null) + timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout); + } + return new ContainerRunner( + propertySetters, Optional.ofNullable(timeout), policy, container.getImage()); + } + + @Override + public boolean accept(Class clazz) { + return RunContainer.class.equals(clazz); + } +} diff --git a/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask b/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder similarity index 86% rename from impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask rename to impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder index c1450d21..606ecc83 100644 --- a/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask +++ b/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder @@ -1 +1 @@ -io.serverlessworkflow.impl.container.executors.RunContainerExecutor \ No newline at end of file +io.serverlessworkflow.impl.container.executors.RunContainerExecutorBuilder \ No newline at end of file diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index 0b5e91d5..b724c012 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -45,12 +45,10 @@ public class WorkflowMutableInstance implements WorkflowInstance { protected final WorkflowContext workflowContext; protected Instant startedAt; - protected final Map additionalObjects = new ConcurrentHashMap(); - protected AtomicReference> futureRef = new AtomicReference<>(); protected Instant completedAt; - protected final Map additionalObjects = new ConcurrentHashMap(); + protected final Map additionalObjects = new ConcurrentHashMap<>(); private Lock statusLock = new ReentrantLock(); private Map, TaskContext> suspended; @@ -292,8 +290,4 @@ public T additionalObject(String key, Supplier supplier) { } public void restoreContext(WorkflowContext workflow, TaskContext context) {} - - public T additionalObject(String key, Supplier supplier) { - return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get()); - } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java b/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java index d8daa2ae..344ea961 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java @@ -17,13 +17,16 @@ import io.serverlessworkflow.impl.ServicePriority; import java.util.Collection; +import java.util.List; import java.util.Optional; public interface ConfigManager extends ServicePriority { Optional config(String propName, Class propClass); - Collection multiConfig(String propName, Class propClass); + default Collection multiConfig(String propName, Class propClass) { + return List.of(); + } Iterable names(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutor.java index 32e13b0c..c6003cc0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutor.java @@ -15,86 +15,42 @@ */ package io.serverlessworkflow.impl.executors; -import io.serverlessworkflow.api.types.RunScript; import io.serverlessworkflow.api.types.RunTaskConfiguration; -import io.serverlessworkflow.api.types.Script; -import io.serverlessworkflow.api.types.ScriptUnion; +import io.serverlessworkflow.api.types.RunTaskConfiguration.ProcessReturnType; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; -import io.serverlessworkflow.impl.resources.ResourceLoaderUtils; import io.serverlessworkflow.impl.scripts.ScriptContext; -import io.serverlessworkflow.impl.scripts.ScriptLanguageId; import io.serverlessworkflow.impl.scripts.ScriptRunner; import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.ServiceLoader; import java.util.concurrent.CompletableFuture; -public class RunScriptExecutor implements RunnableTask { +public class RunScriptExecutor implements CallableTask { - private Optional>> environmentExpr; - private Optional>> argumentExpr; - private WorkflowValueResolver codeSupplier; - private boolean isAwait; - private Optional returnType; - private ScriptRunner taskRunner; + private final Optional>> environmentExpr; + private final Optional>> argumentExpr; + private final WorkflowValueResolver codeSupplier; + private final boolean isAwait; + private final RunTaskConfiguration.ProcessReturnType returnType; + private final ScriptRunner taskRunner; - @Override - public void init(RunScript taskConfiguration, WorkflowDefinition definition) { - ScriptUnion scriptUnion = taskConfiguration.getScript(); - Script script = scriptUnion.get(); - ScriptLanguageId language = ScriptLanguageId.from(script.getLanguage()); - - this.taskRunner = - ServiceLoader.load(ScriptRunner.class).stream() - .map(ServiceLoader.Provider::get) - .filter(s -> s.identifier().equals(language)) - .findFirst() - .orElseThrow( - () -> - new IllegalStateException( - "No script runner implementation found for language " + language)); - - this.isAwait = taskConfiguration.isAwait(); - - this.returnType = Optional.ofNullable(taskConfiguration.getReturn()); - - WorkflowApplication application = definition.application(); - this.environmentExpr = - script.getEnvironment() != null && script.getEnvironment().getAdditionalProperties() != null - ? Optional.of( - WorkflowUtils.buildMapResolver( - application, script.getEnvironment().getAdditionalProperties())) - : Optional.empty(); - - this.argumentExpr = - script.getArguments() != null && script.getArguments().getAdditionalProperties() != null - ? Optional.of( - WorkflowUtils.buildMapResolver( - application, script.getArguments().getAdditionalProperties())) - : Optional.empty(); - - this.codeSupplier = - scriptUnion.getInlineScript() != null - ? WorkflowUtils.buildStringFilter(application, scriptUnion.getInlineScript().getCode()) - : (w, t, m) -> - definition - .resourceLoader() - .load( - Objects.requireNonNull( - scriptUnion.getExternalScript(), - "External script is required if inline script was not set") - .getSource(), - ResourceLoaderUtils::readString, - w, - t, - m); + public RunScriptExecutor( + Optional>> environmentExpr, + Optional>> argumentExpr, + WorkflowValueResolver codeSupplier, + boolean isAwait, + ProcessReturnType returnType, + ScriptRunner taskRunner) { + this.environmentExpr = environmentExpr; + this.argumentExpr = argumentExpr; + this.codeSupplier = codeSupplier; + this.isAwait = isAwait; + this.returnType = returnType; + this.taskRunner = taskRunner; } @Override @@ -108,20 +64,27 @@ public CompletableFuture apply( returnType); if (isAwait) { return CompletableFuture.supplyAsync( - () -> taskRunner.runScript(scriptContext, workflowContext, taskContext, input), + () -> runScript(scriptContext, workflowContext, taskContext, input), workflowContext.definition().application().executorService()); } else { workflowContext .definition() .application() .executorService() - .submit(() -> taskRunner.runScript(scriptContext, workflowContext, taskContext, input)); + .submit(() -> runScript(scriptContext, workflowContext, taskContext, input)); return CompletableFuture.completedFuture(input); } } - @Override - public boolean accept(Class clazz) { - return RunScript.class.equals(clazz); + private WorkflowModel runScript( + ScriptContext scriptContext, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel input) { + try { + return taskRunner.runScript(scriptContext, workflowContext, taskContext, input); + } catch (Exception ex) { + throw new WorkflowException(WorkflowError.runtime(taskContext, ex).build()); + } } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutorBuilder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutorBuilder.java new file mode 100644 index 00000000..f5185090 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutorBuilder.java @@ -0,0 +1,82 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.RunScript; +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.api.types.Script; +import io.serverlessworkflow.api.types.ScriptUnion; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.resources.ResourceLoaderUtils; +import io.serverlessworkflow.impl.scripts.ScriptLanguageId; +import io.serverlessworkflow.impl.scripts.ScriptRunner; +import java.util.Objects; +import java.util.Optional; +import java.util.ServiceLoader; + +public class RunScriptExecutorBuilder implements RunnableTaskBuilder { + + @Override + public CallableTask build(RunScript taskConfiguration, WorkflowDefinition definition) { + ScriptUnion scriptUnion = taskConfiguration.getScript(); + Script script = scriptUnion.get(); + ScriptLanguageId language = ScriptLanguageId.from(script.getLanguage()); + WorkflowApplication application = definition.application(); + + return new RunScriptExecutor( + script.getEnvironment() != null && script.getEnvironment().getAdditionalProperties() != null + ? Optional.of( + WorkflowUtils.buildMapResolver( + application, script.getEnvironment().getAdditionalProperties())) + : Optional.empty(), + script.getArguments() != null && script.getArguments().getAdditionalProperties() != null + ? Optional.of( + WorkflowUtils.buildMapResolver( + application, script.getArguments().getAdditionalProperties())) + : Optional.empty(), + scriptUnion.getInlineScript() != null + ? WorkflowUtils.buildStringFilter(application, scriptUnion.getInlineScript().getCode()) + : (w, t, m) -> + definition + .resourceLoader() + .load( + Objects.requireNonNull( + scriptUnion.getExternalScript(), + "External script is required if inline script was not set") + .getSource(), + ResourceLoaderUtils::readString, + w, + t, + m), + taskConfiguration.isAwait(), + taskConfiguration.getReturn(), + ServiceLoader.load(ScriptRunner.class).stream() + .map(ServiceLoader.Provider::get) + .filter(s -> s.identifier().equals(language)) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "No script runner implementation found for language " + language))); + } + + @Override + public boolean accept(Class clazz) { + return RunScript.class.equals(clazz); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java index 08931cb3..349c4a2a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java @@ -15,33 +15,36 @@ */ package io.serverlessworkflow.impl.executors; -import io.serverlessworkflow.api.types.RunShell; -import io.serverlessworkflow.api.types.RunTaskConfiguration; +import static io.serverlessworkflow.impl.scripts.ScriptUtils.uncheckedStart; + import io.serverlessworkflow.api.types.RunTaskConfiguration.ProcessReturnType; -import io.serverlessworkflow.api.types.Shell; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowModelFactory; -import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.charset.StandardCharsets; -import java.util.LinkedHashMap; +import io.serverlessworkflow.impl.scripts.ScriptUtils; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; - -public class RunShellExecutor implements RunnableTask { - private WorkflowValueResolver shellCommand; - private Map, Optional>> +public class RunShellExecutor implements CallableTask { + private final WorkflowValueResolver shellCommand; + private final Map, Optional>> shellArguments; - private Optional>> shellEnv; - private Optional returnType; + private final Optional>> shellEnv; + private final Optional returnType; + + public RunShellExecutor( + WorkflowValueResolver shellCommand, + Map, Optional>> shellArguments, + Optional>> shellEnv, + Optional returnType) { + super(); + this.shellCommand = shellCommand; + this.shellArguments = shellArguments; + this.shellEnv = shellEnv; + this.returnType = returnType; + } @Override public CompletableFuture apply( @@ -55,26 +58,17 @@ public CompletableFuture apply( .ifPresent( v -> commandBuilder.append("=").append(v.apply(workflowContext, taskContext, model))); } - ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString()); shellEnv.ifPresent( - map -> { - for (Map.Entry entry : - map.apply(workflowContext, taskContext, model).entrySet()) { - builder.environment().put(entry.getKey(), (String) entry.getValue()); - } - }); + map -> ScriptUtils.addEnviromment(builder, map.apply(workflowContext, taskContext, model))); return returnType .map( type -> CompletableFuture.supplyAsync( () -> - buildResultFromProcess( - workflowContext.definition().application().modelFactory(), - uncheckedStart(builder), - type) - .orElse(model), + ScriptUtils.buildResultFromProcess( + workflowContext.definition(), uncheckedStart(builder), type, model), workflowContext.definition().application().executorService())) .orElseGet( () -> { @@ -86,77 +80,4 @@ public CompletableFuture apply( return CompletableFuture.completedFuture(model); }); } - - private Process uncheckedStart(ProcessBuilder builder) { - try { - return builder.start(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - public void init(RunShell taskConfiguration, WorkflowDefinition definition) { - Shell shell = taskConfiguration.getShell(); - if (!WorkflowUtils.isValid(taskConfiguration.getShell().getCommand())) { - throw new IllegalStateException("Missing shell command in RunShell task configuration"); - } - shellCommand = - WorkflowUtils.buildStringFilter( - definition.application(), taskConfiguration.getShell().getCommand()); - - shellArguments = - shell.getArguments() != null && shell.getArguments().getAdditionalProperties() != null - ? shell.getArguments().getAdditionalProperties().entrySet().stream() - .collect( - Collectors.toMap( - e -> WorkflowUtils.buildStringFilter(definition.application(), e.getKey()), - e -> - e.getValue() != null - ? Optional.of( - WorkflowUtils.buildStringFilter( - definition.application(), e.getValue().toString())) - : Optional.empty(), - (x, y) -> y, - LinkedHashMap::new)) - : Map.of(); - - shellEnv = - shell.getEnvironment() != null && shell.getEnvironment().getAdditionalProperties() != null - ? Optional.of( - WorkflowUtils.buildMapResolver( - definition.application(), shell.getEnvironment().getAdditionalProperties())) - : Optional.empty(); - - returnType = - taskConfiguration.isAwait() ? Optional.of(taskConfiguration.getReturn()) : Optional.empty(); - } - - private static Optional buildResultFromProcess( - WorkflowModelFactory modelFactory, Process process, ProcessReturnType type) { - try { - int exitCode = process.waitFor(); - String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8); - String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8); - return Optional.of( - switch (type) { - case ALL -> - modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim())); - case NONE -> modelFactory.fromNull(); - case CODE -> modelFactory.from(exitCode); - case STDOUT -> modelFactory.from(stdout.trim()); - case STDERR -> modelFactory.from(stderr.trim()); - }); - } catch (IOException e) { - throw new UncheckedIOException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return Optional.empty(); - } - } - - @Override - public boolean accept(Class clazz) { - return RunShell.class.equals(clazz); - } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutorBuilder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutorBuilder.java new file mode 100644 index 00000000..ce31cefa --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutorBuilder.java @@ -0,0 +1,67 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.RunShell; +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.api.types.Shell; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowUtils; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +public class RunShellExecutorBuilder implements RunnableTaskBuilder { + + @Override + public CallableTask build(RunShell taskConfiguration, WorkflowDefinition definition) { + Shell shell = taskConfiguration.getShell(); + if (!WorkflowUtils.isValid(taskConfiguration.getShell().getCommand())) { + throw new IllegalStateException("Missing shell command in RunShell task configuration"); + } + return new RunShellExecutor( + WorkflowUtils.buildStringFilter( + definition.application(), taskConfiguration.getShell().getCommand()), + shell.getArguments() != null + ? shell.getArguments().getAdditionalProperties().entrySet().stream() + .collect( + Collectors.toMap( + e -> WorkflowUtils.buildStringFilter(definition.application(), e.getKey()), + e -> + e.getValue() != null + ? Optional.of( + WorkflowUtils.buildStringFilter( + definition.application(), e.getValue().toString())) + : Optional.empty(), + (x, y) -> y, + LinkedHashMap::new)) + : Map.of(), + shell.getEnvironment() != null + ? Optional.of( + WorkflowUtils.buildMapResolver( + definition.application(), shell.getEnvironment().getAdditionalProperties())) + : Optional.empty(), + taskConfiguration.isAwait() + ? Optional.of(taskConfiguration.getReturn()) + : Optional.empty()); + } + + @Override + public boolean accept(Class clazz) { + return RunShell.class.equals(clazz); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunTaskExecutor.java index 5eeedddc..5bd26a1b 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunTaskExecutor.java @@ -26,15 +26,15 @@ import java.util.ServiceLoader.Provider; import java.util.concurrent.CompletableFuture; -public class RunTaskExecutor extends RegularTaskExecutor { +public class RunTaskExecutor extends RegularTaskExecutor { - private final RunnableTask runnable; + private final CallableTask runnable; - private static final ServiceLoader runnables = - ServiceLoader.load(RunnableTask.class); + private static final ServiceLoader runnables = + ServiceLoader.load(RunnableTaskBuilder.class); public static class RunTaskExecutorBuilder extends RegularTaskExecutorBuilder { - private RunnableTask runnable; + private CallableTask runnable; protected RunTaskExecutorBuilder( WorkflowMutablePosition position, RunTask task, WorkflowDefinition definition) { @@ -45,11 +45,11 @@ protected RunTaskExecutorBuilder( .map(Provider::get) .filter(r -> r.accept(config.getClass())) .findFirst() + .map(r -> r.build(config, definition)) .orElseThrow( () -> new UnsupportedOperationException( "No runnable found for operation " + config.getClass())); - runnable.init(config, definition); } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutor.java index 66cf27e5..cf0a8faa 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutor.java @@ -15,28 +15,24 @@ */ package io.serverlessworkflow.impl.executors; -import io.serverlessworkflow.api.types.RunTaskConfiguration; -import io.serverlessworkflow.api.types.RunWorkflow; -import io.serverlessworkflow.api.types.SubflowConfiguration; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowDefinitionId; import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowValueResolver; import java.util.Map; import java.util.concurrent.CompletableFuture; -public class RunWorkflowExecutor implements RunnableTask { +public class RunWorkflowExecutor implements CallableTask { + private final WorkflowDefinitionId workflowDefinitionId; + private final WorkflowValueResolver> additionalParameters; - private WorkflowDefinitionId workflowDefinitionId; - private Map additionalParameters; - - public void init(RunWorkflow taskConfiguration, WorkflowDefinition definition) { - SubflowConfiguration workflowConfig = taskConfiguration.getWorkflow(); - this.workflowDefinitionId = - new WorkflowDefinitionId( - workflowConfig.getNamespace(), workflowConfig.getName(), workflowConfig.getVersion()); - this.additionalParameters = workflowConfig.getInput().getAdditionalProperties(); + public RunWorkflowExecutor( + WorkflowDefinitionId workflowDefinitionId, + WorkflowValueResolver> additionalParameters) { + this.workflowDefinitionId = workflowDefinitionId; + this.additionalParameters = additionalParameters; } @Override @@ -44,17 +40,16 @@ public CompletableFuture apply( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { WorkflowDefinition definition = workflowContext.definition().application().workflowDefinitions().get(workflowDefinitionId); - if (definition != null) { - // TODO add additional parameters - return definition.instance(input).start(); - } else { + if (definition == null) { throw new IllegalArgumentException( "Workflow definition for " + workflowDefinitionId + " has not been found"); } - } - - @Override - public boolean accept(Class clazz) { - return RunWorkflow.class.equals(clazz); + Map args = additionalParameters.apply(workflowContext, taskContext, input); + return definition + .instance( + !args.isEmpty() + ? workflowContext.definition().application().modelFactory().from(args) + : input) + .start(); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutorBuilder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutorBuilder.java new file mode 100644 index 00000000..71208c43 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutorBuilder.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.api.types.RunWorkflow; +import io.serverlessworkflow.api.types.SubflowConfiguration; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; +import io.serverlessworkflow.impl.WorkflowUtils; + +public class RunWorkflowExecutorBuilder implements RunnableTaskBuilder { + + public CallableTask build(RunWorkflow taskConfiguration, WorkflowDefinition definition) { + SubflowConfiguration workflowConfig = taskConfiguration.getWorkflow(); + return new RunWorkflowExecutor( + new WorkflowDefinitionId( + workflowConfig.getNamespace(), workflowConfig.getName(), workflowConfig.getVersion()), + WorkflowUtils.buildMapResolver( + definition.application(), workflowConfig.getInput().getAdditionalProperties())); + } + + @Override + public boolean accept(Class clazz) { + return RunWorkflow.class.equals(clazz); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTask.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTaskBuilder.java similarity index 65% rename from impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTask.java rename to impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTaskBuilder.java index d6bf032e..eb0f4ba0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTask.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTaskBuilder.java @@ -16,17 +16,10 @@ package io.serverlessworkflow.impl.executors; import io.serverlessworkflow.api.types.RunTaskConfiguration; -import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowModel; -import java.util.concurrent.CompletableFuture; - -public interface RunnableTask { - default void init(T taskConfiguration, WorkflowDefinition definition) {} - - CompletableFuture apply( - WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input); +public interface RunnableTaskBuilder { boolean accept(Class clazz); + + CallableTask build(T taskConfiguration, WorkflowDefinition definition); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/AbstractScriptRunner.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/AbstractScriptRunner.java deleted file mode 100644 index 37921aad..00000000 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/AbstractScriptRunner.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.scripts; - -import io.serverlessworkflow.api.types.RunTaskConfiguration; -import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowError; -import io.serverlessworkflow.impl.WorkflowException; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowModelFactory; -import io.serverlessworkflow.impl.executors.ProcessResult; -import java.io.ByteArrayOutputStream; - -public abstract class AbstractScriptRunner implements ScriptRunner { - - @Override - public WorkflowModel runScript( - ScriptContext scriptContext, - WorkflowContext workflowContext, - TaskContext taskContext, - WorkflowModel input) { - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - try { - runScript(scriptContext, stdout, stderr, workflowContext, taskContext); - return scriptContext - .returnType() - .map( - type -> - modelFromOutput( - type, - workflowContext.definition().application().modelFactory(), - stdout, - stderr)) - .orElse(input); - } catch (Exception ex) { - throw new WorkflowException(WorkflowError.runtime(taskContext, ex).build()); - } - } - - protected abstract void runScript( - ScriptContext scriptContext, - ByteArrayOutputStream stdout, - ByteArrayOutputStream stderr, - WorkflowContext workflowContext, - TaskContext taskContext); - - protected WorkflowModel modelFromOutput( - RunTaskConfiguration.ProcessReturnType returnType, - WorkflowModelFactory modelFactory, - ByteArrayOutputStream stdout, - ByteArrayOutputStream stderr) { - return switch (returnType) { - case ALL -> modelFactory.fromAny(new ProcessResult(0, toString(stdout), toString(stderr))); - case NONE -> modelFactory.fromNull(); - case CODE -> modelFactory.from(0); - case STDOUT -> modelFactory.from(toString(stdout)); - case STDERR -> modelFactory.from(toString(stderr)); - }; - } - - private String toString(ByteArrayOutputStream stream) { - return stream.toString().trim(); - } -} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptContext.java index 482febf7..44da27bc 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptContext.java @@ -17,10 +17,9 @@ import io.serverlessworkflow.api.types.RunTaskConfiguration; import java.util.Map; -import java.util.Optional; public record ScriptContext( Map args, Map envs, String code, - Optional returnType) {} + RunTaskConfiguration.ProcessReturnType returnType) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptUtils.java new file mode 100644 index 00000000..3eb20a84 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptUtils.java @@ -0,0 +1,92 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scripts; + +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.api.types.RunTaskConfiguration.ProcessReturnType; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import io.serverlessworkflow.impl.executors.ProcessResult; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ScriptUtils { + + private static final Logger logger = LoggerFactory.getLogger(ScriptUtils.class); + + public static WorkflowModel modelFromOutput( + int code, + StreamSuppliers streamSuppliers, + RunTaskConfiguration.ProcessReturnType returnType, + WorkflowModelFactory modelFactory, + WorkflowModel model) { + if (code != 0) { + logger.warn("Process call failed with code {}", code); + if (logger.isDebugEnabled()) { + logger.debug(streamSuppliers.errorStream().get()); + } + } + return switch (returnType) { + case ALL -> + modelFactory.fromAny( + new ProcessResult( + 0, streamSuppliers.outputStream().get(), streamSuppliers.errorStream().get())); + case NONE -> model; + case CODE -> modelFactory.from(0); + case STDOUT -> modelFactory.from(streamSuppliers.outputStream().get()); + case STDERR -> modelFactory.from(streamSuppliers.errorStream().get()); + }; + } + + public static Process uncheckedStart(ProcessBuilder builder) { + try { + return builder.start(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static void addEnviromment(ProcessBuilder builder, Map env) { + for (Map.Entry entry : env.entrySet()) { + builder.environment().put(entry.getKey(), (String) entry.getValue()); + } + } + + public static WorkflowModel buildResultFromProcess( + WorkflowDefinition definition, Process process, ProcessReturnType type, WorkflowModel model) { + try { + int code = process.waitFor(); + StreamSuppliers suppliers = StreamSuppliers.from(process); + if (definition + .application() + .configManager() + .config("io.serverlessworkflow.impl.scripts.dumpOutput", Boolean.class) + .orElse(true)) { + System.out.println(suppliers.outputStream().get()); + } + return modelFromOutput(code, suppliers, type, definition.application().modelFactory(), model); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return model; + } + } + + private ScriptUtils() {} +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/StreamSuppliers.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/StreamSuppliers.java new file mode 100644 index 00000000..2e21cb89 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/StreamSuppliers.java @@ -0,0 +1,77 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scripts; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.function.Supplier; + +public record StreamSuppliers(Supplier outputStream, Supplier errorStream) { + + public static StreamSuppliers from(Process process) { + return new StreamSuppliers( + new InputStreamSupplier(process.getInputStream()), + new InputStreamSupplier(process.getErrorStream())); + } + + public static StreamSuppliers from(ByteArrayOutputStream stdout, ByteArrayOutputStream stderr) { + return new StreamSuppliers( + new ByteArrayStreamSupplier(stdout), new ByteArrayStreamSupplier(stderr)); + } + + private static class ByteArrayStreamSupplier implements Supplier { + + private String value; + private final ByteArrayOutputStream stream; + + public ByteArrayStreamSupplier(ByteArrayOutputStream stream) { + this.stream = stream; + } + + @Override + public String get() { + if (value == null) { + value = stream.toString().trim(); + } + return value; + } + } + + private static class InputStreamSupplier implements Supplier { + + private String value; + private final InputStream stream; + + public InputStreamSupplier(InputStream stream) { + this.stream = stream; + } + + @Override + public String get() { + if (value == null) { + try { + value = new String(stream.readAllBytes(), StandardCharsets.UTF_8); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + return value; + } + } +} diff --git a/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask b/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask deleted file mode 100644 index ea1bb37e..00000000 --- a/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask +++ /dev/null @@ -1,3 +0,0 @@ -io.serverlessworkflow.impl.executors.RunWorkflowExecutor -io.serverlessworkflow.impl.executors.RunShellExecutor -io.serverlessworkflow.impl.executors.RunScriptExecutor \ No newline at end of file diff --git a/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder b/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder new file mode 100644 index 00000000..6f0679b4 --- /dev/null +++ b/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder @@ -0,0 +1,3 @@ +io.serverlessworkflow.impl.executors.RunWorkflowExecutorBuilder +io.serverlessworkflow.impl.executors.RunShellExecutorBuilder +io.serverlessworkflow.impl.executors.RunScriptExecutorBuilder \ No newline at end of file diff --git a/impl/jackson/pom.xml b/impl/jackson/pom.xml index 1f3bae03..1a4176fd 100644 --- a/impl/jackson/pom.xml +++ b/impl/jackson/pom.xml @@ -7,7 +7,7 @@ serverlessworkflow-impl-jackson Serverless Workflow :: Impl :: Jackson - + io.serverlessworkflow serverlessworkflow-impl-core @@ -36,5 +36,5 @@ io.serverlessworkflow serverlessworkflow-impl-lifecycle-events - + \ No newline at end of file diff --git a/impl/pom.xml b/impl/pom.xml index ef67f386..ec4b036f 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -17,7 +17,6 @@ 9.2.1 3.7.0 25.0.1 - 4.2.0 @@ -174,11 +173,6 @@ polyglot ${version.org.graalvm.polyglot} - - black.ninia - jep - ${version.black.ninia} - diff --git a/impl/python/pom.xml b/impl/python/pom.xml index cb232423..13d7c4d2 100644 --- a/impl/python/pom.xml +++ b/impl/python/pom.xml @@ -12,9 +12,5 @@ io.serverlessworkflow serverlessworkflow-impl-core - - black.ninia - jep - \ No newline at end of file diff --git a/impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java b/impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java index da6c25d6..d7129090 100644 --- a/impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java +++ b/impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java @@ -17,50 +17,49 @@ import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.config.ConfigManager; -import io.serverlessworkflow.impl.scripts.AbstractScriptRunner; +import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.scripts.ScriptContext; import io.serverlessworkflow.impl.scripts.ScriptLanguageId; -import java.io.ByteArrayOutputStream; -import java.util.Collection; -import jep.Interpreter; -import jep.SharedInterpreter; +import io.serverlessworkflow.impl.scripts.ScriptRunner; +import io.serverlessworkflow.impl.scripts.ScriptUtils; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class PythonScriptTaskRunner extends AbstractScriptRunner { +public class PythonScriptTaskRunner implements ScriptRunner { + + private static final Logger logger = LoggerFactory.getLogger(PythonScriptTaskRunner.class); @Override public ScriptLanguageId identifier() { return ScriptLanguageId.PYTHON; } - private static final String PYTHON_SYS_PATH = "sys.path.append('%s')\n"; - private static final String SEARCH_PATH_PROPERTY = "io.serverlessworkflow.impl."; - @Override - protected void runScript( + public WorkflowModel runScript( ScriptContext scriptContext, - ByteArrayOutputStream stdout, - ByteArrayOutputStream stderr, WorkflowContext workflowContext, - TaskContext taskContext) { - Interpreter py = - workflowContext - .instance() - .additionalObject( - "pyInterpreter", - () -> interpreter(workflowContext.definition().application().configManager())); - scriptContext.args().forEach(py::set); - py.exec(scriptContext.code()); + TaskContext taskContext, + WorkflowModel model) { + ProcessBuilder builder = new ProcessBuilder("python", "-c", scriptContext.code()); + ScriptUtils.addEnviromment(builder, scriptContext.envs()); + scriptContext.args().forEach((k, v) -> addArg(builder.command(), k, v)); + logger.debug("Invoking python with command line {}", builder.command()); + return ScriptUtils.buildResultFromProcess( + workflowContext.definition(), + ScriptUtils.uncheckedStart(builder), + scriptContext.returnType(), + model); } - protected Interpreter interpreter(ConfigManager configManager) { - Interpreter py = new SharedInterpreter(); - Collection searchPaths = configManager.multiConfig(SEARCH_PATH_PROPERTY, String.class); - if (!searchPaths.isEmpty()) { - StringBuilder sb = new StringBuilder("import sys\n"); - searchPaths.forEach(path -> sb.append(String.format(PYTHON_SYS_PATH, path))); - py.exec(sb.toString()); + private void addArg(List list, String name, Object value) { + if (value instanceof Boolean bool) { + if (bool) { + list.add("--" + name); + } + } else if (value != null) { + list.add("--" + name); + list.add(value.toString()); } - return py; } } diff --git a/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java b/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java index 632c494e..dce8f159 100644 --- a/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java +++ b/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java @@ -17,10 +17,12 @@ import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.scripts.AbstractScriptRunner; +import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.scripts.ScriptContext; import io.serverlessworkflow.impl.scripts.ScriptLanguageId; import io.serverlessworkflow.impl.scripts.ScriptRunner; +import io.serverlessworkflow.impl.scripts.ScriptUtils; +import io.serverlessworkflow.impl.scripts.StreamSuppliers; import java.io.ByteArrayOutputStream; import java.util.Map; import org.graalvm.polyglot.Context; @@ -31,7 +33,7 @@ * JavaScript implementation of the {@link ScriptRunner} interface that executes JavaScript scripts * using GraalVM Polyglot API. */ -public class JavaScriptScriptTaskRunner extends AbstractScriptRunner { +public class JavaScriptScriptTaskRunner implements ScriptRunner { @Override public ScriptLanguageId identifier() { @@ -57,12 +59,13 @@ private void configureProcessEnv(Context context, Map envs) { } @Override - protected void runScript( + public WorkflowModel runScript( ScriptContext scriptContext, - ByteArrayOutputStream stdout, - ByteArrayOutputStream stderr, WorkflowContext workflowContext, - TaskContext taskContext) { + TaskContext taskContext, + WorkflowModel model) { + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); try (Context ctx = Context.newBuilder() .err(stderr) @@ -74,6 +77,12 @@ protected void runScript( scriptContext.args().forEach(ctx.getBindings(identifier().getLang())::putMember); configureProcessEnv(ctx, scriptContext.envs()); ctx.eval(Source.create(identifier().getLang(), scriptContext.code())); + return ScriptUtils.modelFromOutput( + 0, + StreamSuppliers.from(stdout, stderr), + scriptContext.returnType(), + workflowContext.definition().application().modelFactory(), + model); } } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java index 387e83d6..ab795c06 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.impl.test; import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.serverlessworkflow.impl.WorkflowApplication; @@ -64,6 +65,13 @@ void testCustomFunction() { "workflows-samples/call-custom-function-cataloged-global.yaml" }) void testCustomCatalogFunction(String fileName) throws IOException { - app.workflowDefinition(readWorkflowFromClasspath(fileName)).instance(Map.of()).start().join(); + assertThat( + app.workflowDefinition(readWorkflowFromClasspath(fileName)) + .instance(Map.of()) + .start() + .join() + .asText() + .orElseThrow()) + .contains("Hello"); } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunShellExecutorTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunShellExecutorTest.java index 1e80ea50..073a0bf6 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunShellExecutorTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunShellExecutorTest.java @@ -183,7 +183,7 @@ void testNone() throws IOException { SoftAssertions.assertSoftly( softly -> { - softly.assertThat(outputModel.asJavaObject()).isNull(); + softly.assertThat(outputModel.asJavaObject()).isEqualTo(Map.of()); }); } } diff --git a/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged-global.yaml b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged-global.yaml index 8c8cd0b2..534465d4 100644 --- a/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged-global.yaml +++ b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged-global.yaml @@ -6,7 +6,7 @@ document: use: catalogs: global: - endpoint: https://github.com/fjtirado/catalog + endpoint: https://github.com/serverlessworkflow/catalog do: - log: call: log:1.0.0@global diff --git a/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml index 84fc8365..80a3a4dc 100644 --- a/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml +++ b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml @@ -5,7 +5,7 @@ document: version: '0.1.0' do: - log: - call: https://raw.githubusercontent.com/fjtirado/catalog/refs/heads/main/functions/log/1.0.0/function.yaml + call: https://raw.githubusercontent.com/serverlessworkflow/catalog/refs/heads/main/functions/log/1.0.0/function.yaml with: message: Hello, world! level: information