diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
index 61ad6b29d5cb..518b35d9e51a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
@@ -31,6 +31,8 @@ public class PlannerConfig
 {
   public static final String CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT = "useApproximateCountDistinct";
   public static final String CTX_KEY_USE_APPROXIMATE_TOPN = "useApproximateTopN";
+  public static final String CTX_KEY_ATTEMPT_CONVERTING_TO_TOP_N_WITH_TWO_GROUP_BY_DIMENSIONS =
+      "attemptConvertingToTopNWithTwoGroupByDimensions";
 
   @JsonProperty
   private Period metadataRefreshPeriod = new Period("PT1M");
@@ -65,6 +67,14 @@ public class PlannerConfig
   @JsonProperty
   private long metadataSegmentPollPeriod = 60000;
 
+  @JsonProperty
+  private boolean attemptConvertingToTopNWithTwoGroupByDimensions = false;
+
+  public boolean isAttemptConvertingToTopNWithTwoGroupByDimensions()
+  {
+    return attemptConvertingToTopNWithTwoGroupByDimensions;
+  }
+
   public long getMetadataSegmentPollPeriod()
   {
     return metadataSegmentPollPeriod;
@@ -154,6 +164,11 @@ public PlannerConfig withOverrides(final Map<String, Object> context)
     newConfig.metadataSegmentCacheEnable = isMetadataSegmentCacheEnable();
     newConfig.metadataSegmentPollPeriod = getMetadataSegmentPollPeriod();
     newConfig.serializeComplexValues = shouldSerializeComplexValues();
+    newConfig.attemptConvertingToTopNWithTwoGroupByDimensions = getContextBoolean(
+        context,
+        CTX_KEY_ATTEMPT_CONVERTING_TO_TOP_N_WITH_TWO_GROUP_BY_DIMENSIONS,
+        isAttemptConvertingToTopNWithTwoGroupByDimensions()
+    );
     return newConfig;
   }
 
@@ -195,6 +210,7 @@ public boolean equals(final Object o)
            metadataSegmentCacheEnable == that.metadataSegmentCacheEnable &&
            metadataSegmentPollPeriod == that.metadataSegmentPollPeriod &&
            serializeComplexValues == that.serializeComplexValues &&
+           attemptConvertingToTopNWithTwoGroupByDimensions == that.attemptConvertingToTopNWithTwoGroupByDimensions &&
            Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod) &&
            Objects.equals(sqlTimeZone, that.sqlTimeZone);
   }
@@ -215,7 +231,8 @@ public int hashCode()
         sqlTimeZone,
         metadataSegmentCacheEnable,
         metadataSegmentPollPeriod,
-        serializeComplexValues
+        serializeComplexValues,
+        attemptConvertingToTopNWithTwoGroupByDimensions
     );
   }
 
@@ -235,6 +252,7 @@ public String toString()
            ", metadataSegmentPollPeriod=" + metadataSegmentPollPeriod +
            ", sqlTimeZone=" + sqlTimeZone +
            ", serializeComplexValues=" + serializeComplexValues +
