From 7c7158bf21ee9500a871e1d8fd770ec77c5177bb Mon Sep 17 00:00:00 2001
From: Zongheng Yang
Date: Fri, 20 Jun 2014 15:31:53 -0700
Subject: [PATCH] Prototype of auto conversion to broadcast hash join.

---
 .../catalyst/plans/logical/LogicalPlan.scala       |   9 ++
 .../scala/org/apache/spark/sql/SQLConf.scala       |  13 ++
 .../org/apache/spark/sql/SQLContext.scala          |   2 +
 .../spark/sql/execution/SparkPlan.scala            |   1 -
 .../spark/sql/execution/SparkStrategies.scala      | 116 ++++++++----------
 .../spark/sql/parquet/ParquetRelation.scala        |  16 ++-
 .../apache/spark/sql/hive/HiveContext.scala        |   2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala      |  16 ++-
 .../spark/sql/hive/HiveStrategies.scala            |  26 ----
 9 files changed, 103 insertions(+), 98 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 0933a31c362d8..d70ef6e826cc2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -109,3 +109,12 @@ abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
 abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {
   self: Product =>
 }
+
+/**
+ * A trait that can be mixed into logical operators representing relations that can
+ * estimate their physical sizes.
+ * @tparam Ctx the input (context) passed to the size estimator
+ */
+trait SizeEstimatableRelation[Ctx] { self: LogicalPlan =>
+  def estimatedSize(context: Ctx): Long
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index b378252ba2f55..740023ab44277 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -29,9 +29,22 @@ import scala.collection.JavaConverters._
  */
 trait SQLConf {
 
+  /************************** Spark SQL Params/Hints ********************/
+
   /** Number of partitions to use for shuffle operators. */
   private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt
 
+  /**
+   * Upper bound on the size (in bytes) of tables that qualify for automatic conversion to
+   * a broadcast value during the physical execution of join operations. Setting this to 0
+   * effectively disables the automatic conversion.
+   * Corresponding Hive setting: hive.auto.convert.join.noconditionaltask.size.
+ */ + private[spark] def autoConvertJoinSize: Int = + get("spark.sql.auto.convert.join.size", "10000").toInt + + /************************ SQLConf functionality methods *************/ + @transient private val settings = java.util.Collections.synchronizedMap( new java.util.HashMap[String, String]()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 1617ec717b2e0..65db4f9290f29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -223,6 +223,8 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] class SparkPlanner extends SparkStrategies { val sparkContext = self.sparkContext + val sqlContext = self + def numPartitions = self.numShufflePartitions val strategies: Seq[Strategy] = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 07967fe75e882..b758a4d13411e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.catalyst.plans.{QueryPlan, logical} import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.columnar.InMemoryColumnarTableScan /** * :: DeveloperApi :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 32d8b08452f3c..97cf0f043d9f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,12 +17,11 @@ package org.apache.spark.sql.execution -import org.apache.hadoop.fs.FileSystem import org.apache.spark.sql.{SQLContext, execution} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{SizeEstimatableRelation, BaseRelation, LogicalPlan} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.parquet._ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} @@ -30,6 +29,8 @@ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableSca private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SQLContext#SparkPlanner => + val sqlContext: SQLContext + object LeftSemiJoin extends Strategy with PredicateHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { // Find left semi joins where at least some predicates can be evaluated by matching hash @@ -51,64 +52,70 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * evaluated by matching hash keys. 
   */
   object HashJoin extends Strategy with PredicateHelper {
-    var broadcastTables: Seq[String] =
-      sparkContext.conf.get("spark.sql.hints.broadcastTables", "").split(",").toBuffer
 
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    private[this] def broadcastHashJoin(
+        leftKeys: Seq[Expression],
+        rightKeys: Seq[Expression],
+        left: LogicalPlan,
+        right: LogicalPlan,
+        condition: Option[Expression],
+        side: BuildSide) = {
+      val broadcastHashJoin = execution.BroadcastHashJoin(
+        leftKeys, rightKeys, side, planLater(left), planLater(right))(sparkContext)
+      condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
+    }
 
-//      case HashFilteredJoin(
-//          Inner,
-//          leftKeys,
-//          rightKeys,
-//          condition,
-//          left,
-//          right @ PhysicalOperation(_, _, b: MetastoreRelation))
-//        if tableRawSizeBelowThreshold(left) =>
-//        // TODO: these will be used
-////        import org.apache.hadoop.fs.ContentSummary
-////        import org.apache.hadoop.fs.FileSystem
-////        import org.apache.hadoop.fs.Path
-//
-//        FileSystem.get()
-//
-//        val hashJoin =
-//          execution.BroadcastHashJoin(
-//            leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext)
-//        condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
+    def broadcastTables: Seq[String] =
+      sparkContext.conf.get("spark.sql.hints.broadcastTables", "").split(",").toBuffer
 
+    // TODO: how to unit test these conversions?
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case HashFilteredJoin(
-          Inner,
-          leftKeys,
-          rightKeys,
-          condition,
-          left,
-          right @ PhysicalOperation(_, _, b: BaseRelation))
-        if broadcastTables.contains(b.tableName)=>
+             Inner,
+             leftKeys,
+             rightKeys,
+             condition,
+             left,
+             right @ PhysicalOperation(_, _, b: BaseRelation))
+           if broadcastTables.contains(b.tableName) =>
+        broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
 
-        val hashJoin =
-          execution.BroadcastHashJoin(
-            leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext)
-        condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
+      case HashFilteredJoin(
+             Inner,
+             leftKeys,
+             rightKeys,
+             condition,
+             left @ PhysicalOperation(_, _, b: BaseRelation),
+             right)
+           if broadcastTables.contains(b.tableName) =>
+        broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
 
       case HashFilteredJoin(
-          Inner,
-          leftKeys,
-          rightKeys,
-          condition,
-          left @ PhysicalOperation(_, _, b: BaseRelation),
-          right)
-        if broadcastTables.contains(b.tableName) =>
+             Inner,
+             leftKeys,
+             rightKeys,
+             condition,
+             left,
+             right @ PhysicalOperation(_, _, b: SizeEstimatableRelation[SQLContext]))
+           if b.estimatedSize(sqlContext) <= sqlContext.autoConvertJoinSize =>
+        broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
 
-        val hashJoin =
-          execution.BroadcastHashJoin(
-            leftKeys, rightKeys, BuildLeft, planLater(left), planLater(right))(sparkContext)
-        condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
+      case HashFilteredJoin(
+             Inner,
+             leftKeys,
+             rightKeys,
+             condition,
+             left @ PhysicalOperation(_, _, b: SizeEstimatableRelation[SQLContext]),
+             right)
+           if b.estimatedSize(sqlContext) <= sqlContext.autoConvertJoinSize =>
+        broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
 
       case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
         val hashJoin =
          execution.ShuffledHashJoin(
            leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
        condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
+      case _ => Nil
} } @@ -167,25 +174,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } -// // FIXME(zongheng): WIP -// object AutoBroadcastHashJoin extends Strategy { -// def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { -// case logical.Join(left, right, joinType, condition) => -// -// execution.BroadcastHashJoin() -// -// execution.BroadcastNestedLoopJoin( -// planLater(left), planLater(right), joinType, condition)(sparkContext) :: Nil -// case _ => Nil -// } -// } - object BroadcastNestedLoopJoin extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - - // FIXME: WIP -- auto broadcast hash join - case logical.Join - case logical.Join(left, right, joinType, condition) => execution.BroadcastNestedLoopJoin( planLater(left), planLater(right), joinType, condition)(sparkContext) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 32813a66de3c3..f6ae1ddd1e647 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -32,9 +32,10 @@ import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveTyp import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName} import parquet.schema.Type.Repetition +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} +import org.apache.spark.sql.catalyst.plans.logical.{SizeEstimatableRelation, LogicalPlan, LeafNode} import org.apache.spark.sql.catalyst.types._ // Implicits @@ -52,10 +53,19 @@ import scala.collection.JavaConversions._ * * @param path The path to the Parquet file. */ -private[sql] case class ParquetRelation(val path: String) - extends LeafNode with MultiInstanceRelation { +private[sql] case class ParquetRelation(path: String) + extends LeafNode + with MultiInstanceRelation + with SizeEstimatableRelation[SQLContext] { self: Product => + def estimatedSize(context: SQLContext): Long = { + // TODO: right config? + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(context.sparkContext.hadoopConfiguration) + fs.getContentSummary(hdfsPath).getLength // TODO: in bytes or system-dependent? 
+ } + /** Schema derived from ParquetFile */ def parquetSchema: MessageType = ParquetTypesConverter diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index cc95b7af0abf6..bf084584d41dd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -257,7 +257,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { struct.zip(fields).map { case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}""" }.mkString("{", ",", "}") - case (seq: Seq[_], ArrayType(typ))=> + case (seq: Seq[_], ArrayType(typ)) => seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]") case (map: Map[_,_], MapType(kType, vType)) => map.map { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 68284344afd55..af513dff189a6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -34,9 +34,8 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.execution.SparkLogicalPlan -import org.apache.spark.sql.hive.execution.{HiveTableScan, InsertIntoHiveTable} -import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} +import org.apache.spark.sql.columnar.InMemoryRelation +import org.apache.spark.sql.hive.execution.HiveTableScan /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -241,16 +240,25 @@ object HiveMetastoreTypes extends RegexParsers { } } + + private[hive] case class MetastoreRelation (databaseName: String, tableName: String, alias: Option[String]) (val table: TTable, val partitions: Seq[TPartition]) - extends BaseRelation { + extends BaseRelation + with SizeEstimatableRelation[HiveContext] { // TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and // use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions. // Right now, using org.apache.hadoop.hive.ql.metadata.Table and // org.apache.hadoop.hive.ql.metadata.Partition will cause a NotSerializableException // which indicates the SerDe we used is not Serializable. + def estimatedSize(context: HiveContext): Long = { + val path = hiveQlTable.getPath + val fs = path.getFileSystem(context.hiveconf) // TODO: or sc.hadoopConfiguration? + fs.getContentSummary(path).getLength // TODO: in bytes or system-dependent? 
+ } + def hiveQlTable = new Table(table) def hiveQlPartitions = partitions.map { p => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index ada606c3a94a3..0ac0ee9071f36 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -17,9 +17,6 @@ package org.apache.spark.sql.hive -import org.apache.hadoop.fs.FileSystem - -import org.apache.spark.sql import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ @@ -35,29 +32,6 @@ private[hive] trait HiveStrategies { val hiveContext: HiveContext - // FIXME(zongheng): WIP - object HashJoin extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case HashFilteredJoin( - Inner, - leftKeys, - rightKeys, - condition, - left, - right @ PhysicalOperation(_, _, b: MetastoreRelation)) => - - val path = b.hiveQlTable.getPath - val fs = path.getFileSystem(hiveContext.hiveconf) - val size = fs.getContentSummary(path).getLength // TODO: in bytes? - - - val hashJoin = - sql.execution.BroadcastHashJoin( - leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext) - condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil - } - } - object Scripts extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.ScriptTransformation(input, script, output, child) =>
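
Notes (not part of the patch itself): the sketch below shows how a custom leaf relation could opt into the size-based conversion by mixing in the SizeEstimatableRelation[SQLContext] trait added above, mirroring what this patch does for ParquetRelation and MetastoreRelation. ExampleCsvRelation, qualifiesForBroadcast, and the package name are hypothetical and used only for illustration; the guard at the end simply restates the check performed by SparkStrategies.HashJoin.

// Hypothetical package under org.apache.spark so the private[spark] config getter is visible.
package org.apache.spark.sql.example

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, SizeEstimatableRelation}

// Hypothetical relation whose on-disk footprint is known up front; the planner only
// needs estimatedSize() to decide whether this side is small enough to broadcast.
case class ExampleCsvRelation(output: Seq[Attribute], lengthInBytes: Long)
  extends LeafNode with SizeEstimatableRelation[SQLContext] {

  // Reported in bytes, like the ContentSummary-based estimates elsewhere in this patch.
  override def estimatedSize(context: SQLContext): Long = lengthInBytes
}

object ExampleCsvRelation {
  // Restates the planner's guard: auto-convert a hash join to a broadcast hash join
  // only when the estimate is at or below spark.sql.auto.convert.join.size.
  def qualifiesForBroadcast(r: ExampleCsvRelation, ctx: SQLContext): Boolean =
    r.estimatedSize(ctx) <= ctx.autoConvertJoinSize
}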
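
A second sketch covers configuration of the two knobs this prototype reads: the explicit hint list comes from the SparkConf key spark.sql.hints.broadcastTables, while the size threshold is the SQLConf key spark.sql.auto.convert.join.size (default 10000 bytes; 0 disables the conversion). The application name and table names below are made up, and the sqlContext.set call assumes SQLConf exposes a set(key, value) alongside the get(key, default) used in this diff.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Explicit hint, read via sparkContext.conf: always broadcast these (made-up) tables.
val sparkConf = new SparkConf()
  .setAppName("broadcast-join-demo")
  .set("spark.sql.hints.broadcastTables", "dim_dates,dim_users")

val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)

// Size-based conversion threshold: relations whose estimated size is at or below this
// bound are planned as BroadcastHashJoin. Assumes a set(key, value) setter on SQLConf.
sqlContext.set("spark.sql.auto.convert.join.size", (10 * 1024 * 1024).toString)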