Skip to content

Commit

Permalink
WIP on UnsafeSorter
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 1, 2015
1 parent 27de6fe commit 81d52c5
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.unsafe.sort;

import org.apache.spark.util.collection.SortDataFormat;

/**
* TODO: finish writing this description
*
* Within each long[] buffer, position {@code 2 * i} holds a pointer pointer to the record at
* index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
*/
final class UnsafeSortDataFormat
extends SortDataFormat<UnsafeSortDataFormat.KeyPointerAndPrefix, long[]> {

public static final UnsafeSortDataFormat INSTANCE = new UnsafeSortDataFormat();

private UnsafeSortDataFormat() { };

public static final class KeyPointerAndPrefix {
/**
* A pointer to a record; see {@link org.apache.spark.unsafe.memory.TaskMemoryManager} for a
* description of how these addresses are encoded.
*/
long recordPointer;

/**
* A key prefix, for use in comparisons.
*/
long keyPrefix;
}

@Override
public KeyPointerAndPrefix getKey(long[] data, int pos) {
// Since we re-use keys, this method shouldn't be called.
throw new UnsupportedOperationException();
}

@Override
public KeyPointerAndPrefix newKey() {
return new KeyPointerAndPrefix();
}

@Override
public KeyPointerAndPrefix getKey(long[] data, int pos, KeyPointerAndPrefix reuse) {
reuse.recordPointer = data[pos * 2];
reuse.keyPrefix = data[pos * 2 + 1];
return reuse;
}

@Override
public void swap(long[] data, int pos0, int pos1) {
long tempPointer = data[pos0 * 2];
long tempKeyPrefix = data[pos0 * 2 + 1];
data[pos0 * 2] = data[pos1 * 2];
data[pos0 * 2 + 1] = data[pos1 * 2 + 1];
data[pos1 * 2] = tempPointer;
data[pos1 * 2 + 1] = tempKeyPrefix;
}

@Override
public void copyElement(long[] src, int srcPos, long[] dst, int dstPos) {
dst[dstPos * 2] = src[srcPos * 2];
dst[dstPos * 2 + 1] = src[srcPos * 2 + 1];
}

@Override
public void copyRange(long[] src, int srcPos, long[] dst, int dstPos, int length) {
System.arraycopy(src, srcPos * 2, dst, dstPos * 2, length * 2);
}

@Override
public long[] allocate(int length) {
assert (length < Integer.MAX_VALUE / 2) : "Length " + length + " is too large";
return new long[length * 2];
}

}
136 changes: 136 additions & 0 deletions core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.unsafe.sort;

import java.util.Comparator;
import java.util.Iterator;

import org.apache.spark.unsafe.memory.MemoryLocation;
import org.apache.spark.util.collection.Sorter;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import static org.apache.spark.unsafe.sort.UnsafeSortDataFormat.KeyPointerAndPrefix;

public final class UnsafeSorter {

public static abstract class RecordComparator {
public abstract int compare(
Object leftBaseObject,
long leftBaseOffset,
Object rightBaseObject,
long rightBaseOffset);
}

public static abstract class PrefixComputer {
public abstract long computePrefix(Object baseObject, long baseOffset);
}

/**
* Compares 8-byte key prefixes in prefix sort. Subclasses may implement type-specific comparisons,
* such as lexicographic comparison for strings.
*/
public static abstract class PrefixComparator {
public abstract int compare(long prefix1, long prefix2);
}

private final TaskMemoryManager memoryManager;
private final PrefixComputer prefixComputer;
private final Sorter<KeyPointerAndPrefix, long[]> sorter;
private final Comparator<KeyPointerAndPrefix> sortComparator;

/**
* Within this buffer, position {@code 2 * i} holds a pointer pointer to the record at
* index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
*/
private long[] sortBuffer = new long[1024];

private int sortBufferInsertPosition = 0;

private void expandSortBuffer(int newSize) {
assert (newSize > sortBuffer.length);
final long[] oldBuffer = sortBuffer;
sortBuffer = new long[newSize];
System.arraycopy(oldBuffer, 0, sortBuffer, 0, oldBuffer.length);
}

public UnsafeSorter(
final TaskMemoryManager memoryManager,
final RecordComparator recordComparator,
PrefixComputer prefixComputer,
final PrefixComparator prefixComparator) {
this.memoryManager = memoryManager;
this.prefixComputer = prefixComputer;
this.sorter =
new Sorter<KeyPointerAndPrefix, long[]>(UnsafeSortDataFormat.INSTANCE);
this.sortComparator = new Comparator<KeyPointerAndPrefix>() {
@Override
public int compare(KeyPointerAndPrefix left, KeyPointerAndPrefix right) {
if (left.keyPrefix == right.keyPrefix) {
final Object leftBaseObject = memoryManager.getPage(left.recordPointer);
final long leftBaseOffset = memoryManager.getOffsetInPage(left.recordPointer);
final Object rightBaseObject = memoryManager.getPage(right.recordPointer);
final long rightBaseOffset = memoryManager.getOffsetInPage(right.recordPointer);
return recordComparator.compare(
leftBaseObject, leftBaseOffset, rightBaseObject, rightBaseOffset);
} else {
return prefixComparator.compare(left.keyPrefix, right.keyPrefix);
}
}
};
}

public void insertRecord(long objectAddress) {
if (sortBufferInsertPosition + 2 == sortBuffer.length) {
expandSortBuffer(sortBuffer.length * 2);
}
final Object baseObject = memoryManager.getPage(objectAddress);
final long baseOffset = memoryManager.getOffsetInPage(objectAddress);
final long keyPrefix = prefixComputer.computePrefix(baseObject, baseOffset);
sortBuffer[sortBufferInsertPosition] = objectAddress;
sortBuffer[sortBufferInsertPosition + 1] = keyPrefix;
sortBufferInsertPosition += 2;
}

public Iterator<MemoryLocation> getSortedIterator() {
final MemoryLocation memoryLocation = new MemoryLocation();
sorter.sort(sortBuffer, 0, sortBufferInsertPosition, sortComparator);
return new Iterator<MemoryLocation>() {
int position = 0;

@Override
public boolean hasNext() {
return position < sortBufferInsertPosition;
}

@Override
public MemoryLocation next() {
final long address = sortBuffer[position];
position += 2;
final Object baseObject = memoryManager.getPage(address);
final long baseOffset = memoryManager.getOffsetInPage(address);
memoryLocation.setObjAndOffset(baseObject, baseOffset);
return memoryLocation;
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.spark.unsafe.sort;

/**
* Created by joshrosen on 4/29/15.
*/
public class UnsafeSorterSuite {
}

0 comments on commit 81d52c5

Please sign in to comment.