Skip to content

Commit

Permalink
Add copyPositions API to Block
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Jul 25, 2015
1 parent 85858d8 commit a7ebcba
Show file tree
Hide file tree
Showing 18 changed files with 303 additions and 4 deletions.
Expand Up @@ -18,6 +18,8 @@
import com.facebook.presto.spi.block.BlockEncoding;
import io.airlift.slice.Slice;

import java.util.List;

import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -189,6 +191,12 @@ public BlockEncoding getEncoding()
return block.getEncoding();
}

@Override
public Block copyPositions(List<Integer> positions)
{
return block.copyPositions(positions);
}

@Override
public void assureLoaded()
{
Expand Down
Expand Up @@ -17,6 +17,8 @@
import com.facebook.presto.spi.block.BlockBuilder;
import io.airlift.slice.Slice;

import java.util.List;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -69,6 +71,12 @@ public RunLengthBlockEncoding getEncoding()
return new RunLengthBlockEncoding(value.getEncoding());
}

@Override
public Block copyPositions(List<Integer> positions)
{
return new RunLengthEncodedBlock(value.copyRegion(0, 1), positions.size());
}

@Override
public Block getRegion(int positionOffset, int length)
{
Expand Down
Expand Up @@ -17,12 +17,14 @@
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.block.BlockEncoding;
import com.google.common.primitives.Ints;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.testng.annotations.Test;

import java.lang.reflect.Array;
import java.util.List;

import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;
Expand Down Expand Up @@ -62,6 +64,23 @@ protected static <T> void assertBlock(Block block, T[] expectedValues)
}
}

protected static void assertBlockFilteredPositions(Slice[] expectedValues, Block block, List<Integer> positions)
{
Block filteredBlock = block.copyPositions(positions);
Slice[] filteredExpectedValues = filter(expectedValues, positions);
assertEquals(filteredBlock.getPositionCount(), positions.size());
assertBlock(filteredBlock, filteredExpectedValues);
}

private static Slice[] filter(Slice[] expectedValues, List<Integer> positions)
{
Slice[] prunedExpectedValues = new Slice[positions.size()];
for (int i = 0; i < prunedExpectedValues.length; i++) {
prunedExpectedValues[i] = expectedValues[positions.get(i)];
}
return prunedExpectedValues;
}

private static <T> void assertBlockPositions(Block block, T[] expectedValues)
{
assertEquals(block.getPositionCount(), expectedValues.length);
Expand All @@ -80,6 +99,12 @@ private static <T> void assertBlockPosition(Block block, int position, T expecte
assertPositionValue(block.copyRegion(position, 1), 0, expectedValue);
assertPositionValue(block.copyRegion(0, position + 1), position, expectedValue);
assertPositionValue(block.copyRegion(position, block.getPositionCount() - position), 0, expectedValue);
try {
assertPositionValue(block.copyPositions(Ints.asList(position)), 0, expectedValue);
}
catch (UnsupportedOperationException e) {
// ignore blocks that do not support this operation
}
}

private static <T> void assertPositionValue(Block block, int position, T expectedValue)
Expand Down
Expand Up @@ -13,7 +13,9 @@
*/
package com.facebook.presto.block;

import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.FixedWidthBlockBuilder;
import com.google.common.primitives.Ints;
import io.airlift.slice.Slice;
import org.testng.annotations.Test;

Expand All @@ -30,7 +32,25 @@ public void test()
}
}

@Test
public void testCopyPositions()
throws Exception
{
for (int fixedSize = 0; fixedSize < 20; fixedSize++) {
Slice[] expectedValues = (Slice[]) alternatingNullValues(createExpectedValues(17, fixedSize));
BlockBuilder blockBuilder = createBlockBuilderWithValues(expectedValues, fixedSize);
assertBlockFilteredPositions(expectedValues, blockBuilder.build(), Ints.asList(0, 2, 4, 6, 7, 9, 10, 16));
}
}

private static void assertFixedWithValues(Slice[] expectedValues, int fixedSize)
{
BlockBuilder blockBuilder = createBlockBuilderWithValues(expectedValues, fixedSize);
assertBlock(blockBuilder, expectedValues);
assertBlock(blockBuilder.build(), expectedValues);
}

private static BlockBuilder createBlockBuilderWithValues(Slice[] expectedValues, int fixedSize)
{
FixedWidthBlockBuilder blockBuilder = new FixedWidthBlockBuilder(fixedSize, expectedValues.length);
for (Slice expectedValue : expectedValues) {
Expand All @@ -41,8 +61,7 @@ private static void assertFixedWithValues(Slice[] expectedValues, int fixedSize)
blockBuilder.writeBytes(expectedValue, 0, expectedValue.length()).closeEntry();
}
}
assertBlock(blockBuilder, expectedValues);
assertBlock(blockBuilder.build(), expectedValues);
return blockBuilder;
}

private static Slice[] createExpectedValues(int positionCount, int fixedSize)
Expand Down
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.block;

import com.facebook.presto.spi.block.SliceArrayBlock;
import com.google.common.primitives.Ints;
import io.airlift.slice.Slice;
import org.testng.annotations.Test;

Expand All @@ -28,6 +29,15 @@ public void test()
assertVariableWithValues((Slice[]) alternatingNullValues(expectedValues));
}

@Test
public void testCopyPositions()
throws Exception
{
Slice[] expectedValues = (Slice[]) alternatingNullValues(createExpectedValues(100));
SliceArrayBlock block = new SliceArrayBlock(expectedValues.length, expectedValues);
assertBlockFilteredPositions(expectedValues, block, Ints.asList(0, 2, 4, 6, 7, 9, 10, 16));
}

