Skip to content

Commit

Permalink
Fix for copying logic
Browse files (browse the repository at this point in the history)
  • Loading branch information
JoshRosen committed Jul 17, 2015
1 parent 035af21 commit dd9c66d
Showing 1 changed file with 12 additions and 6 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -162,12 +162,18 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
case RangePartitioning(_, _) | SinglePartition => identity
case _ => sys.error(s"Exchange not implemented for $newPartitioning")
}
val needsCopy = needToCopyObjectsBeforeShuffle(part, serializer)
val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = rdd.mapPartitions { iter =>
val getPartitionKey = getPartitionKeyExtractor()
val mutablePair = new MutablePair[Int, InternalRow]()
iter.map { r =>
mutablePair.update(part.getPartition(getPartitionKey(r)), if (needsCopy) r.copy() else r)
// Pairs every input row with the id of the partition it is routed to. Two different
// strategies are used, selected once by needToCopyObjectsBeforeShuffle(part, serializer).
val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
if (needToCopyObjectsBeforeShuffle(part, serializer)) {
// Copy path: every output element is a newly allocated tuple holding row.copy(), so no
// object is shared between successive elements of the iterator.
// NOTE(review): presumably required because the shuffle may hold on to several records
// at once in this configuration -- confirm against needToCopyObjectsBeforeShuffle.
rdd.mapPartitions { iter =>
// The key extractor is created once per iterator, not once per row.
val getPartitionKey = getPartitionKeyExtractor()
iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
}
} else {
// No-copy path: a single MutablePair is created per mapPartitions invocation and is
// overwritten for every row; the row itself is passed through without copying.
// The iterator emits whatever mutablePair.update(...) returns for each row.
rdd.mapPartitions { iter =>
// The key extractor is created once per iterator, not once per row.
val getPartitionKey = getPartitionKeyExtractor()
val mutablePair = new MutablePair[Int, InternalRow]()
iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
}
}
}
new ShuffledRowRDD(rowDataTypes, rddWithPartitionIds, serializer, part.numPartitions)
Expand Down

0 comments on commit dd9c66d

Please sign in to comment.