From 00825153fab88c183b16beeeb83e181846f37518 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sat, 18 Jul 2015 15:13:30 -0700
Subject: [PATCH] Some additional comments + small cleanup to remove an unused
 parameter

---
 .../org/apache/spark/sql/execution/Exchange.scala     |  5 ++---
 .../apache/spark/sql/execution/ShuffledRowRDD.scala   | 10 +++++++++-
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index d30dc027722df..2750053594f99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -117,9 +117,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
 
   @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
 
-  private val rowDataTypes = child.output.map(_.dataType).toArray
-
   private val serializer: Serializer = {
+    val rowDataTypes = child.output.map(_.dataType).toArray
     // It is true when there is no field that needs to be write out.
     // For now, we will not use SparkSqlSerializer2 when noField is true.
     val noField = rowDataTypes == null || rowDataTypes.length == 0
@@ -181,7 +180,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         }
       }
     }
-    new ShuffledRowRDD(rowDataTypes, rddWithPartitionIds, serializer, part.numPartitions)
+    new ShuffledRowRDD(rddWithPartitionIds, serializer, part.numPartitions)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 79b681baa4f3c..88f5b13c8f248 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -28,6 +28,10 @@ private class ShuffledRowRDDPartition(val idx: Int) extends Partition {
   override def hashCode(): Int = idx
 }
 
+/**
+ * A dummy partitioner for use with records whose partition ids have been pre-computed (i.e. for
+ * use on RDDs of (Int, Row) pairs where the Int is a partition id in the expected range).
+ */
 private class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
   override def getPartition(key: Any): Int = key.asInstanceOf[Int]
 }
@@ -37,9 +41,13 @@ private class PartitionIdPassthrough(override val numPartitions: Int) extends Pa
  * shuffling rows instead of Java key-value pairs. Note that something like this should eventually
 * be implemented in Spark core, but that is blocked by some more general refactorings to shuffle
 * interfaces / internals.
+ *
+ * @param prev the RDD being shuffled. Elements of this RDD are (partitionId, Row) pairs.
+ *             Partition ids should be in the range [0, numPartitions - 1].
+ * @param serializer the serializer used during the shuffle.
+ * @param numPartitions the number of post-shuffle partitions.
 */
 class ShuffledRowRDD(
-    rowSchema: Array[DataType],
    @transient var prev: RDD[Product2[Int, InternalRow]],
    serializer: Serializer,
    numPartitions: Int)
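
For readers unfamiliar with the pass-through trick documented in the new PartitionIdPassthrough scaladoc, the sketch below illustrates the same idea using the public RDD API. It is a minimal, self-contained example assuming Spark 1.x on a local master; IdPassthrough and PassthroughDemo are hypothetical names for illustration only, not part of the patch.

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// Hypothetical stand-in for the patch's private PartitionIdPassthrough:
// the key of each record *is* its target partition id, so getPartition
// simply returns it instead of hashing the key.
class IdPassthrough(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

object PassthroughDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
    // Pre-compute a partition id in [0, 3) for each record, mirroring how
    // Exchange pairs every row with a partition id before the shuffle.
    val withIds = sc.parallelize(Seq("a", "bb", "ccc", "dddd")).map(s => (s.length % 3, s))
    // partitionBy routes each record straight to the partition named by its key.
    val shuffled = withIds.partitionBy(new IdPassthrough(3))
    shuffled.glom().collect().zipWithIndex.foreach { case (part, i) =>
      println(s"partition $i: ${part.mkString(", ")}")
    }
    sc.stop()
  }
}

The point of the design is that Exchange has already decided each row's destination when it builds rddWithPartitionIds, so re-hashing in the partitioner would be redundant; the partitioner only needs to report numPartitions and echo the pre-computed id back to the shuffle machinery.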