Skip to content

Commit 643460f

Browse files
authored
[Fix #987] Refactor shell impl (#990)
[Fix #897] Fixing flaky test Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent bf52277 commit 643460f

File tree

5 files changed

+145
-155
lines changed

5 files changed

+145
-155
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunScriptExecutor.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,10 @@
3939
public class RunScriptExecutor implements RunnableTask<RunScript> {
4040

4141
private Optional<WorkflowValueResolver<Map<String, Object>>> environmentExpr;
42-
4342
private Optional<WorkflowValueResolver<Map<String, Object>>> argumentExpr;
44-
4543
private WorkflowValueResolver<String> codeSupplier;
4644
private boolean isAwait;
47-
private RunTaskConfiguration.ProcessReturnType returnType;
45+
private Optional<RunTaskConfiguration.ProcessReturnType> returnType;
4846
private ScriptRunner taskRunner;
4947

5048
@Override
@@ -65,7 +63,7 @@ public void init(RunScript taskConfiguration, WorkflowDefinition definition) {
6563

6664
this.isAwait = taskConfiguration.isAwait();
6765

68-
this.returnType = taskConfiguration.getReturn();
66+
this.returnType = Optional.ofNullable(taskConfiguration.getReturn());
6967

7068
WorkflowApplication application = definition.application();
7169
this.environmentExpr =
@@ -102,22 +100,24 @@ public void init(RunScript taskConfiguration, WorkflowDefinition definition) {
102100
@Override
103101
public CompletableFuture<WorkflowModel> apply(
104102
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
105-
return CompletableFuture.supplyAsync(
106-
() ->
107-
taskRunner.runScript(
108-
new ScriptContext(
109-
argumentExpr
110-
.map(m -> m.apply(workflowContext, taskContext, input))
111-
.orElse(Map.of()),
112-
environmentExpr
113-
.map(m -> m.apply(workflowContext, taskContext, input))
114-
.orElse(Map.of()),
115-
codeSupplier.apply(workflowContext, taskContext, input),
116-
isAwait,
117-
returnType),
118-
workflowContext,
119-
taskContext,
120-
input));
103+
ScriptContext scriptContext =
104+
new ScriptContext(
105+
argumentExpr.map(m -> m.apply(workflowContext, taskContext, input)).orElse(Map.of()),
106+
environmentExpr.map(m -> m.apply(workflowContext, taskContext, input)).orElse(Map.of()),
107+
codeSupplier.apply(workflowContext, taskContext, input),
108+
returnType);
109+
if (isAwait) {
110+
return CompletableFuture.supplyAsync(
111+
() -> taskRunner.runScript(scriptContext, workflowContext, taskContext, input),
112+
workflowContext.definition().application().executorService());
113+
} else {
114+
workflowContext
115+
.definition()
116+
.application()
117+
.executorService()
118+
.submit(() -> taskRunner.runScript(scriptContext, workflowContext, taskContext, input));
119+
return CompletableFuture.completedFuture(input);
120+
}
121121
}
122122

123123
@Override

impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java

Lines changed: 109 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -17,141 +17,142 @@
1717

1818
import io.serverlessworkflow.api.types.RunShell;
1919
import io.serverlessworkflow.api.types.RunTaskConfiguration;
20+
import io.serverlessworkflow.api.types.RunTaskConfiguration.ProcessReturnType;
2021
import io.serverlessworkflow.api.types.Shell;
2122
import io.serverlessworkflow.impl.TaskContext;
22-
import io.serverlessworkflow.impl.WorkflowApplication;
2323
import io.serverlessworkflow.impl.WorkflowContext;
2424
import io.serverlessworkflow.impl.WorkflowDefinition;
25-
import io.serverlessworkflow.impl.WorkflowError;
26-
import io.serverlessworkflow.impl.WorkflowException;
2725
import io.serverlessworkflow.impl.WorkflowModel;
2826
import io.serverlessworkflow.impl.WorkflowModelFactory;
2927
import io.serverlessworkflow.impl.WorkflowUtils;
30-
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
28+
import io.serverlessworkflow.impl.WorkflowValueResolver;
3129
import java.io.IOException;
30+
import java.io.UncheckedIOException;
3231
import java.nio.charset.StandardCharsets;
32+
import java.util.LinkedHashMap;
3333
import java.util.Map;
34+
import java.util.Optional;
3435
import java.util.concurrent.CompletableFuture;
36+
import java.util.stream.Collectors;
3537

3638
public class RunShellExecutor implements RunnableTask<RunShell> {
3739

38-
private ShellResultSupplier shellResultSupplier;
39-
private ProcessBuilderSupplier processBuilderSupplier;
40+
private WorkflowValueResolver<String> shellCommand;
41+
private Map<WorkflowValueResolver<String>, Optional<WorkflowValueResolver<String>>>
42+
shellArguments;
43+
private Optional<WorkflowValueResolver<Map<String, Object>>> shellEnv;
44+
private Optional<ProcessReturnType> returnType;
4045

41-
@FunctionalInterface
42-
private interface ShellResultSupplier {
43-
WorkflowModel apply(
44-
TaskContext taskContext, WorkflowModel input, ProcessBuilder processBuilder);
45-
}
46+
@Override
47+
public CompletableFuture<WorkflowModel> apply(
48+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) {
49+
StringBuilder commandBuilder =
50+
new StringBuilder(shellCommand.apply(workflowContext, taskContext, model));
51+
for (var entry : shellArguments.entrySet()) {
52+
commandBuilder.append(" ").append(entry.getKey().apply(workflowContext, taskContext, model));
53+
entry
54+
.getValue()
55+
.ifPresent(
56+
v -> commandBuilder.append("=").append(v.apply(workflowContext, taskContext, model)));
57+
}
4658

47-
@FunctionalInterface
48-
private interface ProcessBuilderSupplier {
49-
ProcessBuilder apply(WorkflowContext workflowContext, TaskContext taskContext);
59+
ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString());
60+
shellEnv.ifPresent(
61+
map -> {
62+
for (Map.Entry<String, Object> entry :
63+
map.apply(workflowContext, taskContext, model).entrySet()) {
64+
builder.environment().put(entry.getKey(), (String) entry.getValue());
65+
}
66+
});
67+
68+
return returnType
69+
.map(
70+
type ->
71+
CompletableFuture.supplyAsync(
72+
() ->
73+
buildResultFromProcess(
74+
workflowContext.definition().application().modelFactory(),
75+
uncheckedStart(builder),
76+
type)
77+
.orElse(model),
78+
workflowContext.definition().application().executorService()))
79+
.orElseGet(
80+
() -> {
81+
workflowContext
82+
.definition()
83+
.application()
84+
.executorService()
85+
.submit(() -> uncheckedStart(builder));
86+
return CompletableFuture.completedFuture(model);
87+
});
5088
}
5189

52-
@Override
53-
public CompletableFuture<WorkflowModel> apply(
54-
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
55-
ProcessBuilder processBuilder = this.processBuilderSupplier.apply(workflowContext, taskContext);
56-
return CompletableFuture.supplyAsync(
57-
() -> this.shellResultSupplier.apply(taskContext, input, processBuilder));
90+
private Process uncheckedStart(ProcessBuilder builder) {
91+
try {
92+
return builder.start();
93+
} catch (IOException e) {
94+
throw new UncheckedIOException(e);
95+
}
5896
}
5997

6098
@Override
6199
public void init(RunShell taskConfiguration, WorkflowDefinition definition) {
62100
Shell shell = taskConfiguration.getShell();
63-
final String shellCommand = shell.getCommand();
64-
65-
if (shellCommand == null || shellCommand.isBlank()) {
101+
if (!WorkflowUtils.isValid(taskConfiguration.getShell().getCommand())) {
66102
throw new IllegalStateException("Missing shell command in RunShell task configuration");
67103
}
68-
this.processBuilderSupplier =
69-
(workflowContext, taskContext) -> {
70-
WorkflowApplication application = definition.application();
71-
72-
StringBuilder commandBuilder =
73-
new StringBuilder(
74-
ExpressionUtils.isExpr(shellCommand)
75-
? WorkflowUtils.buildStringFilter(application, shellCommand)
76-
.apply(workflowContext, taskContext, taskContext.input())
77-
: shellCommand);
78-
79-
if (shell.getArguments() != null
80-
&& shell.getArguments().getAdditionalProperties() != null) {
81-
for (Map.Entry<String, Object> entry :
82-
shell.getArguments().getAdditionalProperties().entrySet()) {
83-
commandBuilder
84-
.append(" ")
85-
.append(
86-
ExpressionUtils.isExpr(entry.getKey())
87-
? WorkflowUtils.buildStringFilter(application, entry.getKey())
88-
.apply(workflowContext, taskContext, taskContext.input())
89-
: entry.getKey());
90-
if (entry.getValue() != null) {
91-
92-
commandBuilder
93-
.append("=")
94-
.append(
95-
ExpressionUtils.isExpr(entry.getValue())
96-
? WorkflowUtils.buildStringFilter(
97-
application, entry.getValue().toString())
98-
.apply(workflowContext, taskContext, taskContext.input())
99-
: entry.getValue().toString());
100-
}
101-
}
102-
}
103-
104-
// TODO: support Windows cmd.exe
105-
ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString());
106-
if (shell.getEnvironment() != null
107-
&& shell.getEnvironment().getAdditionalProperties() != null) {
108-
for (Map.Entry<String, Object> entry :
109-
shell.getEnvironment().getAdditionalProperties().entrySet()) {
110-
String value =
111-
ExpressionUtils.isExpr(entry.getValue())
112-
? WorkflowUtils.buildStringFilter(application, entry.getValue().toString())
113-
.apply(workflowContext, taskContext, taskContext.input())
114-
: entry.getValue().toString();
115-
116-
// configure environments
117-
builder.environment().put(entry.getKey(), value);
118-
}
119-
}
120-
return builder;
121-
};
122-
123-
this.shellResultSupplier =
124-
(taskContext, input, processBuilder) -> {
125-
try {
126-
Process process = processBuilder.start();
127-
return taskConfiguration.isAwait()
128-
? buildResultFromProcess(taskConfiguration, definition, process)
129-
: input;
130-
} catch (IOException | InterruptedException e) {
131-
throw new WorkflowException(WorkflowError.runtime(taskContext, e).build(), e);
132-
}
133-
};
104+
shellCommand =
105+
WorkflowUtils.buildStringFilter(
106+
definition.application(), taskConfiguration.getShell().getCommand());
107+
108+
shellArguments =
109+
shell.getArguments() != null && shell.getArguments().getAdditionalProperties() != null
110+
? shell.getArguments().getAdditionalProperties().entrySet().stream()
111+
.collect(
112+
Collectors.toMap(
113+
e -> WorkflowUtils.buildStringFilter(definition.application(), e.getKey()),
114+
e ->
115+
e.getValue() != null
116+
? Optional.of(
117+
WorkflowUtils.buildStringFilter(
118+
definition.application(), e.getValue().toString()))
119+
: Optional.empty(),
120+
(x, y) -> y,
121+
LinkedHashMap::new))
122+
: Map.of();
123+
124+
shellEnv =
125+
shell.getEnvironment() != null && shell.getEnvironment().getAdditionalProperties() != null
126+
? Optional.of(
127+
WorkflowUtils.buildMapResolver(
128+
definition.application(), shell.getEnvironment().getAdditionalProperties()))
129+
: Optional.empty();
130+
131+
returnType =
132+
taskConfiguration.isAwait() ? Optional.of(taskConfiguration.getReturn()) : Optional.empty();
134133
}
135134

136-
/**
137-
* Builds the WorkflowModel result from the executed process. It waits for the process to finish
138-
* and captures the exit code, stdout, and stderr based on the task configuration.
139-
*/
140-
private WorkflowModel buildResultFromProcess(
141-
RunShell taskConfiguration, WorkflowDefinition definition, Process process)
142-
throws IOException, InterruptedException {
143-
int exitCode = process.waitFor();
144-
String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
145-
String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8);
146-
147-
WorkflowModelFactory modelFactory = definition.application().modelFactory();
148-
return switch (taskConfiguration.getReturn()) {
149-
case ALL -> modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim()));
150-
case NONE -> modelFactory.fromNull();
151-
case CODE -> modelFactory.from(exitCode);
152-
case STDOUT -> modelFactory.from(stdout.trim());
153-
case STDERR -> modelFactory.from(stderr.trim());
154-
};
135+
private static Optional<WorkflowModel> buildResultFromProcess(
136+
WorkflowModelFactory modelFactory, Process process, ProcessReturnType type) {
137+
try {
138+
int exitCode = process.waitFor();
139+
String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
140+
String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8);
141+
return Optional.of(
142+
switch (type) {
143+
case ALL ->
144+
modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim()));
145+
case NONE -> modelFactory.fromNull();
146+
case CODE -> modelFactory.from(exitCode);
147+
case STDOUT -> modelFactory.from(stdout.trim());
148+
case STDERR -> modelFactory.from(stderr.trim());
149+
});
150+
} catch (IOException e) {
151+
throw new UncheckedIOException(e);
152+
} catch (InterruptedException e) {
153+
Thread.currentThread().interrupt();
154+
return Optional.empty();
155+
}
155156
}
156157

157158
@Override

impl/core/src/main/java/io/serverlessworkflow/impl/scripts/ScriptContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
import io.serverlessworkflow.api.types.RunTaskConfiguration;
1919
import java.util.Map;
20+
import java.util.Optional;
2021

2122
public record ScriptContext(
2223
Map<String, Object> args,
2324
Map<String, Object> envs,
2425
String code,
25-
boolean isAwait,
26-
RunTaskConfiguration.ProcessReturnType returnType) {}
26+
Optional<RunTaskConfiguration.ProcessReturnType> returnType) {}

impl/script-js/src/main/java/io/serverlessworkflow/impl/executors/script/js/JavaScriptScriptTaskRunner.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public WorkflowModel runScript(
5555
WorkflowApplication application = workflowContext.definition().application();
5656
ByteArrayOutputStream stderr = new ByteArrayOutputStream();
5757
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
58-
5958
try (Context ctx =
6059
Context.newBuilder()
6160
.err(stderr)
@@ -71,29 +70,26 @@ public WorkflowModel runScript(
7170
(key, val) -> {
7271
ctx.getBindings(identifier().getLang()).putMember(key, val);
7372
});
74-
7573
configureProcessEnv(ctx, script.envs());
76-
77-
if (!script.isAwait()) {
78-
application
79-
.executorService()
80-
.submit(
81-
() -> {
82-
ctx.eval(identifier().getLang(), script.code());
83-
});
84-
return application.modelFactory().fromAny(input);
85-
}
86-
8774
ctx.eval(Source.create(identifier().getLang(), script.code()));
88-
89-
return modelFromOutput(
90-
script.returnType(), application.modelFactory(), stdout, () -> stderr.toString());
75+
return script
76+
.returnType()
77+
.map(
78+
type ->
79+
modelFromOutput(
80+
type, application.modelFactory(), stdout, () -> stderr.toString()))
81+
.orElse(input);
9182
} catch (PolyglotException e) {
9283
if (e.getExitStatus() != 0 || e.isSyntaxError()) {
9384
throw new WorkflowException(WorkflowError.runtime(taskContext, e).build());
9485
} else {
95-
return modelFromOutput(
96-
script.returnType(), application.modelFactory(), stdout, () -> buildStderr(e, stderr));
86+
return script
87+
.returnType()
88+
.map(
89+
type ->
90+
modelFromOutput(
91+
type, application.modelFactory(), stdout, () -> buildStderr(e, stderr)))
92+
.orElse(input);
9793
}
9894
}
9995
}

0 commit comments

Comments
 (0)