From 79ff2c2d6d01909c31308a7fd6ee3fb4e9e9bd81 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Sat, 12 Mar 2016 00:48:36 -0800 Subject: [PATCH] [SPARK-13671] [SPARK-13311] [SQL] Use different physical plans for RDD and data sources ## What changes were proposed in this pull request? This PR split the PhysicalRDD into two classes, PhysicalRDD and PhysicalScan. PhysicalRDD is used for DataFrames that is created from existing RDD. PhysicalScan is used for DataFrame that is created from data sources. This enable use to apply different optimization on both of them. Also fix the problem for sameResult() on two DataSourceScan. Also fix the equality check to toString for `In`. It's better to use Seq there, but we can't break this public API (sad). ## How was this patch tested? Existing tests. Manually tested with TPCDS query Q59 and Q64, all those duplicated exchanges can be re-used now, also saw there are 40+% performance improvement (saving half of the scan). Author: Davies Liu Closes #11514 from davies/existing_rdd. --- python/pyspark/sql/dataframe.py | 3 +- .../spark/sql/catalyst/plans/QueryPlan.scala | 12 +- .../spark/sql/execution/ExistingRDD.scala | 107 +++++++++++------- .../sql/execution/WholeStageCodegen.scala | 1 + .../datasources/DataSourceStrategy.scala | 8 +- .../sql/execution/ui/SparkPlanGraph.scala | 6 +- .../apache/spark/sql/sources/filters.scala | 19 +++- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 6 +- .../spark/sql/sources/FilteredScanSuite.scala | 2 +- .../spark/sql/sources/PrunedScanSuite.scala | 2 +- .../apache/spark/sql/hive/parquetSuites.scala | 4 +- .../spark/sql/sources/BucketedReadSuite.scala | 4 +- 12 files changed, 110 insertions(+), 64 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 99d665fafec89..7008e8fadffc3 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -173,8 +173,7 @@ def explain(self, extended=False): >>> df.explain() == Physical Plan == - WholeStageCodegen - : +- Scan ExistingRDD[age#0,name#1] + Scan ExistingRDD[age#0,name#1] >>> df.explain(True) == Parsed Logical Plan == diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index c222571a3464b..920e989d058dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -280,12 +280,12 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT * can do better should override this function. */ def sameResult(plan: PlanType): Boolean = { - val canonicalizedLeft = this.canonicalized - val canonicalizedRight = plan.canonicalized - canonicalizedLeft.getClass == canonicalizedRight.getClass && - canonicalizedLeft.children.size == canonicalizedRight.children.size && - canonicalizedLeft.cleanArgs == canonicalizedRight.cleanArgs && - (canonicalizedLeft.children, canonicalizedRight.children).zipped.forall(_ sameResult _) + val left = this.canonicalized + val right = plan.canonicalized + left.getClass == right.getClass && + left.children.size == right.children.size && + left.cleanArgs == right.cleanArgs && + (left.children, right.children).zipped.forall(_ sameResult _) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 3662ed74d2eae..d363cb000d39a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -101,17 +101,76 @@ private[sql] case class LogicalRDD( private[sql] case class PhysicalRDD( output: Seq[Attribute], rdd: RDD[InternalRow], - override val nodeName: String, - override val metadata: Map[String, String] = Map.empty, - isUnsafeRow: Boolean = false, - override val outputPartitioning: Partitioning = UnknownPartitioning(0)) + override val nodeName: String) extends LeafNode { + + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + + protected override def doExecute(): RDD[InternalRow] = { + val numOutputRows = longMetric("numOutputRows") + rdd.mapPartitionsInternal { iter => + val proj = UnsafeProjection.create(schema) + iter.map { r => + numOutputRows += 1 + proj(r) + } + } + } + + override def simpleString: String = { + s"Scan $nodeName${output.mkString("[", ",", "]")}" + } +} + +/** Physical plan node for scanning data from a relation. */ +private[sql] case class DataSourceScan( + output: Seq[Attribute], + rdd: RDD[InternalRow], + @transient relation: BaseRelation, + override val metadata: Map[String, String] = Map.empty) extends LeafNode with CodegenSupport { + override val nodeName: String = relation.toString + + // Ignore rdd when checking results + override def sameResult(plan: SparkPlan ): Boolean = plan match { + case other: DataSourceScan => relation == other.relation && metadata == other.metadata + case _ => false + } + private[sql] override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + val outputUnsafeRows = relation match { + case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] => + !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) + case _: HadoopFsRelation => true + case _ => false + } + + override val outputPartitioning = { + val bucketSpec = relation match { + // TODO: this should be closer to bucket planning. + case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec + case _ => None + } + + def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse { + throw new AnalysisException(s"bucket column $colName not found in existing columns " + + s"(${output.map(_.name).mkString(", ")})") + } + + bucketSpec.map { spec => + val numBuckets = spec.numBuckets + val bucketColumns = spec.bucketColumnNames.map(toAttribute) + HashPartitioning(bucketColumns, numBuckets) + }.getOrElse { + UnknownPartitioning(0) + } + } + protected override def doExecute(): RDD[InternalRow] = { - val unsafeRow = if (isUnsafeRow) { + val unsafeRow = if (outputUnsafeRows) { rdd } else { rdd.mapPartitionsInternal { iter => @@ -187,7 +246,7 @@ private[sql] case class PhysicalRDD( ctx.INPUT_ROW = row ctx.currentVars = null val columns2 = exprs.map(_.gen(ctx)) - val inputRow = if (isUnsafeRow) row else null + val inputRow = if (outputUnsafeRows) row else null val scanRows = ctx.freshName("processRows") ctx.addNewFunction(scanRows, s""" @@ -221,42 +280,8 @@ private[sql] case class PhysicalRDD( } } -private[sql] object PhysicalRDD { +private[sql] object DataSourceScan { // Metadata keys val INPUT_PATHS = "InputPaths" val PUSHED_FILTERS = "PushedFilters" - - def createFromDataSource( - output: Seq[Attribute], - rdd: RDD[InternalRow], - relation: BaseRelation, - metadata: Map[String, String] = Map.empty): PhysicalRDD = { - - val outputUnsafeRows = relation match { - case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] => - !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) - case _: HadoopFsRelation => true - case _ => false - } - - val bucketSpec = relation match { - // TODO: this should be closer to bucket planning. - case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec - case _ => None - } - - def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse { - throw new AnalysisException(s"bucket column $colName not found in existing columns " + - s"(${output.map(_.name).mkString(", ")})") - } - - bucketSpec.map { spec => - val numBuckets = spec.numBuckets - val bucketColumns = spec.bucketColumnNames.map(toAttribute) - val partitioning = HashPartitioning(bucketColumns, numBuckets) - PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows, partitioning) - }.getOrElse { - PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows) - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 52c2971b73f1b..8fb4705581a38 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -41,6 +41,7 @@ trait CodegenSupport extends SparkPlan { case _: BroadcastHashJoin => "bhj" case _: SortMergeJoin => "smj" case _: PhysicalRDD => "rdd" + case _: DataSourceScan => "scan" case _ => nodeName.toLowerCase } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 2944a8f86f169..1adf3b6676555 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS} +import org.apache.spark.sql.execution.DataSourceScan.{INPUT_PATHS, PUSHED_FILTERS} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.ExecutedCommand import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVectorUtils} @@ -239,7 +239,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { } case l @ LogicalRelation(baseRelation: TableScan, _, _) => - execution.PhysicalRDD.createFromDataSource( + execution.DataSourceScan( l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _), @@ -639,7 +639,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Don't request columns that are only referenced by pushed filters. .filterNot(handledSet.contains) - val scan = execution.PhysicalRDD.createFromDataSource( + val scan = execution.DataSourceScan( projects.map(_.toAttribute), scanBuilder(requestedColumns, candidatePredicates, pushedFilters), relation.relation, metadata) @@ -649,7 +649,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val requestedColumns = (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq - val scan = execution.PhysicalRDD.createFromDataSource( + val scan = execution.DataSourceScan( requestedColumns, scanBuilder(requestedColumns, candidatePredicates, pushedFilters), relation.relation, metadata) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 94d318e702789..8a36d3224003a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -93,6 +93,10 @@ private[sql] object SparkPlanGraph { case "Subquery" if subgraph != null => // Subquery should not be included in WholeStageCodegen buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, parent, null, exchanges) + case "ReusedExchange" => + // Point to the re-used exchange + val node = exchanges(planInfo.children.head) + edges += SparkPlanGraphEdge(node.id, parent.id) case name => val metrics = planInfo.metrics.map { metric => SQLPlanMetric(metric.name, metric.accumulatorId, @@ -106,7 +110,7 @@ private[sql] object SparkPlanGraph { } else { subgraph.nodes += node } - if (name == "ShuffleExchange" || name == "BroadcastExchange") { + if (name.contains("Exchange")) { exchanges += planInfo -> node } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala index 3780cbbcc9631..9130e77ea5724 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala @@ -82,7 +82,24 @@ case class LessThanOrEqual(attribute: String, value: Any) extends Filter * * @since 1.3.0 */ -case class In(attribute: String, values: Array[Any]) extends Filter +case class In(attribute: String, values: Array[Any]) extends Filter { + override def hashCode(): Int = { + var h = attribute.hashCode + values.foreach { v => + h *= 41 + h += v.hashCode() + } + h + } + override def equals(o: Any): Boolean = o match { + case In(a, vs) => + a == attribute && vs.length == values.length && vs.zip(values).forall(x => x._1 == x._2) + case _ => false + } + override def toString: String = { + s"In($attribute, [${values.mkString(",")}]" + } +} /** * A filter that evaluates to `true` iff the attribute evaluates to null. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index dfffa582120cc..1ef517324d7cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.execution.PhysicalRDD +import org.apache.spark.sql.execution.DataSourceScan import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD @@ -210,8 +210,8 @@ class JDBCSuite extends SparkFunSuite // the plan only has PhysicalRDD to scan JDBCRelation. assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]) val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen] - assert(node.child.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD]) - assert(node.child.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation")) + assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScan]) + assert(node.child.asInstanceOf[DataSourceScan].nodeName.contains("JDBCRelation")) df } assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index 2ff79a2316bdc..19e34b45bff67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -312,7 +312,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic try { val queryExecution = sql(sqlString).queryExecution val rawPlan = queryExecution.executedPlan.collect { - case p: execution.PhysicalRDD => p + case p: execution.DataSourceScan => p } match { case Seq(p) => p case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala index db722975379a2..62f991fc5dc61 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala @@ -124,7 +124,7 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext { try { val queryExecution = sql(sqlString).queryExecution val rawPlan = queryExecution.executedPlan.collect { - case p: execution.PhysicalRDD => p + case p: execution.DataSourceScan => p } match { case Seq(p) => p case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index a0f09d6c4a36e..8fdbbd94c807e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.sql._ -import org.apache.spark.sql.execution.PhysicalRDD +import org.apache.spark.sql.execution.DataSourceScan import org.apache.spark.sql.execution.command.ExecutedCommand import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} import org.apache.spark.sql.hive.execution.HiveTableScan @@ -196,7 +196,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { }.isEmpty) assert( sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect { - case _: PhysicalRDD => true + case _: DataSourceScan => true }.nonEmpty) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 35573f62dc633..a0be55cfba94c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -22,7 +22,7 @@ import java.io.File import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.execution.PhysicalRDD +import org.apache.spark.sql.execution.DataSourceScan import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy} import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.execution.joins.SortMergeJoin @@ -93,7 +93,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet // Filter could hide the bug in bucket pruning. Thus, skipping all the filters val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan - val rdd = plan.find(_.isInstanceOf[PhysicalRDD]) + val rdd = plan.find(_.isInstanceOf[DataSourceScan]) assert(rdd.isDefined, plan) val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>