+           ", attemptConvertingToTopNWithTwoGroupByDimensions=" + attemptConvertingToTopNWithTwoGroupByDimensions +
            '}';
   }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 1c72b4217095..e91fa56dc58a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -744,9 +744,9 @@ public TimeseriesQuery toTimeseriesQuery()
   @Nullable
   public TopNQuery toTopNQuery()
   {
-    // Must have GROUP BY one column, ORDER BY zero or one column, limit less than maxTopNLimit, and no HAVING.
-    final boolean topNOk = grouping != null
-                           && grouping.getDimensions().size() == 1
+    // Must have GROUP BY one column (or, as an experimental feature, two columns when attemptConvertingToTopNWithTwoGroupByDimensions is set to true in the query context), ORDER BY zero or one column, limit less than maxTopNLimit, and no HAVING.
+    final boolean topNOk = grouping != null
+                           && (grouping.getDimensions().size() == 1 || (plannerContext.getPlannerConfig().isAttemptConvertingToTopNWithTwoGroupByDimensions() && grouping.getDimensions().size() == 2))
                            && sorting != null
                            && (sorting.getOrderBys().size() <= 1
                                && sorting.isLimited()
                                && sorting.getLimit() <= plannerContext.getPlannerConfig()
@@ -757,7 +757,42 @@ public TopNQuery toTopNQuery()
       return null;
     }
 
-    final DimensionSpec dimensionSpec = Iterables.getOnlyElement(grouping.getDimensions()).toDimensionSpec();
+    // When attemptConvertingToTopNWithTwoGroupByDimensions is set to true in the query context, a query that
+    // GROUP BYs two columns is also convertible to a topN query, provided exactly one of those columns is a
+    // granular time expression; the topN granularity is then inferred from that column.
+    //
+    // Caveats:
+    // When executed as a groupBy query, the limit is applied globally, so at most `limit` rows are returned.
+    // When executed as a topN query, the limit is applied within each granular time bucket, so at most
+    // (`limit` * number of granular time buckets) rows are returned.
+    // When the limit is large enough, the results are the same apart from potential ordering differences.
+    final DimensionSpec dimensionSpec;
+    final Granularity granularity;
+    if (grouping.getDimensions().size() == 1) {
+      dimensionSpec = Iterables.getOnlyElement(grouping.getDimensions()).toDimensionSpec();
+      granularity = Granularities.ALL;
+    } else {
+      final List<DimensionExpression> nonGranularTimeDimensions = new ArrayList<>();
+      final List<DimensionExpression> granularTimeDimensions = new ArrayList<>();
+      grouping.getDimensions()
+              .forEach(d -> (
+                  Expressions.toQueryGranularity(
+                      d.getDruidExpression(),
+                      plannerContext.getExprMacroTable()
+                  ) == null ? nonGranularTimeDimensions : granularTimeDimensions)
+                  .add(d));
+
+      if (!(nonGranularTimeDimensions.size() == 1 && granularTimeDimensions.size() == 1)) {
+        // Not convertible to a topN query
+        return null;
+      }
+      dimensionSpec = Iterables.getOnlyElement(nonGranularTimeDimensions).toDimensionSpec();
+      granularity = Expressions.toQueryGranularity(
+          Iterables.getOnlyElement(granularTimeDimensions).getDruidExpression(),
+          plannerContext.getExprMacroTable()
+      );
+    }
+
     final OrderByColumnSpec limitColumn;
     if (sorting.getOrderBys().isEmpty()) {
       limitColumn = new OrderByColumnSpec(
@@ -804,7 +839,7 @@ public TopNQuery toTopNQuery()
         Ints.checkedCast(sorting.getLimit()),
         filtration.getQuerySegmentSpec(),
         filtration.getDimFilter(),
-        Granularities.ALL,
+        granularity,
         grouping.getAggregatorFactories(),
         postAggregators,
         ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java
index 6ba8dfa14a34..2a211c8edfac 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java
@@ -50,6 +50,7 @@
 import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.table.RowSignature;
@@ -59,8 +60,10 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -237,11 +240,26 @@ public Sequence<Object[]> apply(final Result<TopNResultValue> result)
             final List<DimensionAndMetricValueExtractor> rows = result.getValue().getValue();
             final List<Object[]> retVals = new ArrayList<>(rows.size());
 
+            // Extract the time output name when this topN query was converted from SQL with two GROUP BY
+            // dimensions, one of them being a granular time dimension.
+            final Set<String> timeOutputNames = (druidQuery.getGrouping().getDimensions().size() == 2) ?
+                druidQuery.getGrouping()
+                          .getDimensions()
+                          .stream()
+                          .filter(d -> Expressions.toQueryGranularity(
+                              d.getDruidExpression(),
+                              plannerContext.getExprMacroTable()
+                          ) != null)
+                          .map(d -> d.getOutputName())
+                          .collect(Collectors.toSet()) : new HashSet<>();
+
             for (DimensionAndMetricValueExtractor row : rows) {
               final Object[] retVal = new Object[fieldList.size()];
               for (final RelDataTypeField field : fieldList) {
                 final String outputName = druidQuery.getOutputRowSignature().getRowOrder().get(field.getIndex());
-                retVal[field.getIndex()] = coerce(row.getMetric(outputName), field.getType().getSqlTypeName());
+                final Object value = (timeOutputNames.size() == 1 && timeOutputNames.contains(outputName)) ?
+                    result.getTimestamp() : row.getMetric(outputName);
+                retVal[field.getIndex()] = coerce(value, field.getType().getSqlTypeName());
               }
 
               retVals.add(retVal);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index 8bffa486210e..44ea43edc001 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -212,6 +212,15 @@ public int getMaxSemiJoinRowsInMemory()
       QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
   );
 
+  public static final Map<String, Object> QUERY_CONTEXT_ATTEMPT_CONVERTING_TO_TOP_N_WITH_TWO_GROUP_BY_DIMENSIONS =
+      ImmutableMap.of(
+          PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID,
+          PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
+          PlannerConfig.CTX_KEY_ATTEMPT_CONVERTING_TO_TOP_N_WITH_TWO_GROUP_BY_DIMENSIONS, "true",
+          QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
+          QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
+      );
+
   // Matches QUERY_CONTEXT_LOS_ANGELES
   public static final Map<String, Object> TIMESERIES_CONTEXT_LOS_ANGELES = new HashMap<>();
   public static final PagingSpec FIRST_PAGING_SPEC = new PagingSpec(null, 1000, true);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 1b30f7a08ea8..497ae00eeaa6 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -1600,6 +1600,128 @@ public void testTopNWithSelectProjections() throws Exception
     );
   }
 
+  private void testConvertToTopNWithTwoGroupByDimensionsHelper(int limit, List<Object[]> groupByExpectedResults, List<Object[]> topNExpectedResults) throws Exception
+  {
+    String sql = "SELECT\n"
+                 + " FLOOR(__time TO month),\n"
+                 + " dim1,\n"
+                 + " sum(m2)\n"
+                 + "FROM druid.foo\n"
+                 + "GROUP BY dim1, FLOOR(__time TO month)\n"
+                 + "ORDER BY dim1\n"
+                 + "LIMIT " + limit;
+
+    testQuery(
+        sql,
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(CalciteTests.DATASOURCE1)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            expressionVirtualColumn(
+                                "v0",
+                                "timestamp_floor(\"__time\",'P1M',null,'UTC')",
+                                ValueType.LONG
+                            )
+                        )
+                        .setDimensions(dimensions(
+                            new DefaultDimensionSpec("dim1", "d0"),
+                            new DefaultDimensionSpec("v0", "v0", ValueType.LONG)
+                        ))
+                        .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("a0", "m2")))
+                        .setLimitSpec(
+                            new DefaultLimitSpec(
+                                ImmutableList.of(
+                                    new OrderByColumnSpec(
+                                        "d0",
+                                        OrderByColumnSpec.Direction.ASCENDING,
+                                        StringComparators.LEXICOGRAPHIC
+                                    )
+                                ),
+                                limit
+                            )
+                        )
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        groupByExpectedResults
+    );
+
+    testQuery(
+        PLANNER_CONFIG_DEFAULT,
+        QUERY_CONTEXT_ATTEMPT_CONVERTING_TO_TOP_N_WITH_TWO_GROUP_BY_DIMENSIONS,
+        sql,
+        CalciteTests.REGULAR_USER_AUTH_RESULT,
+        ImmutableList.of(new TopNQueryBuilder()
+                             .dataSource(CalciteTests.DATASOURCE1)
+                             .intervals(querySegmentSpec(Filtration.eternity()))
+                             .granularity(Granularities.MONTH)
+                             .dimension(new DefaultDimensionSpec("dim1", "d0"))
+                             .aggregators(aggregators(
+                                 new DoubleSumAggregatorFactory("a0", "m2")
+                             ))
+                             .metric(new DimensionTopNMetricSpec(null, StringComparators.LEXICOGRAPHIC))
+                             .virtualColumns(
+                                 expressionVirtualColumn(
+                                     "v0",
"timestamp_floor(\"__time\",'P1M',null,'UTC')", + ValueType.LONG + ) + ) + .threshold(limit) + .context(QUERY_CONTEXT_ATTEMPT_CONVERTING_TO_TOP_N_WITH_TWO_GROUP_BY_DIMENSIONS) + .build()), + topNExpectedResults + ); + } + + @Test + public void testConvertToTopNWithTwoGroupByDimensions() throws Exception + { + skipVectorize(); + + // When execute as a GROUP BY query, the limit is appied globally, so there will be at most `limit` number of rows returned + // When execute as a TOP N query, the limit is applied per group within each granular time bucket, so there will be at most (`limit` * number of distinct groups within each granular time bucket) number of rows returned + // When limit is large enough, the result is the same except for potential ordering difference + + // Pick a large enough limit + testConvertToTopNWithTwoGroupByDimensionsHelper( + 10, + ImmutableList.of( + new Object[]{timestamp("2000-01-01"), "", 1.0}, + new Object[]{timestamp("2001-01-01"), "1", 4.0}, + new Object[]{timestamp("2000-01-01"), "10.1", 2.0}, + new Object[]{timestamp("2000-01-01"), "2", 3.0}, + new Object[]{timestamp("2001-01-01"), "abc", 6.0}, + new Object[]{timestamp("2001-01-01"), "def", 5.0} + ), + ImmutableList.of( + new Object[]{timestamp("2000-01-01"), "", 1.0}, + new Object[]{timestamp("2000-01-01"), "10.1", 2.0}, + new Object[]{timestamp("2000-01-01"), "2", 3.0}, + new Object[]{timestamp("2001-01-01"), "1", 4.0}, + new Object[]{timestamp("2001-01-01"), "abc", 6.0}, + new Object[]{timestamp("2001-01-01"), "def", 5.0} + ) + ); + + // Pick a limit smaller than number of distinct groups within each granular time bucket + testConvertToTopNWithTwoGroupByDimensionsHelper( + 2, + ImmutableList.of( + new Object[]{timestamp("2000-01-01"), "", 1.0}, + new Object[]{timestamp("2001-01-01"), "1", 4.0} + ), + ImmutableList.of( + new Object[]{timestamp("2000-01-01"), "", 1.0}, + new Object[]{timestamp("2000-01-01"), "10.1", 2.0}, + new Object[]{timestamp("2001-01-01"), "1", 4.0}, + new Object[]{timestamp("2001-01-01"), "abc", 6.0} + ) + ); + } + @Test public void testTopNWithSelectAndOrderByProjections() throws Exception {