Skip to content

Commit

Permalink
Add dictionary encoded blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Jul 31, 2015
1 parent 9809c52 commit dddbd70
Show file tree
Hide file tree
Showing 21 changed files with 1,162 additions and 122 deletions.
Expand Up @@ -469,9 +469,14 @@ public void load(LazySliceArrayBlock block)
{
checkState(batchId == expectedBatchId);
try {
SliceVector vector = new SliceVector(batchSize);
SliceVector vector = new SliceVector();
recordReader.readVector(hiveColumnIndex, vector);
block.setValues(vector.vector);
if (vector.dictionary) {
block.setValues(vector.vector, vector.ids, vector.isNull);
}
else {
block.setValues(vector.vector);
}
}
catch (IOException e) {
throw propagateException(e);
Expand Down
Expand Up @@ -409,7 +409,7 @@ protected void checkPageSource(ConnectorPageSource pageSource, List<TestColumn>
Object actualValue = row.getField(i);
Object expectedValue = testColumn.getExpectedValue();
if (actualValue == null) {
assertEquals(null, expectedValue, String.format("Expected null for column %d", i));
assertEquals(null, expectedValue, String.format("Expected non-null for column %d", i));
}
else if (testColumn.getObjectInspector().getTypeName().equals("float") ||
testColumn.getObjectInspector().getTypeName().equals("double")) {
Expand Down
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.spi.block.BlockEncoding;
import com.facebook.presto.spi.block.BlockEncodingFactory;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.block.DictionaryBlockEncoding;
import com.facebook.presto.spi.block.InterleavedBlockEncoding;
import com.facebook.presto.spi.block.FixedWidthBlockEncoding;
import com.facebook.presto.spi.block.LazySliceArrayBlockEncoding;
Expand Down Expand Up @@ -60,6 +61,7 @@ public BlockEncodingManager(TypeManager typeManager, Set<BlockEncodingFactory<?>
addBlockEncodingFactory(FixedWidthBlockEncoding.FACTORY);
addBlockEncodingFactory(SliceArrayBlockEncoding.FACTORY);
addBlockEncodingFactory(LazySliceArrayBlockEncoding.FACTORY);
addBlockEncodingFactory(DictionaryBlockEncoding.FACTORY);
addBlockEncodingFactory(ArrayBlockEncoding.FACTORY);
addBlockEncodingFactory(InterleavedBlockEncoding.FACTORY);

Expand Down
Expand Up @@ -223,6 +223,15 @@ private static Slice createGreaterValue(Slice expectedValue, int offset, int len
return greaterOutput.slice();
}

protected static Slice[] createExpectedValues(int positionCount)
{
Slice[] expectedValues = new Slice[positionCount];
for (int position = 0; position < positionCount; position++) {
expectedValues[position] = createExpectedValue(position);
}
return expectedValues;
}

protected static Slice createExpectedValue(int length)
{
DynamicSliceOutput dynamicSliceOutput = new DynamicSliceOutput(16);
Expand Down
@@ -0,0 +1,145 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.block;

import com.facebook.presto.spi.block.DictionaryBlock;
import com.facebook.presto.spi.block.SliceArrayBlock;
import com.google.common.primitives.Ints;
import io.airlift.slice.Slice;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.List;

import static io.airlift.slice.Slices.wrappedIntArray;
import static org.testng.Assert.assertEquals;

public class TestDictionaryBlock
extends AbstractTestBlock
{
@Test
public void testCopyPositionsWithCompaction()
throws Exception
{
Slice[] expectedValues = createExpectedValues(10);
DictionaryBlock dictionaryBlock = createDictionaryBlock(expectedValues, 100);

List<Integer> positionsToCopy = Ints.asList(0, 10, 20, 30, 40);
DictionaryBlock copiedBlock = (DictionaryBlock) dictionaryBlock.copyPositions(positionsToCopy);

assertEquals(copiedBlock.getDictionary().getPositionCount(), 1);
assertEquals(copiedBlock.getPositionCount(), positionsToCopy.size());
assertBlock(copiedBlock.getDictionary(), Arrays.copyOfRange(expectedValues, 0, 1));
}

@Test
public void testCopyPositionsWithCompactionsAndReorder()
throws Exception
{
Slice[] expectedValues = createExpectedValues(10);
DictionaryBlock dictionaryBlock = createDictionaryBlock(expectedValues, 100);
List<Integer> positionsToCopy = Ints.asList(50, 55, 40, 45, 60);

DictionaryBlock copiedBlock = (DictionaryBlock) dictionaryBlock.copyPositions(positionsToCopy);

assertEquals(copiedBlock.getDictionary().getPositionCount(), 2);
assertEquals(copiedBlock.getPositionCount(), positionsToCopy.size());

assertBlock(copiedBlock.getDictionary(), new Slice[] { expectedValues[0], expectedValues[5] });
assertEquals(copiedBlock.getIds(), wrappedIntArray(0, 1, 0, 1, 0));
}

@Test
public void testCopyPositionsSamePosition()
throws Exception
{
Slice[] expectedValues = createExpectedValues(10);
DictionaryBlock dictionaryBlock = createDictionaryBlock(expectedValues, 100);
List<Integer> positionsToCopy = Ints.asList(52, 52, 52);

DictionaryBlock copiedBlock = (DictionaryBlock) dictionaryBlock.copyPositions(positionsToCopy);

assertEquals(copiedBlock.getDictionary().getPositionCount(), 1);
assertEquals(copiedBlock.getPositionCount(), positionsToCopy.size());

assertBlock(copiedBlock.getDictionary(), new Slice[] { expectedValues[2] });
assertEquals(copiedBlock.getIds(), wrappedIntArray(0, 0, 0));
}

@Test
public void testCopyPositionsNoCompaction()
throws Exception
{
Slice[] expectedValues = createExpectedValues(1);
DictionaryBlock dictionaryBlock = createDictionaryBlock(expectedValues, 100);

List<Integer> positionsToCopy = Ints.asList(0, 2, 4, 5);
DictionaryBlock copiedBlock = (DictionaryBlock) dictionaryBlock.copyPositions(positionsToCopy);

assertEquals(copiedBlock.getPositionCount(), positionsToCopy.size());
assertBlock(copiedBlock.getDictionary(), expectedValues);
}

@Test
public void testCompact()
throws Exception
{
Slice[] expectedValues = createExpectedValues(5);
DictionaryBlock dictionaryBlock = createDictionaryBlockWithUnreferencedKeys(expectedValues, 10);
DictionaryBlock compactBlock = dictionaryBlock.compact();

assertEquals(compactBlock.getDictionary().getPositionCount(), (expectedValues.length / 2) + 1);
assertBlock(compactBlock.getDictionary(), new Slice[] { expectedValues[0], expectedValues[1], expectedValues[3] });
assertEquals(compactBlock.getIds(), wrappedIntArray(0, 1, 1, 2, 2, 0, 1, 1, 2, 2));
}

@Test
public void testCompactAllKeysReferenced()
throws Exception
{
Slice[] expectedValues = createExpectedValues(5);
DictionaryBlock dictionaryBlock = createDictionaryBlock(expectedValues, 10);
DictionaryBlock compactBlock = dictionaryBlock.compact();

// When there is nothing to compact, we return the same block
assertEquals(compactBlock, dictionaryBlock);
}

private static DictionaryBlock createDictionaryBlockWithUnreferencedKeys(Slice[] expectedValues, int positionCount)
{
// adds references to 0 and all odd indexes
int dictionarySize = expectedValues.length;
int[] ids = new int[positionCount];

for (int i = 0; i < positionCount; i++) {
int index = i % dictionarySize;
if (index % 2 == 0 && index != 0) {
index--;
}
ids[i] = index;
}
return new DictionaryBlock(positionCount, new SliceArrayBlock(dictionarySize, expectedValues), wrappedIntArray(ids));
}

private static DictionaryBlock createDictionaryBlock(Slice[] expectedValues, int positionCount)
{
int dictionarySize = expectedValues.length;
int[] ids = new int[positionCount];

for (int i = 0; i < positionCount; i++) {
ids[i] = i % dictionarySize;
}
return new DictionaryBlock(positionCount, new SliceArrayBlock(dictionarySize, expectedValues), wrappedIntArray(ids));
}
}
Expand Up @@ -18,6 +18,10 @@
import io.airlift.slice.Slice;
import org.testng.annotations.Test;

import static com.facebook.presto.spi.block.SliceArrayBlock.getSliceArraySizeInBytes;
import static io.airlift.slice.SizeOf.SIZE_OF_BYTE;
import static io.airlift.slice.SizeOf.SIZE_OF_INT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand All @@ -35,7 +39,7 @@ public void test()
@Test
public void testRelease()
{
TestLazySliceArrayBlockLoader loader = new TestLazySliceArrayBlockLoader(new Slice[10]);
TestDirectLazySliceArrayBlockLoader loader = new TestDirectLazySliceArrayBlockLoader(new Slice[10]);
LazySliceArrayBlock block = new LazySliceArrayBlock(10, loader);

block.assureLoaded();
Expand All @@ -44,28 +48,52 @@ public void testRelease()
assertTrue(loader.loaded);
}

private static void assertVariableWithValues(Slice[] expectedValues)
@Test
public void testDirectBlock()
throws Exception
{
LazySliceArrayBlock block = new LazySliceArrayBlock(expectedValues.length, new TestLazySliceArrayBlockLoader(expectedValues));
assertBlock(block, expectedValues);
Slice[] expectedValues = createExpectedValues(10);
TestDirectLazySliceArrayBlockLoader loader = new TestDirectLazySliceArrayBlockLoader(expectedValues);
LazySliceArrayBlock block = new LazySliceArrayBlock(10, loader);

block.assureLoaded();
assertEquals(block.getValues(), expectedValues);
}

private static Slice[] createExpectedValues(int positionCount)
@Test
public void testDictionaryBlock()
throws Exception
{
Slice[] expectedValues = new Slice[positionCount];
for (int position = 0; position < positionCount; position++) {
expectedValues[position] = createExpectedValue(position);
}
return expectedValues;
Slice[] expectedValues = createExpectedValues(3);
int[] ids = new int[] {0, 2, 1, 0, 0, 0, 1, 1, 1, 0, 1, 2};
boolean[] isNull = new boolean[ids.length];

TestDictionaryLazySliceArrayBlockLoader loader = new TestDictionaryLazySliceArrayBlockLoader(expectedValues, ids, isNull);
LazySliceArrayBlock block = new LazySliceArrayBlock(ids.length, loader);

block.assureLoaded();

assertTrue(block.isDictionary());
assertEquals(block.getValues(), expectedValues);
assertEquals(block.getIds(), ids);

int expectedSizeInBytes = getSliceArraySizeInBytes(expectedValues) + (ids.length * SIZE_OF_INT) + (isNull.length * SIZE_OF_BYTE);
assertEquals(block.getSizeInBytes(), expectedSizeInBytes);
}

private static void assertVariableWithValues(Slice[] expectedValues)
{
LazySliceArrayBlock block = new LazySliceArrayBlock(expectedValues.length, new TestDirectLazySliceArrayBlockLoader(expectedValues));
assertBlock(block, expectedValues);
}

private static class TestLazySliceArrayBlockLoader
private static class TestDirectLazySliceArrayBlockLoader
implements LazyBlockLoader<LazySliceArrayBlock>
{
private final Slice[] expectedValues;
private boolean loaded;

public TestLazySliceArrayBlockLoader(Slice[] expectedValues)
public TestDirectLazySliceArrayBlockLoader(Slice[] expectedValues)
{
this.expectedValues = expectedValues;
}
Expand All @@ -81,4 +109,31 @@ public void load(LazySliceArrayBlock block)
loaded = true;
}
}

private static class TestDictionaryLazySliceArrayBlockLoader
implements LazyBlockLoader<LazySliceArrayBlock>
{
private final Slice[] values;
private final int[] ids;
private final boolean[] isNull;
private boolean loaded;

public TestDictionaryLazySliceArrayBlockLoader(Slice[] values, int[] ids, boolean[] isNull)
{
this.values = values;
this.ids = ids;
this.isNull = isNull;
}

@Override
public void load(LazySliceArrayBlock block)
{
if (values == null) {
fail("load should not be called");
}

block.setValues(values, ids, isNull);
loaded = true;
}
}
}
Expand Up @@ -43,13 +43,4 @@ private static void assertVariableWithValues(Slice[] expectedValues)
SliceArrayBlock block = new SliceArrayBlock(expectedValues.length, expectedValues);
assertBlock(block, expectedValues);
}

private static Slice[] createExpectedValues(int positionCount)
{
Slice[] expectedValues = new Slice[positionCount];
for (int position = 0; position < positionCount; position++) {
expectedValues[position] = createExpectedValue(position);
}
return expectedValues;
}
}
Expand Up @@ -60,13 +60,4 @@ private static BlockBuilder createBlockBuilderWithValues(Slice[] expectedValues)
}
return blockBuilder;
}

private static Slice[] createExpectedValues(int positionCount)
{
Slice[] expectedValues = new Slice[positionCount];
for (int position = 0; position < positionCount; position++) {
expectedValues[position] = createExpectedValue(position);
}
return expectedValues;
}
}
Expand Up @@ -285,6 +285,12 @@ public void readVector(Type type, int columnIndex, Object vector)
streamReaders[columnIndex].readBatch(type, vector);
}

public StreamReader getStreamReader(int index)
{
checkArgument(index < streamReaders.length, "index does not exist");
return streamReaders[index];
}

private boolean advanceToNextRowGroup()
throws IOException
{
Expand Down

0 comments on commit dddbd70

Please sign in to comment.