Skip to content

Commit

Permalink
Unnest array of row to multiple columns
Browse files Browse the repository at this point in the history
  • Loading branch information
rongrong committed Jul 24, 2018
1 parent 0772921 commit f3f7aa3
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public final class SystemSessionProperties
public static final String USE_MARK_DISTINCT = "use_mark_distinct";
public static final String PREFER_PARTITIAL_AGGREGATION = "prefer_partial_aggregation";
public static final String MAX_GROUPING_SETS = "max_grouping_sets";
public static final String LEGACY_UNNEST = "legacy_unnest";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -478,7 +479,12 @@ public SystemSessionProperties(
MAX_GROUPING_SETS,
"Maximum number of grouping sets in a GROUP BY",
featuresConfig.getMaxGroupingSets(),
true));
true),
booleanProperty(
LEGACY_UNNEST,
"Using legacy unnest semantic, where unnest(array(row)) will create one column of type row",
featuresConfig.isLegacyUnnestArrayRows(),
false));
}

public List<PropertyMetadata<?>> getSessionProperties()
Expand Down Expand Up @@ -780,6 +786,11 @@ public static int getMaxGroupingSets(Session session)
return session.getSystemProperty(MAX_GROUPING_SETS, Integer.class);
}

public static boolean isLegacyUnnest(Session session)
{
return session.getSystemProperty(LEGACY_UNNEST, Boolean.class);
}

