
Add config to use bucketed scan for filter indexes #329

Merged: 9 commits merged into microsoft:master on Feb 6, 2021

Conversation

@sezruby (Collaborator) commented on Jan 20, 2021

What is the context for this pull request?

  • Tracking Issue: n/a
  • Parent Issue: n/a
  • Dependencies: n/a

What changes were proposed in this pull request?

Add a config for the filter index rule to apply bucketing information when reading the index data.

  // Config used to set bucketSpec for Filter Index. If bucketSpec is used, Spark can prune
  // non-applicable buckets, so we can read fewer files for a highly selective query.
  val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
  val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"
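For reference, enabling it is just a standard Spark session config; a minimal sketch of how a user might turn it on (only the config key comes from this PR, the rest is ordinary Spark API usage):

  // Turn on bucketed scans for filter indexes in the current session (default is "false").
  spark.conf.set("spark.hyperspace.index.filterRule.useBucketSpec", "true")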

Does this PR introduce any user-facing change?

Yes, the plan changes slightly for filter indexes when the config is enabled.

How was this patch tested?

Unit test

@sezruby changed the title from "Add config to use bucketing for filter indexes" to "Add config to use bucketed scan for filter indexes" on Jan 20, 2021
@imback82 (Contributor) left a comment

I will check back when the test is added.

@sezruby self-assigned this on Jan 20, 2021
@rapoth added the enhancement (New feature or request) label on Jan 27, 2021
@imback82 (Contributor) left a comment

What's the plan for this PR vs. #332? This PR wants to enable bucketing so that bucket pruning can kick in, but it may regress if bucket pruning does not kick in. Is it possible to check whether bucket pruning would kick in or not? If that is not feasible, maybe introduce a strategy that looks at the physical plan?

@sezruby (Collaborator, Author) commented on Feb 2, 2021

The config is disabled by default. There are two main points:

  • Bucket union handling
  • Making it a switchable option, so that a user who wants to force it can use this config
    • Besides bucket pruning, bucketing might be beneficial if there is a sort merge join that is not covered by indexes.

For #332, I'd like to exclude only the "pruned" index data file paths, using the bucket pruning logic. After that, setting the bucket spec could be a separate option depending on the workload.

We might be able to detect such an SMJ and decide whether to set the bucket spec, since the filter index rule is applied last.

@imback82 (Contributor) commented on Feb 2, 2021

  • Bucket union handling

This seems like a byproduct of introducing the new config, so that we can always disable bucket union for the filter rule (I see useBucketUnionForAppended = false for the filter rule)? What happens if we enable bucket union for the filter rule when the bucket spec is enabled?

  • Besides bucket pruning, bucketing might be beneficial if there is a sort merge join that is not covered by indexes.

Hmm, if this is the case (it benefits SMJ), shouldn't the join rule have already kicked in?

For #332, I'd like to exclude only the "pruned" index data file paths, using the bucket pruning logic. After that, setting the bucket spec could be a separate option depending on the workload.

Yea, excluding index data files makes sense. But are we going to introduce an "additional" option for setting the bucket spec? It's usually better to let Spark decide how to distribute the tasks (instead of limiting it to the number of buckets). @apoorvedave1 has done some benchmarks on this.

@imback82 (Contributor) commented on Feb 2, 2021

Btw, I am +1 for having this config; I just have a few questions.

@sezruby (Collaborator, Author) commented on Feb 2, 2021

This seems like a byproduct of introducing the new config, so that we can always disable bucket union for the filter rule (I see useBucketUnionForAppended = false for the filter rule)? What happens if we enable bucket union for the filter rule when the bucket spec is enabled?

Oh, I forgot to disable bucket union for the filter rule 🤔.
Yeah, if we could extract the join and apply the index properly with the join rule (+ v2), that would be best.

But are we going to introduce an "additional" option for setting the bucket spec? It's usually better to let Spark decide how to distribute the tasks (instead of limiting it to the number of buckets).

If we don't set the bucket spec, Spark will create splits from the input files by bin packing:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L594

One benefit of using bucketSpec is that we can enforce that each file is handled separately by an executor.
This might be good or bad depending on the environment/workload, but it would help with caching.
(Though I didn't measure the performance of filtering the file paths without a bucket spec.)
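For reference, the bin packing mentioned above is governed by ordinary Spark settings; a small illustrative sketch (the values shown are Spark's documented defaults):

  // Without a bucket spec, Spark bin-packs the index files into splits bounded by these settings;
  // with a bucket spec, the scan instead produces one task per bucket.
  spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728") // 128 MB max split size (default)
  spark.conf.set("spark.sql.files.openCostInBytes", "4194304")     // 4 MB per-file open cost (default)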

We could use this config internally, and then decide whether to expose it later? WDYT?

@imback82 (Contributor) commented on Feb 2, 2021

If we don't set the bucket spec, Spark will create splits from the input files by bin packing:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L594

Yes, but from my experience, this works out better by utilizing the full cluster than limiting the number of tasks to the number of buckets.

We could use this config internally, and then decide whether to expose it later? WDYT?

Yes, I think we can have this config and document the behavior clearly.

@imback82 added this to the February 2021 (v0.5.0) milestone on Feb 2, 2021
@andrei-ionescu (Contributor) commented
I do like this new feature, even if it's added as an undocumented flag. I did some tests over big datasets (billions of records), and having the index on a high-cardinality field without bucketing and bucket pruning does not add any performance improvement. Bucketing (and maybe partitioning by the indexed column) will add the benefit of pruning.

I'm looking forward to such a feature.

@rapoth (Contributor) commented on Feb 5, 2021

I do like this new feature, even if it's added as an undocumented flag. I did some tests over big datasets (billions of records), and having the index on a high-cardinality field without bucketing and bucket pruning does not add any performance improvement. Bucketing (and maybe partitioning by the indexed column) will add the benefit of pruning.

I'm looking forward to such a feature.

@andrei-ionescu This is very interesting. Do you have any more information (e.g., workload, numbers) you could share? Also, was it a single query or were you running a concurrent workload?

I do agree with supporting partitioning, though (I'm assuming you are referring to low-cardinality columns). We haven't been able to get to working on partitioning yet (but it's definitely something I've heard from multiple customers). It also has several other benefits, like being able to support advanced scenarios such as index retention (e.g., if partitioned by timestamp, we could do clean-up very easily; in the current implementation, we'd have to rewrite the index to some large degree), and I guess we can also leverage dynamic partition pruning. :)

@andrei-ionescu (Contributor) commented on Feb 5, 2021

@rapoth Here are some details:

Query

val sql = s"""
  SELECT   ts.timestamp
  FROM     ts 
  WHERE    ts.timestamp >= to_timestamp('2020-03-17')
  AND      ts.timestamp < to_timestamp('2020-03-18')
  LIMIT    1000
"""

Executed with:

spark.sql(sql).collect

Dataset

  • the schema has about 20 top-level fields, and about 17 of these are heavily nested
  • about 34 billion rows
  • the timestamp field is of timestamp type, with second granularity
  • the cardinality of the timestamp values is 17,145,000 distinct values out of 34,155,510,037 rows
  • the format is Iceberg

Index

hs.createIndex(
  ts, 
  IndexConfig(
    "idx_ts3", 
    indexedColumns = Seq("timestamp"), 
    includedColumns = Seq("ns", "id")))

The index has:

  • 434GB total index size
  • 200 files
  • 2.3GB average file size

Explained query

=============================================================
Plan with indexes:
=============================================================
CollectLimit 1000
+- *(1) Project [timestamp#207]
   +- *(1) Filter ((isnotnull(timestamp#207) && (timestamp#207 >= 1584403200000000)) && (timestamp#207 < 1584489600000000))
      <----+- *(1) FileScan Hyperspace(Type: CI, Name: idx_ts3, LogVersion: 1) [timestamp#207] Batched: true, DataFilters: [isnotnull(timestamp#207), (timestamp#207 >= 1584403200000000), (timestamp#207 < 1584489600000000)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/u.../spark-warehouse/indexes/idx_ts3/v__=0/part-00000-tid-451174797136..., PartitionFilters: [], PushedFilters: [IsNotNull(timestamp), GreaterThanOrEqual(timestamp,2020-03-17 00:00:00.0), LessThan(timestamp,20..., ReadSchema: struct<timestamp:timestamp>---->

=============================================================
Plan without indexes:
=============================================================
CollectLimit 1000
+- *(1) Project [timestamp#207]
   +- *(1) Filter ((isnotnull(timestamp#207) && (timestamp#207 >= 1584403200000000)) && (timestamp#207 < 1584489600000000))
      <----+- *(1) ScanV2 iceberg[timestamp#207] (Filters: [isnotnull(timestamp#207), (timestamp#207 >= 1584403200000000), (timestamp#207 < 1584489600000000)], Options: [...)---->

=============================================================
Indexes used:
=============================================================
idx_ts3:/.../spark-warehouse/indexes/idx_ts3/v__=0

=============================================================
Physical operator stats:
=============================================================
+--------------------------------------------------------+-------------------+------------------+----------+
|                                       Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+--------------------------------------------------------+-------------------+------------------+----------+
|                                       *DataSourceV2Scan|                  1|                 0|        -1|
|*Scan Hyperspace(Type: CI, Name: idx_ts3, LogVersion: 1)|                  0|                 1|         1|
|                                            CollectLimit|                  1|                 1|         0|
|                                                  Filter|                  1|                 1|         0|
|                                                 Project|                  1|                 1|         0|
|                                       WholeStageCodegen|                  1|                 1|         0|
+--------------------------------------------------------+-------------------+------------------+----------+

The cluster

I ran the experiment on a Databricks cluster with the following details:

  • driver: 64 cores 432GB memory
  • 6 workers: 32 cores 256GB memory
  • Spark version 2.4.5

Results

Time to get the 1000 rows:

  • with Hyperspace is 17.24s
  • without Hyperspace is 16.86s

Hope all this helps.

@imback82 (Contributor) commented on Feb 5, 2021

Thanks @andrei-ionescu for the info.

I guess one solution to this is to allow transformation of the indexed keys (bucketing to minutes/hours instead of seconds, for example).
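A rough sketch of that idea (hypothetical, not part of this PR): derive a coarser-grained column and index on it instead of the raw timestamp. The column and index names below are made up for illustration; ts, hs, and IndexConfig come from the snippets above.

  import org.apache.spark.sql.functions.{col, date_trunc}

  // Hypothetical: truncate the timestamp to the hour to cut cardinality, then index that column.
  val tsHourly = ts.withColumn("timestamp_hour", date_trunc("hour", col("timestamp")))
  hs.createIndex(
    tsHourly,
    IndexConfig(
      "idx_ts_hourly",
      indexedColumns = Seq("timestamp_hour"),
      includedColumns = Seq("ns", "id")))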

@andrei-ionescu (Contributor) commented on Feb 5, 2021

Even if we don't modify (transform) the timestamp to minutes/hours, it is better than the current form. I have on average about 2K duplicates for each timestamp value in the index. Bucketing or partitioning just by the values would reduce the query time tremendously.

For example, instead of having the index dataset laid out like this:

/.../spark-warehouse/indexes/idx_ts3/v__=0/.....parquet

I would suggest the following:

/.../spark-warehouse/indexes/idx_ts3/v__=0/timestamp=2020-03-17T00:03:07/.....parquet

@rapoth (Contributor) commented on Feb 5, 2021

@andrei-ionescu Would you mind opening a feature request with your original benchmark? You are bringing up a very interesting topic of indexing for timeseries data (I did not know you were looking at timestamp predicates :)). Alternatively, I can copy some of this discussion and create one. Please let me know which you prefer. I have some follow-up questions, but I will ask them in the new issue.

We are also heavily interested in building specialized indexes for timeseries data, so it is definitely awesome that you have the same scenarios!

@imback82 (Contributor) commented on Feb 5, 2021

I would suggest the following:

/.../spark-warehouse/indexes/idx_ts3/v__=0/timestamp=2020-03-17T00:03:07/.....parquet

Hive partitioning was explored before but abandoned because we would need to create bucket files for each partition, and it wasn't scalable in our scenario.

But now that we have a specific use case, we can explore this again (probably in the form of a specialized index).

@andrei-ionescu (Contributor) commented on Feb 5, 2021

@imback82 I'm proposing to add .partitionBy(resolvedIndexedColumns: _*) between write and parquet, similar to this:

  .repartition(resolvedIndexedColumns.map(df(_)): _*)
  .write
  .partitionBy(resolvedIndexedColumns: _*)
  .parquet(...)

somewhere around this place: CreateActionBase.scala#L129-L139.

This could be just a flag or, even better, an index config property, since in high-cardinality cases it may produce a lot of folders/partitions.

We could go even a step further, detect the cardinality, and choose the best approach, as in the sketch below.
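A rough sketch of what that detection could look like (purely hypothetical; the threshold is arbitrary, and df is the index source DataFrame from the snippet above):

  import org.apache.spark.sql.functions.approx_count_distinct

  // Hypothetical heuristic: only fall back to hive-style partitioning when the indexed
  // column's cardinality is low enough to avoid an explosion of folders/partitions.
  val distinctKeys = df.select(approx_count_distinct(df("timestamp"))).head.getLong(0)
  val useHivePartitioning = distinctKeys <= 10000L // arbitrary threshold, for illustration only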

@andrei-ionescu (Contributor) commented
@rapoth I'll open up a feature request as you suggest, but currently I'm focused on the following things: DataSourceV2 support (#321), Iceberg support (#320), and nested fields support (#347).

@rapoth (Contributor) commented on Feb 5, 2021

@andrei-ionescu I apologize. I only meant to open up a new thread so we can continue the conversation there (I did not mean to say we'd work on it immediately).

@andrei-ionescu (Contributor) commented
@rapoth One more thing: I don't think it really matters whether it is a time-series dataset. For any dataset that has lots of data, where the resulting index is also massive, adding this option will bring a lot of benefits.

@rapoth (Contributor) commented on Feb 5, 2021

@andrei-ionescu Fair point. I recently talked with a lot of customers who had timeseries data specifically so seeing your comment on timestamps made me forget every other use case 🦖

@andrei-ionescu (Contributor) commented on Feb 5, 2021

@rapoth Here is the feature request: #351.

I'll try submitting a proposal for it next week.

@imback82 (Contributor) commented on Feb 5, 2021

@imback82 I'm proposing to add .partitionBy(resolvedIndexedColumns: _*) between write and parquet, similar to this:

Yes, that's the hive partitioning I was referring to.

@apoorvedave1 has done some prototyping on this, and the write wasn't really scaling. So you may want to try it first on your dataset as well.

@imback82 (Contributor) left a comment

LGTM (few nits/questions), thanks @sezruby!

@@ -62,7 +62,8 @@ object FilterIndexRule
       spark,
       index,
       originalPlan,
-      useBucketSpec = false)
+      useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
+      useBucketUnionForAppended = false)
Contributor:

Could you add a comment why this will always be false for the filter index rule? We will never take advantage of bucketing from the union right?

Collaborator (Author):

It might be beneficial for later operations which require bucketing, but just for the filter index, we don't need it.
I think it's better to write a new rule for those cases if needed.

Contributor:

Shall we add the comment to the code (since it's not straightforward to understand)?
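One possible wording for such a comment, sketched from this discussion (illustrative only, not necessarily the text that was committed):

  // Bucket union is not used for the filter index rule: the filter rule itself never takes
  // advantage of the union's bucketing; a dedicated rule can be added later if some
  // downstream operator needs it.
  useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
  useBucketUnionForAppended = false)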

test(
  "Append-only: filter rule and non-parquet format," +
    "appended data should be shuffled and merged by Union even with bucketSpec.") {
  // Note: for delta lake, this test is also eligible as the dataset is partitioned.
Contributor:

How should we handle this "Note"? This sounds like a TODO?

Collaborator (Author):

It's a note because this test is already included in the Delta Lake hybrid scan tests.

@imback82 (Contributor) commented on Feb 6, 2021

Shall we just remove the comment then? (not sure about the importance unless I am missing something)

@imback82 merged commit 88f1b43 into microsoft:master on Feb 6, 2021