apache#3137 Remove result wrapping in sketch query
navis committed Mar 24, 2020
1 parent 5e64007 commit 1caec41
Showing 65 changed files with 270 additions and 435 deletions.
@@ -41,7 +41,7 @@
 import io.druid.query.PostProcessingOperator;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
-import io.druid.query.select.Schema;
+import io.druid.query.Schema;
 
 import java.util.Map;
 import java.util.Properties;
@@ -140,25 +140,24 @@ public Sequence run(final Query query, Map responseContext)
     final List<VirtualColumn> virtualColumns = ((SketchQuery) representative).getVirtualColumns();
     final List<ListenableFuture<Integer>> futures = Lists.newArrayList();
     final Map<String, Map<String, Object>> results = Maps.newLinkedHashMap();
-    Sequence<Pair<Query, Sequence<Result<Object[]>>>> sequences = baseRunner.run(query, responseContext);
-    for (Pair<Query, Sequence<Result<Object[]>>> pair : Sequences.toList(sequences)) {
+    Sequence<Pair<Query, Sequence<Object[]>>> sequences = baseRunner.run(query, responseContext);
+    for (Pair<Query, Sequence<Object[]>> pair : Sequences.toList(sequences)) {
       SketchQuery sketchQuery = (SketchQuery) pair.lhs;
 
       final List<String> columns = sketchQuery.estimatedOutputColumns();
-      final Result<Object[]> values = Sequences.only(pair.rhs, null);
+      final Object[] values = Sequences.only(pair.rhs, null);
       if (values == null) {
        continue; // invalid interval or not-existing
       }
-      final Object[] value = values.getValue();
       if (sketchQuery.getSketchOp() == SketchOp.QUANTILE) {
-        final Map<String, ValueType> primitiveColumns = Maps.newTreeMap();
+        final Map<String, ValueType> numericColumns = Maps.newTreeMap();
-        for (int i = 0; i < value.length; i++) {
-          if (value[i] == null) {
+        for (int i = 1; i < values.length; i++) {
+          if (values[i] == null) {
            continue; // empty or not-existing
           }
           final String column = columns.get(i);
-          final ValueDesc type = ((TypedSketch<ItemsSketch>) value[i]).type();
+          final ValueDesc type = ((TypedSketch<ItemsSketch>) values[i]).type();
           if (type.isPrimitive()) {
             primitiveColumns.put(column, type.type());
           }
@@ -195,12 +194,12 @@ public Sequence run(final Query query, Map responseContext)
           null,
           BaseQuery.copyContextForMeta(query.withOverriddenContext(Query.ALL_DIMENSIONS_FOR_EMPTY, false))
       );
-      for (int i = 0; i < value.length; i++) {
-        if (value[i] == null) {
+      for (int i = 1; i < values.length; i++) {
+        if (values[i] == null) {
          continue; // empty or not-existing
         }
         final String column = columns.get(i);
-        final TypedSketch<ItemsSketch> sketch = (TypedSketch<ItemsSketch>) value[i];
+        final TypedSketch<ItemsSketch> sketch = (TypedSketch<ItemsSketch>) values[i];
         Map<String, Object> result = results.get(column);
         if (result == null) {
           results.put(column, result = Maps.newLinkedHashMap());
@@ -329,9 +328,9 @@ public Integer call()
           )
       );
     } else if (sketchQuery.getSketchOp() == SketchOp.THETA) {
-      for (int i = 0; i < value.length; i++) {
+      for (int i = 1; i < values.length; i++) {
         final String column = columns.get(i);
-        final TypedSketch<Sketch> sketch = ((TypedSketch<Sketch>) value[i]);
+        final TypedSketch<Sketch> sketch = ((TypedSketch<Sketch>) values[i]);
         if (sketch == null) {
          continue;
         }
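
Taken together, the hunks above are the core of the commit: the sketch post-processor now consumes bare Object[] rows instead of Result<Object[]> wrappers, and the column loops start at index 1 instead of 0. Below is a minimal, self-contained sketch of the resulting read pattern. It assumes (the diff itself does not confirm this) that slot 0 of the unwrapped array carries what the Result wrapper used to hold, i.e. the row timestamp; all names here are illustrative, not fork API.

import java.util.Arrays;
import java.util.List;

// Illustrative only: mirrors the post-commit loop shape. Data columns are read
// from index 1 onward, matching the "for (int i = 1; ...)" loops in the diff.
public class UnwrappedRowDemo
{
  public static void main(String[] args)
  {
    List<String> columns = Arrays.asList("__time", "quantile_sketch", "theta_sketch");
    Object[] values = {1585008000000L, "items-sketch-payload", null};

    // Before the commit this array arrived as Result<Object[]> and had to be
    // unwrapped per row via values.getValue(); now it is consumed directly.
    for (int i = 1; i < values.length; i++) {
      if (values[i] == null) {
        continue; // empty or not-existing, as in the original loops
      }
      System.out.println(columns.get(i) + " -> " + values[i]);
    }
  }
}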
@@ -31,8 +31,6 @@
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.groupby.GroupByQuery;
 import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
-import io.druid.query.select.Schema;
-import io.druid.query.select.SchemaQuery;
 import io.druid.segment.ExprVirtualColumn;
 import io.druid.segment.TestHelper;
 import io.druid.segment.TestIndex;
@@ -31,7 +31,6 @@
 import io.druid.common.utils.Sequences;
 import io.druid.data.input.Row;
 import io.druid.data.input.Rows;
-import io.druid.java.util.common.guava.Sequence;
 import io.druid.query.filter.DimFilter;
 import io.druid.query.filter.DimFilters;
 import io.druid.query.filter.LuceneLatLonPolygonFilter;
@@ -45,7 +44,7 @@
 
 @JsonTypeName("choropleth")
 public class ChoroplethMapQuery extends BaseQuery<Object[]>
-    implements Query.RewritingQuery<Object[]>, Query.ArrayOutputSupport<Object[]>
+    implements Query.RewritingQuery<Object[]>, Query.ArrayOutput
 {
   private final GroupByQuery query;
   private final String pointColumn;
@@ -208,12 +207,6 @@ public List<String> estimatedOutputColumns()
     return outputColumns == null ? null : GuavaUtils.concat(outputColumns, boundaryJoin.keySet());
   }
 
-  @Override
-  public Sequence<Object[]> array(Sequence<Object[]> sequence)
-  {
-    return sequence;
-  }
-
   @Override
   public String toString()
   {
@@ -34,7 +34,7 @@
 import io.druid.data.ValueDesc;
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.guava.Sequence;
-import io.druid.query.Query.ArrayOutputSupport;
+import io.druid.query.Query.ArrayOutput;
 import io.druid.query.Query.FilterSupport;
 import io.druid.query.Query.RewritingQuery;
 import io.druid.query.Query.SchemaProvider;
@@ -56,7 +56,7 @@
 
 @JsonTypeName("geo.boundary")
 public class GeoBoundaryFilterQuery extends BaseQuery<Object[]>
-    implements RewritingQuery<Object[]>, ArrayOutputSupport<Object[]>, FilterSupport<Object[]>,
+    implements RewritingQuery<Object[]>, ArrayOutput, FilterSupport<Object[]>,
                Query.WrappingQuery<Object[]>, SchemaProvider
 {
   private static final int DEFAULT_PARALLELISM = 2;
@@ -602,12 +602,6 @@ public List<String> estimatedOutputColumns()
     return flip ? GuavaUtils.concat(boundaryJoin, columns) : GuavaUtils.concat(columns, boundaryJoin);
   }
 
-  @Override
-  public Sequence<Object[]> array(Sequence<Object[]> sequence)
-  {
-    return sequence;
-  }
-
   @Override
   public boolean equals(Object o)
   {
@@ -27,8 +27,6 @@
 import io.druid.query.filter.LuceneLatLonPolygonFilter;
 import io.druid.query.filter.LuceneShapeFilter;
 import io.druid.query.filter.LuceneSpatialFilter;
-import io.druid.query.select.Schema;
-import io.druid.query.select.SchemaQuery;
 import io.druid.segment.ExprVirtualColumn;
 import io.druid.segment.TestIndex;
 import io.druid.segment.lucene.ShapeFormat;
@@ -69,7 +69,8 @@ public abstract class BaseAggregationQuery extends BaseQuery<Row>
     implements Query.AggregationsSupport<Row>,
                Query.ArrayOutputSupport<Row>,
                Query.OrderingSupport<Row>,
-               Query.LateralViewSupport<Row>
+               Query.LateralViewSupport<Row>,
+               Query.RowOutput
 {
   public static final String SORT_ON_TIME = "groupby.sort.on.time";
 
@@ -254,7 +255,8 @@ public List<String> estimatedOutputColumns()
   public Sequence<Object[]> array(Sequence<Row> sequence)
   {
     final String[] columns = Preconditions.checkNotNull(estimatedOutputColumns()).toArray(new String[0]);
-    return io.druid.common.utils.Sequences.map(
+    final int timeIndex = Arrays.asList(columns).indexOf(Row.TIME_COLUMN_NAME);
+    return Sequences.map(
         sequence,
         new Function<Row, Object[]>()
         {
@@ -263,8 +265,7 @@ public Object[] apply(Row input)
          {
            final Object[] array = new Object[columns.length];
            for (int i = 0; i < columns.length; i++) {
-              array[i] = Row.TIME_COLUMN_NAME.equals(columns[i]) ?
-                         input.getTimestampFromEpoch() : input.getRaw(columns[i]);
+              array[i] = timeIndex == i ? input.getTimestampFromEpoch() : input.getRaw(columns[i]);
            }
            return array;
          }
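
The array() rewrite above is a small hoisting optimization: instead of a Row.TIME_COLUMN_NAME.equals(columns[i]) string comparison per cell, the time column's index is resolved once per sequence and compared by integer. A self-contained sketch of the pattern in plain Java; "__time" is a stand-in for the fork's Row.TIME_COLUMN_NAME constant:

import java.util.Arrays;

public class TimeIndexHoistDemo
{
  private static final String TIME_COLUMN_NAME = "__time"; // stand-in constant

  public static void main(String[] args)
  {
    String[] columns = {"__time", "dim", "met"};
    Object[] raw = {null, "a", 1L};
    long timestampFromEpoch = 1585008000000L;

    // Hoisted out of the loop: one lookup per sequence, not one equals() per cell.
    final int timeIndex = Arrays.asList(columns).indexOf(TIME_COLUMN_NAME);

    Object[] array = new Object[columns.length];
    for (int i = 0; i < columns.length; i++) {
      array[i] = i == timeIndex ? timestampFromEpoch : raw[i];
    }
    System.out.println(Arrays.toString(array)); // [1585008000000, a, 1]
  }
}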
processing/src/main/java/io/druid/query/ClassifyQuery.java (9 changes: 1 addition & 8 deletions)
@@ -24,7 +24,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
-import io.druid.java.util.common.guava.Sequence;
 import io.druid.common.guava.GuavaUtils;
 import io.druid.query.spec.QuerySegmentSpec;
 
@@ -34,7 +33,7 @@
 
 @JsonTypeName("classify")
 public class ClassifyQuery extends BaseQuery<Object[]>
-    implements Query.RewritingQuery<Object[]>, Query.ArrayOutputSupport<Object[]>
+    implements Query.RewritingQuery<Object[]>, Query.ArrayOutput
 {
   private final Query<Object[]> query;
   private final Query<?> classifier;
@@ -108,12 +107,6 @@ public List<String> estimatedOutputColumns()
     return null;
   }
 
-  @Override
-  public Sequence<Object[]> array(Sequence<Object[]> sequence)
-  {
-    return sequence;
-  }
-
   @Override
   @SuppressWarnings("unchecked")
   public Query rewriteQuery(QuerySegmentWalker segmentWalker, QueryConfig queryConfig)
@@ -44,6 +44,8 @@
 import io.druid.java.util.common.logger.Logger;
 import io.druid.java.util.common.parsers.CloseableIterator;
 import io.druid.query.JoinQuery.JoinDelegate;
+import io.druid.query.PostProcessingOperator.Local;
+import io.druid.query.PostProcessingOperator.ReturnRowAs;
 import io.druid.query.groupby.orderby.OrderByColumnSpec;
 import io.druid.query.ordering.Comparators;
 
@@ -63,8 +65,7 @@
 
 /**
 */
-public class JoinPostProcessor extends PostProcessingOperator.UnionSupport
-    implements PostProcessingOperator.ReturnRowAs, PostProcessingOperator.Local
+public class JoinPostProcessor extends PostProcessingOperator.UnionSupport implements ReturnRowAs, Local
 {
   private static final Logger log = new Logger(JoinPostProcessor.class);
 
@@ -22,7 +22,7 @@
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.base.Function;
-import io.druid.query.select.Schema;
+import io.druid.query.Schema;
 
 import java.util.Map;
 
@@ -26,7 +26,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import io.druid.query.PostProcessingOperator.UnionSupport;
-import io.druid.query.select.Schema;
+import io.druid.query.Schema;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -35,7 +35,7 @@
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.aggregation.PostAggregators;
-import io.druid.query.select.Schema;
+import io.druid.query.Schema;
 
 import java.util.List;
 import java.util.Map;
@@ -25,7 +25,6 @@
 import com.google.common.collect.Maps;
 import io.druid.common.guava.GuavaUtils;
 import io.druid.java.util.common.guava.Sequence;
-import io.druid.query.select.Schema;
 
 import java.util.Arrays;
 import java.util.List;
processing/src/main/java/io/druid/query/Queries.java (30 changes: 22 additions & 8 deletions)
@@ -45,6 +45,7 @@
 import io.druid.java.util.common.guava.Accumulator;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.logger.Logger;
+import io.druid.query.Query.ArrayOutputSupport;
 import io.druid.query.Query.ColumnsSupport;
 import io.druid.query.Query.DimensionSupport;
 import io.druid.query.Query.SchemaProvider;
@@ -60,10 +61,8 @@
 import io.druid.query.groupby.GroupByQuery;
 import io.druid.query.ordering.OrderingSpec;
 import io.druid.query.select.EventHolder;
-import io.druid.query.select.Schema;
 import io.druid.query.select.SelectQuery;
 import io.druid.query.select.SelectResultValue;
-import io.druid.query.select.StreamQuery;
 import io.druid.query.sketch.GenericSketchAggregatorFactory;
 import io.druid.query.sketch.QuantileOperation;
 import io.druid.query.sketch.SketchOp;
@@ -152,6 +151,25 @@ public static RowSignature relaySchema(Query query, QuerySegmentWalker segmentWalker)
     return Preconditions.checkNotNull(schema);
   }
 
+  public static List<String> relayColumns(ArrayOutputSupport<?> query, ObjectMapper mapper)
+  {
+    List<String> columns = query.estimatedOutputColumns();
+    if (columns == null) {
+      return columns;
+    }
+    if (query instanceof Query.LateralViewSupport) {
+      LateralViewSpec lateralView = ((Query.LateralViewSupport) query).getLateralView();
+      if (lateralView != null) {
+        columns = lateralView.resolve(columns);
+      }
+    }
+    PostProcessingOperator postProcessor = PostProcessingOperators.load(query, mapper);
+    if (postProcessor instanceof Schema.SchemaResolving) {
+      columns = ((Schema.SchemaResolving) postProcessor).resolve(columns);
+    }
+    return columns;
+  }
+
   // best effort.. implement SchemaProvider if not enough
   private static RowSignature _relaySchema(Query query, QuerySegmentWalker segmentWalker)
   {
@@ -272,14 +290,10 @@ public static <I> Sequence<Row> convertToRow(Query<I> subQuery, Sequence<I> sequence)
       return Sequences.explode((Sequence<Result<SelectResultValue>>) sequence, SELECT_TO_ROWS);
     } else if (subQuery instanceof TopNQuery) {
       return Sequences.explode((Sequence<Result<TopNResultValue>>) sequence, TOP_N_TO_ROWS);
-    } else if (subQuery instanceof BaseAggregationQuery) {
-      return (Sequence<Row>) sequence;
-    } else if (subQuery instanceof StreamQuery) {
-      return ((StreamQuery) subQuery).asRow((Sequence<Object[]>) sequence);
-    } else if (subQuery instanceof UnionAllQuery) {
-      return ((UnionAllQuery) subQuery).asRow(sequence);
+    } else if (subQuery instanceof Query.RowOutputSupport) {
+      return ((Query.RowOutputSupport) subQuery).asRow(sequence);
+    } else if (subQuery instanceof UnionAllQuery) {
+      return ((UnionAllQuery) subQuery).asRow(sequence);
     }
     return Sequences.map(sequence, GuavaUtils.<I, Row>caster());
   }
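
Two related cleanups land in Queries: the new relayColumns helper centralizes output-column resolution (estimated columns, then an optional lateral view, then any schema-resolving post processor), and convertToRow now dispatches on the RowOutputSupport capability instead of enumerating concrete types (BaseAggregationQuery, StreamQuery). A self-contained sketch of that capability-dispatch idea, using simplified stand-in types rather than the fork's real Query/Sequence interfaces:

import java.util.Arrays;

public class CapabilityDispatchDemo
{
  // Stand-in for Query.RowOutputSupport: a query that can render itself as rows.
  interface RowOutputSupport<T>
  {
    String asRow(T value);
  }

  // Opting in replaces an instanceof-per-concrete-type branch in the converter.
  static class StreamLikeQuery implements RowOutputSupport<Object[]>
  {
    @Override
    public String asRow(Object[] value)
    {
      return Arrays.toString(value);
    }
  }

  @SuppressWarnings("unchecked")
  static <I> String convertToRow(Object query, I value)
  {
    if (query instanceof RowOutputSupport) {
      return ((RowOutputSupport<I>) query).asRow(value); // capability dispatch
    }
    return String.valueOf(value); // fallback, like the caster() relay
  }

  public static void main(String[] args)
  {
    System.out.println(convertToRow(new StreamLikeQuery(), new Object[]{1L, "a"}));
  }
}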
processing/src/main/java/io/druid/query/Query.java (14 changes: 13 additions & 1 deletion)
@@ -40,7 +40,6 @@
 import io.druid.query.kmeans.KMeansTaggingQuery;
 import io.druid.query.metadata.metadata.SegmentMetadataQuery;
 import io.druid.query.search.search.SearchQuery;
-import io.druid.query.select.SchemaQuery;
 import io.druid.query.select.SelectForwardQuery;
 import io.druid.query.select.SelectMetaQuery;
 import io.druid.query.select.SelectQuery;
@@ -208,11 +207,24 @@ interface ArrayOutputSupport<T> extends Query<T>
     Sequence<Object[]> array(Sequence<T> sequence);
   }
 
+  interface ArrayOutput extends ArrayOutputSupport<Object[]>
+  {
+    default Sequence<Object[]> array(Sequence<Object[]> sequence)
+    {
+      return sequence;
+    }
+  }
+
   interface RowOutputSupport<T> extends Query<T>
   {
     Sequence<Row> asRow(Sequence<T> sequence);
   }
 
+  interface RowOutput extends RowOutputSupport<Row>
+  {
+    default Sequence<Row> asRow(Sequence<Row> sequence) { return sequence; }
+  }
+
   interface OrderingSupport<T> extends Query<T>
   {
     List<OrderByColumnSpec> getResultOrdering();
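
These two default-method interfaces are what allow ChoroplethMapQuery, GeoBoundaryFilterQuery, and ClassifyQuery earlier in the diff to drop their identical pass-through array() overrides. A minimal self-contained sketch of the pattern, with a trivial Sequence stand-in instead of io.druid.java.util.common.guava.Sequence:

public class DefaultMethodDemo
{
  interface Sequence<T> { } // trivial stand-in

  // The generic capability keeps its abstract conversion method...
  interface ArrayOutputSupport<T>
  {
    Sequence<Object[]> array(Sequence<T> sequence);
  }

  // ...while the Object[]-typed refinement supplies the identity default,
  // mirroring the new Query.ArrayOutput above.
  interface ArrayOutput extends ArrayOutputSupport<Object[]>
  {
    default Sequence<Object[]> array(Sequence<Object[]> sequence)
    {
      return sequence;
    }
  }

  // A query whose native output is already Object[] needs no boilerplate.
  static class AlreadyArrayQuery implements ArrayOutput { }

  public static void main(String[] args)
  {
    Sequence<Object[]> rows = new Sequence<Object[]>() { };
    System.out.println(new AlreadyArrayQuery().array(rows) == rows); // true
  }
}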
processing/src/main/java/io/druid/query/QueryUtils.java (2 changes: 0 additions & 2 deletions)
@@ -54,8 +54,6 @@
 import io.druid.query.metadata.metadata.NoneColumnIncluderator;
 import io.druid.query.metadata.metadata.SegmentMetadataQuery;
 import io.druid.query.metadata.metadata.SegmentMetadataQuery.AnalysisType;
-import io.druid.query.select.Schema;
-import io.druid.query.select.SchemaQuery;
 import io.druid.segment.ExprVirtualColumn;
 import io.druid.segment.VirtualColumn;
 import io.druid.segment.VirtualColumns;
@@ -20,19 +20,17 @@
 package io.druid.query;
 
 import com.google.common.collect.Ordering;
+import io.druid.common.guava.CombiningSequence;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.nary.BinaryFn;
-import io.druid.common.guava.CombiningSequence;
 
 import java.util.Map;
 
 /**
 */
 public abstract class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRunner<T>
 {
-  public ResultMergeQueryRunner(
-      QueryRunner<T> baseRunner
-  )
+  public ResultMergeQueryRunner(QueryRunner<T> baseRunner)
   {
     super(baseRunner);
   }
@@ -47,7 +45,10 @@ public Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> responseContext)
     );
   }
 
-  protected abstract Ordering<T> makeOrdering(Query<T> query);
+  protected Ordering<T> makeOrdering(Query<T> query)
+  {
+    return query.getMergeOrdering();
+  }
 
   protected abstract BinaryFn<T,T,T> createMergeFn(Query<T> query);
 }

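One more behavioral nicety in ResultMergeQueryRunner above: makeOrdering goes from abstract to a concrete default that delegates to Query.getMergeOrdering(), so subclasses only override it to deviate. A self-contained sketch of the abstract-hook-to-default refactor, with simplified stand-ins rather than the fork's classes:

import java.util.Comparator;

public class DefaultHookDemo
{
  interface Query<T>
  {
    Comparator<T> getMergeOrdering();
  }

  static abstract class MergeRunner<T>
  {
    // Was: protected abstract Comparator<T> makeOrdering(Query<T> query);
    protected Comparator<T> makeOrdering(Query<T> query)
    {
      return query.getMergeOrdering(); // sensible default for most runners
    }
  }

  // Inherits the default; before the change this class could not exist
  // without spelling out the same delegation by hand.
  static class PlainRunner<T> extends MergeRunner<T> { }

  public static void main(String[] args)
  {
    Query<Integer> q = Comparator::naturalOrder;
    System.out.println(new PlainRunner<Integer>().makeOrdering(q).compare(1, 2)); // -1
  }
}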