
Add similarity thresholds for Hybrid Scan #300

Merged · 12 commits · Jan 14, 2021
29 changes: 11 additions & 18 deletions src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
@@ -32,27 +32,20 @@ object IndexConstants {
val INDEX_NUM_BUCKETS_DEFAULT: Int = SQLConf.SHUFFLE_PARTITIONS.defaultValue.get

// This config enables Hybrid Scan on mutable datasets at query time.
// Currently, this config allows Hybrid Scan only on append-only datasets.
// For datasets with deleted files, "spark.hyperspace.index.hybridscan.delete.enabled"
// must also be set.
val INDEX_HYBRID_SCAN_ENABLED = "spark.hyperspace.index.hybridscan.enabled"
val INDEX_HYBRID_SCAN_ENABLED_DEFAULT = "false"

// This is a temporary config to support Hybrid Scan on datasets with both appends and deletes.
// The config has no effect without the Hybrid Scan config,
// "spark.hyperspace.index.hybridscan.enabled",
// and will be removed after performance validation and optimization.
// See https://github.com/microsoft/hyperspace/issues/184
val INDEX_HYBRID_SCAN_DELETE_ENABLED = "spark.hyperspace.index.hybridscan.delete.enabled"
val INDEX_HYBRID_SCAN_DELETE_ENABLED_DEFAULT = "false"

// While the performance of Hybrid Scan for deleted files (described above) is being
// validated, we limit the number of deleted files to avoid regressions from Hybrid Scan.
// If the number of deleted files exceeds this config, the index is disabled and
// cannot be a candidate for Hybrid Scan.
val INDEX_HYBRID_SCAN_DELETE_MAX_NUM_FILES =
"spark.hyperspace.index.hybridscan.delete.maxNumDeletedFiles"
val INDEX_HYBRID_SCAN_DELETE_MAX_NUM_FILES_DEFAULT = "30"
// If the ratio of deleted files to all source files of a candidate index is greater than this
// threshold, the index won't be applied even with Hybrid Scan.
val INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD =
"spark.hyperspace.index.hybridscan.maxDeletedRatio"
val INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD_DEFAULT = "0.2"

// If the ratio of newly appended files to all source files in the given relation is greater than
// this threshold, the index won't be applied even with Hybrid Scan.
val INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD =
"spark.hyperspace.index.hybridscan.maxAppendedRatio"
val INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD_DEFAULT = "0.3"
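
Note: a minimal sketch of how these thresholds could be set at query time. The config keys and defaults come from this PR; the session setup itself is illustrative.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Enable Hybrid Scan and tune the similarity thresholds.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "true")
// Reject an index once deleted bytes exceed 10% of its indexed source data (default 0.2).
spark.conf.set("spark.hyperspace.index.hybridscan.maxDeletedRatio", "0.1")
// Reject an index once appended bytes exceed 30% of the relation (the default).
spark.conf.set("spark.hyperspace.index.hybridscan.maxAppendedRatio", "0.3")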

// Identifier injected into HadoopFsRelation as an option when an index is applied.
// Currently, the identifier is added to the options field of HadoopFsRelation.
@@ -451,6 +451,11 @@ case class IndexLogEntry(
relations.head.data.properties.content.fileInfos
}

@JsonIgnore
lazy val sourceFilesSizeInBytes: Long = {
sourceFileInfoSet.map(_.size).sum
}

def sourceUpdate: Option[Update] = {
relations.head.data.properties.update
}
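
Note: a worked example of what this new field feeds into, using made-up sizes; the ratio formula is the one added to RuleUtils below.

// Hypothetical figures: an index built over 100 MB of source data,
// of which only 70 MB still exist in the current relation.
val sourceFilesSizeInBytes = 100L * 1024 * 1024
val commonBytes = 70L * 1024 * 1024
val deletedBytesRatio = 1 - commonBytes / sourceFilesSizeInBytes.toFloat // ≈ 0.3
// 0.3 is not below the default maxDeletedRatio of 0.2, so this index
// would be rejected as a Hybrid Scan candidate.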
@@ -55,27 +55,27 @@ object JoinIndexRanker {
rightChild: LogicalPlan,
indexPairs: Seq[(IndexLogEntry, IndexLogEntry)]): Seq[(IndexLogEntry, IndexLogEntry)] = {
val hybridScanEnabled = HyperspaceConf.hybridScanEnabled(spark)
def getCommonBytes(logicalPlan: LogicalPlan, index: IndexLogEntry): Long = {
def getCommonSizeInBytes(logicalPlan: LogicalPlan, index: IndexLogEntry): Long = {
index.getTagValue(logicalPlan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES).getOrElse(0L)
}

indexPairs.sortWith {
case ((left1, right1), (left2, right2)) =>
// These common bytes were calculated and tagged in getCandidateIndexes.
// The value is the total size of the source files shared by the given plan and each index.
lazy val commonBytes1 =
getCommonBytes(leftChild, left1) + getCommonBytes(rightChild, right1)
lazy val commonBytes2 =
getCommonBytes(leftChild, left2) + getCommonBytes(rightChild, right2)
lazy val commonSizeInBytes1 =
getCommonSizeInBytes(leftChild, left1) + getCommonSizeInBytes(rightChild, right1)
lazy val commonSizeInBytes2 =
getCommonSizeInBytes(leftChild, left2) + getCommonSizeInBytes(rightChild, right2)

if (left1.numBuckets == right1.numBuckets && left2.numBuckets == right2.numBuckets) {
if (!hybridScanEnabled || (commonBytes1 == commonBytes2)) {
if (!hybridScanEnabled || (commonSizeInBytes1 == commonSizeInBytes2)) {
left1.numBuckets > left2.numBuckets
} else {
// If both index pairs have the same number of buckets and Hybrid Scan is enabled,
// pick the pair that shares more common bytes with the given source plan, so as to
// reduce the overhead of handling appended and deleted files.
commonBytes1 > commonBytes2
commonSizeInBytes1 > commonSizeInBytes2
}
} else if (left1.numBuckets == right1.numBuckets) {
true
@@ -84,7 +84,7 @@
} else {
// At this point, the two pairs have different numbers of buckets. If Hybrid Scan is
// enabled, pick the pair with more common bytes; otherwise pick the first pair.
!hybridScanEnabled || commonBytes1 > commonBytes2
!hybridScanEnabled || commonSizeInBytes1 > commonSizeInBytes2
}
}
}
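
Note: a toy model of the ranking rules above, for readability; the IndexPair case class and the sample sizes are illustrative, not part of the PR.

case class IndexPair(leftBuckets: Int, rightBuckets: Int, commonSizeInBytes: Long)

def higherRanked(p1: IndexPair, p2: IndexPair, hybridScanEnabled: Boolean): Boolean = {
  val matched1 = p1.leftBuckets == p1.rightBuckets // shuffle-free join possible
  val matched2 = p2.leftBuckets == p2.rightBuckets
  if (matched1 && matched2) {
    if (!hybridScanEnabled || p1.commonSizeInBytes == p2.commonSizeInBytes) {
      p1.leftBuckets > p2.leftBuckets // tie-break on parallelism
    } else {
      p1.commonSizeInBytes > p2.commonSizeInBytes // less data to patch at query time
    }
  } else if (matched1) {
    true // a bucket-matched pair always beats a mismatched one
  } else if (matched2) {
    false
  } else {
    !hybridScanEnabled || p1.commonSizeInBytes > p2.commonSizeInBytes
  }
}

// With Hybrid Scan enabled, a bucket-matched pair covering 90 MB of common data
// outranks one covering only 10 MB:
assert(higherRanked(IndexPair(200, 200, 90L << 20), IndexPair(200, 200, 10L << 20), true))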
@@ -71,7 +71,10 @@ object RuleUtils {
}
}

def isHybridScanCandidate(entry: IndexLogEntry, inputSourceFiles: Seq[FileInfo]): Boolean = {
def isHybridScanCandidate(
entry: IndexLogEntry,
inputSourceFiles: Seq[FileInfo],
inputSourceFilesSizeInBytes: Long): Boolean = {
// TODO: Some threshold about the similarity of source data files - number of common files or
// total size of common files.
// See https://github.com/microsoft/hyperspace/issues/159
@@ -88,15 +91,23 @@
res
}
}

val appendedBytesRatio = 1 - commonBytes / inputSourceFilesSizeInBytes.toFloat
val deletedBytesRatio = 1 - commonBytes / entry.sourceFilesSizeInBytes.toFloat

val deletedCnt = entry.sourceFileInfoSet.size - commonCnt
lazy val isDeleteCandidate = hybridScanDeleteEnabled && entry.hasLineageColumn &&
commonCnt > 0 && deletedCnt <= HyperspaceConf.hybridScanDeleteMaxNumFiles(spark)
val isAppendAndDeleteCandidate = hybridScanDeleteEnabled && entry.hasLineageColumn &&
commonCnt > 0 &&
appendedBytesRatio < HyperspaceConf.hybridScanAppendedRatioThreshold(spark) &&
deletedBytesRatio < HyperspaceConf.hybridScanDeletedRatioThreshold(spark)

// For append-only Hybrid Scan, deleted files are not allowed.
lazy val isAppendOnlyCandidate = !hybridScanDeleteEnabled && deletedCnt == 0 &&
commonCnt > 0
commonCnt > 0 &&
appendedBytesRatio < HyperspaceConf.hybridScanAppendedRatioThreshold(spark)

val isCandidate = isDeleteCandidate || isAppendOnlyCandidate
val isCandidate = isAppendAndDeleteCandidate || isAppendOnlyCandidate
if (isCandidate) {
entry.setTagValue(plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES, commonBytes)
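
Note: the analogous worked example for the append side, again with hypothetical sizes.

// Hypothetical figures: the relation now holds 150 MB, of which 100 MB
// were already present when the index was built.
val inputSourceFilesSizeInBytes = 150L * 1024 * 1024
val commonBytes = 100L * 1024 * 1024
val appendedBytesRatio = 1 - commonBytes / inputSourceFilesSizeInBytes.toFloat // ≈ 0.33
// 0.33 exceeds the default maxAppendedRatio of 0.3, so the index is skipped:
// reading a third of the relation as raw appended files would erode its benefit.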

@@ -132,8 +143,10 @@
}
}
assert(filesByRelations.length == 1)
val inputSourceFiles = filesByRelations.flatten
val totalSizeInBytes = inputSourceFiles.map(_.size).sum
indexes.filter(index =>
index.created && isHybridScanCandidate(index, filesByRelations.flatten))
index.created && isHybridScanCandidate(index, inputSourceFiles, totalSizeInBytes))
} else {
indexes.filter(index => index.created && signatureValid(index))
}
22 changes: 13 additions & 9 deletions src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -33,11 +33,7 @@ object HyperspaceConf {
}

def hybridScanDeleteEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.INDEX_HYBRID_SCAN_DELETE_ENABLED,
IndexConstants.INDEX_HYBRID_SCAN_DELETE_ENABLED_DEFAULT)
.toBoolean
hybridScanDeletedRatioThreshold(spark) > 0.0
}

def optimizeFileSizeThreshold(spark: SparkSession): Long = {
@@ -48,12 +44,20 @@
.toLong
}

def hybridScanDeleteMaxNumFiles(spark: SparkSession): Int = {
def hybridScanDeletedRatioThreshold(spark: SparkSession): Double = {
spark.conf
.get(
IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD,
IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD_DEFAULT)
.toDouble
}

def hybridScanAppendedRatioThreshold(spark: SparkSession): Double = {
spark.conf
.get(
IndexConstants.INDEX_HYBRID_SCAN_DELETE_MAX_NUM_FILES,
IndexConstants.INDEX_HYBRID_SCAN_DELETE_MAX_NUM_FILES_DEFAULT)
.toInt
IndexConstants.INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD,
IndexConstants.INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD_DEFAULT)
.toDouble
}
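
Note: one consequence of deriving hybridScanDeleteEnabled from the threshold, sketched below assuming an existing session. Setting the deleted ratio to 0 yields append-only behavior, which is exactly what TestConfig.HybridScanEnabledAppendOnly does in the test changes.

// Append-only Hybrid Scan: tolerate appended files, but no deleted files.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "true")
spark.conf.set("spark.hyperspace.index.hybridscan.maxDeletedRatio", "0")
// hybridScanDeleteEnabled(spark) now returns false (0.0 > 0.0 is false), so
// indexes with any deleted source files are no longer Hybrid Scan candidates.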

def numBucketsForIndex(spark: SparkSession): Int = {
14 changes: 13 additions & 1 deletion src/test/scala/com/microsoft/hyperspace/TestUtils.scala
@@ -20,7 +20,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

import com.microsoft.hyperspace.MockEventLogger.reset
import com.microsoft.hyperspace.index.{FileIdTracker, IndexConfig, IndexLogEntry, IndexLogManager, IndexLogManagerFactoryImpl}
import com.microsoft.hyperspace.index.{FileIdTracker, IndexConfig, IndexConstants, IndexLogEntry, IndexLogManager, IndexLogManagerFactoryImpl}
import com.microsoft.hyperspace.telemetry.{EventLogger, HyperspaceEvent}
import com.microsoft.hyperspace.util.{FileUtils, PathUtils}

@@ -107,3 +107,15 @@ object MockEventLogger {
emittedEvents = Seq()
}
}

object TestConfig {
val HybridScanEnabled = Seq(
IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true",
IndexConstants.INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD -> "0.99",
IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD -> "0.99")
Review thread on the threshold values (resolved):

Contributor: Can we use "1.0" instead? (Not sure what "0.99" means here.)

imback82 (Contributor, Jan 14, 2021): Are the tests failing if we use the default ratio? I am a bit confused about when to use this config, looking at the other tests below. (If the majority of the tests pass with the default ratio, I would use the new ratio only for the tests that fail with it.)

sezruby (Collaborator, author): Yes, because the sample data is too small, almost all tests failed with the default config. "1.0" would mean Hybrid Scan is performed even if there is no common source data, so I set it to "0.9" or "0.99".

val HybridScanEnabledAppendOnly = Seq(
IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true",
IndexConstants.INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD -> "0.99",
IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD -> "0")
}
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{DataFrame, QueryTest}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._

import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData}
import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig}

class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
override val systemPath = new Path("src/test/resources/deltaLakeIntegrationTest")
@@ -125,9 +125,7 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
val deltaTable = DeltaTable.forPath(dataPath)
deltaTable.delete("clicks > 5000")

withSQLConf(
"spark.hyperspace.index.hybridscan.enabled" -> "true",
"spark.hyperspace.index.hybridscan.delete.enabled" -> "true") {
withSQLConf(TestConfig.HybridScanEnabled: _*) {
// The index should be applied for the updated version.
assert(isIndexUsed(query().queryExecution.optimizedPlan, "deltaIndex", true))

@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestUtils}
import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
import com.microsoft.hyperspace.index.IndexConstants.{GLOBBING_PATTERN_KEY, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK}
import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule}
@@ -669,16 +669,14 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
}

// An index refreshed in quick mode can be applied with the Hybrid Scan config.
withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true") {
withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_DELETE_ENABLED -> "true") {
withSQLConf(TestConfig.HybridScanEnabled: _*) {
spark.disableHyperspace()
val dfWithHyperspaceDisabled = query()
val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
spark.enableHyperspace()
val dfWithHyperspaceEnabled = query()
assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
}
}
}
}
@@ -720,7 +718,7 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
assert(basePlan.equals(filter.queryExecution.optimizedPlan))
}

withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true") {
withSQLConf(TestConfig.HybridScanEnabledAppendOnly: _*) {
val filter = filterQuery
val planWithHybridScan = filter.queryExecution.optimizedPlan
assert(!basePlan.equals(planWithHybridScan))
@@ -20,7 +20,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources._

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.{Hyperspace, TestConfig}

// Hybrid Scan tests for non-partitioned source data. Test cases of HybridScanSuite are also
// executed with non-partitioned source data.
@@ -110,7 +110,7 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {
assert(basePlan.equals(filter.queryExecution.optimizedPlan))
}

withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true") {
withSQLConf(TestConfig.HybridScanEnabledAppendOnly: _*) {
val filter = filterQuery
val planWithHybridScan = filter.queryExecution.optimizedPlan
assert(!basePlan.equals(planWithHybridScan))
@@ -148,15 +148,11 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {
df.filter(df("clicks") <= 2000).select(df("query"))
val baseQuery = filterQuery
val basePlan = baseQuery.queryExecution.optimizedPlan
withSQLConf(
IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true",
IndexConstants.INDEX_HYBRID_SCAN_DELETE_ENABLED -> "false") {
withSQLConf(TestConfig.HybridScanEnabledAppendOnly: _*) {
val filter = filterQuery
assert(basePlan.equals(filter.queryExecution.optimizedPlan))
}
withSQLConf(
IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true",
IndexConstants.INDEX_HYBRID_SCAN_DELETE_ENABLED -> "true") {
withSQLConf(TestConfig.HybridScanEnabled: _*) {
val filter = filterQuery
assert(
basePlan
@@ -19,7 +19,7 @@ package com.microsoft.hyperspace.index
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame

import com.microsoft.hyperspace.{Hyperspace, Implicits}
import com.microsoft.hyperspace.{Hyperspace, Implicits, TestConfig}
import com.microsoft.hyperspace.util.FileUtils

// Hybrid Scan tests for partitioned source data. Test cases of HybridScanSuite are also
// executed with partitioned source data.
@@ -133,7 +133,7 @@ class HybridScanForPartitionedDataTest extends HybridScanSuite {
checkAnswer(baseQuery, filter)
}

withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true") {
withSQLConf(TestConfig.HybridScanEnabledAppendOnly: _*) {
val filter = filterQuery(df)
assert(!basePlan.equals(filter.queryExecution.optimizedPlan))
checkAnswer(baseQuery, filter)
@@ -163,7 +163,7 @@ class HybridScanForPartitionedDataTest extends HybridScanSuite {
checkAnswer(baseQuery, filter)
}

withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true") {
withSQLConf(TestConfig.HybridScanEnabledAppendOnly: _*) {
val filter = filterQuery(df)
assert(!basePlan.equals(filter.queryExecution.optimizedPlan))
checkAnswer(baseQuery, filter)
@@ -213,7 +213,7 @@ class HybridScanForPartitionedDataTest extends HybridScanSuite {
checkAnswer(baseQuery, filter)
}

withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true") {
withSQLConf(TestConfig.HybridScanEnabledAppendOnly: _*) {
val filter = filterQuery(df)
assert(!basePlan.equals(filter.queryExecution.optimizedPlan))
checkAnswer(baseQuery, filter)
@@ -247,7 +247,7 @@ class HybridScanForPartitionedDataTest extends HybridScanSuite {
checkAnswer(baseQuery, filter)
}

withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true") {
withSQLConf(TestConfig.HybridScanEnabledAppendOnly: _*) {
val filter = filterQuery(df)
// The new partition can be handled with the Hybrid Scan approach.
assert(!basePlan.equals(filter.queryExecution.optimizedPlan))