From b82f5ff978f95c7ef4b71f5bb69610736d33b258 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Tue, 18 Nov 2025 18:29:01 +0100 Subject: [PATCH] [Fix #987] Refactor shell impl Signed-off-by: fjtirado [Fix #897] Fixing flaky test Signed-off-by: fjtirado --- .../impl/executors/RunScriptExecutor.java | 40 ++-- .../impl/executors/RunShellExecutor.java | 217 +++++++++--------- .../impl/scripts/ScriptContext.java | 4 +- .../script/js/JavaScriptScriptTaskRunner.java | 32 ++- .../impl/test/RunShellExecutorTest.java | 7 - 5 files changed, 145 insertions(+), 155 deletions(-) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutor.java index 5bc4d494..32e13b0c 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutor.java @@ -39,12 +39,10 @@ public class RunScriptExecutor implements RunnableTask { private Optional>> environmentExpr; - private Optional>> argumentExpr; - private WorkflowValueResolver codeSupplier; private boolean isAwait; - private RunTaskConfiguration.ProcessReturnType returnType; + private Optional returnType; private ScriptRunner taskRunner; @Override @@ -65,7 +63,7 @@ public void init(RunScript taskConfiguration, WorkflowDefinition definition) { this.isAwait = taskConfiguration.isAwait(); - this.returnType = taskConfiguration.getReturn(); + this.returnType = Optional.ofNullable(taskConfiguration.getReturn()); WorkflowApplication application = definition.application(); this.environmentExpr = @@ -102,22 +100,24 @@ public void init(RunScript taskConfiguration, WorkflowDefinition definition) { @Override public CompletableFuture apply( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - return CompletableFuture.supplyAsync( - () -> - taskRunner.runScript( - new ScriptContext( - argumentExpr - .map(m -> m.apply(workflowContext, taskContext, input)) - .orElse(Map.of()), - environmentExpr - .map(m -> m.apply(workflowContext, taskContext, input)) - .orElse(Map.of()), - codeSupplier.apply(workflowContext, taskContext, input), - isAwait, - returnType), - workflowContext, - taskContext, - input)); + ScriptContext scriptContext = + new ScriptContext( + argumentExpr.map(m -> m.apply(workflowContext, taskContext, input)).orElse(Map.of()), + environmentExpr.map(m -> m.apply(workflowContext, taskContext, input)).orElse(Map.of()), + codeSupplier.apply(workflowContext, taskContext, input), + returnType); + if (isAwait) { + return CompletableFuture.supplyAsync( + () -> taskRunner.runScript(scriptContext, workflowContext, taskContext, input), + workflowContext.definition().application().executorService()); + } else { + workflowContext + .definition() + .application() + .executorService() + .submit(() -> taskRunner.runScript(scriptContext, workflowContext, taskContext, input)); + return CompletableFuture.completedFuture(input); + } } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java index 4820ae40..08931cb3 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java @@ -17,141 +17,142 @@ import io.serverlessworkflow.api.types.RunShell; import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.api.types.RunTaskConfiguration.ProcessReturnType; import io.serverlessworkflow.api.types.Shell; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowError; -import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowModelFactory; import io.serverlessworkflow.impl.WorkflowUtils; -import io.serverlessworkflow.impl.expressions.ExpressionUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; public class RunShellExecutor implements RunnableTask { - private ShellResultSupplier shellResultSupplier; - private ProcessBuilderSupplier processBuilderSupplier; + private WorkflowValueResolver shellCommand; + private Map, Optional>> + shellArguments; + private Optional>> shellEnv; + private Optional returnType; - @FunctionalInterface - private interface ShellResultSupplier { - WorkflowModel apply( - TaskContext taskContext, WorkflowModel input, ProcessBuilder processBuilder); - } + @Override + public CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) { + StringBuilder commandBuilder = + new StringBuilder(shellCommand.apply(workflowContext, taskContext, model)); + for (var entry : shellArguments.entrySet()) { + commandBuilder.append(" ").append(entry.getKey().apply(workflowContext, taskContext, model)); + entry + .getValue() + .ifPresent( + v -> commandBuilder.append("=").append(v.apply(workflowContext, taskContext, model))); + } - @FunctionalInterface - private interface ProcessBuilderSupplier { - ProcessBuilder apply(WorkflowContext workflowContext, TaskContext taskContext); + ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString()); + shellEnv.ifPresent( + map -> { + for (Map.Entry entry : + map.apply(workflowContext, taskContext, model).entrySet()) { + builder.environment().put(entry.getKey(), (String) entry.getValue()); + } + }); + + return returnType + .map( + type -> + CompletableFuture.supplyAsync( + () -> + buildResultFromProcess( + workflowContext.definition().application().modelFactory(), + uncheckedStart(builder), + type) + .orElse(model), + workflowContext.definition().application().executorService())) + .orElseGet( + () -> { + workflowContext + .definition() + .application() + .executorService() + .submit(() -> uncheckedStart(builder)); + return CompletableFuture.completedFuture(model); + }); } - @Override - public CompletableFuture apply( - WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - ProcessBuilder processBuilder = this.processBuilderSupplier.apply(workflowContext, taskContext); - return CompletableFuture.supplyAsync( - () -> this.shellResultSupplier.apply(taskContext, input, processBuilder)); + private Process uncheckedStart(ProcessBuilder builder) { + try { + return builder.start(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } @Override public void init(RunShell taskConfiguration, WorkflowDefinition definition) { Shell shell = taskConfiguration.getShell(); - final String shellCommand = shell.getCommand(); - - if (shellCommand == null || shellCommand.isBlank()) { + if (!WorkflowUtils.isValid(taskConfiguration.getShell().getCommand())) { throw new IllegalStateException("Missing shell command in RunShell task configuration"); } - this.processBuilderSupplier = - (workflowContext, taskContext) -> { - WorkflowApplication application = definition.application(); - - StringBuilder commandBuilder = - new StringBuilder( - ExpressionUtils.isExpr(shellCommand) - ? WorkflowUtils.buildStringFilter(application, shellCommand) - .apply(workflowContext, taskContext, taskContext.input()) - : shellCommand); - - if (shell.getArguments() != null - && shell.getArguments().getAdditionalProperties() != null) { - for (Map.Entry entry : - shell.getArguments().getAdditionalProperties().entrySet()) { - commandBuilder - .append(" ") - .append( - ExpressionUtils.isExpr(entry.getKey()) - ? WorkflowUtils.buildStringFilter(application, entry.getKey()) - .apply(workflowContext, taskContext, taskContext.input()) - : entry.getKey()); - if (entry.getValue() != null) { - - commandBuilder - .append("=") - .append( - ExpressionUtils.isExpr(entry.getValue()) - ? WorkflowUtils.buildStringFilter( - application, entry.getValue().toString()) - .apply(workflowContext, taskContext, taskContext.input()) - : entry.getValue().toString()); - } - } - } - - // TODO: support Windows cmd.exe - ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString()); - if (shell.getEnvironment() != null - && shell.getEnvironment().getAdditionalProperties() != null) { - for (Map.Entry entry : - shell.getEnvironment().getAdditionalProperties().entrySet()) { - String value = - ExpressionUtils.isExpr(entry.getValue()) - ? WorkflowUtils.buildStringFilter(application, entry.getValue().toString()) - .apply(workflowContext, taskContext, taskContext.input()) - : entry.getValue().toString(); - - // configure environments - builder.environment().put(entry.getKey(), value); - } - } - return builder; - }; - - this.shellResultSupplier = - (taskContext, input, processBuilder) -> { - try { - Process process = processBuilder.start(); - return taskConfiguration.isAwait() - ? buildResultFromProcess(taskConfiguration, definition, process) - : input; - } catch (IOException | InterruptedException e) { - throw new WorkflowException(WorkflowError.runtime(taskContext, e).build(), e); - } - }; + shellCommand = + WorkflowUtils.buildStringFilter( + definition.application(), taskConfiguration.getShell().getCommand()); + + shellArguments = + shell.getArguments() != null && shell.getArguments().getAdditionalProperties() != null + ? shell.getArguments().getAdditionalProperties().entrySet().stream() + .collect( + Collectors.toMap( + e -> WorkflowUtils.buildStringFilter(definition.application(), e.getKey()), + e -> + e.getValue() != null + ? Optional.of( + WorkflowUtils.buildStringFilter( + definition.application(), e.getValue().toString())) + : Optional.empty(), + (x, y) -> y, + LinkedHashMap::new)) + : Map.of(); + + shellEnv = + shell.getEnvironment() != null && shell.getEnvironment().getAdditionalProperties() != null + ? Optional.of( + WorkflowUtils.buildMapResolver( + definition.application(), shell.getEnvironment().getAdditionalProperties())) + : Optional.empty(); + + returnType = + taskConfiguration.isAwait() ? Optional.of(taskConfiguration.getReturn()) : Optional.empty(); } - /** - * Builds the WorkflowModel result from the executed process. It waits for the process to finish - * and captures the exit code, stdout, and stderr based on the task configuration. - */ - private WorkflowModel buildResultFromProcess( - RunShell taskConfiguration, WorkflowDefinition definition, Process process) - throws IOException, InterruptedException { - int exitCode = process.waitFor(); - String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8); - String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8); - - WorkflowModelFactory modelFactory = definition.application().modelFactory(); - return switch (taskConfiguration.getReturn()) { - case ALL -> modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim())); - case NONE -> modelFactory.fromNull(); - case CODE -> modelFactory.from(exitCode); - case STDOUT -> modelFactory.from(stdout.trim()); - case STDERR -> modelFactory.from(stderr.trim()); - }; + private static Optional buildResultFromProcess( + WorkflowModelFactory modelFactory, Process process, ProcessReturnType type) { + try { + int exitCode = process.waitFor(); + String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8); + String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8); + return Optional.of( + switch (type) { + case ALL -> + modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim())); + case NONE -> modelFactory.fromNull(); + case CODE -> modelFactory.from(exitCode); + case STDOUT -> modelFactory.from(stdout.trim()); + case STDERR -> modelFactory.from(stderr.trim()); + }); + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Optional.empty(); + } } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptContext.java index 68f6692d..482febf7 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptContext.java @@ -17,10 +17,10 @@ import io.serverlessworkflow.api.types.RunTaskConfiguration; import java.util.Map; +import java.util.Optional; public record ScriptContext( Map args, Map envs, String code, - boolean isAwait, - RunTaskConfiguration.ProcessReturnType returnType) {} + Optional returnType) {} diff --git a/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java b/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java index 12540048..5b986cd4 100644 --- a/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java +++ b/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java @@ -55,7 +55,6 @@ public WorkflowModel runScript( WorkflowApplication application = workflowContext.definition().application(); ByteArrayOutputStream stderr = new ByteArrayOutputStream(); ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - try (Context ctx = Context.newBuilder() .err(stderr) @@ -71,29 +70,26 @@ public WorkflowModel runScript( (key, val) -> { ctx.getBindings(identifier().getLang()).putMember(key, val); }); - configureProcessEnv(ctx, script.envs()); - - if (!script.isAwait()) { - application - .executorService() - .submit( - () -> { - ctx.eval(identifier().getLang(), script.code()); - }); - return application.modelFactory().fromAny(input); - } - ctx.eval(Source.create(identifier().getLang(), script.code())); - - return modelFromOutput( - script.returnType(), application.modelFactory(), stdout, () -> stderr.toString()); + return script + .returnType() + .map( + type -> + modelFromOutput( + type, application.modelFactory(), stdout, () -> stderr.toString())) + .orElse(input); } catch (PolyglotException e) { if (e.getExitStatus() != 0 || e.isSyntaxError()) { throw new WorkflowException(WorkflowError.runtime(taskContext, e).build()); } else { - return modelFromOutput( - script.returnType(), application.modelFactory(), stdout, () -> buildStderr(e, stderr)); + return script + .returnType() + .map( + type -> + modelFromOutput( + type, application.modelFactory(), stdout, () -> buildStderr(e, stderr))) + .orElse(input); } } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunShellExecutorTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunShellExecutorTest.java index 40f8600f..1e80ea50 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunShellExecutorTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunShellExecutorTest.java @@ -21,8 +21,6 @@ import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.executors.ProcessResult; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.Map; import org.assertj.core.api.SoftAssertions; import org.junit.jupiter.api.Test; @@ -127,15 +125,10 @@ void testAwaitBehavior() throws IOException { "workflows-samples/run-shell/echo-not-awaiting.yaml"); try (WorkflowApplication appl = WorkflowApplication.builder().build()) { Map inputMap = Map.of("full_name", "Matheus Cruz"); - WorkflowModel outputModel = appl.workflowDefinition(workflow).instance(inputMap).start().join(); - - String content = Files.readString(Path.of("/tmp/hello.txt")); - SoftAssertions.assertSoftly( softly -> { - softly.assertThat(content).contains("hello world not awaiting (Matheus Cruz)"); softly.assertThat(outputModel.asMap().get()).isEqualTo(inputMap); }); }