Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unnest array of row to multiple columns #10883

Merged
merged 1 commit into from
Jul 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public final class SystemSessionProperties
public static final String USE_MARK_DISTINCT = "use_mark_distinct";
public static final String PREFER_PARTITIAL_AGGREGATION = "prefer_partial_aggregation";
public static final String MAX_GROUPING_SETS = "max_grouping_sets";
public static final String LEGACY_UNNEST = "legacy_unnest";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -478,7 +479,12 @@ public SystemSessionProperties(
MAX_GROUPING_SETS,
"Maximum number of grouping sets in a GROUP BY",
featuresConfig.getMaxGroupingSets(),
true));
true),
booleanProperty(
LEGACY_UNNEST,
"Using legacy unnest semantic, where unnest(array(row)) will create one column of type row",
featuresConfig.isLegacyUnnestArrayRows(),
false));
}

public List<PropertyMetadata<?>> getSessionProperties()
Expand Down Expand Up @@ -780,6 +786,11 @@ public static int getMaxGroupingSets(Session session)
return session.getSystemProperty(MAX_GROUPING_SETS, Integer.class);
}

/**
 * Reads the {@code LEGACY_UNNEST} system property from the given session.
 *
 * @param session the session whose system properties are consulted
 * @return the boolean value stored under {@code LEGACY_UNNEST}
 */
public static boolean isLegacyUnnest(Session session)
{
    Boolean legacyUnnest = session.getSystemProperty(LEGACY_UNNEST, Boolean.class);
    return legacyUnnest;
}

