Skip to content

Commit

Permalink
Optimize page compaction for dictionary blocks
Browse files Browse the repository at this point in the history
When a page is compacted, if there are dictionary blocks with the same
source id, this property should be retained even after the page is
compacted. Group all the dictionary blocks in a page with the same
source id and compact them together. This also allows the result blocks
to use the same ids block.
  • Loading branch information
nileema committed Jan 8, 2016
1 parent c9fbdec commit 1f83e1c
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 23 deletions.
55 changes: 55 additions & 0 deletions presto-spi/src/main/java/com/facebook/presto/spi/Page.java
Expand Up @@ -14,8 +14,14 @@
package com.facebook.presto.spi;

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.DictionaryBlock;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -107,20 +113,47 @@ public void compact()

for (int i = 0; i < blocks.length; i++) {
Block block = blocks[i];
if (block instanceof DictionaryBlock) {
continue;
}
if (block.getSizeInBytes() < block.getRetainedSizeInBytes()) {
// Copy the block to compact its size
Block compactedBlock = block.copyRegion(0, block.getPositionCount());
blocks[i] = compactedBlock;
}
}

Map<UUID, DictionaryBlockIndexes> dictionaryBlocks = getRelatedDictionaryBlocks();
for (DictionaryBlockIndexes blockIndexes : dictionaryBlocks.values()) {
List<Block> compactBlocks = DictionaryBlock.compactBlocks(blockIndexes.getBlocks());
List<Integer> indexes = blockIndexes.getIndexes();
for (int i = 0; i < compactBlocks.size(); i++) {
blocks[indexes.get(i)] = compactBlocks.get(i);
}
}

long retainedSize = 0;
for (Block block : blocks) {
retainedSize += block.getRetainedSizeInBytes();
}
retainedSizeInBytes.set(retainedSize);
}

/**
 * Groups the dictionary blocks of this page by their dictionary source id.
 * Blocks that are not {@link DictionaryBlock} instances are left out.
 *
 * @return a map from dictionary source id to the dictionary blocks carrying that id,
 * each recorded together with its index in this page's block array
 */
private Map<UUID, DictionaryBlockIndexes> getRelatedDictionaryBlocks()
{
    Map<UUID, DictionaryBlockIndexes> groupedBySourceId = new HashMap<>();

    for (int blockIndex = 0; blockIndex < blocks.length; blockIndex++) {
        Block candidate = blocks[blockIndex];
        if (!(candidate instanceof DictionaryBlock)) {
            continue;
        }

        DictionaryBlock dictionaryBlock = (DictionaryBlock) candidate;
        UUID sourceId = dictionaryBlock.getDictionarySourceId();
        DictionaryBlockIndexes group = groupedBySourceId.computeIfAbsent(sourceId, ignored -> new DictionaryBlockIndexes());
        group.addBlock(candidate, blockIndex);
    }
    return groupedBySourceId;
}

/**
* Assures that all data for the block is in memory.
*
Expand Down Expand Up @@ -154,4 +187,26 @@ private static int determinePositionCount(Block... blocks)

return blocks[0].getPositionCount();
}

/**
 * Accumulates blocks together with the index each one was registered under.
 * The two lists are kept in lockstep: element {@code n} of {@link #getIndexes()}
 * is the index that was passed along with element {@code n} of {@link #getBlocks()}.
 */
private static class DictionaryBlockIndexes
{
    private final List<Block> collectedBlocks = new ArrayList<>();
    private final List<Integer> collectedIndexes = new ArrayList<>();

    /**
     * Records a block and its index as the next pair.
     */
    public void addBlock(Block block, int index)
    {
        collectedBlocks.add(block);
        collectedIndexes.add(index);
    }

    /**
     * @return the recorded blocks, in insertion order (the backing list, not a copy)
     */
    public List<Block> getBlocks()
    {
        return collectedBlocks;
    }

