This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Add config to use bucketed scan for filter indexes (#329)
sezruby authored Feb 6, 2021
1 parent 9ddf44b commit 88f1b43
Showing 8 changed files with 119 additions and 32 deletions.
@@ -47,6 +47,11 @@ object IndexConstants {
"spark.hyperspace.index.hybridscan.maxAppendedRatio"
val INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD_DEFAULT = "0.3"

+  // Config used to set bucketSpec for the Filter rule. If bucketSpec is used, Spark can prune
+  // inapplicable buckets, so it can read fewer files for a highly selective query.
+  val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
+  val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"

// Identifier injected to HadoopFsRelation as an option if an index is applied.
// Currently, the identifier is added to the options field of HadoopFsRelation.
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
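The new flag is read from the session conf when the Filter rule runs, so it can be toggled per session without rebuilding any index. A minimal sketch of enabling it, assuming a standard Spark session; only the conf key and its "false" default come from this commit:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Off by default; turn on bucketed scan for filter indexes.
spark.conf.set("spark.hyperspace.index.filterRule.useBucketSpec", "true")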
@@ -28,7 +28,7 @@ import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
- import com.microsoft.hyperspace.util.ResolverUtils
+ import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}

/**
* FilterIndex rule looks for opportunities in a logical plan to replace
@@ -56,13 +56,16 @@ object FilterIndexRule
findCoveringIndexes(filter, outputColumns, filterColumns)
FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
case Some(index) =>
-         // Do not set BucketSpec to avoid limiting Spark's degree of parallelism.
+         // As FilterIndexRule is not intended to support bucketed scan, we set
+         // useBucketUnionForAppended to false. If it were true, Hybrid Scan could cause an
+         // unnecessary shuffle of the appended data when applying BucketUnion to merge it.
val transformedPlan =
RuleUtils.transformPlanToUseIndex(
spark,
index,
originalPlan,
-             useBucketSpec = false)
+             useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
+             useBucketUnionForAppended = false)
logEvent(
HyperspaceIndexUsageEvent(
AppInfo(
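Why a bucketed scan helps a selective filter: each value of a bucketed column hashes to exactly one bucket, so an equality predicate only has to read that bucket's files. A toy model of the pruning idea, for illustration only (Spark actually hashes bucket columns with Murmur3, not hashCode):

// Toy model of bucket pruning; not Spark's implementation.
def bucketFor(key: String, numBuckets: Int): Int =
  Math.floorMod(key.hashCode, numBuckets)

val numBuckets = 200
// For `WHERE query = "q42"`, only one of the 200 bucket files must be scanned.
println(s"read bucket ${bucketFor("q42", numBuckets)} of $numBuckets")

The trade-off, noted in the comment removed above, is that a bucketed scan caps the scan's parallelism at the number of buckets, which is why the flag stays off by default.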
@@ -63,10 +63,18 @@ object JoinIndexRule
val updatedPlan =
join
.copy(
-           left =
-             RuleUtils.transformPlanToUseIndex(spark, lIndex, l, useBucketSpec = true),
-           right =
-             RuleUtils.transformPlanToUseIndex(spark, rIndex, r, useBucketSpec = true))
+           left = RuleUtils.transformPlanToUseIndex(
+             spark,
+             lIndex,
+             l,
+             useBucketSpec = true,
+             useBucketUnionForAppended = true),
+           right = RuleUtils.transformPlanToUseIndex(
+             spark,
+             rIndex,
+             r,
+             useBucketSpec = true,
+             useBucketUnionForAppended = true))

logEvent(
HyperspaceIndexUsageEvent(
@@ -325,11 +333,7 @@ object JoinIndexRule
compatibleIndexPairs.map(
indexPairs =>
JoinIndexRanker
-           .rank(
-             spark,
-             leftRel,
-             rightRel,
-             indexPairs)
+           .rank(spark, leftRel, rightRel, indexPairs)
.head)
}

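After this change the two rules pass different flag combinations to RuleUtils.transformPlanToUseIndex. A descriptive summary of the call sites above (the names here are illustrative, not from the codebase):

// Sketch summarizing the call sites in this commit; not library code.
case class IndexScanFlags(useBucketSpec: Boolean, useBucketUnionForAppended: Boolean)

// JoinIndexRule: always bucketed, and appended rows are shuffled into the
// index's buckets via BucketUnion so the sort-merge join avoids a full shuffle.
val joinFlags = IndexScanFlags(useBucketSpec = true, useBucketUnionForAppended = true)

// FilterIndexRule: bucketed only when the new conf is enabled; appended rows
// are merged with a plain Union, never BucketUnion.
def filterFlags(useBucketSpecConf: Boolean): IndexScanFlags =
  IndexScanFlags(useBucketSpec = useBucketSpecConf, useBucketUnionForAppended = false)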
@@ -202,13 +202,15 @@ object RuleUtils {
* @param index Index used in transformation of plan.
* @param plan Current logical plan.
* @param useBucketSpec Option whether to use BucketSpec for reading index data.
+ * @param useBucketUnionForAppended Option whether to use BucketUnion to merge appended data.
* @return Transformed plan.
*/
def transformPlanToUseIndex(
spark: SparkSession,
index: IndexLogEntry,
plan: LogicalPlan,
-       useBucketSpec: Boolean): LogicalPlan = {
+       useBucketSpec: Boolean,
+       useBucketUnionForAppended: Boolean): LogicalPlan = {
// Check pre-requisite.
val logicalRelation = getLogicalRelation(plan)
assert(logicalRelation.isDefined)
@@ -225,7 +227,7 @@
lazy val isSourceUpdated = index.hasSourceUpdate

val transformed = if (hybridScanRequired || isSourceUpdated) {
-       transformPlanToUseHybridScan(spark, index, plan, useBucketSpec)
+       transformPlanToUseHybridScan(spark, index, plan, useBucketSpec, useBucketUnionForAppended)
} else {
transformPlanToUseIndexOnlyScan(spark, index, plan, useBucketSpec)
}
@@ -302,13 +304,15 @@
* @param index Index used in transformation of plan.
* @param plan Current logical plan.
* @param useBucketSpec Option whether to use BucketSpec for reading index data.
+ * @param useBucketUnionForAppended Option whether to use BucketUnion to merge appended data.
* @return Transformed logical plan that leverages an index and merges appended data.
*/
private def transformPlanToUseHybridScan(
spark: SparkSession,
index: IndexLogEntry,
plan: LogicalPlan,
-       useBucketSpec: Boolean): LogicalPlan = {
+       useBucketSpec: Boolean,
+       useBucketUnionForAppended: Boolean): LogicalPlan = {
var unhandledAppendedFiles: Seq[Path] = Nil

// Get transformed plan with index data and appended files if applicable.
@@ -426,7 +430,7 @@

val planForAppended =
transformPlanToReadAppendedFiles(spark, index, plan, unhandledAppendedFiles)
-     if (useBucketSpec) {
+     if (useBucketUnionForAppended && useBucketSpec) {
      // If the bucketing information of the index is used to read the index data, we need to
      // shuffle the appended data in the same way to merge it correctly with the bucketed index data.

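The behavioral pivot is this guard: before, useBucketSpec alone decided whether appended data was shuffled into the index's buckets; now both flags must hold, so a bucketed filter-index scan no longer forces that shuffle. A simplified restatement of the branch (assumed names, not the actual method):

// Simplified restatement of the merge decision above; illustration only.
def mergeStrategy(useBucketSpec: Boolean, useBucketUnionForAppended: Boolean): String =
  if (useBucketUnionForAppended && useBucketSpec)
    "BucketUnion" // join path: shuffle appended rows into the index's buckets
  else
    "Union"       // filter path: plain union; appended files stay unbucketed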
@@ -60,6 +60,14 @@ object HyperspaceConf {
.toDouble
}

+   def useBucketSpecForFilterRule(spark: SparkSession): Boolean = {
+     spark.conf
+       .get(
+         IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC,
+         IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT)
+       .toBoolean
+   }

def numBucketsForIndex(spark: SparkSession): Int = {
getConfStringWithMultipleKeys(
spark,
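The getter follows the existing HyperspaceConf pattern: read the session conf with a string default, then coerce it. Expected behavior, sketched as a hypothetical REPL session:

// Hypothetical REPL session; behavior implied by the getter above.
HyperspaceConf.useBucketSpecForFilterRule(spark)  // false (the default)
spark.conf.set(IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC, "true")
HyperspaceConf.useBucketSpecForFilterRule(spark)  // true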
@@ -81,7 +81,7 @@ class HybridScanForDeltaLakeTest extends HybridScanSuite {

test(
"Append-only: filter rule & parquet format, " +
"index relation should include appended file paths") {
"index relation should include appended file paths.") {
    // This flag is for testing plan transformation when appended files can be loaded with the
    // index data scan node. Currently, it applies only to a very specific case: FilterIndexRule,
    // Parquet source format, no partitioning, no deleted files.
@@ -36,7 +36,7 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {

test(
"Append-only: filter rule & parquet format, " +
"index relation should include appended file paths") {
"index relation should include appended file paths.") {
    // This flag is for testing plan transformation when appended files can be loaded with the
    // index data scan node. Currently, it applies only to a very specific case: FilterIndexRule,
    // Parquet source format, no partitioning, no deleted files.
@@ -83,7 +83,7 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {
}
}

test("Delete-only: Hybrid Scan for delete support doesn't work without lineage column") {
test("Delete-only: Hybrid Scan for delete support doesn't work without lineage column.") {
val indexConfig = IndexConfig("index_ParquetDelete2", Seq("clicks"), Seq("query"))
Seq(("indexWithoutLineage", "false", false), ("indexWithLineage", "true", true)) foreach {
case (indexName, lineageColumnConfig, transformationExpected) =>
@@ -119,7 +119,7 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {
}
}

test("Delete-only: filter rule, number of delete files threshold") {
test("Delete-only: filter rule, number of delete files threshold.") {
withTempPathAsString { testPath =>
val indexName = "IndexDeleteCntTest"
withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
@@ -140,8 +140,6 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {

val afterDeleteSize = FileUtils.getDirectorySize(new Path(testPath))
val deletedRatio = 1 - (afterDeleteSize / sourceSize.toFloat)
-       // scalastyle:off
-       println(deletedRatio)

withSQLConf(TestConfig.HybridScanEnabled: _*) {
withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD ->
@@ -159,5 +157,4 @@
}
}
}

}
@@ -370,7 +370,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {

test(
"Append-only: join rule, appended data should be shuffled with indexed columns " +
"and merged by BucketUnion") {
"and merged by BucketUnion.") {
withTempPathAsString { testPath =>
val appendPath1 = testPath + "/append1"
val appendPath2 = testPath + "/append2"
@@ -400,11 +400,12 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
val basePlan = baseQuery.queryExecution.optimizedPlan

withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "-1") {
-     withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+     withSQLConf(
+       SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
        IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "false") {
        val join = joinQuery()
        checkAnswer(join, baseQuery)
        assert(basePlan.equals(join.queryExecution.optimizedPlan))
}

withSQLConf(TestConfig.HybridScanEnabled: _*) {
@@ -428,7 +429,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {

test(
"Append-only: filter rule and non-parquet format," +
"appended data should be shuffled and merged by Union") {
"appended data should be shuffled and merged by Union.") {
// Note: for delta lake, this test is also eligible as the dataset is partitioned.
withTempPathAsString { testPath =>
val (appendedFiles, deletedFiles) = setupIndexAndChangeData(
@@ -460,12 +461,76 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
appendedFiles,
deletedFiles,
Seq(" <= 2000"))

+       // Check bucketSpec is not used.
+       val bucketSpec = planWithHybridScan collect {
+         case LogicalRelation(HadoopFsRelation(_, _, _, bucketSpec, _, _), _, _, _) =>
+           bucketSpec
+       }
+       assert(bucketSpec.length == 2)

+       // bucketSpec.head is for the index plan, bucketSpec.last is for the plan
+       // for appended files.
+       assert(bucketSpec.head.isEmpty && bucketSpec.last.isEmpty)

checkAnswer(baseQuery, filter)
}
}
}

+   test(
+     "Append-only: filter rule and non-parquet format," +
+       "appended data should be shuffled and merged by Union even with bucketSpec.") {
+     withTempPathAsString { testPath =>
+       val (appendedFiles, deletedFiles) = setupIndexAndChangeData(
+         fileFormat2,
+         testPath,
+         indexConfig1.copy(indexName = "index_Format2Append"),
+         appendCnt = 1,
+         deleteCnt = 0)

+       val df = spark.read.format(fileFormat2).load(testPath)
+       def filterQuery: DataFrame = df.filter(df("clicks") <= 2000).select(df("query"))

+       val baseQuery = filterQuery
+       val basePlan = baseQuery.queryExecution.optimizedPlan

+       withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "false") {
+         val filter = filterQuery
+         assert(basePlan.equals(filter.queryExecution.optimizedPlan))
+       }

+       withSQLConf(
+         TestConfig.HybridScanEnabledAppendOnly :+
+           IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC -> "true": _*) {
+         val filter = filterQuery
+         val planWithHybridScan = filter.queryExecution.optimizedPlan
+         assert(!basePlan.equals(planWithHybridScan))

+         checkFilterIndexHybridScanUnion(
+           planWithHybridScan,
+           "index_Format2Append",
+           appendedFiles,
+           deletedFiles,
+           Seq(" <= 2000"))

+         // Check bucketSpec is used.
+         val bucketSpec = planWithHybridScan collect {
+           case LogicalRelation(HadoopFsRelation(_, _, _, bucketSpec, _, _), _, _, _) =>
+             bucketSpec
+         }
+         assert(bucketSpec.length == 2)
+         // bucketSpec.head is for the index plan, bucketSpec.last is for the plan
+         // for appended files.
+         assert(bucketSpec.head.isDefined && bucketSpec.last.isEmpty)
+         assert(bucketSpec.head.get.bucketColumnNames.toSet === indexConfig1.indexedColumns.toSet)

+         checkAnswer(baseQuery, filter)
+       }
+     }
+   }

test("Delete-only: index relation should have additional filter for deleted files") {
test("Delete-only: index relation should have additional filter for deleted files.") {
val testSet = Seq(("index_ParquetDelete", fileFormat), ("index_JsonDelete", fileFormat2))
testSet foreach {
case (indexName, dataFormat) =>
@@ -640,8 +705,9 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
assert(basePlan.equals(join.queryExecution.optimizedPlan))
}

-       withSQLConf(TestConfig.HybridScanEnabled :+
-         "spark.sql.optimizer.inSetConversionThreshold" -> "1": _*) {
+       withSQLConf(
+         TestConfig.HybridScanEnabled :+
+           "spark.sql.optimizer.inSetConversionThreshold" -> "1": _*) {
        // Changed inSetConversionThreshold to check InSet optimization.
val join = joinQuery()
val planWithHybridScan = join.queryExecution.optimizedPlan
