[SPARK-13671] [SPARK-13311] [SQL] Use different physical plans for RDD and data sources

## What changes were proposed in this pull request?

This PR splits PhysicalRDD into two classes: PhysicalRDD, used for DataFrames created from an existing RDD, and DataSourceScan, used for DataFrames created from data sources. This enables us to apply different optimizations to each of them.
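
As a minimal sketch of how the split shows up from the DataFrame side (assuming a spark-shell style `sc`/`sqlContext`; the Parquet path is made up and not part of this patch):

```scala
// A DataFrame built from an existing RDD still plans to PhysicalRDD
// ("Scan ExistingRDD[...]"), while one read through a data source now
// plans to the new DataSourceScan node.
val fromRdd = sqlContext.createDataFrame(sc.parallelize(Seq((1, "a")))).toDF("id", "name")
fromRdd.explain()      // expected to show: Scan ExistingRDD[id#..,name#..]

val fromSource = sqlContext.read.parquet("/tmp/people.parquet")  // hypothetical path
fromSource.explain()   // expected to show a scan of the Parquet relation (DataSourceScan)
```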

Also fixes sameResult() for two DataSourceScan nodes.

Also fixes the equality check and toString for `In`. It would be better to use a Seq there, but we can't break this public API (sad).
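
A quick illustration of why the override is needed (not part of the patch): case classes derive `equals`/`hashCode` from their fields, and Scala `Array`s fall back to Java reference equality, so two logically identical `In` filters never compared equal before.

```scala
import org.apache.spark.sql.sources.In

Array[Any](1, 2) == Array[Any](1, 2)  // false: arrays compare by reference
In("a", Array[Any](1, 2)) == In("a", Array[Any](1, 2))
// false with the synthesized case-class equality; true with the equals/hashCode added here
```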

## How was this patch tested?

Existing tests. Manually tested with TPCDS queries Q59 and Q64: all of the duplicated exchanges can now be re-used, and we saw a 40+% performance improvement (saving half of the scans).
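
A hedged way to observe the reuse outside of TPCDS (the table, column, and path below are made up for illustration): build the same aggregation twice over one data source and check that the duplicated shuffle shows up as a ReusedExchange in the physical plan.

```scala
val ss = sqlContext.read.parquet("/tmp/store_sales")  // hypothetical path
val a = ss.groupBy("ss_item_sk").count()
val b = ss.groupBy("ss_item_sk").count()
// With DataSourceScan.sameResult comparing the relation (not the RDD), the two
// aggregates are recognized as identical and one exchange should appear as
// ReusedExchange in the explain() output.
a.join(b, "ss_item_sk").explain()
```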

Author: Davies Liu <davies@databricks.com>

Closes apache#11514 from davies/existing_rdd.
Davies Liu authored and jeanlyn committed Mar 17, 2016
1 parent 5915092 commit 79ff2c2
Showing 12 changed files with 110 additions and 64 deletions.
3 changes: 1 addition & 2 deletions python/pyspark/sql/dataframe.py
@@ -173,8 +173,7 @@ def explain(self, extended=False):
>>> df.explain()
== Physical Plan ==
WholeStageCodegen
: +- Scan ExistingRDD[age#0,name#1]
Scan ExistingRDD[age#0,name#1]
>>> df.explain(True)
== Parsed Logical Plan ==
@@ -280,12 +280,12 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
* can do better should override this function.
*/
def sameResult(plan: PlanType): Boolean = {
val canonicalizedLeft = this.canonicalized
val canonicalizedRight = plan.canonicalized
canonicalizedLeft.getClass == canonicalizedRight.getClass &&
canonicalizedLeft.children.size == canonicalizedRight.children.size &&
canonicalizedLeft.cleanArgs == canonicalizedRight.cleanArgs &&
(canonicalizedLeft.children, canonicalizedRight.children).zipped.forall(_ sameResult _)
val left = this.canonicalized
val right = plan.canonicalized
left.getClass == right.getClass &&
left.children.size == right.children.size &&
left.cleanArgs == right.cleanArgs &&
(left.children, right.children).zipped.forall(_ sameResult _)
}

/**
@@ -101,17 +101,76 @@ private[sql] case class LogicalRDD(
private[sql] case class PhysicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow],
override val nodeName: String,
override val metadata: Map[String, String] = Map.empty,
isUnsafeRow: Boolean = false,
override val outputPartitioning: Partitioning = UnknownPartitioning(0))
override val nodeName: String) extends LeafNode {

private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
rdd.mapPartitionsInternal { iter =>
val proj = UnsafeProjection.create(schema)
iter.map { r =>
numOutputRows += 1
proj(r)
}
}
}

override def simpleString: String = {
s"Scan $nodeName${output.mkString("[", ",", "]")}"
}
}

/** Physical plan node for scanning data from a relation. */
private[sql] case class DataSourceScan(
output: Seq[Attribute],
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
override val metadata: Map[String, String] = Map.empty)
extends LeafNode with CodegenSupport {

override val nodeName: String = relation.toString

// Ignore rdd when checking results
override def sameResult(plan: SparkPlan): Boolean = plan match {
case other: DataSourceScan => relation == other.relation && metadata == other.metadata
case _ => false
}

private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

val outputUnsafeRows = relation match {
case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
!SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
case _: HadoopFsRelation => true
case _ => false
}

override val outputPartitioning = {
val bucketSpec = relation match {
// TODO: this should be closer to bucket planning.
case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec
case _ => None
}

def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
throw new AnalysisException(s"bucket column $colName not found in existing columns " +
s"(${output.map(_.name).mkString(", ")})")
}

bucketSpec.map { spec =>
val numBuckets = spec.numBuckets
val bucketColumns = spec.bucketColumnNames.map(toAttribute)
HashPartitioning(bucketColumns, numBuckets)
}.getOrElse {
UnknownPartitioning(0)
}
}

protected override def doExecute(): RDD[InternalRow] = {
val unsafeRow = if (isUnsafeRow) {
val unsafeRow = if (outputUnsafeRows) {
rdd
} else {
rdd.mapPartitionsInternal { iter =>
@@ -187,7 +246,7 @@ private[sql] case class PhysicalRDD(
ctx.INPUT_ROW = row
ctx.currentVars = null
val columns2 = exprs.map(_.gen(ctx))
val inputRow = if (isUnsafeRow) row else null
val inputRow = if (outputUnsafeRows) row else null
val scanRows = ctx.freshName("processRows")
ctx.addNewFunction(scanRows,
s"""
@@ -221,42 +280,8 @@
}
}

private[sql] object PhysicalRDD {
private[sql] object DataSourceScan {
// Metadata keys
val INPUT_PATHS = "InputPaths"
val PUSHED_FILTERS = "PushedFilters"

def createFromDataSource(
output: Seq[Attribute],
rdd: RDD[InternalRow],
relation: BaseRelation,
metadata: Map[String, String] = Map.empty): PhysicalRDD = {

val outputUnsafeRows = relation match {
case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
!SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
case _: HadoopFsRelation => true
case _ => false
}

val bucketSpec = relation match {
// TODO: this should be closer to bucket planning.
case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec
case _ => None
}

def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
throw new AnalysisException(s"bucket column $colName not found in existing columns " +
s"(${output.map(_.name).mkString(", ")})")
}

bucketSpec.map { spec =>
val numBuckets = spec.numBuckets
val bucketColumns = spec.bucketColumnNames.map(toAttribute)
val partitioning = HashPartitioning(bucketColumns, numBuckets)
PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows, partitioning)
}.getOrElse {
PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
}
}
}
@@ -41,6 +41,7 @@ trait CodegenSupport extends SparkPlan {
case _: BroadcastHashJoin => "bhj"
case _: SortMergeJoin => "smj"
case _: PhysicalRDD => "rdd"
case _: DataSourceScan => "scan"
case _ => nodeName.toLowerCase
}

@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
import org.apache.spark.sql.execution.DataSourceScan.{INPUT_PATHS, PUSHED_FILTERS}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.ExecutedCommand
import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVectorUtils}
@@ -239,7 +239,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
}

case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
execution.PhysicalRDD.createFromDataSource(
execution.DataSourceScan(
l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil

case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
@@ -639,7 +639,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Don't request columns that are only referenced by pushed filters.
.filterNot(handledSet.contains)

val scan = execution.PhysicalRDD.createFromDataSource(
val scan = execution.DataSourceScan(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
@@ -649,7 +649,7 @@
val requestedColumns =
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq

val scan = execution.PhysicalRDD.createFromDataSource(
val scan = execution.DataSourceScan(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
@@ -93,6 +93,10 @@ private[sql] object SparkPlanGraph {
case "Subquery" if subgraph != null =>
// Subquery should not be included in WholeStageCodegen
buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, parent, null, exchanges)
case "ReusedExchange" =>
// Point to the re-used exchange
val node = exchanges(planInfo.children.head)
edges += SparkPlanGraphEdge(node.id, parent.id)
case name =>
val metrics = planInfo.metrics.map { metric =>
SQLPlanMetric(metric.name, metric.accumulatorId,
@@ -106,7 +110,7 @@
} else {
subgraph.nodes += node
}
if (name == "ShuffleExchange" || name == "BroadcastExchange") {
if (name.contains("Exchange")) {
exchanges += planInfo -> node
}

@@ -82,7 +82,24 @@ case class LessThanOrEqual(attribute: String, value: Any) extends Filter
*
* @since 1.3.0
*/
case class In(attribute: String, values: Array[Any]) extends Filter
case class In(attribute: String, values: Array[Any]) extends Filter {
override def hashCode(): Int = {
var h = attribute.hashCode
values.foreach { v =>
h *= 41
h += v.hashCode()
}
h
}
override def equals(o: Any): Boolean = o match {
case In(a, vs) =>
a == attribute && vs.length == values.length && vs.zip(values).forall(x => x._1 == x._2)
case _ => false
}
override def toString: String = {
s"In($attribute, [${values.mkString(",")}]"
}
}

/**
* A filter that evaluates to `true` iff the attribute evaluates to null.
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.PhysicalRDD
import org.apache.spark.sql.execution.DataSourceScan
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
@@ -210,8 +210,8 @@ class JDBCSuite extends SparkFunSuite
// the plan only has PhysicalRDD to scan JDBCRelation.
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD])
assert(node.child.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScan])
assert(node.child.asInstanceOf[DataSourceScan].nodeName.contains("JDBCRelation"))
df
}
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
@@ -312,7 +312,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
try {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
case p: execution.PhysicalRDD => p
case p: execution.DataSourceScan => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
@@ -124,7 +124,7 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext {
try {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
case p: execution.PhysicalRDD => p
case p: execution.DataSourceScan => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
import java.io.File

import org.apache.spark.sql._
import org.apache.spark.sql.execution.PhysicalRDD
import org.apache.spark.sql.execution.DataSourceScan
import org.apache.spark.sql.execution.command.ExecutedCommand
import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.hive.execution.HiveTableScan
@@ -196,7 +196,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}.isEmpty)
assert(
sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
case _: PhysicalRDD => true
case _: DataSourceScan => true
}.nonEmpty)
}

@@ -22,7 +22,7 @@ import java.io.File
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.PhysicalRDD
import org.apache.spark.sql.execution.DataSourceScan
import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy}
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.SortMergeJoin
@@ -93,7 +93,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet

// Filter could hide the bug in bucket pruning. Thus, skipping all the filters
val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
val rdd = plan.find(_.isInstanceOf[PhysicalRDD])
val rdd = plan.find(_.isInstanceOf[DataSourceScan])
assert(rdd.isDefined, plan)

val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
Expand Down
