Change FilterIndex rule to cover select all columns case #73
Conversation
  }
}

private def verifyTransformedPlanWithIndex2(logicalPlan: LogicalPlan): Unit = {
Can we remove some duplication in verify..Index1 and verify..Index2? I was thinking the following:
def verifyIndex(logicalPlan, indexName): Unit = {
  val logicalRelation = logicalPlan.collect {
    case l: LogicalRelation => l
  }.head
  logicalRelation match {
    case l @ LogicalRelation(HadoopRelation(newLocation..........ParquetFileFormat.....)) =>
      assert(condition1)
      assert(condition2)
      assert(index name related checks)
      ...
    case _ => fail("Unexpected plan")
  }
}
Basically, parameterize indexName in the function.
The only issue is we don't explicitly check if the plan is a Project or a Filter. I guess it's ok.
Let me know if this makes sense or if there's any suggestion to deduplicate. If we want to keep Project and Filter check, it's ok too.
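A possible shape for the deduplicated helper, along the lines suggested above. This is a sketch only: the exact `LogicalRelation`/`HadoopFsRelation` pattern arities follow the snippets elsewhere in this thread, and `getIndexDataFilesPath` is the test helper referenced later; none of this is verified against the actual PR code.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Hypothetical deduplicated verifier: the index-specific values are
// parameterized, so verify..Index1 / verify..Index2 collapse into
// one-line calls like verifyIndex(plan, "index1").
private def verifyIndex(logicalPlan: LogicalPlan, indexName: String): Unit = {
  val logicalRelation = logicalPlan.collect { case l: LogicalRelation => l }.head
  logicalRelation match {
    case LogicalRelation(
        HadoopFsRelation(newLocation, _, _, bucketSpec, _, _), _, _, _) =>
      // Index-name-related checks, parameterized instead of duplicated.
      assert(newLocation.rootPaths.head.equals(getIndexDataFilesPath(indexName)))
      assert(bucketSpec.isDefined)
    case _ => fail("Unexpected plan")
  }
}
```
As noted, this variant deliberately does not check whether the matched node sits under a Project or a Filter; that check could be re-added with an extra parameter if desired.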
Done.
_,
_,
_)) =>
try {
Did you try refactoring this as discussed offline?
Done, using the extractor pattern.
// Pattern-2 covers the case where project node is eliminated or not present. An example is
// when all columns are selected.
// Currently, this rule replaces a relation with an index when:
// 1. The index covers all columns from the filter predicate and output columns list, and
// 2. Filter predicate's columns include the first 'indexed' column of the index.
plan transform {
let's be explicit and use transformDown
Done.
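For reference, the Pattern-2 branch described above can be sketched as a match on a Filter sitting directly on the relation, with no intervening Project. The pattern arities follow the diff snippets in this thread; `replaceWithIndexIfApplicable` is a hypothetical placeholder for the rule's existing replacement logic, not an actual function from the PR.

```scala
// Sketch of Pattern-2: Filter directly over the relation (e.g. SELECT *),
// so the Project node is absent and the "output columns" are simply the
// relation's full output.
plan transformDown {
  case filter @ Filter(
      condition: Expression,
      logicalRelation @ LogicalRelation(
        fsRelation @ HadoopFsRelation(_, _, _, _, _, _), _, _, _)) =>
    val outputColumns = logicalRelation.output.map(_.name)
    // Reuse the same index-substitution logic as the Project-over-Filter case.
    replaceWithIndexIfApplicable(filter, outputColumns, condition)
}
```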
A few minor comments, but generally looking good.
_))) =>
plan transformDown {
  case FilterRuleExtractor(
    planHandle,
originalPlan?
changed it.
@@ -85,8 +84,8 @@ object FilterIndexRule extends Rule[LogicalPlan] with Logging {
 * For a given relation, check its available indexes and replace it with the top-ranked index
 * (according to cost model).
 *
 * @param project top-most node in the logical plan that is being optimized.
 * @param projectColumns List of project columns.
 * @param filter Filter node in the subplan that is being optimized.
nit: one space after filter?
Fixed.
object FilterRuleExtractor extends Logging {
  type returnType = (
      LogicalPlan,
nit: LogicalPlan, // original plan
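Putting the nit and the later rename suggestion together, the extractor could look roughly like the following. This is a sketch: the tuple components and the two match arms are inferred from the snippets in this thread, and the exact fields extracted in the real PR may differ.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.LogicalRelation

// Sketch of the extractor (renamed ExtractFilterNode later in the thread),
// with each tuple component annotated as suggested in the nit above.
object ExtractFilterNode {
  type ReturnType = (
      LogicalPlan, // original plan (Project over Filter, or a bare Filter)
      Filter,      // the filter node itself
      Seq[String], // output (project) columns
      Seq[String]) // columns referenced by the filter condition

  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
    case project @ Project(_, filter @ Filter(condition, _: LogicalRelation)) =>
      Some((project, filter, project.output.map(_.name),
        condition.references.map(_.name).toSeq))
    case filter @ Filter(condition, relation: LogicalRelation) =>
      // Pattern-2: no Project node, so all relation columns are the output.
      Some((filter, filter, relation.output.map(_.name),
        condition.references.map(_.name).toSeq))
    case _ => None
  }
}
```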
@@ -227,3 +221,62 @@ object FilterIndexRule extends Rule[LogicalPlan] with Logging {
    }
  }
}

object FilterRuleExtractor extends Logging {
Logging not used.
added.
_,
_))) =>
plan transformDown {
  case FilterRuleExtractor(
How about ExtractFilterNode?
Renamed.
case filter @ Filter(
    condition: Expression,
    logicalRelation @ LogicalRelation(
      fsRelation @ HadoopFsRelation(location, _, _, _, _, _),
Looks like location: FileIndex is not being used in this file. Can we remove it?
Thanks for catching it. Removed it.
hyperspace.createIndex(df, indexConfig)

def query(): DataFrame = df.filter("c4 == 1").select("c3", "c1", "c2", "c5", "c4")
should we verify if no project node is present in this query?
Switched query to use "SELECT *" and added assert.
src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala
    case _ => fail("Unexpected plan.")
  }
}

private def verifyIndexProperties(
    indexName: String,
    newLocation: InMemoryFileIndex,
Let's use actual (vs. expected) instead of new. Or even removing new altogether is fine.
Done.
    bucketSpec: Option[BucketSpec]): Unit = {
  val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE))
  val expectedLocation = getIndexDataFilesPath(indexName)
  assert(newLocation.rootPaths.head.equals(expectedLocation), "Invalid location.")
This is an existing code, but can we remove the assert message "Invalid blah" in this block? I don't think it adds any value and it is just more burdensome for the developer.
Done.
LGTM except for one nit comment. Thanks @pirz!
def query(): DataFrame = spark.sql("SELECT * from t where c4 = 1")

// Verify no Project node is present in the query plan, as a result of using SELECT *
assert(query().queryExecution.optimizedPlan.collect {
nit:
assert(query().queryExecution.optimizedPlan.collect { case p: Project => p }.isEmpty)
thnx, fixed this.
def query(): DataFrame = spark.sql("SELECT * from t where c4 = 1")

// Verify no Project node is present in the query plan, as a result of using SELECT *
assert(query().queryExecution.optimizedPlan.collect { case p: Project => p }.isEmpty, true)
No need for true.
Oops, I fixed this too. Thnx!
@apoorvedave1 did you need more time to review this?
What changes were proposed in this pull request?
This PR extends the FilterIndex rule to cover the case where all columns are selected from a relation.
Such a case happens when the logical plan is "Scan -> Filter" and the Project node is not present or has been eliminated by other optimizations (for example, in a select * scenario).
Why are the changes needed?
Extend the FilterIndex rule to cover more potential optimization cases.
Fixes issue #16.
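The distinction can be illustrated with two queries over the same table. The table name `t` follows the test snippets in this thread; the plan shapes in the comments describe the expected Catalyst optimizer behavior, not verified output from this PR.

```scala
// Sketch: in the first query the optimizer keeps a Project node above the
// Filter; in the second, selecting all columns lets the optimizer drop the
// Project, leaving the bare Filter-over-Scan shape this PR adds coverage for.
val withProject = spark.sql("SELECT c1, c2 FROM t WHERE c4 = 1") // Project(Filter(Scan))
val allColumns  = spark.sql("SELECT * FROM t WHERE c4 = 1")      // Filter(Scan)
```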
Does this PR introduce any user-facing change?
No
How was this patch tested?
Test cases added under FilterIndexRule tests and E2E tests.