Skip to content

Commit

Permalink
In memory orderby operator
Browse files Browse the repository at this point in the history
  • Loading branch information
erichwang committed Nov 8, 2012
1 parent 139d6c3 commit 908e1c5
Show file tree
Hide file tree
Showing 4 changed files with 337 additions and 0 deletions.
@@ -0,0 +1,184 @@
package com.facebook.presto.operator;

import com.facebook.presto.block.Block;
import com.facebook.presto.block.BlockBuilder;
import com.facebook.presto.block.BlockCursor;
import com.facebook.presto.tuple.FieldOrderedTupleComparator;
import com.facebook.presto.tuple.Tuple;
import com.facebook.presto.tuple.TupleReadable;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;

import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

import static com.facebook.presto.hive.shaded.com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

public class InMemoryOrderByOperator
implements Operator
{
private static final int MAX_IN_MEMORY_SORT_SIZE = 1_000_000;

private final Operator source;
private final int keyChannelIndex;
private final List<ProjectionFunction> projections;
private final Ordering<TupleReadable> ordering;

public InMemoryOrderByOperator(Operator source, int keyChannelIndex, List<ProjectionFunction> projections, Ordering<TupleReadable> ordering)
{
checkNotNull(source, "source is null");
checkArgument(keyChannelIndex >= 0, "keyChannelIndex must be at least zero");
checkNotNull(projections, "projections is null");
checkArgument(!projections.isEmpty(), "projections is empty");
checkNotNull(ordering, "ordering is null");
this.source = source;
this.keyChannelIndex = keyChannelIndex;
this.projections = ImmutableList.copyOf(projections);
this.ordering = ordering;
}

public InMemoryOrderByOperator(Operator source, int keyChannelIndex, List<ProjectionFunction> projections)
{
this(source, keyChannelIndex, projections, Ordering.from(FieldOrderedTupleComparator.INSTANCE));
}

@Override
public int getChannelCount()
{
return projections.size();
}

@Override
public Iterator<Page> iterator()
{
return new InMemoryOrderByIterator(source.iterator());
}

private class InMemoryOrderByIterator
extends AbstractIterator<Page>
{
private final Iterator<Page> pageIterator;
private Iterator<KeyAndTuples> outputIterator;
private long position;

private InMemoryOrderByIterator(Iterator<Page> pageIterator)
{
this.pageIterator = pageIterator;
}

@Override
protected Page computeNext()
{
if (outputIterator == null) {
outputIterator = materializeTuplesAndSort().iterator();
}

if (!outputIterator.hasNext()) {
return endOfData();
}

BlockBuilder[] outputs = new BlockBuilder[projections.size()];
for (int i = 0; i < outputs.length; i++) {
outputs[i] = new BlockBuilder(position, projections.get(i).getTupleInfo());
}

while (!isFull(outputs) && outputIterator.hasNext()) {
KeyAndTuples next = outputIterator.next();
for (int i = 0; i < projections.size(); i++) {
projections.get(i).project(next.getTuples(), outputs[i]);
}
}

Block[] blocks = new Block[projections.size()];
for (int i = 0; i < blocks.length; i++) {
blocks[i] = outputs[i].build();
}

Page page = new Page(blocks);
position += page.getPositionCount();
return page;
}

private List<KeyAndTuples> materializeTuplesAndSort() {
List<KeyAndTuples> keyAndTuplesList = Lists.newArrayList();
while (pageIterator.hasNext()) {
Page page = pageIterator.next();
Block[] blocks = page.getBlocks();
BlockCursor[] cursors = new BlockCursor[blocks.length];
for (int i = 0; i < cursors.length; i++) {
cursors[i] = blocks[i].cursor();
}
for (int position = 0; position < page.getPositionCount(); position++) {
for (BlockCursor cursor : cursors) {
checkState(cursor.advanceNextPosition());
}
keyAndTuplesList.add(getKeyAndTuples(cursors));
}
checkState(keyAndTuplesList.size() <= MAX_IN_MEMORY_SORT_SIZE, "Too many tuples for in memory sort");
}
Collections.sort(keyAndTuplesList, KeyAndTuples.keyComparator(ordering));
return keyAndTuplesList;
}

private KeyAndTuples getKeyAndTuples(BlockCursor[] cursors)
{
// TODO: pre-project columns to minimize storage in global candidate set
Tuple key = cursors[keyChannelIndex].getTuple();
Tuple[] tuples = new Tuple[cursors.length];
for (int channel = 0; channel < cursors.length; channel++) {
tuples[channel] = (channel == keyChannelIndex) ? key : cursors[channel].getTuple();
}
return new KeyAndTuples(key, tuples);
}

private boolean isFull(BlockBuilder... outputs)
{
for (BlockBuilder output : outputs) {
if (output.isFull()) {
return true;
}
}
return false;
}
}

private static class KeyAndTuples
{
private final Tuple key;
private final Tuple[] tuples;

private KeyAndTuples(Tuple key, Tuple[] tuples)
{
this.key = key;
this.tuples = tuples;
}

public Tuple getKey()
{
return key;
}

public Tuple[] getTuples()
{
return tuples;
}

public static Comparator<KeyAndTuples> keyComparator(final Comparator<TupleReadable> tupleReadableComparator)
{
return new Comparator<KeyAndTuples>()
{
@Override
public int compare(KeyAndTuples o1, KeyAndTuples o2)
{
return tupleReadableComparator.compare(o1.getKey(), o2.getKey());
}
};
}
}
}
Expand Up @@ -23,6 +23,7 @@ public class BenchmarkSuite
new PredicateFilterBenchmark(),
new RawStreamingBenchmark(),
new Top100Benchmark(),
new InMemoryOrderByBenchmark(),

