Skip to content

Commit

Permalink
[SPARK-20700][SQL] InferFiltersFromConstraints stackoverflows for que…
Browse files Browse the repository at this point in the history
…ry (v2)

## What changes were proposed in this pull request?

In the previous approach we used `aliasMap` to link an `Attribute` to the expression with potentially the form `f(a, b)`, but we only searched the `expressions` and `children.expressions` for this, which is not enough when an `Alias` may lies deep in the logical plan. In that case, we can't generate the valid equivalent constraint classes and thus we fail at preventing the recursive deductions.

We fix this problem by collecting all `Alias`s from the logical plan.

## How was this patch tested?

No additional test case is added, but do modified one test case to cover this situation.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes apache#18020 from jiangxb1987/inferConstrants.
  • Loading branch information
jiangxb1987 authored and liyichao committed May 24, 2017
1 parent 9a8b9b5 commit d0d4ed4
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
Expand Up @@ -81,11 +81,12 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
case _ => Seq.empty[Attribute]
}

// Collect aliases from expressions, so we may avoid producing recursive constraints.
private lazy val aliasMap = AttributeMap(
(expressions ++ children.flatMap(_.expressions)).collect {
// Collect aliases from expressions of the whole tree rooted by the current QueryPlan node, so
// we may avoid producing recursive constraints.
private lazy val aliasMap: AttributeMap[Expression] = AttributeMap(
expressions.collect {
case a: Alias => (a.toAttribute, a.child)
})
} ++ children.flatMap(_.aliasMap))

/**
* Infers an additional set of constraints from a given set of equality constraints.
Expand Down
Expand Up @@ -33,7 +33,8 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
PushPredicateThroughJoin,
PushDownPredicate,
InferFiltersFromConstraints(conf),
CombineFilters) :: Nil
CombineFilters,
BooleanSimplification) :: Nil
}

object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] {
Expand Down Expand Up @@ -172,30 +173,31 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
val t1 = testRelation.subquery('t1)
val t2 = testRelation.subquery('t2)

val originalQuery = t1.select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col)).as("t")
// We should prevent `Coalese(a, b)` from recursively creating complicated constraints through
// the constraint inference procedure.
val originalQuery = t1.select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col))
// We hide an `Alias` inside the child's child's expressions, to cover the situation reported
// in [SPARK-20700].
.select('int_col, 'd, 'a).as("t")
.join(t2, Inner,
Some("t.a".attr === "t2.a".attr
&& "t.d".attr === "t2.a".attr
&& "t.int_col".attr === "t2.a".attr))
.analyze
val correctAnswer = t1
.where(IsNotNull('a) && IsNotNull(Coalesce(Seq('a, 'a)))
&& 'a === Coalesce(Seq('a, 'a)) && 'a <=> Coalesce(Seq('a, 'a)) && 'a <=> 'a
&& Coalesce(Seq('a, 'a)) <=> 'b && Coalesce(Seq('a, 'a)) <=> Coalesce(Seq('a, 'a))
&& 'a === 'b && IsNotNull(Coalesce(Seq('a, 'b))) && 'a === Coalesce(Seq('a, 'b))
&& Coalesce(Seq('a, 'b)) <=> Coalesce(Seq('b, 'b)) && Coalesce(Seq('a, 'b)) === 'b
&& 'a === Coalesce(Seq('a, 'a)) && 'a <=> Coalesce(Seq('a, 'a))
&& Coalesce(Seq('b, 'b)) <=> 'a && 'a === 'b && IsNotNull(Coalesce(Seq('a, 'b)))
&& 'a === Coalesce(Seq('a, 'b)) && Coalesce(Seq('a, 'b)) === 'b
&& IsNotNull('b) && IsNotNull(Coalesce(Seq('b, 'b)))
&& 'b === Coalesce(Seq('b, 'b)) && 'b <=> Coalesce(Seq('b, 'b))
&& Coalesce(Seq('b, 'b)) <=> Coalesce(Seq('b, 'b)) && 'b <=> 'b)
.select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col)).as("t")
&& 'b === Coalesce(Seq('b, 'b)) && 'b <=> Coalesce(Seq('b, 'b)))
.select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col))
.select('int_col, 'd, 'a).as("t")
.join(t2
.where(IsNotNull('a) && IsNotNull(Coalesce(Seq('a, 'a)))
&& 'a === Coalesce(Seq('a, 'a)) && 'a <=> Coalesce(Seq('a, 'a)) && 'a <=> 'a
&& Coalesce(Seq('a, 'a)) <=> Coalesce(Seq('a, 'a))), Inner,
Some("t.a".attr === "t2.a".attr
&& "t.d".attr === "t2.a".attr
&& "t.int_col".attr === "t2.a".attr
&& Coalesce(Seq("t.d".attr, "t.d".attr)) <=> "t.int_col".attr))
&& 'a <=> Coalesce(Seq('a, 'a)) && 'a === Coalesce(Seq('a, 'a)) && 'a <=> 'a), Inner,
Some("t.a".attr === "t2.a".attr && "t.d".attr === "t2.a".attr
&& "t.int_col".attr === "t2.a".attr))
.analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
Expand Down

0 comments on commit d0d4ed4

Please sign in to comment.