Skip to content

Commit

Permalink
Support filtering on long columns (including __time) (apache#3180)
Browse files Browse the repository at this point in the history
* Support filtering on __time column

* Rename DruidPredicate

* Add docs for ValueMatcherFactory, add comment on getColumnCapabilities

* Combine ValueMatcherFactory predicate methods to accept DruidCompositePredicate

* Address PR comments (support filter on all long columns)

* Use predicate factory instead of composite predicate

* Address PR comments

* Lazily initialize long handling in selector/in filter

* Move long value parsing from InFilter to InDimFilter, make long value parsing thread-safe

* Add multithreaded selector/in filter test

* Fix non-final lock object in SelectorDimFilter
  • Loading branch information
jon-wei authored and fjy committed Jul 21, 2016
1 parent cd7337f commit a42ccb6
Show file tree
Hide file tree
Showing 34 changed files with 2,091 additions and 152 deletions.
Expand Up @@ -29,6 +29,8 @@
import com.metamx.collections.bitmap.RoaringBitmapFactory;
import com.metamx.collections.spatial.ImmutableRTree;
import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.DruidLongPredicate;
import io.druid.query.filter.DruidPredicateFactory;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.GenericIndexed;
Expand Down Expand Up @@ -63,16 +65,35 @@ public class DimensionPredicateFilterBenchmark

private static final DimensionPredicateFilter IS_EVEN = new DimensionPredicateFilter(
"foo",
new Predicate<String>()
new DruidPredicateFactory()
{
@Override
public boolean apply(String input)
public Predicate<String> makeStringPredicate()
{
if (input == null) {
return false;
}
return new Predicate<String>()
{
@Override
public boolean apply(String input)
{
if (input == null) {
return false;
}
return Integer.parseInt(input.toString()) % 2 == 0;
}
};
}

return Integer.parseInt(input) % 2 == 0;
@Override
public DruidLongPredicate makeLongPredicate()
{
return new DruidLongPredicate()
{
@Override
public boolean applyLong(long input)
{
return false;
}
};
}
},
null
Expand Down
Expand Up @@ -43,7 +43,10 @@
import io.druid.query.extraction.JavaScriptExtractionFn;
import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.BoundDimFilter;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.DruidLongPredicate;
import io.druid.query.filter.DruidPredicateFactory;
import io.druid.query.filter.Filter;
import io.druid.query.filter.OrDimFilter;
import io.druid.query.filter.SelectorDimFilter;
Expand All @@ -56,9 +59,11 @@
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.AndFilter;
import io.druid.segment.filter.BoundFilter;
import io.druid.segment.filter.DimensionPredicateFilter;
import io.druid.segment.filter.Filters;
import io.druid.segment.filter.OrFilter;
Expand All @@ -67,6 +72,7 @@
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand Down Expand Up @@ -109,6 +115,10 @@ public class FilterPartitionBenchmark
private QueryableIndex qIndex;
private File indexFile;

private Filter timeFilterNone;
private Filter timeFilterHalf;
private Filter timeFilterAll;

private BenchmarkSchemaInfo schemaInfo;

private static String JS_FN = "function(str) { return 'super-' + str; }";
Expand Down Expand Up @@ -168,6 +178,38 @@ public void setup() throws IOException
new IndexSpec()
);
qIndex = INDEX_IO.loadIndex(indexFile);

Interval interval = schemaInfo.getDataInterval();
timeFilterNone = new BoundFilter(new BoundDimFilter(
Column.TIME_COLUMN_NAME,
String.valueOf(Long.MAX_VALUE),
String.valueOf(Long.MAX_VALUE),
true,
true,
true,
null
));

long halfEnd = (interval.getEndMillis() + interval.getStartMillis()) / 2;
timeFilterHalf = new BoundFilter(new BoundDimFilter(
Column.TIME_COLUMN_NAME,
String.valueOf(interval.getStartMillis()),
String.valueOf(halfEnd),
true,
true,
true,
null
));

timeFilterAll = new BoundFilter(new BoundDimFilter(
Column.TIME_COLUMN_NAME,
String.valueOf(interval.getStartMillis()),
String.valueOf(interval.getEndMillis()),
true,
true,
true,
null
));
}

