Skip to content

Commit

Permalink
Fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
marmbrus committed Aug 21, 2014
1 parent e0f9462 commit 2ff8114
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
case HashPartitioning(expressions, numPartitions) =>
// TODO: Eliminate redundant expressions in grouping key and value.
val rdd = child.execute().mapPartitions { iter =>
@transient val hashExpressions =
newMutableProjection(expressions, child.output)()

if (sortBasedShuffleOn) {
@transient val hashExpressions =
newProjection(expressions, child.output)

iter.map(r => (hashExpressions(r), r.copy()))
} else {
@transient val hashExpressions =
newMutableProjection(expressions, child.output)()

val mutablePair = new MutablePair[Row, Row]()
iter.map(r => mutablePair.update(hashExpressions(r), r))
}
Expand Down

0 comments on commit 2ff8114

Please sign in to comment.