Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3038,14 +3038,22 @@ public RelNode visitRareTopN(RareTopN node, CalcitePlanContext context) {
countField = context.relBuilder.field(countFieldName);
}

// Append the rare/top field columns as secondary order keys so ties in the count column
// resolve deterministically. Without this, ROW_NUMBER's tie-break is insertion-order
// dependent and varies between backends (e.g. analytics-engine vs in-process Calcite).
List<RexNode> tieBreakKeys = rexVisitor.analyze(fieldList, context);
List<RexNode> orderKeys = new ArrayList<>(tieBreakKeys.size() + 1);
orderKeys.add(countField);
orderKeys.addAll(tieBreakKeys);

RexNode rowNumberWindowOver =
PlanUtils.makeOver(
context,
BuiltinFunctionName.ROW_NUMBER,
null,
List.of(),
partitionKeys,
List.of(countField),
orderKeys,
WindowFrame.toCurrentRow());
context.relBuilder.projectPlus(
context.relBuilder.alias(rowNumberWindowOver, ROW_NUMBER_COLUMN_FOR_RARE_TOP));
Expand Down
2 changes: 1 addition & 1 deletion docs/user/ppl/cmd/rare.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ fetched rows / total rows = 4/4
+-----------------------+-------+
| email | count |
|-----------------------+-------|
| null | 1 |
| amberduke@pyrami.com | 1 |
| daleadams@boink.com | 1 |
| hattiebond@netagy.com | 1 |
| null | 1 |
+-----------------------+-------+
```

Expand Down
2 changes: 1 addition & 1 deletion docs/user/ppl/cmd/top.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,10 @@ fetched rows / total rows = 4/4
+-----------------------+-------+
| email | count |
|-----------------------+-------|
| null | 1 |
| amberduke@pyrami.com | 1 |
| daleadams@boink.com | 1 |
| hattiebond@netagy.com | 1 |
| null | 1 |
+-----------------------+-------+
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ calcite:
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(address.city=[$0], count=[$1])
LogicalFilter(condition=[<=($2, 10)])
LogicalProject(address.city=[$0], count=[$1], _row_number_rare_top_=[ROW_NUMBER() OVER (ORDER BY $1 DESC)])
LogicalProject(address.city=[$0], count=[$1], _row_number_rare_top_=[ROW_NUMBER() OVER (ORDER BY $1 DESC, $0)])
LogicalAggregate(group=[{0}], count=[COUNT()])
LogicalProject(address.city=[$3])
LogicalFilter(condition=[IS NOT NULL($3)])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ calcite:
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(gender=[$0], state=[$1], count=[$2])
LogicalFilter(condition=[<=($3, 2)])
LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2)])
LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2, $1)])
LogicalAggregate(group=[{0, 1}], count=[COUNT()])
LogicalProject(gender=[$4], state=[$7])
LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ calcite:
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(gender=[$0], state=[$1], count=[$2])
LogicalFilter(condition=[<=($3, 2)])
LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2)])
LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2, $1)])
LogicalAggregate(group=[{0, 1}], count=[COUNT()])
LogicalProject(gender=[$4], state=[$7])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[2], expr#5=[<=($t3, $t4)], proj#0..2=[{exprs}], $condition=[$t5])
EnumerableWindow(window#0=[window(partition {0} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
EnumerableWindow(window#0=[window(partition {0} order by [2, 1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count=COUNT())], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"state":{"terms":{"field":"state.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ calcite:
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(gender=[$0], state=[$1], count=[$2])
LogicalFilter(condition=[<=($3, 2)])
LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC)])
LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC, $1)])
LogicalAggregate(group=[{0, 1}], count=[COUNT()])
LogicalProject(gender=[$4], state=[$7])
LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ calcite:
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(gender=[$0], state=[$1], count=[$2])
LogicalFilter(condition=[<=($3, 2)])
LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC)])
LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC, $1)])
LogicalAggregate(group=[{0, 1}], count=[COUNT()])
LogicalProject(gender=[$4], state=[$7])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[2], expr#5=[<=($t3, $t4)], proj#0..2=[{exprs}], $condition=[$t5])
EnumerableWindow(window#0=[window(partition {0} order by [2 DESC] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
EnumerableWindow(window#0=[window(partition {0} order by [2 DESC, 1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count=COUNT())], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"state":{"terms":{"field":"state.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.List;
import java.util.function.Predicate;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.rules.SubstitutionRule;
Expand Down Expand Up @@ -49,22 +50,39 @@ protected void onMatchImpl(RelOptRuleCall call) {
List<Integer> groupIndices = PlanUtils.getSelectColumns(windows.getFirst().partitionKeys);
List<String> byList = groupIndices.stream().map(fieldNameList::get).toList();

if (windows.getFirst().orderKeys.size() != 1) {
// ORDER BY must be either the count column alone, or the count column followed by the
// rare/top target field ASC (the stable tie-break inserted by visitRareTopN). The pushdown
// to the OpenSearch terms aggregation naturally tie-breaks on `_key:asc`, so the latter
// shape lowers to the same wire request as the single-key shape.
List<RexFieldCollation> orderKeys = windows.getFirst().orderKeys;
if (orderKeys.isEmpty() || orderKeys.size() > 2) {
return;
}
RexFieldCollation orderKey = windows.getFirst().orderKeys.getFirst();
List<Integer> orderIndices = PlanUtils.getSelectColumns(List.of(orderKey.getKey()));
List<String> orderList = orderIndices.stream().map(fieldNameList::get).toList();
RexFieldCollation primaryOrderKey = orderKeys.getFirst();
List<Integer> primaryOrderIndices =
PlanUtils.getSelectColumns(List.of(primaryOrderKey.getKey()));
List<String> primaryOrderList = primaryOrderIndices.stream().map(fieldNameList::get).toList();
List<String> targetList =
fieldNameList.stream()
.filter(Predicate.not(byList::contains))
.filter(Predicate.not(orderList::contains))
.filter(Predicate.not(primaryOrderList::contains))
.toList();
if (targetList.size() != 1) {
return;
}
String targetName = targetList.getFirst();
digest = new RareTopDigest(targetName, byList, number, orderKey.getDirection());
if (orderKeys.size() == 2) {
RexFieldCollation tieBreakKey = orderKeys.get(1);
if (tieBreakKey.getDirection() != RelFieldCollation.Direction.ASCENDING) {
return;
}
List<Integer> tieBreakIndices = PlanUtils.getSelectColumns(List.of(tieBreakKey.getKey()));
List<String> tieBreakList = tieBreakIndices.stream().map(fieldNameList::get).toList();
if (tieBreakList.size() != 1 || !tieBreakList.getFirst().equals(targetName)) {
return;
}
}
digest = new RareTopDigest(targetName, byList, number, primaryOrderKey.getDirection());
} catch (Exception e) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,23 +190,25 @@ private UnifiedQueryContext buildContext(QueryType queryType, boolean profiling)
* setting(String, Object)} API, keeping {@link UnifiedQueryContext} decoupled from any specific
* {@link org.opensearch.sql.common.setting.Settings} implementation.
*
* <p>Currently scoped to {@code plugins.ppl.rex.max_match.limit} — required so the unified path
* honors {@code _cluster/settings} updates for {@code rex max_match} (CalciteRexCommandIT's
* testRexMaxMatchConfigurableLimit). Add keys here if a future PR / IT depends on cluster-side
* fidelity for one of the other planning settings.
* <p>Add keys here if a future PR / IT depends on cluster-side fidelity for one of the other
* planning settings.
*/
private UnifiedQueryContext.Builder applyClusterOverrides(UnifiedQueryContext.Builder builder) {
Object rexLimit =
pluginSettings.getSettingValue(
org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT);
if (rexLimit != null) {
builder.setting(
org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT.getKeyValue(),
rexLimit);
}
forwardClusterSetting(
builder, org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT);
forwardClusterSetting(
builder, org.opensearch.sql.common.setting.Settings.Key.PPL_SYNTAX_LEGACY_PREFERRED);
return builder;
}

/**
 * Copies one cluster-level plugin setting into the {@link UnifiedQueryContext.Builder}.
 *
 * <p>Reads the current value from {@code pluginSettings} and forwards it under the setting's
 * string key via {@code builder.setting(...)}. A {@code null} value is skipped entirely, so the
 * builder retains its own default for that key rather than being overwritten with null.
 *
 * @param builder the context builder receiving the setting
 * @param key the plugin setting key to read and forward
 */
private void forwardClusterSetting(
UnifiedQueryContext.Builder builder, org.opensearch.sql.common.setting.Settings.Key key) {
Object value = pluginSettings.getSettingValue(key);
if (value != null) {
builder.setting(key.getKeyValue(), value);
}
}

/**
* Extract the source index name by parsing the query and visiting the AST to find the Relation
* node. Uses the context's parser which supports both PPL and SQL.
Expand Down
Loading
Loading