
[RFC] MPP SQL/PPL Engine for OpenSearch #5380

@penghuo


Summary

Replace the existing SQL/PPL → DSL translation in the OpenSearch SQL plugin with a distributed MPP (Massively Parallel Processing) query engine. Instead of translating queries to OpenSearch DSL and executing them through the search API, the engine reads Lucene segments directly and executes queries using a multi-stage, hash-shuffled distributed execution model.

Motivation

The current SQL/PPL plugin translates queries to OpenSearch DSL, which has fundamental limitations:

  1. No distributed joins — DSL has no join primitive. Joining two indices or performing self-joins requires application-level logic, nested/parent-child queries with limited semantics, or multiple round-trips stitched together by the coordinating node.
  2. No multi-stage aggregation — COUNT(DISTINCT x) over high-cardinality columns, multi-key GROUP BY, and nested aggregations require multiple passes that DSL can't express in a single request.
  3. Translation gap — many SQL/PPL constructs have no DSL equivalent, leading to unsupported query errors or incorrect results.

A distributed engine eliminates the translation layer entirely. SQL/PPL queries compile to physical execution plans that run directly on Lucene data, with the same operators (hash join, hash aggregate, sort-merge) used by dedicated analytics databases.

Architecture

The engine embeds Trino 442's execution runtime inside OpenSearch as a native plugin. It runs in the same JVM as OpenSearch data nodes — no external coordinator, no separate cluster.

┌─────────────────────────────────────────────────────────────────┐
│                     OpenSearch Cluster                          │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                    Node 1 (Coordinator)                  │   │
│  │                                                         │   │
│  │  ┌──────────────┐    ┌───────────────────────────────┐ │   │
│  │  │  REST :9200   │    │  Exchange HTTP :9500           │ │   │
│  │  │  POST /query  │    │  Task control + data shuffle  │ │   │
│  │  └──────┬────────┘    └──────┬────────────────────────┘ │   │
│  │         │                    │                           │   │
│  │  ┌──────▼────────────────────▼────────────────────────┐ │   │
│  │  │            Query Engine (Trino 442)                 │ │   │
│  │  │                                                     │ │   │
│  │  │  DispatchManager ─► SqlQueryExecution               │ │   │
│  │  │    │                                                │ │   │
│  │  │    ├─ Parse (SQL/PPL → AST)                         │ │   │
│  │  │    ├─ Analyze (resolve tables, type check)          │ │   │
│  │  │    ├─ Plan (logical → optimized → physical)         │ │   │
│  │  │    ├─ Fragment (split at exchange boundaries)       │ │   │
│  │  │    └─ Schedule (assign tasks to nodes)              │ │   │
│  │  │                                                     │ │   │
│  │  │  SqlTaskManager ─► TaskExecutor ─► Drivers          │ │   │
│  │  │    (executes tasks assigned to this node)           │ │   │
│  │  └─────────────────────────────────────────────────────┘ │   │
│  │                         │                                │   │
│  │  ┌──────────────────────▼──────────────────────────────┐ │   │
│  │  │              OpenSearch Connector                    │ │   │
│  │  │  Lucene doc_values → columnar blocks (no REST/DSL)  │ │   │
│  │  └─────────────────────────────────────────────────────┘ │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌──────────────────────┐    ┌──────────────────────┐          │
│  │   Node 2 (Worker)    │    │   Node 3 (Worker)    │          │
│  │                      │    │                      │          │
│  │  Exchange HTTP :9500 │    │  Exchange HTTP :9500 │          │
│  │  SqlTaskManager      │    │  SqlTaskManager      │          │
│  │  OpenSearch Connector│    │  OpenSearch Connector│          │
│  └──────────────────────┘    └──────────────────────┘          │
└─────────────────────────────────────────────────────────────────┘

Every node runs the full engine stack. Any node can be the coordinator (the node that receives the REST request). Worker nodes execute tasks assigned by the coordinator's scheduler.

Query Lifecycle

1. REST Request

Client sends a SQL or PPL query to any node:

POST /_plugins/_sql/v1/query
{"query": "SELECT RegionID, COUNT(*) AS cnt FROM logs GROUP BY RegionID ORDER BY cnt DESC LIMIT 10"}

For PPL, the query is first translated to SQL via the Calcite-based PPL translator, then follows the same execution path.

2. Parse

Trino's SqlParser converts the SQL string into an AST (Statement). This is standard SQL parsing — no DSL translation.

3. Analyze

StatementAnalyzer resolves table and column references against the OpenSearch connector's metadata. The connector reads index mappings from OpenSearch cluster state and exposes them as table schemas. Type checking validates that operations are valid (e.g., COUNT(*) returns BIGINT, RegionID is INTEGER).

4. Logical Plan → Optimize → Physical Plan

LogicalPlanner builds a tree of plan nodes:

Output (limit 10)
  └─ TopN (ORDER BY cnt DESC, LIMIT 10)
       └─ Aggregation (GROUP BY RegionID, COUNT(*))
            └─ TableScan (logs)

The optimizer applies rules: predicate pushdown (WHERE clauses become Lucene queries), column pruning (only read RegionID from doc_values), partial aggregation (pre-aggregate on scan nodes before shuffle).

5. Fragment

PlanFragmenter splits the optimized plan at exchange boundaries into fragments. Each fragment is a self-contained execution unit:

Fragment 0 [SINGLE]          ← runs on coordinator only
  TopN(10) + Output
       ▲
       │ exchange (GATHER)
       │
Fragment 1 [HASH]            ← runs on all nodes, hash-partitioned by RegionID
  FinalAgg: SUM(partial_count) GROUP BY RegionID
  TopN(10) partial
       ▲
       │ exchange (HASH by RegionID)
       │
Fragment 2 [SOURCE]          ← runs on nodes hosting index shards
  TableScan(logs) → PartialAgg: COUNT(*) GROUP BY RegionID

6. Task Scheduling

The coordinator's PipelinedQueryScheduler assigns fragments to nodes:

Split creation: OpenSearchSplitManager reads the index routing table from cluster state. For each shard, it creates one split per Lucene segment, tagged with the host node address. A 4-shard index with ~5 segments per shard produces ~20 splits.
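A minimal sketch of that split arithmetic, using hypothetical types (the real OpenSearchSplitManager reads the IndexRoutingTable; the names below are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative split planning: one split per Lucene segment, tagged with the
// node hosting the shard so the scan runs where the data lives.
public class SplitPlanner {
    public record Split(int shard, int segment, String hostNode) {}

    public static List<Split> planSplits(int shards, int segmentsPerShard, List<String> shardHosts) {
        List<Split> splits = new ArrayList<>();
        for (int shard = 0; shard < shards; shard++) {
            String host = shardHosts.get(shard); // locality: one host per shard
            for (int seg = 0; seg < segmentsPerShard; seg++) {
                splits.add(new Split(shard, seg, host));
            }
        }
        return splits;
    }
}
```

With 4 shards and 5 segments each, this yields the ~20 splits mentioned above.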

Task assignment: NodeScheduler assigns SOURCE fragment splits to the node hosting each shard (locality-aware — data never moves for the scan). HASH fragments run on all nodes. SINGLE fragments run on the coordinator.

For each assigned task, the coordinator sends an HttpRemoteTask request to the target node's exchange port (9500).

7. Execution

On each worker node, SqlTaskManager receives the task and creates a Driver pipeline — a chain of operators:

Driver pipeline (Fragment 2 on Node 2):
  OpenSearchPageSource (scan shard 1, segments 0-4)
    → FilterOperator (pushed-down predicate, if any)
    → PartialAggregationOperator (COUNT per RegionID)
    → PartitionedOutputOperator (hash-partition by RegionID, write to OutputBuffer)

Lucene direct read: OpenSearchPageSource calls IndexShard.acquireSearcher("omni") to get a Lucene IndexSearcher. It reads doc_values directly — no REST API, no JSON parsing, no _source field. Keyword fields produce DictionaryBlock (ordinal-based, dictionary shared per segment). Numeric fields produce direct array blocks. Batch size is 8192 rows per page.
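The batching step can be sketched as follows; a plain long[] stands in for a doc_values column so the sketch stays self-contained (the real reader pulls from Lucene SortedNumericDocValues):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of slicing a columnar read into fixed-size pages,
// using the 8192-row page size from the proposal.
public class PageBatcher {
    public static final int PAGE_SIZE = 8192;

    public static List<long[]> toPages(long[] column) {
        List<long[]> pages = new ArrayList<>();
        for (int start = 0; start < column.length; start += PAGE_SIZE) {
            int len = Math.min(PAGE_SIZE, column.length - start);
            long[] page = new long[len];
            System.arraycopy(column, start, page, 0, len);
            pages.add(page);
        }
        return pages;
    }
}
```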

Predicate pushdown: WHERE clauses are converted to Lucene queries by LuceneQueryBuilder and executed via BulkScorer — only matching documents are read.
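A stand-in sketch of the effect of pushdown, assuming a simple numeric range constraint (the real path builds a Lucene query via LuceneQueryBuilder and drives it through BulkScorer; here the filter is applied inline during the "scan"):

```java
// Rows failing the pushed-down range never leave the reader, so downstream
// operators only see matching documents.
public class PushdownScan {
    public static long countInRange(long[] column, long lo, long hi) {
        long count = 0;
        for (long v : column) {
            if (v >= lo && v <= hi) count++; // only matching docs are "read"
        }
        return count;
    }
}
```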

8. Shuffle

Intermediate results flow between fragments via HTTP exchange on port 9500.

Node 1 (Fragment 2)                    Node 2 (Fragment 2)
  PartialAgg output                      PartialAgg output
  ┌──────────────────┐                   ┌──────────────────┐
  │ RegionID=1: 500  │                   │ RegionID=1: 300  │
  │ RegionID=2: 200  │                   │ RegionID=2: 150  │
  │ RegionID=3: 100  │                   │ RegionID=3: 400  │
  └────────┬─────────┘                   └────────┬─────────┘
           │ hash(RegionID) % 3                    │ hash(RegionID) % 3
           ▼                                       ▼
  ┌─────────────────────────────────────────────────────────┐
  │                    HTTP Exchange :9500                   │
  │  hash(1)%3=0 → Node 1    hash(2)%3=1 → Node 2         │
  │  hash(3)%3=2 → Node 3                                  │
  └─────────────────────────────────────────────────────────┘
           │                    │                    │
           ▼                    ▼                    ▼
  Node 1 (Fragment 1)  Node 2 (Fragment 1)  Node 3 (Fragment 1)
  FinalAgg RegionID=1  FinalAgg RegionID=2  FinalAgg RegionID=3
  COUNT=500+300=800    COUNT=200+150=350    COUNT=100+400=500

Pages are serialized in Trino's columnar wire format and transferred via HTTP GET/POST. The OutputBuffer on the producer side holds pages partitioned by hash bucket. The ExchangeClient on the consumer side polls for pages.
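The consumer-side merge can be sketched as follows; the partition function is illustrative (Trino uses its own hash family), and the partial counts mirror the diagram above:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Two-phase aggregation around the shuffle: producers emit partial COUNT maps
// hash-partitioned by key; each consumer sums the partials for its partition.
public class HashShuffle {
    static int partition(int regionId, int nodes) {
        return Math.floorMod(Integer.hashCode(regionId), nodes);
    }

    static Map<Integer, Long> finalAgg(List<Map<Integer, Long>> partials) {
        Map<Integer, Long> result = new HashMap<>();
        for (Map<Integer, Long> partial : partials) {
            partial.forEach((region, count) -> result.merge(region, count, Long::sum));
        }
        return result;
    }
}
```

Merging the two producers from the diagram gives RegionID=1 → 800, RegionID=2 → 350, RegionID=3 → 500.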

9. Result Collection

Fragment 0 (SINGLE, on coordinator) gathers final aggregates from all Fragment 1 tasks, applies the global TopN(10), and writes results to the output buffer. The REST handler reads from this buffer, serializes rows to JSON, and returns the HTTP response.

3-Node Example: End-to-End

Query: SELECT RegionID, COUNT(*) AS cnt FROM logs GROUP BY RegionID ORDER BY cnt DESC LIMIT 10

Index logs: 3 primary shards, one per node, ~15 Lucene segments total.

                        Client
                          │
                    POST /query
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│ Node 1 (Coordinator)                                            │
│                                                                 │
│  Parse → Analyze → Plan → Fragment → Schedule                   │
│                                                                 │
│  Fragment 2 [SOURCE]: scan shard 0 (5 segments)                 │
│    → PartialAgg(COUNT per RegionID)                             │
│    → hash-partition output → Exchange :9500                     │
│                                                                 │
│  Fragment 1 [HASH]: receive hash partition 0                    │
│    → FinalAgg(SUM partial counts)                               │
│    → partial TopN(10)                                           │
│    → send to Fragment 0                                         │
│                                                                 │
│  Fragment 0 [SINGLE]: gather all → global TopN(10) → output    │
│    → REST response to client                                    │
└─────────────────────────────────────────────────────────────────┘
                     ▲           ▲
          exchange   │           │   exchange
          :9500      │           │   :9500
                     │           │
┌────────────────────┘           └────────────────────┐
│ Node 2 (Worker)                 Node 3 (Worker)     │
│                                                     │
│  Fragment 2 [SOURCE]:           Fragment 2 [SOURCE]: │
│    scan shard 1 (5 segs)          scan shard 2 (5 segs)
│    → PartialAgg                   → PartialAgg      │
│    → hash-partition               → hash-partition   │
│                                                     │
│  Fragment 1 [HASH]:             Fragment 1 [HASH]:   │
│    receive hash partition 1       receive hash partition 2
│    → FinalAgg                     → FinalAgg         │
│    → partial TopN(10)             → partial TopN(10) │
│    → send to coordinator          → send to coordinator
└─────────────────────────────────────────────────────┘

Timeline:

  1. Client sends query to Node 1
  2. Node 1 parses, plans, fragments into 3 fragments, schedules tasks on all 3 nodes
  3. All 3 nodes scan their local shards in parallel (Fragment 2), reading doc_values directly
  4. Partial aggregation reduces data volume before shuffle
  5. Hash shuffle redistributes partial results by RegionID across all 3 nodes
  6. Each node performs final aggregation on its hash partition (Fragment 1)
  7. Node 1 gathers results, applies global TopN(10), returns to client

No single node holds all data at any point. Each node processes only its partition of the hash space.

OpenSearch Connector: Lucene Direct Read

The connector bypasses the OpenSearch search API entirely. It reads Lucene segments directly via IndexShard.acquireSearcher():

OpenSearch Index (3 shards, 15 segments)
         │
         ▼
OpenSearchSplitManager
  → reads IndexRoutingTable from ClusterState
  → creates 1 split per segment (15 splits)
  → each split tagged with host node address
         │
         ▼
OpenSearchPageSourceProvider
  → IndicesService → IndexService → IndexShard
  → shard.acquireSearcher("omni") → Lucene IndexSearcher
  → LuceneQueryBuilder: TupleDomain predicates → Lucene BooleanQuery
         │
         ▼
OpenSearchPageSource
  → BulkScorer + Collector per segment
  → DocValuesReader reads columnar data:
      keyword → SortedSetDocValues → DictionaryBlock
      numeric → SortedNumericDocValues → direct array block
      timestamp → SortedNumericDocValues (millis→micros)
  → 8192 rows per Page batch

Why direct read matters:

  • No HTTP overhead (REST API adds ~1ms per request, multiplied by thousands of scroll requests)
  • No JSON serialization/deserialization of _source
  • Columnar doc_values read matches the engine's columnar execution model
  • Predicate pushdown executes as Lucene queries (BooleanQuery, TermQuery, RangeQuery) — only matching docs are read

Node Discovery

The engine uses OpenSearch's cluster state for node discovery — no separate service discovery:

  • ClusterStateNodeManager implements Trino's InternalNodeManager and OpenSearch's ClusterStateListener
  • On cluster state change, maps each DiscoveryNode → Trino InternalNode
  • Each node publishes its exchange port (default 9500) as a node attribute
  • Push-based: no polling, instant awareness of node joins/leaves
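A simplified sketch of the push-based node table; the types are stand-ins for OpenSearch's DiscoveryNode and Trino's InternalNode, and the entry point mirrors ClusterStateListener's single callback:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// On every cluster state change, reconcile the engine's node table with the
// current membership: drop departed nodes, add or refresh the rest.
public class NodeTable {
    public record EngineNode(String nodeId, String host, int exchangePort) {}

    private final Map<String, EngineNode> nodes = new ConcurrentHashMap<>();

    public void onClusterState(Set<EngineNode> current) {
        Set<String> liveIds = current.stream().map(EngineNode::nodeId).collect(Collectors.toSet());
        nodes.keySet().retainAll(liveIds);            // node left the cluster
        current.forEach(n -> nodes.put(n.nodeId(), n)); // node joined or changed
    }

    public int size() { return nodes.size(); }
    public EngineNode get(String id) { return nodes.get(id); }
}
```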

Integration with Existing SQL/PPL Plugin

The engine integrates as a new execution backend behind the existing REST API:

POST /_plugins/_sql          ─┐
POST /_plugins/_ppl          ─┤
                              ▼
                     SQL/PPL Plugin
                         │
              ┌──────────┴──────────┐
              ▼                     ▼
     DSL Translation          MPP Engine
     (existing path)          (new path)
              │                     │
              ▼                     ▼
     OpenSearch Search API    Lucene Direct Read

Routing logic: Queries are routed to the MPP engine when:

  • The query involves joins, subqueries, or complex aggregations that DSL cannot express
  • The user explicitly opts in via a query parameter or cluster setting
  • A cost-based decision determines the MPP path is more efficient

The DSL translation path remains available for simple queries where the search API is sufficient (single-index, simple aggregations, full-text search with scoring).

What Changes in os-sql Plugin

| Component | Current | Proposed |
|---|---|---|
| Query parsing | PPL/SQL → AST | No change (reuse existing parsers) |
| Query planning | Calcite logical plan → DSL | Calcite logical plan → Trino physical plan |
| Execution | OpenSearch search API (DSL) | Trino distributed engine (Lucene direct) |
| Aggregation | Single-node (coordinating node) | Distributed (hash-partitioned across nodes) |
| Joins | Not supported / limited | Hash join, sort-merge join, broadcast join |
| Result format | JSON response | No change (same REST response format) |

Open Questions

1. Routing policy — how to decide which queries go to MPP vs DSL?

Rule-based routing, transparent to the user. A query router sits inside the SQL/PPL plugin and classifies each request before execution. There is no query parameter, no opt-in flag, and no cost-based optimizer — the plugin inspects the logical plan and applies deterministic rules:

| Rule | Route |
|---|---|
| Contains JOIN, UNION, or correlated subquery | MPP (DSL cannot express) |
| COUNT(DISTINCT high-cardinality-column) or multi-key GROUP BY | MPP (DSL single-node aggregation is a bottleneck) |
| GROUP BY cardinality estimate above threshold (e.g. > 10K buckets) | MPP |
| Nested aggregations, window functions, CTEs | MPP |
| Full-text search with MATCH / QUERY_STRING requiring scoring | DSL (BM25 scoring is a search-API strength) |
| Single-index scan with simple WHERE + LIMIT | DSL (fast path, low overhead) |
| Simple GROUP BY with small cardinality (single-key, < 10K buckets) | DSL (single-bucket aggregation is efficient) |

The router returns identical result shapes regardless of which backend runs the query. Users see no difference in API, response format, or semantics — only in what queries are now supported and how fast they run.
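The rule table can be sketched as a deterministic classifier; `Features` and its flags are hypothetical stand-ins for what logical-plan inspection would produce:

```java
// Rules are checked in order; the first match decides the route.
public class QueryRouter {
    public enum Route { MPP, DSL }

    public record Features(boolean hasJoinOrSetOp, boolean hasCorrelatedSubquery,
                           boolean hasHighCardinalityDistinct, boolean multiKeyGroupBy,
                           long groupByCardinalityEstimate, boolean hasWindowCteOrNestedAgg,
                           boolean fullTextScoring) {}

    static final long BUCKET_THRESHOLD = 10_000;

    public static Route route(Features f) {
        if (f.hasJoinOrSetOp() || f.hasCorrelatedSubquery()) return Route.MPP; // DSL cannot express
        if (f.hasHighCardinalityDistinct() || f.multiKeyGroupBy()) return Route.MPP;
        if (f.groupByCardinalityEstimate() > BUCKET_THRESHOLD) return Route.MPP;
        if (f.hasWindowCteOrNestedAgg()) return Route.MPP;
        if (f.fullTextScoring()) return Route.DSL; // BM25 is a search-API strength
        return Route.DSL;                          // simple fast path
    }
}
```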

2. Resource isolation — preventing MPP queries from starving OpenSearch indexing/search

MPP execution runs in the same JVM as data nodes, so isolation is critical. Three layers:

Dedicated thread pools (hard isolation):

  • Task executor pool: separate from OpenSearch's search, write, and get pools. Sized as min(cores, 32) by default, configurable via plugins.omni.task.max_threads. Uses Trino's priority-based task scheduler with 1-second time quanta so long-running queries yield between scan batches.
  • Exchange I/O pool: Netty event loop dedicated to port 9500, isolated from OpenSearch's transport threads.
  • Lucene reader pool: IndexShard.acquireSearcher("omni") reuses the shard's searcher manager but does not block search API readers — searchers are reference-counted.

Memory limits (circuit breakers):

  • Per-query memory: plugins.omni.query.max_memory_per_node (default 2GB). Queries exceeding this are killed with EXCEEDED_LOCAL_MEMORY_LIMIT.
  • Global engine memory: plugins.omni.query.max_memory_total (default 30% of heap). All concurrent MPP queries combined cannot exceed this.
  • Integration with OpenSearch circuit breaker: MPP memory accounting feeds into the parent CircuitBreakerService so indexing/search see unified pressure and can shed load.
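The two-level limit can be sketched as follows; the class and method names are illustrative (only the setting defaults and the EXCEEDED_LOCAL_MEMORY_LIMIT error come from the proposal):

```java
import java.util.concurrent.atomic.AtomicLong;

// Every reservation is checked against the per-query cap first, then against
// the shared engine-wide cap; a failed global check rolls itself back.
public class MemoryAccountant {
    private final long perQueryLimit;
    private final long globalLimit;
    private final AtomicLong globalReserved = new AtomicLong();

    public MemoryAccountant(long perQueryLimit, long globalLimit) {
        this.perQueryLimit = perQueryLimit;
        this.globalLimit = globalLimit;
    }

    public class QueryContext {
        private long reserved;

        public synchronized void reserve(long bytes) {
            if (reserved + bytes > perQueryLimit) {
                throw new IllegalStateException("EXCEEDED_LOCAL_MEMORY_LIMIT");
            }
            long global = globalReserved.addAndGet(bytes);
            if (global > globalLimit) {
                globalReserved.addAndGet(-bytes); // roll back before failing
                throw new IllegalStateException("EXCEEDED_GLOBAL_MEMORY_LIMIT");
            }
            reserved += bytes;
        }
    }

    public QueryContext newQuery() { return new QueryContext(); }
}
```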

Concurrency caps:

  • plugins.omni.query.max_concurrent: hard ceiling on in-flight queries (default 20). Beyond this, queries queue in DispatchManager and are rejected with HTTP 429 after a timeout.
  • plugins.omni.query.max_queued: queue depth before rejecting (default 1000).

Admission control: During indexing spikes (detected via bulk-queue depth or I/O saturation), the router can route MPP-eligible queries back to DSL or demote them to a lower-priority pool. This is an operator knob, disabled by default.
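A sketch of the concurrency gate; names are hypothetical, and the two limits correspond to plugins.omni.query.max_concurrent and plugins.omni.query.max_queued:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// At most maxConcurrent queries run; at most maxQueued wait for a slot;
// anything beyond that is rejected immediately (surfaced as HTTP 429).
public class AdmissionControl {
    private final Semaphore running;
    private final int maxQueued;
    private final AtomicInteger queued = new AtomicInteger();

    public AdmissionControl(int maxConcurrent, int maxQueued) {
        this.running = new Semaphore(maxConcurrent);
        this.maxQueued = maxQueued;
    }

    /** Returns false when the query must be rejected (queue full). */
    public boolean tryAdmit() {
        if (running.tryAcquire()) return true;      // free slot: run now
        if (queued.incrementAndGet() > maxQueued) { // queue full: reject
            queued.decrementAndGet();
            return false;
        }
        running.acquireUninterruptibly();           // wait for a slot
        queued.decrementAndGet();
        return true;
    }

    public void release() { running.release(); }
}
```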

3. Security — integrating with the OpenSearch security plugin (FGAC, FLS, DLS)

MPP bypasses the search API — so every security check the security plugin applies at the search layer must be reapplied at the Lucene-read layer. The integration points:

Index-level access control (coarse-grained):

  • Before creating splits, OpenSearchSplitManager calls the security plugin to check indices:data/read privilege on the index for the authenticated user. Denied indices yield no splits — the query fails with SecurityException at planning time, identical to the search API.

Document-level security (DLS):

  • The security plugin provides a per-role Lucene Query that filters which documents a user can see.
  • LuceneQueryBuilder retrieves this query via ThreadContext and ANDs it with the predicate-pushdown query: BooleanQuery(FILTER: userPredicate, FILTER: dlsQuery).
  • Executed inside the same BulkScorer pass — no extra scan, same performance envelope.
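The composition can be sketched with java.util.function.Predicate standing in for Lucene FILTER clauses (the real path builds a BooleanQuery with two FILTER occurrences):

```java
import java.util.function.Predicate;

// The effective scan predicate is the conjunction of the user's pushed-down
// predicate and the role's DLS filter: a row is visible only if both match.
public class DlsFilter {
    static <T> Predicate<T> withDls(Predicate<T> userPredicate, Predicate<T> dlsQuery) {
        // Analogue of BooleanQuery(FILTER: userPredicate, FILTER: dlsQuery)
        return userPredicate.and(dlsQuery);
    }
}
```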

Field-level security (FLS):

  • The security plugin provides a per-role set of included/excluded fields.
  • OpenSearchMetadata.getColumnHandles() filters the column list against this set before returning to the planner. Columns the user cannot see are never part of the plan.
  • If an FLS-hidden column appears in a SELECT or WHERE, the query fails at analysis time with a "column not found" error — the same error surface as the search API.

Field masking:

  • The security plugin specifies masking rules (e.g. hash a specific field with SHA-256).
  • Applied in the OpenSearchPageSource read path: DocValuesReader.forKeyword() wraps the reader with a masking decorator when a field is flagged.
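A minimal sketch of the masking step itself; the wiring into DocValuesReader is assumed, not shown:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// When a field is flagged for masking, the reader emits the SHA-256 hex
// digest instead of the raw keyword value.
public class FieldMasker {
    public static String mask(String value) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always available
        }
    }
}
```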

Request context propagation:

  • OpenSearch stores the authenticated user in ThreadContext. Trino's task threads do not inherit it automatically.
  • TaskExecutor is wrapped to copy the security ThreadContext onto every Driver thread before it executes a split. The security plugin's interceptors then see the correct user on every Lucene read.
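The wrapping pattern can be sketched as follows, with a ThreadLocal map standing in for OpenSearch's ThreadContext:

```java
import java.util.Map;

// Capture the submitting thread's security context when the task is created,
// install it on the worker thread for the duration of the run, then restore.
public class ContextPropagation {
    static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(Map::of);

    static Runnable withCapturedContext(Runnable task) {
        Map<String, String> captured = CONTEXT.get(); // capture on submit
        return () -> {
            Map<String, String> previous = CONTEXT.get();
            CONTEXT.set(captured);                    // install on the worker
            try {
                task.run();
            } finally {
                CONTEXT.set(previous);                // always restore
            }
        };
    }
}
```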

Audit logging:

  • Every split read emits an audit event via the security plugin's audit API (GRANTED_PRIVILEGES, index, fields). Same events as DSL path, same format.

Cross-catalog concern: When the query joins an OpenSearch index with a Hive/Iceberg table, the Hive/Iceberg side is governed by its own ACLs (e.g. Lake Formation, IAM). OpenSearch security does not extend to external catalogs — users must be authorized on both sides independently.

4. Compatibility — migration path for existing SQL/PPL users

No breaking changes. All SQL/PPL queries that work today will continue to work on the MPP engine.

Guarantees:

  • API unchanged. REST endpoints (POST /_plugins/_sql, POST /_plugins/_ppl), request shape, and response JSON structure are identical. Existing clients (dashboards, SDKs, scripts) need no modification.
  • Grammar unchanged. The SQL/PPL parsers are reused as-is; there is no new dialect. Every statement that parses today still parses.
  • Semantics preserved. Result sets are identical — same rows, same column names, same types, same NULL handling. The MPP engine's SQL implementation follows ANSI SQL, and the PPL translator produces semantically equivalent plans.
  • Feature superset. Queries that currently fail with "unsupported operation" (joins, multi-index aggregations, window functions) now succeed. Nothing that worked stops working.

Migration is automatic: after upgrading the plugin, the router transparently directs eligible queries to the MPP engine. No query rewrites, no client changes, no configuration required.

A cluster setting plugins.omni.routing.mode = {auto, dsl_only, mpp_only} lets operators pin to the old DSL path during rollout or debugging. Default is auto.

5. Why Trino

Building a distributed MPP engine from scratch is a multi-year effort. Off-the-shelf options include Trino, Apache Spark, Apache DataFusion, Presto, Apache Drill, and Apache Impala. We chose Trino 442 because it is the only mature engine that satisfies every hard constraint for embedding inside OpenSearch.

Requirements

The engine must:

  1. Embed in the OpenSearch JVM — no external coordinator, no separate cluster, no cross-process serialization overhead. Runs alongside indexing/search on the same data nodes.
  2. Execute distributed plans on commodity clusters — multi-stage hash-shuffled execution, not a single-node fallback. Parallel scan, hash join, hash aggregation, distributed sort.
  3. Support SQL and PPL semantics — ANSI SQL surface broad enough that PPL can lower to it without gaps. Must handle joins, correlated subqueries, window functions, CTEs, set operations.
  4. Plug in custom connectors cleanly — a well-defined Connector SPI so we can wire Lucene doc_values reading directly, without patching the engine.
  5. Low per-query overhead — fragment scheduling and task startup in tens of milliseconds so short queries are not penalized.
  6. Permissive open-source license — Apache 2.0 or compatible, compatible with the OpenSearch license.
  7. Production-proven at scale — a track record at petabyte-scale deployments, with documented performance characteristics.

Why not the alternatives

Apache Spark — designed as a standalone cluster with its own driver/executor lifecycle. Embedding inside a long-lived JVM process is not a supported mode; the driver expects to own heap, thread pools, and classloading. Task startup overhead (seconds for short queries) is incompatible with interactive latency requirements. RDD/DataFrame APIs are lower-level than SQL planning; we would still need a SQL→plan stage. License (Apache 2.0) is fine, but the operational model is wrong.

Apache DataFusion (Rust) — excellent engine, but written in Rust. Embedding a Rust runtime inside OpenSearch's JVM requires JNI bindings, which introduce GC interaction, debugging complexity, and platform-specific native packaging. The columnar execution model (Arrow) would require a block-format bridge to Lucene readers. DataFusion's distributed layer (Ballista) is still experimental and separate-process by default. Absent the JVM constraint this would be the closest competitor, but the embedding requirement rules it out.

Presto (PrestoDB, the fork Trino split from in 2019) — similar architecture to Trino, same origin. We evaluated both; Trino has a more active release cadence, better modern SQL features (e.g. improved window functions, lambda, row/array semantics), and a cleaner module structure for embedding. Presto would also work, but Trino is the better-maintained branch.

Apache Drill — JVM, SQL, distributed. But the project's development velocity has slowed significantly, the connector SPI is less uniform, and the execution runtime has known performance gaps versus Trino on analytical workloads (hash aggregation, complex joins). Last major release gap is multi-year.

Apache Impala — designed for HDFS/Kudu, tightly coupled to statestored and catalog services that cannot be embedded. Written in C++ for the backend; embedding would require a native bridge similar to DataFusion. Not a realistic option.

ClickHouse, DuckDB, Doris, Photon, etc. — either columnar storage engines (not query engines), native-code single-node systems (no distributed plans), or proprietary. Rejected at the "embeddable JVM MPP engine" filter.
