Minor simplification and sanity checks in UnsafeSorter
JoshRosen committed May 1, 2015
1 parent 767d3ca commit 3db12de
Showing 4 changed files with 19 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -29,6 +29,7 @@ conf/*.properties
 conf/*.conf
 conf/*.xml
 conf/slaves
+core/build/py4j/
 docs/_site
 docs/api
 target/
19 changes: 15 additions & 4 deletions core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorter.java
@@ -30,12 +30,22 @@ public static final class KeyPointerAndPrefix {
      * A pointer to a record; see {@link org.apache.spark.unsafe.memory.TaskMemoryManager} for a
      * description of how these addresses are encoded.
      */
-    long recordPointer;
+    public long recordPointer;
 
     /**
      * A key prefix, for use in comparisons.
      */
-    long keyPrefix;
+    public long keyPrefix;
+
+    @Override
+    public int hashCode() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      throw new UnsupportedOperationException();
+    }
   }
 
   public static abstract class RecordComparator {
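The throwing hashCode() and equals() overrides are the "sanity checks" from the commit title: KeyPointerAndPrefix is a plain mutable holder of a pointer and its prefix, so treating it as a hashable or comparable value would almost certainly be a bug, and these overrides make that mistake fail immediately instead of misbehaving silently. A minimal sketch of the fail-fast effect; KeyPointerAndPrefixLike and SanityCheckDemo are hypothetical stand-ins, not code from the patch:

// Minimal sketch, not part of this patch: KeyPointerAndPrefixLike is a
// hypothetical stand-in for KeyPointerAndPrefix, showing what the new
// overrides guard against.
import java.util.HashSet;

class KeyPointerAndPrefixLike {
  public long recordPointer;
  public long keyPrefix;

  @Override
  public int hashCode() {
    // Fail fast: this is a mutable holder, not a value type, so hashing it
    // (e.g. to use it as a HashMap key) is almost certainly a mistake.
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean equals(Object obj) {
    throw new UnsupportedOperationException();
  }
}

public class SanityCheckDemo {
  public static void main(String[] args) {
    KeyPointerAndPrefixLike entry = new KeyPointerAndPrefixLike();
    entry.recordPointer = 42L;
    entry.keyPrefix = 7L;
    try {
      new HashSet<Object>().add(entry);   // calls hashCode() internally
    } catch (UnsupportedOperationException e) {
      System.out.println("hashCode() intentionally unsupported");
    }
  }
}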
@@ -115,8 +125,9 @@ public void insertRecord(long objectAddress) {
     final long baseOffset = memoryManager.getOffsetInPage(objectAddress);
     final long keyPrefix = prefixComputer.computePrefix(baseObject, baseOffset);
     sortBuffer[sortBufferInsertPosition] = objectAddress;
-    sortBuffer[sortBufferInsertPosition + 1] = keyPrefix;
-    sortBufferInsertPosition += 2;
+    sortBufferInsertPosition++;
+    sortBuffer[sortBufferInsertPosition] = keyPrefix;
+    sortBufferInsertPosition++;
   }
 
   public Iterator<KeyPointerAndPrefix> getSortedIterator() {
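The insertRecord() change replaces the sortBufferInsertPosition + 1 and += 2 arithmetic with two explicit one-slot advances; the buffer layout itself is unchanged, with each record occupying two adjacent long slots: the encoded record pointer, then its key prefix. A minimal self-contained sketch of that layout; the bare long[], the toy prefix function, and the SortBufferLayoutDemo name are illustrative assumptions, and the real sorter resolves addresses through TaskMemoryManager:

// Minimal sketch of the interleaved pointer/prefix layout written by
// insertRecord. Not the real UnsafeSorter: addresses are plain longs here
// and the prefix function is a toy.
public class SortBufferLayoutDemo {
  private long[] sortBuffer = new long[8];
  private int sortBufferInsertPosition = 0;

  // Stand-in for PrefixComputer.computePrefix(baseObject, baseOffset).
  private long computePrefix(long recordPointer) {
    return recordPointer & 0xFFL;
  }

  public void insertRecord(long recordPointer) {
    // Same two-slot layout as the patched code: pointer, then prefix,
    // advancing the insert position one slot at a time.
    sortBuffer[sortBufferInsertPosition] = recordPointer;
    sortBufferInsertPosition++;
    sortBuffer[sortBufferInsertPosition] = computePrefix(recordPointer);
    sortBufferInsertPosition++;
  }

  public static void main(String[] args) {
    SortBufferLayoutDemo demo = new SortBufferLayoutDemo();
    demo.insertRecord(0x1000L);
    demo.insertRecord(0x2000L);
    // Even slots hold record pointers, odd slots hold their key prefixes.
    for (int i = 0; i < demo.sortBufferInsertPosition; i += 2) {
      System.out.printf("pointer=%#x prefix=%#x%n",
          demo.sortBuffer[i], demo.sortBuffer[i + 1]);
    }
  }
}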
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -313,7 +313,8 @@ object SparkEnv extends Logging {
     // Let the user specify short names for shuffle managers
     val shortShuffleMgrNames = Map(
       "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
-      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
+      "unsafe" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")
     val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
     val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
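The SparkEnv change registers "unsafe" as a short name that shortShuffleMgrNames resolves to UnsafeShuffleManager; values not in the map are treated as fully qualified class names. A minimal sketch of selecting it from user code, assuming a build that actually ships org.apache.spark.shuffle.unsafe.UnsafeShuffleManager (it is added elsewhere in this patch series):

// Minimal sketch: picking the new shuffle manager by its short name.
import org.apache.spark.SparkConf;

public class UnsafeShuffleConfigDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("unsafe-shuffle-demo")
        .setMaster("local[2]")
        // SparkEnv lower-cases this value and looks it up in
        // shortShuffleMgrNames; "hash" and "sort" resolve the same way.
        .set("spark.shuffle.manager", "unsafe");
    System.out.println(conf.get("spark.shuffle.manager"));
  }
}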
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -34,7 +34,7 @@ import org.apache.spark.storage._
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
 import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 
-/** A group of writers for a ShuffleMapTask, one writer per reducer. */
+/** A group of writers for ShuffleMapTask, one writer per reducer. */
 private[spark] trait ShuffleWriterGroup {
   val writers: Array[BlockObjectWriter]

