diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java index 5c14a3ac..d414c82d 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java @@ -28,24 +28,21 @@ import com.github.dockerjava.core.DockerClientImpl; import com.github.dockerjava.core.NameParser; import com.github.dockerjava.httpclient5.ApacheDockerHttpClient; -import io.serverlessworkflow.api.types.Container; -import io.serverlessworkflow.api.types.ContainerLifetime; -import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTask; import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; import java.util.Collection; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -class ContainerRunner { +class ContainerRunner implements CallableTask { private static final DefaultDockerClientConfig DEFAULT_CONFIG = DefaultDockerClientConfig.createDefaultConfigBuilder().build(); @@ -64,14 +61,19 @@ private static class DockerClientHolder { private final ContainerCleanupPolicy policy; private final String containerImage; - private ContainerRunner(ContainerRunnerBuilder builder) { - this.propertySetters = builder.propertySetters; - this.timeout = Optional.ofNullable(builder.timeout); - this.policy = builder.policy; - this.containerImage = builder.containerImage; + public ContainerRunner( + Collection propertySetters, + Optional> timeout, + ContainerCleanupPolicy policy, + String containerImage) { + this.propertySetters = propertySetters; + this.timeout = timeout; + this.policy = policy; + this.containerImage = containerImage; } - CompletableFuture start( + @Override + public CompletableFuture apply( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { return CompletableFuture.supplyAsync( () -> startSync(workflowContext, taskContext, input), @@ -215,52 +217,4 @@ private static RuntimeException mapExitCode(int exit) { private static RuntimeException failed(String message) { return new RuntimeException(message); } - - static ContainerRunnerBuilder builder() { - return new ContainerRunnerBuilder(); - } - - public static class ContainerRunnerBuilder { - private Container container; - private WorkflowDefinition definition; - private WorkflowValueResolver timeout; - private ContainerCleanupPolicy policy; - private String containerImage; - private Collection propertySetters = new ArrayList<>(); - - private ContainerRunnerBuilder() {} - - ContainerRunnerBuilder withContainer(Container container) { - this.container = container; - return this; - } - - public ContainerRunnerBuilder withWorkflowDefinition(WorkflowDefinition definition) { - this.definition = definition; - return this; - } - - ContainerRunner build() { - propertySetters.add(new NamePropertySetter(definition, container)); - propertySetters.add(new CommandPropertySetter(definition, container)); - propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container)); - propertySetters.add(new LifetimePropertySetter(container)); - propertySetters.add(new PortsPropertySetter(container)); - propertySetters.add(new VolumesPropertySetter(definition, container)); - - containerImage = container.getImage(); - if (containerImage == null || container.getImage().isBlank()) { - throw new IllegalArgumentException("Container image must be provided"); - } - ContainerLifetime lifetime = container.getLifetime(); - if (lifetime != null) { - policy = lifetime.getCleanup(); - TimeoutAfter afterTimeout = lifetime.getAfter(); - if (afterTimeout != null) - timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout); - } - - return new ContainerRunner(this); - } - } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java deleted file mode 100644 index adf7482b..00000000 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.container.executors; - -import io.serverlessworkflow.api.types.Container; -import io.serverlessworkflow.api.types.RunContainer; -import io.serverlessworkflow.api.types.RunTaskConfiguration; -import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.executors.RunnableTask; -import java.util.concurrent.CompletableFuture; - -public class RunContainerExecutor implements RunnableTask { - - private ContainerRunner containerRunner; - - public void init(RunContainer taskConfiguration, WorkflowDefinition definition) { - Container container = taskConfiguration.getContainer(); - containerRunner = - ContainerRunner.builder() - .withContainer(container) - .withWorkflowDefinition(definition) - .build(); - } - - @Override - public CompletableFuture apply( - WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - return containerRunner.start(workflowContext, taskContext, input); - } - - @Override - public boolean accept(Class clazz) { - return RunContainer.class.equals(clazz); - } -} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutorBuilder.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutorBuilder.java new file mode 100644 index 00000000..d58d5647 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutorBuilder.java @@ -0,0 +1,64 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.api.types.ContainerLifetime; +import io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy; +import io.serverlessworkflow.api.types.RunContainer; +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.RunnableTaskBuilder; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Optional; + +public class RunContainerExecutorBuilder implements RunnableTaskBuilder { + + @Override + public CallableTask build(RunContainer taskConfiguration, WorkflowDefinition definition) { + Collection propertySetters = new ArrayList<>(); + Container container = taskConfiguration.getContainer(); + propertySetters.add(new NamePropertySetter(definition, container)); + propertySetters.add(new CommandPropertySetter(definition, container)); + propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container)); + propertySetters.add(new LifetimePropertySetter(container)); + propertySetters.add(new PortsPropertySetter(container)); + propertySetters.add(new VolumesPropertySetter(definition, container)); + + ContainerCleanupPolicy policy = null; + WorkflowValueResolver timeout = null; + ContainerLifetime lifetime = container.getLifetime(); + if (lifetime != null) { + policy = lifetime.getCleanup(); + TimeoutAfter afterTimeout = lifetime.getAfter(); + if (afterTimeout != null) + timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout); + } + return new ContainerRunner( + propertySetters, Optional.ofNullable(timeout), policy, container.getImage()); + } + + @Override + public boolean accept(Class clazz) { + return RunContainer.class.equals(clazz); + } +} diff --git a/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask b/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder similarity index 86% rename from impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask rename to impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder index c1450d21..606ecc83 100644 --- a/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask +++ b/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder @@ -1 +1 @@ -io.serverlessworkflow.impl.container.executors.RunContainerExecutor \ No newline at end of file +io.serverlessworkflow.impl.container.executors.RunContainerExecutorBuilder \ No newline at end of file diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index a808650c..b724c012 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -48,7 +48,7 @@ public class WorkflowMutableInstance implements WorkflowInstance { protected AtomicReference> futureRef = new AtomicReference<>(); protected Instant completedAt; - protected final Map additionalObjects = new ConcurrentHashMap(); + protected final Map additionalObjects = new ConcurrentHashMap<>(); private Lock statusLock = new ReentrantLock(); private Map, TaskContext> suspended; diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/config/AbstractConfigManager.java b/impl/core/src/main/java/io/serverlessworkflow/impl/config/AbstractConfigManager.java index f12c341f..ddb7b5dd 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/config/AbstractConfigManager.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/config/AbstractConfigManager.java @@ -17,6 +17,9 @@ import java.time.Instant; import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Optional; public abstract class AbstractConfigManager implements ConfigManager { @@ -56,5 +59,19 @@ protected T convert(String value, Class propClass) { return propClass.cast(result); } + @Override + public Collection multiConfig(String propName, Class propClass) { + String multiValue = get(propName); + if (multiValue != null) { + Collection result = new ArrayList<>(); + for (String value : multiValue.split(",")) { + result.add(convert(value, propClass)); + } + return result; + } else { + return Collections.emptyList(); + } + } + protected abstract T convertComplex(String value, Class propClass); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java b/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java index 08f2fb0e..344ea961 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java @@ -16,11 +16,17 @@ package io.serverlessworkflow.impl.config; import io.serverlessworkflow.impl.ServicePriority; +import java.util.Collection; +import java.util.List; import java.util.Optional; public interface ConfigManager extends ServicePriority { Optional config(String propName, Class propClass); + default Collection multiConfig(String propName, Class propClass) { + return List.of(); + } + Iterable names(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutor.java index 32e13b0c..c6003cc0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutor.java @@ -15,86 +15,42 @@ */ package io.serverlessworkflow.impl.executors; -import io.serverlessworkflow.api.types.RunScript; import io.serverlessworkflow.api.types.RunTaskConfiguration; -import io.serverlessworkflow.api.types.Script; -import io.serverlessworkflow.api.types.ScriptUnion; +import io.serverlessworkflow.api.types.RunTaskConfiguration.ProcessReturnType; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; -import io.serverlessworkflow.impl.resources.ResourceLoaderUtils; import io.serverlessworkflow.impl.scripts.ScriptContext; -import io.serverlessworkflow.impl.scripts.ScriptLanguageId; import io.serverlessworkflow.impl.scripts.ScriptRunner; import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.ServiceLoader; import java.util.concurrent.CompletableFuture; -public class RunScriptExecutor implements RunnableTask { +public class RunScriptExecutor implements CallableTask { - private Optional>> environmentExpr; - private Optional>> argumentExpr; - private WorkflowValueResolver codeSupplier; - private boolean isAwait; - private Optional returnType; - private ScriptRunner taskRunner; + private final Optional>> environmentExpr; + private final Optional>> argumentExpr; + private final WorkflowValueResolver codeSupplier; + private final boolean isAwait; + private final RunTaskConfiguration.ProcessReturnType returnType; + private final ScriptRunner taskRunner; - @Override - public void init(RunScript taskConfiguration, WorkflowDefinition definition) { - ScriptUnion scriptUnion = taskConfiguration.getScript(); - Script script = scriptUnion.get(); - ScriptLanguageId language = ScriptLanguageId.from(script.getLanguage()); - - this.taskRunner = - ServiceLoader.load(ScriptRunner.class).stream() - .map(ServiceLoader.Provider::get) - .filter(s -> s.identifier().equals(language)) - .findFirst() - .orElseThrow( - () -> - new IllegalStateException( - "No script runner implementation found for language " + language)); - - this.isAwait = taskConfiguration.isAwait(); - - this.returnType = Optional.ofNullable(taskConfiguration.getReturn()); - - WorkflowApplication application = definition.application(); - this.environmentExpr = - script.getEnvironment() != null && script.getEnvironment().getAdditionalProperties() != null - ? Optional.of( - WorkflowUtils.buildMapResolver( - application, script.getEnvironment().getAdditionalProperties())) - : Optional.empty(); - - this.argumentExpr = - script.getArguments() != null && script.getArguments().getAdditionalProperties() != null - ? Optional.of( - WorkflowUtils.buildMapResolver( - application, script.getArguments().getAdditionalProperties())) - : Optional.empty(); - - this.codeSupplier = - scriptUnion.getInlineScript() != null - ? WorkflowUtils.buildStringFilter(application, scriptUnion.getInlineScript().getCode()) - : (w, t, m) -> - definition - .resourceLoader() - .load( - Objects.requireNonNull( - scriptUnion.getExternalScript(), - "External script is required if inline script was not set") - .getSource(), - ResourceLoaderUtils::readString, - w, - t, - m); + public RunScriptExecutor( + Optional>> environmentExpr, + Optional>> argumentExpr, + WorkflowValueResolver codeSupplier, + boolean isAwait, + ProcessReturnType returnType, + ScriptRunner taskRunner) { + this.environmentExpr = environmentExpr; + this.argumentExpr = argumentExpr; + this.codeSupplier = codeSupplier; + this.isAwait = isAwait; + this.returnType = returnType; + this.taskRunner = taskRunner; } @Override @@ -108,20 +64,27 @@ public CompletableFuture apply( returnType); if (isAwait) { return CompletableFuture.supplyAsync( - () -> taskRunner.runScript(scriptContext, workflowContext, taskContext, input), + () -> runScript(scriptContext, workflowContext, taskContext, input), workflowContext.definition().application().executorService()); } else { workflowContext .definition() .application() .executorService() - .submit(() -> taskRunner.runScript(scriptContext, workflowContext, taskContext, input)); + .submit(() -> runScript(scriptContext, workflowContext, taskContext, input)); return CompletableFuture.completedFuture(input); } } - @Override - public boolean accept(Class clazz) { - return RunScript.class.equals(clazz); + private WorkflowModel runScript( + ScriptContext scriptContext, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel input) { + try { + return taskRunner.runScript(scriptContext, workflowContext, taskContext, input); + } catch (Exception ex) { + throw new WorkflowException(WorkflowError.runtime(taskContext, ex).build()); + } } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutorBuilder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutorBuilder.java new file mode 100644 index 00000000..f5185090 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutorBuilder.java @@ -0,0 +1,82 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.RunScript; +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.api.types.Script; +import io.serverlessworkflow.api.types.ScriptUnion; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.resources.ResourceLoaderUtils; +import io.serverlessworkflow.impl.scripts.ScriptLanguageId; +import io.serverlessworkflow.impl.scripts.ScriptRunner; +import java.util.Objects; +import java.util.Optional; +import java.util.ServiceLoader; + +public class RunScriptExecutorBuilder implements RunnableTaskBuilder { + + @Override + public CallableTask build(RunScript taskConfiguration, WorkflowDefinition definition) { + ScriptUnion scriptUnion = taskConfiguration.getScript(); + Script script = scriptUnion.get(); + ScriptLanguageId language = ScriptLanguageId.from(script.getLanguage()); + WorkflowApplication application = definition.application(); + + return new RunScriptExecutor( + script.getEnvironment() != null && script.getEnvironment().getAdditionalProperties() != null + ? Optional.of( + WorkflowUtils.buildMapResolver( + application, script.getEnvironment().getAdditionalProperties())) + : Optional.empty(), + script.getArguments() != null && script.getArguments().getAdditionalProperties() != null + ? Optional.of( + WorkflowUtils.buildMapResolver( + application, script.getArguments().getAdditionalProperties())) + : Optional.empty(), + scriptUnion.getInlineScript() != null + ? WorkflowUtils.buildStringFilter(application, scriptUnion.getInlineScript().getCode()) + : (w, t, m) -> + definition + .resourceLoader() + .load( + Objects.requireNonNull( + scriptUnion.getExternalScript(), + "External script is required if inline script was not set") + .getSource(), + ResourceLoaderUtils::readString, + w, + t, + m), + taskConfiguration.isAwait(), + taskConfiguration.getReturn(), + ServiceLoader.load(ScriptRunner.class).stream() + .map(ServiceLoader.Provider::get) + .filter(s -> s.identifier().equals(language)) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "No script runner implementation found for language " + language))); + } + + @Override + public boolean accept(Class clazz) { + return RunScript.class.equals(clazz); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java index 08931cb3..349c4a2a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java @@ -15,33 +15,36 @@ */ package io.serverlessworkflow.impl.executors; -import io.serverlessworkflow.api.types.RunShell; -import io.serverlessworkflow.api.types.RunTaskConfiguration; +import static io.serverlessworkflow.impl.scripts.ScriptUtils.uncheckedStart; + import io.serverlessworkflow.api.types.RunTaskConfiguration.ProcessReturnType; -import io.serverlessworkflow.api.types.Shell; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowModelFactory; -import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.charset.StandardCharsets; -import java.util.LinkedHashMap; +import io.serverlessworkflow.impl.scripts.ScriptUtils; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; - -public class RunShellExecutor implements RunnableTask { - private WorkflowValueResolver shellCommand; - private Map, Optional>> +public class RunShellExecutor implements CallableTask { + private final WorkflowValueResolver shellCommand; + private final Map, Optional>> shellArguments; - private Optional>> shellEnv; - private Optional returnType; + private final Optional>> shellEnv; + private final Optional returnType; + + public RunShellExecutor( + WorkflowValueResolver shellCommand, + Map, Optional>> shellArguments, + Optional>> shellEnv, + Optional returnType) { + super(); + this.shellCommand = shellCommand; + this.shellArguments = shellArguments; + this.shellEnv = shellEnv; + this.returnType = returnType; + } @Override public CompletableFuture apply( @@ -55,26 +58,17 @@ public CompletableFuture apply( .ifPresent( v -> commandBuilder.append("=").append(v.apply(workflowContext, taskContext, model))); } - ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString()); shellEnv.ifPresent( - map -> { - for (Map.Entry entry : - map.apply(workflowContext, taskContext, model).entrySet()) { - builder.environment().put(entry.getKey(), (String) entry.getValue()); - } - }); + map -> ScriptUtils.addEnviromment(builder, map.apply(workflowContext, taskContext, model))); return returnType .map( type -> CompletableFuture.supplyAsync( () -> - buildResultFromProcess( - workflowContext.definition().application().modelFactory(), - uncheckedStart(builder), - type) - .orElse(model), + ScriptUtils.buildResultFromProcess( + workflowContext.definition(), uncheckedStart(builder), type, model), workflowContext.definition().application().executorService())) .orElseGet( () -> { @@ -86,77 +80,4 @@ public CompletableFuture apply( return CompletableFuture.completedFuture(model); }); } - - private Process uncheckedStart(ProcessBuilder builder) { - try { - return builder.start(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - public void init(RunShell taskConfiguration, WorkflowDefinition definition) { - Shell shell = taskConfiguration.getShell(); - if (!WorkflowUtils.isValid(taskConfiguration.getShell().getCommand())) { - throw new IllegalStateException("Missing shell command in RunShell task configuration"); - } - shellCommand = - WorkflowUtils.buildStringFilter( - definition.application(), taskConfiguration.getShell().getCommand()); - - shellArguments = - shell.getArguments() != null && shell.getArguments().getAdditionalProperties() != null - ? shell.getArguments().getAdditionalProperties().entrySet().stream() - .collect( - Collectors.toMap( - e -> WorkflowUtils.buildStringFilter(definition.application(), e.getKey()), - e -> - e.getValue() != null - ? Optional.of( - WorkflowUtils.buildStringFilter( - definition.application(), e.getValue().toString())) - : Optional.empty(), - (x, y) -> y, - LinkedHashMap::new)) - : Map.of(); - - shellEnv = - shell.getEnvironment() != null && shell.getEnvironment().getAdditionalProperties() != null - ? Optional.of( - WorkflowUtils.buildMapResolver( - definition.application(), shell.getEnvironment().getAdditionalProperties())) - : Optional.empty(); - - returnType = - taskConfiguration.isAwait() ? Optional.of(taskConfiguration.getReturn()) : Optional.empty(); - } - - private static Optional buildResultFromProcess( - WorkflowModelFactory modelFactory, Process process, ProcessReturnType type) { - try { - int exitCode = process.waitFor(); - String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8); - String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8); - return Optional.of( - switch (type) { - case ALL -> - modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim())); - case NONE -> modelFactory.fromNull(); - case CODE -> modelFactory.from(exitCode); - case STDOUT -> modelFactory.from(stdout.trim()); - case STDERR -> modelFactory.from(stderr.trim()); - }); - } catch (IOException e) { - throw new UncheckedIOException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return Optional.empty(); - } - } - - @Override - public boolean accept(Class clazz) { - return RunShell.class.equals(clazz); - } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutorBuilder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutorBuilder.java new file mode 100644 index 00000000..ce31cefa --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutorBuilder.java @@ -0,0 +1,67 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.RunShell; +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.api.types.Shell; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowUtils; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +public class RunShellExecutorBuilder implements RunnableTaskBuilder { + + @Override + public CallableTask build(RunShell taskConfiguration, WorkflowDefinition definition) { + Shell shell = taskConfiguration.getShell(); + if (!WorkflowUtils.isValid(taskConfiguration.getShell().getCommand())) { + throw new IllegalStateException("Missing shell command in RunShell task configuration"); + } + return new RunShellExecutor( + WorkflowUtils.buildStringFilter( + definition.application(), taskConfiguration.getShell().getCommand()), + shell.getArguments() != null + ? shell.getArguments().getAdditionalProperties().entrySet().stream() + .collect( + Collectors.toMap( + e -> WorkflowUtils.buildStringFilter(definition.application(), e.getKey()), + e -> + e.getValue() != null + ? Optional.of( + WorkflowUtils.buildStringFilter( + definition.application(), e.getValue().toString())) + : Optional.empty(), + (x, y) -> y, + LinkedHashMap::new)) + : Map.of(), + shell.getEnvironment() != null + ? Optional.of( + WorkflowUtils.buildMapResolver( + definition.application(), shell.getEnvironment().getAdditionalProperties())) + : Optional.empty(), + taskConfiguration.isAwait() + ? Optional.of(taskConfiguration.getReturn()) + : Optional.empty()); + } + + @Override + public boolean accept(Class clazz) { + return RunShell.class.equals(clazz); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunTaskExecutor.java index 5eeedddc..5bd26a1b 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunTaskExecutor.java @@ -26,15 +26,15 @@ import java.util.ServiceLoader.Provider; import java.util.concurrent.CompletableFuture; -public class RunTaskExecutor extends RegularTaskExecutor { +public class RunTaskExecutor extends RegularTaskExecutor { - private final RunnableTask runnable; + private final CallableTask runnable; - private static final ServiceLoader runnables = - ServiceLoader.load(RunnableTask.class); + private static final ServiceLoader runnables = + ServiceLoader.load(RunnableTaskBuilder.class); public static class RunTaskExecutorBuilder extends RegularTaskExecutorBuilder { - private RunnableTask runnable; + private CallableTask runnable; protected RunTaskExecutorBuilder( WorkflowMutablePosition position, RunTask task, WorkflowDefinition definition) { @@ -45,11 +45,11 @@ protected RunTaskExecutorBuilder( .map(Provider::get) .filter(r -> r.accept(config.getClass())) .findFirst() + .map(r -> r.build(config, definition)) .orElseThrow( () -> new UnsupportedOperationException( "No runnable found for operation " + config.getClass())); - runnable.init(config, definition); } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutor.java index 66cf27e5..cf0a8faa 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutor.java @@ -15,28 +15,24 @@ */ package io.serverlessworkflow.impl.executors; -import io.serverlessworkflow.api.types.RunTaskConfiguration; -import io.serverlessworkflow.api.types.RunWorkflow; -import io.serverlessworkflow.api.types.SubflowConfiguration; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowDefinitionId; import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowValueResolver; import java.util.Map; import java.util.concurrent.CompletableFuture; -public class RunWorkflowExecutor implements RunnableTask { +public class RunWorkflowExecutor implements CallableTask { + private final WorkflowDefinitionId workflowDefinitionId; + private final WorkflowValueResolver> additionalParameters; - private WorkflowDefinitionId workflowDefinitionId; - private Map additionalParameters; - - public void init(RunWorkflow taskConfiguration, WorkflowDefinition definition) { - SubflowConfiguration workflowConfig = taskConfiguration.getWorkflow(); - this.workflowDefinitionId = - new WorkflowDefinitionId( - workflowConfig.getNamespace(), workflowConfig.getName(), workflowConfig.getVersion()); - this.additionalParameters = workflowConfig.getInput().getAdditionalProperties(); + public RunWorkflowExecutor( + WorkflowDefinitionId workflowDefinitionId, + WorkflowValueResolver> additionalParameters) { + this.workflowDefinitionId = workflowDefinitionId; + this.additionalParameters = additionalParameters; } @Override @@ -44,17 +40,16 @@ public CompletableFuture apply( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { WorkflowDefinition definition = workflowContext.definition().application().workflowDefinitions().get(workflowDefinitionId); - if (definition != null) { - // TODO add additional parameters - return definition.instance(input).start(); - } else { + if (definition == null) { throw new IllegalArgumentException( "Workflow definition for " + workflowDefinitionId + " has not been found"); } - } - - @Override - public boolean accept(Class clazz) { - return RunWorkflow.class.equals(clazz); + Map args = additionalParameters.apply(workflowContext, taskContext, input); + return definition + .instance( + !args.isEmpty() + ? workflowContext.definition().application().modelFactory().from(args) + : input) + .start(); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutorBuilder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutorBuilder.java new file mode 100644 index 00000000..71208c43 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunWorkflowExecutorBuilder.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.api.types.RunWorkflow; +import io.serverlessworkflow.api.types.SubflowConfiguration; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; +import io.serverlessworkflow.impl.WorkflowUtils; + +public class RunWorkflowExecutorBuilder implements RunnableTaskBuilder { + + public CallableTask build(RunWorkflow taskConfiguration, WorkflowDefinition definition) { + SubflowConfiguration workflowConfig = taskConfiguration.getWorkflow(); + return new RunWorkflowExecutor( + new WorkflowDefinitionId( + workflowConfig.getNamespace(), workflowConfig.getName(), workflowConfig.getVersion()), + WorkflowUtils.buildMapResolver( + definition.application(), workflowConfig.getInput().getAdditionalProperties())); + } + + @Override + public boolean accept(Class clazz) { + return RunWorkflow.class.equals(clazz); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTask.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTaskBuilder.java similarity index 65% rename from impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTask.java rename to impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTaskBuilder.java index d6bf032e..eb0f4ba0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTask.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunnableTaskBuilder.java @@ -16,17 +16,10 @@ package io.serverlessworkflow.impl.executors; import io.serverlessworkflow.api.types.RunTaskConfiguration; -import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowModel; -import java.util.concurrent.CompletableFuture; - -public interface RunnableTask { - default void init(T taskConfiguration, WorkflowDefinition definition) {} - - CompletableFuture apply( - WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input); +public interface RunnableTaskBuilder { boolean accept(Class clazz); + + CallableTask build(T taskConfiguration, WorkflowDefinition definition); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptContext.java index 482febf7..44da27bc 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptContext.java @@ -17,10 +17,9 @@ import io.serverlessworkflow.api.types.RunTaskConfiguration; import java.util.Map; -import java.util.Optional; public record ScriptContext( Map args, Map envs, String code, - Optional returnType) {} + RunTaskConfiguration.ProcessReturnType returnType) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptUtils.java new file mode 100644 index 00000000..3eb20a84 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptUtils.java @@ -0,0 +1,92 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scripts; + +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.api.types.RunTaskConfiguration.ProcessReturnType; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import io.serverlessworkflow.impl.executors.ProcessResult; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ScriptUtils { + + private static final Logger logger = LoggerFactory.getLogger(ScriptUtils.class); + + public static WorkflowModel modelFromOutput( + int code, + StreamSuppliers streamSuppliers, + RunTaskConfiguration.ProcessReturnType returnType, + WorkflowModelFactory modelFactory, + WorkflowModel model) { + if (code != 0) { + logger.warn("Process call failed with code {}", code); + if (logger.isDebugEnabled()) { + logger.debug(streamSuppliers.errorStream().get()); + } + } + return switch (returnType) { + case ALL -> + modelFactory.fromAny( + new ProcessResult( + 0, streamSuppliers.outputStream().get(), streamSuppliers.errorStream().get())); + case NONE -> model; + case CODE -> modelFactory.from(0); + case STDOUT -> modelFactory.from(streamSuppliers.outputStream().get()); + case STDERR -> modelFactory.from(streamSuppliers.errorStream().get()); + }; + } + + public static Process uncheckedStart(ProcessBuilder builder) { + try { + return builder.start(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static void addEnviromment(ProcessBuilder builder, Map env) { + for (Map.Entry entry : env.entrySet()) { + builder.environment().put(entry.getKey(), (String) entry.getValue()); + } + } + + public static WorkflowModel buildResultFromProcess( + WorkflowDefinition definition, Process process, ProcessReturnType type, WorkflowModel model) { + try { + int code = process.waitFor(); + StreamSuppliers suppliers = StreamSuppliers.from(process); + if (definition + .application() + .configManager() + .config("io.serverlessworkflow.impl.scripts.dumpOutput", Boolean.class) + .orElse(true)) { + System.out.println(suppliers.outputStream().get()); + } + return modelFromOutput(code, suppliers, type, definition.application().modelFactory(), model); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return model; + } + } + + private ScriptUtils() {} +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/StreamSuppliers.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/StreamSuppliers.java new file mode 100644 index 00000000..2e21cb89 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scripts/StreamSuppliers.java @@ -0,0 +1,77 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scripts; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.function.Supplier; + +public record StreamSuppliers(Supplier outputStream, Supplier errorStream) { + + public static StreamSuppliers from(Process process) { + return new StreamSuppliers( + new InputStreamSupplier(process.getInputStream()), + new InputStreamSupplier(process.getErrorStream())); + } + + public static StreamSuppliers from(ByteArrayOutputStream stdout, ByteArrayOutputStream stderr) { + return new StreamSuppliers( + new ByteArrayStreamSupplier(stdout), new ByteArrayStreamSupplier(stderr)); + } + + private static class ByteArrayStreamSupplier implements Supplier { + + private String value; + private final ByteArrayOutputStream stream; + + public ByteArrayStreamSupplier(ByteArrayOutputStream stream) { + this.stream = stream; + } + + @Override + public String get() { + if (value == null) { + value = stream.toString().trim(); + } + return value; + } + } + + private static class InputStreamSupplier implements Supplier { + + private String value; + private final InputStream stream; + + public InputStreamSupplier(InputStream stream) { + this.stream = stream; + } + + @Override + public String get() { + if (value == null) { + try { + value = new String(stream.readAllBytes(), StandardCharsets.UTF_8); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + return value; + } + } +} diff --git a/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask b/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask deleted file mode 100644 index ea1bb37e..00000000 --- a/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask +++ /dev/null @@ -1,3 +0,0 @@ -io.serverlessworkflow.impl.executors.RunWorkflowExecutor -io.serverlessworkflow.impl.executors.RunShellExecutor -io.serverlessworkflow.impl.executors.RunScriptExecutor \ No newline at end of file diff --git a/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder b/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder new file mode 100644 index 00000000..6f0679b4 --- /dev/null +++ b/impl/core/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder @@ -0,0 +1,3 @@ +io.serverlessworkflow.impl.executors.RunWorkflowExecutorBuilder +io.serverlessworkflow.impl.executors.RunShellExecutorBuilder +io.serverlessworkflow.impl.executors.RunScriptExecutorBuilder \ No newline at end of file diff --git a/impl/jackson/pom.xml b/impl/jackson/pom.xml index 1f3bae03..1a4176fd 100644 --- a/impl/jackson/pom.xml +++ b/impl/jackson/pom.xml @@ -7,7 +7,7 @@ serverlessworkflow-impl-jackson Serverless Workflow :: Impl :: Jackson - + io.serverlessworkflow serverlessworkflow-impl-core @@ -36,5 +36,5 @@ io.serverlessworkflow serverlessworkflow-impl-lifecycle-events - + \ No newline at end of file diff --git a/impl/pom.xml b/impl/pom.xml index b052dd52..ec4b036f 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -142,6 +142,11 @@ serverlessworkflow-impl-script-js ${project.version} + + io.serverlessworkflow + serverlessworkflow-impl-script-python + ${project.version} + com.cronutils cron-utils @@ -187,5 +192,6 @@ container test script-js + python diff --git a/impl/python/pom.xml b/impl/python/pom.xml new file mode 100644 index 00000000..13d7c4d2 --- /dev/null +++ b/impl/python/pom.xml @@ -0,0 +1,16 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-script-python + Serverless Workflow :: Impl :: Script Python + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + \ No newline at end of file diff --git a/impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java b/impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java new file mode 100644 index 00000000..d7129090 --- /dev/null +++ b/impl/python/src/main/java/io/serverlessworkflow/impl/executors/script/python/PythonScriptTaskRunner.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.script.python; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.scripts.ScriptContext; +import io.serverlessworkflow.impl.scripts.ScriptLanguageId; +import io.serverlessworkflow.impl.scripts.ScriptRunner; +import io.serverlessworkflow.impl.scripts.ScriptUtils; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PythonScriptTaskRunner implements ScriptRunner { + + private static final Logger logger = LoggerFactory.getLogger(PythonScriptTaskRunner.class); + + @Override + public ScriptLanguageId identifier() { + return ScriptLanguageId.PYTHON; + } + + @Override + public WorkflowModel runScript( + ScriptContext scriptContext, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + ProcessBuilder builder = new ProcessBuilder("python", "-c", scriptContext.code()); + ScriptUtils.addEnviromment(builder, scriptContext.envs()); + scriptContext.args().forEach((k, v) -> addArg(builder.command(), k, v)); + logger.debug("Invoking python with command line {}", builder.command()); + return ScriptUtils.buildResultFromProcess( + workflowContext.definition(), + ScriptUtils.uncheckedStart(builder), + scriptContext.returnType(), + model); + } + + private void addArg(List list, String name, Object value) { + if (value instanceof Boolean bool) { + if (bool) { + list.add("--" + name); + } + } else if (value != null) { + list.add("--" + name); + list.add(value.toString()); + } + } +} diff --git a/impl/python/src/main/resources/META-INF/services/io.serverlessworkflow.impl.scripts.ScriptRunner b/impl/python/src/main/resources/META-INF/services/io.serverlessworkflow.impl.scripts.ScriptRunner new file mode 100644 index 00000000..347fa206 --- /dev/null +++ b/impl/python/src/main/resources/META-INF/services/io.serverlessworkflow.impl.scripts.ScriptRunner @@ -0,0 +1 @@ +io.serverlessworkflow.impl.executors.script.python.PythonScriptTaskRunner \ No newline at end of file diff --git a/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java b/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java index 5b986cd4..dce8f159 100644 --- a/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java +++ b/impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java @@ -15,23 +15,17 @@ */ package io.serverlessworkflow.impl.executors.script.js; -import io.serverlessworkflow.api.types.RunTaskConfiguration; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowError; -import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowModelFactory; -import io.serverlessworkflow.impl.executors.ProcessResult; import io.serverlessworkflow.impl.scripts.ScriptContext; import io.serverlessworkflow.impl.scripts.ScriptLanguageId; import io.serverlessworkflow.impl.scripts.ScriptRunner; +import io.serverlessworkflow.impl.scripts.ScriptUtils; +import io.serverlessworkflow.impl.scripts.StreamSuppliers; import java.io.ByteArrayOutputStream; import java.util.Map; -import java.util.function.Supplier; import org.graalvm.polyglot.Context; -import org.graalvm.polyglot.PolyglotException; import org.graalvm.polyglot.Source; import org.graalvm.polyglot.Value; @@ -46,81 +40,6 @@ public ScriptLanguageId identifier() { return ScriptLanguageId.JS; } - @Override - public WorkflowModel runScript( - ScriptContext script, - WorkflowContext workflowContext, - TaskContext taskContext, - WorkflowModel input) { - WorkflowApplication application = workflowContext.definition().application(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - try (Context ctx = - Context.newBuilder() - .err(stderr) - .out(stdout) - .useSystemExit(true) - .allowCreateProcess(false) - .option("engine.WarnInterpreterOnly", "false") - .build()) { - - script - .args() - .forEach( - (key, val) -> { - ctx.getBindings(identifier().getLang()).putMember(key, val); - }); - configureProcessEnv(ctx, script.envs()); - ctx.eval(Source.create(identifier().getLang(), script.code())); - return script - .returnType() - .map( - type -> - modelFromOutput( - type, application.modelFactory(), stdout, () -> stderr.toString())) - .orElse(input); - } catch (PolyglotException e) { - if (e.getExitStatus() != 0 || e.isSyntaxError()) { - throw new WorkflowException(WorkflowError.runtime(taskContext, e).build()); - } else { - return script - .returnType() - .map( - type -> - modelFromOutput( - type, application.modelFactory(), stdout, () -> buildStderr(e, stderr))) - .orElse(input); - } - } - } - - private WorkflowModel modelFromOutput( - RunTaskConfiguration.ProcessReturnType returnType, - WorkflowModelFactory modelFactory, - ByteArrayOutputStream stdout, - Supplier stderr) { - return switch (returnType) { - case ALL -> - modelFactory.fromAny(new ProcessResult(0, stdout.toString().trim(), stderr.get().trim())); - case NONE -> modelFactory.fromNull(); - case CODE -> modelFactory.from(0); - case STDOUT -> modelFactory.from(stdout.toString().trim()); - case STDERR -> modelFactory.from(stderr.get().trim()); - }; - } - - /* - * Gets the stderr message from the PolyglotException or the stderr stream. - * - * @param e the {@link PolyglotException} thrown during script execution - * @param stderr the stderr stream - * @return the stderr message - */ - private String buildStderr(PolyglotException e, ByteArrayOutputStream stderr) { - String err = stderr.toString(); - return err.isBlank() ? e.getMessage() : err.trim(); - } - /* * Configures the process.env object in the JavaScript context with the provided environment * variables. @@ -138,4 +57,32 @@ private void configureProcessEnv(Context context, Map envs) { } bindings.putMember("process", process); } + + @Override + public WorkflowModel runScript( + ScriptContext scriptContext, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + try (Context ctx = + Context.newBuilder() + .err(stderr) + .out(stdout) + .useSystemExit(true) + .allowCreateProcess(false) + .option("engine.WarnInterpreterOnly", "false") + .build()) { + scriptContext.args().forEach(ctx.getBindings(identifier().getLang())::putMember); + configureProcessEnv(ctx, scriptContext.envs()); + ctx.eval(Source.create(identifier().getLang(), scriptContext.code())); + return ScriptUtils.modelFromOutput( + 0, + StreamSuppliers.from(stdout, stderr), + scriptContext.returnType(), + workflowContext.definition().application().modelFactory(), + model); + } + } } diff --git a/impl/test/pom.xml b/impl/test/pom.xml index 444ae5c1..fd47aae9 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -37,6 +37,10 @@ io.serverlessworkflow serverlessworkflow-impl-script-js + + io.serverlessworkflow + serverlessworkflow-impl-script-python + org.glassfish.jersey.media jersey-media-json-jackson diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java new file mode 100644 index 00000000..ab795c06 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowException; +import java.io.IOException; +import java.util.Map; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class CustomFunctionTest { + + private static WorkflowApplication app; + + @BeforeAll + static void init() { + app = WorkflowApplication.builder().build(); + } + + @AfterAll + static void cleanup() { + app.close(); + } + + @Test + void testCustomFunction() { + assertThatThrownBy( + () -> + app.workflowDefinition( + readWorkflowFromClasspath( + "workflows-samples/call-custom-function-inline.yaml")) + .instance(Map.of()) + .start() + .join()) + .hasCauseInstanceOf(WorkflowException.class) + .extracting(w -> ((WorkflowException) w.getCause()).getWorkflowError().status()) + .isEqualTo(404); + } + + @ParameterizedTest + @ValueSource( + strings = { + "workflows-samples/call-custom-function-cataloged.yaml", + "workflows-samples/call-custom-function-cataloged-global.yaml" + }) + void testCustomCatalogFunction(String fileName) throws IOException { + assertThat( + app.workflowDefinition(readWorkflowFromClasspath(fileName)) + .instance(Map.of()) + .start() + .join() + .asText() + .orElseThrow()) + .contains("Hello"); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java index c60e5b73..2a217fc0 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java @@ -121,36 +121,4 @@ void testTimeout() throws IOException { .orElseThrow(); assertThat(result.get("message")).isEqualTo("Viva er Beti Balompie"); } - - @Test - void testCustomFunction() { - assertThatThrownBy( - () -> - app.workflowDefinition( - readWorkflowFromClasspath( - "workflows-samples/call-custom-function-inline.yaml")) - .instance(Map.of()) - .start() - .join()) - .hasCauseInstanceOf(WorkflowException.class) - .extracting(w -> ((WorkflowException) w.getCause()).getWorkflowError().status()) - .isEqualTo(404); - } - - @ParameterizedTest - @ValueSource( - strings = { - "workflows-samples/call-custom-function-cataloged.yaml", - "workflows-samples/call-custom-function-cataloged-global.yaml" - }) - void testCustomCatalogFunction(String fileName) throws IOException { - assertThatThrownBy( - () -> - app.workflowDefinition(readWorkflowFromClasspath(fileName)) - .instance(Map.of()) - .start() - .join()) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("No script runner implementation found for language PYTHON"); - } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunScriptJavaScriptTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunScriptJavaScriptTest.java index 24832ebf..7ec221c6 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunScriptJavaScriptTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunScriptJavaScriptTest.java @@ -15,11 +15,13 @@ */ package io.serverlessworkflow.impl.test; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.executors.ProcessResult; import java.io.IOException; import java.util.Map; import okhttp3.mockwebserver.MockResponse; @@ -126,13 +128,9 @@ void testFunctionThrowingError() throws IOException { WorkflowReader.readWorkflowFromClasspath( "workflows-samples/run-script/function-with-throw.yaml"); try (WorkflowApplication appl = WorkflowApplication.builder().build()) { - WorkflowModel model = appl.workflowDefinition(workflow).instance(Map.of()).start().join(); - - SoftAssertions.assertSoftly( - softly -> { - softly.assertThat(model.asText()).isPresent(); - softly.assertThat(model.asText().get()).isEqualTo("Error: This is a test error"); - }); + assertThatThrownBy(() -> appl.workflowDefinition(workflow).instance(Map.of()).start().join()) + .hasCauseInstanceOf(WorkflowException.class) + .hasMessageContaining("test error"); } } @@ -142,15 +140,9 @@ void testFunctionThrowingErrorAndReturnAll() throws IOException { WorkflowReader.readWorkflowFromClasspath( "workflows-samples/run-script/function-with-throw-all.yaml"); try (WorkflowApplication appl = WorkflowApplication.builder().build()) { - WorkflowModel model = appl.workflowDefinition(workflow).instance(Map.of()).start().join(); - - SoftAssertions.assertSoftly( - softly -> { - ProcessResult r = model.as(ProcessResult.class).orElseThrow(); - softly.assertThat(r.stderr()).isEqualTo("Error: This is a test error"); - softly.assertThat(r.stdout()).isEqualTo("logged before the 'throw' statement"); - softly.assertThat(r.code()).isEqualTo(0); - }); + assertThatThrownBy(() -> appl.workflowDefinition(workflow).instance(Map.of()).start().join()) + .hasCauseInstanceOf(WorkflowException.class) + .hasMessageContaining("test error"); } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunShellExecutorTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunShellExecutorTest.java index 1e80ea50..073a0bf6 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunShellExecutorTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RunShellExecutorTest.java @@ -183,7 +183,7 @@ void testNone() throws IOException { SoftAssertions.assertSoftly( softly -> { - softly.assertThat(outputModel.asJavaObject()).isNull(); + softly.assertThat(outputModel.asJavaObject()).isEqualTo(Map.of()); }); } } diff --git a/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml index 43fb634e..80a3a4dc 100644 --- a/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml +++ b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml @@ -5,7 +5,7 @@ document: version: '0.1.0' do: - log: - call: https://raw.githubusercontent.com/serverlessworkflow/catalog/main/functions/log/1.0.0/function.yaml + call: https://raw.githubusercontent.com/serverlessworkflow/catalog/refs/heads/main/functions/log/1.0.0/function.yaml with: message: Hello, world! level: information