From 240864c9f860d41c9d2ad51ff78c9160e6e90992 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 2 May 2015 00:20:40 -0700 Subject: [PATCH] Remove PrefixComputer and require prefix to be specified as part of insert() --- .../shuffle/unsafe/UnsafeShuffleWriter.java | 26 ++++--------------- .../spark/unsafe/sort/UnsafeSorter.java | 13 +--------- .../spark/unsafe/sort/UnsafeSorterSuite.java | 17 ++++-------- 3 files changed, 11 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java index 3a5064f03cced..4d65016577872 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java @@ -134,7 +134,6 @@ private Iterator sortRecords( final UnsafeSorter sorter = new UnsafeSorter( memoryManager, RECORD_COMPARATOR, - PREFIX_COMPUTER, PREFIX_COMPARATOR, 4096 // Initial size (TODO: tune this!) ); @@ -156,17 +155,12 @@ private Iterator sortRecords( final int serializedRecordSize = serByteBuffer.position(); assert (serializedRecordSize > 0); - // TODO: we should run the partition extraction function _now_, at insert time, rather than - // requiring it to be stored alongisde the data, since this may lead to double storage - // Need 8 bytes to store the prefix (for later retrieval in the prefix computer), plus - // 4 to store the record length. - ensureSpaceInDataPage(serializedRecordSize + 8 + 4); + // Need 4 bytes to store the record length. 
+ ensureSpaceInDataPage(serializedRecordSize + 4); final long recordAddress = memoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition); final Object baseObject = currentPage.getBaseObject(); - PlatformDependent.UNSAFE.putLong(baseObject, currentPagePosition, partitionId); - currentPagePosition += 8; PlatformDependent.UNSAFE.putInt(baseObject, currentPagePosition, serializedRecordSize); currentPagePosition += 4; PlatformDependent.copyMemory( @@ -177,7 +171,7 @@ private Iterator sortRecords( serializedRecordSize); currentPagePosition += serializedRecordSize; - sorter.insertRecord(recordAddress); + sorter.insertRecord(recordAddress, partitionId); } return sorter.getSortedIterator(); @@ -211,10 +205,10 @@ private long[] writeSortedRecordsToFile( final Object baseObject = memoryManager.getPage(recordPointer.recordPointer); final long baseOffset = memoryManager.getOffsetInPage(recordPointer.recordPointer); - final int recordLength = (int) PlatformDependent.UNSAFE.getLong(baseObject, baseOffset + 8); + final int recordLength = PlatformDependent.UNSAFE.getInt(baseObject, baseOffset); PlatformDependent.copyMemory( baseObject, - baseOffset + 8 + 4, + baseOffset + 4, arr, PlatformDependent.BYTE_ARRAY_OFFSET, recordLength); @@ -262,16 +256,6 @@ public int compare( } }; - private static final PrefixComputer PREFIX_COMPUTER = new PrefixComputer() { - @Override - public long computePrefix(Object baseObject, long baseOffset) { - // TODO: should the prefix be computed when inserting the record pointer rather than being - // read from the record itself? May be more efficient in terms of space, etc, and is a simple - // change.
- return PlatformDependent.UNSAFE.getLong(baseObject, baseOffset); - } - }; - private static final PrefixComparator PREFIX_COMPARATOR = new PrefixComparator() { @Override public int compare(long prefix1, long prefix2) { diff --git a/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorter.java b/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorter.java index 7795ee6a5f0e2..adbbc0b1f3cb8 100644 --- a/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorter.java +++ b/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorter.java @@ -74,13 +74,6 @@ public abstract int compare( long rightBaseOffset); } - /** - * Given a pointer to a record, computes a prefix. - */ - public static abstract class PrefixComputer { - public abstract long computePrefix(Object baseObject, long baseOffset); - } - /** * Compares 8-byte key prefixes in prefix sort. Subclasses may implement type-specific * comparisons, such as lexicographic comparison for strings. @@ -90,7 +83,6 @@ public static abstract class PrefixComparator { } private final TaskMemoryManager memoryManager; - private final PrefixComputer prefixComputer; private final Sorter sorter; private final Comparator sortComparator; @@ -116,13 +108,11 @@ private void expandSortBuffer(int newSize) { public UnsafeSorter( final TaskMemoryManager memoryManager, final RecordComparator recordComparator, - PrefixComputer prefixComputer, final PrefixComparator prefixComparator, int initialSize) { assert (initialSize > 0); this.sortBuffer = new long[initialSize * 2]; this.memoryManager = memoryManager; - this.prefixComputer = prefixComputer; this.sorter = new Sorter(UnsafeSortDataFormat.INSTANCE); this.sortComparator = new Comparator() { @@ -149,13 +139,12 @@ public int compare(RecordPointerAndKeyPrefix left, RecordPointerAndKeyPrefix rig * * @param objectAddress pointer to a record in a data page, encoded by {@link TaskMemoryManager}. 
*/ - public void insertRecord(long objectAddress) { + public void insertRecord(long objectAddress, long keyPrefix) { if (sortBufferInsertPosition + 2 == sortBuffer.length) { expandSortBuffer(sortBuffer.length * 2); } final Object baseObject = memoryManager.getPage(objectAddress); final long baseOffset = memoryManager.getOffsetInPage(objectAddress); - final long keyPrefix = prefixComputer.computePrefix(baseObject, baseOffset); sortBuffer[sortBufferInsertPosition] = objectAddress; sortBufferInsertPosition++; sortBuffer[sortBufferInsertPosition] = keyPrefix; diff --git a/core/src/test/java/org/apache/spark/unsafe/sort/UnsafeSorterSuite.java b/core/src/test/java/org/apache/spark/unsafe/sort/UnsafeSorterSuite.java index 2f88df1210bbc..aed115f83a368 100644 --- a/core/src/test/java/org/apache/spark/unsafe/sort/UnsafeSorterSuite.java +++ b/core/src/test/java/org/apache/spark/unsafe/sort/UnsafeSorterSuite.java @@ -49,7 +49,6 @@ public void testSortingEmptyInput() { final UnsafeSorter sorter = new UnsafeSorter( new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP)), mock(UnsafeSorter.RecordComparator.class), - mock(UnsafeSorter.PrefixComputer.class), mock(UnsafeSorter.PrefixComparator.class), 100); final Iterator iter = sorter.getSortedIterator(); @@ -104,14 +103,6 @@ public int compare( }; // Compute key prefixes based on the records' partition ids final HashPartitioner hashPartitioner = new HashPartitioner(4); - final UnsafeSorter.PrefixComputer prefixComputer = new UnsafeSorter.PrefixComputer() { - @Override - public long computePrefix(Object baseObject, long baseOffset) { - final String str = getStringFromDataPage(baseObject, baseOffset); - final int partitionId = hashPartitioner.getPartition(str); - return (long) partitionId; - } - }; // Use integer comparison for comparing prefixes (which are partition ids, in this case) final UnsafeSorter.PrefixComparator prefixComparator = new UnsafeSorter.PrefixComparator() { @Override @@ -119,15 +110,17 @@ public 
int compare(long prefix1, long prefix2) { return (int) prefix1 - (int) prefix2; } }; - final UnsafeSorter sorter = new UnsafeSorter(memoryManager, recordComparator, prefixComputer, - prefixComparator, dataToSort.length); + final UnsafeSorter sorter = new UnsafeSorter(memoryManager, recordComparator, prefixComparator, + dataToSort.length); // Given a page of records, insert those records into the sorter one-by-one: position = dataPage.getBaseOffset(); for (int i = 0; i < dataToSort.length; i++) { // position now points to the start of a record (which holds its length). final long recordLength = PlatformDependent.UNSAFE.getLong(baseObject, position); final long address = memoryManager.encodePageNumberAndOffset(dataPage, position); - sorter.insertRecord(address); + final String str = getStringFromDataPage(baseObject, position); + final int partitionId = hashPartitioner.getPartition(str); + sorter.insertRecord(address, partitionId); position += 8 + recordLength; } final Iterator iter = sorter.getSortedIterator();