Skip to content

Commit

Permalink
Rename UnsafeShuffleSorter to UnsafeShuffleInMemorySorter
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 12, 2015
1 parent e995d1a commit 56781a1
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
* <p>
* Incoming records are appended to data pages. When all records have been inserted (or when the
* current thread's shuffle memory limit is reached), the in-memory records are sorted according to
* their partition ids (using a {@link UnsafeShuffleSorter}). The sorted records are then written
* to a single output file (or multiple files, if we've spilled). The format of the output files is
* the same as the format of the final output file written by
* their partition ids (using a {@link UnsafeShuffleInMemorySorter}). The sorted records are then
* written to a single output file (or multiple files, if we've spilled). The format of the output
* files is the same as the format of the final output file written by
* {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are
* written as a single serialized, compressed stream that can be read with a new decompression and
* deserialization stream.
Expand Down Expand Up @@ -86,7 +86,7 @@ final class UnsafeShuffleExternalSorter {
private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();

// All three of these variables are reset after spilling:
private UnsafeShuffleSorter sorter;
private UnsafeShuffleInMemorySorter sorter;
private MemoryBlock currentPage = null;
private long currentPagePosition = -1;
private long freeSpaceInCurrentPage = 0;
Expand Down Expand Up @@ -128,7 +128,7 @@ private void openSorter() throws IOException {
}
}

this.sorter = new UnsafeShuffleSorter(initialSize);
this.sorter = new UnsafeShuffleInMemorySorter(initialSize);
}

/**
Expand All @@ -153,7 +153,7 @@ private void writeSpillFile(boolean isSpill) throws IOException {
}

// This call performs the actual sort.
final UnsafeShuffleSorter.UnsafeShuffleSorterIterator sortedRecords =
final UnsafeShuffleInMemorySorter.UnsafeShuffleSorterIterator sortedRecords =
sorter.getSortedIterator();

// Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.spark.util.collection.Sorter;

final class UnsafeShuffleSorter {
final class UnsafeShuffleInMemorySorter {

private final Sorter<PackedRecordPointer, long[]> sorter;
private static final class SortComparator implements Comparator<PackedRecordPointer> {
Expand All @@ -39,7 +39,7 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) {
*/
private int sortBufferInsertPosition = 0;

public UnsafeShuffleSorter(int initialSize) {
public UnsafeShuffleInMemorySorter(int initialSize) {
assert (initialSize > 0);
this.sortBuffer = new long[initialSize];
this.sorter = new Sorter<PackedRecordPointer, long[]>(UnsafeShuffleSortDataFormat.INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;

public class UnsafeShuffleSorterSuite {
public class UnsafeShuffleInMemorySorterSuite {

private static String getStringFromDataPage(Object baseObject, long baseOffset, int strLength) {
final byte[] strBytes = new byte[strLength];
Expand All @@ -44,8 +44,8 @@ private static String getStringFromDataPage(Object baseObject, long baseOffset,

@Test
public void testSortingEmptyInput() {
final UnsafeShuffleSorter sorter = new UnsafeShuffleSorter(100);
final UnsafeShuffleSorter.UnsafeShuffleSorterIterator iter = sorter.getSortedIterator();
final UnsafeShuffleInMemorySorter sorter = new UnsafeShuffleInMemorySorter(100);
final UnsafeShuffleInMemorySorter.UnsafeShuffleSorterIterator iter = sorter.getSortedIterator();
assert(!iter.hasNext());
}

Expand All @@ -66,7 +66,7 @@ public void testBasicSorting() throws Exception {
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
final MemoryBlock dataPage = memoryManager.allocatePage(2048);
final Object baseObject = dataPage.getBaseObject();
final UnsafeShuffleSorter sorter = new UnsafeShuffleSorter(4);
final UnsafeShuffleInMemorySorter sorter = new UnsafeShuffleInMemorySorter(4);
final HashPartitioner hashPartitioner = new HashPartitioner(4);

// Write the records into the data page and store pointers into the sorter
Expand All @@ -87,7 +87,7 @@ public void testBasicSorting() throws Exception {
}

// Sort the records
final UnsafeShuffleSorter.UnsafeShuffleSorterIterator iter = sorter.getSortedIterator();
final UnsafeShuffleInMemorySorter.UnsafeShuffleSorterIterator iter = sorter.getSortedIterator();
int prevPartitionId = -1;
Arrays.sort(dataToSort);
for (int i = 0; i < dataToSort.length; i++) {
Expand All @@ -111,7 +111,7 @@ public void testBasicSorting() throws Exception {

@Test
public void testSortingManyNumbers() throws Exception {
UnsafeShuffleSorter sorter = new UnsafeShuffleSorter(4);
UnsafeShuffleInMemorySorter sorter = new UnsafeShuffleInMemorySorter(4);
int[] numbersToSort = new int[128000];
Random random = new Random(16);
for (int i = 0; i < numbersToSort.length; i++) {
Expand All @@ -120,7 +120,7 @@ public void testSortingManyNumbers() throws Exception {
}
Arrays.sort(numbersToSort);
int[] sorterResult = new int[numbersToSort.length];
UnsafeShuffleSorter.UnsafeShuffleSorterIterator iter = sorter.getSortedIterator();
UnsafeShuffleInMemorySorter.UnsafeShuffleSorterIterator iter = sorter.getSortedIterator();
int j = 0;
while (iter.hasNext()) {
iter.loadNext();
Expand Down

0 comments on commit 56781a1

Please sign in to comment.