Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=0.10.1-SNAPSHOT
kestraVersion=0.10.+
version=0.11.0-SNAPSHOT
kestraVersion=0.11.+
micronautVersion=3.9.3
lombokVersion=1.18.28
205 changes: 158 additions & 47 deletions src/main/java/io/kestra/plugin/dbt/cli/AbstractDbt.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,41 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.tasks.scripts.AbstractBash;
import io.kestra.core.tasks.scripts.AbstractLogThread;
import io.kestra.core.tasks.scripts.ScriptOutput;
import io.kestra.core.tasks.scripts.BashService;
import io.kestra.plugin.dbt.ResultParser;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.RunnerType;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
import io.kestra.plugin.scripts.exec.scripts.runners.AbstractLogConsumer;
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
import io.kestra.plugin.scripts.exec.scripts.services.ScriptService;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;

import static io.kestra.core.utils.Rethrow.throwSupplier;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class AbstractDbt extends AbstractBash implements RunnableTask<ScriptOutput> {
public abstract class AbstractDbt extends Task implements RunnableTask<ScriptOutput> {
@Builder.Default
@Schema(
title = "Stop execution at the first failure."
Expand Down Expand Up @@ -63,64 +76,162 @@ public abstract class AbstractDbt extends AbstractBash implements RunnableTask<S
@PluginProperty(dynamic = true)
String dbtPath = "./bin/dbt";

protected abstract java.util.List<String> commands(RunContext runContext) throws IllegalVariableEvaluationException;
@Schema(
title = "Input files are extra files that will be available in the dbt working directory.",
description = "You can define the files as map or a JSON string. " +
"Each file can be defined inlined or can reference a file from Kestra's internal storage."
)
@PluginProperty(
additionalProperties = String.class,
dynamic = true
)
private Object inputFiles;

@Override
public ScriptOutput run(RunContext runContext) throws Exception {
if (this.workingDirectory == null) {
this.workingDirectory = runContext.tempDir();
}
@Schema(
title = "The `profiles.yml` file content",
description = "If a `profile.yml` file already exist in the current working directory, setting this property will generate an error."
)
@PluginProperty(dynamic = true)
private String profiles;

ScriptOutput run = run(runContext, throwSupplier(() -> {
java.util.List<String> commands = new ArrayList<>(java.util.List.of(
runContext.render(dbtPath),
"--log-format json"
));
// set RunnerType to PROCESS to keep backward compatibility as the old script engine has PROCESS by default and the new DOCKER
@Builder.Default
@Schema(
title = "Runner to use"
)
@PluginProperty
@NotNull
@NotEmpty
protected RunnerType runner = RunnerType.PROCESS;

if (this.projectDir != null) {
commands.add("--project-dir " + runContext.render(this.projectDir));
}
@Schema(
title = "Docker options for the `DOCKER` runner"
)
@PluginProperty
@Builder.Default
private DockerOptions docker = DockerOptions.builder()
.image("ghcr.io/kestra-io/dbt")
.entryPoint(List.of())
.build();

if (this.debug) {
commands.add("--debug");
}
@Schema(title = "Deprecated, use the `docker` property instead", deprecated = true)
@PluginProperty
@Deprecated
public DockerOptions getDockerOptions() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the getter is needed (only the setter no?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without the getter the property is not added to the JSONSchema so old flow will no longer validate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a good way to force the migration no?

return docker;
}

@Deprecated
public void setDockerOptions(DockerOptions dockerOptions) {
this.docker = dockerOptions;
}

if (this.failFast) {
commands.add("--fail-fast");
}
@Schema(
title = "Additional environment variables for the current process."
)
@PluginProperty(
additionalProperties = String.class,
dynamic = true
)
protected Map<String, String> env;

if (this.warnError) {
commands.add("--warn-error");
protected abstract java.util.List<String> dbtCommands(RunContext runContext, Path workingDirectory) throws IllegalVariableEvaluationException;

@Override
public ScriptOutput run(RunContext runContext) throws Exception {
CommandsWrapper commandsWrapper = new CommandsWrapper(runContext)
.withEnv(this.getEnv())
.withRunnerType(this.getRunner())
.withDockerOptions(this.getDocker())
.withLogConsumer(new AbstractLogConsumer() {
@Override
public void accept(String line, Boolean isStdErr) throws Exception {
LogService.parse(runContext, line);
}
});
Path workingDirectory = commandsWrapper.getWorkingDirectory();

if (profiles != null && !profiles.isEmpty()) {
if (Files.exists(Path.of(".profiles/profiles.yml"))) {
throw new IllegalArgumentException("Cannot use the profiles property if there is already a 'profiles.yml' file");
}

commands.addAll(commands(runContext));
FileUtils.writeStringToFile(
new File(workingDirectory.resolve(".profile").toString(), "profiles.yml"),
runContext.render(profiles),
StandardCharsets.UTF_8
);
}

BashService.createInputFiles(
runContext,
workingDirectory,
this.finalInputFiles(runContext),
Collections.emptyMap()
);

return String.join(" ", commands);
}));
List<String> commandsArgs = ScriptService.scriptCommands(
List.of("/bin/sh", "-c"),
null,
List.of(createDbtCommand(runContext, workingDirectory))
);

parseResults(runContext, run);
ScriptOutput run = commandsWrapper
.addEnv(Map.of("PYTHONUNBUFFERED", "true"))
.withCommands(commandsArgs)
.run();

parseResults(runContext, workingDirectory, run);

return run;
}

protected void parseResults(RunContext runContext, ScriptOutput scriptOutput) throws IllegalVariableEvaluationException, IOException {
URI results = ResultParser.parseRunResult(runContext, this.workingDirectory.resolve("target/run_results.json").toFile());
scriptOutput.getOutputFiles().put("run_results.json", results);
URI manifest = ResultParser.parseManifest(runContext, this.workingDirectory.resolve("target/manifest.json").toFile());
scriptOutput.getOutputFiles().put("manifest.json", manifest);
private Map<String, String> finalInputFiles(RunContext runContext) throws IOException, IllegalVariableEvaluationException {
return this.inputFiles != null ? new HashMap<>(BashService.transformInputFiles(runContext, this.inputFiles)) : new HashMap<>();
}

@Override
protected LogSupplier defaultLogSupplier(Logger logger, RunContext runContext) {
return (inputStream, isStdErr) -> {
AbstractLogThread thread;
thread = new DbtLogParser(inputStream, logger, runContext);
thread.setName("dbt-log-" + (isStdErr ? "-err" : "-out"));
private String createDbtCommand(RunContext runContext, Path workingDirectory) throws IllegalVariableEvaluationException {
List<String> commands = new ArrayList<>(List.of(
runContext.render(dbtPath),
"--log-format json"
));

if (this.debug) {
commands.add("--debug");
}

thread.start();
if (this.failFast) {
commands.add("--fail-fast");
}

return thread;
};
if (this.warnError) {
commands.add("--warn-error");
}

commands.addAll(dbtCommands(runContext, workingDirectory));

if (this.projectDir != null) {
commands.add("--project-dir " + runContext.render(this.projectDir));
}

return String.join(" ", commands);
}

protected void parseResults(RunContext runContext, Path workingDirectory, ScriptOutput scriptOutput) throws IllegalVariableEvaluationException, IOException {
String baseDir = this.projectDir != null ? runContext.render(this.projectDir) : "";

File runResults = workingDirectory.resolve(baseDir + "target/run_results.json").toFile();

if (runResults.exists()) {
URI results = ResultParser.parseRunResult(runContext, runResults);
scriptOutput.getOutputFiles().put("run_results.json", results);
}

File manifestFile = workingDirectory.resolve(baseDir + "target/manifest.json").toFile();

if (manifestFile.exists()) {
URI manifest = ResultParser.parseManifest(runContext, manifestFile);
scriptOutput.getOutputFiles().put("manifest.json", manifest);
}
}
}
9 changes: 5 additions & 4 deletions src/main/java/io/kestra/plugin/dbt/cli/AbstractRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.nio.file.Path;
import java.util.ArrayList;

@SuperBuilder
Expand Down Expand Up @@ -53,13 +54,13 @@ public abstract class AbstractRun extends AbstractDbt {
@PluginProperty(dynamic = true)
java.util.List<String> exclude;

abstract protected String command();
abstract protected String dbtCommand();

@Override
protected java.util.List<String> commands(RunContext runContext) throws IllegalVariableEvaluationException {
protected java.util.List<String> dbtCommands(RunContext runContext, Path workingDirectory) throws IllegalVariableEvaluationException {
java.util.List<String> commands = new ArrayList<>(java.util.List.of(
this.command(),
"--profiles-dir " + this.workingDirectory.resolve(".profile").toAbsolutePath()));
this.dbtCommand(),
"--profiles-dir " + workingDirectory.resolve(".profile").toAbsolutePath()));

if (this.thread != null) {
commands.add("--threads " + this.thread);
Expand Down
48 changes: 24 additions & 24 deletions src/main/java/io/kestra/plugin/dbt/cli/Build.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,36 @@
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/dbt-demo
branch: main
- id: dbt-build
type: io.kestra.plugin.dbt.cli.Build
runner: DOCKER
dbtPath: /usr/local/bin/dbt
dockerOptions:
image: ghcr.io/kestra-io/dbt-bigquery:latest
inputFiles:
.profile/profiles.yml: |
jaffle_shop:
outputs:
dev:
type: bigquery
dataset: dwh
fixed_retries: 1
keyfile: sa.json
location: EU
method: service-account
priority: interactive
project: my-project
threads: 8
timeout_seconds: 300
target: dev
sa.json: "{{ secret('GCP_CREDS') }}"
- id: dbt-build
type: io.kestra.plugin.dbt.cli.Build
runner: DOCKER
dbtPath: /usr/local/bin/dbt
docker:
image: ghcr.io/kestra-io/dbt-bigquery:latest
profiles: |
jaffle_shop:
outputs:
dev:
type: bigquery
dataset: dwh
fixed_retries: 1
keyfile: sa.json
location: EU
method: service-account
priority: interactive
project: my-project
threads: 8
timeout_seconds: 300
target: dev
inputFiles:
sa.json: "{{ secret('GCP_CREDS') }}"
"""
)
}
)
public class Build extends AbstractRun {
@Override
protected String command() {
protected String dbtCommand() {
return "build";
}
}
Loading