Skip to content

Commit

Permalink
Remove PrefixComputer and require prefix to be specified as part of i…
Browse files Browse the repository at this point in the history
…nsert()
  • Loading branch information
JoshRosen committed May 2, 2015
1 parent 1433b42 commit 240864c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ private Iterator<RecordPointerAndKeyPrefix> sortRecords(
final UnsafeSorter sorter = new UnsafeSorter(
memoryManager,
RECORD_COMPARATOR,
PREFIX_COMPUTER,
PREFIX_COMPARATOR,
4096 // Initial size (TODO: tune this!)
);
Expand All @@ -156,17 +155,12 @@ private Iterator<RecordPointerAndKeyPrefix> sortRecords(

final int serializedRecordSize = serByteBuffer.position();
assert (serializedRecordSize > 0);
// TODO: we should run the partition extraction function _now_, at insert time, rather than
// requiring it to be stored alongisde the data, since this may lead to double storage
// Need 8 bytes to store the prefix (for later retrieval in the prefix computer), plus
// 4 to store the record length.
ensureSpaceInDataPage(serializedRecordSize + 8 + 4);
// Need 4 bytes to store the record length.
ensureSpaceInDataPage(serializedRecordSize + 4);

final long recordAddress =
memoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
final Object baseObject = currentPage.getBaseObject();
PlatformDependent.UNSAFE.putLong(baseObject, currentPagePosition, partitionId);
currentPagePosition += 8;
PlatformDependent.UNSAFE.putInt(baseObject, currentPagePosition, serializedRecordSize);
currentPagePosition += 4;
PlatformDependent.copyMemory(
Expand All @@ -177,7 +171,7 @@ private Iterator<RecordPointerAndKeyPrefix> sortRecords(
serializedRecordSize);
currentPagePosition += serializedRecordSize;

sorter.insertRecord(recordAddress);
sorter.insertRecord(recordAddress, partitionId);
}

return sorter.getSortedIterator();
Expand Down Expand Up @@ -211,10 +205,10 @@ private long[] writeSortedRecordsToFile(

final Object baseObject = memoryManager.getPage(recordPointer.recordPointer);
final long baseOffset = memoryManager.getOffsetInPage(recordPointer.recordPointer);
final int recordLength = (int) PlatformDependent.UNSAFE.getLong(baseObject, baseOffset + 8);
final int recordLength = (int) PlatformDependent.UNSAFE.getLong(baseObject, baseOffset);
PlatformDependent.copyMemory(
baseObject,
baseOffset + 8 + 4,
baseOffset + 4,
arr,
PlatformDependent.BYTE_ARRAY_OFFSET,
recordLength);
Expand Down Expand Up @@ -262,16 +256,6 @@ public int compare(
}
};

private static final PrefixComputer PREFIX_COMPUTER = new PrefixComputer() {
@Override
public long computePrefix(Object baseObject, long baseOffset) {
// TODO: should the prefix be computed when inserting the record pointer rather than being
// read from the record itself? May be more efficient in terms of space, etc, and is a simple
// change.
return PlatformDependent.UNSAFE.getLong(baseObject, baseOffset);
}
};

private static final PrefixComparator PREFIX_COMPARATOR = new PrefixComparator() {
@Override
public int compare(long prefix1, long prefix2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,6 @@ public abstract int compare(
long rightBaseOffset);
}

/**
* Given a pointer to a record, computes a prefix.
*/
public static abstract class PrefixComputer {
public abstract long computePrefix(Object baseObject, long baseOffset);
}

/**
* Compares 8-byte key prefixes in prefix sort. Subclasses may implement type-specific
* comparisons, such as lexicographic comparison for strings.
Expand All @@ -90,7 +83,6 @@ public static abstract class PrefixComparator {
}

private final TaskMemoryManager memoryManager;
private final PrefixComputer prefixComputer;
private final Sorter<RecordPointerAndKeyPrefix, long[]> sorter;
private final Comparator<RecordPointerAndKeyPrefix> sortComparator;

Expand All @@ -116,13 +108,11 @@ private void expandSortBuffer(int newSize) {
public UnsafeSorter(
final TaskMemoryManager memoryManager,
final RecordComparator recordComparator,
PrefixComputer prefixComputer,
final PrefixComparator prefixComparator,
int initialSize) {
assert (initialSize > 0);
this.sortBuffer = new long[initialSize * 2];
this.memoryManager = memoryManager;
this.prefixComputer = prefixComputer;
this.sorter =
new Sorter<RecordPointerAndKeyPrefix, long[]>(UnsafeSortDataFormat.INSTANCE);
this.sortComparator = new Comparator<RecordPointerAndKeyPrefix>() {
Expand All @@ -149,13 +139,12 @@ public int compare(RecordPointerAndKeyPrefix left, RecordPointerAndKeyPrefix rig
*
* @param objectAddress pointer to a record in a data page, encoded by {@link TaskMemoryManager}.
*/
public void insertRecord(long objectAddress) {
public void insertRecord(long objectAddress, long keyPrefix) {
if (sortBufferInsertPosition + 2 == sortBuffer.length) {
expandSortBuffer(sortBuffer.length * 2);
}
final Object baseObject = memoryManager.getPage(objectAddress);
final long baseOffset = memoryManager.getOffsetInPage(objectAddress);
final long keyPrefix = prefixComputer.computePrefix(baseObject, baseOffset);
sortBuffer[sortBufferInsertPosition] = objectAddress;
sortBufferInsertPosition++;
sortBuffer[sortBufferInsertPosition] = keyPrefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public void testSortingEmptyInput() {
final UnsafeSorter sorter = new UnsafeSorter(
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP)),
mock(UnsafeSorter.RecordComparator.class),
mock(UnsafeSorter.PrefixComputer.class),
mock(UnsafeSorter.PrefixComparator.class),
100);
final Iterator<UnsafeSorter.RecordPointerAndKeyPrefix> iter = sorter.getSortedIterator();
Expand Down Expand Up @@ -104,30 +103,24 @@ public int compare(
};
// Compute key prefixes based on the records' partition ids
final HashPartitioner hashPartitioner = new HashPartitioner(4);
final UnsafeSorter.PrefixComputer prefixComputer = new UnsafeSorter.PrefixComputer() {
@Override
public long computePrefix(Object baseObject, long baseOffset) {
final String str = getStringFromDataPage(baseObject, baseOffset);
final int partitionId = hashPartitioner.getPartition(str);
return (long) partitionId;
}
};
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
final UnsafeSorter.PrefixComparator prefixComparator = new UnsafeSorter.PrefixComparator() {
@Override
public int compare(long prefix1, long prefix2) {
return (int) prefix1 - (int) prefix2;
}
};
final UnsafeSorter sorter = new UnsafeSorter(memoryManager, recordComparator, prefixComputer,
prefixComparator, dataToSort.length);
final UnsafeSorter sorter = new UnsafeSorter(memoryManager, recordComparator, prefixComparator,
dataToSort.length);
// Given a page of records, insert those records into the sorter one-by-one:
position = dataPage.getBaseOffset();
for (int i = 0; i < dataToSort.length; i++) {
// position now points to the start of a record (which holds its length).
final long recordLength = PlatformDependent.UNSAFE.getLong(baseObject, position);
final long address = memoryManager.encodePageNumberAndOffset(dataPage, position);
sorter.insertRecord(address);
final String str = getStringFromDataPage(baseObject, position);
final int partitionId = hashPartitioner.getPartition(str);
sorter.insertRecord(address, partitionId);
position += 8 + recordLength;
}
final Iterator<UnsafeSorter.RecordPointerAndKeyPrefix> iter = sorter.getSortedIterator();
Expand Down

0 comments on commit 240864c

Please sign in to comment.