diff --git a/presto-benchmark/src/main/java/com/facebook/presto/benchmark/HandTpchQuery6.java b/presto-benchmark/src/main/java/com/facebook/presto/benchmark/HandTpchQuery6.java index 8058c98a970f..1fd53e3484ec 100644 --- a/presto-benchmark/src/main/java/com/facebook/presto/benchmark/HandTpchQuery6.java +++ b/presto-benchmark/src/main/java/com/facebook/presto/benchmark/HandTpchQuery6.java @@ -92,6 +92,18 @@ public int process(ConnectorSession session, Page page, int start, int end, Page return position; } + @Override + public Page processColumnar(ConnectorSession session, Page page, List types) + { + throw new UnsupportedOperationException(); + } + + @Override + public Page processColumnarDictionary(ConnectorSession session, Page page, List types) + { + throw new UnsupportedOperationException(); + } + private static void project(int position, PageBuilder pageBuilder, Block extendedPriceBlock, Block discountBlock) { if (discountBlock.isNull(position) || extendedPriceBlock.isNull(position)) { diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 8b4863c5cde3..6c88bc621991 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -49,6 +49,8 @@ public final class SystemSessionProperties public static final String QUERY_MAX_RUN_TIME = "query_max_run_time"; public static final String REDISTRIBUTE_WRITES = "redistribute_writes"; public static final String EXECUTION_POLICY = "execution_policy"; + public static final String COLUMNAR_PROCESSING = "columnar_processing"; + public static final String COLUMNAR_PROCESSING_DICTIONARY = "columnar_processing_dictionary"; private final List> sessionProperties; @@ -150,7 +152,17 @@ public SystemSessionProperties( DataSize.class, memoryManagerConfig.getMaxQueryMemory(), true, - value -> 
DataSize.valueOf((String) value))); + value -> DataSize.valueOf((String) value)), + booleanSessionProperty( + COLUMNAR_PROCESSING, + "Use columnar processing", + featuresConfig.isColumnarProcessing(), + false), + booleanSessionProperty( + COLUMNAR_PROCESSING_DICTIONARY, + "Use columnar processing with optimizations for dictionaries", + featuresConfig.isColumnarProcessingDictionary(), + false)); } public List> getSessionProperties() @@ -223,6 +235,16 @@ public static boolean isShareIndexLoading(Session session) return session.getProperty(TASK_SHARE_INDEX_LOADING, Boolean.class); } + public static boolean isColumnarProcessingEnabled(Session session) + { + return session.getProperty(COLUMNAR_PROCESSING, Boolean.class); + } + + public static boolean isColumnarProcessingDictionaryEnabled(Session session) + { + return session.getProperty(COLUMNAR_PROCESSING_DICTIONARY, Boolean.class); + } + public static DataSize getQueryMaxMemory(Session session) { return session.getProperty(QUERY_MAX_MEMORY, DataSize.class); diff --git a/presto-main/src/main/java/com/facebook/presto/byteCode/expression/ByteCodeExpressions.java b/presto-main/src/main/java/com/facebook/presto/byteCode/expression/ByteCodeExpressions.java index a6785491db8f..568b2832729c 100644 --- a/presto-main/src/main/java/com/facebook/presto/byteCode/expression/ByteCodeExpressions.java +++ b/presto-main/src/main/java/com/facebook/presto/byteCode/expression/ByteCodeExpressions.java @@ -612,4 +612,13 @@ public static ByteCodeExpression inlineIf(ByteCodeExpression condition, ByteCode { return new InlineIfByteCodeExpression(condition, ifTrue, ifFalse); } + + // + // Print + // + public static ByteCodeExpression print(ByteCodeExpression variable) + { + ByteCodeExpression out = getStatic(System.class, "out"); + return out.invoke("println", void.class, variable); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/FilterAndProjectOperator.java 
b/presto-main/src/main/java/com/facebook/presto/operator/FilterAndProjectOperator.java index 81be8d095fc9..f3c6834621ec 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/FilterAndProjectOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/FilterAndProjectOperator.java @@ -20,6 +20,8 @@ import java.util.List; +import static com.facebook.presto.SystemSessionProperties.isColumnarProcessingDictionaryEnabled; +import static com.facebook.presto.SystemSessionProperties.isColumnarProcessingEnabled; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; @@ -31,6 +33,8 @@ public class FilterAndProjectOperator private final PageBuilder pageBuilder; private final PageProcessor processor; + private final boolean columnarProcessingEnabled; + private final boolean columnarProcessingDictionaryEnabled; private Page currentPage; private int currentPosition; private boolean finishing; @@ -40,6 +44,8 @@ public FilterAndProjectOperator(OperatorContext operatorContext, Iterable projections; @@ -36,16 +51,154 @@ public GenericPageProcessor(FilterFunction filterFunction, Iterable types) + { + int[] selectedPositions = filterPage(page); + if (selectedPositions.length == 0) { + return null; + } + + if (projections.isEmpty()) { + return new Page(selectedPositions.length); + } + + PageBuilder pageBuilder = new PageBuilder(types); + Block[] inputBlocks = page.getBlocks(); + + for (int projectionIndex = 0; projectionIndex < projections.size(); projectionIndex++) { + ProjectionFunction projection = projections.get(projectionIndex); + projectColumnar(selectedPositions, pageBuilder.getBlockBuilder(projectionIndex), inputBlocks, projection); + } + pageBuilder.declarePositions(selectedPositions.length); + return pageBuilder.build(); + } + + @Override + public Page processColumnarDictionary(ConnectorSession session, Page page, List types) + { + Page inputPage = getNonLazyPage(page); + int[] 
selectedPositions = filterPage(inputPage); + + if (selectedPositions.length == 0) { + return null; + } + + if (projections.isEmpty()) { + return new Page(selectedPositions.length); + } + + PageBuilder pageBuilder = new PageBuilder(types); + Block[] inputBlocks = page.getBlocks(); + Block[] outputBlocks = new Block[projections.size()]; + + for (int projectionIndex = 0; projectionIndex < projections.size(); projectionIndex++) { + ProjectionFunction projection = projections.get(projectionIndex); + + if (canDictionaryProcess(projection, inputPage)) { + Block outputDictionary = projectDictionary(projection, inputPage); + int[] outputIds = filterIds(projection, inputPage, selectedPositions); + outputBlocks[projectionIndex] = new DictionaryBlock(selectedPositions.length, outputDictionary, wrappedIntArray(outputIds)); + } + else { + outputBlocks[projectionIndex] = projectColumnar(selectedPositions, pageBuilder.getBlockBuilder(projectionIndex), inputBlocks, projection).build(); + } + } + + for (Block block : outputBlocks) { + verify(block.getPositionCount() == selectedPositions.length); + } + return new Page(selectedPositions.length, outputBlocks); + } + + private static BlockBuilder projectColumnar(int[] selectedPositions, BlockBuilder blockBuilder, Block[] inputBlocks, ProjectionFunction projection) + { + for (int position : selectedPositions) { + projection.project(position, inputBlocks, blockBuilder); + } + return blockBuilder; + } + + private static int[] filterIds(ProjectionFunction projection, Page page, int[] selectedPositions) + { + Slice ids = ((DictionaryBlock) page.getBlock(getOnlyElement(projection.getInputChannels()))).getIds(); + + int[] outputIds = new int[selectedPositions.length]; + for (int pos = 0; pos < selectedPositions.length; pos++) { + outputIds[pos] = ids.getInt(selectedPositions[pos] * SizeOf.SIZE_OF_INT); + } + return outputIds; + } + + private static Block projectDictionary(ProjectionFunction projection, Page page) + { + int inputChannel = 
getOnlyElement(projection.getInputChannels());
+        Block dictionary = ((DictionaryBlock) page.getBlock(inputChannel)).getDictionary();
+
+        BlockBuilder dictionaryBuilder = projection.getType().createBlockBuilder(new BlockBuilderStatus(), dictionary.getPositionCount());
+        Block[] blocks = new Block[page.getChannelCount()];
+        blocks[inputChannel] = dictionary;
+
+        // project every dictionary entry exactly once
+        for (int i = 0; i < dictionary.getPositionCount(); i++) {
+            projection.project(i, blocks, dictionaryBuilder);
+        }
+        return dictionaryBuilder.build();
+    }
+
+    /**
+     * A projection can be evaluated on the dictionary alone only when it is deterministic,
+     * reads exactly one input channel, and that channel currently holds a DictionaryBlock.
+     */
+    private static boolean canDictionaryProcess(ProjectionFunction projection, Page inputPage)
+    {
+        Set inputChannels = projection.getInputChannels();
+        return projection.isDeterministic()
+                && inputChannels.size() == 1
+                && (inputPage.getBlock(getOnlyElement(inputChannels)) instanceof DictionaryBlock);
+    }
+
+    /**
+     * Builds a page in which every LazyBlock on a channel read by some projection is replaced
+     * by the block it wraps. Channels no projection reads are left as they are.
+     *
+     * @return the input page itself when the projections read no channel, otherwise a new page
+     */
+    private Page getNonLazyPage(Page page)
+    {
+        ImmutableSet.Builder builder = ImmutableSet.builder();
+        for (ProjectionFunction projection : projections) {
+            builder.addAll(projection.getInputChannels());
+        }
+        Set inputChannels = builder.build();
+
+        if (inputChannels.isEmpty()) {
+            return page;
+        }
+
+        Block[] blocks = page.getBlocks();
+        for (int inputChannel : inputChannels) {
+            Block block = page.getBlock(inputChannel);
+            if (block instanceof LazyBlock) {
+                blocks[inputChannel] = ((LazyBlock) block).getBlock();
+            }
+        }
+        return new Page(blocks);
+    }
+
+    /**
+     * Evaluates the filter function against every position of the page.
+     *
+     * @return the accepted positions in ascending order, trimmed to the number accepted
+     */
+    private int[] filterPage(Page page)
+    {
+        // loop invariants: fetch them once instead of once per position
+        int positionCount = page.getPositionCount();
+        Block[] blocks = page.getBlocks();
+
+        int[] selected = new int[positionCount];
+        int index = 0;
+        for (int position = 0; position < positionCount; position++) {
+            if (filterFunction.filter(position, blocks)) {
+                selected[index] = position;
+                index++;
+            }
+        }
+        return copyOf(selected, index);
+    }
 }
diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PageProcessor.java b/presto-main/src/main/java/com/facebook/presto/operator/PageProcessor.java
index 0ee8a8ee7784..f6576e72b074 100644
---
a/presto-main/src/main/java/com/facebook/presto/operator/PageProcessor.java
+++ b/presto-main/src/main/java/com/facebook/presto/operator/PageProcessor.java
@@ -16,8 +16,29 @@
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.Page;
 import com.facebook.presto.spi.PageBuilder;
+import com.facebook.presto.spi.type.Type;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
 
 public interface PageProcessor
 {
     int process(ConnectorSession session, Page page, int start, int end, PageBuilder pageBuilder);
+
+    /**
+     * Filters and projects the whole page in a single call.
+     *
+     * @return filtered and projected page, null if all positions are filtered
+     */
+    @Nullable
+    Page processColumnar(ConnectorSession session, Page page, List types);
+
+    /**
+     * Whole-page variant of {@link #processColumnar} intended for dictionary-aware implementations.
+     *
+     * @return filtered and projected page, null if all positions are filtered
+     */
+    @Nullable
+    Page processColumnarDictionary(ConnectorSession session, Page page, List types);
 }
diff --git a/presto-main/src/main/java/com/facebook/presto/operator/ProjectionFunction.java b/presto-main/src/main/java/com/facebook/presto/operator/ProjectionFunction.java
index 80ba28a143d8..155aff5d6aa2 100644
--- a/presto-main/src/main/java/com/facebook/presto/operator/ProjectionFunction.java
+++ b/presto-main/src/main/java/com/facebook/presto/operator/ProjectionFunction.java
@@ -18,6 +18,8 @@
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.type.Type;
 
+import java.util.Set;
+
 public interface ProjectionFunction
 {
     Type getType();
@@ -25,4 +27,15 @@ public interface ProjectionFunction
     void project(int position, Block[] blocks, BlockBuilder output);
 
     void project(RecordCursor cursor, BlockBuilder output);
+
+    /**
+     * @return the channels of the input that this projection reads
+     */
+    Set getInputChannels();
+
+    /**
+     * @return whether the projection is deterministic; GenericPageProcessor only projects
+     * a dictionary once per entry when this is true
+     */
+    boolean isDeterministic();
 }
diff --git a/presto-main/src/main/java/com/facebook/presto/operator/ProjectionFunctions.java b/presto-main/src/main/java/com/facebook/presto/operator/ProjectionFunctions.java
index 317c256f2717..66a4ab2d86f0 100644
--- a/presto-main/src/main/java/com/facebook/presto/operator/ProjectionFunctions.java
+++ b/presto-main/src/main/java/com/facebook/presto/operator/ProjectionFunctions.java
@@
-18,8 +18,11 @@ import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.type.Type; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import io.airlift.slice.Slice; +import java.util.Set; + import static java.util.Objects.requireNonNull; public final class ProjectionFunctions @@ -90,5 +93,17 @@ else if (javaType == Slice.class) { } } } + + @Override + public Set getInputChannels() + { + return ImmutableSet.of(channelIndex); + } + + @Override + public boolean isDeterministic() + { + return true; + } } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/ScanFilterAndProjectOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/ScanFilterAndProjectOperator.java index 5a4a5a17aa26..d1ce989bfd7a 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/ScanFilterAndProjectOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/ScanFilterAndProjectOperator.java @@ -36,6 +36,8 @@ import java.util.Optional; import java.util.function.Supplier; +import static com.facebook.presto.SystemSessionProperties.isColumnarProcessingDictionaryEnabled; +import static com.facebook.presto.SystemSessionProperties.isColumnarProcessingEnabled; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; @@ -55,6 +57,8 @@ public class ScanFilterAndProjectOperator private final LocalMemoryContext pageSourceMemoryContext; private final LocalMemoryContext pageBuilderMemoryContext; private final SettableFuture blocked = SettableFuture.create(); + private final boolean columnarProcessingEnabled; + private final boolean columnarProcessingDictionaryEnabled; private RecordCursor cursor; private ConnectorPageSource pageSource; @@ -86,6 +90,8 @@ protected ScanFilterAndProjectOperator( this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null")); this.pageSourceMemoryContext = 
operatorContext.getSystemMemoryContext().newLocalMemoryContext();
         this.pageBuilderMemoryContext = operatorContext.getSystemMemoryContext().newLocalMemoryContext();
+        this.columnarProcessingEnabled = isColumnarProcessingEnabled(operatorContext.getSession());
+        this.columnarProcessingDictionaryEnabled = isColumnarProcessingDictionaryEnabled(operatorContext.getSession());
         this.pageBuilder = new PageBuilder(getTypes());
     }
 
@@ -237,10 +243,25 @@ public Page getOutput()
             }
 
             if (currentPage != null) {
-                currentPosition = pageProcessor.process(operatorContext.getSession().toConnectorSession(), currentPage, currentPosition, currentPage.getPositionCount(), pageBuilder);
-                if (currentPosition == currentPage.getPositionCount()) {
+                // the columnar variants handle the whole page in one call and hand back its result directly
+                if (columnarProcessingDictionaryEnabled) {
+                    Page page = pageProcessor.processColumnarDictionary(operatorContext.getSession().toConnectorSession(), currentPage, getTypes());
                     currentPage = null;
                     currentPosition = 0;
+                    return page;
+                }
+                else if (columnarProcessingEnabled) {
+                    Page page = pageProcessor.processColumnar(operatorContext.getSession().toConnectorSession(), currentPage, getTypes());
+                    currentPage = null;
+                    currentPosition = 0;
+                    return page;
+                }
+                else {
+                    currentPosition = pageProcessor.process(operatorContext.getSession().toConnectorSession(), currentPage, currentPosition, currentPage.getPositionCount(), pageBuilder);
+                    if (currentPosition == currentPage.getPositionCount()) {
+                        currentPage = null;
+                        currentPosition = 0;
+                    }
                 }
             }
 
diff --git a/presto-main/src/main/java/com/facebook/presto/operator/index/TupleFilterProcessor.java b/presto-main/src/main/java/com/facebook/presto/operator/index/TupleFilterProcessor.java
index 1c1d61c58602..baa037b0bb2d 100644
--- a/presto-main/src/main/java/com/facebook/presto/operator/index/TupleFilterProcessor.java
+++ b/presto-main/src/main/java/com/facebook/presto/operator/index/TupleFilterProcessor.java
@@ -71,6 +71,55 @@ public int process(ConnectorSession session, Page page, int start, int end, Page
         return end;
     }
 
+    /**
+     * Copies the positions of {@code page} accepted by {@code matches} into a new page,
+     * one output channel at a time.
+     *
+     * @return the matching positions as a page, or null if no position matches
+     */
+    @Override
+    public Page processColumnar(ConnectorSession session, Page page, List types)
+    {
+        int positionCount = page.getPositionCount();
+
+        // sized for the worst case; only the first selectedCount entries are ever written
+        int[] selectedPositions = new int[positionCount];
+        int selectedCount = 0;
+        for (int i = 0; i < positionCount; i++) {
+            if (matches(i, page)) {
+                selectedPositions[selectedCount++] = i;
+            }
+        }
+
+        // PageProcessor.processColumnar documents null as the result when every position is filtered
+        if (selectedCount == 0) {
+            return null;
+        }
+
+        PageBuilder pageBuilder = new PageBuilder(types);
+        for (int i = 0; i < outputTypes.size(); i++) {
+            Type type = outputTypes.get(i);
+            Block block = page.getBlock(i);
+            BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(i);
+            // bound the loop by selectedCount: iterating the whole array would also append
+            // position 0 once for every unused trailing slot
+            for (int index = 0; index < selectedCount; index++) {
+                type.appendTo(block, selectedPositions[index], blockBuilder);
+            }
+        }
+        pageBuilder.declarePositions(selectedCount);
+        return pageBuilder.build();
+    }
+
+    /**
+     * Delegates to {@link #processColumnar}; this processor has no dictionary-specific path.
+     */
+    @Override
+    public Page processColumnarDictionary(ConnectorSession session, Page page, List types)
+    {
+        return processColumnar(session, page, types);
+    }
+
     private boolean matches(int position, Page page)
     {
         for (int i = 0; i < outputTupleChannels.length; i++) {
diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java
index bb083958447c..07c34562eba4 100644
--- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java
+++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java
@@ -26,6 +26,8 @@ public class FeaturesConfig
     private boolean optimizeHashGeneration = true;
     private boolean optimizeSingleDistinct = true;
     private boolean intermediateAggregationsEnabled = false;
+    private boolean columnarProcessing = false;
+    private boolean columnarProcessingDictionary = false;
 
     @LegacyConfig("analyzer.experimental-syntax-enabled")
     @Config("experimental-syntax-enabled")
@@ -123,4 +125,28 @@ public FeaturesConfig setIntermediateAggregationsEnabled(boolean intermediateAgg
         this.intermediateAggregationsEnabled = intermediateAggregationsEnabled;
         return this;
     }
+
+
public boolean isColumnarProcessing() + { + return columnarProcessing; + } + + @Config("optimizer.columnar-processing") + public FeaturesConfig setColumnarProcessing(boolean columnarProcessing) + { + this.columnarProcessing = columnarProcessing; + return this; + } + + public boolean isColumnarProcessingDictionary() + { + return columnarProcessingDictionary; + } + + @Config("optimizer.columnar-processing-dictionary") + public FeaturesConfig setColumnarProcessingDictionary(boolean columnarProcessingDictionary) + { + this.columnarProcessingDictionary = columnarProcessingDictionary; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/gen/PageProcessorCompiler.java b/presto-main/src/main/java/com/facebook/presto/sql/gen/PageProcessorCompiler.java index 57055b0a1d1d..269adcdb0edb 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/gen/PageProcessorCompiler.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/gen/PageProcessorCompiler.java @@ -31,6 +31,9 @@ import com.facebook.presto.spi.PageBuilder; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.BlockBuilder; +import com.facebook.presto.spi.block.DictionaryBlock; +import com.facebook.presto.spi.block.LazyBlock; +import com.facebook.presto.spi.block.RunLengthEncodedBlock; import com.facebook.presto.spi.type.Type; import com.facebook.presto.sql.relational.CallExpression; import com.facebook.presto.sql.relational.ConstantExpression; @@ -42,13 +45,17 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.primitives.Primitives; +import io.airlift.slice.SizeOf; import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeSet; +import static com.facebook.presto.byteCode.Access.PRIVATE; import static com.facebook.presto.byteCode.Access.PUBLIC; import static 
com.facebook.presto.byteCode.Access.a; import static com.facebook.presto.byteCode.Parameter.arg; @@ -56,12 +63,20 @@ import static com.facebook.presto.byteCode.expression.ByteCodeExpressions.add; import static com.facebook.presto.byteCode.expression.ByteCodeExpressions.constantFalse; import static com.facebook.presto.byteCode.expression.ByteCodeExpressions.constantInt; +import static com.facebook.presto.byteCode.expression.ByteCodeExpressions.constantNull; +import static com.facebook.presto.byteCode.expression.ByteCodeExpressions.constantTrue; +import static com.facebook.presto.byteCode.expression.ByteCodeExpressions.equal; +import static com.facebook.presto.byteCode.expression.ByteCodeExpressions.invokeStatic; import static com.facebook.presto.byteCode.expression.ByteCodeExpressions.lessThan; +import static com.facebook.presto.byteCode.expression.ByteCodeExpressions.multiply; +import static com.facebook.presto.byteCode.expression.ByteCodeExpressions.newArray; +import static com.facebook.presto.byteCode.expression.ByteCodeExpressions.newInstance; import static com.facebook.presto.byteCode.instruction.JumpInstruction.jump; import static com.facebook.presto.sql.gen.ByteCodeUtils.generateWrite; import static com.facebook.presto.sql.gen.ByteCodeUtils.loadConstant; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.collect.Iterables.*; +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.Iterables.concat; import static java.lang.String.format; import static java.util.stream.Collectors.toList; @@ -82,7 +97,14 @@ public void generateMethods(ClassDefinition classDefinition, CallSiteBinder call for (int i = 0; i < projections.size(); i++) { projectionMethods.add(generateProjectMethod(classDefinition, callSiteBinder, "project_" + i, projections.get(i))); } - generateProcessMethod(classDefinition, filter, projections, projectionMethods.build()); + List projectionMethodDefinitions = 
projectionMethods.build(); + + generateProcessMethod(classDefinition, filter, projections, projectionMethodDefinitions); + generateGetNonLazyPageMethod(classDefinition, filter, projections); + generateProcessColumnarMethod(classDefinition, callSiteBinder, filter, projections, projectionMethodDefinitions); + generateProcessColumnarDictionaryMethod(classDefinition, callSiteBinder, filter, projections, projectionMethodDefinitions); + + generateFilterPageMethod(classDefinition, filter); generateFilterMethod(classDefinition, callSiteBinder, filter); } @@ -106,8 +128,8 @@ private static void generateProcessMethod(ClassDefinition classDefinition, RowEx Variable blockVariable = scope.declareVariable("block_" + channel, body, page.invoke("getBlock", Block.class, constantInt(channel))); builder.put(channel, blockVariable); } - Map channelBlock = builder.build(); - Map> expressionInputBlocks = getExpressionInputBlocks(projections, filter, channelBlock); + Map channelBlocks = builder.build(); + Map> expressionInputBlocks = getExpressionInputBlocks(projections, filter, channelBlocks); // extract block builders ImmutableList.Builder variableBuilder = ImmutableList.builder(); @@ -148,6 +170,370 @@ private static void generateProcessMethod(ClassDefinition classDefinition, RowEx .append(position.ret()); } + private static void generateProcessColumnarMethod( + ClassDefinition classDefinition, + CallSiteBinder callSiteBinder, + RowExpression filter, + List projections, + List projectionMethods) + { + Parameter session = arg("session", ConnectorSession.class); + Parameter page = arg("page", Page.class); + Parameter types = arg("types", List.class); + MethodDefinition method = classDefinition.declareMethod(a(PUBLIC), "processColumnar", type(Page.class), session, page, types); + + Scope scope = method.getScope(); + ByteCodeBlock body = method.getBody(); + Variable thisVariable = method.getThis(); + + // extract blocks + List allInputChannels = getInputChannels(concat(projections, 
ImmutableList.of(filter))); + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (int channel : allInputChannels) { + Variable blockVariable = scope.declareVariable("block_" + channel, body, page.invoke("getBlock", Block.class, constantInt(channel))); + builder.put(channel, blockVariable); + } + Map channelBlocks = builder.build(); + Map> expressionInputBlocks = getExpressionInputBlocks(projections, filter, channelBlocks); + + Variable selectedPositions = scope.declareVariable("selectedPositions", body, thisVariable.invoke("filterPage", int[].class, session, page)); + Variable cardinality = scope.declareVariable("cardinality", body, selectedPositions.length()); + + body.comment("if no rows selected return null") + .append(new IfStatement() + .condition(equal(cardinality, constantInt(0))) + .ifTrue(constantNull(Page.class).ret())); + + if (projectionMethods.isEmpty()) { + // if no projections, return new page with selected rows + body.append(newInstance(Page.class, cardinality, newArray(type(Block[].class), 0)).ret()); + return; + } + + Variable pageBuilder = scope.declareVariable("pageBuilder", body, newInstance(PageBuilder.class, cardinality, types)); + + // create variables for blockBuilders + for (int projectionIndex = 0; projectionIndex < projections.size(); projectionIndex++) { + scope.declareVariable("blockBuilder_" + projectionIndex, body, pageBuilder.invoke("getBlockBuilder", BlockBuilder.class, constantInt(projectionIndex))); + } + + scope.declareVariable("positionCount", body, page.invoke("getPositionCount", int.class)); + scope.declareVariable(int.class, "position"); + + Variable outputBlocks = scope.declareVariable("outputBlocks", body, newArray(type(Block[].class), projections.size())); + + for (int projectionIndex = 0; projectionIndex < projections.size(); projectionIndex++) { + RowExpression projection = projections.get(projectionIndex); + ByteCodeBlock simpleProjectionBlock = getSimpleProjectionBlock( + callSiteBinder, + scope, + 
projectionMethods.get(projectionIndex), + projection, + projectionIndex, + expressionInputBlocks.get(projection)); + body.append(simpleProjectionBlock); + } + + // create new page from outputBlocks + body.append(newInstance(Page.class, cardinality, outputBlocks).ret()); + } + + private static void generateProcessColumnarDictionaryMethod( + ClassDefinition classDefinition, + CallSiteBinder callSiteBinder, + RowExpression filter, + List projections, + List projectionMethods) + { + Parameter session = arg("session", ConnectorSession.class); + Parameter page = arg("page", Page.class); + Parameter types = arg("types", List.class); + MethodDefinition method = classDefinition.declareMethod(a(PUBLIC), "processColumnarDictionary", type(Page.class), session, page, types); + + Scope scope = method.getScope(); + ByteCodeBlock body = method.getBody(); + Variable thisVariable = method.getThis(); + + // extract blocks + List allInputChannels = getInputChannels(concat(projections, ImmutableList.of(filter))); + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (int channel : allInputChannels) { + Variable blockVariable = scope.declareVariable("block_" + channel, body, page.invoke("getBlock", Block.class, constantInt(channel))); + builder.put(channel, blockVariable); + } + Map channelBlocks = builder.build(); + Map> expressionInputBlocks = getExpressionInputBlocks(projections, filter, channelBlocks); + + Variable selectedPositions = scope.declareVariable("selectedPositions", body, thisVariable.invoke("filterPage", int[].class, session, page)); + Variable cardinality = scope.declareVariable("cardinality", body, selectedPositions.length()); + + body.comment("if no rows selected return null") + .append(new IfStatement() + .condition(equal(cardinality, constantInt(0))) + .ifTrue(constantNull(Page.class).ret())); + + if (projectionMethods.isEmpty()) { + // if no projections, return new page with selected rows + body.append(newInstance(Page.class, cardinality, 
newArray(type(Block[].class), 0)).ret()); + return; + } + + // create PageBuilder + Variable pageBuilder = scope.declareVariable("pageBuilder", body, newInstance(PageBuilder.class, cardinality, types)); + + for (int projectionIndex = 0; projectionIndex < projections.size(); projectionIndex++) { + scope.declareVariable("blockBuilder_" + projectionIndex, body, pageBuilder.invoke("getBlockBuilder", BlockBuilder.class, constantInt(projectionIndex))); + } + + body.append(page.set(thisVariable.invoke("getNonLazyPage", Page.class, page))); + + // create outputBlocks + Variable outputBlocks = scope.declareVariable("outputBlocks", body, newArray(type(Block[].class), projections.size())); + + scope.declareVariable("positionCount", body, page.invoke("getPositionCount", int.class)); + scope.declareVariable(int.class, "position"); + scope.declareVariable(Block.class, "block"); + + scope.declareVariable(Block.class, "dictionary"); + scope.declareVariable(Slice.class, "ids"); + scope.declareVariable(int.class, "dictionaryCount"); + + scope.declareVariable(Block.class, "outputDictionary"); + scope.declareVariable(int[].class, "outputIds"); + + for (int projectionIndex = 0; projectionIndex < projections.size(); projectionIndex++) { + RowExpression projection = projections.get(projectionIndex); + List inputChannels = getInputChannels(projection); + + ByteCodeBlock simpleProjection = getSimpleProjectionBlock( + callSiteBinder, + scope, + projectionMethods.get(projectionIndex), + projection, + projectionIndex, + expressionInputBlocks.get(projection)); + + if (inputChannels.size() != 1) { + body.append(simpleProjection); + continue; + } + + Variable inputBlock = Iterables.getOnlyElement(expressionInputBlocks.get(projection)); + ByteCodeBlock dictionaryProjection = getDictionaryProjectionBlock( + scope, + projectionMethods.get(projectionIndex), + projectionIndex, + inputBlock); + + IfStatement ifStatement = new IfStatement() + .condition(inputBlock.instanceOf(DictionaryBlock.class)) + 
.ifTrue(dictionaryProjection) + .ifFalse(simpleProjection); + body.append(ifStatement); + } + + body.append(newInstance(Page.class, cardinality, outputBlocks).ret()); + } + + private static ByteCodeBlock getDictionaryProjectionBlock( + Scope scope, + MethodDefinition projectionMethod, + int projectionIndex, + Variable inputBlock) + { + Variable session = scope.getVariable("session"); + + Variable cardinality = scope.getVariable("cardinality"); + Variable selectedPositions = scope.getVariable("selectedPositions"); + + Variable outputBlocks = scope.getVariable("outputBlocks"); + Variable position = scope.getVariable("position"); + + Variable dictionary = scope.getVariable("dictionary"); + Variable ids = scope.getVariable("ids"); + Variable dictionaryCount = scope.getVariable("dictionaryCount"); + + Variable outputDictionary = scope.getVariable("outputDictionary"); + Variable outputIds = scope.getVariable("outputIds"); + + Variable thisVariable = scope.getThis(); + Variable blockBuilder = scope.getVariable("blockBuilder_" + projectionIndex); + + ByteCodeBlock dictionaryProjection = new ByteCodeBlock().comment("Dictionary projection"); + + dictionaryProjection + .comment("Extract dictionary and ids") + .append(dictionary.set(inputBlock.cast(DictionaryBlock.class).invoke("getDictionary", Block.class))) + .append(ids.set(inputBlock.cast(DictionaryBlock.class).invoke("getIds", Slice.class))) + .append(dictionaryCount.set(dictionary.invoke("getPositionCount", int.class))) + .comment("Project dictionary") + .append(new ForLoop() + .initialize(position.set(constantInt(0))) + .condition(lessThan(position, dictionaryCount)) + .update(position.increment()) + .body(invokeProject(thisVariable, session, ImmutableList.of(dictionary), position, blockBuilder, projectionMethod))) + .append(outputDictionary.set(blockBuilder.invoke("build", Block.class))) + .comment("Filter ids") + .append(outputIds.set(newArray(type(int[].class), cardinality))) + .append(new ForLoop() + 
.initialize(position.set(constantInt(0))) + .condition(lessThan(position, cardinality)) + .update(position.increment()) + .body(outputIds.setElement(position, ids.invoke("getInt", int.class, multiply(selectedPositions.getElement(position), constantInt(SizeOf.SIZE_OF_INT)))))) + .append(outputBlocks.setElement(projectionIndex, + newInstance(DictionaryBlock.class, cardinality, outputDictionary, invokeStatic(Slices.class, "wrappedIntArray", Slice.class, outputIds)).cast(Block.class))); + + return dictionaryProjection; + } + + private static ByteCodeBlock getSimpleProjectionBlock( + CallSiteBinder callSiteBinder, + Scope scope, + MethodDefinition projectionMethod, + RowExpression projection, + int projectionIndex, + List blocks) + { + Variable session = scope.getVariable("session"); + + Variable cardinality = scope.getVariable("cardinality"); + Variable selectedPositions = scope.getVariable("selectedPositions"); + + Variable page = scope.getVariable("page"); + Variable pageBuilder = scope.getVariable("pageBuilder"); + Variable outputBlocks = scope.getVariable("outputBlocks"); + + Variable position = scope.getVariable("position"); + Variable positionCount = scope.getVariable("positionCount"); + + Variable thisVariable = scope.getThis(); + Variable blockBuilder = scope.getVariable("blockBuilder_" + projectionIndex); + + ByteCodeBlock projectBlock = new ByteCodeBlock() + .append(new ForLoop() + .initialize(position.set(constantInt(0))) + .condition(lessThan(position, cardinality)) + .update(position.increment()) + .body(invokeProject(thisVariable, + session, + blocks, + selectedPositions.getElement(position), + blockBuilder, + projectionMethod))) + .append(outputBlocks.setElement(projectionIndex, blockBuilder.invoke("build", Block.class))); + + ByteCodeBlock simpleProjection = new ByteCodeBlock(); + + if (isIdentityExpression(projection)) { + // if nothing is filtered out, copy the entire block, else project it + int channel = 
Iterables.getOnlyElement(getInputChannels(projection)); + simpleProjection.append(new IfStatement() + .condition(equal(cardinality, positionCount)) + .ifTrue(outputBlocks.setElement(projectionIndex, page.invoke("getBlock", Block.class, constantInt(channel)))) + .ifFalse(projectBlock)); + } + else if (isConstantExpression(projection)) { + // if projection is a constant, create RLE block of constant expression with cardinality positions + ConstantExpression constantExpression = (ConstantExpression) projection; + verify(getInputChannels(projection).isEmpty()); + + ByteCodeExpression type = pageBuilder.invoke("getType", Type.class, constantInt(projectionIndex)); + ByteCodeExpression value = loadConstant(callSiteBinder, constantExpression.getValue(), Object.class); + + simpleProjection.append(outputBlocks.setElement(projectionIndex, invokeStatic(RunLengthEncodedBlock.class, "create", Block.class, type, value, cardinality))); + } + else { + simpleProjection.append(projectBlock); + } + return simpleProjection; + } + + private static void generateGetNonLazyPageMethod(ClassDefinition classDefinition, RowExpression filter, List projections) + { + Parameter page = arg("page", Page.class); + MethodDefinition method = classDefinition.declareMethod(a(PRIVATE), "getNonLazyPage", type(Page.class), page); + + Scope scope = method.getScope(); + ByteCodeBlock body = method.getBody(); + + List allInputChannels = getInputChannels(concat(projections, ImmutableList.of(filter))); + if (allInputChannels.isEmpty()) { + body.append(page.ret()); + return; + } + + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (int channel : allInputChannels) { + Variable blockVariable = scope.declareVariable("block_" + channel, body, page.invoke("getBlock", Block.class, constantInt(channel))); + builder.put(channel, blockVariable); + } + Map channelBlocks = builder.build(); + + Variable blocks = scope.declareVariable("blocks", body, page.invoke("getBlocks", Block[].class)); + Variable 
positionCount = scope.declareVariable("positionCount", body, page.invoke("getPositionCount", int.class)); + Variable createNewPage = scope.declareVariable("createNewPage", body, constantFalse()); + + for (Map.Entry entry : channelBlocks.entrySet()) { + int channel = entry.getKey(); + Variable inputBlock = entry.getValue(); + IfStatement ifStmt = new IfStatement(); + ifStmt.condition(inputBlock.instanceOf(LazyBlock.class)) + .ifTrue() + .append(blocks.setElement(channel, inputBlock.cast(LazyBlock.class).invoke("getBlock", Block.class))) + .append(createNewPage.set(constantTrue())); + body.append(ifStmt); + } + + body.append(new IfStatement() + .condition(createNewPage) + .ifTrue(page.set(newInstance(Page.class, positionCount, blocks)))); + + body.append(page.ret()); + } + + private static void generateFilterPageMethod(ClassDefinition classDefinition, RowExpression filter) + { + Parameter session = arg("session", ConnectorSession.class); + Parameter page = arg("page", Page.class); + + MethodDefinition method = classDefinition.declareMethod(a(PUBLIC), "filterPage", type(int[].class), session, page); + method.comment("Filter: %s rows in the page", filter.toString()); + + Scope scope = method.getScope(); + Variable thisVariable = method.getThis(); + ByteCodeBlock body = method.getBody(); + + Variable positionCount = scope.declareVariable("positionCount", body, page.invoke("getPositionCount", int.class)); + Variable selectedPositions = scope.declareVariable("selectedPositions", body, newArray(type(int[].class), positionCount)); + + List filterChannels = getInputChannels(filter); + + // extract block variables + ImmutableList.Builder blockVariablesBuilder = ImmutableList.builder(); + for (int channel : filterChannels) { + Variable blockVariable = scope.declareVariable("block_" + channel, body, page.invoke("getBlock", Block.class, constantInt(channel))); + blockVariablesBuilder.add(blockVariable); + } + List blockVariables = blockVariablesBuilder.build(); + + Variable 
selectedCount = scope.declareVariable("selectedCount", body, constantInt(0)); + Variable position = scope.declareVariable(int.class, "position"); + + IfStatement ifStatement = new IfStatement(); + ifStatement.condition(invokeFilter(thisVariable, session, blockVariables, position)) + .ifTrue() + .append(selectedPositions.setElement(selectedCount, position)) + .append(selectedCount.increment()); + + body.append(new ForLoop() + .initialize(position.set(constantInt(0))) + .condition(lessThan(position, positionCount)) + .update(position.increment()) + .body(ifStatement)); + + body.append(invokeStatic(Arrays.class, "copyOf", int[].class, selectedPositions, selectedCount).ret()); + } + private void generateFilterMethod(ClassDefinition classDefinition, CallSiteBinder callSiteBinder, RowExpression filter) { Parameter session = arg("session", ConnectorSession.class); @@ -218,6 +604,20 @@ private MethodDefinition generateProjectMethod(ClassDefinition classDefinition, return method; } + private static boolean isIdentityExpression(RowExpression expression) + { + List rowExpressions = Expressions.subExpressions(ImmutableList.of(expression)); + return rowExpressions.size() == 1 && Iterables.getOnlyElement(rowExpressions) instanceof InputReferenceExpression; + } + + private static boolean isConstantExpression(RowExpression expression) + { + List rowExpressions = Expressions.subExpressions(ImmutableList.of(expression)); + return rowExpressions.size() == 1 && + Iterables.getOnlyElement(rowExpressions) instanceof ConstantExpression && + ((ConstantExpression) Iterables.getOnlyElement(rowExpressions)).getValue() != null; + } + private static List getInputChannels(Iterable expressions) { TreeSet channels = new TreeSet<>(); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/InterpretedProjectionFunction.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/InterpretedProjectionFunction.java index 946e40e98487..81b2cd91a4b5 100644 --- 
a/presto-main/src/main/java/com/facebook/presto/sql/planner/InterpretedProjectionFunction.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/InterpretedProjectionFunction.java @@ -21,13 +21,17 @@ import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.type.Type; import com.facebook.presto.sql.parser.SqlParser; +import com.facebook.presto.sql.tree.DefaultExpressionTraversalVisitor; import com.facebook.presto.sql.tree.Expression; import com.facebook.presto.sql.tree.ExpressionTreeRewriter; +import com.facebook.presto.sql.tree.InputReference; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import io.airlift.slice.Slice; import java.util.IdentityHashMap; import java.util.Map; +import java.util.Set; import static com.facebook.presto.sql.analyzer.ExpressionAnalyzer.getExpressionTypesFromInput; import static java.util.Objects.requireNonNull; @@ -37,6 +41,8 @@ public class InterpretedProjectionFunction { private final Type type; private final ExpressionInterpreter evaluator; + private final Set inputChannels; + private final boolean deterministic; public InterpretedProjectionFunction( Expression expression, @@ -58,6 +64,10 @@ public InterpretedProjectionFunction( this.type = requireNonNull(expressionTypes.get(rewritten), "type is null"); evaluator = ExpressionInterpreter.expressionInterpreter(rewritten, metadata, session, expressionTypes); + InputReferenceExtractor inputReferenceExtractor = new InputReferenceExtractor(); + inputReferenceExtractor.process(rewritten, null); + this.inputChannels = inputReferenceExtractor.getInputChannels(); + this.deterministic = DeterminismEvaluator.isDeterministic(expression); } @Override @@ -80,6 +90,18 @@ public void project(RecordCursor cursor, BlockBuilder output) append(output, value); } + @Override + public Set getInputChannels() + { + return inputChannels; + } + + @Override + public boolean isDeterministic() + { + return deterministic; + } + 
private void append(BlockBuilder output, Object value) { if (value == null) { @@ -105,4 +127,22 @@ else if (javaType == Slice.class) { type.writeObject(output, value); } } + + private static class InputReferenceExtractor + extends DefaultExpressionTraversalVisitor + { + private final ImmutableSet.Builder inputChannels = ImmutableSet.builder(); + + @Override + protected Void visitInputReference(InputReference node, Void context) + { + inputChannels.add(node.getChannel()); + return null; + } + + public Set getInputChannels() + { + return inputChannels.build(); + } + } } diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index b0bfaec2c292..b2a272635229 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -119,6 +119,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -659,5 +660,17 @@ public void project(RecordCursor cursor, BlockBuilder output) { throw new UnsupportedOperationException("Operation not supported"); } + + @Override + public Set getInputChannels() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDeterministic() + { + throw new UnsupportedOperationException(); + } } } diff --git a/presto-main/src/test/java/com/facebook/presto/SequencePageBuilder.java b/presto-main/src/test/java/com/facebook/presto/SequencePageBuilder.java index 1d88cffc186e..3897c37df956 100644 --- a/presto-main/src/test/java/com/facebook/presto/SequencePageBuilder.java +++ b/presto-main/src/test/java/com/facebook/presto/SequencePageBuilder.java @@ -70,4 +70,29 @@ else if (type.equals(TIMESTAMP)) { return new Page(blocks); } + + 
public static Page createSequencePageWithDictionaryBlocks(List types, int length) + { + return createSequencePageWithDictionaryBlocks(types, length, new int[types.size()]); + } + + public static Page createSequencePageWithDictionaryBlocks(List types, int length, int... initialValues) + { + Block[] blocks = new Block[initialValues.length]; + for (int i = 0; i < blocks.length; i++) { + Type type = types.get(i); + int initialValue = initialValues[i]; + if (type.equals(VARCHAR)) { + blocks[i] = BlockAssertions.createStringDictionaryBlock(initialValue, initialValue + length); + } + else if (type.equals(BIGINT)) { + blocks[i] = BlockAssertions.createLongDictionaryBlock(initialValue, initialValue + length); + } + else { + throw new IllegalStateException("Unsupported type " + type); + } + } + + return new Page(blocks); + } } diff --git a/presto-main/src/test/java/com/facebook/presto/block/BlockAssertions.java b/presto-main/src/test/java/com/facebook/presto/block/BlockAssertions.java index 13f12f50aef1..9f2f0e69ffac 100644 --- a/presto-main/src/test/java/com/facebook/presto/block/BlockAssertions.java +++ b/presto-main/src/test/java/com/facebook/presto/block/BlockAssertions.java @@ -16,6 +16,7 @@ import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.block.DictionaryBlock; import com.facebook.presto.spi.block.RunLengthEncodedBlock; import com.facebook.presto.spi.type.Type; import com.facebook.presto.type.ArrayType; @@ -33,6 +34,7 @@ import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE; import static com.facebook.presto.spi.type.VarcharType.VARCHAR; import static com.facebook.presto.testing.TestingConnectorSession.SESSION; +import static io.airlift.slice.Slices.wrappedIntArray; import static java.util.Objects.requireNonNull; import static org.testng.Assert.assertEquals; @@ -109,6 +111,20 @@ public static 
Block createStringSequenceBlock(int start, int end) return builder.build(); } + public static Block createStringDictionaryBlock(int start, int length) + { + int dictionarySize = length / 5; + BlockBuilder builder = VARCHAR.createBlockBuilder(new BlockBuilderStatus(), dictionarySize); + for (int i = start; i < start + dictionarySize; i++) { // write exactly dictionarySize values, beginning at start + VARCHAR.writeString(builder, String.valueOf(i)); + } + int[] ids = new int[length]; + for (int i = 0; i < length; i++) { + ids[i] = i % dictionarySize; // cycle through the dictionary entries instead of repeating a single id + } + return new DictionaryBlock(length, builder.build(), wrappedIntArray(ids)); + } + public static Block createStringArraysBlock(Iterable> values) { ArrayType arrayType = new ArrayType(VARCHAR); @@ -205,6 +221,20 @@ public static Block createLongSequenceBlock(int start, int end) return builder.build(); } + public static Block createLongDictionaryBlock(int start, int length) + { + int dictionarySize = length / 5; + BlockBuilder builder = BIGINT.createBlockBuilder(new BlockBuilderStatus(), dictionarySize); + for (int i = start; i < start + dictionarySize; i++) { // write exactly dictionarySize values, beginning at start + BIGINT.writeLong(builder, i); + } + int[] ids = new int[length]; + for (int i = 0; i < length; i++) { + ids[i] = i % dictionarySize; // cycle through the dictionary entries instead of repeating a single id + } + return new DictionaryBlock(length, builder.build(), wrappedIntArray(ids)); + } + public static Block createLongRepeatBlock(int value, int length) { BlockBuilder builder = BIGINT.createFixedSizeBlockBuilder(length); diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestFilterAndProjectOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/TestFilterAndProjectOperator.java index 888306ab08a4..c8ffdf2e0a85 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestFilterAndProjectOperator.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestFilterAndProjectOperator.java @@ -20,11 +20,13 @@ import com.facebook.presto.spi.type.Type; import com.facebook.presto.testing.MaterializedResult; import
com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import static com.facebook.presto.RowPagesBuilder.rowPagesBuilder; @@ -143,5 +145,17 @@ public void project(RecordCursor cursor, BlockBuilder output) BIGINT.writeLong(output, cursor.getLong(channelIndex) + 5); } } + + @Override + public Set getInputChannels() + { + return ImmutableSet.of(channelIndex); + } + + @Override + public boolean isDeterministic() + { + return true; + } } } diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 524a666bc17e..1ed3e1ff2c3c 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -36,7 +36,9 @@ public void testDefaults() .setOptimizeMetadataQueries(false) .setOptimizeHashGeneration(true) .setOptimizeSingleDistinct(true) - .setIntermediateAggregationsEnabled(false)); + .setIntermediateAggregationsEnabled(false) + .setColumnarProcessing(false) + .setColumnarProcessingDictionary(false)); } @Test @@ -51,6 +53,8 @@ public void testExplicitPropertyMappings() .put("optimizer.optimize-hash-generation", "false") .put("optimizer.optimize-single-distinct", "false") .put("optimizer.use-intermediate-aggregations", "true") + .put("optimizer.columnar-processing", "true") + .put("optimizer.columnar-processing-dictionary", "true") .build(); Map properties = new ImmutableMap.Builder() .put("experimental-syntax-enabled", "true") @@ -61,6 +65,8 @@ public void testExplicitPropertyMappings() .put("optimizer.optimize-hash-generation", "false") .put("optimizer.optimize-single-distinct", "false") 
.put("optimizer.use-intermediate-aggregations", "true") + .put("optimizer.columnar-processing", "true") + .put("optimizer.columnar-processing-dictionary", "true") .build(); FeaturesConfig expected = new FeaturesConfig() @@ -71,7 +77,9 @@ public void testExplicitPropertyMappings() .setOptimizeMetadataQueries(true) .setOptimizeHashGeneration(false) .setOptimizeSingleDistinct(false) - .setIntermediateAggregationsEnabled(true); + .setIntermediateAggregationsEnabled(true) + .setColumnarProcessing(true) + .setColumnarProcessingDictionary(true); assertFullMapping(properties, expected); assertDeprecatedEquivalence(FeaturesConfig.class, properties, propertiesLegacy); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/gen/BenchmarkPageProcessor.java b/presto-main/src/test/java/com/facebook/presto/sql/gen/BenchmarkPageProcessor.java index 0721d5493a54..d51f816f4833 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/gen/BenchmarkPageProcessor.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/gen/BenchmarkPageProcessor.java @@ -22,6 +22,7 @@ import com.facebook.presto.spi.PageBuilder; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.type.StandardTypes; +import com.facebook.presto.spi.type.Type; import com.facebook.presto.sql.relational.RowExpression; import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; @@ -42,6 +43,7 @@ import org.openjdk.jmh.runner.options.VerboseMode; import java.util.Iterator; +import java.util.List; import java.util.concurrent.TimeUnit; import static com.facebook.presto.metadata.FunctionKind.SCALAR; @@ -155,6 +157,18 @@ public int process(ConnectorSession session, Page page, int start, int end, Page return position; } + @Override + public Page processColumnar(ConnectorSession session, Page page, List types) + { + throw new UnsupportedOperationException(); + } + + @Override + public Page processColumnarDictionary(ConnectorSession session, Page page, List types) + { + throw 
new UnsupportedOperationException(); + } + private static void project(int position, PageBuilder pageBuilder, Block extendedPriceBlock, Block discountBlock) { pageBuilder.declarePosition(); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/gen/PageProcessorBenchmark.java b/presto-main/src/test/java/com/facebook/presto/sql/gen/PageProcessorBenchmark.java new file mode 100644 index 000000000000..59326b1f622a --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/sql/gen/PageProcessorBenchmark.java @@ -0,0 +1,199 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.sql.gen; + +import com.facebook.presto.SequencePageBuilder; +import com.facebook.presto.Session; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.operator.PageProcessor; +import com.facebook.presto.spi.Page; +import com.facebook.presto.spi.PageBuilder; +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.sql.parser.SqlParser; +import com.facebook.presto.sql.planner.Symbol; +import com.facebook.presto.sql.planner.SymbolToInputRewriter; +import com.facebook.presto.sql.relational.RowExpression; +import com.facebook.presto.sql.relational.SqlToRowExpressionTranslator; +import com.facebook.presto.sql.tree.Expression; +import com.facebook.presto.sql.tree.ExpressionTreeRewriter; +import com.facebook.presto.testing.TestingSession; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.jetbrains.annotations.NotNull; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static com.facebook.presto.metadata.FunctionKind.SCALAR; +import static com.facebook.presto.metadata.MetadataManager.createTestMetadataManager; 
+import static com.facebook.presto.operator.scalar.FunctionAssertions.createExpression; +import static com.facebook.presto.spi.type.BigintType.BIGINT; +import static com.facebook.presto.spi.type.VarcharType.VARCHAR; +import static com.facebook.presto.sql.analyzer.ExpressionAnalyzer.getExpressionTypesFromInput; +import static java.util.Locale.ENGLISH; +import static java.util.stream.Collectors.toList; + +@State(Scope.Thread) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Fork(10) +@Warmup(iterations = 10) +@Measurement(iterations = 10) +@BenchmarkMode(Mode.AverageTime) +public class PageProcessorBenchmark +{ + private static final Map TYPE_MAP = ImmutableMap.of("bigint", BIGINT, "varchar", VARCHAR); + private static final SqlParser SQL_PARSER = new SqlParser(); + private static final Metadata METADATA = createTestMetadataManager(); + private static final Session TEST_SESSION = TestingSession.testSessionBuilder().build(); + private static final int POSITIONS = 1024; + + private final Map symbolTypes = new HashMap<>(); + private final Map sourceLayout = new HashMap<>(); + + private PageProcessor processor; + private Page inputPage; + private List types; + + @Param({ "2", "4", "8", "16", "32" }) + int columnCount; + + @Param({ "varchar", "bigint" }) + String type; + + @Param({ "false", "true" }) + boolean dictionaryBlocks; + + @Setup + public void setup() + { + Type type = TYPE_MAP.get(this.type); + + for (int i = 0; i < columnCount; i++) { + Symbol symbol = new Symbol(type.getDisplayName().toLowerCase(ENGLISH) + i); + symbolTypes.put(symbol, type); + sourceLayout.put(symbol, i); + } + + List projections = getProjections(type); + types = projections.stream().map(RowExpression::getType).collect(toList()); + + inputPage = createPage(types, dictionaryBlocks); + processor = new ExpressionCompiler(createTestMetadataManager()).compilePageProcessor(getFilter(type), projections); + } + + @Benchmark + public Page rowOriented() + { + PageBuilder pageBuilder = new PageBuilder(types); + 
int end = processor.process(null, inputPage, 0, inputPage.getPositionCount(), pageBuilder); + return pageBuilder.build(); + } + + @Benchmark + public Page columnOriented() + { + return processor.processColumnar(null, inputPage, types); + } + + @Benchmark + public Page columnOrientedDictionary() + { + return processor.processColumnarDictionary(null, inputPage, types); + } + + private RowExpression getFilter(Type type) + { + if (type == VARCHAR) { + return rowExpression("cast(varchar0 as bigint) % 2 = 0", VARCHAR); + } + if (type == BIGINT) { + return rowExpression("bigint0 % 2 = 0", BIGINT); + } + throw new IllegalArgumentException("filter not supported for type : " + type); + } + + private List getProjections(Type type) + { + ImmutableList.Builder builder = ImmutableList.builder(); + if (type == BIGINT) { + for (int i = 0; i < columnCount; i++) { + builder.add(rowExpression("bigint" + i + " + 5", type)); + } + } + else if (type == VARCHAR) { + for (int i = 0; i < columnCount; i++) { + // alternatively use identity expression rowExpression("varchar" + i, type) or + // rowExpression("substr(varchar" + i + ", 1, 1)", type) + builder.add(rowExpression("concat(varchar" + i + ", 'foo')", type)); + } + } + return builder.build(); + } + + private RowExpression rowExpression(String expression, Type type) + { + SymbolToInputRewriter symbolToInputRewriter = new SymbolToInputRewriter(sourceLayout); + Expression inputReferenceExpression = ExpressionTreeRewriter.rewriteWith(symbolToInputRewriter, createExpression(expression, METADATA, symbolTypes)); + + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (int i = 0; i < columnCount; i++) { + builder.put(i, type); + } + Map types = builder.build(); + + IdentityHashMap expressionTypes = getExpressionTypesFromInput(TEST_SESSION, METADATA, SQL_PARSER, types, inputReferenceExpression); + return SqlToRowExpressionTranslator.translate(inputReferenceExpression, SCALAR, expressionTypes, METADATA.getFunctionRegistry(), 
METADATA.getTypeManager(), TEST_SESSION, true); + } + + @NotNull + private static Page createPage(List types, boolean dictionary) + { + if (dictionary) { + return SequencePageBuilder.createSequencePageWithDictionaryBlocks(types, POSITIONS); + } + else { + return SequencePageBuilder.createSequencePage(types, POSITIONS); + } + } + + public static void main(String[] args) + throws RunnerException + { + Options options = new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .include(".*" + PageProcessorBenchmark.class.getSimpleName() + ".*") + .build(); + + new Runner(options).run(); + } +} diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/RaptorQueryRunner.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/RaptorQueryRunner.java index 103874e93967..d91063f1f0f5 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/RaptorQueryRunner.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/RaptorQueryRunner.java @@ -88,6 +88,7 @@ private static Session createSession(String schema) return testSessionBuilder() .setCatalog("raptor") .setSchema(schema) + .setSystemProperties(ImmutableMap.of("columnar_processing_dictionary", "true")) .build(); } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/PageBuilder.java b/presto-spi/src/main/java/com/facebook/presto/spi/PageBuilder.java index ab5f1be4d0da..8b72f068f299 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/PageBuilder.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/PageBuilder.java @@ -95,6 +95,11 @@ public void declarePosition() declaredPositions++; } + public void declarePositions(int positions) + { + declaredPositions = positions; + } + public boolean isFull() { return declaredPositions == Integer.MAX_VALUE || pageBuilderStatus.isFull(); diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/block/LazySliceArrayBlock.java b/presto-spi/src/main/java/com/facebook/presto/spi/block/LazySliceArrayBlock.java index 
766109c96cdf..e2f6c869b6d6 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/block/LazySliceArrayBlock.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/block/LazySliceArrayBlock.java @@ -282,6 +282,30 @@ public int[] getIds() return ids; } + public DictionaryBlock createDictionaryBlock() + { + if (!dictionary) { + throw new IllegalStateException("cannot create dictionary block"); + } + // if nulls are encoded in values, we can create a new dictionary block + if (isNull == null) { + return new DictionaryBlock(getPositionCount(), new SliceArrayBlock(values.length, values), wrappedIntArray(ids)); + } + + boolean hasNulls = false; + int[] newIds = Arrays.copyOf(ids, ids.length); + for (int position = 0; position < positionCount; position++) { + if (isEntryNull(position)) { + hasNulls = true; + newIds[position] = values.length; + } + } + + // if we found a null, create a new values array with null at the end + Slice[] newValues = hasNulls ? Arrays.copyOf(values, values.length + 1) : values; + return new DictionaryBlock(getPositionCount(), new SliceArrayBlock(newValues.length, newValues), wrappedIntArray(newIds)); + } + public boolean isDictionary() { assureLoaded(); @@ -320,23 +344,6 @@ public Block createNonLazyBlock() if (!dictionary) { return new SliceArrayBlock(getPositionCount(), values); } - - // if nulls are encoded in values, we can create a new dictionary block - if (isNull == null) { - return new DictionaryBlock(getPositionCount(), new SliceArrayBlock(values.length, values), wrappedIntArray(ids)); - } - - boolean hasNulls = false; - int[] newIds = Arrays.copyOf(ids, ids.length); - for (int position = 0; position < positionCount; position++) { - if (isEntryNull(position)) { - hasNulls = true; - newIds[position] = values.length; - } - } - - // if we found a null, create a new values array with null at the end - Slice[] newValues = hasNulls ? 
Arrays.copyOf(values, values.length + 1) : values; - return new DictionaryBlock(getPositionCount(), new SliceArrayBlock(newValues.length, newValues), wrappedIntArray(newIds)); + return createDictionaryBlock(); } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/block/RunLengthEncodedBlock.java b/presto-spi/src/main/java/com/facebook/presto/spi/block/RunLengthEncodedBlock.java index 399e1bee8d50..44a326c8ff83 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/block/RunLengthEncodedBlock.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/block/RunLengthEncodedBlock.java @@ -13,6 +13,8 @@ */ package com.facebook.presto.spi.block; +import com.facebook.presto.spi.predicate.Utils; +import com.facebook.presto.spi.type.Type; import io.airlift.slice.Slice; import org.openjdk.jol.info.ClassLayout; @@ -27,6 +29,12 @@ public class RunLengthEncodedBlock { private static final int INSTANCE_SIZE = ClassLayout.parseClass(RunLengthEncodedBlock.class).instanceSize(); + public static Block create(Type type, Object value, int positionCount) + { + Block block = Utils.nativeValueToBlock(type, value); + return new RunLengthEncodedBlock(block, positionCount); + } + private final Block value; private final int positionCount; diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/predicate/Utils.java b/presto-spi/src/main/java/com/facebook/presto/spi/predicate/Utils.java index fd2ab6f5517d..1224b8550516 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/predicate/Utils.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/predicate/Utils.java @@ -27,7 +27,7 @@ private Utils() { } - static Block nativeValueToBlock(Type type, Object object) + public static Block nativeValueToBlock(Type type, Object object) { if (!Primitives.wrap(type.getJavaType()).isInstance(object)) { throw new IllegalArgumentException(String.format("Object '%s' does not match type %s", object, type.getJavaType()));