Fix skipping in diverse scalar stream readers #12721

Closed · wants to merge 21 commits

Commits
d452dd7
Initial commit of Aria scan
mbasmanova Mar 26, 2019
5d363b0
Remove unused flag from ColumnGroupReader
mbasmanova Mar 19, 2019
3e7efe1
Evaluate complex filters on partition columns
mbasmanova Mar 20, 2019
49fcb48
Handle projection pushdown in HivePageSource
mbasmanova Mar 20, 2019
eae15a2
Initial refactoring of pushdown subscripts rule
mbasmanova Mar 15, 2019
f368f32
Add filter_by_subscript_paths function
mbasmanova Apr 3, 2019
2a478b4
Add pushdown_subfields session property
mbasmanova Apr 3, 2019
41f7ad6
Rename SubfieldPath.getPath to getPathElements
mbasmanova Apr 3, 2019
6cb5c9a
Serialize SubfieldPath into a string
mbasmanova Apr 3, 2019
c31d27f
Pushdown subfields into project on top of scan
mbasmanova Apr 3, 2019
3f76fb1
Remove ColumnHandle.getReferencedSubfields API
mbasmanova Apr 3, 2019
f28d03e
Add PushdownSubfieldsIntoConnector rule
mbasmanova Apr 3, 2019
ee7ff1d
Enforce scan batch by exception on exceeding space budget
Apr 10, 2019
c0dab50
Catch up with changes on master
mbasmanova Apr 17, 2019
cbee281
Fix TestLogicalPlanner.testPushdownSubfields
mbasmanova Apr 17, 2019
6fafa06
Move filter reordering and budget enforcement flags into Hive connector
mbasmanova Apr 17, 2019
9cee337
Fix Slice dictionary reader with row group level dictionaries
Apr 10, 2019
9f7fc0f
Scan support for LongDictionaryStreamReader
Apr 1, 2019
2695a1d
Store RuntimeExceptions in ErrorSet
mbasmanova Apr 18, 2019
837fc33
Pushdown of constant columns
Apr 11, 2019
8db0190
Fix skipping and values sizing in diverse scalar stream readers
Apr 24, 2019
HiveColumnHandle.java

@@ -15,11 +15,13 @@

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.SubfieldPath;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spi.type.TypeSignature;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

@@ -60,6 +62,8 @@ public enum ColumnType
private final int hiveColumnIndex;
private final ColumnType columnType;
private final Optional<String> comment;
private final SubfieldPath subfieldPath;
private final List<SubfieldPath> referencedSubfields;

@JsonCreator
public HiveColumnHandle(
@@ -68,7 +72,9 @@ public HiveColumnHandle(
@JsonProperty("typeSignature") TypeSignature typeSignature,
@JsonProperty("hiveColumnIndex") int hiveColumnIndex,
@JsonProperty("columnType") ColumnType columnType,
@JsonProperty("comment") Optional<String> comment)
@JsonProperty("comment") Optional<String> comment,
@JsonProperty("subfieldPath") SubfieldPath subfieldPath,
@JsonProperty("referencedSubfields") List<SubfieldPath> referencedSubfields)
{
this.name = requireNonNull(name, "name is null");
checkArgument(hiveColumnIndex >= 0 || columnType == PARTITION_KEY || columnType == SYNTHESIZED, "hiveColumnIndex is negative");
@@ -77,6 +83,19 @@ public HiveColumnHandle(
this.typeName = requireNonNull(typeSignature, "type is null");
this.columnType = requireNonNull(columnType, "columnType is null");
this.comment = requireNonNull(comment, "comment is null");
this.subfieldPath = subfieldPath;
this.referencedSubfields = referencedSubfields;
}

public HiveColumnHandle(
String name,
HiveType hiveType,
TypeSignature typeSignature,
int hiveColumnIndex,
ColumnType columnType,
Optional<String> comment)
{
this(name, hiveType, typeSignature, hiveColumnIndex, columnType, comment, null, null);
}

@JsonProperty
@@ -130,10 +149,22 @@ public ColumnType getColumnType()
return columnType;
}

@JsonProperty
public SubfieldPath getSubfieldPath()
{
return subfieldPath;
}

@JsonProperty
public List<SubfieldPath> getReferencedSubfields()
{
return referencedSubfields;
}

@Override
public int hashCode()
{
-return Objects.hash(name, hiveColumnIndex, hiveType, columnType, comment);
+return Objects.hash(name, hiveColumnIndex, hiveType, columnType, comment, subfieldPath);
}

@Override
@@ -150,13 +181,13 @@ public boolean equals(Object obj)
Objects.equals(this.hiveColumnIndex, other.hiveColumnIndex) &&
Objects.equals(this.hiveType, other.hiveType) &&
Objects.equals(this.columnType, other.columnType) &&
-Objects.equals(this.comment, other.comment);
+Objects.equals(this.comment, other.comment) && Objects.equals(this.subfieldPath, other.subfieldPath);
}

@Override
public String toString()
{
return name + ":" + hiveType + ":" + hiveColumnIndex + ":" + columnType;
return name + (subfieldPath != null ? subfieldPath.toString() : "") + ":" + hiveType + ":" + hiveColumnIndex + ":" + columnType;
}

public static HiveColumnHandle updateRowIdHandle()
@@ -194,4 +225,16 @@ public static boolean isBucketColumnHandle(HiveColumnHandle column)
{
return column.getHiveColumnIndex() == BUCKET_COLUMN_INDEX;
}

@Override
public boolean supportsSubfieldTupleDomain()
{
return true;
}

@Override
public ColumnHandle createSubfieldColumnHandle(SubfieldPath path)
{
return new HiveColumnHandle(name, hiveType, typeName, hiveColumnIndex, columnType, comment, path, null);
}
}
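Aside: the pattern these HiveColumnHandle changes implement is easier to see in a standalone sketch. Handle below is a hypothetical stand-in for HiveColumnHandle, with a plain dotted string in place of SubfieldPath (whose construction is not part of this diff): a base handle derives per-subfield copies, and the path participates in equals, hashCode, and toString so that handles for different subfields of the same column stay distinct.

import java.util.Objects;

// Hypothetical stand-in for HiveColumnHandle; not a Presto SPI type
final class Handle
{
    private final String name;
    private final String subfieldPath; // e.g. "address.city"; null means the whole column

    Handle(String name, String subfieldPath)
    {
        this.name = Objects.requireNonNull(name, "name is null");
        this.subfieldPath = subfieldPath;
    }

    // Mirrors createSubfieldColumnHandle: same column, narrowed to one path
    Handle withSubfield(String path)
    {
        return new Handle(name, path);
    }

    @Override
    public boolean equals(Object obj)
    {
        if (!(obj instanceof Handle)) {
            return false;
        }
        Handle other = (Handle) obj;
        return name.equals(other.name) && Objects.equals(subfieldPath, other.subfieldPath);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(name, subfieldPath);
    }

    @Override
    public String toString()
    {
        // Matches the toString change above: path rendered only when present
        return name + (subfieldPath == null ? "" : "." + subfieldPath);
    }
}

With this shape, new Handle("address", null).withSubfield("address.city") compares unequal to the base handle, which is what lets the one-for-one replacement map built by pushdownSubfieldPruning below swap a column for its pruned counterpart.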
HiveMetadata.java

@@ -51,6 +51,7 @@
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.SubfieldPath;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.ViewNotFoundException;
@@ -203,18 +204,22 @@
import static com.facebook.presto.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
import static com.facebook.presto.spi.predicate.TupleDomain.all;
import static com.facebook.presto.spi.predicate.TupleDomain.withColumnDomains;
import static com.facebook.presto.spi.security.PrincipalType.USER;
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Predicates.in;
import static com.google.common.base.Predicates.not;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Maps.filterKeys;
import static com.google.common.collect.Streams.stream;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
@@ -1689,6 +1694,13 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl
throw new TableNotFoundException(handle.getSchemaTableName());
}

List<ColumnHandle> partitionColumns = layoutHandle.getPartitionColumns();
if (!layoutHandle.getEffectivePredicate().getDomains()
.map(domains -> filterKeys(domains, not(in(partitionColumns))))
.orElse(ImmutableMap.of()).isEmpty()) {
throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely");
}

if (table.get().getPartitionColumns().isEmpty()) {
metastore.truncateUnpartitionedTable(session, handle.getSchemaName(), handle.getTableName());
}
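The added guard allows a metadata-only delete exactly when every predicate domain falls on a partition column. The same Guava filterKeys/not/in shape in a self-contained sketch, with plain strings as hypothetical stand-ins for ColumnHandle and Domain:

import static com.google.common.base.Predicates.in;
import static com.google.common.base.Predicates.not;
import static com.google.common.collect.Maps.filterKeys;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Map;

public class MetadataDeleteGuardSketch
{
    public static void main(String[] args)
    {
        // Strings stand in for ColumnHandle (keys) and Domain (values)
        Map<String, String> domains = ImmutableMap.of(
                "ds", "[2019-04-24]",        // partition column: droppable wholesale
                "orderkey", "(100, +inf)");  // regular column: needs row-level filtering
        List<String> partitionColumns = ImmutableList.of("ds");

        // Keep only the domains on non-partition columns; anything left means
        // the delete is not expressible as whole-partition drops
        Map<String, String> nonPartitionDomains = filterKeys(domains, not(in(partitionColumns)));
        System.out.println(nonPartitionDomains.isEmpty()
                ? "metadata-only delete allowed"
                : "rejected: row-level predicate on " + nonPartitionDomains.keySet());
    }
}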
@@ -1707,9 +1719,9 @@ private List<HivePartition> getOrComputePartitions(HiveTableLayoutHandle layoutH
return layoutHandle.getPartitions().get();
}
else {
TupleDomain<ColumnHandle> promisedPredicate = layoutHandle.getPromisedPredicate();
Predicate<Map<ColumnHandle, NullableValue>> predicate = convertToPredicate(promisedPredicate);
List<ConnectorTableLayoutResult> tableLayoutResults = getTableLayouts(session, tableHandle, new Constraint<>(promisedPredicate, predicate), Optional.empty());
TupleDomain<ColumnHandle> partitionColumnPredicate = layoutHandle.getPartitionColumnPredicate();
Predicate<Map<ColumnHandle, NullableValue>> predicate = convertToPredicate(partitionColumnPredicate);
List<ConnectorTableLayoutResult> tableLayoutResults = getTableLayouts(session, tableHandle, new Constraint<>(partitionColumnPredicate, predicate), Optional.empty());
return ((HiveTableLayoutHandle) Iterables.getOnlyElement(tableLayoutResults).getTableLayout().getHandle()).getPartitions().get();
}
}
@@ -1726,6 +1738,33 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa
return true;
}

@Override
public Map<ColumnHandle, ColumnHandle> pushdownSubfieldPruning(
ConnectorSession session,
ConnectorTableHandle table,
Map<ColumnHandle, List<SubfieldPath>> desiredSubfields)
{
if (!isAriaScanEnabled(session, table)) {
return ImmutableMap.of();
}

ImmutableMap.Builder<ColumnHandle, ColumnHandle> newColumnHandles = ImmutableMap.builder();
for (Map.Entry<ColumnHandle, List<SubfieldPath>> entry : desiredSubfields.entrySet()) {
HiveColumnHandle columnHandle = (HiveColumnHandle) entry.getKey();
newColumnHandles.put(columnHandle, new HiveColumnHandle(
columnHandle.getName(),
columnHandle.getHiveType(),
columnHandle.getTypeSignature(),
columnHandle.getHiveColumnIndex(),
columnHandle.getColumnType(),
columnHandle.getComment(),
null,
entry.getValue()));
}

return newColumnHandles.build();
}
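pushdownSubfieldPruning answers the engine with a one-for-one replacement map: each requested column maps to a copy of itself carrying the subfield paths the query actually references, and an empty map opts out entirely. A standalone sketch of that shape, again with hypothetical string stand-ins for the SPI types:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Map;

public class SubfieldPruningSketch
{
    // String stands in for ColumnHandle, List<String> for the referenced SubfieldPaths
    static Map<String, String> pushdownSubfieldPruning(Map<String, List<String>> desiredSubfields, boolean ariaScanEnabled)
    {
        if (!ariaScanEnabled) {
            // Empty map: no rewrite offered, the engine keeps the original columns
            return ImmutableMap.of();
        }
        ImmutableMap.Builder<String, String> newHandles = ImmutableMap.builder();
        for (Map.Entry<String, List<String>> entry : desiredSubfields.entrySet()) {
            // Each column is replaced by a copy annotated with its referenced subfields
            newHandles.put(entry.getKey(), entry.getKey() + " referencing " + entry.getValue());
        }
        return newHandles.build();
    }

    public static void main(String[] args)
    {
        // A query touching only address.city asks the connector to prune the rest
        System.out.println(pushdownSubfieldPruning(
                ImmutableMap.of("address", ImmutableList.of("address.city")), true));
    }
}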

@Override
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
@@ -1739,18 +1778,35 @@ public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session
hivePartitionResult = partitionManager.getPartitions(metastore, tableHandle, constraint, session);
}

// TODO Synchronize this flag with engine's aria-enabled
boolean ariaScanEnabled = isAriaScanEnabled(session, tableHandle);

return ImmutableList.of(new ConnectorTableLayoutResult(
getTableLayout(
session,
new HiveTableLayoutHandle(
handle.getSchemaTableName(),
ImmutableList.copyOf(hivePartitionResult.getPartitionColumns()),
getPartitionsAsList(hivePartitionResult),
-hivePartitionResult.getCompactEffectivePredicate(),
+hivePartitionResult.getEffectivePredicate(),
hivePartitionResult.getEnforcedConstraint(),
hivePartitionResult.getBucketHandle(),
hivePartitionResult.getBucketFilter())),
-hivePartitionResult.getUnenforcedConstraint()));
+ariaScanEnabled ? all() : hivePartitionResult.getUnenforcedConstraint()));
}

private boolean isAriaScanEnabled(ConnectorSession session, ConnectorTableHandle tableHandle)
{
boolean ariaScanEnabled = HiveSessionProperties.isAriaScanEnabled(session);
if (ariaScanEnabled) {
HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(getTableMetadata(session, tableHandle).getProperties());
if (hiveStorageFormat == HiveStorageFormat.ORC || hiveStorageFormat == HiveStorageFormat.DWRF) {
return true;
}

// TODO Make HivePageSourceFactory for RC and Parquet fail if non-partition key predicates are present
}
return false;
}
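The gate requires both the session property and a storage format whose reader can enforce non-partition-key predicates; per the TODO above, RC and Parquet fall through to false for now. A compact restatement with hypothetical stand-ins for the flag and format enum:

// Hypothetical stand-ins for HiveSessionProperties.isAriaScanEnabled and HiveStorageFormat
enum Format { ORC, DWRF, RCFILE, PARQUET }

final class AriaGateSketch
{
    static boolean ariaScanEnabled(boolean sessionFlag, Format format)
    {
        // The session flag alone is not enough: only ORC and DWRF readers
        // understand pushed-down non-partition predicates so far
        return sessionFlag && (format == Format.ORC || format == Format.DWRF);
    }
}

When the gate passes, getTableLayouts above reports TupleDomain.all() as the unenforced constraint, promising that the scan itself will evaluate the remaining predicate; when it fails, the unenforced constraint is returned unchanged and the engine keeps filtering as before.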

@Override
@@ -1760,7 +1816,19 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
List<ColumnHandle> partitionColumns = hiveLayoutHandle.getPartitionColumns();
List<HivePartition> partitions = hiveLayoutHandle.getPartitions().get();

TupleDomain<ColumnHandle> predicate = createPredicate(partitionColumns, partitions);
TupleDomain<ColumnHandle> predicate;
if (partitions.isEmpty()) {
predicate = TupleDomain.none();
}
else {
ImmutableMap.Builder<ColumnHandle, Domain> predicateBuilder = ImmutableMap.builder();
hiveLayoutHandle.getEffectivePredicate().getDomains()
.map(domains -> filterKeys(domains, not(in(partitionColumns))))
.ifPresent(predicateBuilder::putAll);
createPredicate(partitionColumns, partitions).getDomains()
.ifPresent(predicateBuilder::putAll);
predicate = withColumnDomains(predicateBuilder.build());
}

Optional<DiscretePredicates> discretePredicates = Optional.empty();
if (!partitionColumns.isEmpty()) {
@@ -1870,8 +1938,8 @@ public ConnectorTableLayoutHandle getAlternativeLayoutHandle(ConnectorSession se
hiveLayoutHandle.getSchemaTableName(),
hiveLayoutHandle.getPartitionColumns(),
hiveLayoutHandle.getPartitions().get(),
-hiveLayoutHandle.getCompactEffectivePredicate(),
-hiveLayoutHandle.getPromisedPredicate(),
+hiveLayoutHandle.getEffectivePredicate(),
+hiveLayoutHandle.getPartitionColumnPredicate(),
Optional.of(new HiveBucketHandle(bucketHandle.getColumns(), bucketHandle.getTableBucketCount(), hivePartitioningHandle.getBucketCount())),
hiveLayoutHandle.getBucketFilter());
}