Improve speed when listing columns in BigQuery
ebyhr authored and nineinchnick committed May 24, 2024
1 parent b92cf5b commit 8c16327
Showing 14 changed files with 1,069 additions and 12 deletions.
@@ -19,6 +19,7 @@
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobException;
import com.google.cloud.bigquery.JobInfo;
@@ -32,6 +33,7 @@
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.http.BaseHttpServiceException;
import com.google.common.base.Suppliers;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
@@ -40,6 +42,7 @@
import io.trino.cache.EvictableCacheBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
@@ -408,6 +411,34 @@ private static String fullTableName(TableId remoteTableId)
return format("%s.%s.%s", remoteTableId.getProject(), remoteTableId.getDataset(), remoteTableId.getTable());
}

public Stream<RelationColumnsMetadata> listRelationColumnsMetadata(ConnectorSession session, BigQueryClient client, String projectId, String remoteSchemaName)
{
TableResult result = client.executeQuery(session, """
SELECT
table_catalog,
table_schema,
table_name,
array_agg(column_name order by ordinal_position),
array_agg(data_type order by ordinal_position),
FROM %s.INFORMATION_SCHEMA.COLUMNS
GROUP BY table_catalog, table_schema, table_name
""".formatted(quote(remoteSchemaName)));
String schemaName = client.toSchemaName(DatasetId.of(projectId, remoteSchemaName));
return result.streamValues()
.map(row -> {
RemoteTableName remoteTableName = new RemoteTableName(
row.get(0).getStringValue(),
row.get(1).getStringValue(),
row.get(2).getStringValue());
List<String> names = row.get(3).getRepeatedValue().stream().map(FieldValue::getStringValue).collect(toImmutableList());
List<String> types = row.get(4).getRepeatedValue().stream().map(FieldValue::getStringValue).collect(toImmutableList());
verify(names.size() == types.size(), "Mismatched column names and types");
return RelationColumnsMetadata.forTable(
new SchemaTableName(schemaName, remoteTableName.tableName()),
typeManager.convertToTrinoType(names, types, Suppliers.memoize(() -> getTable(remoteTableName.toTableId()))));
});
}
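The single query above is the heart of the speed-up: one INFORMATION_SCHEMA.COLUMNS scan per dataset returns every table's columns as two arrays, both aggregated in ordinal_position order, so names and types stay aligned index by index and no per-table metadata call is needed. A minimal, self-contained sketch of that zipping step (illustrative only, hypothetical names, not part of this commit):

// Illustrative sketch, not part of this commit: zipping the two ordinal-aligned arrays
// carried by one grouped result row. All names here are hypothetical.
import java.util.List;
import java.util.stream.IntStream;

class AlignedColumnArraysSketch
{
    record Column(String name, String bigQueryType) {}

    static List<Column> zip(List<String> names, List<String> types)
    {
        if (names.size() != types.size()) {
            throw new IllegalStateException("Mismatched column names and types");
        }
        return IntStream.range(0, names.size())
                .mapToObj(i -> new Column(names.get(i), types.get(i)))
                .toList();
    }

    public static void main(String[] args)
    {
        // Mirrors one row: array_agg(column_name ...) alongside array_agg(data_type ...)
        System.out.println(zip(List.of("id", "name", "created"), List.of("INT64", "STRING", "TIMESTAMP")));
    }
}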

public Stream<RelationCommentMetadata> listRelationCommentMetadata(ConnectorSession session, BigQueryClient client, String schemaName)
{
TableResult result = client.executeQuery(session, """
@@ -66,6 +66,7 @@
import io.trino.spi.connector.InMemoryRecordSet;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SaveMode;
@@ -263,6 +264,50 @@ private List<SchemaTableName> listTablesInRemoteSchema(BigQueryClient client, St
return tableNames.build();
}

@Override
public Iterator<RelationColumnsMetadata> streamRelationColumns(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
{
if (isLegacyMetadataListing) {
return ConnectorMetadata.super.streamRelationColumns(session, schemaName, relationFilter);
}
BigQueryClient client = bigQueryClientFactory.create(session);
String projectId;
List<String> schemaNames;
if (schemaName.isPresent()) {
DatasetId localDatasetId = client.toDatasetId(schemaName.get());
projectId = localDatasetId.getProject();
String remoteSchemaName = getRemoteSchemaName(client, localDatasetId.getProject(), localDatasetId.getDataset());
schemaNames = List.of(remoteSchemaName);
}
else {
projectId = client.getProjectId();
schemaNames = listRemoteSchemaNames(session);
}
Map<SchemaTableName, RelationColumnsMetadata> resultsByName = schemaNames.stream()
.flatMap(schema -> listRelationColumnsMetadata(session, client, schema, projectId))
.collect(toImmutableMap(RelationColumnsMetadata::name, Functions.identity(), (first, _) -> {
log.debug("Filtered out [%s] from list of tables due to ambiguous name", first.name());
return null;
}));
return relationFilter.apply(resultsByName.keySet()).stream()
.map(resultsByName::get)
.iterator();
}
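The override above gathers column metadata for one schema or all schemas, keys it by SchemaTableName, drops ambiguous names in the merge function, and only then applies the engine-supplied relationFilter before streaming results back. A rough sketch of that filter-then-lookup shape, assuming plain JDK types and made-up relation names:

// Sketch only, with made-up data: the filter-then-lookup shape used above. The engine passes
// a UnaryOperator that removes relations the caller may not list; only surviving keys are
// looked up in the precomputed map and returned.
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

class RelationFilterSketch
{
    public static void main(String[] args)
    {
        Map<String, List<String>> columnsByRelation = Map.of(
                "dataset1.orders", List.of("id", "total"),
                "dataset1.customers", List.of("id", "name"));
        UnaryOperator<Set<String>> relationFilter = names -> {
            Set<String> visible = new HashSet<>(names);
            visible.remove("dataset1.customers"); // pretend access control hides this table
            return visible;
        };
        relationFilter.apply(columnsByRelation.keySet())
                .forEach(name -> System.out.println(name + " -> " + columnsByRelation.get(name)));
    }
}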

private static Stream<RelationColumnsMetadata> listRelationColumnsMetadata(ConnectorSession session, BigQueryClient client, String schema, String projectId)
{
try {
return client.listRelationColumnsMetadata(session, client, projectId, schema);
}
catch (BigQueryException e) {
if (e.getCode() == 404) {
log.debug("Dataset disappeared during listing operation: %s", schema);
return Stream.empty();
}
throw new TrinoException(BIGQUERY_LISTING_TABLE_ERROR, "Failed to retrieve tables from BigQuery", e);
}
}

@Override
public Iterator<RelationCommentMetadata> streamRelationComments(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
{
@@ -278,7 +323,7 @@ public Iterator<RelationCommentMetadata> streamRelationComments(ConnectorSession
}).orElseGet(() -> listSchemaNames(session));
Map<SchemaTableName, RelationCommentMetadata> resultsByName = schemaNames.stream()
.flatMap(schema -> listRelationCommentMetadata(session, client, schema))
.collect(toImmutableMap(RelationCommentMetadata::name, Functions.identity(), (first, second) -> {
.collect(toImmutableMap(RelationCommentMetadata::name, Functions.identity(), (first, _) -> {
log.debug("Filtered out [%s] from list of tables due to ambiguous name", first.name());
return null;
}));
@@ -17,11 +17,19 @@
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.slice.Slice;
import io.trino.plugin.bigquery.type.ArrayTypeInfo;
import io.trino.plugin.bigquery.type.BigDecimalTypeInfo;
import io.trino.plugin.bigquery.type.DecimalTypeInfo;
import io.trino.plugin.bigquery.type.PrimitiveTypeInfo;
import io.trino.plugin.bigquery.type.TypeInfo;
import io.trino.plugin.bigquery.type.UnsupportedTypeException;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
@@ -44,14 +52,17 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

import static com.google.cloud.bigquery.Field.Mode.REPEATED;
import static com.google.cloud.bigquery.StandardSQLTypeName.STRUCT;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.bigquery.BigQueryMetadata.DEFAULT_NUMERIC_TYPE_PRECISION;
import static io.trino.plugin.bigquery.BigQueryMetadata.DEFAULT_NUMERIC_TYPE_SCALE;
import static io.trino.plugin.bigquery.type.TypeInfoUtils.parseTypeString;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
@@ -73,6 +84,8 @@
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_SECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.TypeSignature.arrayType;
import static io.trino.spi.type.TypeSignatureParameter.typeParameter;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
@@ -101,12 +114,14 @@ public final class BigQueryTypeManager
private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("''HH:mm:ss.SSSSSS''");
private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSSSSS").withZone(UTC);

private final TypeManager typeManager;
private final Type jsonType;

@Inject
public BigQueryTypeManager(TypeManager typeManager)
{
jsonType = requireNonNull(typeManager, "typeManager is null").getType(new TypeSignature(JSON));
this.typeManager = requireNonNull(typeManager, "typeManager is null");
jsonType = typeManager.getType(new TypeSignature(JSON));
}

private RowType.Field toRawTypeField(String name, Field field)
@@ -225,7 +240,7 @@ private Field toInnerField(String name, Type type, boolean repeated, @Nullable S
{
Field.Builder builder;
if (type instanceof RowType) {
builder = Field.newBuilder(name, StandardSQLTypeName.STRUCT, toFieldList((RowType) type)).setDescription(comment);
builder = Field.newBuilder(name, STRUCT, toFieldList((RowType) type)).setDescription(comment);
}
else {
builder = Field.newBuilder(name, toStandardSqlTypeName(type)).setDescription(comment);
@@ -283,7 +298,7 @@ private StandardSQLTypeName toStandardSqlTypeName(Type type)
return StandardSQLTypeName.ARRAY;
}
if (type instanceof RowType) {
return StandardSQLTypeName.STRUCT;
return STRUCT;
}
throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
}
@@ -368,19 +383,84 @@ private Optional<ColumnMapping> convertToTrinoType(Field field)
case JSON:
return Optional.of(new ColumnMapping(jsonType, false));
case STRUCT:
// create the row
FieldList subTypes = field.getSubFields();
checkArgument(!subTypes.isEmpty(), "a record or struct must have sub-fields");
List<RowType.Field> fields = subTypes.stream()
.map(subField -> toRawTypeField(subField.getName(), subField))
.collect(toImmutableList());
RowType rowType = RowType.from(fields);
return Optional.of(new ColumnMapping(rowType, false));
return Optional.of(new ColumnMapping(createRowType(field), false));
default:
return Optional.empty();
}
}

private RowType createRowType(Field field)
{
FieldList subTypes = field.getSubFields();
checkArgument(!subTypes.isEmpty(), "a record or struct must have sub-fields");
List<RowType.Field> fields = subTypes.stream()
.map(subField -> toRawTypeField(subField.getName(), subField))
.collect(toImmutableList());
return RowType.from(fields);
}

public List<ColumnMetadata> convertToTrinoType(List<String> names, List<String> types)
{
return convertToTrinoType(names, types, Optional::empty);
}

public List<ColumnMetadata> convertToTrinoType(List<String> names, List<String> types, Supplier<Optional<TableInfo>> tableSupplier)
{
checkArgument(names.size() == types.size(), "Mismatched column names and types");

ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
TypeSignature typeSignature;
try {
TypeInfo typeInfo = parseTypeString(types.get(i));
typeSignature = toTypeSignature(typeInfo);
}
catch (UnsupportedTypeException e) {
Optional<TableInfo> table = tableSupplier.get();
if (!e.getTypeName().equals(STRUCT) || table.isEmpty() || table.get().getDefinition().getSchema() == null) {
// ignore unsupported types
continue;
}
typeSignature = createRowType(table.get().getDefinition().getSchema().getFields().get(name)).getTypeSignature();
}
catch (TrinoException | IllegalArgumentException e) {
// ignore unsupported types
continue;
}
columns.add(new ColumnMetadata(name, typeManager.getType(typeSignature)));
}
return columns.build();
}
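Note how the three-argument overload only touches the memoized TableInfo supplier when a type string from INFORMATION_SCHEMA cannot be resolved on its own (the STRUCT fallback), so the expensive per-table fetch happens at most once and only for tables that need it. A sketch of that memoized-fallback pattern using only JDK types and hypothetical names, not the connector's classes:

// Sketch of the memoized-fallback pattern, using only JDK types and hypothetical names:
// the expensive lookup is cached after its first use and consulted only when the cheap
// path cannot resolve a type string.
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class LazyFallbackSketch
{
    static <T> Supplier<T> memoize(Supplier<T> delegate)
    {
        AtomicReference<T> cache = new AtomicReference<>();
        return () -> cache.updateAndGet(cached -> cached != null ? cached : delegate.get());
    }

    static Optional<String> resolve(String typeString, Supplier<Optional<String>> schemaLookup)
    {
        if (!typeString.startsWith("STRUCT")) {
            return Optional.of(typeString.toLowerCase(Locale.ROOT)); // cheap path: no table fetch
        }
        return schemaLookup.get(); // fallback: fetch the full table schema, cached after first use
    }

    public static void main(String[] args)
    {
        Supplier<Optional<String>> lookup = memoize(() -> {
            System.out.println("fetching full table schema");
            return Optional.of("row(x bigint)");
        });
        System.out.println(resolve("INT64", lookup));
        System.out.println(resolve("STRUCT<x INT64>", lookup));
        System.out.println(resolve("STRUCT<x INT64>", lookup)); // schema was cached; no second fetch
    }
}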

private TypeSignature toTypeSignature(TypeInfo typeInfo)
{
return switch (typeInfo) {
case DecimalTypeInfo decimalTypeInfo:
yield createDecimalType(decimalTypeInfo.precision(), decimalTypeInfo.scale()).getTypeSignature();
case BigDecimalTypeInfo decimalTypeInfo:
yield createDecimalType(decimalTypeInfo.precision(), decimalTypeInfo.scale()).getTypeSignature();
case PrimitiveTypeInfo primitiveTypeInfo:
Type type = switch (primitiveTypeInfo.getStandardSqlTypeName()) {
case BOOL -> BOOLEAN;
case INT64 -> BIGINT;
case FLOAT64 -> DOUBLE;
case STRING -> VARCHAR;
case BYTES -> VARBINARY;
case DATE -> DATE;
case DATETIME -> TIMESTAMP_MICROS;
case TIMESTAMP -> TIMESTAMP_TZ_MICROS;
case GEOGRAPHY -> VARCHAR;
case JSON -> jsonType;
default -> throw new IllegalArgumentException("Unsupported type: " + primitiveTypeInfo);
};
yield type.getTypeSignature();
case ArrayTypeInfo arrayTypeInfo:
TypeSignature elementType = toTypeSignature(arrayTypeInfo.getListElementTypeInfo());
yield arrayType(typeParameter(elementType));
};
}
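toTypeSignature mirrors the parsed TypeInfo tree: decimals keep their precision and scale, primitives map through StandardSQLTypeName, and arrays recurse on their element type. A standalone illustration of the same recursive shape on raw type strings (hypothetical helper, not the commit's parser, which lives in TypeInfoUtils.parseTypeString):

// Standalone illustration only (hypothetical helper, not the commit's parser): the same
// recursive shape, mapping BigQuery type names to Trino-style names and unwrapping ARRAY<...>.
import java.util.Map;

class TypeNameMappingSketch
{
    private static final Map<String, String> PRIMITIVES = Map.of(
            "BOOL", "boolean",
            "INT64", "bigint",
            "FLOAT64", "double",
            "STRING", "varchar",
            "DATE", "date");

    static String toTrinoName(String bigQueryType)
    {
        if (bigQueryType.startsWith("ARRAY<") && bigQueryType.endsWith(">")) {
            String element = bigQueryType.substring("ARRAY<".length(), bigQueryType.length() - 1);
            return "array(" + toTrinoName(element) + ")"; // recurse on the element type
        }
        String mapped = PRIMITIVES.get(bigQueryType);
        if (mapped == null) {
            throw new IllegalArgumentException("Unsupported type: " + bigQueryType);
        }
        return mapped;
    }

    public static void main(String[] args)
    {
        System.out.println(toTrinoName("ARRAY<INT64>")); // array(bigint)
        System.out.println(toTrinoName("ARRAY<ARRAY<STRING>>")); // array(array(varchar))
    }
}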

public BigQueryColumnHandle toColumnHandle(Field field)
{
FieldList subFields = field.getSubFields();
@@ -0,0 +1,39 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.bigquery.type;

import static com.google.cloud.bigquery.StandardSQLTypeName.ARRAY;
import static java.util.Objects.requireNonNull;

public final class ArrayTypeInfo
extends TypeInfo
{
private final TypeInfo elementTypeInfo;

ArrayTypeInfo(TypeInfo elementTypeInfo)
{
this.elementTypeInfo = requireNonNull(elementTypeInfo, "elementTypeInfo is null");
}

@Override
public String toString()
{
return ARRAY + "<" + elementTypeInfo + ">";
}

public TypeInfo getListElementTypeInfo()
{
return elementTypeInfo;
}
}
@@ -0,0 +1,58 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.bigquery.type;

import static com.google.cloud.bigquery.StandardSQLTypeName.BIGNUMERIC;
import static com.google.common.base.Preconditions.checkArgument;

public final class BigDecimalTypeInfo
extends PrimitiveTypeInfo
{
private static final int MAX_PRECISION_MINUS_SCALE = 38;
private static final int MAX_SCALE = 38;

private final int precision;
private final int scale;

public BigDecimalTypeInfo(int precision, int scale)
{
super(BIGNUMERIC.name());
this.precision = precision;
this.scale = scale;
checkArgument(scale >= 0 && scale <= MAX_SCALE, "invalid decimal scale: %s", scale);
checkArgument(precision >= 1 && precision <= MAX_PRECISION_MINUS_SCALE + scale, "invalid decimal precision: %s", precision);
checkArgument(scale <= precision, "invalid decimal precision: %s is lower than scale %s", precision, scale);
}

@Override
public String toString()
{
return decimalTypeName(precision, scale);
}

public int precision()
{
return precision;
}

public int scale()
{
return scale;
}

public static String decimalTypeName(int precision, int scale)
{
return BIGNUMERIC.name() + "(" + precision + ", " + scale + ")";
}
}
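The constructor bounds encode BIGNUMERIC's shape as used here: scale at most 38, precision at most 38 plus the scale (so up to 76 when the scale is 38), and scale never larger than precision. A tiny worked check of those bounds, illustrative only:

// Worked boundary check, illustrative only: which (precision, scale) pairs the constructor
// above accepts, given scale <= 38, precision <= 38 + scale, and scale <= precision.
class BigNumericBoundsSketch
{
    static boolean accepted(int precision, int scale)
    {
        return scale >= 0 && scale <= 38
                && precision >= 1 && precision <= 38 + scale
                && scale <= precision;
    }

    public static void main(String[] args)
    {
        System.out.println(accepted(76, 38)); // true: the widest accepted BIGNUMERIC shape
        System.out.println(accepted(77, 38)); // false: precision exceeds 38 + scale
        System.out.println(accepted(10, 12)); // false: scale larger than precision
    }
}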