Skip to content

Commit

Permalink
Address a number of minor review comments:
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 7, 2015
1 parent 8a6fe52 commit cfe0ec4
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,44 +26,51 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;

class DummySerializerInstance extends SerializerInstance {
/**
* Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
* Our shuffle write path doesn't actually use this serializer (since we end up calling the
* `write() OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
* around this, we pass a dummy no-op serializer.
*/
final class DummySerializerInstance extends SerializerInstance {

public static final DummySerializerInstance INSTANCE = new DummySerializerInstance();

private DummySerializerInstance() { }

@Override
public SerializationStream serializeStream(OutputStream s) {
return new SerializationStream() {
@Override
public void flush() {

}
public void flush() { }

@Override
public <T> SerializationStream writeObject(T t, ClassTag<T> ev1) {
return null;
throw new UnsupportedOperationException();
}

@Override
public void close() {

}
public void close() { }
};
}

@Override
public <T> ByteBuffer serialize(T t, ClassTag<T> ev1) {
return null;
throw new UnsupportedOperationException();
}

@Override
public DeserializationStream deserializeStream(InputStream s) {
return null;
throw new UnsupportedOperationException();
}

@Override
public <T> T deserialize(ByteBuffer bytes, ClassLoader loader, ClassTag<T> ev1) {
return null;
throw new UnsupportedOperationException();
}

@Override
public <T> T deserialize(ByteBuffer bytes, ClassTag<T> ev1) {
return null;
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private SpillInfo writeSpillFile() throws IOException {
// Our write path doesn't actually use this serializer (since we end up calling the `write()`
// OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
// around this, we pass a dummy no-op serializer.
final SerializerInstance ser = new DummySerializerInstance();
final SerializerInstance ser = DummySerializerInstance.INSTANCE;
// TODO: audit the metrics-related code and ensure proper metrics integration:
// It's not clear how we should handle shuffle write metrics for spill files; currently, Spark
// doesn't report IO time spent writing spill files (see SPARK-7413). This method,
Expand Down Expand Up @@ -238,13 +238,12 @@ private long getMemoryUsage() {

private long freeMemory() {
long memoryFreed = 0;
final Iterator<MemoryBlock> iter = allocatedPages.iterator();
while (iter.hasNext()) {
memoryManager.freePage(iter.next());
shuffleMemoryManager.release(PAGE_SIZE);
memoryFreed += PAGE_SIZE;
iter.remove();
for (MemoryBlock block : allocatedPages) {
memoryManager.freePage(block);
shuffleMemoryManager.release(block.size());
memoryFreed += block.size();
}
allocatedPages.clear();
currentPage = null;
currentPagePosition = -1;
return memoryFreed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public void copyRange(long[] src, int srcPos, long[] dst, int dstPos, int length

@Override
public long[] allocate(int length) {
assert (length < Integer.MAX_VALUE) : "Length " + length + " is too large";
return new long[length];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@
public final class UnsafeShuffleSorter {

private final Sorter<PackedRecordPointer, long[]> sorter;
private final Comparator<PackedRecordPointer> sortComparator;
private static final class SortComparator implements Comparator<PackedRecordPointer> {
@Override
public int compare(PackedRecordPointer left, PackedRecordPointer right) {
return left.getPartitionId() - right.getPartitionId();
}
}
private static final SortComparator SORT_COMPARATOR = new SortComparator();

private long[] sortBuffer;

Expand All @@ -36,14 +42,7 @@ public final class UnsafeShuffleSorter {
public UnsafeShuffleSorter(int initialSize) {
assert (initialSize > 0);
this.sortBuffer = new long[initialSize];
this.sorter =
new Sorter<PackedRecordPointer, long[]>(UnsafeShuffleSortDataFormat.INSTANCE);
this.sortComparator = new Comparator<PackedRecordPointer>() {
@Override
public int compare(PackedRecordPointer left, PackedRecordPointer right) {
return left.getPartitionId() - right.getPartitionId();
}
};
this.sorter = new Sorter<PackedRecordPointer, long[]>(UnsafeShuffleSortDataFormat.INSTANCE);
}

public void expandSortBuffer() {
Expand Down Expand Up @@ -81,11 +80,10 @@ public static abstract class UnsafeShuffleSorterIterator {
}

/**
* Return an iterator over record pointers in sorted order. For efficiency, all calls to
* {@code next()} will return the same mutable object.
* Return an iterator over record pointers in sorted order.
*/
public UnsafeShuffleSorterIterator getSortedIterator() {
sorter.sort(sortBuffer, 0, sortBufferInsertPosition, sortComparator);
sorter.sort(sortBuffer, 0, sortBufferInsertPosition, SORT_COMPARATOR);
return new UnsafeShuffleSorterIterator() {

private int position = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,7 @@ private[spark] class DiskBlockObjectWriter(
recordWritten()
}

override def write(b: Int): Unit = {
if (!initialized) {
open()
}

bs.write(b)
}
override def write(b: Int): Unit = throw new UnsupportedOperationException()

override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
if (!initialized) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Th
}

@Test
@SuppressWarnings("unchecked")
public void basicShuffleWriting() throws Exception {

final ShuffleDependency<Object, Object, Object> dep = mock(ShuffleDependency.class);
Expand Down

0 comments on commit cfe0ec4

Please sign in to comment.