Skip to content

Commit

Permalink
Implement copyPositions for ArrayBlock and InterleavedBlock
Browse files Browse the repository at this point in the history
  • Loading branch information
haozhun committed Sep 25, 2015
1 parent 3a9042a commit e257774
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 23 deletions.
Expand Up @@ -29,6 +29,7 @@
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.type.ArrayType;
import com.facebook.presto.type.MapType;
import com.facebook.presto.type.RowType;
import com.facebook.presto.type.TypeRegistry;
import com.google.common.base.Joiner;
Expand Down Expand Up @@ -176,7 +177,7 @@ public abstract class AbstractTestHiveFileFormats
getStandardMapObjectInspector(javaShortObjectInspector, javaShortObjectInspector),
ImmutableMap.of((short) 2, (short) 2),
mapBlockOf(BIGINT, BIGINT, 2, 2)))
.add(new TestColumn("t_map_null_key", getStandardMapObjectInspector(javaIntObjectInspector, javaIntObjectInspector), mapWithNullKey(), mapBlockOf(BIGINT, BIGINT, 2, 3)))
.add(new TestColumn("t_map_null_key", getStandardMapObjectInspector(javaIntObjectInspector, javaIntObjectInspector), asMap(null, 0, 2, 3), mapBlockOf(BIGINT, BIGINT, 2, 3)))
.add(new TestColumn("t_map_int", getStandardMapObjectInspector(javaIntObjectInspector, javaIntObjectInspector), ImmutableMap.of(3, 3), mapBlockOf(BIGINT, BIGINT, 3, 3)))
.add(new TestColumn("t_map_bigint", getStandardMapObjectInspector(javaLongObjectInspector, javaLongObjectInspector), ImmutableMap.of(4L, 4L), mapBlockOf(BIGINT, BIGINT, 4L, 4L)))
.add(new TestColumn("t_map_float", getStandardMapObjectInspector(javaFloatObjectInspector, javaFloatObjectInspector), ImmutableMap.of(5.0f, 5.0f), mapBlockOf(DOUBLE, DOUBLE, 5.0f, 5.0f)))
Expand Down Expand Up @@ -228,6 +229,20 @@ public abstract class AbstractTestHiveFileFormats
mapBlockOf(VARCHAR, new ArrayType(new RowType(ImmutableList.of(BIGINT), Optional.empty())),
"test", arrayBlockOf(new RowType(ImmutableList.of(BIGINT), Optional.empty()), rowBlockOf(ImmutableList.of(BIGINT), 1)))
))
.add(new TestColumn("t_map_null_key_complex_value",
getStandardMapObjectInspector(
javaStringObjectInspector,
getStandardMapObjectInspector(javaLongObjectInspector, javaBooleanObjectInspector)
),
asMap(null, ImmutableMap.of(15L, true), "k", ImmutableMap.of(16L, false)),
mapBlockOf(VARCHAR, new MapType(BIGINT, BOOLEAN), "k", mapBlockOf(BIGINT, BOOLEAN, 16L, false))))
.add(new TestColumn("t_map_null_key_complex_key_value",
getStandardMapObjectInspector(
getStandardListObjectInspector(javaStringObjectInspector),
getStandardMapObjectInspector(javaLongObjectInspector, javaBooleanObjectInspector)
),
asMap(null, ImmutableMap.of(15L, true), ImmutableList.of("k", "ka"), ImmutableMap.of(16L, false)),
mapBlockOf(new ArrayType(VARCHAR), new MapType(BIGINT, BOOLEAN), arrayBlockOf(VARCHAR, "k", "ka"), mapBlockOf(BIGINT, BOOLEAN, 16L, false))))
.add(new TestColumn("t_struct_nested", getStandardStructObjectInspector(ImmutableList.of("struct_field"),
ImmutableList.of(getStandardListObjectInspector(javaStringObjectInspector))), ImmutableList.of(ImmutableList.of("1", "2", "3")) , rowBlockOf(ImmutableList.of(new ArrayType(VARCHAR)), arrayBlockOf(VARCHAR, "1", "2", "3"))))
.add(new TestColumn("t_struct_null", getStandardStructObjectInspector(ImmutableList.of("struct_field", "struct_field2"),
Expand All @@ -242,6 +257,14 @@ private static Map<Integer, Integer> mapWithNullKey()
return map;
}

private static <K, V> Map<K, V> asMap(K k1, V v1, K k2, V v2)
{
Map<K, V> map = new HashMap<>();
map.put(k1, v1);
map.put(k2, v2);
return map;
}

protected List<HiveColumnHandle> getColumnHandles(List<TestColumn> testColumns)
{
List<HiveColumnHandle> columns = new ArrayList<>();
Expand Down
Expand Up @@ -112,14 +112,9 @@ public void setUp()
public void testRCText()
throws Exception
{
List<TestColumn> testColumns = ImmutableList.copyOf(filter(TEST_COLUMNS, new Predicate<TestColumn>()
{
@Override
public boolean apply(TestColumn testColumn)
{
// TODO: This is a bug in the RC text reader
return !testColumn.getName().equals("t_struct_null");
}
List<TestColumn> testColumns = ImmutableList.copyOf(filter(TEST_COLUMNS, testColumn -> {
return !testColumn.getName().equals("t_struct_null") // TODO: This is a bug in the RC text reader
&& !testColumn.getName().equals("t_map_null_key_complex_key_value"); // RC file does not support complex type as key of a map
}));

HiveOutputFormat<?, ?> outputFormat = new RCFileOutputFormat();
Expand Down Expand Up @@ -162,15 +157,20 @@ public void testRcTextPageSource()
public void testRCBinary()
throws Exception
{
List<TestColumn> testColumns = ImmutableList.copyOf(filter(TEST_COLUMNS, testColumn -> {
// RC file does not support complex type as key of a map
return !testColumn.getName().equals("t_map_null_key_complex_key_value");
}));

HiveOutputFormat<?, ?> outputFormat = new RCFileOutputFormat();
InputFormat<?, ?> inputFormat = new RCFileInputFormat<>();
@SuppressWarnings("deprecation")
SerDe serde = new LazyBinaryColumnarSerDe();
File file = File.createTempFile("presto_test", "rc-binary");
try {
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS, NUM_ROWS);
testCursorProvider(new ColumnarBinaryHiveRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS, NUM_ROWS);
testCursorProvider(new GenericHiveRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS, NUM_ROWS);
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns, NUM_ROWS);
testCursorProvider(new ColumnarBinaryHiveRecordCursorProvider(), split, inputFormat, serde, testColumns, NUM_ROWS);
testCursorProvider(new GenericHiveRecordCursorProvider(), split, inputFormat, serde, testColumns, NUM_ROWS);
}
finally {
//noinspection ResultOfMethodCallIgnored
Expand Down Expand Up @@ -324,7 +324,7 @@ private static List<TestColumn> getTestColumnsSupportedByParquet()
// TODO: empty arrays or maps with null keys don't seem to work
// Parquet does not support DATE
return TEST_COLUMNS.stream()
.filter(column -> !ImmutableSet.of("t_complex", "t_array_empty", "t_map_null_key")
.filter(column -> !ImmutableSet.of("t_array_empty", "t_map_null_key", "t_map_null_key_complex_value", "t_map_null_key_complex_key_value")
.contains(column.getName()))
.filter(column -> !hasType(column.getObjectInspector(), PrimitiveCategory.DATE))
.collect(toList());
Expand Down
Expand Up @@ -100,12 +100,7 @@ protected <T> void assertBlockPosition(Block block, int position, T expectedValu
assertPositionValue(block.copyRegion(position, 1), 0, expectedValue);
assertPositionValue(block.copyRegion(0, position + 1), position, expectedValue);
assertPositionValue(block.copyRegion(position, block.getPositionCount() - position), 0, expectedValue);
try {
assertPositionValue(block.copyPositions(Ints.asList(position)), 0, expectedValue);
}
catch (UnsupportedOperationException e) {
// ignore blocks that do not support this operation
}
assertPositionValue(block.copyPositions(Ints.asList(position)), 0, expectedValue);
}

protected static <T> void assertPositionValue(Block block, int position, T expectedValue)
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.block.ArrayBlockBuilder;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.google.common.primitives.Ints;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.testng.annotations.Test;
Expand All @@ -41,10 +42,14 @@ public void testWithFixedWidthBlock()
BlockBuilder blockBuilder = createBlockBuilderWithValues(expectedValues);
assertBlock(blockBuilder, expectedValues);
assertBlock(blockBuilder.build(), expectedValues);
assertBlockFilteredPositions(expectedValues, blockBuilder.build(), Ints.asList(0, 1, 3, 4, 7));
assertBlockFilteredPositions(expectedValues, blockBuilder.build(), Ints.asList(2, 3, 5, 6));
long[][] expectedValuesWithNull = (long[][]) alternatingNullValues(expectedValues);
BlockBuilder blockBuilderWithNull = createBlockBuilderWithValues(expectedValuesWithNull);
assertBlock(blockBuilderWithNull, expectedValuesWithNull);
assertBlock(blockBuilderWithNull.build(), expectedValuesWithNull);
assertBlockFilteredPositions(expectedValuesWithNull, blockBuilderWithNull.build(), Ints.asList(0, 1, 5, 6, 7, 10, 11, 12, 15));
assertBlockFilteredPositions(expectedValuesWithNull, blockBuilderWithNull.build(), Ints.asList(2, 3, 4, 9, 13, 14));
}

@Test
Expand All @@ -60,10 +65,14 @@ public void testWithVariableWidthBlock()
BlockBuilder blockBuilder = createBlockBuilderWithValues(expectedValues);
assertBlock(blockBuilder, expectedValues);
assertBlock(blockBuilder.build(), expectedValues);
assertBlockFilteredPositions(expectedValues, blockBuilder.build(), Ints.asList(0, 1, 3, 4, 7));
assertBlockFilteredPositions(expectedValues, blockBuilder.build(), Ints.asList(2, 3, 5, 6));
Slice[][] expectedValuesWithNull = (Slice[][]) alternatingNullValues(expectedValues);
BlockBuilder blockBuilderWithNull = createBlockBuilderWithValues(expectedValuesWithNull);
assertBlock(blockBuilderWithNull, expectedValuesWithNull);
assertBlock(blockBuilderWithNull.build(), expectedValuesWithNull);
assertBlockFilteredPositions(expectedValuesWithNull, blockBuilderWithNull.build(), Ints.asList(0, 1, 5, 6, 7, 10, 11, 12, 15));
assertBlockFilteredPositions(expectedValuesWithNull, blockBuilderWithNull.build(), Ints.asList(2, 3, 4, 9, 13, 14));
}

@Test
Expand All @@ -84,10 +93,14 @@ public void testWithArrayBlock()
BlockBuilder blockBuilder = createBlockBuilderWithValues(expectedValues);
assertBlock(blockBuilder, expectedValues);
assertBlock(blockBuilder.build(), expectedValues);
assertBlockFilteredPositions(expectedValues, blockBuilder.build(), Ints.asList(0, 1, 3, 4, 7));
assertBlockFilteredPositions(expectedValues, blockBuilder.build(), Ints.asList(2, 3, 5, 6));
long[][][] expectedValuesWithNull = (long[][][]) alternatingNullValues(expectedValues);
BlockBuilder blockBuilderWithNull = createBlockBuilderWithValues(expectedValuesWithNull);
assertBlock(blockBuilderWithNull, expectedValuesWithNull);
assertBlock(blockBuilderWithNull.build(), expectedValuesWithNull);
assertBlockFilteredPositions(expectedValuesWithNull, blockBuilderWithNull.build(), Ints.asList(0, 1, 5, 6, 7, 10, 11, 12, 15));
assertBlockFilteredPositions(expectedValuesWithNull, blockBuilderWithNull.build(), Ints.asList(2, 3, 4, 9, 13, 14));
}

private BlockBuilder createBlockBuilderWithValues(long[][][] expectedValues)
Expand Down
Expand Up @@ -13,13 +13,19 @@
*/
package com.facebook.presto.block;

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.block.InterleavedBlockBuilder;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import io.airlift.slice.Slice;
import org.testng.annotations.Test;

import java.lang.reflect.Array;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
Expand All @@ -40,9 +46,20 @@ public void test()
assertValues((Slice[]) alternatingNullValues(expectedValues));
}

private void assertValues(Slice[] expectedValues)
@Test
public void testCopyPositions()
{
Slice[] expectedValues = createExpectedValues();
InterleavedBlockBuilder blockBuilder = createBlockBuilderWithValues(expectedValues, ImmutableList.of(VARCHAR, BIGINT));

assertBlockFilteredPositions(expectedValues, blockBuilder, Ints.asList(0, 1, 4, 5, 6, 7, 14, 15));
assertBlockFilteredPositions(expectedValues, blockBuilder.build(), Ints.asList(0, 1, 4, 5, 6, 7, 14, 15));
assertBlockFilteredPositions(expectedValues, blockBuilder.build(), Ints.asList(2, 3, 4, 5, 8, 9, 12, 13));
}

private static InterleavedBlockBuilder createBlockBuilderWithValues(Slice[] expectedValues, List<Type> types)
{
InterleavedBlockBuilder blockBuilder = new InterleavedBlockBuilder(ImmutableList.of(VARCHAR, BIGINT), new BlockBuilderStatus(), 10);
InterleavedBlockBuilder blockBuilder = new InterleavedBlockBuilder(types, new BlockBuilderStatus(), expectedValues.length);

for (Slice expectedValue : expectedValues) {
if (expectedValue == null) {
Expand All @@ -52,6 +69,14 @@ private void assertValues(Slice[] expectedValues)
blockBuilder.writeBytes(expectedValue, 0, expectedValue.length()).closeEntry();
}
}

return blockBuilder;
}

private void assertValues(Slice[] expectedValues)
{
InterleavedBlockBuilder blockBuilder = createBlockBuilderWithValues(expectedValues, ImmutableList.of(VARCHAR, BIGINT));

assertBlock(blockBuilder, expectedValues);
assertBlock(blockBuilder.build(), expectedValues);
}
Expand Down Expand Up @@ -79,4 +104,19 @@ protected static Object[] alternatingNullValues(Object[] objects)
objectsWithNulls[objectsWithNulls.length - 1] = null;
return objectsWithNulls;
}

@Override
protected <T> void assertBlockPosition(Block block, int position, T expectedValue)
{
assertPositionValue(block, position, expectedValue);
assertPositionValue(block.getSingleValueBlock(position), 0, expectedValue);
assertPositionValue(block.getRegion(position, 1), 0, expectedValue);
assertPositionValue(block.getRegion(0, position + 1), position, expectedValue);
assertPositionValue(block.getRegion(position, block.getPositionCount() - position), 0, expectedValue);
assertPositionValue(block.copyRegion(position, 1), 0, expectedValue);
assertPositionValue(block.copyRegion(0, position + 1), position, expectedValue);
assertPositionValue(block.copyRegion(position, block.getPositionCount() - position), 0, expectedValue);
int positionFloored = position / COLUMN_COUNT * COLUMN_COUNT;
assertPositionValue(block.copyPositions(IntStream.range(positionFloored, positionFloored + COLUMN_COUNT).boxed().collect(Collectors.toList())), position % COLUMN_COUNT, expectedValue);
}
}
Expand Up @@ -13,9 +13,11 @@
*/
package com.facebook.presto.spi.block;

import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import java.util.ArrayList;
import java.util.List;

public abstract class AbstractArrayBlock
Expand Down Expand Up @@ -43,7 +45,28 @@ private int getOffset(int position)
@Override
public Block copyPositions(List<Integer> positions)
{
throw new UnsupportedOperationException();
DynamicSliceOutput newOffsets = new DynamicSliceOutput(positions.size() * Integer.BYTES);
DynamicSliceOutput newValueIsNull = new DynamicSliceOutput(positions.size());
List<Integer> valuesPositions = new ArrayList<>();
int countNewOffset = 0;
for (int position : positions) {
if (isNull(position)) {
newValueIsNull.appendByte(1);
newOffsets.appendInt(countNewOffset);
}
else {
newValueIsNull.appendByte(0);
int positionStartOffset = getOffset(position);
int positionEndOffset = getOffset(position + 1);
countNewOffset += positionEndOffset - positionStartOffset;
newOffsets.appendInt(countNewOffset);
for (int j = positionStartOffset; j < positionEndOffset; j++) {
valuesPositions.add(j);
}
}
}
Block newValues = getValues().copyPositions(valuesPositions);
return new ArrayBlock(newValues, newOffsets.slice(), 0, newValueIsNull.slice());
}

@Override
Expand Down
Expand Up @@ -15,6 +15,7 @@

import io.airlift.slice.Slice;

import java.util.ArrayList;
import java.util.List;

public abstract class AbstractInterleavedBlock
Expand Down Expand Up @@ -210,7 +211,28 @@ public Block getSingleValueBlock(int position)
@Override
public Block copyPositions(List<Integer> positions)
{
throw new UnsupportedOperationException();
if (positions.size() % columns != 0) {
throw new IllegalArgumentException("Positions.size (" + positions.size() + ") is not evenly dividable by columns (" + columns + ")");
}
int positionsPerColumn = positions.size() / columns;

List<List<Integer>> valuePositions = new ArrayList<>(columns);
for (int i = 0; i < columns; i++) {
valuePositions.add(new ArrayList<>(positionsPerColumn));
}
int ordinal = 0;
for (int position : positions) {
if (ordinal % columns != position % columns) {
throw new IllegalArgumentException("Position (" + position + ") is not congruent to ordinal (" + ordinal + ") modulo columns (" + columns + ")");
}
valuePositions.get(position % columns).add(position / columns);
ordinal++;
}
Block[] blocks = new Block[columns];
for (int i = 0; i < columns; i++) {
blocks[i] = getBlock(i).copyPositions(valuePositions.get(i));
}
return new InterleavedBlock(blocks);
}

@Override
Expand Down

0 comments on commit e257774

Please sign in to comment.