Improve query plan when Hive tables have compatible bucket numbers #11749
Conversation
Force-pushed from b1b4122 to 4b9f464
Force-pushed from 180a2a4 to 317438a
Some initial quick comments. I would like to understand the design together with the bucket-evolution feature (done in #10312).
presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java (outdated; resolved)
// must be evenly divisible
return Optional.empty();
}
if (Integer.bitCount(largerBucketCount / smallerBucketCount) != 1) {
This doesn't seem necessary.
I agree that this isn't necessary. But it is a reasonable restriction, and the existing bucket-count evolution also has this restriction.
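The two checks under discussion (even divisibility, and the power-of-two restriction on the multiplier) can be sketched as a standalone helper. This is an illustrative sketch with hypothetical names, not the actual Presto code:

```java
import java.util.OptionalInt;

public class BucketCompatibility
{
    // Returns the smaller bucket count if the two table bucket counts are
    // compatible, or empty otherwise. Order of the arguments does not matter.
    public static OptionalInt compatibleBucketCount(int left, int right)
    {
        int smallerBucketCount = Math.min(left, right);
        int largerBucketCount = Math.max(left, right);
        if (largerBucketCount % smallerBucketCount != 0) {
            // must be evenly divisible
            return OptionalInt.empty();
        }
        if (Integer.bitCount(largerBucketCount / smallerBucketCount) != 1) {
            // restriction: the multiplier must be a power of two
            return OptionalInt.empty();
        }
        return OptionalInt.of(smallerBucketCount);
    }
}
```

Under this sketch, 32 and 8 are compatible (multiplier 4), 12 and 8 are not (not divisible), and 24 and 8 are not (multiplier 3 is not a power of two).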
}

OptionalInt maxCompatibleBucketCount = min(leftHandle.getMaxCompatibleBucketCount(), rightHandle.getMaxCompatibleBucketCount());
if (maxCompatibleBucketCount.isPresent() && maxCompatibleBucketCount.getAsInt() < smallerBucketCount) {
So, when maxCompatibleBucketCount is not provided, there is no max compatible bucket count...
This is true. I suppose you are concerned about this.
//TODO! commented out to run travis. uncomment before merge
//return Optional.empty();
}
int largerBucketCount = Math.max(leftHandle.getBucketCount(), rightHandle.getBucketCount());
Can we refactor to use the existing HiveSplitManager.isBucketCountCompatible?
From the abstraction perspective: even though both functions take two integers, the semantics are completely different. One takes a table bucket count and a partition bucket count; the other takes two table bucket counts (and order doesn't matter).
From the practical perspective: while the code looks similar, the logic is sufficiently different. Code reuse is still possible, but it wouldn't be particularly straightforward.
Overall, I think the two functions should be kept separate.
@@ -125,6 +125,11 @@ public SchemaTableName getSchemaTableName()
@Override
public String toString()
{
-    return schemaTableName.toString();
+    StringBuilder result = new StringBuilder();
nit: Can MoreObjects.toStringHelper be used here?
toString of ConnectorTableLayoutHandle is used for EXPLAIN. As a result, using toStringHelper would lead to too much boilerplate.
{
-    return nodePartitioning.isPresent() && nodePartitioning.get().equals(partitioning) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
+    return nodePartitioning.isPresent() && nodePartitioning.get().isCompatibleWith(partitioning, metadata, session) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
Shall we consider renaming the variable (nodePartitioning), since it's referring to table partitioning here?
I believe this would be a reasonable rename to make. I don't think it fits in this PR though.
presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java (outdated; resolved)
throw new PrestoException(
        NOT_SUPPORTED,
        "The bucket filter cannot be satisfied. There are restrictions on the bucket filter when all the following is true: " +
        "1. a table has a different buckets count as at least one of its partitions that is read in this query; " +
This happens when tableBucket > max(readBucket, partitionBucket) and $bucket is used. So I can see why tableBucket != partitionBucket is a necessary condition.
However, this combination of conditions looks very complicated for users to understand and to reason about when they can use $bucket and when they cannot.
Another thought would be to simply disable compatible bucketing when $bucket is used. That seems easier for reasoning about the engine's behavior.
int partitionBucketNumber = bucketNumber % partitionBucketCount; // physical
int tableBucketNumber = bucketNumber % tableBucketCount; // logical
if (bucketSplitInfo.isBucketEnabled(tableBucketNumber)) {
for (int bucketNumber = 0; bucketNumber < Math.max(readBucketCount, partitionBucketCount); bucketNumber++) {
Depending on the relationship between readBucketCount and partitionBucketCount, the loop subject can be readBucketNumber or partitionBucketNumber.
One thought is to fix the loop subject to be readBucketNumber, and it may create one or multiple splits.
There is a desire here to make sure that the splits are produced in an order that round-robins (one split at a time for each "read bucket id").
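The round-robin ordering can be illustrated with a small sketch of the loop (hypothetical, simplified from the code under review): iterating bucketNumber up to max(readBucketCount, partitionBucketCount) emits one split per read bucket id before wrapping around to a bucket id that was already seen.

```java
import java.util.ArrayList;
import java.util.List;

public class BucketedSplitOrder
{
    // Returns (readBucketNumber, partitionBucketNumber) pairs in the order
    // splits would be emitted; each physical file bucket may be visited
    // more than once when readBucketCount > partitionBucketCount.
    public static List<int[]> splitOrder(int readBucketCount, int partitionBucketCount)
    {
        List<int[]> splits = new ArrayList<>();
        for (int bucketNumber = 0; bucketNumber < Math.max(readBucketCount, partitionBucketCount); bucketNumber++) {
            int partitionBucketNumber = bucketNumber % partitionBucketCount; // physical file bucket
            int readBucketNumber = bucketNumber % readBucketCount;           // logical bucket seen by the engine
            splits.add(new int[] {readBucketNumber, partitionBucketNumber});
        }
        return splits;
    }
}
```

For example, with readBucketCount = 4 and partitionBucketCount = 2, the read bucket ids come out as 0, 1, 2, 3 while the physical buckets alternate 0, 1, 0, 1.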
PartitioningHandleReassigner partitioningHandleReassigner = new PartitioningHandleReassigner(fragment.getPartitioning(), metadata, session);
PlanNode newRoot;
if (fragment.getPartitioning().isSingleNode()) {
Should the condition here and the condition on line 153 be the same?
I don't think so
presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java (resolved)
Just skimmed so far
presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java (outdated; resolved)
presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketHandle.java (resolved)
presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java (outdated; resolved)
@@ -314,13 +314,13 @@ private void invokeNoMoreSplitsIfNecessary()
if (partition.getPartition().isPresent()) {
What does it mean for bucket numbers to be "compatible"?
presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java (outdated; resolved)
@@ -98,6 +98,20 @@ default boolean schemaExists(ConnectorSession session, String schemaName)

ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle);

default ConnectorTableLayoutHandle getAlternativeLayoutHandle(ConnectorSession session, ConnectorTableLayoutHandle tableLayoutHandle, ConnectorPartitioningHandle partitioningHandle)
This is surely missing a comment.
Maybe getPartitionedTableLayoutHandle? Maybe it could return an Optional; then I suspect the method below would not be necessary.
I added a comment.
I don't think returning an Optional would be helpful in removing the other method.
presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java (outdated; resolved)
return reassignPartitioningHandleIfNecessaryHelper(session, metadata, subPlan, subPlan.getFragment().getPartitioning());
}

private SubPlan reassignPartitioningHandleIfNecessaryHelper(Session session, Metadata metadata, SubPlan subPlan, PartitioningHandle newOutputPartitioningHandle)
Remove the Helper part from the method name.
I prefer keeping the suffix, as it is a helper method that is necessary in order to do the recursion.
presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java (outdated; resolved)
presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java (outdated; resolved)
Force-pushed from 317438a to f82a7bf
"Add assertion about plan for tests involving grouped execution" .
Looks good except minor comment.
log.info("FINISHED in presto: %s", nanosSince(start));

if (planAssertion.isPresent()) {
We can also write this as planAssertion.ifPresent(assertion -> assertion.accept(queryPlan)); ditto for line 150.
I don't have a strong opinion here, though, given that we have to have the if-statement from lines 64-72 anyway.
I'll leave this as is.
"Rename and add comments to methods in BucketSplitInfo" looks good.
* A bucket predicate can be present in two cases:
* <ul>
* <li>Filter on "$bucket" column. e.g. {@code "$bucket" between 0 and 100}
* <li>Single-value equality filter on all bucket columns. e.g. for a table with two bucketing columns,
This is good to learn! How is this done? bucketFilter is created by seeking "$bucket" in effectivePredicate; does TupleDomain try to interpret bucketCol = xxx as $bucket = yyy?
See HiveBucketing.getHiveBucket
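For readers following along: the Hive bucketing convention maps a row's hash to a bucket roughly as sketched below. This is a simplified illustration; HiveBucketing.getHiveBucket in the Presto codebase performs the real, type-aware hashing of the bucketing columns.

```java
public class BucketFromHash
{
    // Hive-style bucket assignment: mask off the sign bit so the value is
    // non-negative, then take the remainder modulo the bucket count.
    public static int bucketFor(int hash, int bucketCount)
    {
        return (hash & Integer.MAX_VALUE) % bucketCount;
    }
}
```

Because the bucket is a pure function of the bucketing columns' hash, a single-value equality filter on all bucket columns pins exactly one bucket, which is why such a filter can produce a bucket predicate.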
"Add plan partitioning sanity check for TableWriter in PlanFragmenter"
Looks good.
LGTM.
"The bucket filter cannot be satisfied. There are restrictions on the bucket filter when all the following is true: " + | ||
"1. a table has a different buckets count as at least one of its partitions that is read in this query; " + | ||
"2. the table has a different but compatible bucket number with another table in the query; " + | ||
"3. some buckets of the table is filtered out from the query, most likely using a filter on \"$bucket\". " + |
According to the comment in 9d3cd72#diff-20dee960e1c124aae6c511152416a860R547, filtering just on the bucket columns can have this issue, right?
Yes, that is correct.
When two Hive tables have the same bucketing key but different bucket counts, the tables are considered to have compatible bucketing. A remote exchange is not necessary to join them when the bucket counts are compatible. Bucket counts are considered compatible if one is a multiple of the other. In the current implementation, the multiplier is required to be a power of two. This power-of-two constraint is not strictly necessary and can be removed. This change also applies when reading from one bucketed table and writing into another, and when GROUP BY is applied to a UNION ALL of two or more tables.
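A quick check of the compatibility claim (a hedged sketch, not Presto code): when the smaller bucket count divides the larger one, the bucket a row lands in under the smaller layout is fully determined by its bucket under the larger layout, so matching rows are already co-located and no remote exchange is needed.

```java
public class CompatibleBucketCheck
{
    // True iff folding the larger layout's bucket down to the smaller layout
    // agrees with bucketing directly into the smaller layout.
    public static boolean foldsConsistently(int hash, int largerCount, int smallerCount)
    {
        int h = hash & Integer.MAX_VALUE; // Hive-style non-negative hash
        return (h % largerCount) % smallerCount == h % smallerCount;
    }
}
```

The identity holds whenever smallerCount divides largerCount, with no power-of-two requirement, which is consistent with the note above that the power-of-two constraint could be removed.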
Force-pushed from f82a7bf to c223aec
" test_mismatch_bucketingN\n" + | ||
"ON key16=keyN"; | ||
|
||
assertUpdate(withoutMismatchOptimization, writeToTableWithMoreBuckets, 15000, assertRemoteExchangesCount(4)); |
What about adding a test case for writeToTableWithNoBuckets? :)
@@ -315,34 +361,44 @@ public FragmentProperties setSingleNodeDistribution()
return this;
}

-    public FragmentProperties setDistribution(PartitioningHandle distribution)
+    public FragmentProperties setDistribution(PartitioningHandle distribution, Metadata metadata, Session session)
What about calling it coalesceDistribution? Or mention in the comment that it will now coalesce the fragment distribution to a common one.
Merged #11749. Sorry that I didn't mark the PR earlier.
@@ -375,28 +431,39 @@ public FragmentProperties setCoordinatorOnlyDistribution()
return this;
}

-    public FragmentProperties addSourceDistribution(PlanNodeId source, PartitioningHandle distribution)
+    public FragmentProperties addSourceDistribution(PlanNodeId source, PartitioningHandle distribution, Metadata metadata, Session session)
Now two methods will coalesce a fragment's distribution:
setDistribution
addSourceDistribution
Maybe consider having some common method to do the coalescing. That might help with reasoning about the code (e.g., when fragment coalescing happens).
When two Hive tables have the same bucketing key but different bucket counts, a remote exchange is not necessary to join them when the bucket counts are compatible.