[SPARK-28156][SQL][BACKPORT-2.4] Self-join should not miss cached view
Back-port of apache#24960 to branch-2.4.

The issue is that when self-joining a cached view, only one side of the join uses the cached relation. The cause: in `ResolveReferences` we deduplicate a view by giving it new output attributes, and then the `AliasViewChild` rule adds an extra Project under the view, so the plan no longer matches the cached one and cache matching breaks.
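
Spark swaps in a cached `InMemoryRelation` only where a subtree of the query plan matches the plan that was cached, so the extra Project on one side of the self-join defeats that lookup. A toy sketch of the matching idea (plain Scala, not Spark's actual classes):

```scala
// Minimal illustration of structural plan matching: the cache is keyed on the plan
// that was persisted, so a plan wrapped in one extra Project no longer matches it.
sealed trait Plan {
  def sameResult(other: Plan): Boolean = this == other
}
case class Relation(name: String) extends Plan
case class Proj(cols: Seq[String], child: Plan) extends Plan

object CacheMatchDemo extends App {
  val cachedPlan: Plan = Proj(Seq("a", "b", "c", "d"), Relation("table1_vw"))

  // Left side of the self-join resolves to the same shape as the cached plan -> cache hit.
  val leftSide: Plan = Proj(Seq("a", "b", "c", "d"), Relation("table1_vw"))

  // Right side got an extra Project added under the view during deduplication -> cache miss.
  val rightSide: Plan = Proj(Seq("a", "b", "c", "d"),
    Proj(Seq("a", "b", "c", "d"), Relation("table1_vw")))

  println(cachedPlan.sameResult(leftSide))   // true: replaced by the cached relation
  println(cachedPlan.sameResult(rightSide))  // false: falls back to scanning the source files
}
```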

The fix: when deduplicating, only dedup a view whose output differs from its child plan's output; otherwise, dedup the view's child plan instead. A reproduction:

```scala
val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d")
df.write.mode("overwrite").format("orc").saveAsTable("table1")

sql("drop view if exists table1_vw")
sql("create view table1_vw as select * from table1")

val cachedView = sql("select a, b, c, d from table1_vw")

cachedView.createOrReplaceTempView("cachedview")
cachedView.persist()

val queryDf = sql(
  s"""select leftside.a, leftside.b
      |from cachedview leftside
      |join cachedview rightside
      |on leftside.a = rightside.a
    """.stripMargin)
```
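
One way to confirm that both sides of the join read from the cache (this mirrors the test added by this PR) is to count the `InMemoryTableScanExec` nodes in the physical plan; with the fix in place there should be two:

```scala
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

// Both join sides should scan the cached data, so two InMemoryTableScanExec nodes are expected.
val inMemoryScans = queryDf.queryExecution.executedPlan.collect {
  case scan: InMemoryTableScanExec => scan
}
assert(inMemoryScans.size == 2)
```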

Query plan before this PR:
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12660], Inner, BuildRight
   :- *(2) Filter isnotnull(a#12664)
   :  +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
   :        +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      +- *(1) Project [a#12660]
         +- *(1) Filter isnotnull(a#12660)
            +- *(1) FileScan orc default.table1[a#12660] Batched: true, DataFilters: [isnotnull(a#12660)], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int>
```

Query plan after this PR:
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12692], Inner, BuildRight
   :- *(2) Filter isnotnull(a#12664)
   :  +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
   :        +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- *(1) Filter isnotnull(a#12692)
         +- *(1) InMemoryTableScan [a#12692], [isnotnull(a#12692)]
               +- InMemoryRelation [a#12692, b#12693, c#12694, d#12695], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
```

Added test.

Closes apache#25068 from bersprockets/selfjoin_24.

Lead-authored-by: Bruce Robbins <bersprockets@gmail.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2 people authored and Raphaël Luta committed Sep 17, 2019
1 parent c5a8662 commit 23355bb
Showing 5 changed files with 104 additions and 58 deletions.
@@ -189,8 +189,6 @@ class Analyzer(
      TypeCoercion.typeCoercionRules(conf) ++
      extendedResolutionRules : _*),
    Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
    Batch("View", Once,
      AliasViewChild(conf)),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
@@ -300,6 +301,48 @@ trait CheckAnalysis extends PredicateHelper {
}
}

          // If the view output doesn't have the same number of columns neither with the child
          // output, nor with the query column names, throw an AnalysisException.
          // If the view's child output can't up cast to the view output,
          // throw an AnalysisException, too.
          case v @ View(desc, output, child) if child.resolved && output != child.output =>
            val queryColumnNames = desc.viewQueryColumnNames
            val queryOutput = if (queryColumnNames.nonEmpty) {
              if (output.length != queryColumnNames.length) {
                // If the view output doesn't have the same number of columns with the query column
                // names, throw an AnalysisException.
                throw new AnalysisException(
                  s"The view output ${output.mkString("[", ",", "]")} doesn't have the same " +
                    "number of columns with the query column names " +
                    s"${queryColumnNames.mkString("[", ",", "]")}")
              }
              val resolver = SQLConf.get.resolver
              queryColumnNames.map { colName =>
                child.output.find { attr =>
                  resolver(attr.name, colName)
                }.getOrElse(throw new AnalysisException(
                  s"Attribute with name '$colName' is not found in " +
                  s"'${child.output.map(_.name).mkString("(", ",", ")")}'"))
              }
            } else {
              child.output
            }

            output.zip(queryOutput).foreach {
              case (attr, originAttr) if !attr.dataType.sameType(originAttr.dataType) =>
                // The dataType of the output attributes may be not the same with that of the view
                // output, so we should cast the attribute to the dataType of the view output
                // attribute. Will throw an AnalysisException if the cast can't be performed or
                // might truncate.
                if (Cast.mayTruncate(originAttr.dataType, attr.dataType) ||
                    !Cast.canCast(originAttr.dataType, attr.dataType)) {
                  throw new AnalysisException(s"Cannot up cast ${originAttr.sql} from " +
                    s"${originAttr.dataType.catalogString} to ${attr.dataType.catalogString} " +
                    "as it may truncate\n")
                }
              case _ =>
            }

          case _ => // Fallbacks to the following checks
        }

@@ -17,8 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
@@ -28,7 +27,12 @@ import org.apache.spark.sql.internal.SQLConf
*/

/**
* Make sure that a view's child plan produces the view's output attributes. We try to wrap the
* This rule has two goals:
*
* 1. Removes [[View]] operators from the plan. The operator is respected till the end of analysis
* stage because we want to see which part of an analyzed logical plan is generated from a view.
*
* 2. Make sure that a view's child plan produces the view's output attributes. We try to wrap the
* child by:
* 1. Generate the `queryOutput` by:
* 1.1. If the query column names are defined, map the column names to attributes in the child
@@ -41,27 +45,29 @@ import org.apache.spark.sql.internal.SQLConf
* 2. Map the `queryOutput` to view output by index, if the corresponding attributes don't match,
* try to up cast and alias the attribute in `queryOutput` to the attribute in the view output.
* 3. Add a Project over the child, with the new output generated by the previous steps.
* If the view output doesn't have the same number of columns neither with the child output, nor
* with the query column names, throw an AnalysisException.
*
* Once reaches this rule, it means `CheckAnalysis` did necessary checks on number of columns
* between the view output and the child output or the query column names. `CheckAnalysis` also
* checked the cast from the view's child to the Project is up-cast.
*
* This should be only done after the batch of Resolution, because the view attributes are not
* completely resolved during the batch of Resolution.
*/
case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
object EliminateView extends Rule[LogicalPlan] with CastSupport {
  override def conf: SQLConf = SQLConf.get

  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    // The child has the different output attributes with the View operator. Adds a Project over
    // the child of the view.
    case v @ View(desc, output, child) if child.resolved && output != child.output =>
      val resolver = conf.resolver
      val queryColumnNames = desc.viewQueryColumnNames
      val queryOutput = if (queryColumnNames.nonEmpty) {
        // If the view output doesn't have the same number of columns with the query column names,
        // throw an AnalysisException.
        if (output.length != queryColumnNames.length) {
          throw new AnalysisException(
            s"The view output ${output.mkString("[", ",", "]")} doesn't have the same number of " +
              s"columns with the query column names ${queryColumnNames.mkString("[", ",", "]")}")
        }
        // Find the attribute that has the expected attribute name from an attribute list, the names
        // are compared using conf.resolver.
        // `CheckAnalysis` already guarantees the expected attribute can be found for sure.
        desc.viewQueryColumnNames.map { colName =>
          findAttributeByName(colName, child.output, resolver)
          child.output.find(attr => resolver(attr.name, colName)).get
        }
      } else {
        // For view created before Spark 2.2.0, the view text is already fully qualified, the plan
@@ -70,52 +76,17 @@ case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupp
      }
      // Map the attributes in the query output to the attributes in the view output by index.
      val newOutput = output.zip(queryOutput).map {
        case (attr, originAttr) if attr != originAttr =>
          // The dataType of the output attributes may be not the same with that of the view
          // output, so we should cast the attribute to the dataType of the view output attribute.
          // Will throw an AnalysisException if the cast can't perform or might truncate.
          if (Cast.mayTruncate(originAttr.dataType, attr.dataType)) {
            throw new AnalysisException(s"Cannot up cast ${originAttr.sql} from " +
              s"${originAttr.dataType.catalogString} to ${attr.dataType.catalogString} as it " +
              s"may truncate\n")
          } else {
            Alias(cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
              qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
          }
        case (attr, originAttr) if !attr.semanticEquals(originAttr) =>
          // `CheckAnalysis` already guarantees that the cast is a up-cast for sure.
          Alias(cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
            qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
        case (_, originAttr) => originAttr
      }
      v.copy(child = Project(newOutput, child))
  }
      Project(newOutput, child)

  /**
   * Find the attribute that has the expected attribute name from an attribute list, the names
   * are compared using conf.resolver.
   * If the expected attribute is not found, throw an AnalysisException.
   */
  private def findAttributeByName(
      name: String,
      attrs: Seq[Attribute],
      resolver: Resolver): Attribute = {
    attrs.find { attr =>
      resolver(attr.name, name)
    }.getOrElse(throw new AnalysisException(
      s"Attribute with name '$name' is not found in " +
      s"'${attrs.map(_.name).mkString("(", ",", ")")}'"))
  }
}

/**
 * Removes [[View]] operators from the plan. The operator is respected till the end of analysis
 * stage because we want to see which part of an analyzed logical plan is generated from a view.
 */
object EliminateView extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // The child should have the same output attributes with the View operator, so we simply
    // remove the View operator.
    case View(_, output, child) =>
      assert(output == child.output,
        s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " +
          s"view output ${output.mkString("[", ",", "]")}")
    case View(_, _, child) =>
      child
  }
}
@@ -472,6 +472,9 @@ case class View(
    output: Seq[Attribute],
    child: LogicalPlan) extends LogicalPlan with MultiInstanceRelation {

  @transient
  override lazy val references: AttributeSet = AttributeSet.empty

  override lazy val resolved: Boolean = child.resolved

  override def children: Seq[LogicalPlan] = child :: Nil
31 changes: 31 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
@@ -3003,6 +3004,36 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
      }
    }
  }

  test("SPARK-28156: self-join should not miss cached view") {
    withTable("table1") {
      withView("table1_vw") {
        val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d")
        df.write.mode("overwrite").format("orc").saveAsTable("table1")
        sql("drop view if exists table1_vw")
        sql("create view table1_vw as select * from table1")

        val cachedView = sql("select a, b, c, d from table1_vw")

        cachedView.createOrReplaceTempView("cachedview")
        cachedView.persist()

        val queryDf = sql(
          s"""select leftside.a, leftside.b
             |from cachedview leftside
             |join cachedview rightside
             |on leftside.a = rightside.a
           """.stripMargin)

        val inMemoryTableScan = queryDf.queryExecution.executedPlan.collect {
          case i: InMemoryTableScanExec => i
        }
        assert(inMemoryTableScan.size == 2)
        checkAnswer(queryDf, Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Row(3, 4) :: Row(4, 5) :: Nil)
      }
    }

  }
}

case class Foo(bar: Option[String])