// sql benchmarks
new GroupBySumWithArithmeticSqlBenchmark(),
Expand Down
@@ -0,0 +1,39 @@
package com.facebook.presto.benchmark;

import com.facebook.presto.block.BlockIterable;
import com.facebook.presto.operator.AlignmentOperator;
import com.facebook.presto.operator.InMemoryOrderByOperator;
import com.facebook.presto.operator.LimitOperator;
import com.facebook.presto.operator.Operator;
import com.facebook.presto.serde.BlocksFileEncoding;
import com.facebook.presto.tpch.TpchBlocksProvider;
import com.facebook.presto.tpch.TpchSchema;
import com.facebook.presto.tuple.TupleInfo;
import com.google.common.collect.ImmutableList;

import static com.facebook.presto.operator.ProjectionFunctions.singleColumn;

public class InMemoryOrderByBenchmark
extends AbstractOperatorBenchmark
{
public InMemoryOrderByBenchmark()
{
super("in_memory_orderby_100k", 3, 30);
}

@Override
protected Operator createBenchmarkedOperator(TpchBlocksProvider inputStreamProvider)
{
BlockIterable totalPrice = inputStreamProvider.getBlocks(TpchSchema.Orders.TOTALPRICE, BlocksFileEncoding.RAW);
AlignmentOperator alignmentOperator = new AlignmentOperator(totalPrice);
LimitOperator limitOperator = new LimitOperator(alignmentOperator, 100_000);
return new InMemoryOrderByOperator(limitOperator, 0, ImmutableList.of(singleColumn(TupleInfo.Type.DOUBLE, 0, 0)));
}

public static void main(String[] args)
{
new InMemoryOrderByBenchmark().runBenchmark(
new SimpleLineBenchmarkResultWriter(System.out)
);
}
}
@@ -0,0 +1,113 @@
package com.facebook.presto.operator;

import com.facebook.presto.block.BlockAssertions;
import com.facebook.presto.block.BlockBuilder;
import com.facebook.presto.tuple.FieldOrderedTupleComparator;
import com.facebook.presto.tuple.TupleInfo;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import org.testng.annotations.Test;

import static com.facebook.presto.operator.OperatorAssertions.assertOperatorEquals;
import static com.facebook.presto.operator.OperatorAssertions.createOperator;
import static com.facebook.presto.operator.ProjectionFunctions.singleColumn;
import static com.facebook.presto.tuple.TupleInfo.Type.DOUBLE;
import static com.facebook.presto.tuple.TupleInfo.Type.FIXED_INT_64;
import static com.facebook.presto.tuple.TupleInfo.Type.VARIABLE_BINARY;

public class TestInMemoryOrderByOperator
{
@Test
public void testSingleFieldKey()
throws Exception
{
Operator source = createOperator(
new Page(
BlockAssertions.createLongsBlock(0, 1, 2),
BlockAssertions.createDoublesBlock(0, 0.1, 0.2)
),
new Page(
BlockAssertions.createLongsBlock(2, -1, 4),
BlockAssertions.createDoublesBlock(2, -0.1, 0.4)
)
);

InMemoryOrderByOperator actual = new InMemoryOrderByOperator(
source, 0, ImmutableList.of(singleColumn(FIXED_INT_64, 0, 0), singleColumn(DOUBLE, 1, 0))
);

Operator expected = createOperator(
new Page(
BlockAssertions.createLongsBlock(0, -1, 1, 2, 4),
BlockAssertions.createDoublesBlock(0, -0.1, 0.1, 0.2, 0.4)
)
);
assertOperatorEquals(actual, expected);
}

@Test
public void testMultiFieldKey()
throws Exception
{
TupleInfo tupleInfo = new TupleInfo(VARIABLE_BINARY, FIXED_INT_64);
Operator source = createOperator(
new Page(
new BlockBuilder(0, tupleInfo)
.append("a").append(1)
.append("b").append(2)
.build()
),
new Page(
new BlockBuilder(2, tupleInfo)
.append("b").append(3)
.append("a").append(4)
.build()
)
);

InMemoryOrderByOperator actual = new InMemoryOrderByOperator(
source, 0, ImmutableList.of(ProjectionFunctions.concat(singleColumn(VARIABLE_BINARY, 0, 0), singleColumn(FIXED_INT_64, 0, 1)))
);

Operator expected = createOperator(
new Page(
new BlockBuilder(0, tupleInfo)
.append("a").append(1)
.append("a").append(4)
.append("b").append(2)
.append("b").append(3)
.build()
)
);
assertOperatorEquals(actual, expected);
}

@Test
public void testReverseOrder()
throws Exception
{
Operator source = createOperator(
new Page(
BlockAssertions.createLongsBlock(0, 1, 2),
BlockAssertions.createDoublesBlock(0, 0.1, 0.2)
),
new Page(
BlockAssertions.createLongsBlock(2, -1, 4),
BlockAssertions.createDoublesBlock(2, -0.1, 0.4)
)
);

InMemoryOrderByOperator actual = new InMemoryOrderByOperator(
source, 0, ImmutableList.of(singleColumn(FIXED_INT_64, 0, 0), singleColumn(DOUBLE, 1, 0)),
Ordering.from(FieldOrderedTupleComparator.INSTANCE).reverse()
);

Operator expected = createOperator(
new Page(
BlockAssertions.createLongsBlock(0, 4, 2, 1, -1),
BlockAssertions.createDoublesBlock(0, 0.4, 0.2, 0.1, -0.1)
)
);
assertOperatorEquals(actual, expected);
}
}

0 comments on commit 908e1c5

Please sign in to comment.