Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

[FEATURE REQUEST]: Support partitioning and bucketing of the index dataset #351

Open
andrei-ionescu opened this issue Feb 5, 2021 · 5 comments
Labels
enhancement New feature or request untriaged This is the default tag for a newly created issue

Comments

@andrei-ionescu
Copy link
Contributor

Feature requested

In the case of very large datasets (34 Billion records) the generated index is formed out of big files and has a performance degradation.

Given the following details...

Query

val sql = s"""
  SELECT   ts.timestamp
  FROM     ts 
  WHERE    ts.timestamp >= to_timestamp('2020-03-17')
  AND      ts.timestamp < to_timestamp('2020-03-18')
  LIMIT    1000
"""

Executed with:

spark.sql(sql).collect

Dataset

  • schema size is about 20 top fields and about 17 of these are heavily nested
  • about 34 Billion rows
  • the timestamp field is of timestamp type and is up to seconds
  • the cardinality of the timestamp values is: 17 145 000 out of 34 155 510 037
  • the format is Iceberg

Index

hs.createIndex(
  ts, 
  IndexConfig(
    "idx_ts3", 
    indexedColumns = Seq("timestamp"), 
    includedColumns = Seq("ns", "id")))

The index has:

  • 434GB total index size
  • 200 files
  • 2.3GB average file size

Explained query

