This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Add config to use bucketed scan for filter indexes #329

Merged · 9 commits · Feb 6, 2021
@@ -47,6 +47,11 @@ object IndexConstants {
"spark.hyperspace.index.hybridscan.maxAppendedRatio"
val INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD_DEFAULT = "0.3"

// Config used to set bucketSpec for Filter Index. If bucketSpec is used, Spark can prune
// buckets that are not applicable, so we could read fewer files for a highly selective query.
val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"

// Identifier injected to HadoopFsRelation as an option if an index is applied.
// Currently, the identifier is added to options field of HadoopFsRelation.
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
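The new INDEX_FILTER_RULE_USE_BUCKET_SPEC pair above is what FilterIndexRule reads further down. A minimal usage sketch (assuming an active SparkSession named spark) of opting in for a session; the key and its "false" default come straight from this diff:

    // Enable bucket pruning for filter indexes for this session.
    spark.conf.set("spark.hyperspace.index.filterRule.useBucketSpec", "true")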
@@ -28,7 +28,7 @@ import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.ResolverUtils
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}

/**
* FilterIndex rule looks for opportunities in a logical plan to replace
@@ -62,7 +62,8 @@ object FilterIndexRule
spark,
index,
originalPlan,
useBucketSpec = false)
useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
useBucketUnionForAppended = false)
Contributor:
Could you add a comment on why this will always be false for the filter index rule? We will never take advantage of bucketing from the union, right?

Collaborator (Author):
It might be beneficial for later operations that require bucketing, but just for the filter index we don't need it. I think it's better to write a new rule for such cases if needed.

Contributor:
Shall we add the comment to the code (since it's not straightforward to understand)?
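A hedged sketch of the inline comment being requested, distilled from the author's reply above (the wording is mine, not merged code):

    // useBucketSpec is config-driven, but useBucketUnionForAppended stays false:
    // a filter query has no downstream operator that relies on the index's
    // bucketing, so appended data need not be shuffled into matching buckets.
    // A dedicated rule could cover bucketing-sensitive cases later.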

logEvent(
HyperspaceIndexUsageEvent(
AppInfo(
@@ -63,10 +63,18 @@ object JoinIndexRule
val updatedPlan =
join
.copy(
left =
RuleUtils.transformPlanToUseIndex(spark, lIndex, l, useBucketSpec = true),
right =
RuleUtils.transformPlanToUseIndex(spark, rIndex, r, useBucketSpec = true))
left = RuleUtils.transformPlanToUseIndex(
spark,
lIndex,
l,
useBucketSpec = true,
useBucketUnionForAppended = true),
right = RuleUtils.transformPlanToUseIndex(
spark,
rIndex,
r,
useBucketSpec = true,
useBucketUnionForAppended = true))

logEvent(
HyperspaceIndexUsageEvent(
@@ -325,11 +333,7 @@ object JoinIndexRule
compatibleIndexPairs.map(
indexPairs =>
JoinIndexRanker
.rank(
spark,
leftRel,
rightRel,
indexPairs)
.rank(spark, leftRel, rightRel, indexPairs)
.head)
}
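For the join rule, both sides are transformed with useBucketSpec = true and useBucketUnionForAppended = true: bucketed reads on the join keys let Spark drop the shuffle, and appended rows must be shuffled into matching buckets to keep that guarantee. Not part of this PR, but a minimal Spark sketch (hypothetical table names t1/t2) of the effect being preserved:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    val df = spark.range(100).withColumn("v", col("id") * 2)
    // Two tables bucketed identically on the join key...
    df.write.bucketBy(8, "id").sortBy("id").saveAsTable("t1")
    df.write.bucketBy(8, "id").sortBy("id").saveAsTable("t2")
    // ...join without an Exchange on either side in the physical plan.
    spark.table("t1").join(spark.table("t2"), "id").explain()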

@@ -101,7 +101,6 @@ object RuleUtils {
appendedBytesRatio < HyperspaceConf.hybridScanAppendedRatioThreshold(spark) &&
deletedBytesRatio < HyperspaceConf.hybridScanDeletedRatioThreshold(spark)


// For append-only Hybrid Scan, deleted files are not allowed.
lazy val isAppendOnlyCandidate = !hybridScanDeleteEnabled && deletedCnt == 0 &&
commonCnt > 0 &&
@@ -178,13 +177,15 @@
* @param index Index used in transformation of plan.
* @param plan Current logical plan.
* @param useBucketSpec Option whether to use BucketSpec for reading index data.
* @param useBucketUnionForAppended Option whether to use BucketUnion to merge appended data.
* @return Transformed plan.
*/
def transformPlanToUseIndex(
spark: SparkSession,
index: IndexLogEntry,
plan: LogicalPlan,
useBucketSpec: Boolean): LogicalPlan = {
useBucketSpec: Boolean,
useBucketUnionForAppended: Boolean): LogicalPlan = {
// Check pre-requisite.
val logicalRelation = getLogicalRelation(plan)
assert(logicalRelation.isDefined)
@@ -201,7 +202,7 @@
lazy val isSourceUpdated = index.hasSourceUpdate

val transformed = if (hybridScanRequired || isSourceUpdated) {
transformPlanToUseHybridScan(spark, index, plan, useBucketSpec)
transformPlanToUseHybridScan(spark, index, plan, useBucketSpec, useBucketUnionForAppended)
} else {
transformPlanToUseIndexOnlyScan(spark, index, plan, useBucketSpec)
}
@@ -276,13 +277,15 @@
* @param index Index used in transformation of plan.
* @param plan Current logical plan.
* @param useBucketSpec Option whether to use BucketSpec for reading index data.
* @param useBucketUnionForAppended Option whether to use BucketUnion to merge appended data.
* @return Transformed logical plan that leverages an index and merges appended data.
*/
private def transformPlanToUseHybridScan(
spark: SparkSession,
index: IndexLogEntry,
plan: LogicalPlan,
useBucketSpec: Boolean): LogicalPlan = {
useBucketSpec: Boolean,
useBucketUnionForAppended: Boolean): LogicalPlan = {
var unhandledAppendedFiles: Seq[Path] = Nil

// Get transformed plan with index data and appended files if applicable.
Expand Down Expand Up @@ -393,7 +396,7 @@ object RuleUtils {

val planForAppended =
transformPlanToReadAppendedFiles(spark, index.schema, plan, unhandledAppendedFiles)
if (useBucketSpec) {
if (useBucketUnionForAppended && useBucketSpec) {
// If Bucketing information of the index is used to read the index data, we need to
// shuffle the appended data in the same way to correctly merge with bucketed index data.

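This guard is the behavioral core of the hunk above: appended rows are merged bucket-wise only when the caller asks for it, not whenever the index is read with its BucketSpec. A conceptual DataFrame-level approximation of that shuffle (not the PR's code; appendedDf and "key" are stand-ins):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    val appendedDf = spark.range(1000).toDF("key") // stand-in for appended source rows
    // Shuffle appended rows into the same number of buckets, on the same
    // column, as the index data; repartition-by-expression uses the same
    // hash partitioning that Spark bucketing does.
    val numBuckets = 8 // assume the index was written with 8 buckets on "key"
    val shuffledAppended = appendedDf.repartition(numBuckets, appendedDf("key"))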
@@ -60,6 +60,14 @@ object HyperspaceConf {
.toDouble
}

def useBucketSpecForFilterRule(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC,
IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT)
.toBoolean
}
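A round-trip sketch (e.g., in a test) of the new helper, using only names added in this PR plus the standard RuntimeConfig API:

    spark.conf.set(IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC, "true")
    assert(HyperspaceConf.useBucketSpecForFilterRule(spark))
    spark.conf.unset(IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC)
    assert(!HyperspaceConf.useBucketSpecForFilterRule(spark)) // "false" default applies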

def numBucketsForIndex(spark: SparkSession): Int = {
getConfStringWithMultipleKeys(
spark,