This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Add config to use bucketed scan for filter indexes #329

Merged: 9 commits, Feb 6, 2021
@@ -47,6 +47,11 @@ object IndexConstants {
"spark.hyperspace.index.hybridscan.maxAppendedRatio"
val INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD_DEFAULT = "0.3"

// Config used to set bucketSpec for the filter rule. If bucketSpec is used, Spark can prune
// inapplicable buckets, so it can read fewer files for a highly selective query.
val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"

// Identifier injected to HadoopFsRelation as an option if an index is applied.
// Currently, the identifier is added to options field of HadoopFsRelation.
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
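
For reference, a minimal usage sketch (not part of this diff; it assumes a live SparkSession and Hyperspace on the classpath) showing how a user would opt in to the new bucketed-scan behavior:

import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace.index.IndexConstants

val spark = SparkSession.builder.appName("bucketSpecDemo").getOrCreate()

// Opt in to bucket pruning for FilterIndexRule (the default is "false").
spark.conf.set(IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC, "true")
// Equivalent, using the raw key string:
spark.conf.set("spark.hyperspace.index.filterRule.useBucketSpec", "true")
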
@@ -28,7 +28,7 @@ import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.ResolverUtils
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}

/**
* FilterIndex rule looks for opportunities in a logical plan to replace
@@ -56,13 +56,16 @@ object FilterIndexRule
findCoveringIndexes(filter, outputColumns, filterColumns)
FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
case Some(index) =>
// Do not set BucketSpec to avoid limiting Spark's degree of parallelism.
// As FilterIndexRule does not rely on bucketing after the index scan, we set
// useBucketUnionForAppended to false. If it were true, Hybrid Scan could introduce an
// unnecessary shuffle of the appended data just to apply BucketUnion when merging.
val transformedPlan =
RuleUtils.transformPlanToUseIndex(
spark,
index,
originalPlan,
useBucketSpec = false)
useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
useBucketUnionForAppended = false)
Contributor:

Could you add a comment explaining why this will always be false for the filter index rule? We will never take advantage of bucketing from the union, right?

Collaborator (Author):

It might be beneficial for later operations that require bucketing, but for the filter index alone we don't need it. I think it's better to write a new rule for those cases if needed.

Contributor:

Shall we add the comment to the code (since it's not straightforward to understand)?

logEvent(
HyperspaceIndexUsageEvent(
AppInfo(
@@ -63,10 +63,18 @@ object JoinIndexRule
val updatedPlan =
join
.copy(
left =
RuleUtils.transformPlanToUseIndex(spark, lIndex, l, useBucketSpec = true),
right =
RuleUtils.transformPlanToUseIndex(spark, rIndex, r, useBucketSpec = true))
left = RuleUtils.transformPlanToUseIndex(
spark,
lIndex,
l,
useBucketSpec = true,
useBucketUnionForAppended = true),
right = RuleUtils.transformPlanToUseIndex(
spark,
rIndex,
r,
useBucketSpec = true,
useBucketUnionForAppended = true))

logEvent(
HyperspaceIndexUsageEvent(
@@ -325,11 +333,7 @@ object JoinIndexRule
compatibleIndexPairs.map(
indexPairs =>
JoinIndexRanker
.rank(
spark,
leftRel,
rightRel,
indexPairs)
.rank(spark, leftRel, rightRel, indexPairs)
.head)
}

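
For contrast with the filter rule, a brief explanatory sketch (comments only; a reading of the diff, not code from the PR) of why the join rule passes both flags as true:

// JoinIndexRule reads both sides with the index's bucketSpec so that their
// output partitioning matches on the join keys, which is what lets the
// sort-merge join avoid a shuffle. Appended rows must therefore be shuffled
// into the same bucketing and merged with BucketUnion; a plain Union would
// break the partitioning guarantee the rule depends on.
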
@@ -202,13 +202,15 @@ object RuleUtils {
* @param index Index used in transformation of plan.
* @param plan Current logical plan.
* @param useBucketSpec Option whether to use BucketSpec for reading index data.
* @param useBucketUnionForAppended Option whether to use BucketUnion to merge appended data.
* @return Transformed plan.
*/
def transformPlanToUseIndex(
spark: SparkSession,
index: IndexLogEntry,
plan: LogicalPlan,
useBucketSpec: Boolean): LogicalPlan = {
useBucketSpec: Boolean,
useBucketUnionForAppended: Boolean): LogicalPlan = {
// Check pre-requisite.
val logicalRelation = getLogicalRelation(plan)
assert(logicalRelation.isDefined)
@@ -225,7 +227,7 @@
lazy val isSourceUpdated = index.hasSourceUpdate

val transformed = if (hybridScanRequired || isSourceUpdated) {
transformPlanToUseHybridScan(spark, index, plan, useBucketSpec)
transformPlanToUseHybridScan(spark, index, plan, useBucketSpec, useBucketUnionForAppended)
} else {
transformPlanToUseIndexOnlyScan(spark, index, plan, useBucketSpec)
}
@@ -302,13 +304,15 @@
* @param index Index used in transformation of plan.
* @param plan Current logical plan.
* @param useBucketSpec Option whether to use BucketSpec for reading index data.
* @param useBucketUnionForAppended Option whether to use BucketUnion to merge appended data.
* @return Transformed logical plan that leverages an index and merges appended data.
*/
private def transformPlanToUseHybridScan(
spark: SparkSession,
index: IndexLogEntry,
plan: LogicalPlan,
useBucketSpec: Boolean): LogicalPlan = {
useBucketSpec: Boolean,
useBucketUnionForAppended: Boolean): LogicalPlan = {
var unhandledAppendedFiles: Seq[Path] = Nil

// Get transformed plan with index data and appended files if applicable.
@@ -426,7 +430,7 @@

val planForAppended =
transformPlanToReadAppendedFiles(spark, index, plan, unhandledAppendedFiles)
if (useBucketSpec) {
if (useBucketUnionForAppended && useBucketSpec) {
// If the bucketing information of the index is used to read the index data, we need to
// shuffle the appended data in the same way to merge it correctly with the bucketed index data.

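
Putting the two flags together, a sketch of the merge strategy for appended data that this condition yields (derived from the branch above and the tests below; not code from the PR):

// useBucketSpec | useBucketUnionForAppended | appended data merged via
// --------------+---------------------------+----------------------------------
// true          | true                      | BucketUnion (appended rows are
//               |                           | shuffled into the index buckets)
// true          | false                     | plain Union (no shuffle; only the
//               |                           | index scan stays bucketed)
// false         | true or false             | plain Union
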
@@ -60,6 +60,14 @@ object HyperspaceConf {
.toDouble
}

def useBucketSpecForFilterRule(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC,
IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT)
.toBoolean
}

def numBucketsForIndex(spark: SparkSession): Int = {
getConfStringWithMultipleKeys(
spark,
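
A minimal sketch of how the new helper resolves the flag (assumptions: a live SparkSession named spark, and HyperspaceConf visible from the call site; not part of this diff):

import com.microsoft.hyperspace.index.IndexConstants
import com.microsoft.hyperspace.util.HyperspaceConf

// With the key unset, the default string "false" is parsed.
assert(!HyperspaceConf.useBucketSpecForFilterRule(spark))

spark.conf.set(IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC, "true")
assert(HyperspaceConf.useBucketSpecForFilterRule(spark))

// Caveat: String.toBoolean only accepts "true"/"false" (case-insensitive);
// any other value, e.g. "1", throws IllegalArgumentException.
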
@@ -81,7 +81,7 @@ class HybridScanForDeltaLakeTest extends HybridScanSuite {

test(
"Append-only: filter rule & parquet format, " +
"index relation should include appended file paths") {
"index relation should include appended file paths.") {
// This flag is for testing plan transformation when appended files can be loaded with the
// index data scan node. Currently, it's applied only in a very specific case: FilterIndexRule,
// Parquet source format, no partitioning, no deleted files.
@@ -36,7 +36,7 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {

test(
"Append-only: filter rule & parquet format, " +
"index relation should include appended file paths") {
"index relation should include appended file paths.") {
// This flag is for testing plan transformation when appended files can be loaded with the
// index data scan node. Currently, it's applied only in a very specific case: FilterIndexRule,
// Parquet source format, no partitioning, no deleted files.
@@ -83,7 +83,7 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {
}
}

test("Delete-only: Hybrid Scan for delete support doesn't work without lineage column") {
test("Delete-only: Hybrid Scan for delete support doesn't work without lineage column.") {
val indexConfig = IndexConfig("index_ParquetDelete2", Seq("clicks"), Seq("query"))
Seq(("indexWithoutLineage", "false", false), ("indexWithLineage", "true", true)) foreach {
case (indexName, lineageColumnConfig, transformationExpected) =>
@@ -119,7 +119,7 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {
}
}

test("Delete-only: filter rule, number of delete files threshold") {
test("Delete-only: filter rule, number of delete files threshold.") {
withTempPathAsString { testPath =>
val indexName = "IndexDeleteCntTest"
withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
@@ -140,8 +140,6 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {

val afterDeleteSize = FileUtils.getDirectorySize(new Path(testPath))
val deletedRatio = 1 - (afterDeleteSize / sourceSize.toFloat)
// scalastyle:off
println(deletedRatio)

withSQLConf(TestConfig.HybridScanEnabled: _*) {
withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD ->
@@ -159,5 +157,4 @@
}
}
}

}
@@ -370,7 +370,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {

test(
"Append-only: join rule, appended data should be shuffled with indexed columns " +
"and merged by BucketUnion") {
"and merged by BucketUnion.") {
withTempPathAsString { testPath =>
val appendPath1 = testPath + "/append1"
val appendPath2 = testPath + "/append2"
@@ -400,11 +400,12 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
val basePlan = baseQuery.queryExecution.optimizedPlan

withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "-1") {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "false") {
val join = joinQuery()
checkAnswer(join, baseQuery)
assert(basePlan.equals(join.queryExecution.optimizedPlan))
val join = joinQuery()
checkAnswer(join, baseQuery)
assert(basePlan.equals(join.queryExecution.optimizedPlan))
}

withSQLConf(TestConfig.HybridScanEnabled: _*) {
Expand All @@ -428,7 +429,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {

test(
"Append-only: filter rule and non-parquet format," +
"appended data should be shuffled and merged by Union") {
"appended data should be shuffled and merged by Union.") {
// Note: for Delta Lake, this test is also applicable as the dataset is partitioned.
withTempPathAsString { testPath =>
val (appendedFiles, deletedFiles) = setupIndexAndChangeData(
@@ -460,12 +461,76 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
appendedFiles,
deletedFiles,
Seq(" <= 2000"))

// Check bucketSpec is not used.
val bucketSpec = planWithHybridScan collect {
case LogicalRelation(HadoopFsRelation(_, _, _, bucketSpec, _, _), _, _, _) =>
bucketSpec
}
assert(bucketSpec.length == 2)

// bucketSpec.head is for the index plan; bucketSpec.last is for the plan
// reading the appended files.
assert(bucketSpec.head.isEmpty && bucketSpec.last.isEmpty)

checkAnswer(baseQuery, filter)
}
}
}

test(
"Append-only: filter rule and non-parquet format," +
"appended data should be shuffled and merged by Union even with bucketSpec.") {
withTempPathAsString { testPath =>
val (appendedFiles, deletedFiles) = setupIndexAndChangeData(
fileFormat2,
testPath,
indexConfig1.copy(indexName = "index_Format2Append"),
appendCnt = 1,
deleteCnt = 0)

val df = spark.read.format(fileFormat2).load(testPath)
def filterQuery: DataFrame = df.filter(df("clicks") <= 2000).select(df("query"))

val baseQuery = filterQuery
val basePlan = baseQuery.queryExecution.optimizedPlan

withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "false") {
val filter = filterQuery
assert(basePlan.equals(filter.queryExecution.optimizedPlan))
}

withSQLConf(
TestConfig.HybridScanEnabledAppendOnly :+
IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC -> "true": _*) {
val filter = filterQuery
val planWithHybridScan = filter.queryExecution.optimizedPlan
assert(!basePlan.equals(planWithHybridScan))

checkFilterIndexHybridScanUnion(
planWithHybridScan,
"index_Format2Append",
appendedFiles,
deletedFiles,
Seq(" <= 2000"))

// Check bucketSpec is used.
val bucketSpec = planWithHybridScan collect {
case LogicalRelation(HadoopFsRelation(_, _, _, bucketSpec, _, _), _, _, _) =>
bucketSpec
}
assert(bucketSpec.length == 2)
// bucketSpec.head is for the index plan; bucketSpec.last is for the plan
// reading the appended files.
assert(bucketSpec.head.isDefined && bucketSpec.last.isEmpty)
assert(bucketSpec.head.get.bucketColumnNames.toSet === indexConfig1.indexedColumns.toSet)

checkAnswer(baseQuery, filter)
}
}
}

test("Delete-only: index relation should have additional filter for deleted files") {
test("Delete-only: index relation should have additional filter for deleted files.") {
val testSet = Seq(("index_ParquetDelete", fileFormat), ("index_JsonDelete", fileFormat2))
testSet foreach {
case (indexName, dataFormat) =>
@@ -640,8 +705,9 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
assert(basePlan.equals(join.queryExecution.optimizedPlan))
}

withSQLConf(TestConfig.HybridScanEnabled :+
"spark.sql.optimizer.inSetConversionThreshold" -> "1": _*) {
withSQLConf(
TestConfig.HybridScanEnabled :+
"spark.sql.optimizer.inSetConversionThreshold" -> "1": _*) {
// Changed inSetConversionThreshold to check the InSet optimization.
val join = joinQuery()
val planWithHybridScan = join.queryExecution.optimizedPlan