Skip to content

Commit

Permalink
Updated BytesToBytesMap's data encoding to put the key first.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Aug 1, 2015
1 parent a51b641 commit 2e62ccb
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 26 deletions.
50 changes: 27 additions & 23 deletions core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,13 @@ public boolean hasNext() {

@Override
public Location next() {
int keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, offsetInPage);
if (keyLength == END_OF_PAGE_MARKER) {
int totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage);
if (totalLength == END_OF_PAGE_MARKER) {
advanceToNextPage();
keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, offsetInPage);
totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage);
}
loc.with(pageBaseObject, offsetInPage);
offsetInPage += 8 + 8 + keyLength + loc.getValueLength();
offsetInPage += 8 + totalLength;
currentRecordNumber++;
return loc;
}
Expand Down Expand Up @@ -352,15 +352,18 @@ private void updateAddressesAndSizes(long fullKeyAddress) {
taskMemoryManager.getOffsetInPage(fullKeyAddress));
}

private void updateAddressesAndSizes(Object page, long keyOffsetInPage) {
long position = keyOffsetInPage;
keyLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
position += 8; // word used to store the key size
keyMemoryLocation.setObjAndOffset(page, position);
position += keyLength;
valueLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
position += 8; // word used to store the key size
valueMemoryLocation.setObjAndOffset(page, position);
private void updateAddressesAndSizes(final Object page, final long keyOffsetInPage) {
long position = keyOffsetInPage;
final int totalLength = PlatformDependent.UNSAFE.getInt(page, position);
position += 4;
keyLength = PlatformDependent.UNSAFE.getInt(page, position);
position += 4;
valueLength = totalLength - keyLength;

keyMemoryLocation.setObjAndOffset(page, position);

position += keyLength;
valueMemoryLocation.setObjAndOffset(page, position);
}

Location with(int pos, int keyHashcode, boolean isDefined) {
Expand Down Expand Up @@ -478,7 +481,7 @@ public boolean putNewKey(
// the key address instead of storing the absolute address of the value, the key and value
// must be stored in the same memory page.
// (8 byte key length) (key) (8 byte value length) (value)
final long requiredSize = 8 + keyLengthBytes + 8 + valueLengthBytes;
final long requiredSize = 8 + keyLengthBytes + valueLengthBytes;

// --- Figure out where to insert the new record ---------------------------------------------

Expand Down Expand Up @@ -508,7 +511,7 @@ public boolean putNewKey(
// There wasn't enough space in the current page, so write an end-of-page marker:
final Object pageBaseObject = currentDataPage.getBaseObject();
final long lengthOffsetInPage = currentDataPage.getBaseOffset() + pageCursor;
PlatformDependent.UNSAFE.putLong(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER);
PlatformDependent.UNSAFE.putInt(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER);
}
final long memoryGranted = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryGranted != pageSizeBytes) {
Expand All @@ -535,37 +538,38 @@ public boolean putNewKey(
long insertCursor = dataPageInsertOffset;

// Compute all of our offsets up-front:
final long keySizeOffsetInPage = insertCursor;
insertCursor += 8; // word used to store the key size
final long totalLengthOffset = insertCursor;
insertCursor += 4;
final long keyLengthOffset = insertCursor;
insertCursor += 4;
final long keyDataOffsetInPage = insertCursor;
insertCursor += keyLengthBytes;
final long valueSizeOffsetInPage = insertCursor;
insertCursor += 8; // word used to store the value size
final long valueDataOffsetInPage = insertCursor;
insertCursor += valueLengthBytes; // word used to store the value size

PlatformDependent.UNSAFE.putInt(dataPageBaseObject, totalLengthOffset,
keyLengthBytes + valueLengthBytes);
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, keyLengthOffset, keyLengthBytes);
// Copy the key
PlatformDependent.UNSAFE.putLong(dataPageBaseObject, keySizeOffsetInPage, keyLengthBytes);
PlatformDependent.copyMemory(
keyBaseObject, keyBaseOffset, dataPageBaseObject, keyDataOffsetInPage, keyLengthBytes);
// Copy the value
PlatformDependent.UNSAFE.putLong(dataPageBaseObject, valueSizeOffsetInPage, valueLengthBytes);
PlatformDependent.copyMemory(valueBaseObject, valueBaseOffset, dataPageBaseObject,
valueDataOffsetInPage, valueLengthBytes);

// --- Update bookeeping data structures -----------------------------------------------------

if (useOverflowPage) {
// Store the end-of-page marker at the end of the data page
PlatformDependent.UNSAFE.putLong(dataPageBaseObject, insertCursor, END_OF_PAGE_MARKER);
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, insertCursor, END_OF_PAGE_MARKER);
} else {
pageCursor += requiredSize;
}

numElements++;
bitset.set(pos);
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
dataPage, keySizeOffsetInPage);
dataPage, totalLengthOffset);
longArray.set(pos * 2, storedKeyAddress);
longArray.set(pos * 2 + 1, keyHashcode);
updateAddressesAndSizes(storedKeyAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,21 @@ public void insertRecord(
sorter.insertRecord(recordAddress, prefix);
}

/**
* Write a record to the sorter. The record is broken down into two different parts, and
*
*/
public void insertRecord(
Object recordBaseObject1,
long recordBaseOffset1,
int lengthInBytes1,
Object recordBaseObject2,
long recordBaseOffset2,
int lengthInBytes2,
long prefix) throws IOException {

}

public UnsafeSorterIterator getSortedIterator() throws IOException {
final UnsafeSorterIterator inMemoryIterator = sorter.getSortedIterator();
int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,17 +243,17 @@ public void iteratorTest() throws Exception {
@Test
public void iteratingOverDataPagesWithWastedSpace() throws Exception {
final int NUM_ENTRIES = 1000 * 1000;
final int KEY_LENGTH = 16;
final int KEY_LENGTH = 24;
final int VALUE_LENGTH = 40;
final BytesToBytesMap map = new BytesToBytesMap(
taskMemoryManager, shuffleMemoryManager, NUM_ENTRIES, PAGE_SIZE_BYTES);
// Each record will take 8 + 8 + 16 + 40 = 72 bytes of space in the data page. Our 64-megabyte
// Each record will take 8 + 24 + 40 = 72 bytes of space in the data page. Our 64-megabyte
// pages won't be evenly-divisible by records of this size, which will cause us to waste some
// space at the end of the page. This is necessary in order for us to take the end-of-record
// handling branch in iterator().
try {
for (int i = 0; i < NUM_ENTRIES; i++) {
final long[] key = new long[] { i, i }; // 2 * 8 = 16 bytes
final long[] key = new long[] { i, i, i }; // 3 * 8 = 24 bytes
final long[] value = new long[] { i, i, i, i, i }; // 5 * 8 = 40 bytes
final BytesToBytesMap.Location loc = map.lookup(
key,
Expand Down

0 comments on commit 2e62ccb

Please sign in to comment.