This repository has been archived by the owner on Aug 31, 2022. It is now read-only.

migrate to new bq client #201

Merged
merged 11 commits on Jan 17, 2019
Changes from 8 commits
File: BigQuery client test mock
@@ -24,15 +24,9 @@
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryRequest;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableId;
import com.spotify.flo.TestContext;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -156,11 +150,6 @@ public JobInfo job(JobInfo jobInfo, JobOption... options) {
return jobInfo;
}

@Override
public BigQueryResult query(QueryRequest request) {
return new MockQueryResult(request);
}

@Override
public void publish(StagingTableId stagingTableId, TableId tableId) {
stagingTableIds.remove(formatTableIdKey(tableId));
@@ -169,39 +158,5 @@ public void publish(StagingTableId stagingTableId, TableId tableId) {
publishedTables.computeIfAbsent(datasetId, k -> new ConcurrentSkipListSet<>())
.add(tableId.getTable());
}

private class MockQueryResult implements BigQueryResult {

private final QueryRequest request;

public MockQueryResult(QueryRequest request) {
this.request = Objects.requireNonNull(request, "request");
}

@Override
public boolean cacheHit() {
return false;
}

@Override
public Schema schema() {
throw new UnsupportedOperationException("TODO");
}

@Override
public long totalBytesProcessed() {
throw new UnsupportedOperationException("TODO");
}

@Override
public long totalRows() {
throw new UnsupportedOperationException("TODO");
}

@Override
public Iterator<List<FieldValue>> iterator() {
throw new UnsupportedOperationException("TODO");
}
}
}
}
File: BigQueryOperation.java
@@ -20,63 +20,45 @@

package com.spotify.flo.contrib.bigquery;

import static com.google.cloud.bigquery.QueryJobConfiguration.newBuilder;

import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryRequest;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.spotify.flo.Fn;
import com.spotify.flo.TaskBuilder.F1;
import java.io.Serializable;
import java.util.Objects;
import java.util.UUID;

/**
* A BigQuery operation to be executed by the {@link BigQueryOperator}.
*/
public class BigQueryOperation<T, R> implements Serializable {
public class BigQueryOperation<T> implements Serializable {

private static final long serialVersionUID = 1L;

Fn<JobInfo> jobRequest;
Fn<QueryRequest> queryRequest;
F1<Object, T> success;

/**
* Run a query. Result are returned directly and not written to a table.
*/
@SuppressWarnings("unchecked")
BigQueryOperation<T, BigQueryResult> query(Fn<QueryRequest> queryRequest) {
if (jobRequest != null) {
throw new IllegalStateException("can only run either a query or a job");
}
this.queryRequest = Objects.requireNonNull(queryRequest);
return (BigQueryOperation<T, BigQueryResult>) this;
}
F1<JobInfo, T> success;

/**
* Run a job. This can be a query, copy, load or extract with results written to a table, etc.
*/
@SuppressWarnings("unchecked")
BigQueryOperation<T, JobInfo> job(Fn<JobInfo> jobRequest) {
if (queryRequest != null) {
throw new IllegalStateException("can only run either a query or a job");
}
BigQueryOperation<T> job(Fn<JobInfo> jobRequest) {
this.jobRequest = Objects.requireNonNull(jobRequest);
return (BigQueryOperation<T, JobInfo>) this;
return this;
}

/**
* Specify some action to take on success. E.g. publishing a staging table.
*/
@SuppressWarnings("unchecked")
BigQueryOperation<T, R> success(F1<R, T> success) {
this.success = (F1<Object, T>) Objects.requireNonNull(success);
BigQueryOperation<T> success(F1<JobInfo, T> success) {
this.success = Objects.requireNonNull(success);
return this;
}

static <T> BigQueryOperation<T, BigQueryResult> ofQuery(Fn<QueryRequest> queryRequest) {
return new BigQueryOperation<T, BigQueryResult>().query(queryRequest);
}

static <T> BigQueryOperation<T, JobInfo> ofJob(Fn<JobInfo> job) {
return new BigQueryOperation<T, JobInfo>().job(job);
static <T> BigQueryOperation<T> ofJob(Fn<JobInfo> job) {
return new BigQueryOperation<T>().job(job);
}

public static class Provider<T> implements Serializable {
@@ -86,27 +68,31 @@ public static class Provider<T> implements Serializable {
Provider() {
}

public BigQueryOperation<T, Object> bq() {
public BigQueryOperation<T> bq() {
return new BigQueryOperation<>();
}

public BigQueryOperation<T, BigQueryResult> query(Fn<QueryRequest> queryRequest) {
return BigQueryOperation.ofQuery(queryRequest);
}

public BigQueryOperation<T, BigQueryResult> query(QueryRequest queryRequest) {
return BigQueryOperation.ofQuery(() -> queryRequest);
}

public BigQueryOperation<T, BigQueryResult> query(String query) {
return BigQueryOperation.ofQuery(() -> QueryRequest.of(query));
/**
* Use standard SQL syntax for queries.
* See: https://cloud.google.com/bigquery/sql-reference/
*
* @param query the standard (non legacy) SQL statement
*
* @return a {@link BigQueryOperation} instance to be executed by the {@link BigQueryOperator}
*/
public BigQueryOperation<T> query(String query) {
final QueryJobConfiguration queryConfig = newBuilder(query)
.setUseLegacySql(false)
.build();
final JobId jobId = JobId.of(UUID.randomUUID().toString());
return job(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
}

public BigQueryOperation<T, JobInfo> job(Fn<JobInfo> jobInfo) {
public BigQueryOperation<T> job(Fn<JobInfo> jobInfo) {
return BigQueryOperation.ofJob(jobInfo);
}

public BigQueryOperation<T, JobInfo> job(JobInfo jobInfo) {
public BigQueryOperation<T> job(JobInfo jobInfo) {
return BigQueryOperation.ofJob(() -> jobInfo);
}
}
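The new query(String) overload above builds a standard-SQL QueryJobConfiguration, gives it a random job id, and wraps it in a plain JobInfo, so downstream code only ever deals with jobs. A minimal standalone sketch of that construction and of the shape a success handler now takes (java.util.function.Function stands in for flo's F1 here, and the query text is illustrative):

import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import java.util.UUID;
import java.util.function.Function;

class QueryOperationSketch {
  public static void main(String[] args) {
    // Same construction as Provider#query(String): standard SQL, random job id.
    final QueryJobConfiguration queryConfig = QueryJobConfiguration
        .newBuilder("SELECT 1 AS x")
        .setUseLegacySql(false)
        .build();
    final JobInfo jobInfo = JobInfo.newBuilder(queryConfig)
        .setJobId(JobId.of(UUID.randomUUID().toString()))
        .build();

    // The success hook now always receives the JobInfo of the submitted job;
    // this one simply extracts the generated job name.
    final Function<JobInfo, String> success = info -> info.getJobId().getJob();
    System.out.println(success.apply(jobInfo));
  }
}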
File: BigQueryOperator.java
@@ -27,34 +27,28 @@
import com.spotify.flo.TaskOperator;
import com.spotify.flo.contrib.bigquery.BigQueryOperation.Provider;

public class BigQueryOperator<T> implements TaskOperator<BigQueryOperation.Provider<T>, BigQueryOperation<T, ?>, T> {
public class BigQueryOperator<T> implements TaskOperator<BigQueryOperation.Provider<T>, BigQueryOperation<T>, T> {

private static final long serialVersionUID = 1L;

private BigQueryOperator() {
}

@Override
public T perform(BigQueryOperation<T, ?> spec, Listener listener) {
final Object result;
if (spec.queryRequest != null) {
result = runQuery(spec);
} else if (spec.jobRequest != null) {
public T perform(BigQueryOperation<T> spec, Listener listener) {
final JobInfo result;
if (spec.jobRequest != null) {
result = runJob(spec);
} else {
throw new AssertionError();
}
return spec.success.apply(result);
}

private JobInfo runJob(BigQueryOperation<T, ?> spec) {
private JobInfo runJob(BigQueryOperation<T> spec) {
return bq().job(spec.jobRequest.get());
}

private BigQueryResult runQuery(BigQueryOperation<T, ?> spec) {
return bq().query(spec.queryRequest.get());
}

public static <T> BigQueryOperator<T> create() {
return new BigQueryOperator<>();
}
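perform() now always gets a JobInfo back from runJob and feeds it to the success function. The jobs themselves are not limited to queries; for example, a copy job (the same configuration DefaultBigQueryClient#publish uses further down) can be described as a JobInfo plus a success handler. A sketch of just that construction, with hypothetical table names and java.util.function.Function standing in for flo's F1:

import com.google.cloud.bigquery.CopyJobConfiguration;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.TableId;
import java.util.UUID;
import java.util.function.Function;

class CopyJobSpecSketch {
  public static void main(String[] args) {
    // Hypothetical staging and target tables, for illustration only.
    final TableId staging = TableId.of("my-project", "my_dataset", "staging_table");
    final TableId target = TableId.of("my-project", "my_dataset", "target_table");

    // The kind of JobInfo a caller would pass to Provider#job(JobInfo).
    final JobInfo copyJob = JobInfo.newBuilder(CopyJobConfiguration.of(target, staging))
        .setJobId(JobId.of(UUID.randomUUID().toString()))
        .build();

    // The kind of handler a caller would pass to success(): here it reports
    // the destination table of the copy configuration.
    final Function<JobInfo, String> success = info -> {
      final CopyJobConfiguration conf = info.getConfiguration();
      return conf.getDestinationTable().getTable();
    };
    System.out.println(success.apply(copyJob));
  }
}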
File: BigQueryResult.java
@@ -20,20 +20,15 @@

package com.spotify.flo.contrib.bigquery;

import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Schema;
import java.util.List;

/**
* A mockable {@link com.google.cloud.bigquery.QueryResult}
* A mockable {@link com.google.cloud.bigquery.TableResult}
*/
public interface BigQueryResult extends Iterable<List<FieldValue>> {

boolean cacheHit();
public interface BigQueryResult extends Iterable<FieldValueList> {

Schema schema();

long totalBytesProcessed();

long totalRows();
}
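With the interface trimmed to the three members above (rows as FieldValueList plus schema() and totalRows()), it lines up almost one-to-one with the new client's TableResult. One possible adapter, sketched here for illustration rather than taken from this PR:

import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableResult;
import com.spotify.flo.contrib.bigquery.BigQueryResult;
import java.util.Iterator;

// Wraps a TableResult from the new client in the mockable BigQueryResult interface.
class TableResultAdapter implements BigQueryResult {

  private final TableResult result;

  TableResultAdapter(TableResult result) {
    this.result = result;
  }

  @Override
  public Schema schema() {
    return result.getSchema();
  }

  @Override
  public long totalRows() {
    return result.getTotalRows();
  }

  @Override
  public Iterator<FieldValueList> iterator() {
    // iterateAll() transparently pages through every row of the result.
    return result.iterateAll().iterator();
  }
}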
File: DefaultBigQueryClient.java
@@ -22,29 +22,20 @@

import static com.spotify.flo.contrib.bigquery.FloBigQueryClient.randomStagingTableId;

import com.google.cloud.WaitForOption;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQuery.JobOption;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.CopyJobConfiguration;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryRequest;
import com.google.cloud.bigquery.QueryResponse;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableId;
import java.util.concurrent.ThreadLocalRandom;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

class DefaultBigQueryClient implements FloBigQueryClient {

@@ -75,25 +66,6 @@ public TableId createStagingTableId(TableId tableId, String location) {
return randomStagingTableId(tableId, location);
}

@Override
public BigQueryResult query(QueryRequest queryRequest) {

Review comments on this line:
Member Author: Do we want to bring back this method?
Contributor: Not unless it's needed?
Member Author: Not really. Even query is wrapped in a JobInfo, and QueryRequest should not be used anymore.

QueryResponse response = client.query(queryRequest);
while (!response.jobCompleted()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
response = client.getQueryResults(response.getJobId());
}
if (response.hasErrors()) {
throw new RuntimeException("BigQuery query failed: " + response.getExecutionErrors());
}
return DefaultQueryResult.of(response);

}

@Override
public JobInfo job(JobInfo jobInfo, JobOption... options) {
Job job = client.create(jobInfo, options);
@@ -106,6 +78,7 @@ public JobInfo job(JobInfo jobInfo, JobOption... options) {
}
job = job.reload();
}

final BigQueryError error = job.getStatus().getError();
if (error != null) {
throw new BigQueryException(0, "BigQuery job failed: " + error);
@@ -119,12 +92,13 @@ public void publish(StagingTableId stagingTableId, TableId tableId) {
LOG.debug("copying staging table {} to {}", staging, tableId);
try {
final Job job = client.create(JobInfo.of(CopyJobConfiguration.of(tableId, staging)))
.waitFor(WaitForOption.timeout(1, TimeUnit.MINUTES));
.waitFor(RetryOption.initialRetryDelay(Duration.ofSeconds(1)),
RetryOption.totalTimeout(Duration.ofMinutes(1L)));
throwIfUnsuccessfulJobStatus(job, tableId);
} catch (BigQueryException e) {
LOG.error("Could not copy BigQuery table {} from staging to target", tableId, e);
throw e;
} catch (InterruptedException | TimeoutException e) {
} catch (InterruptedException e) {
LOG.error("Could not copy BigQuery table {} from staging to target", tableId, e);
throw new RuntimeException(e);
}
@@ -148,42 +122,4 @@ private static void throwIfUnsuccessfulJobStatus(Job job, TableId tableId) {
throw new RuntimeException(error);
}
}

private static class DefaultQueryResult implements BigQueryResult {

private final QueryResponse response;

private DefaultQueryResult(QueryResponse response) {
this.response = Objects.requireNonNull(response, "response");
}

@Override
public boolean cacheHit() {
return response.getResult().cacheHit();
}

@Override
public Schema schema() {
return response.getResult().getSchema();
}

@Override
public long totalBytesProcessed() {
return response.getResult().getTotalBytesProcessed();
}

@Override
public long totalRows() {
return response.getResult().getTotalRows();
}

@Override
public Iterator<List<FieldValue>> iterator() {
return response.getResult().iterateAll().iterator();
}

public static DefaultQueryResult of(QueryResponse response) {
return new DefaultQueryResult(response);
}
}
}
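As the review discussion above notes, the new client has no separate query path: a query is wrapped in a JobInfo and goes through job(). A standalone sketch of that pattern against the google-cloud-bigquery API, reusing the RetryOption-based waiting style from the publish() change (query text and timeouts are illustrative):

import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import java.util.UUID;
import org.threeten.bp.Duration;

class QueryViaJobSketch {
  public static void main(String[] args) throws Exception {
    final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // A query is just another job; QueryRequest is no longer involved.
    final QueryJobConfiguration config = QueryJobConfiguration
        .newBuilder("SELECT 1 AS x")
        .setUseLegacySql(false)
        .build();
    final Job job = bigquery.create(JobInfo.newBuilder(config)
        .setJobId(JobId.of(UUID.randomUUID().toString()))
        .build());

    // Wait for completion with RetryOption, mirroring the publish() change.
    final Job completed = job.waitFor(
        RetryOption.initialRetryDelay(Duration.ofSeconds(1)),
        RetryOption.totalTimeout(Duration.ofMinutes(1)));
    if (completed == null || completed.getStatus().getError() != null) {
      throw new RuntimeException("BigQuery query failed");
    }

    // Rows come back as FieldValueList, matching the revised BigQueryResult.
    final TableResult rows = completed.getQueryResults();
    for (FieldValueList row : rows.iterateAll()) {
      System.out.println(row.get("x").getLongValue());
    }
  }
}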