Skip to content

Commit

Permalink
WIP in mega-refactoring towards shuffle-specific sort.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 5, 2015
1 parent 57f1ec0 commit f480fb2
Show file tree
Hide file tree
Showing 16 changed files with 497 additions and 1,074 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.unsafe;

import org.apache.spark.serializer.DeserializationStream;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import scala.reflect.ClassTag;

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
 * A {@link SerializerInstance} in which every operation is a stub. The stream handed out by
 * {@link #serializeStream(OutputStream)} ignores whatever it is given, and all other methods
 * return {@code null}.
 */
class DummySerializerInstance extends SerializerInstance {

  /**
   * Serialization stream that does nothing: {@code flush()} and {@code close()} are empty and
   * {@code writeObject} discards its argument and returns {@code null}.
   */
  private static final class NoOpSerializationStream extends SerializationStream {

    @Override
    public <T> SerializationStream writeObject(T t, ClassTag<T> ev1) {
      return null;
    }

    @Override
    public void flush() {
      // Intentionally empty.
    }

    @Override
    public void close() {
      // Intentionally empty.
    }
  }

  /**
   * Returns a fresh no-op stream. The supplied output stream is never touched.
   */
  @Override
  public SerializationStream serializeStream(OutputStream s) {
    return new NoOpSerializationStream();
  }

  /** Stub: always returns {@code null}. */
  @Override
  public DeserializationStream deserializeStream(InputStream s) {
    return null;
  }

  /** Stub: always returns {@code null}. */
  @Override
  public <T> ByteBuffer serialize(T t, ClassTag<T> ev1) {
    return null;
  }

  /** Stub: always returns {@code null}. */
  @Override
  public <T> T deserialize(ByteBuffer bytes, ClassTag<T> ev1) {
    return null;
  }

  /** Stub: always returns {@code null}. */
  @Override
  public <T> T deserialize(ByteBuffer bytes, ClassLoader loader, ClassTag<T> ev1) {
    return null;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,20 @@
* limitations under the License.
*/

package org.apache.spark.unsafe.sort;
package org.apache.spark.shuffle.unsafe;

import java.io.IOException;
import org.apache.spark.storage.BlockId;

public abstract class UnsafeSorterIterator {
import java.io.File;

public abstract boolean hasNext();
/**
 * Plain holder that groups a {@link File}, a {@link BlockId}, and a {@code long[]} with one
 * slot per partition. Presumably describes one spill file written by the shuffle sorter;
 * confirm against the code that constructs it.
 */
final class SpillInfo {
// One entry per partition; allocated zero-filled by the constructor.
final long[] partitionLengths;
final File file;
final BlockId blockId;

// NOTE(review): the abstract method declarations below look like removed lines of the
// previous file (UnsafeSorterIterator) interleaved by the diff view; confirm they are not
// part of SpillInfo.
public abstract void loadNext() throws IOException;

public abstract Object getBaseObject();

public abstract long getBaseOffset();

public abstract int getRecordLength();

public abstract long getKeyPrefix();
/**
 * @param numPartitions length of the {@code partitionLengths} array to allocate
 * @param file stored as-is in {@code file}
 * @param blockId stored as-is in {@code blockId}
 */
public SpillInfo(int numPartitions, File file, BlockId blockId) {
this.partitionLengths = new long[numPartitions];
this.file = file;
this.blockId = blockId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.unsafe;

import org.apache.spark.util.collection.SortDataFormat;

/**
 * {@link SortDataFormat} for a {@code long[]} buffer in which each element is read into the
 * {@code packedRecordPointer} field of a reusable {@link PackedRecordPointer} key.
 * Stateless; use the shared {@link #INSTANCE}.
 */
final class UnsafeShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, long[]> {

  public static final UnsafeShuffleSortDataFormat INSTANCE = new UnsafeShuffleSortDataFormat();

  /** Singleton: obtain the format through {@link #INSTANCE}. */
  private UnsafeShuffleSortDataFormat() { }

  /** Creates an empty key object to be filled in by the three-argument {@code getKey}. */
  @Override
  public PackedRecordPointer newKey() {
    return new PackedRecordPointer();
  }

  /**
   * Unsupported: keys are always re-used, so only the overload taking a {@code reuse}
   * argument is expected to be called.
   */
  @Override
  public PackedRecordPointer getKey(long[] data, int pos) {
    throw new UnsupportedOperationException();
  }

  /** Loads the element at {@code pos} into {@code reuse} and returns that same object. */
  @Override
  public PackedRecordPointer getKey(long[] data, int pos, PackedRecordPointer reuse) {
    final long packed = data[pos];
    reuse.packedRecordPointer = packed;
    return reuse;
  }

  /** Exchanges the elements at {@code pos0} and {@code pos1}. */
  @Override
  public void swap(long[] data, int pos0, int pos1) {
    final long first = data[pos0];
    final long second = data[pos1];
    data[pos0] = second;
    data[pos1] = first;
  }

  /** Copies one element from {@code src[srcPos]} to {@code dst[dstPos]}. */
  @Override
  public void copyElement(long[] src, int srcPos, long[] dst, int dstPos) {
    final long value = src[srcPos];
    dst[dstPos] = value;
  }

  /** Copies {@code length} consecutive elements from {@code src} to {@code dst}. */
  @Override
  public void copyRange(long[] src, int srcPos, long[] dst, int dstPos, int length) {
    System.arraycopy(src, srcPos, dst, dstPos, length);
  }

  /** Allocates a new zero-filled buffer holding {@code length} elements. */
  @Override
  public long[] allocate(int length) {
    assert (length < Integer.MAX_VALUE) : "Length " + length + " is too large";
    final long[] buffer = new long[length];
    return buffer;
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.unsafe;

import java.util.Comparator;

import org.apache.spark.util.collection.Sorter;

/**
 * Collects packed record pointers in a growable {@code long[]} buffer and sorts them by
 * partition id.
 *
 * Records are added with {@link #insertRecord(long, int)}; {@link #getSortedIterator()} sorts
 * the buffer in place and returns an iterator over the sorted entries.
 */
public final class UnsafeShuffleSorter {

  private final Sorter<PackedRecordPointer, long[]> sorter;
  private final Comparator<PackedRecordPointer> sortComparator;

  /** Backing storage; one {@code long} per inserted record. */
  private long[] sortBuffer;

  /**
   * The position in the sort buffer where new records can be inserted.
   */
  private int sortBufferInsertPosition = 0;

  /**
   * @param initialSize initial capacity of the sort buffer, in records; must be positive
   *                    (checked only when assertions are enabled)
   */
  public UnsafeShuffleSorter(int initialSize) {
    assert (initialSize > 0);
    this.sortBuffer = new long[initialSize];
    this.sorter =
      new Sorter<PackedRecordPointer, long[]>(UnsafeShuffleSortDataFormat.INSTANCE);
    this.sortComparator = new Comparator<PackedRecordPointer>() {
      @Override
      public int compare(PackedRecordPointer left, PackedRecordPointer right) {
        // Explicit three-way comparison instead of subtraction, so the result has the
        // correct sign for every pair of ints (subtraction can overflow).
        final int leftId = left.getPartitionId();
        final int rightId = right.getPartitionId();
        return (leftId < rightId) ? -1 : ((leftId == rightId) ? 0 : 1);
      }
    };
  }

  /**
   * Replaces the sort buffer with one of twice the length, preserving existing contents.
   */
  public void expandSortBuffer() {
    final long[] oldBuffer = sortBuffer;
    sortBuffer = new long[oldBuffer.length * 2];
    System.arraycopy(oldBuffer, 0, sortBuffer, 0, oldBuffer.length);
  }

  /**
   * @return true if a record can be inserted without first growing the buffer.
   */
  public boolean hasSpaceForAnotherRecord() {
    // NOTE(review): the "+ 1" means the final slot of the buffer is never filled before an
    // expansion is triggered; confirm whether this slack is intentional.
    return sortBufferInsertPosition + 1 < sortBuffer.length;
  }

  /**
   * @return the size of the sort buffer in bytes (8 bytes per {@code long} slot, counting
   *         unused slots).
   */
  public long getMemoryUsage() {
    return sortBuffer.length * 8L;
  }

  /**
   * Appends a record to the sort buffer, growing the buffer first if necessary.
   *
   * @param recordPointer pointer to the record, packed via
   *                      {@code PackedRecordPointer.packPointer}
   * @param partitionId partition id packed together with the pointer
   */
  // TODO: clarify assumption that pointer points to record length.
  public void insertRecord(long recordPointer, int partitionId) {
    if (!hasSpaceForAnotherRecord()) {
      expandSortBuffer();
    }
    sortBuffer[sortBufferInsertPosition] =
      PackedRecordPointer.packPointer(recordPointer, partitionId);
    sortBufferInsertPosition++;
  }

  /**
   * Cursor-style iterator: call {@link #hasNext()}, then {@link #loadNext()}, then read the
   * current entry from {@link #packedRecordPointer}.
   */
  public static abstract class UnsafeShuffleSorterIterator {

    /** Holds the entry loaded by the most recent {@link #loadNext()} call. */
    final PackedRecordPointer packedRecordPointer = new PackedRecordPointer();

    public abstract boolean hasNext();

    public abstract void loadNext();

  }

  /**
   * Sorts the inserted records in place and returns an iterator over them in sorted order.
   * For efficiency, no object is allocated per record: every call to {@code loadNext()}
   * overwrites the iterator's single {@code packedRecordPointer} instance.
   */
  public UnsafeShuffleSorterIterator getSortedIterator() {
    sorter.sort(sortBuffer, 0, sortBufferInsertPosition, sortComparator);
    return new UnsafeShuffleSorterIterator() {

      private int position = 0;

      @Override
      public boolean hasNext() {
        return position < sortBufferInsertPosition;
      }

      @Override
      public void loadNext() {
        packedRecordPointer.packedRecordPointer = sortBuffer[position];
        position++;
      }
    };
  }
}
Loading

0 comments on commit f480fb2

Please sign in to comment.