Skip to content

Support aggregation/window commands with dynamic fields#4743

Merged
ykmr1224 merged 9 commits into
opensearch-project:feature/permissivefrom
ykmr1224:dynamic-aggregation
Nov 19, 2025
Merged

Support aggregation/window commands with dynamic fields#4743
ykmr1224 merged 9 commits into
opensearch-project:feature/permissivefrom
ykmr1224:dynamic-aggregation

Conversation

@ykmr1224

@ykmr1224 ykmr1224 commented Nov 5, 2025

Copy link
Copy Markdown
Collaborator

This PR is for feature branch feature/permissive

Description

  • Support aggregation/window commands with dynamic fields
    • stats, eventstats, timechart, trendline
  • DebugUtils/JsonUtils are just utility class mainly for tests and debugging.

Related Issues

Permissive mode RFC: #4349
Dynamic fields RFC: #4433

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • New PPL command checklist all confirmed.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff or -s.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@ykmr1224 ykmr1224 added PPL Piped processing language calcite calcite migration releated labels Nov 5, 2025
@ykmr1224 ykmr1224 marked this pull request as ready for review November 5, 2025 16:52
@ykmr1224 ykmr1224 self-assigned this Nov 5, 2025
@ykmr1224 ykmr1224 added the enhancement New feature or request label Nov 5, 2025
Comment thread common/src/main/java/org/opensearch/sql/common/utils/DebugUtils.java Outdated
Comment thread core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java Outdated
@ykmr1224 ykmr1224 force-pushed the dynamic-aggregation branch 2 times, most recently from 33bab3f to 6b2e491 Compare November 7, 2025 00:27
@ykmr1224

ykmr1224 commented Nov 7, 2025

Copy link
Copy Markdown
Collaborator Author

Updated to utilize type coercion.

