-
Notifications
You must be signed in to change notification settings - Fork 180
Convert dedup pushdown to composite + top_hits
#4844
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Lantao Jin <ltjin@amazon.com>
Signed-off-by: Lantao Jin <ltjin@amazon.com>
Signed-off-by: Lantao Jin <ltjin@amazon.com>
Signed-off-by: Lantao Jin <ltjin@amazon.com>
Signed-off-by: Lantao Jin <ltjin@amazon.com>
Signed-off-by: Lantao Jin <ltjin@amazon.com>
| return project == null | ||
| ? List.of() | ||
| : aggCall.getArgList().stream().map(project.getProjects()::get).toList(); | ||
| : PlanUtils.getObjectFromLiteralAgg(aggCall) != null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate more on why calling this method here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it for identifying LITERAL_AGG?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. LITERAL_AGG passes the numberOfDedup here
opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java
Outdated
Show resolved
Hide resolved
| LogicalFilter(condition=[IS NOT NULL($4)]) | ||
| CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) | ||
| physical: | | ||
| CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname], FILTER->IS NOT NULL($4), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=LogicalProject#,group={0},agg#0=LITERAL_AGG(1)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"gender","boost":1.0}},"_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"$f1":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"_source":false,"fields":[{"field":"gender"},{"field":"account_number"},{"field":"firstname"},{"field":"address"},{"field":"balance"},{"field":"city"},{"field":"employer"},{"field":"state"},{"field":"age"},{"field":"email"},{"field":"lastname"}]}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) No newline at end of file |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We won't have FILTER->IS NOT NULL($4) push down for common aggregate push down, after this PR: #4843
Could you check if we can remove them as well for dedup push down?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
|
||
| // 3. Push an Aggregate | ||
| final List<RexNode> newDedupColumns = RexUtil.apply(mappingForDedupColumns, dedupColumns); | ||
| relBuilder.aggregate(relBuilder.groupKey(newDedupColumns), relBuilder.literalAgg(dedupNumer)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LITERAL_AGG in Calcite has totally different function as we use here. It seems to be tricky to implement this in this way. Do we have alternative approach? Or at least add some comments to notice that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have other alternative approach. I think it's safe since LITERAL_AGG usually used in Project removing for Aggregate. And PPL doesn't have explicit syntax to call LITERAL_AGG. I will add comment to elaborate.
...earch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java
Outdated
Show resolved
Hide resolved
| } | ||
| Integer dedupNumber = literal.getValueAs(Integer.class); | ||
| TopHitsAggregationBuilder topHitsAggregationBuilder = | ||
| AggregationBuilders.topHits(aggFieldName).from(0).fetchSource(false).size(dedupNumber); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate more on why we set fetchSource(false) here? The default value is true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: change to use fetchSource instead of fetchField due to fetchField cannot work on OS object type.
| // LinkedHashMap["name" -> "A", "category" -> "X"], | ||
| // LinkedHashMap["name" -> "A", "category" -> "X"] | ||
| // ] | ||
| List<Map<String, Object>> res = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[question] Is this the only case that agg metric parser should return a List of value for each bucket? I'm thinking Is it still appropriate to translate dedup x to aggregate? Normally speaking, aggregate should return only 1 row value for the metric in each bucket. While dedup x can return more than 1 rows and it now affects the API of agg metric parser.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[question] Is this the only case that agg metric parser should return a List of value for each bucket?
Yeah, just top_hits metric agg does. top_hits metric agg in OpenSearch can be used in any bucket agg in DSL as a sub-aggregation, it's quite different with SQL' aggregate.
Signed-off-by: Lantao Jin <ltjin@amazon.com>
Signed-off-by: Lantao Jin <ltjin@amazon.com>
Signed-off-by: Lantao Jin <ltjin@amazon.com>
| .aggregateCall(SqlStdOperatorTable.ROW_NUMBER) | ||
| .over() | ||
| .partitionBy(dedupeFields) | ||
| .orderBy(dedupeFields) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will dedupe work without ordering by deduped fields?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
em, it should work in non-pushdown case. maybe we could remove this orderBy in window.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks a little strange to me because the sort keys in a window should be the same (the partition key)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. it was not introduced by this pr. let's fix them in followup PR since it will change the plan a lot.
|
|
||
| @ToString.Exclude private final Settings settings; | ||
|
|
||
| @ToString.Exclude private boolean topHitsAgg = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this field used for? It seems it's not referred anywhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. can be deleted in current impl
| @Ignore("https://github.com/opensearch-project/sql/issues/4789") | ||
| public void testDedupExpr() throws IOException { | ||
| enabledOnlyWhenPushdownIsEnabled(); | ||
| String expected = loadExpectedPlan("explain_dedup_expr1.yaml"); | ||
| assertYamlEqualsIgnoreId( | ||
| expected, | ||
| explainQueryYaml( | ||
| "source=opensearch-sql_test_index_account | eval new_gender = lower(gender) | dedup 1" | ||
| + " new_gender")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't such case covered by CalcitePPLDedupIT.testDedupExpr
| String.format("Unsupported push-down aggregator %s", aggCall.getAggregation())); | ||
| }; | ||
| } | ||
| case LITERAL_AGG -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you check if we will never produce a LITERAL_AGG in RelBuilder or by some rules in planner?
Otherwise, we may push down a real LITERAL_AGG to be tophits while it shouldn't be.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One alternative in my mind is using a self-defined class extends calcite.Aggregate. It should also be able to leverage our AggregateAnalyzer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you check if we will never produce a LITERAL_AGG in RelBuilder or by some rules in planner?
Otherwise, we may push down a real LITERAL_AGG to be tophits while it shouldn't be.
Checked, so far no LITERAL_AGG can be produced because PPL doesn't support literal in aggregators.
In SQL
SELECT
dept_id,
COUNT(*) as emp_count,
1 as constant_val
FROM employees
GROUP BY dept_id;
Could be rewritten to
SELECT
dept_id,
COUNT(*) as emp_count,
LITERAL_AGG(1) as constant_val
FROM employees
GROUP BY dept_id;
to reduce a Project upon Aggregate.
From
Project(dept_id, emp_count, constant_val=1)
Aggregate(GROUP BY dept_id, compute COUNT(*) as emp_count)
Scan(employees)
To
Aggregate(GROUP BY dept_id, compute COUNT(*) as emp_count, compute LITERAL_AGG(1) as constant_val)
Scan(employees)
But stats only support pre-defined aggregators. cc @qianheng-aws
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see SubQueryRemoveRule will produce LITERAL_AGG for some(we don't have) and in subquery. But I'm not sure whether it will be triggered.
And RelBuilder's literalAgg method is public, we should avoid call that method by developers then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
The backport to To backport manually, run these commands in your terminal: # Navigate to the root of your repository
cd $(git rev-parse --show-toplevel)
# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add ../.worktrees/sql/backport-2.19-dev 2.19-dev
# Navigate to the new working tree
pushd ../.worktrees/sql/backport-2.19-dev
# Create a new branch
git switch --create backport/backport-4844-to-2.19-dev
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 5ceacb6945d6bacc36e037ce4da215e1cf031b56
# Push it to GitHub
git push --set-upstream origin backport/backport-4844-to-2.19-dev
# Go back to the original working tree
popd
# Delete the working tree
git worktree remove ../.worktrees/sql/backport-2.19-devThen, create a pull request where the |
…4844) * Enable dedup pushdown Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix doctest Signed-off-by: Lantao Jin <ltjin@amazon.com> * refactor Signed-off-by: Lantao Jin <ltjin@amazon.com> * Disable dedup expr Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix IT Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix yaml test Signed-off-by: Lantao Jin <ltjin@amazon.com> * add more comments in code Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix conflicts Signed-off-by: Lantao Jin <ltjin@amazon.com> * Address comments Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com> (cherry picked from commit 5ceacb6)
…4844) * Enable dedup pushdown Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix doctest Signed-off-by: Lantao Jin <ltjin@amazon.com> * refactor Signed-off-by: Lantao Jin <ltjin@amazon.com> * Disable dedup expr Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix IT Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix yaml test Signed-off-by: Lantao Jin <ltjin@amazon.com> * add more comments in code Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix conflicts Signed-off-by: Lantao Jin <ltjin@amazon.com> * Address comments Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com>
Description
Convert
deduppushdown to composite + top_hitsMain changes:
DedupPushdownRuleto convert the dedup plan pattern to Aggregate with TopHits metrics_source(fetchField cannot work for OS Object type)MetricParser:Map<String, Object> parse(Aggregation aggregation);->List<Map<String, Object>> parse(Aggregation aggregation);Follow-ups: Support dedup on script (expression)
Related Issues
Resolves #4797
Check List
--signoffor-s.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.