Skip to content

Commit

Permalink
fix(core): add outputs to WorkingDirectory (#3693)
Browse files Browse the repository at this point in the history
Fix: #3693
  • Loading branch information
fhussonnois committed May 13, 2024
1 parent dc4290e commit 71b231e
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ default Optional<State.Type> resolveState(RunContext runContext, Execution execu
);
}

default T outputs(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
default T outputs(final RunContext runContext) throws Exception {
return null;
}
}
38 changes: 23 additions & 15 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,18 @@ private Optional<WorkerTaskResult> childWorkerTaskResult(Flow flow, Execution ex
);

if (endedTask.isPresent()) {
WorkerTaskResult workerTaskResult = endedTask.get();
if (workerTaskResult.getTaskRun().getState().isTerminated()) {
try {
Output outputs = flowableParent.outputs(runContext);
return Optional.of(new WorkerTaskResult(workerTaskResult
.getTaskRun()
.withOutputs(outputs != null ? outputs.toMap() : ImmutableMap.of()))
);
} catch (Exception e) {
runContext.logger().error("Unable to resolve outputs from the Flowable task: {}", e.getMessage(), e);
}
}
return endedTask;
}

Expand Down Expand Up @@ -289,10 +301,10 @@ private List<TaskRun> childNextsTaskRun(Executor executor, TaskRun parentTaskRun
);

