Implement node local dynamic bucket pruning
raunaqmorarka authored and sopel39 committed Oct 27, 2020
1 parent 9535ef6 commit 06c8ef8
Showing 11 changed files with 256 additions and 25 deletions.
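In short: when a split carries a bucket number, the page source provider now consults the dynamic filter's current predicate and returns an empty page source for splits whose bucket cannot match, so bucketed splits are pruned locally on each worker at scan time. Below is a minimal, self-contained sketch of that decision using plain Java types rather than the Presto classes in the diff; the class and method names here are illustrative only.

import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;

// Sketch only, not Presto code: the per-split decision a worker makes.
final class BucketPruningSketch
{
    private BucketPruningSketch() {}

    // Skip the split when it is bucketed and the dynamic filter has narrowed the
    // bucketed column to a set of buckets that does not contain the split's bucket.
    static boolean shouldSkipBucket(OptionalInt splitBucket, Optional<Set<Integer>> bucketsToKeep)
    {
        if (splitBucket.isEmpty()) {
            return false; // unbucketed split: nothing to prune on
        }
        return bucketsToKeep
                .map(keep -> !keep.contains(splitBucket.getAsInt()))
                .orElse(false); // no usable bucket filter yet: keep the split
    }

    public static void main(String[] args)
    {
        Set<Integer> keep = Set.of(2, 5);
        System.out.println(shouldSkipBucket(OptionalInt.of(7), Optional.of(keep)));   // true: bucket 7 filtered out
        System.out.println(shouldSkipBucket(OptionalInt.of(2), Optional.of(keep)));   // false: bucket 2 may match
        System.out.println(shouldSkipBucket(OptionalInt.empty(), Optional.of(keep))); // false: split not bucketed
    }
}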
@@ -234,6 +234,7 @@
import static io.prestosql.plugin.hive.util.HiveBucketing.getHiveBucketHandle;
import static io.prestosql.plugin.hive.util.HiveUtil.columnExtraInfo;
import static io.prestosql.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
import static io.prestosql.plugin.hive.util.HiveUtil.getRegularColumnHandles;
import static io.prestosql.plugin.hive.util.HiveUtil.hiveColumnHandles;
import static io.prestosql.plugin.hive.util.HiveUtil.toPartitionValues;
import static io.prestosql.plugin.hive.util.HiveUtil.verifyPartitionTypeSupported;
@@ -389,6 +390,7 @@ public HiveTableHandle getTableHandle(ConnectorSession session, SchemaTableName
tableName.getTableName(),
table.get().getParameters(),
getPartitionKeyColumnHandles(table.get(), typeManager),
getRegularColumnHandles(table.get(), typeManager, getTimestampPrecision(session).getPrecision()),
getHiveBucketHandle(session, table.get(), typeManager));
}

@@ -2316,6 +2318,7 @@ public ConnectorTableHandle makeCompatiblePartitioning(ConnectorSession session,
hiveTable.getTableName(),
hiveTable.getTableParameters(),
hiveTable.getPartitionColumns(),
hiveTable.getDataColumns(),
hiveTable.getPartitions(),
hiveTable.getCompactEffectivePredicate(),
hiveTable.getEnforcedConstraint(),
@@ -66,6 +66,8 @@
import static io.prestosql.plugin.hive.HiveUpdatablePageSource.ACID_ROW_STRUCT_COLUMN_ID;
import static io.prestosql.plugin.hive.HiveUpdatablePageSource.ORIGINAL_FILE_PATH_MATCHER;
import static io.prestosql.plugin.hive.orc.OrcTypeToHiveTypeTranslator.fromOrcTypeToHiveType;
import static io.prestosql.plugin.hive.util.HiveBucketing.HiveBucketFilter;
import static io.prestosql.plugin.hive.util.HiveBucketing.getHiveBucketFilter;
import static io.prestosql.plugin.hive.util.HiveUtil.getPrefilledColumnValue;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
@@ -123,12 +125,15 @@ public ConnectorPageSource createPageSource(
DynamicFilter dynamicFilter)
{
HiveTableHandle hiveTable = (HiveTableHandle) tableHandle;
HiveSplit hiveSplit = (HiveSplit) split;

if (shouldSkipBucket(hiveTable, hiveSplit, dynamicFilter)) {
return new EmptyPageSource();
}

List<HiveColumnHandle> hiveColumns = columns.stream()
.map(HiveColumnHandle.class::cast)
.collect(toList());

HiveSplit hiveSplit = (HiveSplit) split;
Path path = new Path(hiveSplit.getPath());
boolean originalFile = ORIGINAL_FILE_PATH_MATCHER.matcher(path.toString()).matches();

@@ -326,6 +331,15 @@ public static Optional<ConnectorPageSource> createHivePageSource(
return Optional.empty();
}

private static boolean shouldSkipBucket(HiveTableHandle hiveTable, HiveSplit hiveSplit, DynamicFilter dynamicFilter)
{
if (hiveSplit.getBucketNumber().isEmpty()) {
return false;
}
Optional<HiveBucketFilter> hiveBucketFilter = getHiveBucketFilter(hiveTable, dynamicFilter.getCurrentPredicate());
return hiveBucketFilter.map(filter -> !filter.getBucketsToKeep().contains(hiveSplit.getBucketNumber().getAsInt())).orElse(false);
}

public static class ColumnMapping
{
private final ColumnMappingKind kind;
@@ -19,7 +19,6 @@
import com.google.common.collect.Sets;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
import io.prestosql.plugin.hive.metastore.Table;
import io.prestosql.plugin.hive.util.HiveBucketing.HiveBucketFilter;
import io.prestosql.plugin.hive.util.Optionals;
import io.prestosql.spi.PrestoException;
@@ -91,10 +90,7 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastor
return new HivePartitionResult(partitionColumns, ImmutableList.of(), none(), none(), none(), hiveBucketHandle, Optional.empty());
}

Table table = metastore.getTable(identity, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));

Optional<HiveBucketFilter> bucketFilter = getHiveBucketFilter(table, effectivePredicate);
Optional<HiveBucketFilter> bucketFilter = getHiveBucketFilter(hiveTableHandle, effectivePredicate);
TupleDomain<HiveColumnHandle> compactEffectivePredicate = effectivePredicate
.transform(HiveColumnHandle.class::cast)
.simplify(domainCompactionThreshold);
@@ -187,6 +183,7 @@ public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitio
handle.getTableName(),
handle.getTableParameters(),
ImmutableList.copyOf(partitions.getPartitionColumns()),
handle.getDataColumns(),
Optional.of(getPartitionsAsList(partitions)),
partitions.getCompactEffectivePredicate(),
partitions.getEnforcedConstraint(),
@@ -43,6 +43,7 @@ public class HiveTableHandle
private final String tableName;
private final Optional<Map<String, String>> tableParameters;
private final List<HiveColumnHandle> partitionColumns;
private final List<HiveColumnHandle> dataColumns;
private final Optional<List<HivePartition>> partitions;
private final TupleDomain<HiveColumnHandle> compactEffectivePredicate;
private final TupleDomain<ColumnHandle> enforcedConstraint;
@@ -58,6 +59,7 @@ public HiveTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("partitionColumns") List<HiveColumnHandle> partitionColumns,
@JsonProperty("dataColumns") List<HiveColumnHandle> dataColumns,
@JsonProperty("compactEffectivePredicate") TupleDomain<HiveColumnHandle> compactEffectivePredicate,
@JsonProperty("enforcedConstraint") TupleDomain<ColumnHandle> enforcedConstraint,
@JsonProperty("bucketHandle") Optional<HiveBucketHandle> bucketHandle,
@@ -71,6 +73,7 @@ public HiveTableHandle(
tableName,
Optional.empty(),
partitionColumns,
dataColumns,
Optional.empty(),
compactEffectivePredicate,
enforcedConstraint,
@@ -87,13 +90,15 @@ public HiveTableHandle(
String tableName,
Map<String, String> tableParameters,
List<HiveColumnHandle> partitionColumns,
List<HiveColumnHandle> dataColumns,
Optional<HiveBucketHandle> bucketHandle)
{
this(
schemaName,
tableName,
Optional.of(tableParameters),
partitionColumns,
dataColumns,
Optional.empty(),
TupleDomain.all(),
TupleDomain.all(),
@@ -110,6 +115,7 @@ public HiveTableHandle(
String tableName,
Optional<Map<String, String>> tableParameters,
List<HiveColumnHandle> partitionColumns,
List<HiveColumnHandle> dataColumns,
Optional<List<HivePartition>> partitions,
TupleDomain<HiveColumnHandle> compactEffectivePredicate,
TupleDomain<ColumnHandle> enforcedConstraint,
@@ -124,6 +130,7 @@ public HiveTableHandle(
this.tableName = requireNonNull(tableName, "tableName is null");
this.tableParameters = requireNonNull(tableParameters, "tableParameters is null").map(ImmutableMap::copyOf);
this.partitionColumns = ImmutableList.copyOf(requireNonNull(partitionColumns, "partitionColumns is null"));
this.dataColumns = ImmutableList.copyOf(requireNonNull(dataColumns, "dataColumns is null"));
this.partitions = requireNonNull(partitions, "partitions is null").map(ImmutableList::copyOf);
this.compactEffectivePredicate = requireNonNull(compactEffectivePredicate, "compactEffectivePredicate is null");
this.enforcedConstraint = requireNonNull(enforcedConstraint, "enforcedConstraint is null");
@@ -142,6 +149,7 @@ public HiveTableHandle withAnalyzePartitionValues(List<List<String>> analyzePart
tableName,
tableParameters,
partitionColumns,
dataColumns,
partitions,
compactEffectivePredicate,
enforcedConstraint,
@@ -160,6 +168,7 @@ public HiveTableHandle withAnalyzeColumnNames(Set<String> analyzeColumnNames)
tableName,
tableParameters,
partitionColumns,
dataColumns,
partitions,
compactEffectivePredicate,
enforcedConstraint,
@@ -178,6 +187,7 @@ public HiveTableHandle withTransaction(AcidTransaction transaction)
tableName,
tableParameters,
partitionColumns,
dataColumns,
partitions,
compactEffectivePredicate,
enforcedConstraint,
@@ -214,6 +224,12 @@ public List<HiveColumnHandle> getPartitionColumns()
return partitionColumns;
}

@JsonProperty
public List<HiveColumnHandle> getDataColumns()
{
return dataColumns;
}

// do not serialize partitions as they are not needed on workers
@JsonIgnore
public Optional<List<HivePartition>> getPartitions()
@@ -21,6 +21,7 @@
import io.prestosql.plugin.hive.HiveBucketHandle;
import io.prestosql.plugin.hive.HiveBucketProperty;
import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.plugin.hive.HiveTableHandle;
import io.prestosql.plugin.hive.HiveType;
import io.prestosql.plugin.hive.metastore.Column;
import io.prestosql.plugin.hive.metastore.Table;
@@ -203,21 +204,25 @@ public static Optional<HiveBucketHandle> getHiveBucketHandle(ConnectorSession se
return Optional.of(new HiveBucketHandle(bucketColumns.build(), bucketingVersion, bucketCount, bucketCount));
}

public static Optional<HiveBucketFilter> getHiveBucketFilter(Table table, TupleDomain<ColumnHandle> effectivePredicate)
public static Optional<HiveBucketFilter> getHiveBucketFilter(HiveTableHandle hiveTable, TupleDomain<ColumnHandle> effectivePredicate)
{
if (table.getStorage().getBucketProperty().isEmpty()) {
if (hiveTable.getBucketHandle().isEmpty()) {
return Optional.empty();
}

if (bucketedOnTimestamp(table.getStorage().getBucketProperty().get(), table)) {
HiveBucketProperty hiveBucketProperty = hiveTable.getBucketHandle().get().toTableBucketProperty();
List<Column> dataColumns = hiveTable.getDataColumns().stream()
.map(HiveColumnHandle::toMetastoreColumn)
.collect(toImmutableList());
if (bucketedOnTimestamp(hiveBucketProperty, dataColumns, hiveTable.getTableName())) {
return Optional.empty();
}

Optional<Map<ColumnHandle, List<NullableValue>>> bindings = TupleDomain.extractDiscreteValues(effectivePredicate);
if (bindings.isEmpty()) {
return Optional.empty();
}
Optional<Set<Integer>> buckets = getHiveBuckets(table, bindings.get());
Optional<Set<Integer>> buckets = getHiveBuckets(hiveBucketProperty, dataColumns, bindings.get());
if (buckets.isPresent()) {
return Optional.of(new HiveBucketFilter(buckets.get()));
}
@@ -232,7 +237,7 @@ public static Optional<HiveBucketFilter> getHiveBucketFilter(Table table, TupleD
}
ValueSet values = domain.get().getValues();
ImmutableSet.Builder<Integer> builder = ImmutableSet.builder();
int bucketCount = table.getStorage().getBucketProperty().get().getBucketCount();
int bucketCount = hiveBucketProperty.getBucketCount();
for (int i = 0; i < bucketCount; i++) {
if (values.containsValue((long) i)) {
builder.add(i);
@@ -241,18 +246,18 @@ public static Optional<HiveBucketFilter> getHiveBucketFilter(Table table, TupleD
return Optional.of(new HiveBucketFilter(builder.build()));
}

private static Optional<Set<Integer>> getHiveBuckets(Table table, Map<ColumnHandle, List<NullableValue>> bindings)
private static Optional<Set<Integer>> getHiveBuckets(HiveBucketProperty hiveBucketProperty, List<Column> dataColumns, Map<ColumnHandle, List<NullableValue>> bindings)
{
if (bindings.isEmpty()) {
return Optional.empty();
}

// Get bucket columns names
List<String> bucketColumns = table.getStorage().getBucketProperty().get().getBucketedBy();
List<String> bucketColumns = hiveBucketProperty.getBucketedBy();

// Verify the bucket column types are supported
Map<String, HiveType> hiveTypes = new HashMap<>();
for (Column column : table.getDataColumns()) {
for (Column column : dataColumns) {
hiveTypes.put(column.getName(), column.getType());
}
for (String column : bucketColumns) {
@@ -286,8 +291,8 @@ private static Optional<Set<Integer>> getHiveBuckets(Table table, Map<ColumnHand
.collect(toImmutableList());

return getHiveBuckets(
getBucketingVersion(table.getParameters()),
table.getStorage().getBucketProperty().get().getBucketCount(),
hiveBucketProperty.getBucketingVersion(),
hiveBucketProperty.getBucketCount(),
typeInfos,
orderedBindings);
}
@@ -307,10 +312,15 @@ public static BucketingVersion getBucketingVersion(Map<String, String> tableProp
}

public static boolean bucketedOnTimestamp(HiveBucketProperty bucketProperty, Table table)
{
return bucketedOnTimestamp(bucketProperty, table.getDataColumns(), table.getTableName());
}

public static boolean bucketedOnTimestamp(HiveBucketProperty bucketProperty, List<Column> dataColumns, String tableName)
{
return bucketProperty.getBucketedBy().stream()
.map(columnName -> table.getColumn(columnName)
.orElseThrow(() -> new IllegalArgumentException(format("Cannot find column '%s' in %s", columnName, table))))
.map(columnName -> dataColumns.stream().filter(column -> column.getName().equals(columnName)).findFirst()
.orElseThrow(() -> new IllegalArgumentException(format("Cannot find column '%s' in %s", columnName, tableName))))
.map(Column::getType)
.map(HiveType::getTypeInfo)
.anyMatch(HiveBucketing::bucketedOnTimestamp);
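For reference, the getHiveBucketFilter/getHiveBuckets refactor above computes the bucket filter from data carried on the HiveTableHandle (bucket handle plus data columns), so a worker no longer needs to fetch the metastore Table. The sketch below illustrates the underlying idea of mapping the discrete values produced by a dynamic filter to the set of buckets worth reading; it is not the Presto implementation, and hashValue is a placeholder for the bucketing-version-specific hash the real code selects.

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.ToIntFunction;

// Sketch only: derive the buckets to keep from the dynamic filter's discrete values.
final class BucketFilterSketch
{
    private BucketFilterSketch() {}

    // Bucket assignment follows Hive's (hash & Integer.MAX_VALUE) % bucketCount rule;
    // hashValue stands in for the version-specific Hive bucketing hash.
    static Set<Integer> bucketsToKeep(List<?> discreteValues, int bucketCount, ToIntFunction<Object> hashValue)
    {
        Set<Integer> buckets = new HashSet<>();
        for (Object value : discreteValues) {
            buckets.add((hashValue.applyAsInt(value) & Integer.MAX_VALUE) % bucketCount);
        }
        return buckets;
    }

    public static void main(String[] args)
    {
        // With 8 buckets and join keys {42, 97} from the dynamic filter,
        // only the buckets those keys hash into need to be scanned.
        System.out.println(bucketsToKeep(List.of(42, 97), 8, Object::hashCode));
    }
}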
@@ -656,7 +656,7 @@ protected void setupHive(String databaseName)
tablePartitionSchemaChangeNonCanonical = new SchemaTableName(database, "presto_test_partition_schema_change_non_canonical");
tableBucketEvolution = new SchemaTableName(database, "presto_test_bucket_evolution");

invalidTableHandle = new HiveTableHandle(database, INVALID_TABLE, ImmutableMap.of(), ImmutableList.of(), Optional.empty());
invalidTableHandle = new HiveTableHandle(database, INVALID_TABLE, ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), Optional.empty());

dsColumn = createBaseColumn("ds", -1, HIVE_STRING, VARCHAR, PARTITION_KEY, Optional.empty());
fileFormatColumn = createBaseColumn("file_format", -1, HIVE_STRING, VARCHAR, PARTITION_KEY, Optional.empty());
@@ -242,7 +242,7 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa
Optional.empty(),
false,
Optional.empty());
ConnectorTableHandle table = new HiveTableHandle(SCHEMA_NAME, TABLE_NAME, ImmutableMap.of(), ImmutableList.of(), Optional.empty());
ConnectorTableHandle table = new HiveTableHandle(SCHEMA_NAME, TABLE_NAME, ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), Optional.empty());
HivePageSourceProvider provider = new HivePageSourceProvider(
TYPE_MANAGER,
HDFS_ENVIRONMENT,
@@ -29,7 +29,7 @@ public class TestHiveTableHandle
@Test
public void testRoundTrip()
{
HiveTableHandle expected = new HiveTableHandle("schema", "table", ImmutableMap.of(), ImmutableList.of(), Optional.empty());
HiveTableHandle expected = new HiveTableHandle("schema", "table", ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), Optional.empty());

String json = codec.toJson(expected);
HiveTableHandle actual = codec.fromJson(json);
