Add support to attempt converting to a Top N query when two GROUP BY dimensions are present

Summary:
Add support to attempt converting to a Top N query when two GROUP BY dimensions are present.

Motivation: the example query below can be executed as a Top N native query with the granularity field set to `hour`, but currently it can only be translated to a GROUP BY native query on the broker.
```
SELECT
  SUM(ct) FILTER(WHERE eventtype = 'PIN_IMPRESSION') AS IMPRESSION,
  SUM(ct) FILTER(WHERE eventtype = 'PIN_CLOSEUP') AS CLOSEUP,
  SUM(ct) FILTER(WHERE eventtype = 'PIN_CLICKTHROUGH') AS CLICKTHROUGH,
  SUM(ct) FILTER(WHERE eventtype = 'PIN_REPIN') AS SAVE,
  SUM(ct) FILTER(WHERE eventtype = 'VIDEO_MRC_VIEWS') AS VIDEO_MRC_VIEW,
  SUM(ct) FILTER(WHERE eventtype = 'QUARTILE_95_PERCENT') AS QUARTILE_95_PERCENT_VIEW,
  SUM(ct) FILTER(WHERE eventtype = 'VIDEO_V50_WATCH_TIME_MS') AS VIDEO_V50_WATCH_TIME,
  SUM(ct) FILTER(WHERE eventtype = 'VIDEO_START') AS VIDEO_START,
  MIN(CAST(create_timestamp AS BIGINT)) AS min_create_timestamp,
  root_pin_id AS root_pin_id,
  FLOOR(__time TO HOUR) AS __time
FROM pin_stats_realtime_root
WHERE (__time >= TIMESTAMP '2020-11-18 03:51:59.000000'
  AND __time <= TIMESTAMP '2020-11-19 03:51:59.000000'
  AND (app IN ('1', '2', '3', '4', '5', '6')
    AND root_pin_id = 512284526356261619
    AND version_number = 7
    AND partner_id = 512284663771961963))
GROUP BY root_pin_id, FLOOR(__time TO HOUR)
LIMIT 100
```

The change affects SQL-to-native-query translation on the broker:

When `attemptConvertingToTopNWithTwoGroupByDimensions` is set to true in the query context:
In addition to the existing criteria for translating a SQL query to a Top N query, a SQL query is also convertible to a Top N query when its GROUP BY has two columns, one of which is a granular time expression. In that case, the granularity is inferred from the granular-time GROUP BY column. A minimal example of the query shape this targets is shown below.
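For illustration only (the datasource and column names below — `example_datasource`, `channel`, `clicks` — are hypothetical, not taken from this change), a query of this shape would now be eligible for Top N translation, with `channel` as the single Top N dimension and an inferred granularity of `hour`:
```
SELECT
  channel,
  FLOOR(__time TO HOUR) AS hour_bucket,
  SUM(clicks) AS total_clicks
FROM example_datasource
WHERE __time >= TIMESTAMP '2020-11-18 00:00:00' AND __time < TIMESTAMP '2020-11-19 00:00:00'
GROUP BY channel, FLOOR(__time TO HOUR)
LIMIT 100
```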

Caveats when using this mechanism (a worked example follows below):
When executed as a GROUP BY query, the limit is applied globally, so at most `limit` rows are returned.
When executed as a TOP N query, the limit is applied within each granular time bucket, so at most (`limit` * number of granular time buckets) rows are returned.
When the limit is large enough, the results are the same except for a potential ordering difference.
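For example (a hypothetical illustration of the caveat above, not a result from this change): with `LIMIT 2` over a one-day interval bucketed by hour, the GROUP BY translation returns at most 2 rows in total, while the TOP N translation returns at most 2 rows per hour bucket, i.e. up to 2 * 24 = 48 rows.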

Test Plan: Unit test and integration tests in dev cluster.

Reviewers: O1139 Druid, itallam, yyang

Reviewed By: O1139 Druid, itallam, yyang

Subscribers: jenkins, #realtime-analytics

Differential Revision: https://phabricator.pinadmin.com/D650035
Jian Wang committed Nov 20, 2020
1 parent 135fcc9 commit 3f2604d
Showing 5 changed files with 209 additions and 7 deletions.
@@ -31,6 +31,8 @@ public class PlannerConfig
{
public static final String CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT = "useApproximateCountDistinct";
public static final String CTX_KEY_USE_APPROXIMATE_TOPN = "useApproximateTopN";
public static final String CTX_KEY_ATTEMPT_CONVERTING_TO_TOP_N_WITH_TWO_GROUP_BY_DIMENSIONS =
"attemptConvertingToTopNWithTwoGroupByDimensions";

@JsonProperty
private Period metadataRefreshPeriod = new Period("PT1M");
@@ -65,6 +67,14 @@ public class PlannerConfig
@JsonProperty
private long metadataSegmentPollPeriod = 60000;

@JsonProperty
private boolean attemptConvertingToTopNWithTwoGroupByDimensions = false;

public boolean isAttemptConvertingToTopNWithTwoGroupByDimensions()
{
return attemptConvertingToTopNWithTwoGroupByDimensions;
}

public long getMetadataSegmentPollPeriod()
{
return metadataSegmentPollPeriod;
@@ -154,6 +164,11 @@ public PlannerConfig withOverrides(final Map<String, Object> context)
newConfig.metadataSegmentCacheEnable = isMetadataSegmentCacheEnable();
newConfig.metadataSegmentPollPeriod = getMetadataSegmentPollPeriod();
newConfig.serializeComplexValues = shouldSerializeComplexValues();
newConfig.attemptConvertingToTopNWithTwoGroupByDimensions = getContextBoolean(
context,
CTX_KEY_ATTEMPT_CONVERTING_TO_TOP_N_WITH_TWO_GROUP_BY_DIMENSIONS,
isAttemptConvertingToTopNWithTwoGroupByDimensions()
);
return newConfig;
}

@@ -195,6 +210,7 @@ public boolean equals(final Object o)
metadataSegmentCacheEnable == that.metadataSegmentCacheEnable &&
metadataSegmentPollPeriod == that.metadataSegmentPollPeriod &&
serializeComplexValues == that.serializeComplexValues &&
attemptConvertingToTopNWithTwoGroupByDimensions == that.attemptConvertingToTopNWithTwoGroupByDimensions &&
Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod) &&
Objects.equals(sqlTimeZone, that.sqlTimeZone);
}
@@ -215,7 +231,8 @@ public int hashCode()
sqlTimeZone,
metadataSegmentCacheEnable,
metadataSegmentPollPeriod,
serializeComplexValues
serializeComplexValues,
attemptConvertingToTopNWithTwoGroupByDimensions
);
}

@@ -235,6 +252,7 @@ public String toString()
", metadataSegmentPollPeriod=" + metadataSegmentPollPeriod +
", sqlTimeZone=" + sqlTimeZone +
", serializeComplexValues=" + serializeComplexValues +
", attemptConvertingToTopNWithTwoGroupByDimensions=" + attemptConvertingToTopNWithTwoGroupByDimensions +
'}';
}
}
45 changes: 40 additions & 5 deletions sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -744,9 +744,9 @@ public TimeseriesQuery toTimeseriesQuery()
@Nullable
public TopNQuery toTopNQuery()
{
// Must have GROUP BY one column, ORDER BY zero or one column, limit less than maxTopNLimit, and no HAVING.
final boolean topNOk = grouping != null
&& grouping.getDimensions().size() == 1
// Must have GROUP BY one column (or GROUP BY two columns as an experimental feature when attemptConvertingToTopNWithTwoGroupByDimensions is set to true in query context), ORDER BY zero or one column, limit less than maxTopNLimit, and no HAVING.
boolean topNOk = grouping != null
&& (grouping.getDimensions().size() == 1 || (plannerContext.getPlannerConfig().isAttemptConvertingToTopNWithTwoGroupByDimensions() && grouping.getDimensions().size() == 2))
&& sorting != null
&& (sorting.getOrderBys().size() <= 1
&& sorting.isLimited() && sorting.getLimit() <= plannerContext.getPlannerConfig()
@@ -757,7 +757,42 @@ public TopNQuery toTopNQuery()
return null;
}

final DimensionSpec dimensionSpec = Iterables.getOnlyElement(grouping.getDimensions()).toDimensionSpec();
// When attemptConvertingToTopNWithTwoGroupByDimensions is set to true in query context:
// A SQL query is also convertible to a TOP N query when its GROUP BY has two columns, one of which is a granular time expression.
// In that case, the granularity can be inferred from the granular-time GROUP BY column.
//
// Caveats:
// When executed as a GROUP BY query, the limit is applied globally, so at most `limit` rows are returned.
// When executed as a TOP N query, the limit is applied within each granular time bucket, so at most (`limit` * number of granular time buckets) rows are returned.
// When the limit is large enough, the results are the same except for a potential ordering difference.
final DimensionSpec dimensionSpec;
final Granularity granularity;
if (grouping.getDimensions().size() == 1) {
dimensionSpec = Iterables.getOnlyElement(grouping.getDimensions()).toDimensionSpec();
granularity = Granularities.ALL;
} else {
List<DimensionExpression> nonGranularTimeDimensions = new ArrayList<>();
List<DimensionExpression> granularTimeDimensions = new ArrayList<>();
grouping.getDimensions()
.forEach(d -> (
Expressions.toQueryGranularity(
d.getDruidExpression(),
plannerContext.getExprMacroTable()
) == null ? nonGranularTimeDimensions : granularTimeDimensions)
.add(d));

if (!(nonGranularTimeDimensions.size() == 1 && granularTimeDimensions.size() == 1)) {
// Not a TOP N query
return null;
} else {
dimensionSpec = Iterables.getOnlyElement(nonGranularTimeDimensions).toDimensionSpec();
granularity = Expressions.toQueryGranularity(
Iterables.getOnlyElement(granularTimeDimensions).getDruidExpression(),
plannerContext.getExprMacroTable()
);
}
}

final OrderByColumnSpec limitColumn;
if (sorting.getOrderBys().isEmpty()) {
limitColumn = new OrderByColumnSpec(
Expand Down Expand Up @@ -804,7 +839,7 @@ public TopNQuery toTopNQuery()
Ints.checkedCast(sorting.getLimit()),
filtration.getQuerySegmentSpec(),
filtration.getDimFilter(),
Granularities.ALL,
granularity,
grouping.getAggregatorFactories(),
postAggregators,
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
@@ -50,6 +50,7 @@
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
@@ -59,8 +60,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

@@ -237,11 +240,26 @@ public Sequence<Object[]> apply(final Result<TopNResultValue> result)
final List<DimensionAndMetricValueExtractor> rows = result.getValue().getValue();
final List<Object[]> retVals = new ArrayList<>(rows.size());

// Extract possible time output names when this TOP N query was converted from a SQL query with two GROUP BY
// dimensions, one of which is a granular time dimension
Set<String> timeOutputNames = (druidQuery.getGrouping().getDimensions().size() == 2) ?
druidQuery.getGrouping()
.getDimensions()
.stream()
.filter(d -> Expressions.toQueryGranularity(
d.getDruidExpression(),
plannerContext.getExprMacroTable()
) != null)
.map(d -> d.getOutputName())
.collect(Collectors.toSet()) : new HashSet<>();

for (DimensionAndMetricValueExtractor row : rows) {
final Object[] retVal = new Object[fieldList.size()];
for (final RelDataTypeField field : fieldList) {
final String outputName = druidQuery.getOutputRowSignature().getRowOrder().get(field.getIndex());
retVal[field.getIndex()] = coerce(row.getMetric(outputName), field.getType().getSqlTypeName());
Object value = (timeOutputNames.size() == 1 && timeOutputNames.contains(outputName)) ?
result.getTimestamp() : row.getMetric(outputName);
retVal[field.getIndex()] = coerce(value, field.getType().getSqlTypeName());
}

retVals.add(retVal);
@@ -212,6 +212,15 @@ public int getMaxSemiJoinRowsInMemory()
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
);

public static final Map<String, Object> QUERY_CONTEXT_ATTEMPT_CONVERTING_TO_TOP_N_WITH_TWO_GROUP_BY_DIMENSIONS =
ImmutableMap.of(
PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID,
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
PlannerConfig.CTX_KEY_ATTEMPT_CONVERTING_TO_TOP_N_WITH_TWO_GROUP_BY_DIMENSIONS, "true",
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
);

// Matches QUERY_CONTEXT_LOS_ANGELES
public static final Map<String, Object> TIMESERIES_CONTEXT_LOS_ANGELES = new HashMap<>();
public static final PagingSpec FIRST_PAGING_SPEC = new PagingSpec(null, 1000, true);
122 changes: 122 additions & 0 deletions sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -1600,6 +1600,128 @@ public void testTopNWithSelectProjections() throws Exception
);
}

private void testConvertToTopNWithTwoGroupByDimensionsHelper(int limit, List<Object[]> groupByExpectedResults, List<Object[]> topNExpectedResults) throws Exception
{
String sql = "SELECT\n"
+ " FLOOR(__time TO month),\n"
+ " dim1,\n"
+ " sum(m2)\n"
+ "FROM druid.foo\n"
+ "GROUP BY dim1, FLOOR(__time TO month)\n"
+ "ORDER BY dim1\n"
+ "LIMIT " + limit;

testQuery(
sql,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setVirtualColumns(
expressionVirtualColumn(
"v0",
"timestamp_floor(\"__time\",'P1M',null,'UTC')",
ValueType.LONG
)
)
.setDimensions(dimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("v0", "v0", ValueType.LONG)
))
.setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("a0", "m2")))
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.LEXICOGRAPHIC
)
),
limit
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
groupByExpectedResults
);

testQuery(
PLANNER_CONFIG_DEFAULT,
QUERY_CONTEXT_ATTEMPT_CONVERTING_TO_TOP_N_WITH_TWO_GROUP_BY_DIMENSIONS,
sql,
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(new TopNQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.MONTH)
.dimension(new DefaultDimensionSpec("dim1", "d0"))
.aggregators(aggregators(
new DoubleSumAggregatorFactory("a0", "m2")
))
.metric(new DimensionTopNMetricSpec(null, StringComparators.LEXICOGRAPHIC))
.virtualColumns(
expressionVirtualColumn(
"v0",
"timestamp_floor(\"__time\",'P1M',null,'UTC')",
ValueType.LONG
)
)
.threshold(limit)
.context(QUERY_CONTEXT_ATTEMPT_CONVERTING_TO_TOP_N_WITH_TWO_GROUP_BY_DIMENSIONS)
.build()),
topNExpectedResults
);
}

@Test
public void testConvertToTopNWithTwoGroupByDimensions() throws Exception
{
skipVectorize();

// When executed as a GROUP BY query, the limit is applied globally, so at most `limit` rows are returned
// When executed as a TOP N query, the limit is applied within each granular time bucket, so at most (`limit` * number of granular time buckets) rows are returned
// When the limit is large enough, the results are the same except for a potential ordering difference

// Pick a large enough limit
testConvertToTopNWithTwoGroupByDimensionsHelper(
10,
ImmutableList.of(
new Object[]{timestamp("2000-01-01"), "", 1.0},
new Object[]{timestamp("2001-01-01"), "1", 4.0},
new Object[]{timestamp("2000-01-01"), "10.1", 2.0},
new Object[]{timestamp("2000-01-01"), "2", 3.0},
new Object[]{timestamp("2001-01-01"), "abc", 6.0},
new Object[]{timestamp("2001-01-01"), "def", 5.0}
),
ImmutableList.of(
new Object[]{timestamp("2000-01-01"), "", 1.0},
new Object[]{timestamp("2000-01-01"), "10.1", 2.0},
new Object[]{timestamp("2000-01-01"), "2", 3.0},
new Object[]{timestamp("2001-01-01"), "1", 4.0},
new Object[]{timestamp("2001-01-01"), "abc", 6.0},
new Object[]{timestamp("2001-01-01"), "def", 5.0}
)
);

// Pick a limit smaller than number of distinct groups within each granular time bucket
testConvertToTopNWithTwoGroupByDimensionsHelper(
2,
ImmutableList.of(
new Object[]{timestamp("2000-01-01"), "", 1.0},
new Object[]{timestamp("2001-01-01"), "1", 4.0}
),
ImmutableList.of(
new Object[]{timestamp("2000-01-01"), "", 1.0},
new Object[]{timestamp("2000-01-01"), "10.1", 2.0},
new Object[]{timestamp("2001-01-01"), "1", 4.0},
new Object[]{timestamp("2001-01-01"), "abc", 6.0}
)
);
}

@Test
public void testTopNWithSelectAndOrderByProjections() throws Exception
{
