Skip to content

Commit

Permalink
Allow using filter when materializing BigQuery views
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr authored and wendigo committed Apr 17, 2024
1 parent 99647b1 commit 0329c4c
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 16 deletions.
1 change: 1 addition & 0 deletions docs/src/main/sphinx/connector/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ a few caveats:
| `bigquery.view-materialization-project` | The project where the materialized view is going to be created | The view's project |
| `bigquery.view-materialization-dataset` | The dataset where the materialized view is going to be created | The view's dataset |
| `bigquery.skip-view-materialization` | Use REST API to access views instead of Storage API. BigQuery `BIGNUMERIC` and `TIMESTAMP` types are unsupported. | `false` |
| `bigquery.view-materialization-with-filter` | Use filter conditions when materializing views. | `false` |
| `bigquery.views-cache-ttl` | Duration for which the materialization of a view will be cached and reused. Set to `0ms` to disable the cache. | `15m` |
| `bigquery.metadata.cache-ttl` | Duration for which metadata retrieved from BigQuery is cached and reused. Set to `0ms` to disable the cache. | `0ms` |
| `bigquery.max-read-rows-retries` | The number of retries in case of retryable server issues | `3` |
Expand Down
2 changes: 2 additions & 0 deletions plugin/trino-bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@
<exclude>**/TestBigQueryArrowTypeMapping.java</exclude>
<exclude>**/TestBigQueryArrowSerialization.java</exclude>
<exclude>**/TestBigQueryMetadata.java</exclude>
<exclude>**/TestBigQuerySplitManager.java</exclude>
<exclude>**/TestBigQueryInstanceCleaner.java</exclude>
<exclude>**/TestBigQueryCaseInsensitiveMapping.java</exclude>
<exclude>**/TestBigQuery*FailureRecoveryTest.java</exclude>
Expand Down Expand Up @@ -602,6 +603,7 @@
<include>**/TestBigQueryAvroTypeMapping.java</include>
<include>**/TestBigQueryMetadataCaching.java</include>
<include>**/TestBigQueryMetadata.java</include>
<include>**/TestBigQuerySplitManager.java</include>
<include>**/TestBigQuery*FailureRecoveryTest.java</include>
<include>**/TestBigQueryInstanceCleaner.java</include>
</includes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ public Optional<TableInfo> getTable(TableId remoteTableId)
return Optional.ofNullable(bigQuery.getTable(remoteTableId));
}

public TableInfo getCachedTable(Duration viewExpiration, TableInfo remoteTableId, List<String> requiredColumns)
public TableInfo getCachedTable(Duration viewExpiration, TableInfo remoteTableId, List<String> requiredColumns, Optional<String> filter)
{
String query = selectSql(remoteTableId, requiredColumns);
String query = selectSql(remoteTableId.getTableId(), requiredColumns, filter);
log.debug("query is %s", query);
return materializationCache.getCachedTable(this, query, viewExpiration, remoteTableId);
}
Expand Down Expand Up @@ -373,14 +373,6 @@ public static String selectSql(TableId table, String formattedColumns, Optional<
return query + " WHERE " + filter.get();
}

private String selectSql(TableInfo remoteTable, List<String> requiredColumns)
{
String columns = requiredColumns.isEmpty() ? "*" :
requiredColumns.stream().map(column -> format("`%s`", column)).collect(joining(","));

return selectSql(remoteTable.getTableId(), columns);
}

// assuming the SELECT part is properly formatted, can be used to call functions such as COUNT and SUM
public String selectSql(TableId table, String formattedColumns)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class BigQueryConfig
private boolean arrowSerializationEnabled;
private Duration viewExpireDuration = new Duration(24, HOURS);
private boolean skipViewMaterialization;
private boolean viewMaterializationWithFilter;
private Optional<String> viewMaterializationProject = Optional.empty();
private Optional<String> viewMaterializationDataset = Optional.empty();
private int maxReadRowsRetries = DEFAULT_MAX_READ_ROWS_RETRIES;
Expand Down Expand Up @@ -155,6 +156,19 @@ public BigQueryConfig setSkipViewMaterialization(boolean skipViewMaterialization
return this;
}

/**
 * Returns whether views should be materialized using the query's filter.
 * Backed by the {@code bigquery.view-materialization-with-filter} config property.
 */
public boolean isViewMaterializationWithFilter()
{
return viewMaterializationWithFilter;
}

