
[Clustering] Check in new table properties #15985

Closed
wants to merge 9 commits

Conversation

lliiun-z
Contributor

To enable clustering, we add three table properties.

  1. clustered_by: the columns used for clustering.
  2. cluster_count: the number of clusters.
  3. distribution: the data distribution of each cluster column.

Test:

  1. Added an integration test to ensure the syntax is accepted by Presto.


@linux-foundation-easycla

linux-foundation-easycla bot commented Apr 22, 2021

CLA Missing ID

  • ❌ The commit (03d582e, 23a5a53f2a9d2b6ae34a5682b3561a8c010d104c, 5ec42b3cc67936c142cadbb07ad98183bfb30cfc, 5a740b675094e37b86e46cce6471da74d8104b15, bb071bd49b2575b172bdfa7f405140f02e1d2c4b, 6b981e286cbae4606202b596b96c792b108e108b, 3b60b7df68f53e250d924dcfe916dfd8d3309a13, b6d06f5854987a6cee4f08a3cc364778d3a3cbb6) is missing the User's ID, preventing the EasyCLA check. Consult GitHub Help to resolve. For further assistance with EasyCLA, please submit a support request ticket.

Contributor

@shixuan-fan shixuan-fan left a comment

"[Clustering] Check in new table properties"

Besides the comments, how about changing the commit title to: Introduce clustering to hive table properties? Also, we probably don't need the Test part in the commit message.

.collect(Collectors.toList())),
value -> value),
new PropertyMetadata<>(
CLUSTER_COUNT_PROPERTY,
Contributor

We should use integerProperty for cluster count. You could refer to BUCKET_COUNT_PROPERTY as these two seem pretty similar.
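For reference, a minimal sketch of that suggestion, modeled on how BUCKET_COUNT_PROPERTY is declared; the description string and default value are illustrative, not this PR's final choice.

// Hypothetical declaration using the integerProperty factory on
// com.facebook.presto.spi.session.PropertyMetadata (static import assumed);
// description and default are placeholders.
PropertyMetadata<Integer> clusterCountProperty = integerProperty(
        CLUSTER_COUNT_PROPERTY,
        "Number of clusters",
        0,      // default: clustering disabled
        false); // not hidden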

Contributor Author

We use an array because we want to allow table owners (or us) to specify the bucket count for each dimension column, removing the complicated logic to determine how many buckets each dimension column should get. If we later find that such logic is simple and stable, we can switch to a single integer for the cluster count.

new PropertyMetadata<>(
DISTRIBUTION_PROPERTY,
"Distributions of clustering columns",
typeManager.getType(parseTypeSignature("array(varchar)")),
Contributor

Looking at the test below, this seems to be the split values for different range clustering. This is actually quite tricky because:

  • What is the expected input when there are multiple clustering columns?
  • The type of this property depends on the type of input columns, which is not known at compile time

I think keeping it as an array(varchar) is okay because varchar is arguably the most flexible representation. But we would probably need a util class to encode/decode the strings from/to the actual Java classes for engine processing. Also, we would need to specify the expected inputs in the distribution property's description.
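A rough sketch of such a util class, assuming one flat list of split values per column; the class name and methods are purely illustrative, not part of this PR.

// Hypothetical helper for the array(varchar) distribution property discussed above.
import java.util.List;
import java.util.stream.Collectors;

public final class DistributionCodec
{
    private DistributionCodec() {}

    // Encode typed split points into the varchar form stored in the table property.
    public static List<String> encode(List<?> splitPoints)
    {
        return splitPoints.stream()
                .map(Object::toString)
                .collect(Collectors.toList());
    }

    // Decode the stored strings back into longs for a single integer/bigint column.
    public static List<Long> decodeLongs(List<String> encoded)
    {
        return encoded.stream()
                .map(Long::valueOf)
                .collect(Collectors.toList());
    }
}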

Contributor Author

The cluster count should always be numbers. For distribution, you are right that the types are undetermined at compile time.

Contributor

@shixuan-fan shixuan-fan left a comment

"[Clustering] Expose clustering properties to hivebucketproperties object"

This commit by itself is not a complete piece of functionality, so it is hard to give it a proper commit title. Maybe we should squash them into one once the review is done? 🤔

} else if (bucketFunctionType.equals(HIVE_CLUSTERING)) {
checkArgument(clusteredBy.isPresent(), "Cluster columns must be present for bucket function type " + bucketFunctionType);
checkArgument(clusterCount.isPresent(), "Cluster numbers must be present for bucket function type " + bucketFunctionType);
checkArgument(bucketedBy.isEmpty(), "Bucket columns should not be present for bucket function type " + bucketFunctionType);
Contributor

I guess clustering is compatible with sortedBy, but I want to call it out here in case I'm not understanding correctly :)

Contributor

+1

Contributor Author

@shixuan-fan, right! Clustering should be naturally compatible with SortedBy. I will add tests to confirm if it works.


@JsonCreator
public HiveBucketProperty(
@JsonProperty("bucketedBy") List<String> bucketedBy,
@JsonProperty("bucketCount") int bucketCount,
@JsonProperty("sortedBy") List<SortingColumn> sortedBy,
@JsonProperty("bucketFunctionType") BucketFunctionType bucketFunctionType,
@JsonProperty("types") Optional<List<Type>> types)
@JsonProperty("types") Optional<List<Type>> types,
@JsonProperty("clusteredBy") Optional<List<String>> clusteredBy,
Contributor

Since we already have BucketFunctionType, do we still need to differentiate bucketedBy <-> clusteredBy and bucketCount <-> clusterCount? Logically they are doing the same thing: figuring out which columns we should bucket on and into how many buckets, and applying the bucket function as specified.

Contributor

@shixuan-fan, actually it's tricky. bucketedBy in the Hive world carries too strong an assumption of hash bucketing. This assumption is also buried in other places in our codebase, so it might be good to distinguish them.

@@ -85,6 +101,9 @@ public HiveBucketProperty(
storageDescriptor.getNumBuckets(),
sortedBy,
HIVE_COMPATIBLE,
Optional.empty(),
Contributor

I think we should introduce clustering-related extraction from the storage descriptor here as well.

Contributor Author

I have talked with James before; we may do it later, once we can store this information in the partition properties using a Thrift structure.

return Optional.empty();
}

private static boolean doBucketing(List<String> bucketedBy)
Contributor

I think this function and doClustering should be inlined. They are a bit too trivial.

Contributor

+1

Contributor Author

OK, it is my personal preference for readability. I am not sure whether the Java compiler does this inlining automatically; if not, I will change it.

Contributor

@shixuan-fan shixuan-fan left a comment

[Clustering] Expose clustering information to HivePartitioningHandle

I think we need to think carefully about how to decode the distribution part so that it is type-compatible with the clustering columns, because I imagine that when doing the actual Z-order computation, the type of the column would be critical (e.g., 1 - 100 as integer would be very different from '1' - '100' as varchar).

@@ -116,7 +116,19 @@ public HiveBucketProperty(
@JsonProperty
public int getBucketCount()
{
return bucketCount;
if (bucketFunctionType == HIVE_COMPATIBLE || bucketFunctionType == PRESTO_NATIVE) {
Contributor

nit: how about a switch statement?
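A sketch of what that switch could look like inside HiveBucketProperty, using the fields from this PR; the default-case exception and message are illustrative.

// Sketch of a switch-based getBucketCount; not necessarily the PR's final code.
@JsonProperty
public int getBucketCount()
{
    switch (bucketFunctionType) {
        case HIVE_COMPATIBLE:
        case PRESTO_NATIVE:
            return bucketCount;
        case HIVE_CLUSTERING:
            return getMergedClusterCount(clusterCount.get());
        default:
            throw new IllegalStateException("Unknown bucket function type: " + bucketFunctionType);
    }
}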

return getMergedClusterCount(clusterCount.get());
}

private static int getMergedClusterCount(List<Integer> clusterCount) {
Contributor

Would you mind elaborating what we are trying to do here? I'm a bit puzzled 😂

Contributor Author

E.g., the cluster count is given as a list [8, 8, 8] for three clustering columns; however, the total cluster count for the table should then be 8 * 8 * 8.
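In other words, roughly the following; this is a minimal sketch of the merging described above, not necessarily the PR's exact code, and overflow checks are omitted.

// Multiply the per-column cluster counts to get the table's total cluster count,
// e.g. [8, 8, 8] -> 512.
private static int getMergedClusterCount(List<Integer> clusterCount)
{
    int total = 1;
    for (int count : clusterCount) {
        total *= count;
    }
    return total;
}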

private final List<String> distribution;
private final List<Type> types;

// TODO: Add logic to generate Z-Order based on these parameters.
Contributor

Do we plan to introduce more clustering functions other than Z-order, or do we want to make Z-order the only function for clustering?

Contributor Author

In the implementation, the interface should provide a function such as int getClusterId(). Z-order should be one possible implementation of the interface.
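A hypothetical shape for such an interface; the name Clustering and the Page-based signature are illustrative, not the PR's final API.

// Hypothetical interface matching the description above; Z-order would be one
// implementation, and other space-filling curves could be added later.
// (Page is assumed to come from presto-common.)
import com.facebook.presto.common.Page;

public interface Clustering
{
    // Maps the clustering-column values at the given row position to a cluster id.
    int getClusterId(Page page, int position);
}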

@@ -68,6 +75,7 @@ public ConnectorBucketNodeMap getBucketNodeMap(ConnectorTransactionHandle transa
HivePartitioningHandle handle = (HivePartitioningHandle) partitioningHandle;
NodeSelectionStrategy nodeSelectionStrategy = getNodeSelectionStrategy(session);
int bucketCount = handle.getBucketCount();

Contributor

nit: remove?

Contributor

@highker highker left a comment

Early comments; still reviewing

@@ -15,6 +15,7 @@

public enum BucketFunctionType
{
HIVE_CLUSTERING,
Contributor

Let's call it CLUSTERING or ZORDER_CLUSTERING given it is not a Hive concept



Comment on lines 44 to 46
private final Optional<List<String>> clusteredBy;
private final Optional<List<Integer>> clusterCount;
private final Optional<List<String>> distribution;
Contributor

No need for these to be Optional, I guess; an empty list can indicate that there is no clustering. Same for bucketing.

if (bucketFunctionType.equals(PRESTO_NATIVE)) {
checkArgument(types.isPresent(), "Types must be present for bucket function type " + bucketFunctionType);
checkArgument(types.get().size() == bucketedBy.size(), "The sizes of bucketedBy and types should match");
} else if (bucketFunctionType.equals(HIVE_CLUSTERING)) {
Contributor

We usually put else if on a new line.

} else if (bucketFunctionType.equals(HIVE_CLUSTERING)) {
checkArgument(clusteredBy.isPresent(), "Cluster columns must be present for bucket function type " + bucketFunctionType);
checkArgument(clusterCount.isPresent(), "Cluster numbers must be present for bucket function type " + bucketFunctionType);
checkArgument(bucketedBy.isEmpty(), "Bucket columns should not be present for bucket function type " + bucketFunctionType);
Contributor

+1

return Optional.empty();
}

private static boolean doBucketing(List<String> bucketedBy)
Contributor

+1

@@ -47,6 +47,9 @@
public static final String BUCKETED_BY_PROPERTY = "bucketed_by";
public static final String BUCKET_COUNT_PROPERTY = "bucket_count";
public static final String SORTED_BY_PROPERTY = "sorted_by";
public static final String CLUSTERED_BY_PROPERTY = "clustered_by";
public static final String CLUSTER_COUNT_PROPERTY = "cluster_count";
public static final String DISTRIBUTION_PROPERTY = "distribution";
Contributor

Let's maybe remove this from the table properties for the moment. I kinda want to make it hidden from users. Let me think about how to do that.

Contributor

As discussed offline, let's use another API to store and read distribution to hide it from end users.

Contributor Author

From the discussion, I remember that API is not ready. Let me confirm with Suketu.

@highker highker self-requested a review April 30, 2021 19:55
typeManager.getType(parseTypeSignature("array(varchar)")),
List.class,
ImmutableList.of(),
false,
Contributor

Make it hidden until we find a way to store the distribution.
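Concretely, that would just flip the hidden flag in the declaration quoted above. A sketch follows; the decode/encode lambdas shown here are hypothetical placeholders, not the PR's actual ones.

// Only the "true" hidden flag is the point here.
new PropertyMetadata<>(
        DISTRIBUTION_PROPERTY,
        "Distributions of clustering columns",
        typeManager.getType(parseTypeSignature("array(varchar)")),
        List.class,
        ImmutableList.of(),
        true, // hidden: not exposed to end users
        value -> ImmutableList.copyOf((List<?>) value), // hypothetical decode
        value -> value); // encode, as elsewhere in this PR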

Comment on lines 57 to 58
@JsonProperty("distribution") Optional<List<String>> distribution
)
Contributor

Put ) on the previous line.

Comment on lines 157 to 158


Contributor

We disallow two consecutive empty lines

@@ -1007,12 +1007,16 @@ public ConnectorTableHandle createTemporaryTable(ConnectorSession session, List<
List<String> partitionColumns = partitioning.getPartitionColumns();
BucketFunctionType bucketFunctionType = partitioningHandle.getBucketFunctionType();
switch (bucketFunctionType) {
case HIVE_CLUSTERING:
Contributor

This function is for creating temporary tables. We should actually throw if we see clustering when writing to temporary tables.
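That is, something along these lines in the switch quoted above; the error code and message are illustrative.

// Sketch: fail fast when a clustered table shape reaches temporary-table creation.
switch (bucketFunctionType) {
    case HIVE_CLUSTERING:
        throw new PrestoException(NOT_SUPPORTED, "Temporary tables do not support clustering");
    default:
        break; // other bucket function types keep their existing handling
}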

Comment on lines 287 to 289
List<String> clusteredBy,
List<Integer> clusterCount,
List<String> distribution)
Contributor

We should check:

  • the clusteredBy size is the same as the clusterCount size
  • clusterCount has all positive integers that are powers of two (2^n); each should also be between something like 0 and 2^16
  • the product of all cluster counts should be <= 2^16

Contributor Author

Right. Will check.
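A minimal sketch of those checks; the method name, bound, and messages are illustrative, not the PR's code.

// Hypothetical validation matching the review comments above.
private static void validateClustering(List<String> clusteredBy, List<Integer> clusterCount)
{
    checkArgument(clusteredBy.size() == clusterCount.size(),
            "clustered_by and cluster_count must have the same number of entries");
    long totalClusters = 1;
    for (int count : clusterCount) {
        checkArgument(count > 0 && Integer.bitCount(count) == 1 && count <= (1 << 16),
                "each cluster count must be a power of two between 1 and 2^16");
        totalClusters *= count;
        checkArgument(totalClusters <= (1 << 16),
                "the product of all cluster counts must not exceed 2^16");
    }
}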

Comment on lines 101 to 126
public void testCreateTableAsSelect()
{
String query = (
"CREATE TABLE hive_bucketed.tpch_bucketed.customer_copied \n" +
" WITH (" +
" format = 'TEXTFILE',\n" +
" clustered_by = array['custkey'], \n" +
" cluster_count = array[11], \n" +
" distribution = array['150', '300', '450', '600', '750', '900', '1050', '1200', '1350']) \n" +
" AS SELECT * FROM customer");

MaterializedResult result = computeActual(query);
assertEquals(result.getOnlyValue(), 1500L);

query = "SELECT COUNT(*) FROM hive_bucketed.tpch_bucketed.customer_copied";
result = computeActual(query);
assertEquals(result.getOnlyValue(), 1500L);

query = "SELECT custkey, COUNT(*) FROM hive_bucketed.tpch_bucketed.customer_copied GROUP BY custkey ORDER BY custkey LIMIT 1 ";
result = computeActual(query);
assertEquals(result.getRowCount(), 1L);
assertEquals(result.getMaterializedRows().get(0).getField(1), 1L);
}

@Test
public void testInsert()
Contributor

Merge these two tests together.

result = computeActual(query);
assertEquals(result.getOnlyValue(), 3000L);

query = "SELECT custkey, COUNT(*) FROM hive_bucketed.tpch_bucketed.customer_copied GROUP BY custkey ORDER BY custkey LIMIT 1 ";
Contributor

An interesting test would be creating two tables: one with clustering and one without. Then run the same SELECT query with some proper filters against both tables. We should expect the query against the clustered table to read fewer rows and less data.

Contributor Author

How do I check the query stats to get the rows and data read?

@@ -43,7 +43,7 @@
private final Optional<List<Type>> types;
private final Optional<List<String>> clusteredBy;
private final Optional<List<Integer>> clusterCount;
private final Optional<List<String>> distribution;
private final Optional<List<Object>> distribution;
Contributor

Well, as discussed offline, we will need a good data structure to represent distribution. ColumnStatistics is a good example. We likely could make it a thrift serializable class. This part needs some brainstorming. Let's chat offline.

import java.util.Map;
import java.util.Optional;

public final class IntervalExtractor
Contributor

Let's chat offline on the distribution design and how to store it. It is fairly critical I think

Comment on lines +162 to +221
"rO0ABXNyABFqYXZhLmxhbmcuSW50ZWdlchLioKT3gYc4AgABSQAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAAB",
"c3IAEWphdmEubGFuZy5JbnRlZ2VyEuKgpPeBhzgCAAFJAAV2YWx1ZXhyABBqYXZhLmxhbmcuTnVtYmVyhqyVHQuU4IsCAAB4cAAAAAI=",
"c3IAEWphdmEubGFuZy5JbnRlZ2VyEuKgpPeBhzgCAAFJAAV2YWx1ZXhyABBqYXZhLmxhbmcuTnVtYmVyhqyVHQuU4IsCAAB4cAAAAAM=",
"c3IAEGphdmEubGFuZy5Eb3VibGWAs8JKKWv7BAIAAUQABXZhbHVleHIAEGphdmEubGFuZy5OdW1iZXKGrJUdC5TgiwIAAHhwP/GZmZmZmZo=",
"c3IAEGphdmEubGFuZy5Eb3VibGWAs8JKKWv7BAIAAUQABXZhbHVleHIAEGphdmEubGFuZy5OdW1iZXKGrJUdC5TgiwIAAHhwQAGZmZmZmZo=",
"c3IAEGphdmEubGFuZy5Eb3VibGWAs8JKKWv7BAIAAUQABXZhbHVleHIAEGphdmEubGFuZy5OdW1iZXKGrJUdC5TgiwIAAHhwQApmZmZmZmY=",
"dAABYQ==",
"dAABYg==",
"dAABeA=="));
Contributor

wow; definitely we won't expose this to end users lol

@lliiun-z lliiun-z force-pushed the hive_clustering_features branch 2 times, most recently from c462bc1 to 534cf5e Compare May 7, 2021 23:46
Contributor

@viczhang861 viczhang861 left a comment

Is this feature about partitioning data into different clusters?

// TODO: Return Z-Order Address.
public int getBucket(Page page, int position)
{
return clustering.getCluster(page, position);
Contributor

This method is supposed to calculate the bucket number; is the cluster id similar to the bucket id?

What do you think of adding getPartitionFunction to the ConnectorNodePartitioningProvider interface and adding a ClusterPartitionFunction that implements PartitionFunction?

@lliiun-z lliiun-z force-pushed the hive_clustering_features branch 3 times, most recently from 0d84fd4 to 0b9aa26 Compare June 14, 2021 16:14
@lliiun-z lliiun-z force-pushed the hive_clustering_features branch 2 times, most recently from feb5e36 to cbaa6af Compare July 19, 2021 21:47
To expose these three parameters, we

1. Create a new BucketFunctionType: HIVE_CLUSTERING.
2. Update the logic/constructor in HiveBucketProperties.
3. Update the usage of HiveBucketProperties constructor to pass
   empty values for clustering parameters.

Test:
  This diff should not produce any side effects; it only updates the APIs of the class.
This diff mainly adds the z-order logic, which takes the clusterCount, clusteredBy, distribution, and types to construct the MortonCode object, which translates the dimension values into a clusterId that is returned as a bucket id.

TESTS:

Add unit tests.
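For intuition, here is a tiny, self-contained illustration of the bit interleaving behind a Z-order (Morton) code for two already-bucketed dimensions; it is only an example, not the library integrated in this PR.

// Minimal Morton-code sketch: interleave the low 16 bits of two bucket indexes.
public final class MortonCodeSketch
{
    private MortonCodeSketch() {}

    // Bit i of x goes to bit 2i of the result; bit i of y goes to bit 2i + 1.
    public static int interleave(int x, int y)
    {
        int result = 0;
        for (int i = 0; i < 16; i++) {
            result |= ((x >> i) & 1) << (2 * i);
            result |= ((y >> i) & 1) << (2 * i + 1);
        }
        return result;
    }

    public static void main(String[] args)
    {
        // Per-dimension bucket indexes (3, 5) interleave to cluster id 39.
        System.out.println(interleave(3, 5));
    }
}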
So far the StorageDescriptor is not updated to have clustering parameters. Therefore, some checks will fail when we do clustering.

To solve this problem, we read the clustering parameters from the table parameters. This information will be used in queries like DESC or SHOW CREATE TABLE.

TEST:
Ran local integration test and tested in test cluster.
…scriptor

TEST:
1. Added a unit test.
2. Tested locally and in a testing cluster.
1. Integrated with our own z-order library.
2. Fixed a few bugs.
gxin@fb.com and others added 2 commits August 3, 2021 15:15
This is used to reduce the CPU time for creating the page source.
@highker highker removed their request for review December 16, 2021 21:18
@lliiun-z lliiun-z closed this Sep 6, 2022