From 57f1ec04f9e4444b034a4aefa47dcea1eca2603b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 4 May 2015 23:05:34 -0700 Subject: [PATCH] WIP towards packed record pointers for use in optimized shuffle sort. --- .../shuffle/unsafe/PackedRecordPointer.java | 74 +++++++++++++++++++ .../unsafe/PackedRecordPointerSuite.java | 57 ++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java create mode 100644 core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java new file mode 100644 index 0000000000000..34c15e6bbcb0e --- /dev/null +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.unsafe; + +/** + * Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer. + */ +final class PackedRecordPointer { + + /** Bit mask for the lower 40 bits of a long. */ + private static final long MASK_LONG_LOWER_40_BITS = 0xFFFFFFFFFFL; + + /** Bit mask for the upper 24 bits of a long */ + private static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS; + + /** Bit mask for the lower 27 bits of a long. */ + private static final long MASK_LONG_LOWER_27_BITS = 0x7FFFFFFL; + + /** Bit mask for the lower 51 bits of a long. */ + private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL; + + /** Bit mask for the upper 13 bits of a long */ + private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS; + + // TODO: this shifting is probably extremely inefficient; this is just for prototyping + + /** + * Pack a record address and partition id into a single word. + * + * @param recordPointer a record pointer encoded by TaskMemoryManager. + * @param partitionId a shuffle partition id (maximum value of 2^24). + * @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class. + */ + public static long packPointer(long recordPointer, int partitionId) { + // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page. + // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses. + final int pageNumber = (int) ((recordPointer & MASK_LONG_UPPER_13_BITS) >>> 51); + final long compressedAddress = + (((long) pageNumber) << 27) | (recordPointer & MASK_LONG_LOWER_27_BITS); + return (((long) partitionId) << 40) | compressedAddress; + } + + public long packedRecordPointer; + + public int getPartitionId() { + return (int) ((packedRecordPointer & MASK_LONG_UPPER_24_BITS) >>> 40); + } + + public long getRecordPointer() { + final long compressedAddress = packedRecordPointer & MASK_LONG_LOWER_40_BITS; + final long pageNumber = (compressedAddress << 24) & MASK_LONG_UPPER_13_BITS; + final long offsetInPage = compressedAddress & MASK_LONG_LOWER_27_BITS; + return pageNumber | offsetInPage; + } + + public int getRecordLength() { + return -1; // TODO + } +} diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java new file mode 100644 index 0000000000000..53554520b22b1 --- /dev/null +++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.unsafe; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.spark.unsafe.memory.ExecutorMemoryManager; +import org.apache.spark.unsafe.memory.MemoryAllocator; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.memory.TaskMemoryManager; + +public class PackedRecordPointerSuite { + + @Test + public void heap() { + final TaskMemoryManager memoryManager = + new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP)); + final MemoryBlock page0 = memoryManager.allocatePage(100); + final MemoryBlock page1 = memoryManager.allocatePage(100); + final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1, 42); + PackedRecordPointer packedPointerWrapper = new PackedRecordPointer(); + packedPointerWrapper.packedRecordPointer = PackedRecordPointer.packPointer(addressInPage1, 360); + Assert.assertEquals(360, packedPointerWrapper.getPartitionId()); + Assert.assertEquals(addressInPage1, packedPointerWrapper.getRecordPointer()); + memoryManager.cleanUpAllAllocatedMemory(); + } + + @Test + public void offHeap() { + final TaskMemoryManager memoryManager = + new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.UNSAFE)); + final MemoryBlock page0 = memoryManager.allocatePage(100); + final MemoryBlock page1 = memoryManager.allocatePage(100); + final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1, 42); + PackedRecordPointer packedPointerWrapper = new PackedRecordPointer(); + packedPointerWrapper.packedRecordPointer = PackedRecordPointer.packPointer(addressInPage1, 360); + Assert.assertEquals(360, packedPointerWrapper.getPartitionId()); + Assert.assertEquals(addressInPage1, packedPointerWrapper.getRecordPointer()); + memoryManager.cleanUpAllAllocatedMemory(); + } +}