diff --git a/cli/build.gradle b/cli/build.gradle index a2191957a0..f64ecc961e 100644 --- a/cli/build.gradle +++ b/cli/build.gradle @@ -19,4 +19,6 @@ dependencies { compile project(":storage-local") compile project(":storage-gcs") compile project(":storage-minio") + + compile project(":task-gcp") } \ No newline at end of file diff --git a/core/src/main/java/org/floworc/core/runners/RunContext.java b/core/src/main/java/org/floworc/core/runners/RunContext.java index 69f363db85..b839da9c43 100644 --- a/core/src/main/java/org/floworc/core/runners/RunContext.java +++ b/core/src/main/java/org/floworc/core/runners/RunContext.java @@ -6,6 +6,7 @@ import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.AppenderBase; +import com.github.jknack.handlebars.EscapingStrategy; import com.github.jknack.handlebars.Handlebars; import com.github.jknack.handlebars.Template; import com.github.jknack.handlebars.helper.*; @@ -32,6 +33,7 @@ @Getter public class RunContext { private static Handlebars handlebars = new Handlebars() + .with(EscapingStrategy.NOOP) .registerHelpers(ConditionalHelpers.class) .registerHelpers(EachHelper.class) .registerHelpers(LogHelper.class) diff --git a/settings.gradle b/settings.gradle index 2963ca5223..cdc0b4183b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -12,3 +12,4 @@ include 'storage-minio' include 'repository-memory' +include 'task-gcp' diff --git a/task-gcp/build.gradle b/task-gcp/build.gradle new file mode 100644 index 0000000000..be7eb9fbfe --- /dev/null +++ b/task-gcp/build.gradle @@ -0,0 +1,9 @@ +sourceCompatibility = 11 + +dependencies { + compile project(":core") + compile 'com.google.cloud:google-cloud-bigquery:1.96.0' + compile 'com.google.cloud:google-cloud-storage:1.96.0' + + testCompile project(':core').sourceSets.test.output +} \ No newline at end of file diff --git a/task-gcp/src/main/java/org/floworc/task/gcp/AbstractConnection.java 
b/task-gcp/src/main/java/org/floworc/task/gcp/AbstractConnection.java new file mode 100644 index 0000000000..98bff0274e --- /dev/null +++ b/task-gcp/src/main/java/org/floworc/task/gcp/AbstractConnection.java @@ -0,0 +1,18 @@ +package org.floworc.task.gcp; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +public class AbstractConnection { + public GoogleCredentials credentials(String serviceAccount) { + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serviceAccount.getBytes()); + try { + return ServiceAccountCredentials.fromStream(byteArrayInputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/task-gcp/src/main/java/org/floworc/task/gcp/bigquery/BigQueryConnection.java b/task-gcp/src/main/java/org/floworc/task/gcp/bigquery/BigQueryConnection.java new file mode 100644 index 0000000000..858b3c3373 --- /dev/null +++ b/task-gcp/src/main/java/org/floworc/task/gcp/bigquery/BigQueryConnection.java @@ -0,0 +1,19 @@ +package org.floworc.task.gcp.bigquery; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import org.floworc.task.gcp.AbstractConnection; + +public class BigQueryConnection extends AbstractConnection { + public BigQuery of() { + return BigQueryOptions.getDefaultInstance().getService(); + } + + public BigQuery of(String serviceAccount) { + return BigQueryOptions + .newBuilder() + .setCredentials(this.credentials(serviceAccount)) + .build() + .getService(); + } +} diff --git a/task-gcp/src/main/java/org/floworc/task/gcp/bigquery/BigQueryFetch.java b/task-gcp/src/main/java/org/floworc/task/gcp/bigquery/BigQueryFetch.java new file mode 100644 index 0000000000..d186d3084f --- /dev/null +++ b/task-gcp/src/main/java/org/floworc/task/gcp/bigquery/BigQueryFetch.java @@ -0,0 +1,187 @@ +package org.floworc.task.gcp.bigquery; + 
+import com.google.cloud.bigquery.*; +import com.google.common.collect.ImmutableMap; +import lombok.*; +import lombok.experimental.FieldDefaults; +import lombok.experimental.SuperBuilder; +import org.floworc.core.models.tasks.RunnableTask; +import org.floworc.core.models.tasks.Task; +import org.floworc.core.runners.RunContext; +import org.floworc.core.runners.RunOutput; +import org.slf4j.Logger; + +import java.io.IOException; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +@SuperBuilder +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@Getter +@FieldDefaults(level= AccessLevel.PROTECTED) +@AllArgsConstructor +public class BigQueryFetch extends Task implements RunnableTask { + private String sql; + @Builder.Default + private boolean legacySql = false; + private List positionalParameters; + private Map namedParameters; + + private transient BigQuery connection = new BigQueryConnection().of(); + + @Override + public RunOutput run(RunContext runContext) throws Exception { + Logger logger = runContext.logger(this.getClass()); + String sql = runContext.render(this.sql); + + logger.debug("Starting query '{}'", sql); + + QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql) + .setUseLegacySql(this.legacySql) + //.setPositionalParameters(this.positionalParameters) + //.setNamedParameters(this.namedParameters) + .build(); + + // @TODO: named based on execution + JobId jobId = JobId.of(UUID.randomUUID().toString()); + Job queryJob = connection.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); + queryJob = queryJob.waitFor(); + +// JobStatistics.LoadStatistics stats = queryJob.getStatistics(); + + this.handleErrors(queryJob, logger); + + TableResult result = 
queryJob.getQueryResults(); + + return RunOutput + .builder() + .outputs(ImmutableMap.of("rows", this.fetchResult(result))) + .build(); + } + + private List> fetchResult(TableResult result) { + return StreamSupport + .stream(result.getValues().spliterator(), false) + .map(fieldValues -> this.convertRows(result, fieldValues)) + .collect(Collectors.toList()); + } + + private Map convertRows(TableResult result, FieldValueList fieldValues) { + HashMap row = new HashMap<>(); + result + .getSchema() + .getFields() + .forEach(field -> { + row.put(field.getName(), convertCell(field, fieldValues.get(field.getName()), false)); + }); + + return row; + } + + private Object convertCell(Field field, FieldValue value, boolean isRepeated) { + if (field.getMode() == Field.Mode.REPEATED && !isRepeated) { + return value + .getRepeatedValue() + .stream() + .map(fieldValue -> this.convertCell(field, fieldValue, true)) + .collect(Collectors.toList()); + } + + if (LegacySQLTypeName.BOOLEAN.equals(field.getType())) { + return value.getBooleanValue(); + } + + if (LegacySQLTypeName.BYTES.equals(field.getType())) { + return value.getBytesValue(); + } + + if (LegacySQLTypeName.DATE.equals(field.getType())) { + return LocalDate.parse(value.getStringValue()); + } + + if (LegacySQLTypeName.DATETIME.equals(field.getType())) { + return Instant.parse(value.getStringValue() + "Z"); + } + + if (LegacySQLTypeName.FLOAT.equals(field.getType())) { + return value.getDoubleValue(); + } + + if (LegacySQLTypeName.GEOGRAPHY.equals(field.getType())) { + Pattern p = Pattern.compile("^POINT\\(([0-9.\\-]+) ([0-9.\\-]+)\\)$"); + Matcher m = p.matcher(value.getStringValue()); + + if (m.find()) { + return Arrays.asList( + Double.parseDouble(m.group(1)), + Double.parseDouble(m.group(2)) + ); + } + + throw new IllegalArgumentException("Couldn't match '" + value.getStringValue() + "'"); + } + + if (LegacySQLTypeName.INTEGER.equals(field.getType())) { + return value.getLongValue(); + } + + if
(LegacySQLTypeName.NUMERIC.equals(field.getType())) { + return value.getDoubleValue(); + } + + if (LegacySQLTypeName.RECORD.equals(field.getType())) { + AtomicInteger counter = new AtomicInteger(0); + + return field + .getSubFields() + .stream() + .map(sub -> new AbstractMap.SimpleEntry<>( + sub.getName(), + this.convertCell(sub, value.getRepeatedValue().get(counter.get()), false) + )) + .peek(u -> counter.getAndIncrement()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + if (LegacySQLTypeName.STRING.equals(field.getType())) { + return value.getStringValue(); + } + + if (LegacySQLTypeName.TIME.equals(field.getType())) { + return LocalTime.parse(value.getStringValue()); + } + + if (LegacySQLTypeName.TIMESTAMP.equals(field.getType())) { + return Instant.ofEpochMilli(value.getTimestampValue() / 1000); + } + + throw new IllegalArgumentException("Invalid type '" + field.getType() + "'"); + } + + private void handleErrors(Job queryJob, Logger logger) throws IOException { + if (queryJob == null) { + throw new IllegalArgumentException("Job no longer exists"); + } else if (queryJob.getStatus().getError() != null) { + + queryJob + .getStatus() + .getExecutionErrors() + .forEach(bigQueryError -> { + logger.error( + "Error query with error [\n - {}\n]", + bigQueryError.toString() + ); + }); + + throw new IOException(queryJob.getStatus().getError().toString()); + } + } +} diff --git a/task-gcp/src/main/java/org/floworc/task/gcp/gcs/GcsConnection.java b/task-gcp/src/main/java/org/floworc/task/gcp/gcs/GcsConnection.java new file mode 100644 index 0000000000..d77009f7c9 --- /dev/null +++ b/task-gcp/src/main/java/org/floworc/task/gcp/gcs/GcsConnection.java @@ -0,0 +1,19 @@ +package org.floworc.task.gcp.gcs; + +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import org.floworc.task.gcp.AbstractConnection; + +public class GcsConnection extends AbstractConnection { + public Storage of() { + return 
StorageOptions.getDefaultInstance().getService(); + } + + public Storage of(String serviceAccount) { + return StorageOptions + .newBuilder() + .setCredentials(this.credentials(serviceAccount)) + .build() + .getService(); + } +} diff --git a/task-gcp/src/main/java/org/floworc/task/gcp/gcs/GcsCopy.java b/task-gcp/src/main/java/org/floworc/task/gcp/gcs/GcsCopy.java new file mode 100644 index 0000000000..a02b4c0c1d --- /dev/null +++ b/task-gcp/src/main/java/org/floworc/task/gcp/gcs/GcsCopy.java @@ -0,0 +1,59 @@ +package org.floworc.task.gcp.gcs; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import com.google.common.collect.ImmutableMap; +import lombok.*; +import lombok.experimental.FieldDefaults; +import lombok.experimental.SuperBuilder; +import org.floworc.core.models.tasks.RunnableTask; +import org.floworc.core.models.tasks.Task; +import org.floworc.core.runners.RunContext; +import org.floworc.core.runners.RunOutput; +import org.slf4j.Logger; + +import java.net.URI; + +@SuperBuilder +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@Getter +@FieldDefaults(level= AccessLevel.PROTECTED) +@AllArgsConstructor +public class GcsCopy extends Task implements RunnableTask { + private String from; + private String to; + @Builder.Default + private boolean delete = false; + + private transient GcsConnection gcsConnection = new GcsConnection(); + + @Override + public RunOutput run(RunContext runContext) throws Exception { + Logger logger = runContext.logger(this.getClass()); + URI from = new URI(runContext.render(this.from)); + URI to = new URI(runContext.render(this.to)); + + BlobId source = BlobId.of(from.getAuthority(), from.getPath().substring(1)); + + logger.debug("Moving from '{}' to '{}'", from, to); + + Blob result = gcsConnection.of() + .copy(Storage.CopyRequest.newBuilder() + .setSource(source) + .setTarget(BlobId.of(to.getAuthority(), to.getPath().substring(1))) + .build() 
+ ) + .getResult(); + + if (this.delete) { + gcsConnection.of().delete(source); + } + + return RunOutput + .builder() + .outputs(ImmutableMap.of("uri", new URI("gs://" + result.getBucket() + "/" + result.getName()))) + .build(); + } +} diff --git a/task-gcp/src/test/java/org/floworc/task/gcp/bigquery/BigQueryFetchTest.java b/task-gcp/src/test/java/org/floworc/task/gcp/bigquery/BigQueryFetchTest.java new file mode 100644 index 0000000000..1181014353 --- /dev/null +++ b/task-gcp/src/test/java/org/floworc/task/gcp/bigquery/BigQueryFetchTest.java @@ -0,0 +1,57 @@ +package org.floworc.task.gcp.bigquery; + +import com.google.common.collect.ImmutableMap; +import org.floworc.core.runners.RunContext; +import org.floworc.core.runners.RunOutput; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; + +class BigQueryFetchTest { + @Test + @SuppressWarnings("unchecked") + void run() throws Exception { + RunContext runContext = new RunContext(ImmutableMap.of( + "sql", "SELECT " + + " \"hello\" as string," + + " 1 as int," + + " 1.25 AS float," + + " DATE(\"2008-12-25\") AS date," + + " DATETIME \"2008-12-25 15:30:00.123456\" AS datetime," + + " TIME(DATETIME \"2008-12-25 15:30:00.123456\") AS time," + + " TIMESTAMP(\"2008-12-25 15:30:00.123456\") AS timestamp," + + " ST_GEOGPOINT(50.6833, 2.9) AS geopoint," + + " ARRAY(SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3) AS `array`," + + " STRUCT(4 AS x, 0 AS y, ARRAY(SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3) AS z) AS `struct`" + )); + + BigQueryFetch task = BigQueryFetch.builder() + .sql("{{sql}}") + .build(); + + RunOutput run = task.run(runContext); + + List> rows = (List>) run.getOutputs().get("rows"); + assertThat(rows.size(), is(1)); + + 
assertThat(rows.get(0).get("string"), is("hello")); + assertThat(rows.get(0).get("int"), is(1L)); + assertThat(rows.get(0).get("float"), is(1.25D)); + assertThat(rows.get(0).get("date"), is(LocalDate.parse("2008-12-25"))); + assertThat(rows.get(0).get("time"), is(LocalTime.parse("15:30:00.123456"))); + assertThat(rows.get(0).get("timestamp"), is(Instant.parse("2008-12-25T15:30:00.123Z"))); + assertThat((List) rows.get(0).get("geopoint"), containsInAnyOrder(50.6833, 2.9)); + assertThat((List) rows.get(0).get("array"), containsInAnyOrder(1L, 2L, 3L)); + assertThat(((Map) rows.get(0).get("struct")).get("x"), is(4L)); + assertThat(((Map) rows.get(0).get("struct")).get("y"), is(0L)); + assertThat((List) ((Map) rows.get(0).get("struct")).get("z"), containsInAnyOrder(1L, 2L, 3L)); + } +} \ No newline at end of file diff --git a/task-gcp/src/test/java/org/floworc/task/gcp/gcs/GcsCopyTest.java b/task-gcp/src/test/java/org/floworc/task/gcp/gcs/GcsCopyTest.java new file mode 100644 index 0000000000..83b75df1bc --- /dev/null +++ b/task-gcp/src/test/java/org/floworc/task/gcp/gcs/GcsCopyTest.java @@ -0,0 +1,36 @@ +package org.floworc.task.gcp.gcs; + +import com.google.common.collect.ImmutableMap; +import io.micronaut.context.annotation.Value; +import io.micronaut.test.annotation.MicronautTest; +import org.floworc.core.runners.RunContext; +import org.floworc.core.runners.RunOutput; +import org.junit.jupiter.api.Test; + +import javax.inject.Inject; +import java.net.URI; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +@MicronautTest +class GcsCopyTest { + @Value("${floworc.tasks.gcs.bucket}") + private String bucket; + + @Test + void run() throws Exception { + RunContext runContext = new RunContext(ImmutableMap.of( + "bucket", this.bucket + )); + + GcsCopy task = GcsCopy.builder() + .from("gs://{{bucket}}/file/storage/get.yml") + .to("gs://{{bucket}}/file/storage/get2.yml") + .build(); + + RunOutput run = task.run(runContext); + + 
assertThat(run.getOutputs().get("uri"), is(new URI("gs://" + this.bucket + "/file/storage/get2.yml"))); + } +} \ No newline at end of file diff --git a/task-gcp/src/test/resources/application.yml b/task-gcp/src/test/resources/application.yml new file mode 100644 index 0000000000..4298553230 --- /dev/null +++ b/task-gcp/src/test/resources/application.yml @@ -0,0 +1,4 @@ +floworc: + tasks: + gcs: + bucket: "floworc-unit-test" \ No newline at end of file