    /**
     * @return the recorded indexes, in insertion order (the backing list, not a copy)
     */
    public List<Integer> getIndexes()
    {
        return collectedIndexes;
    }
}
}
Expand Up @@ -19,14 +19,17 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.UUID;

import static com.facebook.presto.spi.block.BlockValidationUtil.checkValidPositions;
import static io.airlift.slice.SizeOf.SIZE_OF_INT;
import static io.airlift.slice.Slices.copyOf;
import static io.airlift.slice.Slices.wrappedIntArray;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;

public class DictionaryBlock
implements Block
Expand All @@ -51,6 +54,11 @@ public DictionaryBlock(int positionCount, Block dictionary, Slice ids, boolean d
this(positionCount, dictionary, ids, dictionaryIsCompacted, UUID.randomUUID());
}

/**
 * Creates a dictionary block that carries the given dictionary source id.
 * Delegates to the five-argument constructor with {@code dictionaryIsCompacted}
 * set to {@code false}.
 */
public DictionaryBlock(
        int positionCount,
        Block dictionary,
        Slice ids,
        UUID dictionarySourceId)
{
    this(positionCount, dictionary, ids, false, dictionarySourceId);
}

public DictionaryBlock(int positionCount, Block dictionary, Slice ids, boolean dictionaryIsCompacted, UUID dictionarySourceId)
{
requireNonNull(dictionary, "dictionary is null");
Expand All @@ -68,7 +76,6 @@ public DictionaryBlock(int positionCount, Block dictionary, Slice ids, boolean d
this.dictionary = dictionary;
this.ids = ids;
this.dictionarySourceId = requireNonNull(dictionarySourceId, "dictionarySourceId is null");

this.retainedSizeInBytes = INSTANCE_SIZE + dictionary.getRetainedSizeInBytes() + ids.getRetainedSize();

if (dictionaryIsCompacted) {
Expand All @@ -78,7 +85,7 @@ public DictionaryBlock(int positionCount, Block dictionary, Slice ids, boolean d
else {
int sizeInBytes = 0;
int uniqueIds = 0;
boolean[] isReferenced = getReferencedPositions();
boolean[] isReferenced = getReferencedPositions(dictionary, ids, positionCount);
for (int position = 0; position < isReferenced.length; position++) {
if (isReferenced[position]) {
if (!dictionary.isNull(position)) {
Expand Down Expand Up @@ -296,17 +303,7 @@ public boolean isCompact()

private int getIndex(int position)
{
return ids.getInt(position * SIZE_OF_INT);
}

private boolean[] getReferencedPositions()
{
int dictionarySize = dictionary.getPositionCount();
boolean[] isReferenced = new boolean[dictionarySize];
for (int i = 0; i < this.positionCount; i++) {
isReferenced[getIndex(i)] = true;
}
return isReferenced;
return getIndex(ids, position);
}

public DictionaryBlock compact()
Expand All @@ -315,8 +312,23 @@ public DictionaryBlock compact()
return this;
}

List<Block> compactBlocks = compactBlocks(singletonList(this));
return (DictionaryBlock) compactBlocks.get(0);
}

public static List<Block> compactBlocks(List<Block> blocks)
{
verifyEligibleToCompact(blocks);

DictionaryBlock dictionaryBlock = (DictionaryBlock) blocks.get(0);
Block dictionary = dictionaryBlock.getDictionary();
Slice ids = dictionaryBlock.getIds();

int positionCount = dictionaryBlock.getPositionCount();
int dictionarySize = dictionary.getPositionCount();
boolean[] isReferenced = getReferencedPositions();

boolean[] isReferenced = getReferencedPositions(dictionary, ids, positionCount);

List<Integer> dictionaryPositionsToCopy = new ArrayList<>(dictionarySize);
int[] remapIndex = new int[dictionarySize];
Arrays.fill(remapIndex, -1);
Expand All @@ -332,24 +344,70 @@ public DictionaryBlock compact()

// entire dictionary is referenced
if (dictionaryPositionsToCopy.size() == dictionarySize) {
return this;
return blocks;
}

Slice newIdsSlice = wrappedIntArray(getNewIds(positionCount, ids, remapIndex));
List<Block> outputDictionaryBlocks = new ArrayList<>(blocks.size());
UUID uuid = UUID.randomUUID();

for (Block block : blocks) {
dictionaryBlock = ((DictionaryBlock) block);
try {
Block compactDictionary = dictionaryBlock.getDictionary().copyPositions(dictionaryPositionsToCopy);
outputDictionaryBlocks.add(new DictionaryBlock(positionCount, compactDictionary, newIdsSlice, true, uuid));
}
catch (UnsupportedOperationException e) {
// ignore if copy positions is not supported for the dictionary
outputDictionaryBlocks.add(new DictionaryBlock(positionCount, dictionaryBlock.getDictionary(), dictionaryBlock.getIds()));
}
}
return outputDictionaryBlocks;
}

/**
 * Checks that a group of blocks may be compacted together.
 *
 * @param blocks the blocks to validate
 * @throws IllegalArgumentException if {@code blocks} is empty, if any element is not a
 * {@link DictionaryBlock}, or if the blocks do not all share a single dictionary source id
 */
private static void verifyEligibleToCompact(List<Block> blocks)
{
    // An empty list has no source id at all; report that directly instead of
    // letting it surface below as a misleading "must be the same" mismatch.
    if (blocks.isEmpty()) {
        throw new IllegalArgumentException("blocks is empty");
    }

    for (Block block : blocks) {
        if (!(block instanceof DictionaryBlock)) {
            throw new IllegalArgumentException("block must be DictionaryBlock");
        }
    }

    // Collecting into a set already removes duplicates, so no distinct() stage is needed.
    Set<UUID> sourceIds = blocks.stream()
            .map(block -> ((DictionaryBlock) block).getDictionarySourceId())
            .collect(toSet());

    if (sourceIds.size() != 1) {
        throw new IllegalArgumentException("dictionarySourceIds must be the same");
    }
}

private static int[] getNewIds(int positionCount, Slice ids, int[] remapIndex)
{
int[] newIds = new int[positionCount];
for (int i = 0; i < positionCount; i++) {
int newId = remapIndex[getIndex(i)];
int newId = remapIndex[getIndex(ids, i)];
if (newId == -1) {
throw new IllegalStateException("reference to a non-existent key");
}
newIds[i] = newId;
}
try {
Block compactDictionary = dictionary.copyPositions(dictionaryPositionsToCopy);
return new DictionaryBlock(positionCount, compactDictionary, wrappedIntArray(newIds), true);
}
catch (UnsupportedOperationException e) {
// ignore if copy positions is not supported for the dictionary block
return this;
return newIds;
}

/**
 * Flags every dictionary position that is referred to by at least one of the
 * first {@code positionCount} ids.
 *
 * @param dictionary the dictionary whose position count sizes the result
 * @param ids the slice the ids are read from
 * @param positionCount how many ids to read
 * @return an array with one flag per dictionary position, {@code true} where the position is referenced
 */
private static boolean[] getReferencedPositions(Block dictionary, Slice ids, int positionCount)
{
    boolean[] referenced = new boolean[dictionary.getPositionCount()];
    int position = 0;
    while (position < positionCount) {
        int dictionaryIndex = getIndex(ids, position);
        referenced[dictionaryIndex] = true;
        position++;
    }
    return referenced;
}

/**
 * Reads the {@code i}-th id from {@code ids}, where ids are stored back to back
 * at a stride of {@code SIZE_OF_INT} bytes.
 */
private static int getIndex(Slice ids, int i)
{
    int byteOffset = i * SIZE_OF_INT;
    return ids.getInt(byteOffset);
}
}
68 changes: 68 additions & 0 deletions presto-spi/src/test/java/com/facebook/presto/spi/TestPage.java
Expand Up @@ -13,9 +13,21 @@
*/
package com.facebook.presto.spi;

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.block.DictionaryBlock;
import com.facebook.presto.spi.block.SliceArrayBlock;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import org.testng.annotations.Test;

import java.util.UUID;

import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static io.airlift.slice.Slices.wrappedIntArray;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;

public class TestPage
{
Expand Down Expand Up @@ -47,4 +59,60 @@ public void testGetRegionFromNoColumnPage()
{
assertEquals(new Page(100).getRegion(0, 10).getPositionCount(), 10);
}

@Test
public void testCompactDictionaryBlocks()
        throws Exception
{
    Slice[] expectedValues = createExpectedValues(10);

    // Second dictionary: one BIGINT per expected value, holding that value's length
    BlockBuilder lengthsBuilder = BIGINT.createBlockBuilder(new BlockBuilderStatus(), expectedValues.length);
    for (int i = 0; i < expectedValues.length; i++) {
        BIGINT.writeLong(lengthsBuilder, expectedValues[i].length());
    }
    Block lengthsDictionary = lengthsBuilder.build();

    // Two dictionary blocks sharing one source id (and the same ids slice)
    UUID sharedSourceId = UUID.randomUUID();
    DictionaryBlock sharedFirst = createDictionaryBlock(expectedValues, 100, sharedSourceId);
    DictionaryBlock sharedSecond = new DictionaryBlock(
            sharedFirst.getPositionCount(),
            lengthsDictionary,
            sharedFirst.getIds(),
            sharedSourceId);

    // A third dictionary block with its own, unrelated source id
    DictionaryBlock unrelated = createDictionaryBlock(expectedValues, 100, UUID.randomUUID());

    Page page = new Page(sharedFirst, unrelated, sharedSecond);
    page.compact();

    // After compaction, the unrelated block must still differ from the shared pair ...
    UUID firstSourceId = ((DictionaryBlock) page.getBlock(0)).getDictionarySourceId();
    UUID unrelatedSourceId = ((DictionaryBlock) page.getBlock(1)).getDictionarySourceId();
    assertNotEquals(firstSourceId, unrelatedSourceId);

    // ... while blocks that shared a source id before compaction still share one
    UUID firstSourceIdAgain = ((DictionaryBlock) page.getBlock(0)).getDictionarySourceId();
    UUID secondSourceId = ((DictionaryBlock) page.getBlock(2)).getDictionarySourceId();
    assertEquals(firstSourceIdAgain, secondSourceId);
}

/**
 * Builds {@code positionCount} slices, where the slice at index {@code n}
 * is {@code createExpectedValue(n)}.
 */
private static Slice[] createExpectedValues(int positionCount)
{
    Slice[] values = new Slice[positionCount];
    int next = 0;
    while (next < positionCount) {
        values[next] = createExpectedValue(next);
        next++;
    }
    return values;
}

/**
 * Produces a slice of {@code length} bytes; the byte written at offset {@code k}
 * is {@code length * (k + 1)} as passed to {@code writeByte}.
 */
protected static Slice createExpectedValue(int length)
{
    DynamicSliceOutput output = new DynamicSliceOutput(16);
    int written = 0;
    while (written < length) {
        int multiplier = written + 1;
        output.writeByte(length * multiplier);
        written++;
    }
    return output.slice();
}

/**
 * Creates a dictionary block over {@code expectedValues} whose ids cycle through
 * the dictionary in order: position {@code p} refers to entry {@code p % expectedValues.length}.
 */
private static DictionaryBlock createDictionaryBlock(Slice[] expectedValues, int positionCount, UUID uuid)
{
    int valueCount = expectedValues.length;
    int[] cyclicIds = new int[positionCount];
    for (int position = 0; position < cyclicIds.length; position++) {
        cyclicIds[position] = position % valueCount;
    }

    SliceArrayBlock dictionary = new SliceArrayBlock(valueCount, expectedValues);
    Slice idsSlice = wrappedIntArray(cyclicIds);
    return new DictionaryBlock(positionCount, dictionary, idsSlice, uuid);
}
}

0 comments on commit 1f83e1c

Please sign in to comment.