/**
 * Enables or disables using the filter when materializing views.
 * Note that {@code validate()} rejects a configuration where this is enabled
 * while views are not enabled.
 *
 * @param viewMaterializationWithFilter the new value of the flag
 * @return this config instance, to allow chained setter calls
 */
@Config("bigquery.view-materialization-with-filter")
@ConfigDescription("Use filter when materializing views")
public BigQueryConfig setViewMaterializationWithFilter(boolean viewMaterializationWithFilter)
{
this.viewMaterializationWithFilter = viewMaterializationWithFilter;
return this;
}

public Optional<String> getViewMaterializationProject()
{
return viewMaterializationProject;
Expand Down Expand Up @@ -334,5 +348,8 @@ public void validate()
if (skipViewMaterialization) {
checkState(viewsEnabled, "%s config property must be enabled when skipping view materialization", VIEWS_ENABLED);
}
if (viewMaterializationWithFilter) {
checkState(viewsEnabled, "%s config property must be enabled when view materialization with filter is enabled", VIEWS_ENABLED);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class BigQuerySessionProperties
implements SessionPropertiesProvider
{
private static final String SKIP_VIEW_MATERIALIZATION = "skip_view_materialization";
private static final String VIEW_MATERIALIZATION_WITH_FILTER = "view_materialization_with_filter";
private static final String QUERY_RESULTS_CACHE_ENABLED = "query_results_cache_enabled";
private static final String CREATE_DISPOSITION_TYPE = "create_disposition_type";

Expand All @@ -43,6 +44,11 @@ public BigQuerySessionProperties(BigQueryConfig config)
"Skip materializing views",
config.isSkipViewMaterialization(),
false))
.add(booleanProperty(
VIEW_MATERIALIZATION_WITH_FILTER,
"Materialize views with filters",
config.isViewMaterializationWithFilter(),
false))
.add(booleanProperty(
QUERY_RESULTS_CACHE_ENABLED,
"Enable query results cache",
Expand All @@ -68,6 +74,11 @@ public static boolean isSkipViewMaterialization(ConnectorSession session)
return session.getProperty(SKIP_VIEW_MATERIALIZATION, Boolean.class);
}

/**
 * Reads the {@code view_materialization_with_filter} session property as a boolean.
 *
 * @param session the connector session to read the property from
 * @return the value of the session property
 */
public static boolean isViewMaterializationWithFilter(ConnectorSession session)
{
return session.getProperty(VIEW_MATERIALIZATION_WITH_FILTER, Boolean.class);
}

public static boolean isQueryResultsCacheEnabled(ConnectorSession session)
{
return session.getProperty(QUERY_RESULTS_CACHE_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -153,14 +154,20 @@ private List<BigQuerySplit> readFromBigQuery(ConnectorSession session, TableDefi
.filter(columnName -> !projectedColumnsNames.contains(columnName))
.forEach(projectedColumnsNames::add));
}
ReadSessionCreator readSessionCreator = new ReadSessionCreator(bigQueryClientFactory, bigQueryReadClientFactory, viewEnabled, arrowSerializationEnabled, viewExpiration, maxReadRowsRetries);
ReadSession readSession = readSessionCreator.create(session, remoteTableId, ImmutableList.copyOf(projectedColumnsNames), filter, actualParallelism);
ReadSession readSession = createReadSession(session, remoteTableId, ImmutableList.copyOf(projectedColumnsNames), filter, actualParallelism);

return readSession.getStreamsList().stream()
.map(stream -> BigQuerySplit.forStream(stream.getName(), getSchemaAsString(readSession), columns, OptionalInt.of(stream.getSerializedSize())))
.collect(toImmutableList());
}

/**
 * Builds a fresh {@link ReadSessionCreator} from this split manager's settings and
 * uses it to create a read session for the given table, columns and filter.
 * Package-private so tests can call it directly.
 */
@VisibleForTesting
ReadSession createReadSession(ConnectorSession session, TableId remoteTableId, List<String> projectedColumnsNames, Optional<String> filter, int actualParallelism)
{
    return new ReadSessionCreator(
            bigQueryClientFactory,
            bigQueryReadClientFactory,
            viewEnabled,
            arrowSerializationEnabled,
            viewExpiration,
            maxReadRowsRetries)
            .create(session, remoteTableId, projectedColumnsNames, filter, actualParallelism);
}

private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, TableId remoteTableId, int actualParallelism, Optional<String> filter)
{
BigQueryClient client = bigQueryClientFactory.create(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.CompressionCodec.ZSTD;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isViewMaterializationWithFilter;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.time.temporal.ChronoUnit.MILLIS;
Expand Down Expand Up @@ -77,7 +78,7 @@ public ReadSession create(ConnectorSession session, TableId remoteTable, List<St
TableInfo tableDetails = client.getTable(remoteTable)
.orElseThrow(() -> new TableNotFoundException(new SchemaTableName(remoteTable.getDataset(), remoteTable.getTable())));

TableInfo actualTable = getActualTable(client, tableDetails, selectedFields);
TableInfo actualTable = getActualTable(client, tableDetails, selectedFields, isViewMaterializationWithFilter(session) ? filter : Optional.empty());

List<String> filteredSelectedFields = selectedFields.stream()
.map(BigQueryUtil::toBigQueryColumnName)
Expand Down Expand Up @@ -121,7 +122,8 @@ String toTableResourceName(TableId tableId)
private TableInfo getActualTable(
BigQueryClient client,
TableInfo remoteTable,
List<String> requiredColumns)
List<String> requiredColumns,
Optional<String> filter)
{
TableDefinition tableDefinition = remoteTable.getDefinition();
TableDefinition.Type tableType = tableDefinition.getType();
Expand All @@ -135,7 +137,7 @@ private TableInfo getActualTable(
BigQueryConfig.VIEWS_ENABLED));
}
// get it from the view
return client.getCachedTable(viewExpiration, remoteTable, requiredColumns);
return client.getCachedTable(viewExpiration, remoteTable, requiredColumns, filter);
}
// Storage API doesn't support reading other table types (materialized views, external)
throw new TrinoException(NOT_SUPPORTED, format("Table type '%s' of table '%s.%s' is not supported",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class ViewMaterializationCache
{
private static final Logger log = Logger.get(ViewMaterializationCache.class);

public static final String TEMP_TABLE_PREFIX = "_pbc_";

private final NonEvictableCache<String, TableInfo> destinationTableCache;
private final Optional<String> viewMaterializationProject;
private final Optional<String> viewMaterializationDataset;
Expand All @@ -70,7 +72,7 @@ private TableId createDestinationTable(TableId remoteTableId)
String project = viewMaterializationProject.orElseGet(remoteTableId::getProject);
String dataset = viewMaterializationDataset.orElseGet(remoteTableId::getDataset);

String name = format("_pbc_%s", randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""));
String name = format("%s%s", TEMP_TABLE_PREFIX, randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""));
return TableId.of(project, dataset, name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public void testDefaults()
.setParallelism(null)
.setViewExpireDuration(new Duration(24, HOURS))
.setSkipViewMaterialization(false)
.setViewMaterializationWithFilter(false)
.setViewMaterializationProject(null)
.setViewMaterializationDataset(null)
.setMaxReadRowsRetries(3)
Expand Down Expand Up @@ -66,6 +67,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey()
.put("bigquery.arrow-serialization.enabled", "true")
.put("bigquery.view-expire-duration", "30m")
.put("bigquery.skip-view-materialization", "true")
.put("bigquery.view-materialization-with-filter", "true")
.put("bigquery.view-materialization-project", "vmproject")
.put("bigquery.view-materialization-dataset", "vmdataset")
.put("bigquery.max-read-rows-retries", "10")
Expand All @@ -88,6 +90,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey()
.setArrowSerializationEnabled(true)
.setViewExpireDuration(new Duration(30, MINUTES))
.setSkipViewMaterialization(true)
.setViewMaterializationWithFilter(true)
.setViewMaterializationProject("vmproject")
.setViewMaterializationDataset("vmdataset")
.setMaxReadRowsRetries(10)
Expand Down Expand Up @@ -120,5 +123,12 @@ public void testInvalidViewSetting()
.validate())
.isInstanceOf(IllegalStateException.class)
.hasMessage("bigquery.views-enabled config property must be enabled when skipping view materialization");

assertThatThrownBy(() -> new BigQueryConfig()
.setViewMaterializationWithFilter(true)
.setViewsEnabled(false)
.validate())
.isInstanceOf(IllegalStateException.class)
.hasMessage("bigquery.views-enabled config property must be enabled when view materialization with filter is enabled");
}
}

0 comments on commit 0329c4c

Please sign in to comment.