Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simple HashAggregation #1

Merged
merged 5 commits into from
Aug 17, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/main/java/com/facebook/presto/AggregationUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.facebook.presto;

import com.google.common.base.Preconditions;
import com.google.common.collect.Range;

public class AggregationUtil {
public static void processGroup(SeekableIterator<ValueBlock> aggregationSource, AggregationFunction aggregation, Range<Long> positions)
{
RangePositionBlock positionBlock = new RangePositionBlock(positions);

// goto start of range
aggregationSource.seekTo(positions.lowerEndpoint());
Preconditions.checkState(aggregationSource.hasNext(), "Group start position not found in aggregation source");

// while we have data...
while (aggregationSource.hasNext() && aggregationSource.peek().getRange().isConnected(positions)) {
// process aggregation
aggregation.add(aggregationSource.next(), positionBlock);
}
}
}
74 changes: 74 additions & 0 deletions src/main/java/com/facebook/presto/CsvFileScanner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.facebook.presto;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;
import com.google.common.io.InputSupplier;
import com.google.common.io.LineReader;

import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;

public class CsvFileScanner implements Iterable<ValueBlock>
{
private final InputSupplier<InputStreamReader> inputSupplier;
private final Splitter columnSplitter;
private final int columnIndex;

public CsvFileScanner(InputSupplier<InputStreamReader> inputSupplier, int columnIndex, char columnSeparator)
{
this.columnIndex = columnIndex;
Preconditions.checkNotNull(inputSupplier, "inputSupplier is null");
this.inputSupplier = inputSupplier;
columnSplitter = Splitter.on(columnSeparator);
}

@Override
public Iterator<ValueBlock> iterator()
{
return new ColumnIterator(inputSupplier, columnIndex, columnSplitter);
}

private static class ColumnIterator extends AbstractIterator<ValueBlock>
{
private long position;
private final LineReader reader;
private int columnIndex;
private Splitter columnSplitter;

public ColumnIterator(InputSupplier<InputStreamReader> inputSupplier, int columnIndex, Splitter columnSplitter)
{
try {
this.reader = new LineReader(inputSupplier.getInput());
}
catch (IOException e) {
throw Throwables.propagate(e);
}
this.columnIndex = columnIndex;
this.columnSplitter = columnSplitter;
}

@Override
protected ValueBlock computeNext()
{
String line;
try {
line = reader.readLine();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
if (line == null) {
endOfData();
return null;
}
Iterable<String> split = columnSplitter.split(line);
String value = Iterables.get(split, columnIndex);
return new UncompressedValueBlock(position++, value);

}
}
}
82 changes: 82 additions & 0 deletions src/main/java/com/facebook/presto/GroupBy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.facebook.presto;

import com.google.common.collect.AbstractIterator;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.Range;
import com.google.common.collect.Ranges;

import java.util.Iterator;

/**
* Group input data and produce a single block for each sequence of identical values.
*/
public class GroupBy
extends AbstractIterator<RunLengthEncodedBlock>
{
private final Iterator<ValueBlock> groupBySource;

private PeekingIterator<Pair> currentGroupByBlock;

public GroupBy(Iterator<ValueBlock> keySource)
{
this.groupBySource = keySource;
}

@Override
protected RunLengthEncodedBlock computeNext()
{
// if no more data, return null
if (!advanceGroupByBlock()) {
endOfData();
return null;
}

// form a group from the current position, until the value changes
Pair entry = currentGroupByBlock.next();
Object groupByKey = entry.getValue();
long startPosition = entry.getPosition();

while (true) {
// skip entries until the current key changes or we've consumed this block
while (currentGroupByBlock.hasNext() && currentGroupByBlock.peek().getValue().equals(groupByKey)) {
entry = currentGroupByBlock.next();
}

// stop if there is more data in the current block since the next entry will be for a new group
if (currentGroupByBlock.hasNext()) {
break;
}

// stop if we are at the end of the stream
if (!groupBySource.hasNext()) {
break;
}

// process the next block
currentGroupByBlock = groupBySource.next().pairIterator();
}

long endPosition = entry.getPosition();
Range<Long> range = Ranges.closed(startPosition, endPosition);

RunLengthEncodedBlock group = new RunLengthEncodedBlock(groupByKey, range);
return group;
}

private boolean advanceGroupByBlock()
{
// does current block iterator have more data?
if (currentGroupByBlock != null && currentGroupByBlock.hasNext()) {
return true;
}

// are there more blocks?
if (!groupBySource.hasNext()) {
return false;
}

// advance to next block and open an iterator
currentGroupByBlock = groupBySource.next().pairIterator();
return true;
}
}
68 changes: 68 additions & 0 deletions src/main/java/com/facebook/presto/HashAggregation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.facebook.presto;

import com.google.common.collect.AbstractIterator;

import javax.inject.Provider;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import static com.facebook.presto.AggregationUtil.processGroup;

public class HashAggregation
extends AbstractIterator<ValueBlock>
{
private final Iterator<RunLengthEncodedBlock> groupBySource;
private final SeekableIterator<ValueBlock> aggregationSource;

private final Provider<AggregationFunction> functionProvider;

private Iterator<Entry<Object, AggregationFunction>> aggregations;

private long position;

public HashAggregation(Iterator<RunLengthEncodedBlock> keySource, SeekableIterator<ValueBlock> valueSource, Provider<AggregationFunction> functionProvider)
{
this.groupBySource = keySource;
this.aggregationSource = valueSource;

this.functionProvider = functionProvider;
}

@Override
protected ValueBlock computeNext()
{
// process all data ahead of time
if (aggregations == null) {
Map<Object, AggregationFunction> aggregationMap = new HashMap<>();
while (groupBySource.hasNext()) {
RunLengthEncodedBlock group = groupBySource.next();

AggregationFunction aggregation = aggregationMap.get(group.getValue());
if (aggregation == null) {
aggregation = functionProvider.get();
aggregationMap.put(group.getValue(), aggregation);
}
processGroup(aggregationSource, aggregation, group.getRange());
}

this.aggregations = aggregationMap.entrySet().iterator();
}

// if no more data, return null
if (!aggregations.hasNext()) {
endOfData();
return null;
}

// get next aggregation
Entry<Object, AggregationFunction> aggregation = aggregations.next();

// calculate final value for this group
Object value = aggregation.getValue().evaluate();

// build an output block
return new UncompressedValueBlock(position++, new Tuple(aggregation.getKey(), value));
}
}
29 changes: 29 additions & 0 deletions src/main/java/com/facebook/presto/Pair.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,35 @@ public Object apply(Pair input)
};
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

Pair pair = (Pair) o;

if (position != pair.position) {
return false;
}
if (value != null ? !value.equals(pair.value) : pair.value != null) {
return false;
}

return true;
}

@Override
public int hashCode()
{
int result = (int) (position ^ (position >>> 32));
result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
}

@Override
public String toString()
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/com/facebook/presto/PairsIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.facebook.presto;

import com.google.common.collect.AbstractIterator;
import com.google.common.collect.PeekingIterator;

import java.util.Iterator;

public class PairsIterator
extends AbstractIterator<Pair>
{
private final Iterator<ValueBlock> blockIterator;
private PeekingIterator<Pair> currentBlock;

public PairsIterator(Iterator<ValueBlock> blockIterator)
{
this.blockIterator = blockIterator;
}

@Override
protected Pair computeNext()
{
if (!advance()) {
endOfData();
return null;
}
return currentBlock.next();
}

private boolean advance()
{
// does current block iterator have more data?
if (currentBlock != null && currentBlock.hasNext()) {
return true;
}

// are there more blocks?
if (!blockIterator.hasNext()) {
return false;
}

// advance to next block and open an iterator
currentBlock = blockIterator.next().pairIterator();
return true;
}

}
Loading