Skip to content

Commit

Permalink
feat(task-gcp): introduce first gcp tasks
Browse files Browse the repository at this point in the history
- BigQueryFetch
- GcsCopy
  • Loading branch information
tchiotludo committed Oct 19, 2019
1 parent d1f2aff commit 14e3384
Show file tree
Hide file tree
Showing 12 changed files with 413 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cli/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ dependencies {
compile project(":storage-local")
compile project(":storage-gcs")
compile project(":storage-minio")

compile project(":task-gcp")
}
2 changes: 2 additions & 0 deletions core/src/main/java/org/floworc/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.github.jknack.handlebars.EscapingStrategy;
import com.github.jknack.handlebars.Handlebars;
import com.github.jknack.handlebars.Template;
import com.github.jknack.handlebars.helper.*;
Expand All @@ -32,6 +33,7 @@
@Getter
public class RunContext {
private static Handlebars handlebars = new Handlebars()
.with(EscapingStrategy.NOOP)
.registerHelpers(ConditionalHelpers.class)
.registerHelpers(EachHelper.class)
.registerHelpers(LogHelper.class)
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ include 'storage-minio'

include 'repository-memory'

include 'task-gcp'
9 changes: 9 additions & 0 deletions task-gcp/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
sourceCompatibility = 11

dependencies {
compile project(":core")
compile 'com.google.cloud:google-cloud-bigquery:1.96.0'
compile 'com.google.cloud:google-cloud-storage:1.96.0'

testCompile project(':core').sourceSets.test.output
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.floworc.task.gcp;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;

import java.io.ByteArrayInputStream;
import java.io.IOException;

public class AbstractConnection {
public GoogleCredentials credentials(String serviceAccount) {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serviceAccount.getBytes());
try {
return ServiceAccountCredentials.fromStream(byteArrayInputStream);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.floworc.task.gcp.bigquery;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import org.floworc.task.gcp.AbstractConnection;

public class BigQueryConnection extends AbstractConnection {
public BigQuery of() {
return BigQueryOptions.getDefaultInstance().getService();
}

public BigQuery of(String serviceAccount) {
return BigQueryOptions
.newBuilder()
.setCredentials(this.credentials(serviceAccount))
.build()
.getService();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package org.floworc.task.gcp.bigquery;

import com.google.cloud.bigquery.*;
import com.google.common.collect.ImmutableMap;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.experimental.SuperBuilder;
import org.floworc.core.models.tasks.RunnableTask;
import org.floworc.core.models.tasks.Task;
import org.floworc.core.runners.RunContext;
import org.floworc.core.runners.RunOutput;
import org.slf4j.Logger;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@SuperBuilder
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@Getter
@FieldDefaults(level= AccessLevel.PROTECTED)
@AllArgsConstructor
public class BigQueryFetch extends Task implements RunnableTask {
private String sql;
@Builder.Default
private boolean legacySql = false;
private List<String> positionalParameters;
private Map<String, String> namedParameters;

private transient BigQuery connection = new BigQueryConnection().of();

@Override
public RunOutput run(RunContext runContext) throws Exception {
Logger logger = runContext.logger(this.getClass());
String sql = runContext.render(this.sql);

logger.debug("Starting query '{}'", sql);

QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql)
.setUseLegacySql(this.legacySql)
//.setPositionalParameters(this.positionalParameters)
//.setNamedParameters(this.namedParameters)
.build();

// @TODO: named based on execution
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = connection.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
queryJob = queryJob.waitFor();

// JobStatistics.LoadStatistics stats = queryJob.getStatistics();

this.handleErrors(queryJob, logger);

TableResult result = queryJob.getQueryResults();

return RunOutput
.builder()
.outputs(ImmutableMap.of("rows", this.fetchResult(result)))
.build();
}

private List<Map<String, Object>> fetchResult(TableResult result) {
return StreamSupport
.stream(result.getValues().spliterator(), false)
.map(fieldValues -> this.convertRows(result, fieldValues))
.collect(Collectors.toList());
}

private Map<String, Object> convertRows(TableResult result, FieldValueList fieldValues) {
HashMap<String, Object> row = new HashMap<>();
result
.getSchema()
.getFields()
.forEach(field -> {
row.put(field.getName(), convertCell(field, fieldValues.get(field.getName()), false));
});

return row;
}

private Object convertCell(Field field, FieldValue value, boolean isRepeated) {
if (field.getMode() == Field.Mode.REPEATED && !isRepeated) {
return value
.getRepeatedValue()
.stream()
.map(fieldValue -> this.convertCell(field, fieldValue, true))
.collect(Collectors.toList());
}

if (LegacySQLTypeName.BOOLEAN.equals(field.getType())) {
return value.getBooleanValue();
}

if (LegacySQLTypeName.BYTES.equals(field.getType())) {
return value.getBytesValue();
}

if (LegacySQLTypeName.DATE.equals(field.getType())) {
return LocalDate.parse(value.getStringValue());
}

if (LegacySQLTypeName.DATETIME.equals(field.getType())) {
return Instant.parse(value.getStringValue() + "Z");
}

if (LegacySQLTypeName.FLOAT.equals(field.getType())) {
return value.getDoubleValue();
}

if (LegacySQLTypeName.GEOGRAPHY.equals(field.getType())) {
Pattern p = Pattern.compile("^POINT\\(([0-9.]+) ([0-9.]+)\\)$");
Matcher m = p.matcher(value.getStringValue());

if (m.find()) {
return Arrays.asList(
Double.parseDouble(m.group(1)),
Double.parseDouble(m.group(2))
);
}

throw new IllegalFormatFlagsException("Couldn't match '" + value.getStringValue() + "'");
}

if (LegacySQLTypeName.INTEGER.equals(field.getType())) {
return value.getLongValue();
}

if (LegacySQLTypeName.NUMERIC.equals(field.getType())) {
return value.getDoubleValue();
}

if (LegacySQLTypeName.RECORD.equals(field.getType())) {
AtomicInteger counter = new AtomicInteger(0);

return field
.getSubFields()
.stream()
.map(sub -> new AbstractMap.SimpleEntry<>(
sub.getName(),
this.convertCell(sub, value.getRepeatedValue().get(counter.get()), false)
))
.peek(u -> counter.getAndIncrement())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

if (LegacySQLTypeName.STRING.equals(field.getType())) {
return value.getStringValue();
}

if (LegacySQLTypeName.TIME.equals(field.getType())) {
return LocalTime.parse(value.getStringValue());
}

if (LegacySQLTypeName.TIMESTAMP.equals(field.getType())) {
return Instant.ofEpochMilli(value.getTimestampValue() / 1000);
}

throw new IllegalArgumentException("Invalid type '" + field.getType() + "'");
}

private void handleErrors(Job queryJob, Logger logger) throws IOException {
if (queryJob == null) {
throw new IllegalArgumentException("Job no longer exists");
} else if (queryJob.getStatus().getError() != null) {

queryJob
.getStatus()
.getExecutionErrors()
.forEach(bigQueryError -> {
logger.error(
"Error query with error [\n - {}\n]",
bigQueryError.toString()
);
});

throw new IOException(queryJob.getStatus().getError().toString());
}
}
}
19 changes: 19 additions & 0 deletions task-gcp/src/main/java/org/floworc/task/gcp/gcs/GcsConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.floworc.task.gcp.gcs;

import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.floworc.task.gcp.AbstractConnection;

public class GcsConnection extends AbstractConnection {
public Storage of() {
return StorageOptions.getDefaultInstance().getService();
}

public Storage of(String serviceAccount) {
return StorageOptions
.newBuilder()
.setCredentials(this.credentials(serviceAccount))
.build()
.getService();
}
}
59 changes: 59 additions & 0 deletions task-gcp/src/main/java/org/floworc/task/gcp/gcs/GcsCopy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.floworc.task.gcp.gcs;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.common.collect.ImmutableMap;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.experimental.SuperBuilder;
import org.floworc.core.models.tasks.RunnableTask;
import org.floworc.core.models.tasks.Task;
import org.floworc.core.runners.RunContext;
import org.floworc.core.runners.RunOutput;
import org.slf4j.Logger;

import java.net.URI;

@SuperBuilder
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@Getter
@FieldDefaults(level= AccessLevel.PROTECTED)
@AllArgsConstructor
public class GcsCopy extends Task implements RunnableTask {
private String from;
private String to;
@Builder.Default
private boolean delete = false;

private transient GcsConnection gcsConnection = new GcsConnection();

@Override
public RunOutput run(RunContext runContext) throws Exception {
Logger logger = runContext.logger(this.getClass());
URI from = new URI(runContext.render(this.from));
URI to = new URI(runContext.render(this.to));

BlobId source = BlobId.of(from.getAuthority(), from.getPath().substring(1));

logger.debug("Moving from '{}' to '{}'", from, to);

Blob result = gcsConnection.of()
.copy(Storage.CopyRequest.newBuilder()
.setSource(source)
.setTarget(BlobId.of(to.getAuthority(), to.getPath().substring(1)))
.build()
)
.getResult();

if (this.delete) {
gcsConnection.of().delete(source);
}

return RunOutput
.builder()
.outputs(ImmutableMap.of("uri", new URI("gs://" + result.getBucket() + "/" + result.getName())))
.build();
}
}
Loading

0 comments on commit 14e3384

Please sign in to comment.