Update BigQuery storage protos to v1

Update plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java

Co-authored-by: Yuya Ebihara <ebyhry@gmail.com>

Update plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java

Co-authored-by: Yuya Ebihara <ebyhry@gmail.com>

Update plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java

Co-authored-by: Yuya Ebihara <ebyhry@gmail.com>

Fix code review feedback: rename member variable

Revert unneeded pom.xml changes
kmjung authored and hashhar committed Nov 24, 2021
1 parent 5b90642 commit 24afd8d
Showing 10 changed files with 76 additions and 102 deletions.
plugin/trino-bigquery/pom.xml (1 addition, 1 deletion)
@@ -102,7 +102,7 @@
 
         <dependency>
             <groupId>com.google.api.grpc</groupId>
-            <artifactId>proto-google-cloud-bigquerystorage-v1beta1</artifactId>
+            <artifactId>proto-google-cloud-bigquerystorage-v1</artifactId>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.guava</groupId>
plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java
@@ -43,7 +43,7 @@ public static HeaderProvider createHeaderProvider(NodeManager nodeManager)
     public void configure(Binder binder)
     {
         // BigQuery related
-        binder.bind(BigQueryStorageClientFactory.class).in(Scopes.SINGLETON);
+        binder.bind(BigQueryReadClientFactory.class).in(Scopes.SINGLETON);
 
         // Connector implementation
         binder.bind(BigQueryConnector.class).in(Scopes.SINGLETON);
plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java
@@ -36,13 +36,13 @@ public class BigQueryPageSourceProvider
 {
     private static final Logger log = Logger.get(BigQueryPageSourceProvider.class);
 
-    private final BigQueryStorageClientFactory bigQueryStorageClientFactory;
+    private final BigQueryReadClientFactory bigQueryReadClientFactory;
     private final int maxReadRowsRetries;
 
     @Inject
-    public BigQueryPageSourceProvider(BigQueryStorageClientFactory bigQueryStorageClientFactory, BigQueryConfig config)
+    public BigQueryPageSourceProvider(BigQueryReadClientFactory bigQueryReadClientFactory, BigQueryConfig config)
     {
-        this.bigQueryStorageClientFactory = requireNonNull(bigQueryStorageClientFactory, "bigQueryStorageClientFactory is null");
+        this.bigQueryReadClientFactory = requireNonNull(bigQueryReadClientFactory, "bigQueryReadClientFactory is null");
         this.maxReadRowsRetries = requireNonNull(config, "config is null").getMaxReadRowsRetries();
     }
 
@@ -71,6 +71,6 @@ public ConnectorPageSource createPageSource(
                 .map(BigQueryColumnHandle.class::cast)
                 .collect(toImmutableList());
 
-        return new BigQueryResultPageSource(bigQueryStorageClientFactory, maxReadRowsRetries, bigQuerySplit, bigQueryColumnHandles);
+        return new BigQueryResultPageSource(bigQueryReadClientFactory, maxReadRowsRetries, bigQuerySplit, bigQueryColumnHandles);
     }
 }
plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/{BigQueryStorageClientFactory.java → BigQueryReadClientFactory.java}
@@ -16,8 +16,8 @@
 import com.google.api.gax.core.FixedCredentialsProvider;
 import com.google.api.gax.rpc.HeaderProvider;
 import com.google.auth.Credentials;
-import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
-import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
 
 import javax.inject.Inject;
 
@@ -30,32 +30,32 @@
  * https://github.com/google/guice/wiki/Avoid-Injecting-Closable-Resources), this factory creates
  * short lived clients that can be closed independently.
  */
-public class BigQueryStorageClientFactory
+public class BigQueryReadClientFactory
 {
     private final Optional<Credentials> credentials;
     private final HeaderProvider headerProvider;
 
     @Inject
-    public BigQueryStorageClientFactory(BigQueryCredentialsSupplier bigQueryCredentialsSupplier, HeaderProvider headerProvider)
+    public BigQueryReadClientFactory(BigQueryCredentialsSupplier bigQueryCredentialsSupplier, HeaderProvider headerProvider)
     {
         this.credentials = bigQueryCredentialsSupplier.getCredentials();
         this.headerProvider = headerProvider;
     }
 
-    BigQueryStorageClient createBigQueryStorageClient()
+    BigQueryReadClient createBigQueryReadClient()
     {
         try {
-            BigQueryStorageSettings.Builder clientSettings = BigQueryStorageSettings.newBuilder()
+            BigQueryReadSettings.Builder clientSettings = BigQueryReadSettings.newBuilder()
                     .setTransportChannelProvider(
-                            BigQueryStorageSettings.defaultGrpcTransportProviderBuilder()
+                            BigQueryReadSettings.defaultGrpcTransportProviderBuilder()
                                     .setHeaderProvider(headerProvider)
                                     .build());
             credentials.ifPresent(credentials ->
                     clientSettings.setCredentialsProvider(FixedCredentialsProvider.create(credentials)));
-            return BigQueryStorageClient.create(clientSettings.build());
+            return BigQueryReadClient.create(clientSettings.build());
         }
         catch (IOException e) {
-            throw new UncheckedIOException("Error creating BigQueryStorageClient", e);
+            throw new UncheckedIOException("Error creating BigQueryReadClient", e);
         }
     }
}
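
Aside (not part of this diff): since Guice shouldn't own closeable resources (per the class comment above), each client this factory hands out is short-lived and caller-closed. A minimal usage sketch, assuming a caller in the same package with an injected factory:

```java
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;

class ReadClientUsageSketch
{
    // Hypothetical caller; `factory` is assumed to be injected.
    void withShortLivedClient(BigQueryReadClientFactory factory)
    {
        // BigQueryReadClient is AutoCloseable, so try-with-resources
        // releases its gRPC channel as soon as the work is done.
        try (BigQueryReadClient client = factory.createBigQueryReadClient()) {
            // createReadSession(...) and readRowsCallable() calls go here
        }
    }
}
```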
plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryResultPageSource.java
@@ -13,8 +13,8 @@
  */
 package io.trino.plugin.bigquery;
 
-import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
-import com.google.cloud.bigquery.storage.v1beta1.Storage;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
@@ -82,21 +82,21 @@ public class BigQueryResultPageSource
 
     private static final AvroDecimalConverter DECIMAL_CONVERTER = new AvroDecimalConverter();
 
-    private final BigQueryStorageClient bigQueryStorageClient;
+    private final BigQueryReadClient bigQueryReadClient;
     private final BigQuerySplit split;
     private final List<String> columnNames;
     private final List<Type> columnTypes;
     private final AtomicLong readBytes;
     private final PageBuilder pageBuilder;
-    private final Iterator<Storage.ReadRowsResponse> responses;
+    private final Iterator<ReadRowsResponse> responses;
 
     public BigQueryResultPageSource(
-            BigQueryStorageClientFactory bigQueryStorageClientFactory,
+            BigQueryReadClientFactory bigQueryReadClientFactory,
             int maxReadRowsRetries,
             BigQuerySplit split,
             List<BigQueryColumnHandle> columns)
     {
-        this.bigQueryStorageClient = requireNonNull(bigQueryStorageClientFactory, "bigQueryStorageClientFactory is null").createBigQueryStorageClient();
+        this.bigQueryReadClient = requireNonNull(bigQueryReadClientFactory, "bigQueryReadClientFactory is null").createBigQueryReadClient();
         this.split = requireNonNull(split, "split is null");
         this.readBytes = new AtomicLong();
         requireNonNull(columns, "columns is null");
@@ -109,11 +109,7 @@ public BigQueryResultPageSource(
         this.pageBuilder = new PageBuilder(columnTypes);
 
         log.debug("Starting to read from %s", split.getStreamName());
-        Storage.ReadRowsRequest.Builder readRowsRequest = Storage.ReadRowsRequest.newBuilder()
-                .setReadPosition(Storage.StreamPosition.newBuilder()
-                        .setStream(Storage.Stream.newBuilder()
-                                .setName(split.getStreamName())));
-        responses = new ReadRowsHelper(bigQueryStorageClient, readRowsRequest, maxReadRowsRetries).readRows();
+        responses = new ReadRowsHelper(bigQueryReadClient, split.getStreamName(), maxReadRowsRetries).readRows();
     }
 
     @Override
@@ -138,7 +134,7 @@ public boolean isFinished()
     public Page getNextPage()
     {
         checkState(pageBuilder.isEmpty(), "PageBuilder is not empty at the beginning of a new page");
-        Storage.ReadRowsResponse response = responses.next();
+        ReadRowsResponse response = responses.next();
         Iterable<GenericRecord> records = parse(response);
         for (GenericRecord record : records) {
             pageBuilder.declarePosition();
@@ -280,10 +276,10 @@ public long getSystemMemoryUsage()
     @Override
     public void close()
     {
-        bigQueryStorageClient.close();
+        bigQueryReadClient.close();
    }
 
-    Iterable<GenericRecord> parse(Storage.ReadRowsResponse response)
+    Iterable<GenericRecord> parse(ReadRowsResponse response)
     {
         byte[] buffer = response.getAvroRows().getSerializedBinaryRows().toByteArray();
         readBytes.addAndGet(buffer.length);
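
Aside (not part of this diff): for context on what `parse` consumes, each `ReadRowsResponse` carries one block of concatenated binary-encoded Avro records. A hedged decoding sketch using plain Avro; `schemaJson` is an assumption standing in for the read session's Avro schema:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class AvroRowsSketch
{
    // `schemaJson` is assumed to be the session's Avro schema serialized as JSON;
    // `serializedRows` is response.getAvroRows().getSerializedBinaryRows().toByteArray().
    static List<GenericRecord> decode(String schemaJson, byte[] serializedRows)
    {
        Schema schema = new Schema.Parser().parse(schemaJson);
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        List<GenericRecord> records = new ArrayList<>();
        try {
            // decode records back-to-back until the block is exhausted
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(serializedRows, null);
            while (!decoder.isEnd()) {
                records.add(reader.read(null, decoder));
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }
}
```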
plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java
@@ -17,7 +17,7 @@
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.bigquery.TableResult;
-import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
 import com.google.common.collect.ImmutableList;
 import io.airlift.log.Logger;
 import io.airlift.units.Duration;
@@ -56,7 +56,7 @@ public class BigQuerySplitManager
     private static final Logger log = Logger.get(BigQuerySplitManager.class);
 
     private final BigQueryClient bigQueryClient;
-    private final BigQueryStorageClientFactory bigQueryStorageClientFactory;
+    private final BigQueryReadClientFactory bigQueryReadClientFactory;
     private final OptionalInt parallelism;
     private final boolean viewEnabled;
     private final Duration viewExpiration;
@@ -66,13 +66,13 @@ public class BigQuerySplitManager
     public BigQuerySplitManager(
             BigQueryConfig config,
             BigQueryClient bigQueryClient,
-            BigQueryStorageClientFactory bigQueryStorageClientFactory,
+            BigQueryReadClientFactory bigQueryReadClientFactory,
             NodeManager nodeManager)
     {
         requireNonNull(config, "config cannot be null");
 
         this.bigQueryClient = requireNonNull(bigQueryClient, "bigQueryClient cannot be null");
-        this.bigQueryStorageClientFactory = requireNonNull(bigQueryStorageClientFactory, "bigQueryStorageClientFactory cannot be null");
+        this.bigQueryReadClientFactory = requireNonNull(bigQueryReadClientFactory, "bigQueryReadClientFactory cannot be null");
         this.parallelism = config.getParallelism();
         this.viewEnabled = config.isViewsEnabled();
         this.viewExpiration = config.getViewExpiration();
@@ -113,7 +113,7 @@ private List<BigQuerySplit> readFromBigQuery(TableId remoteTableId, Optional<Lis
                 .map(column -> ((BigQueryColumnHandle) column).getName())
                 .collect(toImmutableList());
 
-        ReadSession readSession = new ReadSessionCreator(bigQueryClient, bigQueryStorageClientFactory, viewEnabled, viewExpiration)
+        ReadSession readSession = new ReadSessionCreator(bigQueryClient, bigQueryReadClientFactory, viewEnabled, viewExpiration)
                 .create(remoteTableId, projectedColumnsNames, filter, actualParallelism);
 
         return readSession.getStreamsList().stream()
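
Aside (not part of this diff): in v1 the session's streams are `ReadStream` messages identified by resource name, which is all a split needs to carry. A sketch of the stream-name extraction this hunk feeds into (the split construction itself is elided above):

```java
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadStream;

import java.util.List;

import static com.google.common.collect.ImmutableList.toImmutableList;

class StreamNamesSketch
{
    // Each name looks like ".../sessions/{session_id}/streams/{stream_id}".
    static List<String> streamNames(ReadSession readSession)
    {
        return readSession.getStreamsList().stream()
                .map(ReadStream::getName)
                .collect(toImmutableList());
    }
}
```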
plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadRowsHelper.java
@@ -13,10 +13,9 @@
  */
 package io.trino.plugin.bigquery;
 
-import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
-import com.google.cloud.bigquery.storage.v1beta1.Storage;
-import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
-import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -25,28 +24,31 @@
 
 public class ReadRowsHelper
 {
-    private final BigQueryStorageClient client;
-    private final ReadRowsRequest.Builder request;
+    private final BigQueryReadClient client;
+    private final String streamName;
     private final int maxReadRowsRetries;
 
-    public ReadRowsHelper(BigQueryStorageClient client, ReadRowsRequest.Builder request, int maxReadRowsRetries)
+    public ReadRowsHelper(BigQueryReadClient client, String streamName, int maxReadRowsRetries)
     {
         this.client = requireNonNull(client, "client cannot be null");
-        this.request = requireNonNull(request, "request cannot be null");
+        this.streamName = requireNonNull(streamName, "streamName cannot be null");
         this.maxReadRowsRetries = maxReadRowsRetries;
     }
 
     public Iterator<ReadRowsResponse> readRows()
     {
-        Iterator<ReadRowsResponse> serverResponses = fetchResponses(request);
-        return new ReadRowsIterator(this, request.getReadPositionBuilder(), serverResponses);
+        Iterator<ReadRowsResponse> serverResponses = fetchResponses(0);
+        return new ReadRowsIterator(this, serverResponses);
    }
 
     // In order to enable testing
-    protected Iterator<ReadRowsResponse> fetchResponses(ReadRowsRequest.Builder readRowsRequest)
+    protected Iterator<ReadRowsResponse> fetchResponses(long offset)
     {
         return client.readRowsCallable()
-                .call(readRowsRequest.build())
+                .call(ReadRowsRequest.newBuilder()
+                        .setReadStream(streamName)
+                        .setOffset(offset)
+                        .build())
                 .iterator();
     }
 
@@ -55,18 +57,15 @@ private static class ReadRowsIterator
             implements Iterator<ReadRowsResponse>
     {
         private final ReadRowsHelper helper;
-        private final Storage.StreamPosition.Builder readPosition;
+        private long nextOffset;
         private Iterator<ReadRowsResponse> serverResponses;
-        private long readRowsCount;
         private int retries;
 
         public ReadRowsIterator(
                 ReadRowsHelper helper,
-                Storage.StreamPosition.Builder readPosition,
                 Iterator<ReadRowsResponse> serverResponses)
         {
             this.helper = helper;
-            this.readPosition = readPosition;
             this.serverResponses = serverResponses;
         }
 
@@ -82,14 +81,13 @@ public ReadRowsResponse next()
             do {
                 try {
                     ReadRowsResponse response = serverResponses.next();
-                    readRowsCount += response.getRowCount();
+                    nextOffset += response.getRowCount();
                     return response;
                 }
                 catch (Exception e) {
                     // if relevant, retry the read, from the last read position
                     if (BigQueryUtil.isRetryable(e) && retries < helper.maxReadRowsRetries) {
-                        serverResponses = helper.fetchResponses(helper.request.setReadPosition(
-                                readPosition.setOffset(readRowsCount)));
+                        serverResponses = helper.fetchResponses(nextOffset);
                         retries++;
                    }
                    else {
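
Aside (not part of this diff): the helper now resumes by row offset instead of rebuilding a `StreamPosition`. A usage sketch with illustrative names; `client` and `streamName` are assumed to come from an already-created read session, and the retry limit of 3 is arbitrary:

```java
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;

import java.util.Iterator;

class ReadRowsUsageSketch
{
    // Assumes the same package as ReadRowsHelper.
    static long countRows(BigQueryReadClient client, String streamName)
    {
        ReadRowsHelper helper = new ReadRowsHelper(client, streamName, 3);
        Iterator<ReadRowsResponse> responses = helper.readRows();
        long rows = 0;
        while (responses.hasNext()) {
            // retryable failures are handled inside the iterator, which
            // re-issues the request starting at the next unread row offset
            rows += responses.next().getRowCount();
        }
        return rows;
    }
}
```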
plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java
@@ -16,10 +16,10 @@
 import com.google.cloud.bigquery.TableDefinition;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
-import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
-import com.google.cloud.bigquery.storage.v1beta1.ReadOptions;
-import com.google.cloud.bigquery.storage.v1beta1.Storage;
-import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
+import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1.DataFormat;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
 import io.airlift.units.Duration;
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.SchemaTableName;
@@ -36,23 +36,23 @@
 public class ReadSessionCreator
 {
     private final BigQueryClient bigQueryClient;
-    private final BigQueryStorageClientFactory bigQueryStorageClientFactory;
+    private final BigQueryReadClientFactory bigQueryReadClientFactory;
     private final boolean viewEnabled;
     private final Duration viewExpiration;
 
     public ReadSessionCreator(
             BigQueryClient bigQueryClient,
-            BigQueryStorageClientFactory bigQueryStorageClientFactory,
+            BigQueryReadClientFactory bigQueryReadClientFactory,
             boolean viewEnabled,
             Duration viewExpiration)
     {
         this.bigQueryClient = bigQueryClient;
-        this.bigQueryStorageClientFactory = bigQueryStorageClientFactory;
+        this.bigQueryReadClientFactory = bigQueryReadClientFactory;
         this.viewEnabled = viewEnabled;
         this.viewExpiration = viewExpiration;
     }
 
-    public Storage.ReadSession create(TableId remoteTable, List<String> selectedFields, Optional<String> filter, int parallelism)
+    public ReadSession create(TableId remoteTable, List<String> selectedFields, Optional<String> filter, int parallelism)
     {
         TableInfo tableDetails = bigQueryClient.getTable(remoteTable)
                 .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(remoteTable.getDataset(), remoteTable.getTable())));
@@ -63,36 +63,28 @@ public Storage.ReadSession create(TableId remoteTable, List<String> selectedFiel
                 .filter(BigQueryUtil::validColumnName)
                 .collect(toList());
 
-        try (BigQueryStorageClient bigQueryStorageClient = bigQueryStorageClientFactory.createBigQueryStorageClient()) {
-            ReadOptions.TableReadOptions.Builder readOptions = ReadOptions.TableReadOptions.newBuilder()
+        try (BigQueryReadClient bigQueryReadClient = bigQueryReadClientFactory.createBigQueryReadClient()) {
+            ReadSession.TableReadOptions.Builder readOptions = ReadSession.TableReadOptions.newBuilder()
                     .addAllSelectedFields(filteredSelectedFields);
             filter.ifPresent(readOptions::setRowRestriction);
 
-            TableReferenceProto.TableReference tableReference = toTableReference(actualTable.getTableId());
-
-            Storage.ReadSession readSession = bigQueryStorageClient.createReadSession(
-                    Storage.CreateReadSessionRequest.newBuilder()
+            ReadSession readSession = bigQueryReadClient.createReadSession(
+                    CreateReadSessionRequest.newBuilder()
                             .setParent("projects/" + bigQueryClient.getProjectId())
-                            .setFormat(Storage.DataFormat.AVRO)
-                            .setRequestedStreams(parallelism)
-                            .setReadOptions(readOptions)
-                            .setTableReference(tableReference)
-                            // The BALANCED sharding strategy causes the server to
-                            // assign roughly the same number of rows to each stream.
-                            .setShardingStrategy(Storage.ShardingStrategy.BALANCED)
+                            .setReadSession(ReadSession.newBuilder()
+                                    .setDataFormat(DataFormat.AVRO)
+                                    .setTable(toTableResourceName(actualTable.getTableId()))
+                                    .setReadOptions(readOptions))
+                            .setMaxStreamCount(parallelism)
                             .build());
 
             return readSession;
         }
     }
 
-    TableReferenceProto.TableReference toTableReference(TableId tableId)
+    String toTableResourceName(TableId tableId)
    {
-        return TableReferenceProto.TableReference.newBuilder()
-                .setProjectId(tableId.getProject())
-                .setDatasetId(tableId.getDataset())
-                .setTableId(tableId.getTable())
-                .build();
+        return format("projects/%s/datasets/%s/tables/%s", tableId.getProject(), tableId.getDataset(), tableId.getTable());
     }
 
     private TableInfo getActualTable(
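
Aside (not part of this diff): condensed, the v1 request shape this file now builds — the table reference proto becomes a resource name inside the nested `ReadSession`, the requested parallelism moves to `setMaxStreamCount`, and the explicit BALANCED sharding strategy is gone. A standalone sketch with placeholder identifiers:

```java
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadSession;

import static java.lang.String.format;

class CreateReadSessionSketch
{
    // `project`, `dataset`, and `table` are placeholders, not values from this commit.
    static ReadSession createSession(BigQueryReadClient client, String project, String dataset, String table, int parallelism)
    {
        return client.createReadSession(
                CreateReadSessionRequest.newBuilder()
                        .setParent("projects/" + project)
                        .setReadSession(ReadSession.newBuilder()
                                .setDataFormat(DataFormat.AVRO)
                                .setTable(format("projects/%s/datasets/%s/tables/%s", project, dataset, table)))
                        // the server decides how rows are balanced across at most this many streams
                        .setMaxStreamCount(parallelism)
                        .build());
    }
}
```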
@@ -13,7 +13,7 @@
  */
 package io.trino.plugin.bigquery;
 
-import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
 
 import java.util.Iterator;
 import java.util.LinkedList;
