# [SPARK-13523] [SQL] Reuse exchanges in a query
## What changes were proposed in this pull request?

A query can contain duplicated parts, for example in a self join; it would be good to avoid spending CPU and memory (for broadcast or cache) on the duplicated part more than once.
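
For instance, a minimal self-join sketch (a hedged example, assuming a Spark 1.6/2.0-era `SQLContext` named `sqlContext`; the range size is illustrative):

```scala
// Both sides of the join scan the same range, so without reuse the physical
// plan contains two exchanges that produce identical results.
val df = sqlContext.range(0, 1024)
df.join(df, "id").explain()
```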

Exchange materializes the underlying RDD by shuffle or collect, so it is a natural point at which to check for duplicates and reuse them. Duplicated exchanges are exchanges that generate exactly the same result within a query.

In order to find the duplicated exchanges, we need to be able to compare two SparkPlans and check whether they produce the same results. We already have that for LogicalPlan, so we move it into QueryPlan to make it available for SparkPlan as well.
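
A hedged sketch of the contract this provides (same `sqlContext` assumption as above; the two plans below differ only in expression ids):

```scala
// sameResult is tolerant of cosmetic differences such as attribute naming
// and expression ids, so two physical scans of the same range compare equal.
val a = sqlContext.range(0, 10).queryExecution.sparkPlan
val b = sqlContext.range(0, 10).queryExecution.sparkPlan
assert(a.sameResult(b))
```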

Once we can find the duplicated exchanges, we replace all of them with the same SparkPlan object (wrapped in a ReusedExchange for explain), and the plan tree becomes a DAG. Since all the planner rules only work with trees, this rule should be the last one in the entire planning process.
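
Concretely, `ReuseExchange` is appended as the last physical preparation rule, roughly like this (a sketch only; the other rule names approximate the Spark source of this era):

```scala
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
  EnsureRequirements(sqlContext),    // may insert new exchanges
  CollapseCodegenStages(sqlContext), // fuses operators into codegen stages
  ReuseExchange(sqlContext))         // last: rewrites the tree into a DAG
```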

After the rule, the plan looks like this:

```
WholeStageCodegen
:  +- Project [id#0L]
:     +- BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, None
:        :- Project [id#0L]
:        :  +- BroadcastHashJoin [id#0L], [id#1L], Inner, BuildRight, None
:        :     :- Range 0, 1, 4, 1024, [id#0L]
:        :     +- INPUT
:        +- INPUT
:- BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L))
:  +- WholeStageCodegen
:     :  +- Range 0, 1, 4, 1024, [id#1L]
+- ReusedExchange [id#2L], BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L))
```

![bjoin](https://cloud.githubusercontent.com/assets/40902/13414787/209e8c5c-df0a-11e5-8a0f-edff69d89e83.png)

For a three-way SortMergeJoin, produced for example by the hedged sketch below (broadcast joins are disabled so the planner picks sort-merge; sizes are illustrative):
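```scala
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
val big = sqlContext.range(0, 33554432L)
big.join(big, "id").join(big, "id").explain()
```

the plan is: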
```
== Physical Plan ==
WholeStageCodegen
:  +- Project [id#0L]
:     +- SortMergeJoin [id#0L], [id#4L], None
:        :- INPUT
:        +- INPUT
:- WholeStageCodegen
:  :  +- Project [id#0L]
:  :     +- SortMergeJoin [id#0L], [id#3L], None
:  :        :- INPUT
:  :        +- INPUT
:  :- WholeStageCodegen
:  :  :  +- Sort [id#0L ASC], false, 0
:  :  :     +- INPUT
:  :  +- Exchange hashpartitioning(id#0L, 200), None
:  :     +- WholeStageCodegen
:  :        :  +- Range 0, 1, 4, 33554432, [id#0L]
:  +- WholeStageCodegen
:     :  +- Sort [id#3L ASC], false, 0
:     :     +- INPUT
:     +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200), None
+- WholeStageCodegen
   :  +- Sort [id#4L ASC], false, 0
   :     +- INPUT
   +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200), None
```
![sjoin](https://cloud.githubusercontent.com/assets/40902/13414790/27aea61c-df0a-11e5-8cbf-fbc985c31d95.png)

If the same ShuffleExchange or BroadcastExchange has execute()/executeBroadcast() called by different parents, it should cache the RDD/Broadcast and return the same one for all of the parents.
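
A minimal, self-contained sketch of that memoization contract (simplified; the real code caches a `ShuffledRowRDD` or a `Broadcast`, as in the `ShuffleExchange` diff below):

```scala
// The first parent that asks for the result materializes it; every later
// parent gets the cached instance back.
final class MemoizedResult[T](compute: () => T) {
  private var cached: Option[T] = None
  def get(): T = synchronized {
    if (cached.isEmpty) cached = Some(compute())
    cached.get
  }
}

val shuffled = new MemoizedResult(() => (0 until 4).map(_ * 2))
assert(shuffled.get() eq shuffled.get()) // same instance for all parents
```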

## How was this patch tested?

Added some unit tests for this. Also did some manual tests on TPC-DS queries Q59 and Q64; we can see that some exchanges are reused (this requires a change in PhysicalRDD for sameResult, done in apache#11514).

Author: Davies Liu <davies@databricks.com>

Closes apache#11403 from davies/dedup.
Davies Liu authored and roygao94 committed Mar 22, 2016
1 parent bfa84d7 commit 6678064
Showing 16 changed files with 403 additions and 90 deletions.
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types.{DataType, StructType}

abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
self: PlanType =>

def output: Seq[Attribute]
@@ -237,4 +237,65 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanTy
}

override def innerChildren: Seq[PlanType] = subqueries

/**
* Canonicalized copy of this query plan.
*/
protected lazy val canonicalized: PlanType = this

/**
* Returns true when the given query plan will return the same results as this query plan.
*
* Since it's likely undecidable to generally determine if two given plans will produce the same
* results, it is okay for this function to return false, even if the results are actually
* the same. Such behavior will not affect correctness, only the application of performance
* enhancements like caching. However, it is not acceptable to return true if the results could
* possibly be different.
*
* By default this function performs a modified version of equality that is tolerant of cosmetic
* differences like attribute naming and/or expression id differences. Operators that
* can do better should override this function.
*/
def sameResult(plan: PlanType): Boolean = {
val canonicalizedLeft = this.canonicalized
val canonicalizedRight = plan.canonicalized
canonicalizedLeft.getClass == canonicalizedRight.getClass &&
canonicalizedLeft.children.size == canonicalizedRight.children.size &&
canonicalizedLeft.cleanArgs == canonicalizedRight.cleanArgs &&
(canonicalizedLeft.children, canonicalizedRight.children).zipped.forall(_ sameResult _)
}

/**
* All the attributes that are used for this plan.
*/
lazy val allAttributes: Seq[Attribute] = children.flatMap(_.output)

private def cleanExpression(e: Expression): Expression = e match {
case a: Alias =>
// As the root of the expression, Alias will always take an arbitrary exprId, so we
// need to erase that for equality testing.
val cleanedExprId =
Alias(a.child, a.name)(ExprId(-1), a.qualifiers, isGenerated = a.isGenerated)
BindReferences.bindReference(cleanedExprId, allAttributes, allowFailures = true)
case other =>
BindReferences.bindReference(other, allAttributes, allowFailures = true)
}

/** Args that have been cleaned such that differences in expression id should not affect equality. */
protected lazy val cleanArgs: Seq[Any] = {
def cleanArg(arg: Any): Any = arg match {
case e: Expression => cleanExpression(e).canonicalized
case other => other
}

productIterator.map {
// Children are checked using sameResult above.
case tn: TreeNode[_] if containsChild(tn) => null
case e: Expression => cleanArg(e)
case s: Option[_] => s.map(cleanArg)
case s: Seq[_] => s.map(cleanArg)
case m: Map[_, _] => m.mapValues(cleanArg)
case other => other
}.toSeq
}
}
@@ -114,60 +114,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
*/
def childrenResolved: Boolean = children.forall(_.resolved)

/**
* Returns true when the given logical plan will return the same results as this logical plan.
*
* Since its likely undecidable to generally determine if two given plans will produce the same
* results, it is okay for this function to return false, even if the results are actually
* the same. Such behavior will not affect correctness, only the application of performance
* enhancements like caching. However, it is not acceptable to return true if the results could
* possibly be different.
*
* By default this function performs a modified version of equality that is tolerant of cosmetic
* differences like attribute naming and or expression id differences. Logical operators that
* can do better should override this function.
*/
def sameResult(plan: LogicalPlan): Boolean = {
val cleanLeft = EliminateSubqueryAliases(this)
val cleanRight = EliminateSubqueryAliases(plan)

cleanLeft.getClass == cleanRight.getClass &&
cleanLeft.children.size == cleanRight.children.size && {
logDebug(
s"[${cleanRight.cleanArgs.mkString(", ")}] == [${cleanLeft.cleanArgs.mkString(", ")}]")
cleanRight.cleanArgs == cleanLeft.cleanArgs
} &&
(cleanLeft.children, cleanRight.children).zipped.forall(_ sameResult _)
}

/** Args that have cleaned such that differences in expression id should not affect equality */
protected lazy val cleanArgs: Seq[Any] = {
val input = children.flatMap(_.output)
def cleanExpression(e: Expression) = e match {
case a: Alias =>
// As the root of the expression, Alias will always take an arbitrary exprId, we need
// to erase that for equality testing.
val cleanedExprId =
Alias(a.child, a.name)(ExprId(-1), a.qualifiers, isGenerated = a.isGenerated)
BindReferences.bindReference(cleanedExprId, input, allowFailures = true)
case other => BindReferences.bindReference(other, input, allowFailures = true)
}

productIterator.map {
// Children are checked using sameResult above.
case tn: TreeNode[_] if containsChild(tn) => null
case e: Expression => cleanExpression(e)
case s: Option[_] => s.map {
case e: Expression => cleanExpression(e)
case other => other
}
case s: Seq[_] => s.map {
case e: Expression => cleanExpression(e)
case other => other
}
case other => other
}.toSeq
}
override lazy val canonicalized: LogicalPlan = EliminateSubqueryAliases(this)

/**
* Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
@@ -25,6 +25,11 @@ import org.apache.spark.sql.catalyst.InternalRow
*/
trait BroadcastMode {
def transform(rows: Array[InternalRow]): Any

/**
* Returns true iff this [[BroadcastMode]] generates the same result as `other`.
*/
def compatibleWith(other: BroadcastMode): Boolean
}

/**
@@ -33,4 +38,8 @@ trait BroadcastMode {
case object IdentityBroadcastMode extends BroadcastMode {
// TODO: pack the UnsafeRows into single bytes array.
override def transform(rows: Array[InternalRow]): Array[InternalRow] = rows

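// IdentityBroadcastMode is a case object (a singleton), so reference
// equality is sufficient to decide compatibility.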
override def compatibleWith(other: BroadcastMode): Boolean = {
this eq other
}
}
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.exchange.ReusedExchange
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.util.Utils

@@ -31,13 +32,28 @@ class SparkPlanInfo(
val simpleString: String,
val children: Seq[SparkPlanInfo],
val metadata: Map[String, String],
val metrics: Seq[SQLMetricInfo])
val metrics: Seq[SQLMetricInfo]) {

override def hashCode(): Int = {
// hashCode of simpleString should be good enough to distinguish the plans from each other
// within a plan
simpleString.hashCode
}

override def equals(other: Any): Boolean = other match {
case o: SparkPlanInfo =>
nodeName == o.nodeName && simpleString == o.simpleString && children == o.children
case _ => false
}
}

private[sql] object SparkPlanInfo {

def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {

val children = plan.children ++ plan.subqueries
val children = plan match {
case ReusedExchange(_, child) => child :: Nil
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
Utils.getFormattedClassName(metric.param))
@@ -46,6 +46,10 @@ case class TungstenAggregate(

require(TungstenAggregate.supportsAggregate(aggregateBufferAttributes))

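// The expressions of this operator reference aggregate buffer/result
// attributes, so they must be visible when binding references for sameResult.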
override lazy val allAttributes: Seq[Attribute] =
child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++
aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)

override private[sql] lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
@@ -166,6 +166,9 @@ case class Range(
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

// output attributes should not affect the results
override lazy val cleanArgs: Seq[Any] = Seq(start, step, numSlices, numElements)

override def upstreams(): Seq[RDD[InternalRow]] = {
sqlContext.sparkContext.parallelize(0 until numSlices, numSlices)
.map(i => InternalRow(i)) :: Nil
@@ -34,12 +34,16 @@ import org.apache.spark.util.ThreadUtils
*/
case class BroadcastExchange(
mode: BroadcastMode,
child: SparkPlan) extends UnaryNode {

override def output: Seq[Attribute] = child.output
child: SparkPlan) extends Exchange {

override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)

override def sameResult(plan: SparkPlan): Boolean = plan match {
case p: BroadcastExchange =>
mode.compatibleWith(p.mode) && child.sameResult(p.child)
case _ => false
}

@transient
private val timeout: Duration = {
val timeoutValue = sqlContext.conf.broadcastTimeout
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.exchange

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.sql.types.StructType

/**
* An interface for exchanges.
*/
abstract class Exchange extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

/**
* A wrapper for reused exchange to have different output, because two exchanges which produce
* logically identical output will have distinct sets of output attribute ids, so we need to
* preserve the original ids because they're what downstream operators are expecting.
*/
case class ReusedExchange(override val output: Seq[Attribute], child: Exchange) extends LeafNode {

override def sameResult(plan: SparkPlan): Boolean = {
// Ignore this wrapper. `plan` could also be a ReusedExchange, so we reverse the order here.
plan.sameResult(child)
}

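// Delegate to the wrapped exchange; its internal caching guarantees that all
// parents observe the same materialized RDD or broadcast.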
def doExecute(): RDD[InternalRow] = {
child.execute()
}

override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
child.executeBroadcast()
}

// Do not repeat the same tree in explain.
override def treeChildren: Seq[SparkPlan] = Nil
}

/**
* Finds duplicated exchanges in the spark plan, then uses the same exchange for all of
* the references.
*/
private[sql] case class ReuseExchange(sqlContext: SQLContext) extends Rule[SparkPlan] {

def apply(plan: SparkPlan): SparkPlan = {
if (!sqlContext.conf.exchangeReuseEnabled) {
return plan
}
// Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
plan.transformUp {
case exchange: Exchange =>
// Exchanges that have the same results usually also have the same schemas (same column names).
val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
val samePlan = sameSchema.find { e =>
exchange.sameResult(e)
}
if (samePlan.isDefined) {
// Keep the output of this exchange, the following plans require that to resolve
// attributes.
ReusedExchange(exchange.output, samePlan.get)
} else {
sameSchema += exchange
exchange
}
}
}
}
@@ -38,7 +38,7 @@ import org.apache.spark.util.MutablePair
case class ShuffleExchange(
var newPartitioning: Partitioning,
child: SparkPlan,
@transient coordinator: Option[ExchangeCoordinator]) extends UnaryNode {
@transient coordinator: Option[ExchangeCoordinator]) extends Exchange {

override def nodeName: String = {
val extraInfo = coordinator match {
@@ -55,8 +55,6 @@

override def outputPartitioning: Partitioning = newPartitioning

override def output: Seq[Attribute] = child.output

private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)

override protected def doPrepare(): Unit = {
@@ -103,16 +101,25 @@ case class ShuffleExchange(
new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
}

/**
* Caches the created ShuffledRowRDD so we can reuse it.
*/
private var cachedShuffleRDD: ShuffledRowRDD = null

protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
coordinator match {
case Some(exchangeCoordinator) =>
val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
shuffleRDD
case None =>
val shuffleDependency = prepareShuffleDependency()
preparePostShuffleRDD(shuffleDependency)
// Returns the same ShuffledRowRDD if this plan is used by multiple parent plans.
if (cachedShuffleRDD == null) {
cachedShuffleRDD = coordinator match {
case Some(exchangeCoordinator) =>
val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
shuffleRDD
case None =>
val shuffleDependency = prepareShuffleDependency()
preparePostShuffleRDD(shuffleDependency)
}
}
cachedShuffleRDD
}
}
