
Commit 0082515
Some additional comments + small cleanup to remove an unused parameter
JoshRosen committed Jul 18, 2015
1 parent a27cfc1 commit 0082515
Showing 2 changed files with 11 additions and 4 deletions.

Exchange.scala
@@ -117,9 +117,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode

   @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf

-  private val rowDataTypes = child.output.map(_.dataType).toArray
-
   private val serializer: Serializer = {
+    val rowDataTypes = child.output.map(_.dataType).toArray
     // It is true when there is no field that needs to be write out.
     // For now, we will not use SparkSqlSerializer2 when noField is true.
     val noField = rowDataTypes == null || rowDataTypes.length == 0
@@ -181,7 +180,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode
           }
         }
       }
-    new ShuffledRowRDD(rowDataTypes, rddWithPartitionIds, serializer, part.numPartitions)
+    new ShuffledRowRDD(rddWithPartitionIds, serializer, part.numPartitions)
   }
 }

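For context on the two hunks above: `rowDataTypes` now lives inside the serializer initializer because its only remaining use is deciding whether SparkSqlSerializer2 can be used; the `new ShuffledRowRDD(...)` call site no longer needs it at all. A minimal sketch of that gating pattern, with a hypothetical chooseSerializer helper standing in for the unshown Exchange logic:

import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.types.DataType

// Sketch only: fall back to the default serializer when there is no
// field to write out (SparkSqlSerializer2 is not used in that case).
// The real Exchange code has more eligibility checks than shown here.
def chooseSerializer(
    rowDataTypes: Array[DataType],
    useSqlSerializer2: Boolean,
    default: Serializer,
    sqlSerializer2: => Serializer): Serializer = {
  val noField = rowDataTypes == null || rowDataTypes.isEmpty
  if (useSqlSerializer2 && !noField) sqlSerializer2 else default
}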

ShuffledRowRDD.scala
@@ -28,6 +28,10 @@ private class ShuffledRowRDDPartition(val idx: Int) extends Partition {
   override def hashCode(): Int = idx
 }

+/**
+ * A dummy partitioner for use with records whose partition ids have been pre-computed (i.e. for
+ * use on RDDs of (Int, Row) pairs where the Int is a partition id in the expected range).
+ */
 private class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
   override def getPartition(key: Any): Int = key.asInstanceOf[Int]
 }
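The new doc comment captures the contract: the keys already are partition ids, so the partitioner simply echoes them back. A small self-contained sketch of the same idea (the IdentityPartitioner name and the sample data are illustrative, not part of this commit):

import org.apache.spark.Partitioner

// Same shape as PartitionIdPassthrough: the Int key *is* the target
// partition, so getPartition just casts the key and returns it.
class IdentityPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

// Usage, assuming an existing SparkContext `sc`:
//   val pairs = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c")))
//   val repartitioned = pairs.partitionBy(new IdentityPartitioner(3))
// Every record lands in exactly the partition named by its key; no
// hashing is involved.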
@@ -37,9 +41,13 @@ private class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner
  * shuffling rows instead of Java key-value pairs. Note that something like this should eventually
  * be implemented in Spark core, but that is blocked by some more general refactorings to shuffle
  * interfaces / internals.
+ *
+ * @param prev the RDD being shuffled. Elements of this RDD are (partitionId, Row) pairs.
+ *             Partition ids should be in the range [0, numPartitions - 1].
+ * @param serializer the serializer used during the shuffle.
+ * @param numPartitions the number of post-shuffle partitions.
  */
 class ShuffledRowRDD(
-    rowSchema: Array[DataType],
     @transient var prev: RDD[Product2[Int, InternalRow]],
     serializer: Serializer,
     numPartitions: Int)
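Read together with the Exchange change, the new constructor contract is that callers pre-compute every row's partition id, and the RDD itself never sees the row schema. A hedged end-to-end sketch (the hash-based id assignment and the tagRows name are illustrative assumptions; Exchange's real assignment depends on the target Partitioning):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Illustrative only: tag each row with a non-negative id in
// [0, numPartitions - 1], the range the new scaladoc requires.
def tagRows(rows: RDD[InternalRow], numPartitions: Int): RDD[Product2[Int, InternalRow]] =
  rows.map { row =>
    ((row.hashCode() & Int.MaxValue) % numPartitions, row): Product2[Int, InternalRow]
  }

// The tagged RDD is then all ShuffledRowRDD needs, mirroring the
// updated call site in Exchange:
//   new ShuffledRowRDD(tagRows(childRdd, n), serializer, n)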
