This repository has been archived by the owner on Aug 31, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 32
migrate to new bq client #201
Merged
Merged
Changes from 6 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
9d5f9b8
migrate to new bq client
honnix 6813b4c
fix tests
honnix a6d3c45
ignore guava
honnix ff544bd
ignore guava
honnix ead8180
ignore the right package
honnix 7aac7f7
ignore more
honnix 96bd177
remove unused code
honnix e945f77
javadoc
honnix b0f4e25
downgrade to scio 0.6.1
honnix 0e90477
no missinglink
honnix dd10ab9
Merge pull request #204 from spotify/scio-0.6.1
honnix File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,29 +22,25 @@ | |
|
||
import static com.spotify.flo.contrib.bigquery.FloBigQueryClient.randomStagingTableId; | ||
|
||
import com.google.cloud.WaitForOption; | ||
import com.google.cloud.RetryOption; | ||
import com.google.cloud.bigquery.BigQuery; | ||
import com.google.cloud.bigquery.BigQuery.JobOption; | ||
import com.google.cloud.bigquery.BigQueryError; | ||
import com.google.cloud.bigquery.BigQueryException; | ||
import com.google.cloud.bigquery.CopyJobConfiguration; | ||
import com.google.cloud.bigquery.DatasetId; | ||
import com.google.cloud.bigquery.DatasetInfo; | ||
import com.google.cloud.bigquery.FieldValue; | ||
import com.google.cloud.bigquery.FieldValueList; | ||
import com.google.cloud.bigquery.Job; | ||
import com.google.cloud.bigquery.JobInfo; | ||
import com.google.cloud.bigquery.QueryRequest; | ||
import com.google.cloud.bigquery.QueryResponse; | ||
import com.google.cloud.bigquery.Schema; | ||
import com.google.cloud.bigquery.TableId; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
import com.google.cloud.bigquery.TableResult; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.threeten.bp.Duration; | ||
|
||
class DefaultBigQueryClient implements FloBigQueryClient { | ||
|
||
|
@@ -75,25 +71,6 @@ public TableId createStagingTableId(TableId tableId, String location) { | |
return randomStagingTableId(tableId, location); | ||
} | ||
|
||
@Override | ||
public BigQueryResult query(QueryRequest queryRequest) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to bring back this method? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not unless it's needed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not really. Even query is wrapped in a |
||
QueryResponse response = client.query(queryRequest); | ||
while (!response.jobCompleted()) { | ||
try { | ||
Thread.sleep(1000); | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
throw new RuntimeException(e); | ||
} | ||
response = client.getQueryResults(response.getJobId()); | ||
} | ||
if (response.hasErrors()) { | ||
throw new RuntimeException("BigQuery query failed: " + response.getExecutionErrors()); | ||
} | ||
return DefaultQueryResult.of(response); | ||
|
||
} | ||
|
||
@Override | ||
public JobInfo job(JobInfo jobInfo, JobOption... options) { | ||
Job job = client.create(jobInfo, options); | ||
|
@@ -106,6 +83,7 @@ public JobInfo job(JobInfo jobInfo, JobOption... options) { | |
} | ||
job = job.reload(); | ||
} | ||
|
||
final BigQueryError error = job.getStatus().getError(); | ||
if (error != null) { | ||
throw new BigQueryException(0, "BigQuery job failed: " + error); | ||
|
@@ -119,12 +97,13 @@ public void publish(StagingTableId stagingTableId, TableId tableId) { | |
LOG.debug("copying staging table {} to {}", staging, tableId); | ||
try { | ||
final Job job = client.create(JobInfo.of(CopyJobConfiguration.of(tableId, staging))) | ||
.waitFor(WaitForOption.timeout(1, TimeUnit.MINUTES)); | ||
.waitFor(RetryOption.initialRetryDelay(Duration.ofSeconds(1)), | ||
RetryOption.totalTimeout(Duration.ofMinutes(1L))); | ||
throwIfUnsuccessfulJobStatus(job, tableId); | ||
} catch (BigQueryException e) { | ||
LOG.error("Could not copy BigQuery table {} from staging to target", tableId, e); | ||
throw e; | ||
} catch (InterruptedException | TimeoutException e) { | ||
} catch (InterruptedException e) { | ||
LOG.error("Could not copy BigQuery table {} from staging to target", tableId, e); | ||
throw new RuntimeException(e); | ||
} | ||
|
@@ -151,39 +130,29 @@ private static void throwIfUnsuccessfulJobStatus(Job job, TableId tableId) { | |
|
||
private static class DefaultQueryResult implements BigQueryResult { | ||
|
||
private final QueryResponse response; | ||
private final TableResult result; | ||
|
||
private DefaultQueryResult(QueryResponse response) { | ||
this.response = Objects.requireNonNull(response, "response"); | ||
} | ||
|
||
@Override | ||
public boolean cacheHit() { | ||
return response.getResult().cacheHit(); | ||
private DefaultQueryResult(TableResult result) { | ||
this.result = Objects.requireNonNull(result, "result"); | ||
} | ||
|
||
@Override | ||
public Schema schema() { | ||
return response.getResult().getSchema(); | ||
} | ||
|
||
@Override | ||
public long totalBytesProcessed() { | ||
return response.getResult().getTotalBytesProcessed(); | ||
return result.getSchema(); | ||
} | ||
|
||
@Override | ||
public long totalRows() { | ||
return response.getResult().getTotalRows(); | ||
return result.getTotalRows(); | ||
} | ||
|
||
@Override | ||
public Iterator<List<FieldValue>> iterator() { | ||
return response.getResult().iterateAll().iterator(); | ||
public Iterator<FieldValueList> iterator() { | ||
return result.getValues().iterator(); | ||
} | ||
|
||
public static DefaultQueryResult of(QueryResponse response) { | ||
return new DefaultQueryResult(response); | ||
public static DefaultQueryResult of(TableResult result) { | ||
return new DefaultQueryResult(result); | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be javadoc instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed. I just copied Google's sample code. xD