if (!nexts.isEmpty()) {
// TODO - saveFlowableOutput seems to be useless
return this.saveFlowableOutput(
nexts,
executor,
parentTaskRun
executor
);
}
} catch (Exception e) {
Expand All @@ -305,8 +317,7 @@ private List<TaskRun> childNextsTaskRun(Executor executor, TaskRun parentTaskRun

private List<TaskRun> saveFlowableOutput(
List<NextTaskRun> nextTaskRuns,
Executor executor,
TaskRun parentTaskRun
Executor executor
) {
return nextTaskRuns
.stream()
Expand All @@ -326,7 +337,7 @@ private List<TaskRun> saveFlowableOutput(
t.getTaskRun()
);

Output outputs = flowableTask.outputs(runContext, executor.getExecution(), parentTaskRun);
Output outputs = flowableTask.outputs(runContext);
taskRun = taskRun.withOutputs(outputs != null ? outputs.toMap() : ImmutableMap.of());
} catch (Exception e) {
executor.getFlow().logger().warn("Unable to save output on taskRun '{}'", taskRun, e);
Expand Down Expand Up @@ -404,7 +415,8 @@ private Executor handleNext(Executor executor) {
}

return executor.withTaskRun(
this.saveFlowableOutput(nextTaskRuns, executor, null),
// TODO - saveFlowableOutput seems to be only useful for Template
this.saveFlowableOutput(nextTaskRuns, executor),
"handleNext"
);
}
Expand Down Expand Up @@ -606,15 +618,11 @@ private Executor handleListeners(Executor executor) {
}

List<ResolvedTask> currentTasks = conditionService.findValidListeners(executor.getFlow(), executor.getExecution());

List<TaskRun> nexts = this.saveFlowableOutput(
FlowableUtils.resolveSequentialNexts(
executor.getExecution(),
currentTasks
),
executor,
null
);

List<TaskRun> nexts = FlowableUtils.resolveSequentialNexts(executor.getExecution(), currentTasks)
.stream()
.map(throwFunction(NextTaskRun::getTaskRun))
.toList();

if (nexts.isEmpty()) {
return executor;
Expand Down
46 changes: 27 additions & 19 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public URI getContextStorageURI() {
);
this.pluginConfiguration = Collections.emptyMap();
}

private RunContext(final ApplicationContext context) {
this.applicationContext = context;
this.initBean(context);
}

private void initPluginConfiguration(ApplicationContext applicationContext, String plugin) {
this.pluginConfiguration = applicationContext.findBean(PluginConfigurations.class)
Expand Down Expand Up @@ -170,7 +175,10 @@ private void initContext(Flow flow, Task task, Execution execution, TaskRun task

private void initContext(Flow flow, Task task, Execution execution, TaskRun taskRun, boolean decryptVariables) {
this.variables = this.variables(flow, task, execution, taskRun, null, decryptVariables);

initStorage(taskRun);
}

private void initStorage(TaskRun taskRun) {
if (taskRun != null && this.storageInterface != null) {
this.storage = new InternalStorage(
logger(),
Expand All @@ -179,7 +187,7 @@ private void initContext(Flow flow, Task task, Execution execution, TaskRun task
);
}
}

@SuppressWarnings("unchecked")
private void initLogger(TaskRun taskRun, Task task) {
this.runContextLogger = new RunContextLogger(
Expand Down Expand Up @@ -498,13 +506,10 @@ public RunContext forScheduler(TriggerContext triggerContext, AbstractTrigger tr
@SuppressWarnings("unchecked")
public RunContext forWorker(ApplicationContext applicationContext, WorkerTask workerTask) {
this.initBean(applicationContext);

final TaskRun taskRun = workerTask.getTaskRun();

this.initLogger(taskRun, workerTask.getTask());

Map<String, Object> clone = new HashMap<>(this.variables);


final Map<String, Object> clone = new HashMap<>(this.variables);
clone.remove("taskrun");
clone.put("taskrun", this.variables(taskRun));

Expand All @@ -522,11 +527,14 @@ public RunContext forWorker(ApplicationContext applicationContext, WorkerTask wo
}

clone.put("addSecretConsumer", (Consumer<String>) s -> runContextLogger.usedSecret(s));

this.variables = ImmutableMap.copyOf(clone);
this.storage = new InternalStorage(logger(), StorageContext.forTask(taskRun), storageInterface);
this.initPluginConfiguration(applicationContext, workerTask.getTask().getType());
return this;

final RunContext newContext = new RunContext(applicationContext);
newContext.variables = ImmutableMap.copyOf(clone);
newContext.temporaryDirectory = this.tempDir();
newContext.initLogger(taskRun, workerTask.getTask());
newContext.initStorage(taskRun);
newContext.initPluginConfiguration(applicationContext, workerTask.getTask().getType());
return newContext;
}

public RunContext forWorker(ApplicationContext applicationContext, WorkerTrigger workerTrigger) {
Expand All @@ -542,15 +550,15 @@ public RunContext forWorker(ApplicationContext applicationContext, WorkerTrigger
}

public RunContext forWorkingDirectory(ApplicationContext applicationContext, WorkerTask workerTask) {
forWorker(applicationContext, workerTask);

Map<String, Object> clone = new HashMap<>(this.variables);
RunContext newContext = forWorker(applicationContext, workerTask);
Map<String, Object> clone = new HashMap<>(newContext.variables);

clone.put("workerTaskrun", clone.get("taskrun"));

newContext.variables = ImmutableMap.copyOf(clone);

this.variables = ImmutableMap.copyOf(clone);

return this;
return newContext;
}

public RunContext forTaskRunner(TaskRunner taskRunner) {
Expand Down
22 changes: 13 additions & 9 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.PollingTriggerInterface;
Expand Down Expand Up @@ -258,14 +259,18 @@ private void handleTask(WorkerTask workerTask) {
if (workerTask.getTask() instanceof RunnableTask) {
this.run(workerTask, true);
} else if (workerTask.getTask() instanceof WorkingDirectory workingDirectory) {
RunContext runContext = workerTask.getRunContext().forWorkingDirectory(applicationContext, workerTask);


final RunContext workingDirectoryRunContext = workerTask
.getRunContext()
.forWorkingDirectory(applicationContext, workerTask);

RunContext runContext = workingDirectoryRunContext;
try {
// preExecuteTasks
try {
workingDirectory.preExecuteTasks(runContext, workerTask.getTaskRun());
workingDirectory.preExecuteTasks(workingDirectoryRunContext, workerTask.getTaskRun());
} catch (Exception e) {
runContext.logger().error("Failed preExecuteTasks on WorkingDirectory: {}", e.getMessage(), e);
workingDirectoryRunContext.logger().error("Failed preExecuteTasks on WorkingDirectory: {}", e.getMessage(), e);
workerTask = workerTask.fail();
this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask));
this.logTerminated(workerTask);
Expand All @@ -292,18 +297,17 @@ private void handleTask(WorkerTask workerTask) {

runContext = runContext.updateVariables(workerTaskResult, workerTask.getTaskRun());
}

// postExecuteTasks
try {
workingDirectory.postExecuteTasks(runContext, workerTask.getTaskRun());
workingDirectory.postExecuteTasks(workingDirectoryRunContext, workerTask.getTaskRun());
} catch (Exception e) {
runContext.logger().error("Failed postExecuteTasks on WorkingDirectory: {}", e.getMessage(), e);
workingDirectoryRunContext.logger().error("Failed postExecuteTasks on WorkingDirectory: {}", e.getMessage(), e);
workerTask = workerTask.fail();
this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask));
this.logTerminated(workerTask);
return;
}
} finally {
this.logTerminated(workerTask);
runContext.cleanup();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public static Flux<Object> readAll(InputStream in) throws IOException {
public static <T> Flux<T> readAll(InputStream in, TypeReference<T> type) throws IOException {
return readAll(DEFAULT_OBJECT_MAPPER, in, type);
}

public static Flux<Object> readAll(ObjectMapper objectMapper, InputStream in) throws IOException {
return readAll(objectMapper, in, DEFAULT_TYPE_REFERENCE);
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/kestra/core/tasks/flows/Switch.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution
}

@Override
public Switch.Output outputs(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
public Switch.Output outputs(RunContext runContext) throws IllegalVariableEvaluationException {
return Output.builder()
.value(rendererValue(runContext))
.defaults(cases
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution
}

@Override
public Template.Output outputs(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
public Template.Output outputs(RunContext runContext) throws IllegalVariableEvaluationException {
Output.OutputBuilder builder = Output.builder();

if (this.args != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.core.tasks.flows;

import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
Expand All @@ -11,22 +12,28 @@
import io.kestra.core.models.tasks.InputFilesInterface;
import io.kestra.core.models.tasks.NamespaceFiles;
import io.kestra.core.models.tasks.NamespaceFilesInterface;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.OutputFilesInterface;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.FilesService;
import io.kestra.core.runners.NamespaceFilesService;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.validations.WorkingDirectoryTaskValidation;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
Expand Down Expand Up @@ -190,7 +197,9 @@ with open('output.json', 'w') as output_file:
)
@WorkingDirectoryTaskValidation
public class WorkingDirectory extends Sequential implements NamespaceFilesInterface, InputFilesInterface, OutputFilesInterface {


private static final String OUTPUTS_FILE = "outputs.ion";

@Schema(
title = "Cache configuration.",
description = """
Expand All @@ -206,9 +215,9 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
@Getter(AccessLevel.PRIVATE)
@Builder.Default
private transient long cacheDownloadedTime = 0L;

private Object inputFiles;

private List<String> outputFiles;

@Override
Expand Down Expand Up @@ -273,23 +282,29 @@ public void preExecuteTasks(RunContext runContext, TaskRun taskRun) throws Excep
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
namespaceFilesService.inject(runContext, taskRun.getTenantId(), taskRun.getNamespace(), runContext.tempDir(), this.namespaceFiles);
}

if (this.inputFiles != null) {
FilesService.inputFiles(runContext, Map.of(), this.inputFiles);
}
}

public void postExecuteTasks(RunContext runContext, TaskRun taskRun) throws Exception {

if (this.outputFiles != null) {
try {
FilesService.outputFiles(runContext, this.outputFiles);
Map<String, URI> outputFilesURIs = FilesService.outputFiles(runContext, this.outputFiles);
if (!outputFilesURIs.isEmpty()) {
final ByteArrayOutputStream os = new ByteArrayOutputStream();
try (os) {
FileSerde.write(os, outputFilesURIs);
}
runContext.storage().putFile(new ByteArrayInputStream(os.toByteArray()), OUTPUTS_FILE);
}
} catch (Exception e) {
runContext.logger().error("Unable to capture WorkingDirectory output files", e);
throw e;
}
}

if (cache == null) {
return;
}
Expand Down Expand Up @@ -357,9 +372,35 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attribs) throws
} catch (IOException e) {
runContext.logger().error("Unable to execute WorkingDirectory post actions", e);
}

}


@Override
public Outputs outputs(final RunContext runContext) throws IOException {
URI uri = URI.create("kestra://" + runContext.storage().getContextBaseURI() + "/").resolve(OUTPUTS_FILE);
if (!runContext.storage().isFileExist(uri)) {
return null;
}

try(InputStream is = runContext.storage().getFile(uri)) {
Map<String, URI> outputs = FileSerde
.readAll(is, new TypeReference<Map<String, URI>>() {})
.blockFirst();
return new Outputs(outputs);
}
}

@Getter
public static class Outputs extends VoidOutput {
@Schema(
title = "The URIs for output files."
)
private final Map<String, URI> outputsFiles;

public Outputs(final Map<String, URI> outputsFiles) {
this.outputsFiles = outputsFiles;
}
}

@SuperBuilder
@ToString
@EqualsAndHashCode
Expand Down

0 comments on commit 71b231e

Please sign in to comment.