Skip to content

Commit

Permalink
Implement Unnest Operator with Dictionary Blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
phd3 authored and dain committed Jun 25, 2019
1 parent e802542 commit f5b1534
Show file tree
Hide file tree
Showing 15 changed files with 1,901 additions and 210 deletions.
Expand Up @@ -13,80 +13,108 @@
*/
package io.prestosql.operator.unnest;

import com.google.common.collect.ImmutableList;
import io.prestosql.spi.PageBuilder;
import com.google.common.collect.Iterables;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.block.ColumnarArray;
import io.prestosql.spi.block.ColumnarRow;
import io.prestosql.spi.type.RowType;
import io.prestosql.spi.type.Type;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.prestosql.spi.block.ColumnarArray.toColumnarArray;
import static io.prestosql.spi.block.ColumnarRow.toColumnarRow;
import static java.util.Objects.requireNonNull;

public class ArrayOfRowsUnnester
implements Unnester
/**
* Unnester for a nested column with array type, used only when the array elements are of {@code RowType}.
* It maintains {@link ColumnarArray} and {@link ColumnarRow} objects to get underlying elements. The two
* different columnar structures are required because there are two layers of translation involved. One
* from {@code ArrayBlock} to {@code RowBlock}, and then from {@code RowBlock} to individual element blocks.
*
* All protected methods implemented here assume that they are invoked when {@code columnarArray} and
* {@code columnarRow} are non-null.
*/
class ArrayOfRowsUnnester
extends Unnester
{
private final List<Type> fieldTypes;
private ColumnarArray columnarArray;
private ColumnarRow columnarRow;
private int position;
private int nonNullPosition;
private int positionCount;
private final int fieldCount;

public ArrayOfRowsUnnester(Type elementType)
// Keeping track of null row element count is required. This count needs to be deducted
// when translating row block indexes to element block indexes.
private int nullRowsEncountered;

public ArrayOfRowsUnnester(RowType elementType)
{
requireNonNull(elementType, "elementType is null");
checkArgument(elementType instanceof RowType, "elementType is not of RowType");
this.fieldTypes = ImmutableList.copyOf(elementType.getTypeParameters());
super(Iterables.toArray(requireNonNull(elementType, "elementType is null").getTypeParameters(), Type.class));
this.fieldCount = elementType.getTypeParameters().size();
this.nullRowsEncountered = 0;
}

@Override
public int getChannelCount()
{
return fieldTypes.size();
return fieldCount;
}

@Override
public void appendNext(PageBuilder pageBuilder, int outputChannelOffset)
int getInputEntryCount()
{
checkState(columnarRow != null, "columnarRow is null");
if (columnarArray == null) {
return 0;
}
return columnarArray.getPositionCount();
}

for (int i = 0; i < fieldTypes.size(); i++) {
BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(outputChannelOffset + i);
if (columnarRow.isNull(position)) {
blockBuilder.appendNull();
/**
 * Rebuilds both columnar views from a new input block and clears the null-row counter.
 *
 * @param block the block to convert with {@code toColumnarArray}
 */
@Override
protected void resetColumnarStructure(Block block)
{
    ColumnarArray arrayView = toColumnarArray(block);
    this.columnarArray = arrayView;
    // The row view is built over the elements block of the array view
    this.columnarRow = toColumnarRow(arrayView.getElementsBlock());
    // Null rows are counted per input block, so the count starts over here
    this.nullRowsEncountered = 0;
}

@Override
public void processCurrentPosition(int requireCount)
{
// Translate to row block index
int rowBlockIndex = columnarArray.getOffset(getCurrentPosition());

// Unnest current entry
for (int i = 0; i < getCurrentUnnestedLength(); i++) {
if (columnarRow.isNull(rowBlockIndex + i)) {
// Nulls have to be appended when Row element itself is null
for (int field = 0; field < fieldCount; field++) {
getBlockBuilder(field).appendNull();
}
nullRowsEncountered++;
}
else {
fieldTypes.get(i).appendTo(columnarRow.getField(i), nonNullPosition, blockBuilder);
for (int field = 0; field < fieldCount; field++) {
getBlockBuilder(field).appendElement(rowBlockIndex + i - nullRowsEncountered);
}
}
}
if (!columnarRow.isNull(position)) {
nonNullPosition++;

// Append nulls if more output entries are needed
for (int i = 0; i < requireCount - getCurrentUnnestedLength(); i++) {
for (int field = 0; field < fieldCount; field++) {
getBlockBuilder(field).appendNull();
}
}
position++;
}

@Override
public boolean hasNext()
protected Block getElementsBlock(int channel)
{
return position < positionCount;
checkState(channel >= 0 && channel < fieldCount, "Invalid channel number");
return columnarRow.getField(channel);
}

@Override
public void setBlock(Block block)
protected int getElementsLength(int index)
{
this.position = 0;
this.nonNullPosition = 0;
if (block == null) {
this.columnarRow = null;
this.positionCount = 0;
}
else {
this.columnarRow = ColumnarRow.toColumnarRow(block);
this.positionCount = block.getPositionCount();
}
return columnarArray.getLength(index);
}
}
Expand Up @@ -13,56 +13,74 @@
*/
package io.prestosql.operator.unnest;

import io.prestosql.spi.PageBuilder;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.block.ColumnarArray;
import io.prestosql.spi.type.Type;

import javax.annotation.Nullable;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static io.prestosql.spi.block.ColumnarArray.toColumnarArray;

public class ArrayUnnester
implements Unnester
/**
* Unnester for a nested column with array type, only when array elements are NOT of type {@code RowType}.
* Maintains a {@link ColumnarArray} object to get underlying elements block from the array block.
*
* All protected methods implemented here assume that they are being invoked when {@code columnarArray} is non-null.
*/
class ArrayUnnester
extends Unnester
{
private final Type elementType;
private Block arrayBlock;

private int position;
private int positionCount;
private ColumnarArray columnarArray;

public ArrayUnnester(Type elementType)
{
this.elementType = requireNonNull(elementType, "elementType is null");
super(elementType);
}

@Override
public boolean hasNext()
public int getChannelCount()
{
return position < positionCount;
return 1;
}

@Override
public final int getChannelCount()
protected int getInputEntryCount()
{
return 1;
if (columnarArray == null) {
return 0;
}
return columnarArray.getPositionCount();
}

@Override
public final void appendNext(PageBuilder pageBuilder, int outputChannelOffset)
protected void resetColumnarStructure(Block block)
{
checkState(arrayBlock != null, "arrayBlock is null");
BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(outputChannelOffset);
elementType.appendTo(arrayBlock, position, blockBuilder);
position++;
this.columnarArray = toColumnarArray(block);
}

/**
 * Returns the elements block of the current columnar array.
 *
 * @param channel the requested channel; only {@code 0} is accepted
 * @return the elements block obtained from {@code columnarArray}
 * @throws IllegalStateException if {@code channel} is not 0
 */
@Override
protected Block getElementsBlock(int channel)
{
    boolean isOnlyChannel = channel == 0;
    checkState(isOnlyChannel, "index is not 0");
    return columnarArray.getElementsBlock();
}

/**
 * Emits the elements of the array at the current position into channel 0, then pads
 * with nulls until {@code requiredOutputCount} entries have been appended.
 *
 * @param requiredOutputCount number of output entries to produce for this position;
 *        no nulls are appended when it does not exceed the array length
 */
@Override
protected void processCurrentPosition(int requiredOutputCount)
{
    // Map the current array position onto a range of the elements block
    int firstElement = columnarArray.getOffset(getCurrentPosition());
    int elementCount = columnarArray.getLength(getCurrentPosition());

    // Copy the elements as one contiguous range
    getBlockBuilder(0).appendRange(firstElement, elementCount);

    // Pad with nulls for the remaining required entries, if any
    for (int remaining = requiredOutputCount - elementCount; remaining > 0; remaining--) {
        getBlockBuilder(0).appendNull();
    }
}

@Override
public void setBlock(@Nullable Block arrayBlock)
protected int getElementsLength(int index)
{
this.arrayBlock = arrayBlock;
this.position = 0;
this.positionCount = arrayBlock == null ? 0 : arrayBlock.getPositionCount();
return columnarArray.getLength(index);
}
}
Expand Up @@ -13,59 +13,83 @@
*/
package io.prestosql.operator.unnest;

import io.prestosql.spi.PageBuilder;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.block.ColumnarMap;
import io.prestosql.spi.type.Type;

import javax.annotation.Nullable;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static io.prestosql.spi.block.ColumnarMap.toColumnarMap;

public class MapUnnester
implements Unnester
/**
* Unnester for a nested column with map type.
* Maintains a {@link ColumnarMap} object to get underlying keys and values block from the map block.
*
* All protected methods implemented here assume that they are being invoked when {@code columnarMap} is non-null.
*/
class MapUnnester
extends Unnester
{
private final Type keyType;
private final Type valueType;
private Block block;

private int position;
private int positionCount;
private ColumnarMap columnarMap;

public MapUnnester(Type keyType, Type valueType)
{
this.keyType = requireNonNull(keyType, "keyType is null");
this.valueType = requireNonNull(valueType, "valueType is null");
super(keyType, valueType);
}

@Override
public boolean hasNext()
protected void processCurrentPosition(int requiredOutputCount)
{
return position < positionCount;
// Translate indices
int mapLength = columnarMap.getEntryCount(getCurrentPosition());
int startingOffset = columnarMap.getOffset(getCurrentPosition());

// Append elements and nulls for keys Block
getBlockBuilder(0).appendRange(startingOffset, mapLength);
for (int i = 0; i < requiredOutputCount - mapLength; i++) {
getBlockBuilder(0).appendNull();
}

// Append elements and nulls for values Block
getBlockBuilder(1).appendRange(startingOffset, mapLength);
for (int i = 0; i < requiredOutputCount - mapLength; i++) {
getBlockBuilder(1).appendNull();
}
}

@Override
public final int getChannelCount()
public int getChannelCount()
{
return 2;
}

@Override
public final void appendNext(PageBuilder pageBuilder, int outputChannelOffset)
public int getInputEntryCount()
{
if (columnarMap == null) {
return 0;
}
return columnarMap.getPositionCount();
}

/**
* Replaces the current columnar view with one built from {@code block} via {@code toColumnarMap}.
*
* @param block the block to convert with {@code toColumnarMap}
*/
@Override
protected void resetColumnarStructure(Block block)
{
this.columnarMap = toColumnarMap(block);
}

@Override
protected Block getElementsBlock(int channel)
{
checkState(block != null, "block is null");
BlockBuilder keyBlockBuilder = pageBuilder.getBlockBuilder(outputChannelOffset);
BlockBuilder valueBlockBuilder = pageBuilder.getBlockBuilder(outputChannelOffset + 1);
keyType.appendTo(block, position++, keyBlockBuilder);
valueType.appendTo(block, position++, valueBlockBuilder);
checkState(channel == 0 || channel == 1, "index is not 0 or 1");
if (channel == 0) {
return columnarMap.getKeysBlock();
}
return columnarMap.getValuesBlock();
}

@Override
public void setBlock(@Nullable Block mapBlock)
protected int getElementsLength(int index)
{
this.block = mapBlock;
this.position = 0;
this.positionCount = mapBlock == null ? 0 : mapBlock.getPositionCount();
return columnarMap.getEntryCount(index);
}
}

0 comments on commit f5b1534

Please sign in to comment.