=============================================================
Plan with indexes:
=============================================================
CollectLimit 1000
+- *(1) Project [timestamp#207]
   +- *(1) Filter ((isnotnull(timestamp#207) && (timestamp#207 >= 1584403200000000)) && (timestamp#207 < 1584489600000000))
      <----+- *(1) FileScan Hyperspace(Type: CI, Name: idx_ts3, LogVersion: 1) [timestamp#207] Batched: true, DataFilters: [isnotnull(timestamp#207), (timestamp#207 >= 1584403200000000), (timestamp#207 < 1584489600000000)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/u.../spark-warehouse/indexes/idx_ts3/v__=0/part-00000-tid-451174797136..., PartitionFilters: [], PushedFilters: [IsNotNull(timestamp), GreaterThanOrEqual(timestamp,2020-03-17 00:00:00.0), LessThan(timestamp,20..., ReadSchema: struct<timestamp:timestamp>---->

=============================================================
Plan without indexes:
=============================================================
CollectLimit 1000
+- *(1) Project [timestamp#207]
   +- *(1) Filter ((isnotnull(timestamp#207) && (timestamp#207 >= 1584403200000000)) && (timestamp#207 < 1584489600000000))
      <----+- *(1) ScanV2 iceberg[timestamp#207] (Filters: [isnotnull(timestamp#207), (timestamp#207 >= 1584403200000000), (timestamp#207 < 1584489600000000)], Options: [...)---->

=============================================================
Indexes used:
=============================================================
idx_ts3:/.../spark-warehouse/indexes/idx_ts3/v__=0

=============================================================
Physical operator stats:
=============================================================
+--------------------------------------------------------+-------------------+------------------+----------+
|                                       Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+--------------------------------------------------------+-------------------+------------------+----------+
|                                       *DataSourceV2Scan|                  1|                 0|        -1|
|*Scan Hyperspace(Type: CI, Name: idx_ts3, LogVersion: 1)|                  0|                 1|         1|
|                                            CollectLimit|                  1|                 1|         0|
|                                                  Filter|                  1|                 1|         0|
|                                                 Project|                  1|                 1|         0|
|                                       WholeStageCodegen|                  1|                 1|         0|
+--------------------------------------------------------+-------------------+------------------+----------+

The cluster

I did run the experiment on Databricks cluster with the following details:

  • driver: 64 cores 432GB memory
  • 6 workers: 32 cores 256GB memory
  • Spark version 2.4.5

Results

Time to get the 1000 rows:

  • with Hyperspace is 17.24s
  • without Hyperspace is 16.86s

Acceptance criteria

The time to get 1000 rows using Hyperspace should be at least as twice as fast.

Additional context

For some more context, this has been started on #329 PR.

@andrei-ionescu andrei-ionescu added enhancement New feature or request untriaged This is the default tag for a newly created issue labels Feb 5, 2021
@rapoth
Copy link
Contributor

rapoth commented Feb 5, 2021

Thank you for opening this issue @andrei-ionescu!

I'm copying some context from #329 for easier readability

From: @imback82

Thanks @andrei-ionescu for the info.

I guess one solution to this is to allow transformation of the indexed keys (bucketing to minutes/hours instead of seconds, for example).

From: @andrei-ionescu

Even if we don't modify (transform) the timestamp to minutes/hours, it is better from the current form. I'm having in average about 2K duplicates for each timestamp value in the index. Bucketing or partitioning just by the values will reduce the query time tremendously.

For example, instead of having the index dataset laid out like this:

/.../spark-warehouse/indexes/idx_ts3/v__=0/.....parquet

I would suggest the following:

/.../spark-warehouse/indexes/idx_ts3/v__=0/timestamp=2020-03-17T00:03:07/.....parquet

From: @imback82

I would suggest the following:

/.../spark-warehouse/indexes/idx_ts3/v__=0/timestamp=2020-03-17T00:03:07/.....parquet

Hive-partitioning was explored before but abandoned due to the fact that we need to create bucket files for each partition and wasn't scalable in our scenario.

But now that we have a specific use case, we can explore this again (prob. in the form of specialized index).

From: @andrei-ionescu

@imback82 I'm proposing to add .partitionBy(resolvedIndexedColumns: _*). in between write and parquet similar to this:

  .repartition(resolvedIndexedColumns.map(df(_)): _*)
  .write
  .partitionBy(resolvedIndexedColumns: _*)
  .parquet(...)

somewhere around this place: CreateActionBase.scala#L129-L139.

This can be just a flag, or even better, an index config property, as in cases of high cardinality it may throw out a lot of folders/partitions.

We can go even a step further and detect it and choose the best approach.

@rapoth
Copy link
Contributor

rapoth commented Feb 5, 2021

I have some minor questions:

  1. What is the query pattern here? Filter queries only or even join queries?

  2. Since you are referring to hive-partitioning, let us take a simple example.

    • Imagine we are doing this on a column with cardinality 100, then it'd create a hundred folders. Each folder will now have 200 buckets each. That would make it 100*200 files that need to be written.
    • When we did this experiment, Spark's performance was actually very bad for writing out such large number of files. If the # of partitions increase, the performance was worse.

    What is your opinion about adding an additional "transformed" column (similar to what @imback82) was suggesting? Both approaches have pros and cons. For instance, hive-partitioning style will also allow us to leverage dynamic partition pruning but I'm not (yet) sure how we could do similar things with the latter approach and it's implication on joins.

  3. How many predicates do you expect in the queries? And do users care about the order of these predicates? For instance, index(C1, C2) but query using C2 only or will they always provide these predicates together (in that order).

@andrei-ionescu
Copy link
Contributor Author

What is the query pattern here? Filter queries only or even join queries?

I don't think this matter too much. In any of the two cases, in the best case, the plan gets transformed into reading only the indexes instead of the datasets. Can you bring more light why this matters in your opinion?

Since you are referring to hive-partitioning, let us take a simple example.

  • Imagine we are doing this on a column with cardinality 100, then it'd create a hundred folders. Each folder will now have 200 buckets each. That would make it 100*200 files that need to be written.

  • When we did this experiment, Spark's performance was actually very bad for writing out such large number of files. If the # of partitions increase, the performance was worse.

Let me put my point of view in this area...

  1. The use case of an index is for reading purposes. An index shows its power at read time. If there is no performance improvement on read, then the write time doesn't matter at all. So, I would say that we should focus on the read performance even if there is penalty at write time. With the current way Hyperspace is built this write performance penalty can be alleviated with incremental refresh and hybrid scans.
  2. Depending on the cluster you have, writing 20K files can be very fast or very slow. Let me try putting up some cases and numbers. The best would be to have a cluster with 20K cores (each file written by 1 task, all 20K in parallel) and given my created index example that would take a few seconds (434GB / 20000 = 21MB per file - writing 21MB files should be in the order of seconds). This is the best scenario. Now, taking into consideration my cluster - 192 cores avaialable - it will need to use all 192 cores over and over again for about 105 times (20000/192=105). If we say that writing a 21MB file takes a second, it will mean that all will take 105 seconds, under 2 minutes (less probably). If we say that writing a 21MB file takes 1 minute then we can say it will take under 2 hours. It does not look that bad.
  3. I don't understand why you're saying that Spark's performance was bad. It really depends on the cluster you have. A file can be written by only one thread at once - it is ok to assume that writing data uses one core per output file. The lesser time writing 200 files (as you have it right now) of 2.5GB will be in the case of a cluster with 200 cores and that time would not be changed if you throw more resources to it as at write time it will use only 200 cores to write those 200 files of 2.5GB. On the other hand the lesser time writing 20K files (partitioning + bucketing) of 25MB will be in the case of a cluster with 20K cores and will take sensibly less than in the case of 2.5GB files.

What is your opinion about adding an additional "transformed" column (similar to what @imback82) was suggesting? Both approaches have pros and cons. For instance, hive-partitioning style will also allow us to leverage dynamic partition pruning but I'm not (yet) sure how we could do similar things with the latter approach and it's implication on joins.

I'm not against it at all. I think it solves this problem but ads in a lot of complexities. I've seen it in Iceberg in their partition spec where they offer such column transformations for partitioning. Adding this new concept will add more complexities to Hyperspace: migration, management, how can it be altered, which column transformation will be used/available, how to specify those when creating the index, etc. I think adding partitioning as an option besides the current bucketing adds less complexities than bringing in the "transformations".

How many predicates do you expect in the queries? And do users care about the order of these predicates? For instance, index(C1, C2) but query using C2 only or will they always provide these predicates together (in that order).

It really depends on how the index works. I'm also a begginger with Hyperspace but I can see its power in the Data Lake world.

The use case I'm currently testing Hyperspace is improving the read performance when joining two big Iceberg datasets (billions of rows). The join clause is on two non-timebased columns, while the where clause is on the timestamp. Because I have limited experience with Hyperspace I may not have been using it the best way.

Given this 2 datasets join query

val sql2 = s""" 
SELECT   ts.*, dsmap.other_id as new_id
FROM     ts 
LEFT OUTER JOIN dsmap ON (ts.ns=dsmap.ns AND ts.id=dsmap.id)
WHERE    ts.timestamp >= to_timestamp('2020-03-17')
AND      ts.timestamp < to_timestamp('2020-03-18')
LIMIT 1000 
"""

with ts schema

root
 |-- timestamp: timestamp (nullable = true)
 |-- ns: string (nullable = true)
 |-- id: string (nullable = true)
 |-- ...
 |-- ...

and dismat schema

root
 |-- id: string (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- ns: string (nullable = false)
 |-- other_id: string (nullable = false)

What indexes should I create?

Now, regarding your questions "How many predicates do you expect in the queries? And do users care about the order of these predicates?", I would say I've been working with 2,3 predicates and I usually try to preserve the order. I would also suggest to clearly specify if the order matters or not.

@sezruby
Copy link
Collaborator

sezruby commented Feb 10, 2021

@andrei-ionescu could you share the plan of the query?

With the current version of Hyperspace, you could create these indexes for your join query:

  • ts => indexed column (ns, id) included columns ( all other columns (required for the result) )
  • dismat => indexed column (ns, id), included columns (other_id)

These 2 indexes will remove 2 shuffles for join.

You can use the filter index as you tested, but I guess the performance is similar because Iceberg also handles push-down conditions & partitioning in somehow.

Note that only equalTo query is the candidate of bucket pruning, bucketed scan config won't be effective for your query.

@andrei-ionescu
Copy link
Contributor Author

@sezruby Thanks for confirming my current understanding of how Hyperspace works.

  • ts => indexed column (ns, id) included columns ( all other columns (required for the result) )

This is something that I want to avoid - duplicating the dataset once more (all columns means the whole dataset but bucketed and sorted differently).

But I guess the need of include all columns in the includedColumns is needed because:

  1. In my query I used the ts.* (aka select all)
  2. Hyperspace does NOT support clustered indexes (this is related to Clustered index #354)

You can use the filter index as you tested, but I guess the performance is similar because Iceberg also handles push-down conditions & partitioning in somehow.

Iceberg uses partition pruning, file skipping based on the metadata it has stored and pushed down predicates. This is the reason why it is faster than Hyperspace in some cases.

For timestamp types there is a complexity added to it - the fact that the precision of the timestamp stored in the dataset is at second level while we query for days. I guess that it needs to go through the whole dataset to transform the column even though the index dataset is bucketed.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
enhancement New feature or request untriaged This is the default tag for a newly created issue
Projects
None yet
Development

No branches or pull requests

3 participants