private static int validateValueIsPowerOfTwo(Object value, String property)
{
int intValue = ((Number) requireNonNull(value, "value is null")).intValue();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;

import com.facebook.presto.spi.PageBuilder;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.ColumnarRow;
import com.facebook.presto.spi.type.RowType;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableList;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

/**
 * {@link Unnester} for arrays whose element type is a {@link RowType}.
 * <p>
 * Each array element is emitted as one output position spread across
 * {@link #getChannelCount()} channels, one channel per row field. A null row
 * element produces a null in every one of those channels.
 */
public class ArrayOfRowsUnnester
        implements Unnester
{
    // Types of the row fields, in field order; one output channel is produced per entry.
    private final List<Type> fieldTypes;
    // Columnar view of the current block; null until setBlock is called with a non-null block.
    private ColumnarRow columnarRow;
    // Index of the next row to emit, counting null and non-null rows alike.
    private int position;
    // Index handed to the per-field blocks; advanced only when a non-null row is emitted.
    private int nonNullPosition;
    // Number of rows in the current block (0 when no block is set).
    private int positionCount;

    /**
     * @param elementType element type of the array being unnested; must be a {@link RowType}
     * @throws NullPointerException if {@code elementType} is null
     * @throws IllegalArgumentException if {@code elementType} is not a {@link RowType}
     */
    public ArrayOfRowsUnnester(Type elementType)
    {
        requireNonNull(elementType, "elementType is null");
        checkArgument(elementType instanceof RowType, "elementType is not of RowType");
        this.fieldTypes = ImmutableList.copyOf(elementType.getTypeParameters());
    }

    /**
     * @return the number of output channels written per row, i.e. the number of row fields
     */
    @Override
    public int getChannelCount()
    {
        return fieldTypes.size();
    }

    /**
     * Appends the row at the current position to the page builder, writing field {@code i}
     * into channel {@code outputChannelOffset + i}, then advances to the next row.
     *
     * @param pageBuilder destination of the unnested values
     * @param outputChannelOffset index of the first channel this unnester writes to
     * @throws IllegalStateException if no block has been set
     */
    @Override
    public void appendNext(PageBuilder pageBuilder, int outputChannelOffset)
    {
        checkState(columnarRow != null, "columnarRow is null");

        // The null flag belongs to the row, not to a field, so look it up once per row
        // instead of once per field plus once more after the loop.
        boolean rowIsNull = columnarRow.isNull(position);

        for (int i = 0; i < fieldTypes.size(); i++) {
            BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(outputChannelOffset + i);
            if (rowIsNull) {
                blockBuilder.appendNull();
            }
            else {
                fieldTypes.get(i).appendTo(columnarRow.getField(i), nonNullPosition, blockBuilder);
            }
        }

        if (!rowIsNull) {
            nonNullPosition++;
        }
        position++;
    }

    /**
     * @return true while there are rows of the current block that have not been appended yet
     */
    @Override
    public boolean hasNext()
    {
        return position < positionCount;
    }

    /**
     * Resets the unnester to the start of a new block.
     *
     * @param block the block of rows to unnest, or null to clear the current block
     *        (after which {@link #hasNext()} returns false)
     */
    @Override
    public void setBlock(Block block)
    {
        this.position = 0;
        this.nonNullPosition = 0;
        if (block == null) {
            this.columnarRow = null;
            this.positionCount = 0;
        }
        else {
            this.columnarRow = ColumnarRow.toColumnarRow(block);
            this.positionCount = block.getPositionCount();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
*/
package com.facebook.presto.operator;

import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PageBuilder;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.ArrayType;
import com.facebook.presto.spi.type.MapType;
import com.facebook.presto.spi.type.RowType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -63,7 +65,7 @@ public Operator createOperator(DriverContext driverContext)
{
checkState(!closed, "Factory is already closed");
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, UnnestOperator.class.getSimpleName());
return new UnnestOperator(operatorContext, replicateChannels, replicateTypes, unnestChannels, unnestTypes, withOrdinality);
return new UnnestOperator(operatorContext, replicateChannels, replicateTypes, unnestChannels, unnestTypes, withOrdinality, SystemSessionProperties.isLegacyUnnest(driverContext.getSession()));
}

@Override
Expand Down Expand Up @@ -92,7 +94,7 @@ public OperatorFactory duplicate()
private int currentPosition;
private int ordinalityCount;

public UnnestOperator(OperatorContext operatorContext, List<Integer> replicateChannels, List<Type> replicateTypes, List<Integer> unnestChannels, List<Type> unnestTypes, boolean withOrdinality)
public UnnestOperator(OperatorContext operatorContext, List<Integer> replicateChannels, List<Type> replicateTypes, List<Integer> unnestChannels, List<Type> unnestTypes, boolean withOrdinality, boolean isLegacyUnnest)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.replicateChannels = ImmutableList.copyOf(requireNonNull(replicateChannels, "replicateChannels is null"));
Expand All @@ -104,15 +106,21 @@ public UnnestOperator(OperatorContext operatorContext, List<Integer> replicateCh
checkArgument(unnestChannels.size() == unnestTypes.size(), "unnest channels or types has wrong size");
ImmutableList.Builder<Type> outputTypesBuilder = ImmutableList.<Type>builder()
.addAll(replicateTypes)
.addAll(getUnnestedTypes(unnestTypes));
.addAll(getUnnestedTypes(unnestTypes, isLegacyUnnest));
if (withOrdinality) {
outputTypesBuilder.add(BIGINT);
}
this.pageBuilder = new PageBuilder(outputTypesBuilder.build());
this.unnesters = new ArrayList<>(unnestTypes.size());
for (Type type : unnestTypes) {
if (type instanceof ArrayType) {
unnesters.add(new ArrayUnnester(((ArrayType) type).getElementType()));
Type elementType = ((ArrayType) type).getElementType();
if (!isLegacyUnnest && elementType instanceof RowType) {
unnesters.add(new ArrayOfRowsUnnester(elementType));
}
else {
unnesters.add(new ArrayUnnester(elementType));
}
}
else if (type instanceof MapType) {
MapType mapType = (MapType) type;
Expand All @@ -124,12 +132,17 @@ else if (type instanceof MapType) {
}
}

private static List<Type> getUnnestedTypes(List<Type> types)
private static List<Type> getUnnestedTypes(List<Type> types, boolean isLegacyUnnest)
{
ImmutableList.Builder<Type> builder = ImmutableList.builder();
for (Type type : types) {
checkArgument(type instanceof ArrayType || type instanceof MapType, "Can only unnest map and array types");
builder.addAll(type.getTypeParameters());
if (type instanceof ArrayType && !isLegacyUnnest && ((ArrayType) type).getElementType() instanceof RowType) {
builder.addAll(((ArrayType) type).getElementType().getTypeParameters());
}
else {
builder.addAll(type.getTypeParameters());
}
}
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public class FeaturesConfig
private DataSize filterAndProjectMinOutputPageSize = new DataSize(500, KILOBYTE);
private int filterAndProjectMinOutputPageRowCount = 256;
private int maxGroupingSets = 2048;
private boolean legacyUnnestArrayRows;

public enum JoinReorderingStrategy
{
Expand Down Expand Up @@ -846,4 +847,16 @@ public FeaturesConfig setMaxGroupingSets(int maxGroupingSets)
this.maxGroupingSets = maxGroupingSets;
return this;
}

/**
 * Returns the stored legacy-unnest flag for arrays of rows.
 *
 * @return the current value of {@code legacyUnnestArrayRows}
 */
public boolean isLegacyUnnestArrayRows()
{
    return this.legacyUnnestArrayRows;
}

/**
 * Sets the legacy-unnest flag for arrays of rows; bound to the
 * {@code deprecated.legacy-unnest-array-rows} configuration property.
 *
 * @param legacyUnnestArrayRows value to store
 * @return this {@code FeaturesConfig}, so setter calls can be chained
 */
@Config("deprecated.legacy-unnest-array-rows")
public FeaturesConfig setLegacyUnnestArrayRows(boolean legacyUnnestArrayRows)
{
this.legacyUnnestArrayRows = legacyUnnestArrayRows;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.sql.analyzer;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.metadata.FunctionKind;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.OperatorNotFoundException;
Expand Down Expand Up @@ -697,7 +698,15 @@ protected Scope visitUnnest(Unnest node, Optional<Scope> scope)
ExpressionAnalysis expressionAnalysis = analyzeExpression(expression, createScope(scope));
Type expressionType = expressionAnalysis.getType(expression);
if (expressionType instanceof ArrayType) {
outputFields.add(Field.newUnqualified(Optional.empty(), ((ArrayType) expressionType).getElementType()));
Type elementType = ((ArrayType) expressionType).getElementType();
if (!SystemSessionProperties.isLegacyUnnest(session) && elementType instanceof RowType) {
elementType.getTypeParameters().stream()
.map(type -> Field.newUnqualified(Optional.empty(), type))
.forEach(outputFields::add);
}
else {
outputFields.add(Field.newUnqualified(Optional.empty(), elementType));
}
}
else if (expressionType instanceof MapType) {
outputFields.add(Field.newUnqualified(Optional.empty(), ((MapType) expressionType).getKeyType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package com.facebook.presto.sql.planner;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.TableHandle;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.ArrayType;
import com.facebook.presto.spi.type.MapType;
import com.facebook.presto.spi.type.RowType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.ExpressionUtils;
import com.facebook.presto.sql.analyzer.Analysis;
Expand Down Expand Up @@ -549,7 +551,17 @@ private RelationPlan planCrossJoinUnnest(RelationPlan leftPlan, Join joinNode, U
Type type = analysis.getType(expression);
Symbol inputSymbol = translations.get(expression);
if (type instanceof ArrayType) {
unnestSymbols.put(inputSymbol, ImmutableList.of(unnestedSymbolsIterator.next()));
Type elementType = ((ArrayType) type).getElementType();
if (!SystemSessionProperties.isLegacyUnnest(session) && elementType instanceof RowType) {
ImmutableList.Builder<Symbol> unnestSymbolBuilder = ImmutableList.builder();
for (int i = 0; i < ((RowType) elementType).getFields().size(); i++) {
unnestSymbolBuilder.add(unnestedSymbolsIterator.next());
}
unnestSymbols.put(inputSymbol, unnestSymbolBuilder.build());
}
else {
unnestSymbols.put(inputSymbol, ImmutableList.of(unnestedSymbolsIterator.next()));
}
}
else if (type instanceof MapType) {
unnestSymbols.put(inputSymbol, ImmutableList.of(unnestedSymbolsIterator.next(), unnestedSymbolsIterator.next()));
Expand Down Expand Up @@ -640,7 +652,17 @@ protected RelationPlan visitUnnest(Unnest node, Void context)
Symbol inputSymbol = symbolAllocator.newSymbol(rewritten, type);
argumentSymbols.add(inputSymbol);
if (type instanceof ArrayType) {
unnestSymbols.put(inputSymbol, ImmutableList.of(unnestedSymbolsIterator.next()));
Type elementType = ((ArrayType) type).getElementType();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lines 663–680 are the same as lines 562–580, and both handle unnest.

Would it make sense to refactor them? @martint @kokosing

Copy link
Contributor Author

@rongrong rongrong Jun 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite like how unnest is done throughout the codebase. The

if (type instanceof ArrayType)
...
else if (type instanceof MapType)
...

is everywhere. If I were to do a refactoring, I'd prefer something that doesn't require that, but I didn't see an easy way to do so. I would be more than happy to take suggestions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please extract the outer if as a static method.

I'm not sure what part of the following snippet you don't like:

if (type instanceof ArrayType)
...
else if (type instanceof MapType)
...

You could have used switch (type.getTypeSignature().getBase()) instead, but that makes little difference.

if (!SystemSessionProperties.isLegacyUnnest(session) && elementType instanceof RowType) {
ImmutableList.Builder<Symbol> unnestSymbolBuilder = ImmutableList.builder();
for (int i = 0; i < ((RowType) elementType).getFields().size(); i++) {
unnestSymbolBuilder.add(unnestedSymbolsIterator.next());
}
unnestSymbols.put(inputSymbol, unnestSymbolBuilder.build());
}
else {
unnestSymbols.put(inputSymbol, ImmutableList.of(unnestedSymbolsIterator.next()));
}
}
else if (type instanceof MapType) {
unnestSymbols.put(inputSymbol, ImmutableList.of(unnestedSymbolsIterator.next(), unnestedSymbolsIterator.next()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.type.ArrayType;
import com.facebook.presto.spi.type.RowType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.MaterializedResult;
Expand All @@ -36,6 +37,7 @@
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
import static com.facebook.presto.testing.TestingTaskContext.createTaskContext;
import static com.facebook.presto.util.StructuralTestUtil.arrayBlockOf;
Expand Down Expand Up @@ -188,4 +190,34 @@ public void testUnnestNonNumericDoubles()

assertOperatorEquals(operatorFactory, driverContext, input, expected);
}

/**
 * Unnests a column of type array(row(bigint, double, varchar)) and checks that every
 * row element is expanded into three separate output columns next to the replicated
 * BIGINT column.
 */
@Test
public void testUnnestWithArrayOfRows()
{
    Type rowType = RowType.anonymous(ImmutableList.of(BIGINT, DOUBLE, VARCHAR));
    Type arrayType = createTestMetadataManager().getType(parseTypeSignature("array(row(bigint, double, varchar))"));

    List<Page> pages = rowPagesBuilder(BIGINT, arrayType)
            .row(1, arrayBlockOf(rowType, ImmutableList.of(2, 4.2, "abc"), ImmutableList.of(3, 6.6, "def")))
            // second array element is a null row
            .row(2, arrayBlockOf(rowType, ImmutableList.of(99, 3.14, "pi"), null))
            // the array itself is null: no output row is expected for key 3
            .row(3, null)
            .pageBreak()
            // first array element is a null row
            .row(6, arrayBlockOf(rowType, null, ImmutableList.of(8, 1.111, "tt")))
            .build();

    PlanNodeId planNodeId = new PlanNodeId("test");
    OperatorFactory factory = new UnnestOperator.UnnestOperatorFactory(
            0,
            planNodeId,
            ImmutableList.of(0),
            ImmutableList.of(BIGINT),
            ImmutableList.of(1),
            ImmutableList.of(arrayType),
            false);

    // A null row element shows up as null in each of the three unnested columns.
    MaterializedResult expectedRows = resultBuilder(driverContext.getSession(), BIGINT, BIGINT, DOUBLE, VARCHAR)
            .row(1L, 2L, 4.2, "abc")
            .row(1L, 3L, 6.6, "def")
            .row(2L, 99L, 3.14, "pi")
            .row(2L, null, null, null)
            .row(6L, null, null, null)
            .row(6L, 8L, 1.111, "tt")
            .build();

    assertOperatorEquals(factory, driverContext, pages, expectedRows);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void testDefaults()
.setArrayAggGroupImplementation(ArrayAggGroupImplementation.NEW)
.setDistributedSortEnabled(true)
.setMaxGroupingSets(2048)
.setLegacyUnnestArrayRows(false)
.setPreAllocateMemoryThreshold(succinctBytes(0)));
}

Expand Down Expand Up @@ -164,6 +165,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.prefer-partial-aggregation", "false")
.put("distributed-sort", "false")
.put("analyzer.max-grouping-sets", "2047")
.put("deprecated.legacy-unnest-array-rows", "true")
.put("experimental.preallocate-memory-threshold", "5TB")
.build();

Expand Down Expand Up @@ -223,6 +225,7 @@ public void testExplicitPropertyMappings()
.setArrayAggGroupImplementation(ArrayAggGroupImplementation.LEGACY)
.setDistributedSortEnabled(false)
.setMaxGroupingSets(2047)
.setLegacyUnnestArrayRows(true)
.setPreAllocateMemoryThreshold(DataSize.valueOf("5TB"));
assertFullMapping(properties, expected);
}
Expand Down
Loading