Misc. cleanup
JoshRosen committed May 4, 2015
1 parent f156a8f commit 3490512
Showing 5 changed files with 34 additions and 50 deletions.
@@ -17,21 +17,18 @@

package org.apache.spark.shuffle.unsafe;

-import org.apache.spark.*;
-import org.apache.spark.unsafe.sort.ExternalSorterIterator;
-import org.apache.spark.unsafe.sort.UnsafeExternalSorter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;

import scala.Option;
import scala.Product2;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;

import com.esotericsoftware.kryo.io.ByteBufferOutputStream;

+import org.apache.spark.*;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
@@ -44,10 +41,11 @@
import org.apache.spark.storage.BlockObjectWriter;
import org.apache.spark.storage.ShuffleBlockId;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;

-import static org.apache.spark.unsafe.sort.UnsafeSorter.*;
+import org.apache.spark.unsafe.sort.ExternalSorterIterator;
+import org.apache.spark.unsafe.sort.UnsafeExternalSorter;
+import static org.apache.spark.unsafe.sort.UnsafeSorter.PrefixComparator;
+import static org.apache.spark.unsafe.sort.UnsafeSorter.RecordComparator;

// IntelliJ gets confused and claims that this class should be abstract, but this actually compiles
public class UnsafeShuffleWriter<K, V> implements ShuffleWriter<K, V> {
@@ -176,8 +176,7 @@ public void remove() {

public UnsafeSorterSpillMerger.MergeableIterator getMergeableIterator() {
sorter.sort(sortBuffer, 0, sortBufferInsertPosition / 2, sortComparator);
-UnsafeSorterSpillMerger.MergeableIterator iter =
-new UnsafeSorterSpillMerger.MergeableIterator() {
+return new UnsafeSorterSpillMerger.MergeableIterator() {

private int position = 0;
private Object baseObject;
@@ -213,6 +212,5 @@ public long getBaseOffset() {
return baseOffset;
}
};
-return iter;
}
}
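A note on the `sortBufferInsertPosition / 2` record count above: this reads as if each inserted record occupies two adjacent longs in `sortBuffer` (a record pointer and its key prefix), so the long-granularity insert position is halved to get the number of records. A minimal sketch of that layout, under that assumption (the names here are illustrative, not the committed code):

```java
// Sketch of the two-longs-per-record sortBuffer layout implied by
// "sortBufferInsertPosition / 2" (an assumption, not the committed code).
final class SortBufferLayoutSketch {
  public static void main(String[] args) {
    long[] sortBuffer = new long[8];
    int sortBufferInsertPosition = 0;

    // Inserting one record advances the position by two slots:
    long recordPointer = 0xABCDL; // hypothetical encoded record address
    long keyPrefix = 42L;         // hypothetical 8-byte comparison prefix
    sortBuffer[sortBufferInsertPosition++] = recordPointer;
    sortBuffer[sortBufferInsertPosition++] = keyPrefix;

    // The position counts longs, so halving it yields the record count.
    System.out.println("records = " + (sortBufferInsertPosition / 2)); // records = 1
  }
}
```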
@@ -18,12 +18,11 @@
package org.apache.spark.unsafe.sort;

import java.util.Comparator;
-import java.util.Iterator;
import java.util.PriorityQueue;

import static org.apache.spark.unsafe.sort.UnsafeSorter.*;

-public final class UnsafeSorterSpillMerger {
+final class UnsafeSorterSpillMerger {

private final PriorityQueue<MergeableIterator> priorityQueue;

@@ -39,13 +38,6 @@ public static abstract class MergeableIterator {
public abstract long getBaseOffset();
}

-public static final class RecordAddressAndKeyPrefix {
-public Object baseObject;
-public long baseOffset;
-public int recordLength;
-public long keyPrefix;
-}

public UnsafeSorterSpillMerger(
final RecordComparator recordComparator,
final UnsafeSorter.PrefixComparator prefixComparator) {
@@ -74,37 +66,29 @@ public void addSpill(MergeableIterator spillReader) {
priorityQueue.add(spillReader);
}

-public Iterator<RecordAddressAndKeyPrefix> getSortedIterator() {
-return new Iterator<RecordAddressAndKeyPrefix>() {
+public ExternalSorterIterator getSortedIterator() {
+return new ExternalSorterIterator() {

private MergeableIterator spillReader;
-private final RecordAddressAndKeyPrefix record = new RecordAddressAndKeyPrefix();

@Override
public boolean hasNext() {
return !priorityQueue.isEmpty() || (spillReader != null && spillReader.hasNext());
}

@Override
-public RecordAddressAndKeyPrefix next() {
+public void loadNext() {
if (spillReader != null) {
if (spillReader.hasNext()) {
spillReader.loadNextRecord();
priorityQueue.add(spillReader);
}
}
spillReader = priorityQueue.remove();
-record.baseObject = spillReader.getBaseObject();
-record.baseOffset = spillReader.getBaseOffset();
-record.keyPrefix = spillReader.getPrefix();
-return record;
-}
-
-@Override
-public void remove() {
-throw new UnsupportedOperationException();
+baseObject = spillReader.getBaseObject();
+baseOffset = spillReader.getBaseOffset();
+keyPrefix = spillReader.getPrefix();
}
};
}

}
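The interesting change in this file is the switch from `Iterator<RecordAddressAndKeyPrefix>` to `ExternalSorterIterator`: instead of filling a shared wrapper object and returning it from `next()`, `loadNext()` now writes the current record's `baseObject`, `baseOffset`, and `keyPrefix` into fields the caller reads directly, so the k-way merge allocates nothing per record. A self-contained sketch of the pattern follows; all names in it (`LongSource`, `currentValue`, and so on) are mine, not Spark's. Note how re-inserting the last-returned source into the queue is deferred to the next `loadNext()` call: its current record has to stay readable until the caller is done with it, and a source's head must never change while it sits in the priority queue.

```java
import java.util.PriorityQueue;

// Sketch of the priority-queue k-way merge used above, with hypothetical
// names. Each source exposes one "current" record; the queue orders sources
// by that record; loadNext() copies the winner's record into mutable fields.
final class SpillMergeSketch {
  static final class LongSource {
    private final long[] values; // assumed already sorted
    private int pos = -1;
    LongSource(long... values) { this.values = values; }
    boolean hasNext() { return pos + 1 < values.length; }
    void loadNextRecord() { pos++; }
    long current() { return values[pos]; }
  }

  private final PriorityQueue<LongSource> queue =
      new PriorityQueue<>((a, b) -> Long.compare(a.current(), b.current()));
  private LongSource lastReturned; // source whose record the caller still holds
  long currentValue;               // mutated in place, like baseObject/keyPrefix

  void addSource(LongSource s) {
    if (s.hasNext()) {
      s.loadNextRecord(); // the queue needs a loaded record to compare on
      queue.add(s);
    }
  }

  boolean hasNext() {
    return !queue.isEmpty() || (lastReturned != null && lastReturned.hasNext());
  }

  void loadNext() {
    // Deferred re-insertion: only now is it safe to advance the source
    // returned by the previous call and let it re-compete in the queue.
    if (lastReturned != null && lastReturned.hasNext()) {
      lastReturned.loadNextRecord();
      queue.add(lastReturned);
    }
    lastReturned = queue.remove();
    currentValue = lastReturned.current();
  }

  public static void main(String[] args) {
    SpillMergeSketch merger = new SpillMergeSketch();
    merger.addSource(new LongSource(1, 4, 7));
    merger.addSource(new LongSource(2, 3, 9));
    while (merger.hasNext()) {
      merger.loadNext();
      System.out.print(merger.currentValue + " "); // prints: 1 2 3 4 7 9
    }
  }
}
```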
@@ -17,13 +17,14 @@

package org.apache.spark.unsafe.sort;

+import java.io.*;

import com.google.common.io.ByteStreams;

import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.PlatformDependent;

-import java.io.*;

final class UnsafeSorterSpillReader extends UnsafeSorterSpillMerger.MergeableIterator {

private final File file;
@@ -17,37 +17,38 @@

package org.apache.spark.unsafe.sort;

+import java.io.*;
+import java.nio.ByteBuffer;

+import scala.Tuple2;
+import scala.reflect.ClassTag;

import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.serializer.DeserializationStream;
import org.apache.spark.serializer.JavaSerializerInstance;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BlockObjectWriter;
import org.apache.spark.storage.TempLocalBlockId;
import org.apache.spark.unsafe.PlatformDependent;
-import scala.Tuple2;
-import scala.reflect.ClassTag;
-
-import java.io.*;
-import java.nio.ByteBuffer;

final class UnsafeSorterSpillWriter {

private static final int SER_BUFFER_SIZE = 1024 * 1024; // TODO: tune this
-public static final int EOF_MARKER = -1;
-byte[] arr = new byte[SER_BUFFER_SIZE];
+static final int EOF_MARKER = -1;
+
+private byte[] arr = new byte[SER_BUFFER_SIZE];

private final File file;
private final BlockId blockId;
-BlockObjectWriter writer;
-DataOutputStream dos;
+private BlockObjectWriter writer;
+private DataOutputStream dos;

public UnsafeSorterSpillWriter(
BlockManager blockManager,
int fileBufferSize,
-ShuffleWriteMetrics writeMetrics) throws IOException {
+ShuffleWriteMetrics writeMetrics) {
final Tuple2<TempLocalBlockId, File> spilledFileInfo =
blockManager.diskBlockManager().createTempLocalBlock();
this.file = spilledFileInfo._2();
@@ -119,6 +120,8 @@ public void write(
public void close() throws IOException {
dos.writeInt(EOF_MARKER);
writer.commitAndClose();
+writer = null;
+dos = null;
arr = null;
}

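Two details worth noting in `close()`. First, `EOF_MARKER` (-1) is written where a record length would otherwise go, which implies a simple length-prefixed framing for spill files: each record is `[int length][length bytes]`, and a negative length terminates the stream. Second, nulling `writer`, `dos`, and `arr` after `commitAndClose()` lets the 1 MB serialization buffer be garbage-collected even while the spill-writer object itself stays referenced. A hedged sketch of that framing; the read side here is an assumption, not the committed `UnsafeSorterSpillReader`:

```java
import java.io.*;

// Sketch of the [int length][bytes] record framing implied by close()'s
// dos.writeInt(EOF_MARKER). The reader loop is an assumption, not Spark's.
final class SpillFramingSketch {
  static final int EOF_MARKER = -1;

  static void writeRecords(DataOutputStream out, byte[][] records) throws IOException {
    for (byte[] record : records) {
      out.writeInt(record.length); // length prefix
      out.write(record);           // payload
    }
    out.writeInt(EOF_MARKER);      // what close() appends above
    out.flush();
  }

  static int countRecords(DataInputStream in) throws IOException {
    int count = 0;
    int length;
    while ((length = in.readInt()) != EOF_MARKER) { // -1 is an impossible length
      in.readFully(new byte[length]);
      count++;
    }
    return count;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    writeRecords(new DataOutputStream(buf), new byte[][] {{1, 2}, {3}});
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    System.out.println(countRecords(in) + " records"); // prints: 2 records
  }
}
```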
