Expand serializer API and use new function to help control when new UnsafeShuffle path is used.

Conflicts:
	core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
JoshRosen committed May 5, 2015
1 parent fec7b29 commit b9624ee
Showing 3 changed files with 31 additions and 3 deletions.
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -125,6 +125,11 @@ class KryoSerializer(conf: SparkConf)
   override def newInstance(): SerializerInstance = {
     new KryoSerializerInstance(this)
   }
+
+  override def supportsRelocationOfSerializedObjects: Boolean = {
+    // TODO: we should have a citation / explanatory comment here clarifying _why_ this is the case
+    newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
+  }
 }
 
 private[spark]
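The auto-reset check matters because Kryo, with reference tracking enabled and auto-reset disabled, may encode a record as a back-reference into an earlier record in the same stream, so that record's bytes cannot be moved independently; auto-resetting after every object keeps each record self-contained. Since Kryo 2.x exposes no public getter for this flag, a helper along the following lines (a sketch, not necessarily the exact KryoSerializerInstance.getAutoReset implementation) can read it reflectively:

  import com.esotericsoftware.kryo.Kryo

  // Sketch: read Kryo's private `autoReset` flag via reflection, since
  // Kryo 2.x provides setAutoReset() but no public accessor for the flag.
  def getAutoReset(kryo: Kryo): Boolean = {
    val field = classOf[Kryo].getDeclaredField("autoReset")
    field.setAccessible(true)
    field.get(kryo).asInstanceOf[Boolean]
  }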
26 changes: 25 additions & 1 deletion core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
 import scala.reflect.ClassTag
 
 import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{Experimental, DeveloperApi}
 import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator}
 
 /**
@@ -63,6 +63,30 @@ abstract class Serializer {
 
   /** Creates a new [[SerializerInstance]]. */
   def newInstance(): SerializerInstance
+
+  /**
+   * Returns true if this serializer supports relocation of its serialized objects and false
+   * otherwise. This should return true if and only if reordering the bytes of serialized objects
+   * in the serialization stream output results in re-ordered input that can be read with the
+   * deserializer. For instance, the following should work if the serializer supports relocation:
+   *
+   *   serOut.open()
+   *   position = 0
+   *   serOut.write(obj1)
+   *   serOut.flush()
+   *   position = # of bytes written to stream so far
+   *   obj1Bytes = [bytes 0 through position of stream]
+   *   serOut.write(obj2)
+   *   serOut.flush()
+   *   position2 = # of bytes written to stream so far
+   *   obj2Bytes = [bytes position through position2 of stream]
+   *
+   *   serIn.open([obj2Bytes] concatenate [obj1Bytes]) should return (obj2, obj1)
+   *
+   * See SPARK-7311 for more discussion.
+   */
+  @Experimental
+  def supportsRelocationOfSerializedObjects: Boolean = false
 }
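To make the contract concrete, here is a standalone sketch (not part of this commit) that exercises it against KryoSerializer; it assumes a default SparkConf, under which Kryo auto-reset is on and the new method returns true:

  import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

  import org.apache.spark.SparkConf
  import org.apache.spark.serializer.KryoSerializer

  object RelocationDemo {
    def main(args: Array[String]): Unit = {
      val ser = new KryoSerializer(new SparkConf())
      assert(ser.supportsRelocationOfSerializedObjects)

      val instance = ser.newInstance()
      val bytesOut = new ByteArrayOutputStream()
      val serOut = instance.serializeStream(bytesOut)

      serOut.writeObject("obj1")
      serOut.flush()
      val position = bytesOut.size()  // bytes written to the stream so far
      serOut.writeObject("obj2")
      serOut.flush()
      serOut.close()

      val all = bytesOut.toByteArray
      val obj1Bytes = all.slice(0, position)
      val obj2Bytes = all.slice(position, all.length)

      // Concatenate the records' byte ranges in swapped order; a relocatable
      // serializer must deserialize both records, now in the swapped order.
      val serIn = instance.deserializeStream(
        new ByteArrayInputStream(obj2Bytes ++ obj1Bytes))
      assert(serIn.readObject[String]() == "obj2")
      assert(serIn.readObject[String]() == "obj1")
      serIn.close()
    }
  }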


3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -131,8 +131,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB
   private val useSerializedPairBuffer =
     !ordering.isDefined && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
-    ser.isInstanceOf[KryoSerializer] &&
-    serInstance.asInstanceOf[KryoSerializerInstance].getAutoReset
+    ser.supportsRelocationOfSerializedObjects
 
   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
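With this change, ExternalSorter no longer special-cases KryoSerializer: any serializer can become eligible for the serialized pair buffer by overriding the new method. A minimal hypothetical example (the class name and elided body are illustrative only):

  import org.apache.spark.serializer.{Serializer, SerializerInstance}

  // Hypothetical third-party serializer whose wire format keeps every
  // record self-contained, so concatenated records may be reordered.
  class MyRelocatableSerializer extends Serializer with Serializable {
    override def newInstance(): SerializerInstance = ???  // elided for brevity

    // Opting in makes ExternalSorter eligible to use the serialized pair
    // buffer (when no ordering is defined and
    // spark.shuffle.sort.serializeMapOutputs is true).
    override def supportsRelocationOfSerializedObjects: Boolean = true
  }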
