[SPARK-6368][SQL] Build a specialized serializer for Exchange operator.
JIRA: https://issues.apache.org/jira/browse/SPARK-6368

Author: Yin Huai <yhuai@databricks.com>

Closes apache#5497 from yhuai/serializer2 and squashes the following commits:

da562c5 [Yin Huai] Merge remote-tracking branch 'upstream/master' into serializer2
50e0c3d [Yin Huai] When no field is emitted to shuffle, use SparkSqlSerializer for now.
9f1ed92 [Yin Huai] Merge remote-tracking branch 'upstream/master' into serializer2
6d07678 [Yin Huai] Address comments.
4273b8c [Yin Huai] Enabled SparkSqlSerializer2.
09e587a [Yin Huai] Remove TODO.
791b96a [Yin Huai] Use UTF8String.
60a1487 [Yin Huai] Merge remote-tracking branch 'upstream/master' into serializer2
3e09655 [Yin Huai] Use getAs for Date column.
43b9fb4 [Yin Huai] Test.
8297732 [Yin Huai] Fix test.
c9373c8 [Yin Huai] Support DecimalType.
2379eeb [Yin Huai] ASF header.
39704ab [Yin Huai] Specialized serializer for Exchange.
yhuai authored and nemccarthy committed Jun 19, 2015
1 parent 169ad3d commit 5efd83f
Showing 4 changed files with 673 additions and 6 deletions.
4 changes: 4 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -64,6 +64,8 @@ private[spark] object SQLConf {
   // Set to false when debugging requires the ability to look at invalid query plans.
   val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"

+  val USE_SQL_SERIALIZER2 = "spark.sql.useSerializer2"
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -147,6 +149,8 @@ private[sql] class SQLConf extends Serializable {
    */
   private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean

+  private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2, "true").toBoolean
+
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
    * a broadcast value during the physical executions of join operations. Setting this to -1
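With this, the specialized serializer is on by default. It can be disabled per session through the standard conf API; a minimal usage sketch, assuming an existing SparkContext named sc:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    // Fall back to the generic Kryo-based SparkSqlSerializer for all Exchange shuffles.
    sqlContext.setConf("spark.sql.useSerializer2", "false")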
59 changes: 53 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.execution

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner, SparkConf}
+import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner}
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
+import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.MutablePair

 object Exchange {
@@ -77,9 +79,48 @@ case class Exchange(
     }
   }

-  override def execute(): RDD[Row] = attachTree(this , "execute") {
-    lazy val sparkConf = child.sqlContext.sparkContext.getConf
+  @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
+
+  def serializer(
+      keySchema: Array[DataType],
+      valueSchema: Array[DataType],
+      numPartitions: Int): Serializer = {
+    // In ExternalSorter's spillToMergeableFile function, key-value pairs are written out
+    // through write(key) and then write(value) instead of write((key, value)). Because
+    // SparkSqlSerializer2 assumes that objects passed in are Product2, we cannot safely use
+    // it when spillToMergeableFile in ExternalSorter will be used.
+    // So, we will not use SparkSqlSerializer2 when
+    //  - Sort-based shuffle is enabled and the number of reducers (numPartitions) is greater
+    //    than the bypassMergeThreshold; or
+    //  - newOrdering is defined.
+    val cannotUseSqlSerializer2 =
+      (sortBasedShuffleOn && numPartitions > bypassMergeThreshold) || newOrdering.nonEmpty
+
+    // It is true when there is no field that needs to be written out.
+    // For now, we will not use SparkSqlSerializer2 when noField is true.
+    val noField =
+      (keySchema == null || keySchema.length == 0) &&
+      (valueSchema == null || valueSchema.length == 0)
+
+    val useSqlSerializer2 =
+      child.sqlContext.conf.useSqlSerializer2 &&   // SparkSqlSerializer2 is enabled.
+      !cannotUseSqlSerializer2 &&                  // Safe to use Serializer2.
+      SparkSqlSerializer2.support(keySchema) &&    // The schema of key is supported.
+      SparkSqlSerializer2.support(valueSchema) &&  // The schema of value is supported.
+      !noField
+
+    val serializer = if (useSqlSerializer2) {
+      logInfo("Using SparkSqlSerializer2.")
+      new SparkSqlSerializer2(keySchema, valueSchema)
+    } else {
+      logInfo("Using SparkSqlSerializer.")
+      new SparkSqlSerializer(sparkConf)
+    }
+
+    serializer
+  }

+  override def execute(): RDD[Row] = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
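Note that cannotUseSqlSerializer2 depends on sortBasedShuffleOn and bypassMergeThreshold, which are defined earlier in Exchange and fall outside this hunk. A sketch of how they are plausibly derived from the surrounding code (an assumption, not part of this diff):

    private val sortBasedShuffleOn =
      SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]

    private val bypassMergeThreshold =
      child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)

Below that threshold, sort-based shuffle writes each partition's (key, value) pairs out directly instead of spilling through ExternalSorter's separate write(key)/write(value) calls, so the Product2 assumption in SparkSqlSerializer2 remains safe.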
@@ -111,7 +152,10 @@ case class Exchange(
         } else {
           new ShuffledRDD[Row, Row, Row](rdd, part)
         }
-        shuffled.setSerializer(new SparkSqlSerializer(sparkConf))
+        val keySchema = expressions.map(_.dataType).toArray
+        val valueSchema = child.output.map(_.dataType).toArray
+        shuffled.setSerializer(serializer(keySchema, valueSchema, numPartitions))
+
         shuffled.map(_._2)

       case RangePartitioning(sortingExpressions, numPartitions) =>
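For intuition about the schemas being passed here (a hypothetical example, not taken from this diff): hash-partitioning rows of schema (a: Int, b: String) on column a would call the new serializer method with

    keySchema   = Array(IntegerType)             // expressions.map(_.dataType)
    valueSchema = Array(IntegerType, StringType) // child.output.map(_.dataType)

so SparkSqlSerializer2.support can check every column type before the specialized serializer is selected.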
@@ -134,7 +178,9 @@ case class Exchange(
         } else {
           new ShuffledRDD[Row, Null, Null](rdd, part)
         }
-        shuffled.setSerializer(new SparkSqlSerializer(sparkConf))
+        val keySchema = child.output.map(_.dataType).toArray
+        shuffled.setSerializer(serializer(keySchema, null, numPartitions))
+
         shuffled.map(_._1)

       case SinglePartition =>
@@ -152,7 +198,8 @@ case class Exchange(
         }
         val partitioner = new HashPartitioner(1)
         val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner)
-        shuffled.setSerializer(new SparkSqlSerializer(sparkConf))
+        val valueSchema = child.output.map(_.dataType).toArray
+        shuffled.setSerializer(serializer(null, valueSchema, 1))
         shuffled.map(_._2)

       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
(The diffs for the remaining changed files, including the new SparkSqlSerializer2, did not load.)
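Since the SparkSqlSerializer2 implementation itself is not visible above, here is a rough sketch of the technique named in the commit title: because Exchange knows the exact key and value schemas, a stream can write each column with a type-directed primitive write instead of serializing a generic object graph. Everything below is illustrative; the class name, structure, and set of supported types are assumptions, not the committed code:

    import java.io.DataOutputStream
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    // Hypothetical schema-specialized row writer, for illustration only.
    class RowWriter(schema: Array[DataType], out: DataOutputStream) {
      def write(row: Row): Unit = {
        var i = 0
        while (i < schema.length) {
          if (row.isNullAt(i)) {
            out.writeBoolean(true) // null marker
          } else {
            out.writeBoolean(false)
            schema(i) match {
              case IntegerType => out.writeInt(row.getInt(i))
              case LongType    => out.writeLong(row.getLong(i))
              case DoubleType  => out.writeDouble(row.getDouble(i))
              case StringType  =>
                val bytes = row.getString(i).getBytes("UTF-8")
                out.writeInt(bytes.length)
                out.write(bytes)
              case other => sys.error(s"Unsupported type: $other")
            }
          }
          i += 1
        }
      }
    }

A serialization stream built on such writers would implement writeObject by treating each incoming object as a Product2 of key row and value row and writing both in schema order, which is exactly why the Exchange changes above avoid it on code paths where keys and values are written separately.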
