diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/DummySerializerInstance.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/DummySerializerInstance.java
index ab174c3ca921a..1d31a46993a22 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/DummySerializerInstance.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/DummySerializerInstance.java
@@ -26,44 +26,51 @@
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-class DummySerializerInstance extends SerializerInstance {
+/**
+ * Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
+ * Our shuffle write path doesn't actually use this serializer (since we end up calling the
+ * `write()` OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To
+ * work around this, we pass a dummy no-op serializer.
+ */
+final class DummySerializerInstance extends SerializerInstance {
+
+  public static final DummySerializerInstance INSTANCE = new DummySerializerInstance();
+
+  private DummySerializerInstance() { }
+
   @Override
   public SerializationStream serializeStream(OutputStream s) {
     return new SerializationStream() {
       @Override
-      public void flush() {
-
-      }
+      public void flush() { }
 
       @Override
       public <T> SerializationStream writeObject(T t, ClassTag<T> ev1) {
-        return null;
+        throw new UnsupportedOperationException();
       }
 
       @Override
-      public void close() {
-
-      }
+      public void close() { }
     };
   }
 
   @Override
   public <T> ByteBuffer serialize(T t, ClassTag<T> ev1) {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public DeserializationStream deserializeStream(InputStream s) {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public <T> T deserialize(ByteBuffer bytes, ClassLoader loader, ClassTag<T> ev1) {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public <T> T deserialize(ByteBuffer bytes, ClassTag<T> ev1) {
-    return null;
+    throw new UnsupportedOperationException();
   }
 }
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
index 64ef0f2c07820..10efa670dda1e 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
@@ -155,7 +155,7 @@ private SpillInfo writeSpillFile() throws IOException {
     // Our write path doesn't actually use this serializer (since we end up calling the `write()`
     // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
     // around this, we pass a dummy no-op serializer.
-    final SerializerInstance ser = new DummySerializerInstance();
+    final SerializerInstance ser = DummySerializerInstance.INSTANCE;
     // TODO: audit the metrics-related code and ensure proper metrics integration:
     // It's not clear how we should handle shuffle write metrics for spill files; currently, Spark
     // doesn't report IO time spent writing spill files (see SPARK-7413).
@@ -238,13 +238,12 @@ private long getMemoryUsage() {
 
   private long freeMemory() {
     long memoryFreed = 0;
-    final Iterator<MemoryBlock> iter = allocatedPages.iterator();
-    while (iter.hasNext()) {
-      memoryManager.freePage(iter.next());
-      shuffleMemoryManager.release(PAGE_SIZE);
-      memoryFreed += PAGE_SIZE;
-      iter.remove();
+    for (MemoryBlock block : allocatedPages) {
+      memoryManager.freePage(block);
+      shuffleMemoryManager.release(block.size());
+      memoryFreed += block.size();
     }
+    allocatedPages.clear();
     currentPage = null;
     currentPagePosition = -1;
     return memoryFreed;
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java
index d7afa1a906428..862845180584e 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java
@@ -61,7 +61,6 @@ public void copyRange(long[] src, int srcPos, long[] dst, int dstPos, int length
 
   @Override
   public long[] allocate(int length) {
-    assert (length < Integer.MAX_VALUE) : "Length " + length + " is too large";
     return new long[length];
   }
 
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java
index eb46776efe12c..d9ffe9a44fec7 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java
@@ -24,7 +24,13 @@ public final class UnsafeShuffleSorter {
 
   private final Sorter<PackedRecordPointer, long[]> sorter;
-  private final Comparator<PackedRecordPointer> sortComparator;
+  private static final class SortComparator implements Comparator<PackedRecordPointer> {
+    @Override
+    public int compare(PackedRecordPointer left, PackedRecordPointer right) {
+      return left.getPartitionId() - right.getPartitionId();
+    }
+  }
+  private static final SortComparator SORT_COMPARATOR = new SortComparator();
 
   private long[] sortBuffer;
 
@@ -36,14 +42,7 @@ public final class UnsafeShuffleSorter {
   public UnsafeShuffleSorter(int initialSize) {
     assert (initialSize > 0);
     this.sortBuffer = new long[initialSize];
-    this.sorter =
-      new Sorter<PackedRecordPointer, long[]>(UnsafeShuffleSortDataFormat.INSTANCE);
-    this.sortComparator = new Comparator<PackedRecordPointer>() {
-      @Override
-      public int compare(PackedRecordPointer left, PackedRecordPointer right) {
-        return left.getPartitionId() - right.getPartitionId();
-      }
-    };
+    this.sorter = new Sorter<PackedRecordPointer, long[]>(UnsafeShuffleSortDataFormat.INSTANCE);
   }
 
   public void expandSortBuffer() {
@@ -81,11 +80,10 @@ public static abstract class UnsafeShuffleSorterIterator {
   }
 
   /**
-   * Return an iterator over record pointers in sorted order. For efficiency, all calls to
-   * {@code next()} will return the same mutable object.
+   * Return an iterator over record pointers in sorted order.
    */
   public UnsafeShuffleSorterIterator getSortedIterator() {
-    sorter.sort(sortBuffer, 0, sortBufferInsertPosition, sortComparator);
+    sorter.sort(sortBuffer, 0, sortBufferInsertPosition, SORT_COMPARATOR);
     return new UnsafeShuffleSorterIterator() {
 
       private int position = 0;
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 0254025122237..8bc4e205bc3c6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -218,13 +218,7 @@ private[spark] class DiskBlockObjectWriter(
     recordWritten()
   }
 
-  override def write(b: Int): Unit = {
-    if (!initialized) {
-      open()
-    }
-
-    bs.write(b)
-  }
+  override def write(b: Int): Unit = throw new UnsupportedOperationException()
 
   override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
     if (!initialized) {
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 9008cc2de9bd5..55c447327ef35 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -118,6 +118,7 @@ public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Th
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void basicShuffleWriting() throws Exception {
     final ShuffleDependency<Object, Object, Object> dep = mock(ShuffleDependency.class);
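
Note on the SortComparator change above: a subtraction-based compare() such as `left.getPartitionId() - right.getPartitionId()` is only correct when both operands are non-negative and small enough that their difference cannot overflow an int, which holds for partition IDs; for arbitrary ints, Integer.compare would be needed. Below is a minimal, self-contained sketch of the two patterns this diff introduces (an eagerly-initialized singleton behind a private constructor, as with DummySerializerInstance.INSTANCE, plus a static partition-ID comparator). PartitionPointer and ComparatorDemo are hypothetical stand-ins for illustration, not Spark classes:

import java.util.Arrays;
import java.util.Comparator;

final class PartitionPointer {
  final int partitionId;
  PartitionPointer(int partitionId) { this.partitionId = partitionId; }
}

final class PartitionComparator implements Comparator<PartitionPointer> {
  // Eagerly-initialized singleton: the private constructor guarantees that
  // INSTANCE is the only instance ever created, mirroring the
  // DummySerializerInstance.INSTANCE pattern in the diff above.
  static final PartitionComparator INSTANCE = new PartitionComparator();
  private PartitionComparator() { }

  @Override
  public int compare(PartitionPointer left, PartitionPointer right) {
    // Safe only because partition IDs are non-negative ints, so the
    // subtraction cannot overflow; it also avoids branching in the sort's
    // innermost comparison.
    return left.partitionId - right.partitionId;
  }
}

public class ComparatorDemo {
  public static void main(String[] args) {
    PartitionPointer[] records = {
      new PartitionPointer(3), new PartitionPointer(0), new PartitionPointer(2)
    };
    Arrays.sort(records, PartitionComparator.INSTANCE);
    for (PartitionPointer record : records) {
      System.out.println(record.partitionId);  // prints 0, 2, 3
    }
  }
}

Reusing one stateless comparator instance per sort (rather than allocating an anonymous Comparator in each constructor call, as the removed code did) avoids a small amount of garbage and makes the comparison logic independently testable.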