private IncrementalIndex makeIncIndex()
Expand Down Expand Up @@ -215,6 +257,51 @@ public void longRead(Blackhole blackhole) throws Exception
}
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void timeFilterNone(Blackhole blackhole) throws Exception
{
StorageAdapter sa = new QueryableIndexStorageAdapter(qIndex);
Sequence<Cursor> cursors = sa.makeCursors(timeFilterNone, schemaInfo.getDataInterval(), QueryGranularities.ALL, false);

Sequence<List<Long>> longListSeq = readCursorsLong(cursors, blackhole);
List<Long> strings = Sequences.toList(Sequences.limit(longListSeq, 1), Lists.<List<Long>>newArrayList()).get(0);
for (Long st : strings) {
blackhole.consume(st);
}
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void timeFilterHalf(Blackhole blackhole) throws Exception
{
StorageAdapter sa = new QueryableIndexStorageAdapter(qIndex);
Sequence<Cursor> cursors = sa.makeCursors(timeFilterHalf, schemaInfo.getDataInterval(), QueryGranularities.ALL, false);

Sequence<List<Long>> longListSeq = readCursorsLong(cursors, blackhole);
List<Long> strings = Sequences.toList(Sequences.limit(longListSeq, 1), Lists.<List<Long>>newArrayList()).get(0);
for (Long st : strings) {
blackhole.consume(st);
}
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void timeFilterAll(Blackhole blackhole) throws Exception
{
StorageAdapter sa = new QueryableIndexStorageAdapter(qIndex);
Sequence<Cursor> cursors = sa.makeCursors(timeFilterAll, schemaInfo.getDataInterval(), QueryGranularities.ALL, false);

Sequence<List<Long>> longListSeq = readCursorsLong(cursors, blackhole);
List<Long> strings = Sequences.toList(Sequences.limit(longListSeq, 1), Lists.<List<Long>>newArrayList()).get(0);
for (Long st : strings) {
blackhole.consume(st);
}
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
Expand Down Expand Up @@ -442,7 +529,6 @@ private Sequence<List<Long>> readCursorsLong(Sequence<Cursor> cursors, final Bla
public List<Long> apply(Cursor input)
{
List<Long> longvals = new ArrayList<Long>();

LongColumnSelector selector = input.makeLongColumnSelector("sumLongSequential");
while (!input.isDone()) {
long rowval = selector.get();
Expand Down Expand Up @@ -476,11 +562,11 @@ private class NoBitmapDimensionPredicateFilter extends DimensionPredicateFilter
{
public NoBitmapDimensionPredicateFilter(
final String dimension,
final Predicate<String> predicate,
final DruidPredicateFactory predicateFactory,
final ExtractionFn extractionFn
)
{
super(dimension, predicate, extractionFn);
super(dimension, predicateFactory, extractionFn);
}

@Override
Expand Down Expand Up @@ -510,21 +596,37 @@ public Filter toFilter()
return new NoBitmapSelectorFilter(dimension, value);
} else {
final String valueOrNull = Strings.emptyToNull(value);
final Predicate<String> predicate = new Predicate<String>()

final DruidPredicateFactory predicateFactory = new DruidPredicateFactory()
{
@Override
public boolean apply(String input)
public Predicate<String> makeStringPredicate()
{
return Objects.equals(valueOrNull, input);
return new Predicate<String>()
{
@Override
public boolean apply(String input)
{
return Objects.equals(valueOrNull, input);
}
};
}

@Override
public String toString()
public DruidLongPredicate makeLongPredicate()
{
return value;
return new DruidLongPredicate()
{
@Override
public boolean applyLong(long input)
{
return false;
}
};
}
};
return new NoBitmapDimensionPredicateFilter(dimension, predicate, extractionFn);

return new NoBitmapDimensionPredicateFilter(dimension, predicateFactory, extractionFn);
}
}
}
Expand Down

0 comments on commit a42ccb6

Please sign in to comment.