Comment on lines +2099 to +2103
if (!context.fieldBuilder.isFieldSpecificType(byFieldName)) {
throw new IllegalArgumentException(
String.format(
"By field `%s` needs to be specific type. Please cast explicitly.", byFieldName));
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we cast to string for groupBy field?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized timechart requires bigger change due to type assigned to span function, which prevents automatic type coercion work properly.
Let me address this in a separate PR.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found simpler way to solve the problem, and included the change in this PR.

@ykmr1224 ykmr1224 force-pushed the dynamic-aggregation branch from 44e2d10 to 0e2d036 Compare November 12, 2025 00:10
Comment thread core/src/main/java/org/opensearch/sql/expression/function/udf/SpanFunction.java Outdated
Comment on lines +2052 to +2054
projectDynamicFieldAsString(node.getBinExpression(), context);
projectDynamicFieldAsString(node.getByField(), context);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it required for all visitor?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could u add a test in CalciteDynamicFieldsTimechartIT to help understand what is correspond logical plan / sql

@ykmr1224 ykmr1224 Nov 14, 2025

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added CalcitePPLDynamicFieldsTest.java‎ for spark SQL. Added explains in IT.

Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
@ykmr1224 ykmr1224 force-pushed the dynamic-aggregation branch from d6acee2 to d552010 Compare November 15, 2025 00:00
verifyLogical(root, expectedLogical);

String expectedSparkSql =
"SELECT `id`, `name`, `_MAP`\n"

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The output always include _MAP columns?
@dai-chen does it works with unified ppl in spark?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, let me check.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It contains _MAP when the query does not explicitly select fields, since it should output all the dynamic fields along with static fields. (You can refer test case: testProjectStaticFields)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand if we submit such SQL query on S3 table to Spark directly, the changes include at least:

  1. Add _MAP to Spark table schema
  2. Add result expanding logic similarly as DynamicFieldsResultProcessor.expandDynamicFields()

Do you have example for writing _MAP? I want to check if more changes required.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dai-chen
_MAP should be automatically added to the table schema when permissive mode is enabled, or a command generate dynamic fields (like spath command without output param)

_MAP is collected here
Refer this PR for further context.

@dai-chen dai-chen Nov 18, 2025

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ykmr1224 I just want to make sure I’m understanding this correctly.

  • Case 1: For _MAP generated from a table, do we need to update the Spark catalog to add it when permissive mode is enabled? When you say "automatically added to the table", it means current OpenSearch schema right?
  • Case 2: For _MAP generated dynamically by a command like spath, could you share a concrete example, including:
    • the PPL query, and
    • the Spark SQL query generated?

Since our approach is to transpile PPL into Spark SQL, I’d like to ensure that all required semantics are encoded in the SQL we generate. Otherwise, we’ll need to estimate the effort for any changes required in the Spark SQL engine.

@ykmr1224 ykmr1224 Nov 18, 2025

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dai-chen
Case 1: Yes, it is added to OpenSearch schema (specifically to metadata fields). I am not sure how Spark catalog works, but I suppose we need to add _MAP to the catalog schema.

Case 2: Here is the sample SQL for ppl source=EMP | fields ENAME | spath input=ENAME

SELECT `mvappend`(`ENAME`, `JSON_EXTRACT_ALL`(`ENAME`)['ENAME']) `ENAME`, `MAP_REMOVE`(`JSON_EXTRACT_ALL`(`ENAME`), ARRAY ('ENAME')) `_MAP`
FROM `scott`.`EMP`

`MAP_REMOVE`(`JSON_EXTRACT_ALL`(`ENAME`), ARRAY ('ENAME')) `_MAP` is where _MAP is assigned. (MAP_REMOVE is to dedupe the fields in static field)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, case 1 may need some changes and we can focus on case 2. Posted the Spark SQL query generated in my understanding.

# Test data
search source=test_events;
25/11/19 11:10:06 WARN UnifiedQueryParser: PPL translated to Spark SQL:
 SELECT *
FROM `spark_catalog`.`default`.`test_events`

@timestamp	host	packets	message
2025-09-08 10:00:00	server1	60	{"category":1, "resource":"A"}
2025-09-08 10:01:00	server1	120	{"category":2, "resource":"B"}
2025-09-08 10:02:00	server1	60	{"category":3, "resource":"C"}
2025-09-08 10:02:30	server2	180	{"category":4, "resource":"D"}

# PPL query
# source=test_events | spath input=message | eval cat = abs(category) * 10

# Spark SQL query expected
spark-sql (default)>
                   > SELECT
                   >   ABS(TRY_CAST(`_MAP`['category'] AS INT) * 10) AS `cat`
                   > FROM (
                   >   SELECT `JSON_EXTRACT_ALL`(`message`) AS `_MAP`
                   >   FROM `test_events`
                   > );
line 2:14 missing ')' at '('
cat
10
20
30
40

If this is correct, the only question is expand logic in DynamicFieldsResultProcessor.expandDynamicFields()


JSONObject result = executeQuery(query);

assertExplainYaml(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we only assert the part we're interested in?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added this per request from @penghuo to add explain verification, and I think it is better keeping whole part to detect when plan is changed.
I would migrate it to separate file once I merge the change and enabled permissive mode in main branch. (it is currently enabled only in integration test and cannot use same test base class)

verifyLogical(root, expectedLogical);

String expectedSparkSql =
"SELECT `id`, `name`, `_MAP`\n"

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand if we submit such SQL query on S3 table to Spark directly, the changes include at least:

  1. Add _MAP to Spark table schema
  2. Add result expanding logic similarly as DynamicFieldsResultProcessor.expandDynamicFields()

Do you have example for writing _MAP? I want to check if more changes required.

@ykmr1224 ykmr1224 merged commit 990346a into opensearch-project:feature/permissive Nov 19, 2025
33 of 34 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

calcite calcite migration releated enhancement New feature or request PPL Piped processing language

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants