Skip to content

Commit

Permalink
Add columnar processing for dictionary encoded blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Dec 11, 2015
1 parent 3faf4a7 commit b5e90ab
Show file tree
Hide file tree
Showing 25 changed files with 1,125 additions and 33 deletions.
Expand Up @@ -92,6 +92,18 @@ public int process(ConnectorSession session, Page page, int start, int end, Page
return position; return position;
} }


/**
 * Columnar processing is not implemented here; only the position-at-a-time
 * {@code process} method is available on this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public Page processColumnar(ConnectorSession session, Page page, List<? extends Type> types)
{
    // Carry a message so a failure caused by enabling the columnar session property is self-explanatory.
    throw new UnsupportedOperationException("processColumnar is not supported by this page processor");
}

/**
 * Dictionary-aware columnar processing is not implemented here; only the
 * position-at-a-time {@code process} method is available on this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public Page processColumnarDictionary(ConnectorSession session, Page page, List<? extends Type> types)
{
    // Carry a message so a failure caused by enabling the dictionary session property is self-explanatory.
    throw new UnsupportedOperationException("processColumnarDictionary is not supported by this page processor");
}

private static void project(int position, PageBuilder pageBuilder, Block extendedPriceBlock, Block discountBlock) private static void project(int position, PageBuilder pageBuilder, Block extendedPriceBlock, Block discountBlock)
{ {
if (discountBlock.isNull(position) || extendedPriceBlock.isNull(position)) { if (discountBlock.isNull(position) || extendedPriceBlock.isNull(position)) {
Expand Down
Expand Up @@ -49,6 +49,8 @@ public final class SystemSessionProperties
public static final String QUERY_MAX_RUN_TIME = "query_max_run_time"; public static final String QUERY_MAX_RUN_TIME = "query_max_run_time";
public static final String REDISTRIBUTE_WRITES = "redistribute_writes"; public static final String REDISTRIBUTE_WRITES = "redistribute_writes";
public static final String EXECUTION_POLICY = "execution_policy"; public static final String EXECUTION_POLICY = "execution_policy";
public static final String COLUMNAR_PROCESSING = "columnar_processing";
public static final String COLUMNAR_PROCESSING_DICTIONARY = "columnar_processing_dictionary";


private final List<PropertyMetadata<?>> sessionProperties; private final List<PropertyMetadata<?>> sessionProperties;


Expand Down Expand Up @@ -150,7 +152,17 @@ public SystemSessionProperties(
DataSize.class, DataSize.class,
memoryManagerConfig.getMaxQueryMemory(), memoryManagerConfig.getMaxQueryMemory(),
true, true,
value -> DataSize.valueOf((String) value))); value -> DataSize.valueOf((String) value)),
booleanSessionProperty(
COLUMNAR_PROCESSING,
"Use columnar processing",
featuresConfig.isColumnarProcessing(),
false),
booleanSessionProperty(
COLUMNAR_PROCESSING_DICTIONARY,
"Use columnar processing with optimizations for dictionaries",
featuresConfig.isColumnarProcessingDictionary(),
false));
} }


public List<PropertyMetadata<?>> getSessionProperties() public List<PropertyMetadata<?>> getSessionProperties()
Expand Down Expand Up @@ -223,6 +235,16 @@ public static boolean isShareIndexLoading(Session session)
return session.getProperty(TASK_SHARE_INDEX_LOADING, Boolean.class); return session.getProperty(TASK_SHARE_INDEX_LOADING, Boolean.class);
} }


/**
 * Looks up the boolean {@code columnar_processing} session property.
 *
 * @param session session to read the property from
 * @return the property value as resolved by the session
 */
public static boolean isColumnarProcessingEnabled(Session session)
{
    Boolean enabled = session.getProperty(COLUMNAR_PROCESSING, Boolean.class);
    return enabled;
}

/**
 * Looks up the boolean {@code columnar_processing_dictionary} session property.
 *
 * @param session session to read the property from
 * @return the property value as resolved by the session
 */
public static boolean isColumnarProcessingDictionaryEnabled(Session session)
{
    Boolean enabled = session.getProperty(COLUMNAR_PROCESSING_DICTIONARY, Boolean.class);
    return enabled;
}

public static DataSize getQueryMaxMemory(Session session) public static DataSize getQueryMaxMemory(Session session)
{ {
return session.getProperty(QUERY_MAX_MEMORY, DataSize.class); return session.getProperty(QUERY_MAX_MEMORY, DataSize.class);
Expand Down
Expand Up @@ -612,4 +612,13 @@ public static ByteCodeExpression inlineIf(ByteCodeExpression condition, ByteCode
{ {
return new InlineIfByteCodeExpression(condition, ifTrue, ifFalse); return new InlineIfByteCodeExpression(condition, ifTrue, ifFalse);
} }

//
// Print
//
/**
 * Builds an expression that calls {@code println} on the static field
 * {@code System.out}, passing the given expression as the only argument.
 *
 * @param variable expression whose value is handed to {@code println}
 * @return the invocation expression (declared return type {@code void})
 */
public static ByteCodeExpression print(ByteCodeExpression variable)
{
    ByteCodeExpression standardOut = getStatic(System.class, "out");
    ByteCodeExpression printlnCall = standardOut.invoke("println", void.class, variable);
    return printlnCall;
}
} }
Expand Up @@ -20,6 +20,8 @@


import java.util.List; import java.util.List;


import static com.facebook.presto.SystemSessionProperties.isColumnarProcessingDictionaryEnabled;
import static com.facebook.presto.SystemSessionProperties.isColumnarProcessingEnabled;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;


Expand All @@ -31,6 +33,8 @@ public class FilterAndProjectOperator


private final PageBuilder pageBuilder; private final PageBuilder pageBuilder;
private final PageProcessor processor; private final PageProcessor processor;
private final boolean columnarProcessingEnabled;
private final boolean columnarProcessingDictionaryEnabled;
private Page currentPage; private Page currentPage;
private int currentPosition; private int currentPosition;
private boolean finishing; private boolean finishing;
Expand All @@ -40,6 +44,8 @@ public FilterAndProjectOperator(OperatorContext operatorContext, Iterable<? exte
this.processor = requireNonNull(processor, "processor is null"); this.processor = requireNonNull(processor, "processor is null");
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null")); this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
this.columnarProcessingEnabled = isColumnarProcessingEnabled(operatorContext.getSession());
this.columnarProcessingDictionaryEnabled = isColumnarProcessingDictionaryEnabled(operatorContext.getSession());
this.pageBuilder = new PageBuilder(getTypes()); this.pageBuilder = new PageBuilder(getTypes());
} }


Expand Down Expand Up @@ -88,10 +94,24 @@ public final void addInput(Page page)
public final Page getOutput() public final Page getOutput()
{ {
if (!pageBuilder.isFull() && currentPage != null) { if (!pageBuilder.isFull() && currentPage != null) {
currentPosition = processor.process(operatorContext.getSession().toConnectorSession(), currentPage, currentPosition, currentPage.getPositionCount(), pageBuilder); if (columnarProcessingDictionaryEnabled) {
if (currentPosition == currentPage.getPositionCount()) { Page page = processor.processColumnarDictionary(operatorContext.getSession().toConnectorSession(), currentPage, getTypes());
currentPage = null; currentPage = null;
currentPosition = 0; currentPosition = 0;
return page;
}
else if (columnarProcessingEnabled) {
Page page = processor.processColumnar(operatorContext.getSession().toConnectorSession(), currentPage, getTypes());
currentPage = null;
currentPosition = 0;
return page;
}
else {
currentPosition = processor.process(operatorContext.getSession().toConnectorSession(), currentPage, currentPosition, currentPage.getPositionCount(), pageBuilder);
if (currentPosition == currentPage.getPositionCount()) {
currentPage = null;
currentPosition = 0;
}
} }
} }


Expand Down
Expand Up @@ -16,12 +16,27 @@
import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Page; import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PageBuilder; import com.facebook.presto.spi.PageBuilder;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.block.DictionaryBlock;
import com.facebook.presto.spi.block.LazyBlock;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;


import java.util.List; import java.util.List;
import java.util.Set;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.slice.Slices.wrappedIntArray;
import static java.util.Arrays.copyOf;


public class GenericPageProcessor public class GenericPageProcessor
implements PageProcessor implements PageProcessor
{ {
private final FilterFunction filterFunction; private final FilterFunction filterFunction;
private final List<ProjectionFunction> projections; private final List<ProjectionFunction> projections;
Expand All @@ -36,16 +51,154 @@ public GenericPageProcessor(FilterFunction filterFunction, Iterable<? extends Pr
public int process(ConnectorSession session, Page page, int start, int end, PageBuilder pageBuilder) public int process(ConnectorSession session, Page page, int start, int end, PageBuilder pageBuilder)
{ {
int position = start; int position = start;
Block[] inputBlocks = page.getBlocks();

for (; position < end && !pageBuilder.isFull(); position++) { for (; position < end && !pageBuilder.isFull(); position++) {
if (filterFunction.filter(position, page.getBlocks())) { if (filterFunction.filter(position, page.getBlocks())) {
pageBuilder.declarePosition(); pageBuilder.declarePosition();
for (int i = 0; i < projections.size(); i++) { for (int i = 0; i < projections.size(); i++) {
// todo: if the projection function increases the size of the data significantly, this could cause the servers to OOM // todo: if the projection function increases the size of the data significantly, this could cause the servers to OOM
projections.get(i).project(position, page.getBlocks(), pageBuilder.getBlockBuilder(i)); projections.get(i).project(position, inputBlocks, pageBuilder.getBlockBuilder(i));
} }
} }
} }

return position; return position;
} }

/**
 * Filters the whole page first, then runs each projection over the surviving
 * positions one output column at a time.
 *
 * @return the projected page; a block-less page carrying only the selected
 *         position count when there are no projections; {@code null} when
 *         the filter rejects every position
 */
@Override
public Page processColumnar(ConnectorSession session, Page page, List<? extends Type> types)
{
    int[] keptPositions = filterPage(page);
    int keptCount = keptPositions.length;

    if (keptCount == 0) {
        return null;
    }
    if (projections.isEmpty()) {
        return new Page(keptCount);
    }

    PageBuilder outputBuilder = new PageBuilder(types);
    Block[] sourceBlocks = page.getBlocks();

    // Output column i is produced by projection i.
    int outputChannel = 0;
    for (ProjectionFunction projection : projections) {
        projectColumnar(keptPositions, outputBuilder.getBlockBuilder(outputChannel), sourceBlocks, projection);
        outputChannel++;
    }

    outputBuilder.declarePositions(keptCount);
    return outputBuilder.build();
}

/**
 * Columnar filter-and-project with a shortcut for dictionary encoded inputs.
 * <p>
 * A projection that is deterministic and reads exactly one channel whose block
 * is a {@link DictionaryBlock} is evaluated once per dictionary entry; the
 * output is a new {@link DictionaryBlock} that reuses the input ids of the
 * selected positions. Every other projection is evaluated position by position.
 * <p>
 * NOTE(review): the dictionary path projects every dictionary entry, including
 * entries that no selected position refers to.
 *
 * @return the projected page; a block-less page carrying only the selected
 *         position count when there are no projections; {@code null} when
 *         the filter rejects every position
 */
@Override
public Page processColumnarDictionary(ConnectorSession session, Page page, List<? extends Type> types)
{
    // Unwrap lazy blocks on projected channels so the instanceof DictionaryBlock check sees the real block.
    Page inputPage = getNonLazyPage(page);
    int[] selectedPositions = filterPage(inputPage);

    if (selectedPositions.length == 0) {
        return null;
    }

    if (projections.isEmpty()) {
        return new Page(selectedPositions.length);
    }

    PageBuilder pageBuilder = new PageBuilder(types);
    // Read from the unwrapped page, consistently with the filter and the dictionary checks,
    // instead of going back to the original (possibly lazy) page.
    Block[] inputBlocks = inputPage.getBlocks();
    Block[] outputBlocks = new Block[projections.size()];

    for (int projectionIndex = 0; projectionIndex < projections.size(); projectionIndex++) {
        ProjectionFunction projection = projections.get(projectionIndex);

        if (canDictionaryProcess(projection, inputPage)) {
            Block outputDictionary = projectDictionary(projection, inputPage);
            int[] outputIds = filterIds(projection, inputPage, selectedPositions);
            outputBlocks[projectionIndex] = new DictionaryBlock(selectedPositions.length, outputDictionary, wrappedIntArray(outputIds));
        }
        else {
            // The page builder is used only as a source of correctly typed block builders.
            outputBlocks[projectionIndex] = projectColumnar(selectedPositions, pageBuilder.getBlockBuilder(projectionIndex), inputBlocks, projection).build();
        }
    }

    // Sanity check: every output column must cover exactly the selected positions.
    for (Block block : outputBlocks) {
        verify(block.getPositionCount() == selectedPositions.length);
    }
    return new Page(selectedPositions.length, outputBlocks);
}

/**
 * Applies one projection to each selected position, in order, appending the
 * results to the supplied builder.
 *
 * @return the same {@code blockBuilder} instance that was passed in
 */
private static BlockBuilder projectColumnar(int[] selectedPositions, BlockBuilder blockBuilder, Block[] inputBlocks, ProjectionFunction projection)
{
    for (int i = 0; i < selectedPositions.length; i++) {
        int sourcePosition = selectedPositions[i];
        projection.project(sourcePosition, inputBlocks, blockBuilder);
    }
    return blockBuilder;
}

/**
 * Collects the dictionary ids of the selected positions from the projection's
 * single input channel, which must hold a {@link DictionaryBlock}.
 *
 * @return one id per selected position, in selection order
 */
private static int[] filterIds(ProjectionFunction projection, Page page, int[] selectedPositions)
{
    int channel = getOnlyElement(projection.getInputChannels());
    DictionaryBlock dictionaryBlock = (DictionaryBlock) page.getBlock(channel);
    Slice sourceIds = dictionaryBlock.getIds();

    int count = selectedPositions.length;
    int[] result = new int[count];
    for (int i = 0; i < count; i++) {
        // The ids slice is addressed in bytes, one int per position.
        int byteOffset = selectedPositions[i] * SizeOf.SIZE_OF_INT;
        result[i] = sourceIds.getInt(byteOffset);
    }
    return result;
}

/**
 * Runs the projection over every entry of the dictionary behind the
 * projection's single input channel and returns the projected dictionary.
 * The channel must hold a {@link DictionaryBlock}.
 */
private static Block projectDictionary(ProjectionFunction projection, Page page)
{
    int channel = getOnlyElement(projection.getInputChannels());
    DictionaryBlock source = (DictionaryBlock) page.getBlock(channel);
    Block dictionary = source.getDictionary();
    int entryCount = dictionary.getPositionCount();

    BlockBuilder projected = projection.getType().createBlockBuilder(new BlockBuilderStatus(), entryCount);

    // Only the projection's own channel is populated; all other slots stay null.
    Block[] projectionInput = new Block[page.getChannelCount()];
    projectionInput[channel] = dictionary;

    for (int entry = 0; entry < entryCount; entry++) {
        projection.project(entry, projectionInput, projected);
    }
    return projected.build();
}

/**
 * Decides whether a projection can take the dictionary path: it must be
 * deterministic, read exactly one channel, and that channel's block in
 * {@code inputPage} must be a {@link DictionaryBlock}.
 */
private static boolean canDictionaryProcess(ProjectionFunction projection, Page inputPage)
{
    Set<Integer> channels = projection.getInputChannels();
    if (!projection.isDeterministic()) {
        return false;
    }
    if (channels.size() != 1) {
        return false;
    }
    Block onlyInput = inputPage.getBlock(getOnlyElement(channels));
    return onlyInput instanceof DictionaryBlock;
}

/**
 * Returns a page in which every {@link LazyBlock} on a channel read by some
 * projection is replaced by the block it wraps. Channels that no projection
 * reads are left as they are. When the projections read no channels at all,
 * the input page itself is returned.
 */
private Page getNonLazyPage(Page page)
{
    ImmutableSet.Builder<Integer> channelsBuilder = ImmutableSet.builder();
    projections.forEach(projection -> channelsBuilder.addAll(projection.getInputChannels()));
    Set<Integer> projectedChannels = channelsBuilder.build();

    if (projectedChannels.isEmpty()) {
        return page;
    }

    Block[] unwrapped = page.getBlocks();
    for (int channel : projectedChannels) {
        Block candidate = page.getBlock(channel);
        if (!(candidate instanceof LazyBlock)) {
            continue;
        }
        unwrapped[channel] = ((LazyBlock) candidate).getBlock();
    }
    return new Page(unwrapped);
}

/**
 * Evaluates the filter against every position of the page.
 *
 * @return the positions accepted by the filter, in ascending order; an empty
 *         array when none are accepted
 */
private int[] filterPage(Page page)
{
    // Both values are loop-invariant, so fetch them once instead of once per position.
    int positionCount = page.getPositionCount();
    Block[] blocks = page.getBlocks();

    // Worst case every position passes; the array is trimmed to the real count at the end.
    int[] selected = new int[positionCount];
    int index = 0;
    for (int position = 0; position < positionCount; position++) {
        if (filterFunction.filter(position, blocks)) {
            selected[index] = position;
            index++;
        }
    }
    return copyOf(selected, index);
}
} }
Expand Up @@ -16,8 +16,21 @@
import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Page; import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PageBuilder; import com.facebook.presto.spi.PageBuilder;
import com.facebook.presto.spi.type.Type;

import javax.annotation.Nullable;

import java.util.List;


public interface PageProcessor public interface PageProcessor
{ {
int process(ConnectorSession session, Page page, int start, int end, PageBuilder pageBuilder); int process(ConnectorSession session, Page page, int start, int end, PageBuilder pageBuilder);

/**
* @return filtered and projected page, null if all positions are filtered
*/
@Nullable
Page processColumnar(ConnectorSession session, Page page, List<? extends Type> types);

/**
 * Variant of {@link #processColumnar} that may apply optimizations for
 * dictionary encoded blocks.
 *
 * @return filtered and projected page, null if all positions are filtered
 */
@Nullable
Page processColumnarDictionary(ConnectorSession session, Page page, List<? extends Type> types);
} }
Expand Up @@ -18,11 +18,17 @@
import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.Type;


import java.util.Set;

public interface ProjectionFunction public interface ProjectionFunction
{ {
Type getType(); Type getType();


void project(int position, Block[] blocks, BlockBuilder output); void project(int position, Block[] blocks, BlockBuilder output);


void project(RecordCursor cursor, BlockBuilder output); void project(RecordCursor cursor, BlockBuilder output);

/**
 * Returns the input channels this projection reads. Callers in the page
 * processor use these values as indexes into the page's blocks.
 */
Set<Integer> getInputChannels();

/**
 * Whether this projection is deterministic. The page processor only evaluates
 * a projection once per dictionary entry (instead of once per position) when
 * this returns true.
 * NOTE(review): presumably "deterministic" means equal inputs always yield
 * equal outputs; confirm against implementations.
 */
boolean isDeterministic();
} }
Expand Up @@ -18,8 +18,11 @@
import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.Type;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice; import io.airlift.slice.Slice;


import java.util.Set;

import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;


public final class ProjectionFunctions public final class ProjectionFunctions
Expand Down Expand Up @@ -90,5 +93,17 @@ else if (javaType == Slice.class) {
} }
} }
} }

/**
 * This projection reads a single channel, so the result is a one-element set
 * holding {@code channelIndex}.
 */
@Override
public Set<Integer> getInputChannels()
{
    Set<Integer> onlyChannel = ImmutableSet.of(channelIndex);
    return onlyChannel;
}

// Always reports this projection as deterministic.
@Override
public boolean isDeterministic()
{
return true;
}
} }
} }

0 comments on commit b5e90ab

Please sign in to comment.