
Commit

Prototype of auto conversion to broadcast hash join.
concretevitamin committed Jun 20, 2014
1 parent 0ad122f commit 7c7158b
Showing 9 changed files with 103 additions and 98 deletions.
@@ -109,3 +109,12 @@ abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {
self: Product =>
}

/**
* A trait that can be mixed into logical operators representing relations that can
* estimate their physical sizes.
* @tparam Ctx input (context) to the size estimator
*/
trait SizeEstimatableRelation[Ctx] { self: LogicalPlan =>
def estimatedSize(context: Ctx): Long
}
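
As context (a sketch only, not part of this diff): a concrete leaf relation would mix the trait in and report a byte count for the data it scans. The relation below is hypothetical and assumes Attribute, LeafNode and SQLContext are already in scope.

// Hypothetical example: a leaf relation backed by plain local files.
case class LocalFileRelation(paths: Seq[String], output: Seq[Attribute])
  extends LeafNode with SizeEstimatableRelation[SQLContext] {

  // Estimated physical size = summed length of the backing files, in bytes.
  def estimatedSize(context: SQLContext): Long =
    paths.map(p => new java.io.File(p).length()).sum
}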
13 changes: 13 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -29,9 +29,22 @@ import scala.collection.JavaConverters._
*/
trait SQLConf {

/************************** Spark SQL Params/Hints ********************/

/** Number of partitions to use for shuffle operators. */
private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt

/**
* Upper bound (in bytes) on the size of a table that qualifies for automatic conversion to
* a broadcast value during physical planning of join operations. Setting this to 0
* effectively disables the auto conversion.
* Corresponding Hive setting: hive.auto.convert.join.noconditionaltask.size.
*/
private[spark] def autoConvertJoinSize: Int =
get("spark.sql.auto.convert.join.size", "10000").toInt

/************************ SQLConf functionality methods *************/

@transient
private val settings = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, String]())
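Usage note (a sketch, not part of the diff): the planner compares a relation's estimated size against spark.sql.auto.convert.join.size, so this property controls how aggressively joins are broadcast. The snippet below assumes the SQLConf set(key, value) mutator and two hypothetical registered tables.

// Raise the threshold to 1 MB: any join side whose estimated size is at most
// 1 MB is planned as a broadcast hash join instead of a shuffled hash join.
sqlContext.set("spark.sql.auto.convert.join.size", (1024 * 1024).toString)
sqlContext.sql("SELECT * FROM large_t JOIN small_t ON large_t.key = small_t.key")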
2 changes: 2 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -223,6 +223,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext = self.sparkContext

val sqlContext = self

def numPartitions = self.numShufflePartitions

val strategies: Seq[Strategy] =
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.plans.{QueryPlan, logical}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

/**
* :: DeveloperApi ::
@@ -17,19 +17,20 @@

package org.apache.spark.sql.execution

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.{SQLContext, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{SizeEstimatableRelation, BaseRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.parquet._
import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}

private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>

val sqlContext: SQLContext

object LeftSemiJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// Find left semi joins where at least some predicates can be evaluated by matching hash
@@ -51,64 +52,70 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* evaluated by matching hash keys.
*/
object HashJoin extends Strategy with PredicateHelper {
var broadcastTables: Seq[String] =
sparkContext.conf.get("spark.sql.hints.broadcastTables", "").split(",").toBuffer

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
private[this] def broadcastHashJoin(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
left: LogicalPlan,
right: LogicalPlan,
condition: Option[Expression],
side: BuildSide) = {
val broadcastHashJoin = execution.BroadcastHashJoin(
leftKeys, rightKeys, side, planLater(left), planLater(right))(sparkContext)
condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
}

// case HashFilteredJoin(
// Inner,
// leftKeys,
// rightKeys,
// condition,
// left,
// right @ PhysicalOperation(_, _, b: MetastoreRelation))
// if tableRawSizeBelowThreshold(left) =>
// // TODO: these will be used
//// import org.apache.hadoop.fs.ContentSummary
//// import org.apache.hadoop.fs.FileSystem
//// import org.apache.hadoop.fs.Path
//
// FileSystem.get()
//
// val hashJoin =
// execution.BroadcastHashJoin(
// leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext)
// condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
def broadcastTables: Seq[String] =
sparkContext.conf.get("spark.sql.hints.broadcastTables", "").split(",").toBuffer

// TODO: how to unit test these conversions?
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case HashFilteredJoin(
Inner,
leftKeys,
rightKeys,
condition,
left,
right @ PhysicalOperation(_, _, b: BaseRelation))
if broadcastTables.contains(b.tableName)=>
Inner,
leftKeys,
rightKeys,
condition,
left,
right @ PhysicalOperation(_, _, b: BaseRelation))
if broadcastTables.contains(b.tableName) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)

val hashJoin =
execution.BroadcastHashJoin(
leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext)
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
case HashFilteredJoin(
Inner,
leftKeys,
rightKeys,
condition,
left @ PhysicalOperation(_, _, b: BaseRelation),
right)
if broadcastTables.contains(b.tableName) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)

case HashFilteredJoin(
Inner,
leftKeys,
rightKeys,
condition,
left @ PhysicalOperation(_, _, b: BaseRelation),
right)
if broadcastTables.contains(b.tableName) =>
Inner,
leftKeys,
rightKeys,
condition,
left,
right @ PhysicalOperation(_, _, b: SizeEstimatableRelation[SQLContext]))
if b.estimatedSize(sqlContext) <= sqlContext.autoConvertJoinSize =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)

val hashJoin =
execution.BroadcastHashJoin(
leftKeys, rightKeys, BuildLeft, planLater(left), planLater(right))(sparkContext)
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
case HashFilteredJoin(
Inner,
leftKeys,
rightKeys,
condition,
left @ PhysicalOperation(_, _, b: SizeEstimatableRelation[SQLContext]),
right)
if b.estimatedSize(sqlContext) <= sqlContext.autoConvertJoinSize =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)

case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
val hashJoin =
execution.ShuffledHashJoin(
leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil

case _ => Nil
}
}
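
On the unit-testing TODO above (a suggestion only, not part of the commit): one plausible approach is to register a relation whose estimated size is below the threshold, plan a join against it, and assert on the physical operator that comes out. A rough sketch with hypothetical table names:

// Assumes `ctx` is a SQLContext whose catalog contains large_t and a small_t whose
// underlying relation implements SizeEstimatableRelation and reports a size below
// spark.sql.auto.convert.join.size.
val query = ctx.sql("SELECT * FROM large_t JOIN small_t ON large_t.key = small_t.key")
val physicalPlan = query.queryExecution.executedPlan
// The size-based rule should pick BroadcastHashJoin rather than ShuffledHashJoin.
assert(physicalPlan.collect { case j: execution.BroadcastHashJoin => j }.nonEmpty)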
@@ -167,25 +174,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}

// // FIXME(zongheng): WIP
// object AutoBroadcastHashJoin extends Strategy {
// def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// case logical.Join(left, right, joinType, condition) =>
//
// execution.BroadcastHashJoin()
//
// execution.BroadcastNestedLoopJoin(
// planLater(left), planLater(right), joinType, condition)(sparkContext) :: Nil
// case _ => Nil
// }
// }

object BroadcastNestedLoopJoin extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

// FIXME: WIP -- auto broadcast hash join
case logical.Join

case logical.Join(left, right, joinType, condition) =>
execution.BroadcastNestedLoopJoin(
planLater(left), planLater(right), joinType, condition)(sparkContext) :: Nil
@@ -32,9 +32,10 @@ import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveTyp
import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
import parquet.schema.Type.Repetition

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
import org.apache.spark.sql.catalyst.plans.logical.{SizeEstimatableRelation, LogicalPlan, LeafNode}
import org.apache.spark.sql.catalyst.types._

// Implicits
@@ -52,10 +53,19 @@ import scala.collection.JavaConversions._
*
* @param path The path to the Parquet file.
*/
private[sql] case class ParquetRelation(val path: String)
extends LeafNode with MultiInstanceRelation {
private[sql] case class ParquetRelation(path: String)
extends LeafNode
with MultiInstanceRelation
with SizeEstimatableRelation[SQLContext] {
self: Product =>

def estimatedSize(context: SQLContext): Long = {
// TODO: right config?
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(context.sparkContext.hadoopConfiguration)
fs.getContentSummary(hdfsPath).getLength // TODO: in bytes or system-dependent?
}

/** Schema derived from ParquetFile */
def parquetSchema: MessageType =
ParquetTypesConverter
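A note on the TODOs in estimatedSize above (not part of the diff): FileSystem.getContentSummary(path).getLength() returns the total length of the files under the path in bytes; replication is not counted (getSpaceConsumed() would include it). For illustration, the same lookup written outside the relation, with a hypothetical path:

// Equivalent size lookup for a Parquet directory (path is illustrative only).
import org.apache.hadoop.fs.Path
val parquetPath = new Path("hdfs:///tmp/users.parquet")
val fs = parquetPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val sizeInBytes: Long = fs.getContentSummary(parquetPath).getLength
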
@@ -257,7 +257,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
struct.zip(fields).map {
case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
}.mkString("{", ",", "}")
case (seq: Seq[_], ArrayType(typ))=>
case (seq: Seq[_], ArrayType(typ)) =>
seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
case (map: Map[_,_], MapType(kType, vType)) =>
map.map {
@@ -34,9 +34,8 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.SparkLogicalPlan
import org.apache.spark.sql.hive.execution.{HiveTableScan, InsertIntoHiveTable}
import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.hive.execution.HiveTableScan

/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -241,16 +240,25 @@ object HiveMetastoreTypes extends RegexParsers {
}
}



private[hive] case class MetastoreRelation
(databaseName: String, tableName: String, alias: Option[String])
(val table: TTable, val partitions: Seq[TPartition])
extends BaseRelation {
extends BaseRelation
with SizeEstimatableRelation[HiveContext] {
// TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and
// use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions.
// Right now, using org.apache.hadoop.hive.ql.metadata.Table and
// org.apache.hadoop.hive.ql.metadata.Partition will cause a NotSerializableException
// which indicates the SerDe we used is not Serializable.

def estimatedSize(context: HiveContext): Long = {
val path = hiveQlTable.getPath
val fs = path.getFileSystem(context.hiveconf) // TODO: or sc.hadoopConfiguration?
fs.getContentSummary(path).getLength // TODO: in bytes or system-dependent?
}

def hiveQlTable = new Table(table)

def hiveQlPartitions = partitions.map { p =>
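On the "// TODO: or sc.hadoopConfiguration?" question above (a side note, not part of the commit): HiveConf is a Hadoop Configuration subclass, so either object can resolve the FileSystem; using hiveconf keeps the lookup on the configuration Hive itself uses for table paths. For comparison, the alternative spelled out against the SparkContext configuration:

// Alternative resolution via the SparkContext's Hadoop configuration (sketch only).
val tablePath = hiveQlTable.getPath
val fs = tablePath.getFileSystem(context.sparkContext.hadoopConfiguration)
val sizeInBytes: Long = fs.getContentSummary(tablePath).getLength
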
@@ -17,9 +17,6 @@

package org.apache.spark.sql.hive

import org.apache.hadoop.fs.FileSystem

import org.apache.spark.sql
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
@@ -35,29 +32,6 @@ private[hive] trait HiveStrategies {

val hiveContext: HiveContext

// FIXME(zongheng): WIP
object HashJoin extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case HashFilteredJoin(
Inner,
leftKeys,
rightKeys,
condition,
left,
right @ PhysicalOperation(_, _, b: MetastoreRelation)) =>

val path = b.hiveQlTable.getPath
val fs = path.getFileSystem(hiveContext.hiveconf)
val size = fs.getContentSummary(path).getLength // TODO: in bytes?


val hashJoin =
sql.execution.BroadcastHashJoin(
leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext)
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
}
}

object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child) =>