private static int validateValueIsPowerOfTwo(Object value, String property)
{
int intValue = ((Number) requireNonNull(value, "value is null")).intValue();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;

import com.facebook.presto.spi.PageBuilder;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.ColumnarRow;
import com.facebook.presto.spi.type.RowType;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableList;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class ArrayOfRowsUnnester
implements Unnester
{
private final List<Type> fieldTypes;
private ColumnarRow columnarRow;
private int position;
private int nonNullPosition;
private int positionCount;

public ArrayOfRowsUnnester(Type elementType)
{
requireNonNull(elementType, "elementType is null");
checkArgument(elementType instanceof RowType, "elementType is not of RowType");
this.fieldTypes = ImmutableList.copyOf(elementType.getTypeParameters());
}

@Override
public int getChannelCount()
{
return fieldTypes.size();
}

@Override
public void appendNext(PageBuilder pageBuilder, int outputChannelOffset)
{
checkState(columnarRow != null, "columnarRow is null");

for (int i = 0; i < fieldTypes.size(); i++) {
BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(outputChannelOffset + i);
if (columnarRow.isNull(position)) {
blockBuilder.appendNull();
}
else {
fieldTypes.get(i).appendTo(columnarRow.getField(i), nonNullPosition, blockBuilder);
}
}
if (!columnarRow.isNull(position)) {
nonNullPosition++;
}
position++;
}

@Override
public boolean hasNext()
{
return position < positionCount;
}

@Override
public void setBlock(Block block)
{
this.position = 0;
this.nonNullPosition = 0;
if (block == null) {
this.columnarRow = null;
this.positionCount = 0;
}
else {
this.columnarRow = ColumnarRow.toColumnarRow(block);
this.positionCount = block.getPositionCount();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
*/
package com.facebook.presto.operator;

import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PageBuilder;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.ArrayType;
import com.facebook.presto.spi.type.MapType;
import com.facebook.presto.spi.type.RowType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -63,7 +65,7 @@ public Operator createOperator(DriverContext driverContext)
{
checkState(!closed, "Factory is already closed");
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, UnnestOperator.class.getSimpleName());
return new UnnestOperator(operatorContext, replicateChannels, replicateTypes, unnestChannels, unnestTypes, withOrdinality);
return new UnnestOperator(operatorContext, replicateChannels, replicateTypes, unnestChannels, unnestTypes, withOrdinality, SystemSessionProperties.isLegacyUnnest(driverContext.getSession()));
}

@Override
Expand Down Expand Up @@ -92,7 +94,7 @@ public OperatorFactory duplicate()
private int currentPosition;
private int ordinalityCount;

public UnnestOperator(OperatorContext operatorContext, List<Integer> replicateChannels, List<Type> replicateTypes, List<Integer> unnestChannels, List<Type> unnestTypes, boolean withOrdinality)
public UnnestOperator(OperatorContext operatorContext, List<Integer> replicateChannels, List<Type> replicateTypes, List<Integer> unnestChannels, List<Type> unnestTypes, boolean withOrdinality, boolean isLegacyUnnest)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.replicateChannels = ImmutableList.copyOf(requireNonNull(replicateChannels, "replicateChannels is null"));
Expand All @@ -104,15 +106,21 @@ public UnnestOperator(OperatorContext operatorContext, List<Integer> replicateCh
checkArgument(unnestChannels.size() == unnestTypes.size(), "unnest channels or types has wrong size");
ImmutableList.Builder<Type> outputTypesBuilder = ImmutableList.<Type>builder()
.addAll(replicateTypes)
.addAll(getUnnestedTypes(unnestTypes));
.addAll(getUnnestedTypes(unnestTypes, isLegacyUnnest));
if (withOrdinality) {
outputTypesBuilder.add(BIGINT);
}
this.pageBuilder = new PageBuilder(outputTypesBuilder.build());
this.unnesters = new ArrayList<>(unnestTypes.size());
for (Type type : unnestTypes) {
if (type instanceof ArrayType) {
unnesters.add(new ArrayUnnester(((ArrayType) type).getElementType()));
Type elementType = ((ArrayType) type).getElementType();
if (!isLegacyUnnest && elementType instanceof RowType) {
unnesters.add(new ArrayOfRowsUnnester(elementType));
}
else {
unnesters.add(new ArrayUnnester(elementType));
}
}
else if (type instanceof MapType) {
MapType mapType = (MapType) type;
Expand All @@ -124,12 +132,17 @@ else if (type instanceof MapType) {
}
}

private static List<Type> getUnnestedTypes(List<Type> types)
private static List<Type> getUnnestedTypes(List<Type> types, boolean isLegacyUnnest)
{
ImmutableList.Builder<Type> builder = ImmutableList.builder();
for (Type type : types) {
checkArgument(type instanceof ArrayType || type instanceof MapType, "Can only unnest map and array types");
builder.addAll(type.getTypeParameters());
if (type instanceof ArrayType && !isLegacyUnnest && ((ArrayType) type).getElementType() instanceof RowType) {
builder.addAll(((ArrayType) type).getElementType().getTypeParameters());
}
else {
builder.addAll(type.getTypeParameters());
}
}
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public class FeaturesConfig
private DataSize filterAndProjectMinOutputPageSize = new DataSize(500, KILOBYTE);
private int filterAndProjectMinOutputPageRowCount = 256;
private int maxGroupingSets = 2048;
private boolean legacyUnnestArrayRows;

public enum JoinReorderingStrategy
{
Expand Down Expand Up @@ -846,4 +847,16 @@ public FeaturesConfig setMaxGroupingSets(int maxGroupingSets)
this.maxGroupingSets = maxGroupingSets;
return this;
}

public boolean isLegacyUnnestArrayRows()
{
return legacyUnnestArrayRows;
}

@Config("deprecated.legacy-unnest-array-rows")
public FeaturesConfig setLegacyUnnestArrayRows(boolean legacyUnnestArrayRows)
{
this.legacyUnnestArrayRows = legacyUnnestArrayRows;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.sql.analyzer;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.metadata.FunctionKind;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.OperatorNotFoundException;
Expand Down Expand Up @@ -697,7 +698,15 @@ protected Scope visitUnnest(Unnest node, Optional<Scope> scope)
ExpressionAnalysis expressionAnalysis = analyzeExpression(expression, createScope(scope));
Type expressionType = expressionAnalysis.getType(expression);
if (expressionType instanceof ArrayType) {
outputFields.add(Field.newUnqualified(Optional.empty(), ((ArrayType) expressionType).getElementType()));
Type elementType = ((ArrayType) expressionType).getElementType();
if (!SystemSessionProperties.isLegacyUnnest(session) && elementType instanceof RowType) {
elementType.getTypeParameters().stream()
.map(type -> Field.newUnqualified(Optional.empty(), type))
.forEach(outputFields::add);
}
else {
outputFields.add(Field.newUnqualified(Optional.empty(), elementType));
}
}
else if (expressionType instanceof MapType) {
outputFields.add(Field.newUnqualified(Optional.empty(), ((MapType) expressionType).getKeyType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package com.facebook.presto.sql.planner;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.TableHandle;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.ArrayType;
import com.facebook.presto.spi.type.MapType;
import com.facebook.presto.spi.type.RowType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.ExpressionUtils;
import com.facebook.presto.sql.analyzer.Analysis;
Expand Down Expand Up @@ -549,7 +551,17 @@ private RelationPlan planCrossJoinUnnest(RelationPlan leftPlan, Join joinNode, U
Type type = analysis.getType(expression);
Symbol inputSymbol = translations.get(expression);
if (type instanceof ArrayType) {
unnestSymbols.put(inputSymbol, ImmutableList.of(unnestedSymbolsIterator.next()));
Type elementType = ((ArrayType) type).getElementType();
if (!SystemSessionProperties.isLegacyUnnest(session) && elementType instanceof RowType) {
ImmutableList.Builder<Symbol> unnestSymbolBuilder = ImmutableList.builder();
for (int i = 0; i < ((RowType) elementType).getFields().size(); i++) {
unnestSymbolBuilder.add(unnestedSymbolsIterator.next());
}
unnestSymbols.put(inputSymbol, unnestSymbolBuilder.build());
}
else {
unnestSymbols.put(inputSymbol, ImmutableList.of(unnestedSymbolsIterator.next()));
}
}
else if (type instanceof MapType) {
unnestSymbols.put(inputSymbol, ImmutableList.of(unnestedSymbolsIterator.next(), unnestedSymbolsIterator.next()));
Expand Down Expand Up @@ -640,7 +652,17 @@ protected RelationPlan visitUnnest(Unnest node, Void context)
Symbol inputSymbol = symbolAllocator.newSymbol(rewritten, type);
argumentSymbols.add(inputSymbol);
if (type instanceof ArrayType) {
unnestSymbols.put(inputSymbol, ImmutableList.of(unnestedSymbolsIterator.next()));
Type elementType = ((ArrayType) type).getElementType();
if (!SystemSessionProperties.isLegacyUnnest(session) && elementType instanceof RowType) {
ImmutableList.Builder<Symbol> unnestSymbolBuilder = ImmutableList.builder();
for (int i = 0; i < ((RowType) elementType).getFields().size(); i++) {
unnestSymbolBuilder.add(unnestedSymbolsIterator.next());
}
unnestSymbols.put(inputSymbol, unnestSymbolBuilder.build());
}
else {
unnestSymbols.put(inputSymbol, ImmutableList.of(unnestedSymbolsIterator.next()));
}
}
else if (type instanceof MapType) {
unnestSymbols.put(inputSymbol, ImmutableList.of(unnestedSymbolsIterator.next(), unnestedSymbolsIterator.next()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.type.ArrayType;
import com.facebook.presto.spi.type.RowType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.MaterializedResult;
Expand All @@ -36,6 +37,7 @@
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
import static com.facebook.presto.testing.TestingTaskContext.createTaskContext;
import static com.facebook.presto.util.StructuralTestUtil.arrayBlockOf;
Expand Down Expand Up @@ -188,4 +190,34 @@ public void testUnnestNonNumericDoubles()

assertOperatorEquals(operatorFactory, driverContext, input, expected);
}

@Test
public void testUnnestWithArrayOfRows()
{
MetadataManager metadata = createTestMetadataManager();
Type arrayOfRowType = metadata.getType(parseTypeSignature("array(row(bigint, double, varchar))"));
Type elementType = RowType.anonymous(ImmutableList.of(BIGINT, DOUBLE, VARCHAR));

List<Page> input = rowPagesBuilder(BIGINT, arrayOfRowType)
.row(1, arrayBlockOf(elementType, ImmutableList.of(2, 4.2, "abc"), ImmutableList.of(3, 6.6, "def")))
.row(2, arrayBlockOf(elementType, ImmutableList.of(99, 3.14, "pi"), null))
.row(3, null)
.pageBreak()
.row(6, arrayBlockOf(elementType, null, ImmutableList.of(8, 1.111, "tt")))
.build();

OperatorFactory operatorFactory = new UnnestOperator.UnnestOperatorFactory(
0, new PlanNodeId("test"), ImmutableList.of(0), ImmutableList.of(BIGINT), ImmutableList.of(1), ImmutableList.of(arrayOfRowType), false);

MaterializedResult expected = resultBuilder(driverContext.getSession(), BIGINT, BIGINT, DOUBLE, VARCHAR)
.row(1L, 2L, 4.2, "abc")
.row(1L, 3L, 6.6, "def")
.row(2L, 99L, 3.14, "pi")
.row(2L, null, null, null)
.row(6L, null, null, null)
.row(6L, 8L, 1.111, "tt")
.build();

assertOperatorEquals(operatorFactory, driverContext, input, expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void testDefaults()
.setArrayAggGroupImplementation(ArrayAggGroupImplementation.NEW)
.setDistributedSortEnabled(true)
.setMaxGroupingSets(2048)
.setLegacyUnnestArrayRows(false)
.setPreAllocateMemoryThreshold(succinctBytes(0)));
}

Expand Down Expand Up @@ -164,6 +165,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.prefer-partial-aggregation", "false")
.put("distributed-sort", "false")
.put("analyzer.max-grouping-sets", "2047")
.put("deprecated.legacy-unnest-array-rows", "true")
.put("experimental.preallocate-memory-threshold", "5TB")
.build();

Expand Down Expand Up @@ -223,6 +225,7 @@ public void testExplicitPropertyMappings()
.setArrayAggGroupImplementation(ArrayAggGroupImplementation.LEGACY)
.setDistributedSortEnabled(false)
.setMaxGroupingSets(2047)
.setLegacyUnnestArrayRows(true)
.setPreAllocateMemoryThreshold(DataSize.valueOf("5TB"));
assertFullMapping(properties, expected);
}
Expand Down
Loading

0 comments on commit f3f7aa3

Please sign in to comment.