diff --git a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala index 994a8c049a331..5641903958fb5 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala @@ -22,6 +22,9 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle._ import org.apache.spark.shuffle.sort.SortShuffleManager +/** + * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the new shuffle. + */ private class UnsafeShuffleHandle[K, V]( shuffleId: Int, override val numMaps: Int, @@ -30,6 +33,10 @@ private class UnsafeShuffleHandle[K, V]( } private[spark] object UnsafeShuffleManager extends Logging { + /** + * Helper method for determining whether a shuffle should use the optimized unsafe shuffle + * path or whether it should fall back to the original sort-based shuffle. + */ def canUseUnsafeShuffle[K, V, C](dependency: ShuffleDependency[K, V, C]): Boolean = { val shufId = dependency.shuffleId val serializer = Serializer.getSerializer(dependency.serializer) @@ -43,6 +50,10 @@ private[spark] object UnsafeShuffleManager extends Logging { } else if (dependency.keyOrdering.isDefined) { log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined") false + } else if (dependency.partitioner.numPartitions > PackedRecordPointer.MAXIMUM_PARTITION_ID) { + log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " + + s"${PackedRecordPointer.MAXIMUM_PARTITION_ID} partitions") + false } else { log.debug(s"Can use UnsafeShuffle for shuffle $shufId") true @@ -50,6 +61,52 @@ private[spark] object UnsafeShuffleManager extends Logging { } } +/** + * A shuffle implementation that uses directly-managed memory to implement several performance + * optimizations for certain types of shuffles. In cases where the new performance optimizations + * cannot be applied, this shuffle manager delegates to [[SortShuffleManager]] to handle those + * shuffles. + * + * UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold: + * + * - The shuffle dependency specifies no aggregation or output ordering. + * - The shuffle serializer supports relocation of serialized values (this is currently supported + * by KryoSerializer and Spark SQL's custom serializers). + * - The shuffle produces fewer than 16777216 output partitions. + * - No individual record is larger than 128 MB when serialized. + * + * In addition, extra spill-merging optimizations are automatically applied when the shuffle + * compression codec supports concatenation of serialized streams. This is currently supported by + * Spark's LZF serializer. + * + * At a high-level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager. + * In sort-based shuffle, incoming records are sorted according to their target partition ids, then + * written to a single map output file. Reducers fetch contiguous regions of this file in order to + * read their portion of the map output. In cases where the map output data is too large to fit in + * memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged + * to produce the final output file. + * + * UnsafeShuffleManager optimizes this process in several ways: + * + * - Its sort operates on serialized binary data rather than Java objects, which reduces memory + * consumption and GC overheads. This optimization requires the record serializer to have certain + * properties to allow serialized records to be re-ordered without requiring deserialization. + * See SPARK-4550, where this optimization was first proposed and implemented, for more details. + * + * - It uses a specialized cache-efficient sorter ([[UnsafeShuffleExternalSorter]]) that sorts + * arrays of compressed record pointers and partition ids. By using only 8 bytes of space per + * record in the sorting array, this fits more of the array into cache. + * + * - The spill merging procedure operates on blocks of serialized records that belong to the same + * partition and does not need to deserialize records during the merge. + * + * - When the spill compression codec supports concatenation of compressed data, the spill merge + * simply concatenates the serialized and compressed spill partitions to produce the final output + * partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used + * and avoids the need to allocate decompression or copying buffers during the merge. + * + * For more details on UnsafeShuffleManager's design, see SPARK-7081. + */ private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManager { private[this] val sortShuffleManager: SortShuffleManager = new SortShuffleManager(conf) diff --git a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala new file mode 100644 index 0000000000000..9c91948bdc1e4 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.unsafe + +import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.{FunSuite, Matchers} + +import org.apache.spark._ +import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer} + +/** + * Tests for the fallback logic in UnsafeShuffleManager. Actual tests of shuffling data are + * performed in other suites. + */ +class UnsafeShuffleManagerSuite extends FunSuite with Matchers { + + import UnsafeShuffleManager.canUseUnsafeShuffle + + private class RuntimeExceptionAnswer extends Answer[Object] { + override def answer(invocation: InvocationOnMock): Object = { + throw new RuntimeException("Called non-stubbed method, " + invocation.getMethod.getName) + } + } + + private def shuffleDep( + partitioner: Partitioner, + serializer: Option[Serializer], + keyOrdering: Option[Ordering[Any]], + aggregator: Option[Aggregator[Any, Any, Any]], + mapSideCombine: Boolean): ShuffleDependency[Any, Any, Any] = { + val dep = mock(classOf[ShuffleDependency[Any, Any, Any]], new RuntimeExceptionAnswer()) + doReturn(0).when(dep).shuffleId + doReturn(partitioner).when(dep).partitioner + doReturn(serializer).when(dep).serializer + doReturn(keyOrdering).when(dep).keyOrdering + doReturn(aggregator).when(dep).aggregator + doReturn(mapSideCombine).when(dep).mapSideCombine + dep + } + + test("supported shuffle dependencies") { + val kryo = Some(new KryoSerializer(new SparkConf())) + + assert(canUseUnsafeShuffle(shuffleDep( + partitioner = new HashPartitioner(2), + serializer = kryo, + keyOrdering = None, + aggregator = None, + mapSideCombine = false + ))) + + val rangePartitioner = mock(classOf[RangePartitioner[Any, Any]]) + when(rangePartitioner.numPartitions).thenReturn(2) + assert(canUseUnsafeShuffle(shuffleDep( + partitioner = rangePartitioner, + serializer = kryo, + keyOrdering = None, + aggregator = None, + mapSideCombine = false + ))) + + } + + test("unsupported shuffle dependencies") { + val kryo = Some(new KryoSerializer(new SparkConf())) + val java = Some(new JavaSerializer(new SparkConf())) + + // We only support serializers that support object relocation + assert(!canUseUnsafeShuffle(shuffleDep( + partitioner = new HashPartitioner(2), + serializer = java, + keyOrdering = None, + aggregator = None, + mapSideCombine = false + ))) + + // We do not support shuffles with more than 16 million output partitions + assert(!canUseUnsafeShuffle(shuffleDep( + partitioner = new HashPartitioner(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1), + serializer = kryo, + keyOrdering = None, + aggregator = None, + mapSideCombine = false + ))) + + // We do not support shuffles that perform any kind of aggregation or sorting of keys + assert(!canUseUnsafeShuffle(shuffleDep( + partitioner = new HashPartitioner(2), + serializer = kryo, + keyOrdering = Some(mock(classOf[Ordering[Any]])), + aggregator = None, + mapSideCombine = false + ))) + assert(!canUseUnsafeShuffle(shuffleDep( + partitioner = new HashPartitioner(2), + serializer = kryo, + keyOrdering = None, + aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])), + mapSideCombine = false + ))) + // We do not support shuffles that perform any kind of aggregation or sorting of keys + assert(!canUseUnsafeShuffle(shuffleDep( + partitioner = new HashPartitioner(2), + serializer = kryo, + keyOrdering = Some(mock(classOf[Ordering[Any]])), + aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])), + mapSideCombine = true + ))) + } + +}