Skip to content

Commit

Permalink
Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 12, 2015
1 parent e58a6b4 commit e995d1a
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ final class PackedRecordPointer {

static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27; // 128 megabytes

/**
* The maximum partition identifier that can be encoded. Note that partition ids start from 0.
*/
static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1; // 16777215

/** Bit mask for the lower 40 bits of a long. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ private class UnsafeShuffleHandle[K, V](
}

private[spark] object UnsafeShuffleManager extends Logging {

/**
* The maximum number of shuffle output partitions that UnsafeShuffleManager supports.
*/
val MAX_SHUFFLE_OUTPUT_PARTITIONS = PackedRecordPointer.MAXIMUM_PARTITION_ID + 1

/**
* Helper method for determining whether a shuffle should use the optimized unsafe shuffle
* path or whether it should fall back to the original sort-based shuffle.
Expand All @@ -50,9 +56,9 @@ private[spark] object UnsafeShuffleManager extends Logging {
} else if (dependency.keyOrdering.isDefined) {
log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
false
} else if (dependency.partitioner.numPartitions > PackedRecordPointer.MAXIMUM_PARTITION_ID) {
} else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) {
log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " +
s"${PackedRecordPointer.MAXIMUM_PARTITION_ID} partitions")
s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions")
false
} else {
log.debug(s"Can use UnsafeShuffle for shuffle $shufId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class UnsafeShuffleManagerSuite extends FunSuite with Matchers {

// We do not support shuffles with more than 16 million output partitions
assert(!canUseUnsafeShuffle(shuffleDep(
partitioner = new HashPartitioner(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1),
partitioner = new HashPartitioner(UnsafeShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS + 1),
serializer = kryo,
keyOrdering = None,
aggregator = None,
Expand Down

0 comments on commit e995d1a

Please sign in to comment.