private static void assertVariableWithValues(Slice[] expectedValues)
{
SliceArrayBlock block = new SliceArrayBlock(expectedValues.length, expectedValues);
Expand Down
Expand Up @@ -13,8 +13,10 @@
*/
package com.facebook.presto.block;

import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.block.VariableWidthBlockBuilder;
import com.google.common.primitives.Ints;
import io.airlift.slice.Slice;
import org.testng.annotations.Test;

Expand All @@ -29,7 +31,23 @@ public void test()
assertVariableWithValues((Slice[]) alternatingNullValues(expectedValues));
}

@Test
public void testCopyPositions()
throws Exception
{
Slice[] expectedValues = (Slice[]) alternatingNullValues(createExpectedValues(100));
BlockBuilder blockBuilder = createBlockBuilderWithValues(expectedValues);
assertBlockFilteredPositions(expectedValues, blockBuilder.build(), Ints.asList(0, 2, 4, 6, 7, 9, 10, 16));
}

private static void assertVariableWithValues(Slice[] expectedValues)
{
BlockBuilder blockBuilder = createBlockBuilderWithValues(expectedValues);
assertBlock(blockBuilder, expectedValues);
assertBlock(blockBuilder.build(), expectedValues);
}

private static BlockBuilder createBlockBuilderWithValues(Slice[] expectedValues)
{
VariableWidthBlockBuilder blockBuilder = new VariableWidthBlockBuilder(new BlockBuilderStatus());
for (Slice expectedValue : expectedValues) {
Expand All @@ -40,8 +58,7 @@ private static void assertVariableWithValues(Slice[] expectedValues)
blockBuilder.writeBytes(expectedValue, 0, expectedValue.length()).closeEntry();
}
}
assertBlock(blockBuilder, expectedValues);
assertBlock(blockBuilder.build(), expectedValues);
return blockBuilder;
}

private static Slice[] createExpectedValues(int positionCount)
Expand Down
Expand Up @@ -16,6 +16,8 @@
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import java.util.List;

public abstract class AbstractArrayBlock
implements Block
{
Expand All @@ -38,6 +40,12 @@ private int getOffset(int position)
return position == 0 ? 0 : getOffsets().getInt((position - 1) * 4) - getOffsetBase();
}

@Override
public Block copyPositions(List<Integer> positions)
{
throw new UnsupportedOperationException();
}

@Override
public Block getRegion(int position, int length)
{
Expand Down
Expand Up @@ -15,6 +15,8 @@

import io.airlift.slice.Slice;

import java.util.List;

public abstract class AbstractArrayElementBlock
implements Block
{
Expand Down Expand Up @@ -171,6 +173,12 @@ public BlockEncoding getEncoding()
throw new UnsupportedOperationException();
}

@Override
public Block copyPositions(List<Integer> positions)
{
throw new UnsupportedOperationException();
}

@Override
public Block getRegion(int position, int length)
{
Expand Down
Expand Up @@ -15,6 +15,8 @@

import io.airlift.slice.Slice;

import java.util.List;

public abstract class AbstractInterleavedBlock
implements Block
{
Expand Down Expand Up @@ -205,6 +207,12 @@ public Block getSingleValueBlock(int position)
return getBlock(blockIndex).getSingleValueBlock(positionInBlock);
}

@Override
public Block copyPositions(List<Integer> positions)
{
throw new UnsupportedOperationException();
}

@Override
public Block getRegion(int position, int length)
{
Expand Down
10 changes: 10 additions & 0 deletions presto-spi/src/main/java/com/facebook/presto/spi/block/Block.java
Expand Up @@ -15,6 +15,8 @@

import io.airlift.slice.Slice;

import java.util.List;

public interface Block
{
/**
Expand Down Expand Up @@ -139,6 +141,14 @@ default <T> T getObject(int position, Class<T> clazz)
*/
BlockEncoding getEncoding();

/**
* Returns a block containing the specified positions.
* All specified positions must be valid for this block.
*
* The returned block must be a compact representation of the original block.
*/
Block copyPositions(List<Integer> positions);

/**
* Returns a block starting at the specified position and extends for the
* specified length. The specified region must be entirely contained
Expand Down
@@ -0,0 +1,34 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spi.block;

import java.util.List;
import java.util.Set;

import static java.util.stream.Collectors.toSet;

final class BlockValidationUtil
{
private BlockValidationUtil()
{
}

static void checkValidPositions(List<Integer> positions, int positionCount)
{
Set<Integer> invalidPositions = positions.stream().filter(position -> position >= positionCount).collect(toSet());
if (!invalidPositions.isEmpty()) {
throw new IllegalArgumentException("Invalid positions " + invalidPositions + " in block with " + positionCount + " positions");
}
}
}
Expand Up @@ -13,11 +13,16 @@
*/
package com.facebook.presto.spi.block;

import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;

import java.util.List;
import java.util.Objects;

import static com.facebook.presto.spi.block.BlockValidationUtil.checkValidPositions;

public class FixedWidthBlock
extends AbstractFixedWidthBlock
{
Expand Down Expand Up @@ -80,6 +85,21 @@ public int getRetainedSizeInBytes()
return (int) size;
}

@Override
public Block copyPositions(List<Integer> positions)
{
checkValidPositions(positions, positionCount);

SliceOutput newSlice = new DynamicSliceOutput(positions.size() * fixedSize);
SliceOutput newValueIsNull = new DynamicSliceOutput(positions.size());

for (int position : positions) {
newValueIsNull.appendByte(valueIsNull.getByte(position));
newSlice.appendBytes(getRawSlice().getBytes(position * fixedSize, fixedSize));
}
return new FixedWidthBlock(fixedSize, positions.size(), newSlice.slice(), newValueIsNull.slice());
}

@Override
public Block getRegion(int positionOffset, int length)
{
Expand Down

0 comments on commit a7ebcba

Please sign in to comment.