spath: parquet-backed test indices for analytics-engine route#5441
spath: parquet-backed test indices for analytics-engine route#5441ahkcs wants to merge 1 commit into
Conversation
PR Reviewer Guide 🔍(Review updated until commit 5eb2dd4)Here are some key observations to aid the review process:
|
PR Code Suggestions ✨Latest suggestions up to 5eb2dd4 Explore these optional code suggestions:
Previous suggestionsSuggestions up to commit c4d4a8c
Suggestions up to commit 9f6aef8
Suggestions up to commit f5ea743
Suggestions up to commit 1a07691
|
1a07691 to
f5ea743
Compare
|
Persistent review updated to latest commit f5ea743 |
f5ea743 to
9f6aef8
Compare
| private static final String ARROW_TEXT_CLASS_NAME = "org.apache.arrow.vector.util.Text"; | ||
|
|
||
| /** | ||
| * Whether {@code o} is an Arrow {@code Text} (the UTF-8 byte-buffer wrapper that arrow's Map / | ||
| * Struct / List vectors emit for string values). FQN match keeps {@code core/} free of an Arrow | ||
| * dependency. | ||
| */ | ||
| private static boolean isArrowText(Object o) { | ||
| return o != null && ARROW_TEXT_CLASS_NAME.equals(o.getClass().getName()); | ||
| } |
There was a problem hiding this comment.
ExprValueUtils should do not know ARROW data type. Why ExprValueUtils been used on execution code path?
There was a problem hiding this comment.
Good catch, updated to remove the change
| private static final String AUTO_DOC_MAPPING = | ||
| "{\"mappings\":{\"properties\":{" | ||
| + "\"nested_doc\":{\"type\":\"keyword\"}," | ||
| + "\"array_doc\":{\"type\":\"keyword\"}," | ||
| + "\"merge_doc\":{\"type\":\"keyword\"}," | ||
| + "\"stringify_doc\":{\"type\":\"keyword\"}," | ||
| + "\"empty_doc\":{\"type\":\"keyword\"}," | ||
| + "\"malformed_doc\":{\"type\":\"keyword\"}}}}"; |
There was a problem hiding this comment.
Why add auto_doc_mapping? Becuae Analytics Eengine does not support dynamic mapping?
There was a problem hiding this comment.
Updated to remove explicit mapping, currently our IT creates the index using the lazy way, which makes it a default lucene-backed index, the change is to create the empty index up-front through the helper so the parquet toggle gets a chance to inject its settings
9f6aef8 to
c4d4a8c
Compare
`CalcitePPLSpathCommandIT.init()` was creating its four test indices by raw `PUT /<idx>/_doc/N` requests, which auto-creates the index via the default Lucene path. The analytics-engine compatibility run (`-Dtests.analytics.parquet_indices=true`) injects the parquet/composite settings *inside* `TestUtils.createIndexByRestClient`, so the raw-PUT indices were Lucene-only and DataFusion failed with `UnsupportedOperationException: acquireReader is not supported in EngineBackedIndexer` for every test on the analytics-engine route. Fix: create the empty index up-front via `createIndexByRestClient(..., null)` so the toggle has a chance to inject parquet settings, then let the subsequent doc PUTs populate it via dynamic mapping. No mapping is declared — DataFusion is fine with dynamic mapping on a parquet-backed composite index. Same pattern as `CalciteEvalCommandIT` and `CalciteFieldFormatCommandIT`. No change for the v2 / Calcite path (the helper is a no-op when the parquet toggle isn't set). ## Pass rate Pairs with opensearch-project/OpenSearch#21664. Both PRs are required to move the analytics-engine route off 0 / 16. | IT | Route | Before | After | |---|---|---|---| | `CalcitePPLSpathCommandIT` | analytics-engine (`-Dtests.analytics.force_routing=true -Dtests.analytics.parquet_indices=true`) | 0 / 16 | **16 / 16** | | `CalcitePPLSpathCommandIT` | default v2 / Calcite (no flags) | 16 / 16 | 16 / 16 (no regression) | Signed-off-by: Kai Huang <ahkcs@amazon.com>
c4d4a8c to
5eb2dd4
Compare
|
Persistent review updated to latest commit 5eb2dd4 |
…gine route
Closes the analytics-engine gap for the PPL `spath` command. The path-mode
variant (`spath path=...`) already worked via the existing `json_extract`
wiring; this PR adds the auto-extract mode (`spath input=doc` →
`JSON_EXTRACT_ALL` returning `MAP<VARCHAR, VARCHAR>`) and its downstream
operators (ITEM lookup, WHERE on extracted values).
## Pass rate
| IT | Before | After |
|---|---|---|
| `sql/integ-test/.../CalcitePPLSpathCommandIT` (`-Dtests.analytics.force_routing=true -Dtests.analytics.parquet_indices=true`) | 0 / 16 | 16 / 16 |
| `sql/integ-test/.../CalcitePPLSpathCommandIT` (default v2/Calcite route) | 16 / 16 | 16 / 16 (no regression) |
| `sandbox/qa/analytics-engine-rest/.../SpathCommandIT` (new) | n/a | 16 / 16 |
Baseline failure modes on the analytics-engine route:
- 15 tests: `OpenSearchProjectRule.annotateExpr` → `No backend supports
scalar function [JSON_EXTRACT_ALL] among [datafusion]`.
- 1 test (`testSimpleSpath`): `EngineBackedIndexer.acquireReader` →
`UnsupportedOperationException` (test-infra issue, fixed on the SQL
plugin side in a paired PR).
## What's in this PR
1. **`json_extract_all` Rust UDF** (`sandbox/plugins/analytics-backend-datafusion/rust/src/udf/json_extract_all.rs`).
~550 lines + 16 unit tests. Returns Arrow `Map<Utf8, Utf8>`; mirrors
`JsonExtractAllFunctionImpl`'s legacy contract (dot-path flatten, `{}`
array marker, `[a, b, c]` merge format for duplicate keys / arrays,
`"null"` literal for JSON nulls, malformed → empty map, top-level
scalar → NULL).
2. **SPI enum additions** in `analytics-framework`:
- `ScalarFunction.JSON_EXTRACT_ALL` enum constant.
- `FieldType.MAP` enum constant + `case MAP -> FieldType.MAP` in
`fromSqlTypeName`.
3. **Capability registrations** in `DataFusionAnalyticsBackendPlugin`:
- New `MAP_RETURNING_PROJECT_OPS` set (mirrors `ARRAY_RETURNING_PROJECT_OPS`)
registered with `FieldType.MAP`. Required because
`OpenSearchProjectRule.resolveScalarViableBackends` keys on the call's
return type, and JSON_EXTRACT_ALL's `MAP<VARCHAR, VARCHAR>` return
wouldn't match `SUPPORTED_FIELD_TYPES`.
- `STANDARD_FILTER_OPS` registered against `FieldType.MAP` so
`where doc.user.name = 'John'` (which references the underlying MAP
column through ITEM) survives the filter-rule's field-index-keyed
viability check.
- Adapter binding `ScalarFunction.JSON_EXTRACT_ALL → JsonExtractAllAdapter`.
4. **Substrait wiring**:
- `opensearch_scalar_functions.yaml` — entries for `json_extract_all`
and `map_extract`.
- `DataFusionFragmentConvertor.ADDITIONAL_SCALAR_SIGS` — function
mappings for both names.
- `JsonFunctionAdapters.JsonExtractAllAdapter` — name-mapping adapter.
5. **`ITEM(Map, key)` dispatch** in `ArrayElementAdapter`. PPL's
`result.user.name` lowers to `ITEM(JSON_EXTRACT_ALL(doc), 'user.name')`.
Two transforms for the MAP-input branch:
- Route to `map_extract` (DataFusion's native map accessor) instead of
`array_element`. Since `map_extract` returns `List<value>` (maps
permit duplicate keys), wrap the call in `array_element(..., 1)` to
project the singleton list back to a scalar.
- Coerce the lookup key (CHAR(N) literal) to VARCHAR before emission so
it unifies with the substrait `any1` type-variable binding the YAML
declares.
6. **`ArrowValues.MapVector` flattening** in `analytics-engine`. Arrow
`MapVector` is laid out as `List<Struct{key, value}>`, so
`MapVector.getObject(i)` returns a `JsonStringArrayList` of entry
structs rather than a proper map. Reassemble into a
`LinkedHashMap<String, Object>` (Text→String normalization on keys and
values) so the SQL-plugin response marshaller sees the same shape as a
legacy v2 `Map<String, Object>` column.
7. **`gradle/run.gradle`** — the `arrow-flight-rpc` plugin block now also
sets `opensearch.experimental.feature.transport.stream.enabled=true`,
so the analytics-engine + SQL-plugin co-install boots without the
duplicate-PPL-transport-handler Guice failure.
8. **QA-side `SpathCommandIT`** under
`sandbox/qa/analytics-engine-rest/...`. Mirrors `CalcitePPLSpathCommandIT`
one test method to one, sends queries via `POST /_analytics/ppl`, no
SQL-plugin dependency. Verifies the full spath surface end-to-end
(both modes, ITEM-on-MAP eval / where / stats / sort, edge cases).
Four small datasets under `resources/datasets/spath_{simple,auto,cmd,null}/`.
## Knock-on coverage
Every piece in this PR is reusable beyond `spath`:
- The MAP_RETURNING_PROJECT_OPS pattern + MAP filter capability are
generic for any future PPL function emitting a Calcite MAP RelDataType.
- `ArrayElementAdapter`'s ITEM-on-MAP branch + the `map_extract` YAML
entry handle every `result['key']` / `result.field` access on a map
column, not just spath's.
- `ArrowValues.MapVector` flattening unblocks any UDF returning
`Map<Utf8, Utf8>` from the analytics-engine route.
## Paired SQL plugin PR
The SQL plugin side has a test-infrastructure change to ensure the v2 /
Calcite IT's test indices get parquet-backed for the analytics-engine
compatibility run: opensearch-project/sql#5441.
## How to verify
```bash
# Start the cluster with all sandbox plugins
JAVA_HOME=/path/to/temurin-25 ./gradlew :run -Dsandbox.enabled=true \
-PinstalledPlugins="['opensearch-job-scheduler:3.7.0.0-SNAPSHOT', \
'arrow-flight-rpc', 'analytics-engine', 'parquet-data-format', \
'analytics-backend-datafusion', 'analytics-backend-lucene', \
'composite-engine', 'opensearch-sql-plugin:3.7.0.0-SNAPSHOT']"
# QA-side IT (no SQL plugin needed)
./gradlew :sandbox:qa:analytics-engine-rest:integTest \
-Dsandbox.enabled=true --tests "*SpathCommandIT"
# v2 / Calcite IT (in the SQL plugin checkout, with opensearch-project#5441 applied)
./gradlew :integ-test:integTestRemote \
-Dtests.rest.cluster=localhost:9200 \
-Dtests.cluster=localhost:9300 \
-Dtests.clustername=runTask \
-Dtests.analytics.force_routing=true \
-Dtests.analytics.parquet_indices=true \
--tests "org.opensearch.sql.calcite.remote.CalcitePPLSpathCommandIT"
```
Signed-off-by: Kai Huang <ahkcs@amazon.com>
Pairs with opensearch-project/OpenSearch#21664. Both PRs are required to move
CalcitePPLSpathCommandIToff 0 / 16 on the analytics-engine route.What the change does
CalcitePPLSpathCommandIT.init()was creating its four test indices by rawPUT /<idx>/_doc/Nrequests, which auto-creates the index via the default Lucene path. The analytics-engine compatibility run (-Dtests.analytics.parquet_indices=true) only injects parquet/composite settings insideTestUtils.createIndexByRestClient, so the raw-PUT indices were Lucene-only — and DataFusion fails withUnsupportedOperationException: acquireReader is not supported in EngineBackedIndexeron any Lucene-only index.The fix is one line per index: create the empty index up-front through the helper so the parquet toggle gets a chance to inject its settings, then let the existing doc PUTs populate it via dynamic mapping.
No mapping is declared (
nullmapping argument) — DataFusion handles dynamic mapping on parquet-backed composite indices just fine. Same pattern asCalciteEvalCommandITandCalciteFieldFormatCommandIT. No change for the v2 / Calcite path; the helper is a no-op when the parquet toggle isn't set.Pass rate
CalcitePPLSpathCommandIT-Dtests.analytics.force_routing=true -Dtests.analytics.parquet_indices=true)CalcitePPLSpathCommandIT