Skip to content

Commit

Permalink
[SPARK-13840][SQL] Split Optimizer Rule ColumnPruning to ColumnPrunin…
Browse files Browse the repository at this point in the history
…g and EliminateOperator

#### What changes were proposed in this pull request?

Before this PR, two Optimizer rules `ColumnPruning` and `PushPredicateThroughProject` reverse each other's effects. Optimizer always reaches the max iteration when optimizing some queries. Extra `Project` are found in the plan. For example, below is the optimized plan after reaching 100 iterations:

```
Join Inner, Some((cast(id1#16 as bigint) = id1#18L))
:- Project [id1#16]
:  +- Filter isnotnull(cast(id1#16 as bigint))
:     +- Project [id1#16]
:        +- Relation[id1#16,newCol#17] JSON part: struct<>, data: struct<id1:int,newCol:int>
+- Filter isnotnull(id1#18L)
   +- Relation[id1#18L] JSON part: struct<>, data: struct<id1:bigint>
```

This PR splits the optimizer rule `ColumnPruning` to `ColumnPruning` and `EliminateOperators`

The issue becomes worse when having another rule `NullFiltering`, which could add extra Filters for `IsNotNull`. We have to be careful when introducing extra `Filter` if the benefit is not large enough. Another PR will be submitted by sameeragarwal to handle this issue.

cc sameeragarwal marmbrus

In addition, `ColumnPruning` should not push `Project` through non-deterministic `Filter`. This could cause wrong results. This will be put in a separate PR.

cc davies cloud-fan yhuai

#### How was this patch tested?

Modified the existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#11682 from gatorsmile/viewDuplicateNames.
  • Loading branch information
gatorsmile authored and roygao94 committed Mar 22, 2016
1 parent 706f814 commit c59692f
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
PushPredicateThroughAggregate,
LimitPushDown,
ColumnPruning,
EliminateOperators,
// Operator combine
CollapseRepartition,
CollapseProject,
Expand Down Expand Up @@ -315,11 +316,7 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
* - LeftSemiJoin
*/
object ColumnPruning extends Rule[LogicalPlan] {
private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
output1.size == output2.size &&
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
// Prunes the unused columns from project list of Project/Aggregate/Expand
case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty =>
p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains)))
Expand Down Expand Up @@ -380,12 +377,6 @@ object ColumnPruning extends Rule[LogicalPlan] {
p.copy(child = w.copy(
windowExpressions = w.windowExpressions.filter(p.references.contains)))

// Eliminate no-op Window
case w: Window if w.windowExpressions.isEmpty => w.child

// Eliminate no-op Projects
case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child

// Can't prune the columns on LeafNode
case p @ Project(_, l: LeafNode) => p

Expand All @@ -409,6 +400,24 @@ object ColumnPruning extends Rule[LogicalPlan] {
}
}

/**
* Eliminate no-op Project and Window.
*
* Note: this rule should be executed just after ColumnPruning.
*/
object EliminateOperators extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
// Eliminate no-op Projects
case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child
// Eliminate no-op Window
case w: Window if w.windowExpressions.isEmpty => w.child
}

private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
output1.size == output2.size &&
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
}

/**
* Combines two adjacent [[Project]] operators into one and perform alias substitution,
* merging the expressions into one single expression.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class ColumnPruningSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("Column pruning", FixedPoint(100),
ColumnPruning,
EliminateOperators,
CollapseProject) :: Nil
}

Expand Down Expand Up @@ -327,8 +328,8 @@ class ColumnPruningSuite extends PlanTest {
val input2 = LocalRelation('c.int, 'd.string, 'e.double)
val query = Project('b :: Nil,
Union(input1 :: input2 :: Nil)).analyze
val expected = Project('b :: Nil,
Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil)).analyze
val expected =
Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil).analyze
comparePlans(Optimize.execute(query), expected)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class CombiningLimitsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Filter Pushdown", FixedPoint(100),
ColumnPruning) ::
ColumnPruning,
EliminateOperators) ::
Batch("Combine Limit", FixedPoint(10),
CombineLimits) ::
Batch("Constant Folding", FixedPoint(10),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class JoinOptimizationSuite extends PlanTest {
PushPredicateThroughGenerate,
PushPredicateThroughAggregate,
ColumnPruning,
EliminateOperators,
CollapseProject) :: Nil

}
Expand Down

0 comments on commit c59692f

Please sign in to comment.