Allow using filter when materializing BigQuery views
ebyhr committed Apr 15, 2024
1 parent ca3503b commit 29b4590
Showing 11 changed files with 196 additions and 25 deletions.
1 change: 1 addition & 0 deletions docs/src/main/sphinx/connector/bigquery.md
@@ -133,6 +133,7 @@ a few caveats:
| `bigquery.view-materialization-project` | The project where the materialized view is going to be created | The view's project |
| `bigquery.view-materialization-dataset` | The dataset where the materialized view is going to be created | The view's dataset |
| `bigquery.skip-view-materialization` | Use REST API to access views instead of Storage API. BigQuery `BIGNUMERIC` and `TIMESTAMP` types are unsupported. | `false` |
| `bigquery.view-materialization-with-filter` | Use filter conditions when materializing views. | `false` |
| `bigquery.views-cache-ttl` | Duration for which the materialization of a view will be cached and reused. Set to `0ms` to disable the cache. | `15m` |
| `bigquery.metadata.cache-ttl` | Duration for which metadata retrieved from BigQuery is cached and reused. Set to `0ms` to disable the cache. | `0ms` |
| `bigquery.max-read-rows-retries` | The number of retries in case of retryable server issues | `3` |
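For illustration, a minimal sketch of what the new option changes (identifiers are hypothetical): the `selectSql` helper shown later in this commit appends the pushed-down predicate as a `WHERE` clause, so only matching rows are written into the temporary materialization table.

```java
import java.util.Optional;

class FilteredMaterializationSketch
{
    public static void main(String[] args)
    {
        // Without the option, the whole view is materialized:
        String base = "SELECT `orderkey`,`totalprice` FROM `my_project.my_dataset.my_view`";
        // With bigquery.view-materialization-with-filter=true, the pushed-down
        // predicate is appended, mirroring BigQueryClient.selectSql:
        Optional<String> filter = Optional.of("`orderkey` = 42");
        String query = filter.map(f -> base + " WHERE " + f).orElse(base);
        System.out.println(query); // ... WHERE `orderkey` = 42
    }
}
```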
2 changes: 2 additions & 0 deletions plugin/trino-bigquery/pom.xml
@@ -525,6 +525,7 @@
<exclude>**/TestBigQueryArrowTypeMapping.java</exclude>
<exclude>**/TestBigQueryArrowSerialization.java</exclude>
<exclude>**/TestBigQueryMetadata.java</exclude>
<exclude>**/TestBigQuerySplitManager.java</exclude>
<exclude>**/TestBigQueryInstanceCleaner.java</exclude>
<exclude>**/TestBigQueryCaseInsensitiveMapping.java</exclude>
<exclude>**/TestBigQuery*FailureRecoveryTest.java</exclude>
@@ -602,6 +603,7 @@
<include>**/TestBigQueryAvroTypeMapping.java</include>
<include>**/TestBigQueryMetadataCaching.java</include>
<include>**/TestBigQueryMetadata.java</include>
<include>**/TestBigQuerySplitManager.java</include>
<include>**/TestBigQuery*FailureRecoveryTest.java</include>
<include>**/TestBigQueryInstanceCleaner.java</include>
</includes>
@@ -201,9 +201,9 @@ public Optional<TableInfo> getTable(TableId remoteTableId)
return Optional.ofNullable(bigQuery.getTable(remoteTableId));
}

public TableInfo getCachedTable(Duration viewExpiration, TableInfo remoteTableId, List<String> requiredColumns)
public TableInfo getCachedTable(Duration viewExpiration, TableInfo remoteTableId, List<String> requiredColumns, Optional<String> filter)
{
String query = selectSql(remoteTableId, requiredColumns);
String query = selectSql(remoteTableId.getTableId(), requiredColumns, filter);
log.debug("query is %s", query);
return materializationCache.getCachedTable(this, query, viewExpiration, remoteTableId);
}
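The three-argument `selectSql` overload called above is collapsed out of this hunk. Judging from the removed two-argument helper below and the public `selectSql(TableId, String, Optional<String>)` that appends the filter, it plausibly looks like this (a sketch, not the committed code):

```java
// Plausible shape of the new private overload (assumed; not visible in this diff).
private String selectSql(TableId table, List<String> requiredColumns, Optional<String> filter)
{
    String columns = requiredColumns.isEmpty() ? "*" :
            requiredColumns.stream().map(column -> format("`%s`", column)).collect(joining(","));
    return selectSql(table, columns, filter); // appends " WHERE <filter>" when present
}
```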
@@ -373,14 +373,6 @@ public static String selectSql(TableId table, String formattedColumns, Optional<
return query + " WHERE " + filter.get();
}

private String selectSql(TableInfo remoteTable, List<String> requiredColumns)
{
String columns = requiredColumns.isEmpty() ? "*" :
requiredColumns.stream().map(column -> format("`%s`", column)).collect(joining(","));

return selectSql(remoteTable.getTableId(), columns);
}

// assuming the SELECT part is properly formatted, can be used to call functions such as COUNT and SUM
public String selectSql(TableId table, String formattedColumns)
{
@@ -48,6 +48,7 @@ public class BigQueryConfig
private boolean arrowSerializationEnabled;
private Duration viewExpireDuration = new Duration(24, HOURS);
private boolean skipViewMaterialization;
private boolean viewMaterializationWithFilter;
private Optional<String> viewMaterializationProject = Optional.empty();
private Optional<String> viewMaterializationDataset = Optional.empty();
private int maxReadRowsRetries = DEFAULT_MAX_READ_ROWS_RETRIES;
@@ -153,6 +154,19 @@ public BigQueryConfig setSkipViewMaterialization(boolean skipViewMaterialization
return this;
}

public boolean isViewMaterializationWithFilter()
{
return viewMaterializationWithFilter;
}

@Config("bigquery.view-materialization-with-filter")
@ConfigDescription("Use filter when materializing views")
public BigQueryConfig setViewMaterializationWithFilter(boolean viewMaterializationWithFilter)
{
this.viewMaterializationWithFilter = viewMaterializationWithFilter;
return this;
}

public Optional<String> getViewMaterializationProject()
{
return viewMaterializationProject;
@@ -332,5 +346,8 @@ public void validate()
if (skipViewMaterialization) {
checkState(viewsEnabled, "%s config property must be enabled when skipping view materialization", VIEWS_ENABLED);
}
if (viewMaterializationWithFilter) {
checkState(viewsEnabled, "%s config property must be enabled when view materialization with filter is enabled", VIEWS_ENABLED);
}
}
}
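A quick sketch of the invariant `validate()` now enforces: filtering is an add-on to view materialization, so `bigquery.views-enabled` must be set alongside it (the new test case below exercises the failure path).

```java
class ValidateSketch
{
    void sketch()
    {
        BigQueryConfig config = new BigQueryConfig()
                .setViewsEnabled(true)                   // bigquery.views-enabled=true
                .setViewMaterializationWithFilter(true); // bigquery.view-materialization-with-filter=true
        config.validate(); // passes

        // Without setViewsEnabled(true), validate() throws IllegalStateException:
        // "bigquery.views-enabled config property must be enabled when view
        // materialization with filter is enabled"
    }
}
```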
@@ -29,6 +29,7 @@ public final class BigQuerySessionProperties
implements SessionPropertiesProvider
{
private static final String SKIP_VIEW_MATERIALIZATION = "skip_view_materialization";
private static final String VIEW_MATERIALIZATION_WITH_FILTER = "view_materialization_with_filter";
private static final String QUERY_RESULTS_CACHE_ENABLED = "query_results_cache_enabled";
private static final String CREATE_DISPOSITION_TYPE = "create_disposition_type";

@@ -43,6 +44,11 @@ public BigQuerySessionProperties(BigQueryConfig config)
"Skip materializing views",
config.isSkipViewMaterialization(),
false))
.add(booleanProperty(
VIEW_MATERIALIZATION_WITH_FILTER,
"Materialize views with filters",
config.isViewMaterializationWithFilter(),
false))
.add(booleanProperty(
QUERY_RESULTS_CACHE_ENABLED,
"Enable query results cache",
@@ -68,6 +74,11 @@ public static boolean isSkipViewMaterialization(ConnectorSession session)
return session.getProperty(SKIP_VIEW_MATERIALIZATION, Boolean.class);
}

public static boolean isViewMaterializationWithFilter(ConnectorSession session)
{
return session.getProperty(VIEW_MATERIALIZATION_WITH_FILTER, Boolean.class);
}

public static boolean isQueryResultsCacheEnabled(ConnectorSession session)
{
return session.getProperty(QUERY_RESULTS_CACHE_ENABLED, Boolean.class);
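Per session the toggle uses underscores and is scoped to a catalog, e.g. `SET SESSION mybigquery.view_materialization_with_filter = true` (catalog name is hypothetical). A minimal sketch of how connector code consumes it; the same ternary appears verbatim in `ReadSessionCreator.create` later in this commit, and the `FilterGate` class is illustrative only:

```java
import io.trino.spi.connector.ConnectorSession;

import java.util.Optional;

import static io.trino.plugin.bigquery.BigQuerySessionProperties.isViewMaterializationWithFilter;

final class FilterGate
{
    private FilterGate() {}

    // Forward the pushed-down filter only when the session (or the config default)
    // enables view materialization with filter.
    static Optional<String> filterForMaterialization(ConnectorSession session, Optional<String> filter)
    {
        return isViewMaterializationWithFilter(session) ? filter : Optional.empty();
    }
}
```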
@@ -41,6 +41,7 @@ public class BigQuerySplit

private final Mode mode;
private final String streamName;
private final Optional<String> iamResourceName;
private final String schemaString;
private final List<BigQueryColumnHandle> columns;
private final long emptyRowsToGenerate;
@@ -52,6 +53,7 @@ public class BigQuerySplit
public BigQuerySplit(
@JsonProperty("mode") Mode mode,
@JsonProperty("streamName") String streamName,
@JsonProperty("iamResourceName") Optional<String> iamResourceName,
@JsonProperty("schemaString") String schemaString,
@JsonProperty("columns") List<BigQueryColumnHandle> columns,
@JsonProperty("emptyRowsToGenerate") long emptyRowsToGenerate,
@@ -60,26 +62,27 @@ public BigQuerySplit(
{
this.mode = requireNonNull(mode, "mode is null");
this.streamName = requireNonNull(streamName, "streamName cannot be null");
this.iamResourceName = requireNonNull(iamResourceName, "iamResourceName is null");
this.schemaString = requireNonNull(schemaString, "schemaString cannot be null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns cannot be null"));
this.emptyRowsToGenerate = emptyRowsToGenerate;
this.filter = requireNonNull(filter, "filter is null");
this.dataSize = requireNonNull(dataSize, "dataSize is null");
}

static BigQuerySplit forStream(String streamName, String schemaString, List<BigQueryColumnHandle> columns, OptionalInt dataSize)
static BigQuerySplit forStream(String streamName, String iamResourceName, String schemaString, List<BigQueryColumnHandle> columns, OptionalInt dataSize)
{
return new BigQuerySplit(STORAGE, streamName, schemaString, columns, NO_ROWS_TO_GENERATE, Optional.empty(), dataSize);
return new BigQuerySplit(STORAGE, streamName, Optional.of(iamResourceName), schemaString, columns, NO_ROWS_TO_GENERATE, Optional.empty(), dataSize);
}

static BigQuerySplit forViewStream(List<BigQueryColumnHandle> columns, Optional<String> filter)
static BigQuerySplit forViewStream(Optional<String> iamResourceName, List<BigQueryColumnHandle> columns, Optional<String> filter)
{
return new BigQuerySplit(QUERY, "", "", columns, NO_ROWS_TO_GENERATE, filter, OptionalInt.empty());
return new BigQuerySplit(QUERY, "", iamResourceName, "", columns, NO_ROWS_TO_GENERATE, filter, OptionalInt.empty());
}

static BigQuerySplit emptyProjection(long numberOfRows)
{
return new BigQuerySplit(STORAGE, "", "", ImmutableList.of(), numberOfRows, Optional.empty(), OptionalInt.of(0));
return new BigQuerySplit(STORAGE, "", Optional.empty(), "", ImmutableList.of(), numberOfRows, Optional.empty(), OptionalInt.of(0));
}

@JsonProperty
@@ -94,6 +97,12 @@ public String getStreamName()
return streamName;
}

@JsonProperty
public Optional<String> getIamResourceName()
{
return iamResourceName;
}

@JsonProperty
public String getSchemaString()
{
@@ -155,6 +164,7 @@ public boolean equals(Object o)
BigQuerySplit that = (BigQuerySplit) o;
return Objects.equals(mode, that.mode) &&
Objects.equals(streamName, that.streamName) &&
Objects.equals(iamResourceName, that.iamResourceName) &&
Objects.equals(schemaString, that.schemaString) &&
Objects.equals(columns, that.columns) &&
Objects.equals(emptyRowsToGenerate, that.emptyRowsToGenerate);
@@ -163,7 +173,7 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
return Objects.hash(mode, streamName, schemaString, columns, emptyRowsToGenerate);
return Objects.hash(mode, streamName, iamResourceName, schemaString, columns, emptyRowsToGenerate);
}

@Override
@@ -172,6 +182,7 @@ public String toString()
return toStringHelper(this)
.add("mode", mode)
.add("streamName", streamName)
.add("iamResourceName", iamResourceName)
.add("schemaString", schemaString)
.add("columns", columns)
.add("emptyRowsToGenerate", emptyRowsToGenerate)
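Splits are serialized from the coordinator to workers, so the new `Optional` field has to survive a JSON round trip. A hedged sketch of that check (it would have to live in the connector's package, since `forViewStream` is package-private, and the commit's actual tests may verify this differently):

```java
import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;

import java.util.Optional;

class BigQuerySplitRoundTripSketch
{
    void roundTrip()
    {
        JsonCodec<BigQuerySplit> codec = JsonCodec.jsonCodec(BigQuerySplit.class);
        BigQuerySplit split = BigQuerySplit.forViewStream(
                Optional.of("projects/p/datasets/d/tables/v"), // hypothetical resource name
                ImmutableList.of(),
                Optional.of("`x` = 1"));
        BigQuerySplit copy = codec.fromJson(codec.toJson(split));
        // equals()/hashCode() now include iamResourceName, so this holds:
        assert split.equals(copy);
    }
}
```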
@@ -106,7 +106,7 @@ public ConnectorSplitSource getSplits(

if (!bigQueryTableHandle.isNamedRelation()) {
List<BigQueryColumnHandle> columns = bigQueryTableHandle.projectedColumns().orElse(ImmutableList.of());
return new FixedSplitSource(BigQuerySplit.forViewStream(columns, filter));
return new FixedSplitSource(BigQuerySplit.forViewStream(Optional.empty(), columns, filter));
}

TableId remoteTableId = bigQueryTableHandle.asPlainTable().getRemoteTableName().toTableId();
@@ -132,15 +132,15 @@ private List<BigQuerySplit> readFromBigQuery(ConnectorSession session, TableDefi

if (isWildcardTable(type, remoteTableId.getTable())) {
// Storage API doesn't support reading wildcard tables
return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
return ImmutableList.of(BigQuerySplit.forViewStream(Optional.of(remoteTableId.getIAMResourceName()), columns, filter));
}
if (type == EXTERNAL) {
// Storage API doesn't support reading external tables
return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
return ImmutableList.of(BigQuerySplit.forViewStream(Optional.of(remoteTableId.getIAMResourceName()), columns, filter));
}
if (type == VIEW || type == MATERIALIZED_VIEW) {
if (isSkipViewMaterialization(session)) {
return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
return ImmutableList.of(BigQuerySplit.forViewStream(Optional.of(remoteTableId.getIAMResourceName()), columns, filter));
}
tableConstraint.getDomains().ifPresent(domains -> domains.keySet().stream()
.map(column -> ((BigQueryColumnHandle) column).name())
@@ -151,7 +151,7 @@ private List<BigQuerySplit> readFromBigQuery(ConnectorSession session, TableDefi
ReadSession readSession = readSessionCreator.create(session, remoteTableId, ImmutableList.copyOf(projectedColumnsNames), filter, actualParallelism);

return readSession.getStreamsList().stream()
.map(stream -> BigQuerySplit.forStream(stream.getName(), readSessionCreator.getSchemaAsString(readSession), columns, OptionalInt.of(stream.getSerializedSize())))
.map(stream -> BigQuerySplit.forStream(stream.getName(), readSession.getTable(), readSessionCreator.getSchemaAsString(readSession), columns, OptionalInt.of(stream.getSerializedSize())))
.collect(toImmutableList());
}

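The hunk above repeats the same fallback three times: wildcard tables, external tables, and views read with `skip_view_materialization` cannot go through the Storage API, so each produces a QUERY-mode split that now carries the table's IAM resource name. A condensed restatement of that branching (method name hypothetical, not the committed code):

```java
// Condensed restatement of the branching above.
private static boolean mustFallBackToQuery(TableDefinition.Type type, ConnectorSession session, String tableName)
{
    return isWildcardTable(type, tableName)                // Storage API: no wildcard tables
            || type == EXTERNAL                            // Storage API: no external tables
            || ((type == VIEW || type == MATERIALIZED_VIEW) && isSkipViewMaterialization(session));
}
```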
@@ -43,6 +43,7 @@
import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.CompressionCodec.ZSTD;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isViewMaterializationWithFilter;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.time.temporal.ChronoUnit.MILLIS;
@@ -83,7 +84,7 @@ public ReadSession create(ConnectorSession session, TableId remoteTable, List<St
TableInfo tableDetails = client.getTable(remoteTable)
.orElseThrow(() -> new TableNotFoundException(new SchemaTableName(remoteTable.getDataset(), remoteTable.getTable())));

TableInfo actualTable = getActualTable(client, tableDetails, selectedFields);
TableInfo actualTable = getActualTable(client, tableDetails, selectedFields, isViewMaterializationWithFilter(session) ? filter : Optional.empty());

List<String> filteredSelectedFields = selectedFields.stream()
.map(BigQueryUtil::toBigQueryColumnName)
@@ -146,7 +147,8 @@ String toTableResourceName(TableId tableId)
private TableInfo getActualTable(
BigQueryClient client,
TableInfo remoteTable,
List<String> requiredColumns)
List<String> requiredColumns,
Optional<String> filter)
{
TableDefinition tableDefinition = remoteTable.getDefinition();
TableDefinition.Type tableType = tableDefinition.getType();
@@ -160,7 +162,7 @@ private TableInfo getActualTable(
BigQueryConfig.VIEWS_ENABLED));
}
// get it from the view
return client.getCachedTable(viewExpiration, remoteTable, requiredColumns);
return client.getCachedTable(viewExpiration, remoteTable, requiredColumns, filter);
}
// Storage API doesn't support reading other table types (materialized views, external)
throw new TrinoException(NOT_SUPPORTED, format("Table type '%s' of table '%s.%s' is not supported",
@@ -45,6 +45,8 @@ public class ViewMaterializationCache
{
private static final Logger log = Logger.get(ViewMaterializationCache.class);

public static final String TEMP_TABLE_PREFIX = "_pbc_";

private final NonEvictableCache<String, TableInfo> destinationTableCache;
private final Optional<String> viewMaterializationProject;
private final Optional<String> viewMaterializationDataset;
@@ -70,7 +72,7 @@ private TableId createDestinationTable(TableId remoteTableId)
String project = viewMaterializationProject.orElseGet(remoteTableId::getProject);
String dataset = viewMaterializationDataset.orElseGet(remoteTableId::getDataset);

String name = format("_pbc_%s", randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""));
String name = format("%s%s", TEMP_TABLE_PREFIX, randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""));
return TableId.of(project, dataset, name);
}

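For reference, a sketch of the names `createDestinationTable` generates — the prefix plus a 32-character hex UUID. Promoting `_pbc_` to a public constant presumably lets tests and cleanup code recognize these temporary tables by prefix (an assumption; those call sites are not in this hunk).

```java
import java.util.Locale;
import java.util.UUID;

class TempTableNameSketch
{
    // e.g. "_pbc_0f8fad5bd9cb469fa165b7ac009d0bca"
    static String tempTableName()
    {
        return "_pbc_" + UUID.randomUUID().toString().toLowerCase(Locale.ENGLISH).replace("-", "");
    }
}
```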
@@ -39,6 +39,7 @@ public void testDefaults()
.setParallelism(null)
.setViewExpireDuration(new Duration(24, HOURS))
.setSkipViewMaterialization(false)
.setViewMaterializationWithFilter(false)
.setViewMaterializationProject(null)
.setViewMaterializationDataset(null)
.setMaxReadRowsRetries(3)
@@ -66,6 +67,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey()
.put("bigquery.experimental.arrow-serialization.enabled", "true")
.put("bigquery.view-expire-duration", "30m")
.put("bigquery.skip-view-materialization", "true")
.put("bigquery.view-materialization-with-filter", "true")
.put("bigquery.view-materialization-project", "vmproject")
.put("bigquery.view-materialization-dataset", "vmdataset")
.put("bigquery.max-read-rows-retries", "10")
@@ -88,6 +90,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey()
.setArrowSerializationEnabled(true)
.setViewExpireDuration(new Duration(30, MINUTES))
.setSkipViewMaterialization(true)
.setViewMaterializationWithFilter(true)
.setViewMaterializationProject("vmproject")
.setViewMaterializationDataset("vmdataset")
.setMaxReadRowsRetries(10)
@@ -120,5 +123,12 @@ public void testInvalidViewSetting()
.validate())
.isInstanceOf(IllegalStateException.class)
.hasMessage("bigquery.views-enabled config property must be enabled when skipping view materialization");

assertThatThrownBy(() -> new BigQueryConfig()
.setViewMaterializationWithFilter(true)
.setViewsEnabled(false)
.validate())
.isInstanceOf(IllegalStateException.class)
.hasMessage("bigquery.views-enabled config property must be enabled when view materialization with filter is enabled");
}
}
