Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions src/main/java/io/kestra/plugin/dbt/cli/AbstractDbt.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
import io.kestra.core.models.tasks.*;
import io.kestra.core.models.tasks.runners.AbstractLogConsumer;
import io.kestra.core.models.tasks.runners.ScriptService;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.dbt.ResultParser;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.RunnerType;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
import io.kestra.plugin.scripts.runner.docker.Docker;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.apache.commons.io.FileUtils;
Expand All @@ -24,16 +27,18 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import jakarta.validation.constraints.NotNull;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class AbstractDbt extends Task implements RunnableTask<ScriptOutput>, NamespaceFilesInterface, InputFilesInterface, OutputFilesInterface {
private static final String DEFAULT_IMAGE = "ghcr.io/kestra-io/dbt";

@Builder.Default
@Schema(
title = "Stop execution at the first failure."
Expand Down Expand Up @@ -79,24 +84,39 @@ public abstract class AbstractDbt extends Task implements RunnableTask<ScriptOut
@PluginProperty(dynamic = true)
private String profiles;

// set RunnerType to PROCESS to keep backward compatibility as the old script engine has PROCESS by default and the new DOCKER
@Schema(
title = "The task runner to use.",
description = """
Task runners are provided by plugins, each have their own properties.
If you change from the default one, be careful to also configure the entrypoint to an empty list if needed."""
)
@PluginProperty
@Builder.Default
@Valid
protected TaskRunner taskRunner = Docker.builder()
.type(Docker.class.getName())
.entryPoint(Collections.emptyList())
.build();

@Schema(title = "The task runner container image, only used if the task runner is container-based.")
@PluginProperty(dynamic = true)
@Builder.Default
protected String containerImage = DEFAULT_IMAGE;

@Schema(
title = "Runner to use"
title = "The runner type.",
description = "Deprecated, use 'taskRunner' instead."
)
@Deprecated
@PluginProperty
@NotNull
protected RunnerType runner = RunnerType.PROCESS;
protected RunnerType runner;

@Schema(
title = "Docker options for the `DOCKER` runner"
title = "Deprecated, use 'taskRunner' instead"
)
@PluginProperty
@Builder.Default
private DockerOptions docker = DockerOptions.builder()
.image("ghcr.io/kestra-io/dbt")
.entryPoint(List.of())
.build();
@Deprecated
private DockerOptions docker;

@Schema(title = "Deprecated, use the `docker` property instead", deprecated = true)
@PluginProperty
Expand Down Expand Up @@ -143,6 +163,8 @@ public ScriptOutput run(RunContext runContext) throws Exception {
.withOutputFiles(outputFiles)
.withRunnerType(this.getRunner())
.withDockerOptions(this.getDocker())
.withContainerImage(this.containerImage)
.withTaskRunner(this.taskRunner)
.withLogConsumer(new AbstractLogConsumer() {
@Override
public void accept(String line, Boolean isStdErr) {
Expand Down
29 changes: 21 additions & 8 deletions src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.runners.AbstractLogConsumer;
import io.kestra.core.models.tasks.runners.ScriptService;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.dbt.ResultParser;
import io.kestra.plugin.scripts.exec.AbstractExecScript;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
import io.kestra.plugin.scripts.runner.docker.Docker;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -149,14 +152,6 @@ public class DbtCLI extends AbstractExecScript {
@PluginProperty(dynamic = true)
private String projectDir;

@Schema(
title = "Docker options for the `DOCKER` runner.",
defaultValue = "{image=" + DEFAULT_IMAGE + ", pullPolicy=ALWAYS}"
)
@PluginProperty
@Builder.Default
private DockerOptions docker = DockerOptions.builder().build();

@Builder.Default
@Schema(
title = "Parse run result.",
Expand All @@ -165,11 +160,29 @@ public class DbtCLI extends AbstractExecScript {
@PluginProperty
protected Boolean parseRunResults = true;

@Schema(
title = "The task runner to use.",
description = """
Task runners are provided by plugins, each have their own properties.
If you change from the default one, be careful to also configure the entrypoint to an empty list if needed."""
)
@PluginProperty
@Builder.Default
@Valid
protected TaskRunner taskRunner = Docker.builder()
.type(Docker.class.getName())
.entryPoint(Collections.emptyList())
.build();

@Builder.Default
protected String containerImage = DEFAULT_IMAGE;

@Override
protected DockerOptions injectDefaults(DockerOptions original) {
if (original == null) {
return null;
}

var builder = original.toBuilder();
if (original.getImage() == null) {
builder.image(DEFAULT_IMAGE);
Expand Down
21 changes: 7 additions & 14 deletions src/main/java/io/kestra/plugin/dbt/cli/Setup.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.runners.PluginUtilsService;
import io.kestra.core.models.tasks.runners.ScriptService;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.scripts.exec.AbstractExecScript;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.RunnerType;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
import io.kestra.plugin.scripts.runner.docker.Docker;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -139,23 +141,14 @@ public class Setup extends AbstractExecScript implements RunnableTask<ScriptOutp
)
private Object inputFiles;

// set RunnerType to PROCESS to keep backward compatibility as the old script engine has PROCESS by default and the new DOCKER
@Builder.Default
@Schema(
title = "Runner to use."
)
@PluginProperty
@NotNull
protected RunnerType runner = RunnerType.PROCESS;

@Schema(
title = "Docker options for the `DOCKER` runner."
title = "The task runner to use.",
description = "Task runners are provided by plugins, each have their own properties."
)
@PluginProperty
@Builder.Default
protected DockerOptions docker = DockerOptions.builder()
.image(DEFAULT_IMAGE)
.build();
@Valid
protected TaskRunner taskRunner = Docker.INSTANCE;

@Builder.Default
protected String containerImage = DEFAULT_IMAGE;
Expand Down
51 changes: 0 additions & 51 deletions src/test/java/io/kestra/plugin/dbt/cli/BuildRunnerTest.java

This file was deleted.

20 changes: 15 additions & 5 deletions src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.runner.Process;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertTrue;

@KestraTest
Expand All @@ -44,6 +45,7 @@ void run() throws Exception {
Setup setup = Setup.builder()
.id(IdUtils.create())
.type(Setup.class.getName())
.taskRunner(Process.INSTANCE)
.profiles(Map.of(
"unit-kestra", Map.of(
"outputs", Map.of(
Expand All @@ -69,14 +71,22 @@ void run() throws Exception {

setup.run(runContext);

try(var inputStream = new ByteArrayInputStream(Base64.getDecoder().decode(System.getenv("GOOGLE_SERVICE_ACCOUNT").getBytes()))) {
Files.copy(inputStream, runContext.workingDir().resolve(Path.of("sa.json")));
}
Map<String, String> env = new HashMap<>();
env.put("GOOGLE_APPLICATION_CREDENTIALS", runContext.workingDir().resolve(Path.of("sa.json")).toString());
Build task = Build.builder()
.thread(8)
.projectDir(runContext.workingDir().path().toString())
.env(env)
.build();

ScriptOutput runOutput = task.run(runContext);

assertThat(runOutput.getExitCode(), is(0));
assertTrue(runOutput.getOutputFiles().containsKey("run_results.json"));
assertTrue(runOutput.getOutputFiles().containsKey("manifest.json"));
//FIXME, also assert on dynamic tasks generation
// assertTrue(runOutput.getOutputFiles().containsKey("run_results.json"));
// assertTrue(runOutput.getOutputFiles().containsKey("manifest.json"));
}
}
8 changes: 1 addition & 7 deletions src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -62,11 +60,7 @@ void run() throws Exception {
target: dev
"""
)
.docker(DockerOptions.builder()
.image("ghcr.io/kestra-io/dbt-bigquery:latest")
.entryPoint(List.of())
.build()
)
.containerImage("ghcr.io/kestra-io/dbt-bigquery:latest")
.commands(List.of("dbt build"))
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@

@KestraTest
class SerializationTest {
@Inject
private RunContextFactory runContextFactory;

@Inject
private ApplicationContext applicationContext;

Expand Down
39 changes: 0 additions & 39 deletions src/test/resources/flows/full.yaml

This file was deleted.