Skip to content

[Design] Runtime Filter (Dynamic Filter Pushdown) for MSE Hash Joins #200

Description

@UOETianleZhang

Runtime Filter (Dynamic Filter Pushdown) for MSE Hash Joins

TL;DR

For inner hash joins in the Multi-Stage Query Engine (MSE), materialize a compact Bloom filter over the build-side join keys and push it down to the probe-side segment scan. Non-matching rows are pruned during I/O instead of being fetched, shuffled across the network, and dropped at the join operator. This is a well-known MPP optimization (Trino, StarRocks, Apache Doris, Spark, DuckDB). Pinot doesn't have it today; this issue proposes the design.

Design follows the Trino / StarRocks / Doris pattern: a per-query, broker-mediated runtime-filter registry that producers (the HashJoinOperator in the intermediate stage) write to on build completion, and consumers (leaf-stage segment scans) poll between row batches. Timing is asynchronous and best-effort — late rows are pruned, rows already in flight when the filter arrives are not.

Revised on 2026-05-28 based on review feedback from @siddharthteotia (issue comments). The earlier "same-OpChain hand-off via OpChainExecutionContext" sketch was incorrect: build (intermediate stage) and probe leaf are always different OpChains, so cross-stage transport is now first-class instead of a follow-up.

1. Status & Scope

  • In scope (v1): Inner hash joins with single equi-condition; Bloom filter built on the build side; cross-stage transport via a broker-mediated registry; pushed into the leaf-stage segment scan as a new BLOOM_MEMBERSHIP predicate.
  • Timing model: Trino-style asynchronous / best-effort. Snowflake schema (FACT × N-DIM, multi-producer to one fact scan) supported day one.
  • Out of scope (follow-ups, no longer including cross-stage transport): Semi-join and anti-join runtime filters; multi-stage runtime-filter chains beyond simple snowflake; non-equi joins; Spark-style synchronous scheduling barrier; dictionary-encoded leaf path (pre-materialized matching dict-ID set).

2. Background

2.1 JOIN basic knowledge

A JOIN combines rows from two tables that share a value in some column (the join key). Example: a 10-billion-row orders table joined against a small customers table on customer_id to find VIP orders.

Image

2.2 What's a hash join?

There are two phases for hash join:

  1. Build phase — load the smaller side (the 3 VIPs) into an in-memory hash table keyed on customer_id.
Image
  1. Probe phase — stream the bigger side row-by-row, look each customer_id up in the hash table; emit matches, discard misses.
Image

2.3 What's the problem?

In a distributed engine like MSE, the probe side and the build side often run on different workers. Every probe-side row gets read from storage and sent across the network, only to be discarded at the join operator if its key isn't in the hash table. For a selective join (small build side, large probe side), that's a lot of wasted I/O and bandwidth.

2.4 What's a "runtime filter"?

A small "did the build side maybe contain this key?" check that the build side ships to the probe side at runtime, after the hash table is built. The probe side applies it as early as possible — ideally inside the segment scan, so non-matching rows are skipped during I/O. The check is a Bloom filter, which has zero false negatives (so the join stays correct) and a tunable false-positive rate (so it stays compact).

This is the same trick Spark, Trino, Presto, and DuckDB use. Pinot already does a related thing for SEMI joins via PinotJoinToDynamicBroadcastRule (broadcasts an IN-list); the new feature does it for INNER hash joins via a Bloom filter, transported via a registry rather than baked into the physical plan.

Image Image

3. Design

3.1 Three layers + a control plane

Layer Module Responsibility
Planner pinot-query-planner Detect eligible inner joins (single equi-condition, build-side row count below threshold). Insert a physical runtime-filter node that names the registry key (requestId, joinColumn).
Runtime pinot-query-runtime HashJoinOperator builds the Bloom on finishBuildingRightTable() and publishes to the registry. Leaf-scan OpChain polls the registry between row batches.
Leaf pinot-core Accept the Bloom as a BLOOM_MEMBERSHIP predicate inside the segment scan via FilterPlanNode.combineWithRuntimeBloomFilters (already landed in #197).
Control plane (new) broker Hosts the per-query RuntimeBloomFilterRegistry keyed by (requestId, columnName). Producers write, consumers read. Cleaned up at query end.

3.2 Key design decisions

  1. Inner joins, single equi-condition first. Mirrors the restriction in the existing PinotJoinToDynamicBroadcastRule. Composite keys and outer joins are out of scope for v1.
  2. Bloom filter as the filter form for v1. Compact (KB-level), zero false negatives. Implementation reuses Guava's BloomFilter (matching BloomFilterIdSet). The registry shape is polymorphic so IN-list / min-max can be added later without rework (cf. StarRocks / Doris multi-form pattern in §3.4).
  3. New Predicate.Type.BLOOM_MEMBERSHIP. Dedicated predicate keeps evaluator routing clean and avoids regressions in the existing IN path.
  4. QueryContext carries runtime Bloom filters per column. Leaf-stage FilterPlanNode AND-combines a synthetic BLOOM_MEMBERSHIP predicate per column into the segment-scan filter tree. Queries without runtime filters allocate nothing.
  5. Broker-mediated registry, not in-process hand-off. OpChainId = (requestId, virtualServerId, stageId) is stage-bound, every PhysicalExchange creates a stage boundary, and build / probe always live in different OpChains. There is no in-process channel that survives the stage boundary. The registry lives on the broker, producers publish(requestId, column, bloom), consumers get(requestId, column).
  6. Asynchronous / best-effort timing. Probe-side leaf scans start immediately. The leaf execution loop polls the registry between batches; when an entry appears, it merges into QueryContext.runtimeBloomFilters and the next batch is filtered. Rows already in flight when the filter arrives are not retroactively filtered — this is the same trade-off Trino / StarRocks / Doris accept. We instrument the "leak" (rows emitted before the filter arrived) so a future decision to add a synchronous barrier is data-driven.
  7. Multi-producer to single consumer day one. Snowflake schemas (FACT × N-DIM) work without rework: each producer HashJoin writes its own column key into the same per-query registry, the fact scan reads all columns it touches. The consumer-side Map<String, RuntimeBloomFilter> already supports this (Inject runtime filters at the leaf-stage filter boundary #197).
  8. Automatic triggering, hint-overridable. The optimizer rule fires when build-side RelMetadataQuery.getRowCount() is below pinot.broker.mse.runtime.filter.build.cardinality.threshold (default 1M). Per-join enable_runtime_filter hint can force on/off.
  9. Flag-gated, disabled by default. Same rollout pattern as the recent MSE fingerprint change. No behavior change for existing queries until the flag flips.

3.3 Data flow

The diagram below was sketched against the original same-OpChain design and is partially superseded — the new control-plane path (registry between intermediate-stage HashJoin and leaf-stage scan) replaces the in-OpChain hand-off shown here. The textual description in §3.2 is authoritative.

Image

3.4 Industry comparison

Surveyed along the dimensions that matter operationally:

System Filter forms Build trigger Transport Timing Probe app point
Trino / Presto Discrete value set, range, Bloom (recent) Cost-based; per-type size thresholds Coordinator-mediated registry (DynamicFilterService) Async / late, default ~5s wait Row filter + connector pushdown (Iceberg / Hive / Delta)
Spark DPP (SPARK-25540) Discrete partition values Plan rule (join column = partition column) Subquery materialization Plan-time barrier Partition prune at file scan
Spark RBF (SPARK-32268, 3.3+) Bloom Cost-based, off by default BroadcastExchange synchronous broadcast Plan-time barrier Row filter
DuckDB Hash-table membership (perfect) Automatic In-memory shared (single-node) Pipeline barrier Row filter
ClickHouse IN-list via GLOBAL JOIN Manual keyword Distributed broadcast Late / serial Row filter (plus static on-disk indices)
StarRocks / Doris IN-list, MIN/MAX, Bloom — in parallel from same join Cost-based, per-type thresholds FE-mediated registry (global), BE-local Async / late, default 1s wait Row filter + segment-level zone map / Bloom
Pinot today (SEMI JOIN only via PinotJoinToDynamicBroadcastRule) IN-list Hint + cardinality threshold Plan rewrite + mailbox broadcast Plan-time Segment-scan IN predicate
Pinot proposed v1 Bloom (extensible) Cost-based + per-join hint Broker-mediated registry Async / late Segment-scan BLOOM_MEMBERSHIP predicate (PR #196 + #197)

Three takeaways:

  • Async / registry is industry consensus for general-purpose distributed MPP engines. Trino, StarRocks, Doris all converged here independently. Our proposal sits in the same row.
  • Spark's synchronous barrier is cheap only because Catalyst already has plan-time barriers (DynamicPruningSubquery, BroadcastExchange with synchronous materialization). MSE has neither. Replicating Spark's coverage in Pinot is not a planner-rule patch — it would require building an OpChain-level scheduling primitive, on the order of quarters of work. That's a v2 decision, not a v1 default.
  • No SOTA system tries to retroactively prune the early-row leak. They all manage it with timeouts and instrumentation. We follow suit.

3.5 Why async (Trino-style) not synchronous (Spark-style)

Trino-style (proposed v1) Spark-style (deferred to v2)
Coverage Late rows pruned; early rows leak ~100% (probe blocks on Bloom)
Change to MSE scheduling None Significant — new OpChain-level barrier
Multi-producer (snowflake) Falls out for free Requires plan-shape rewrite per dim
Cost to add Weeks Quarters
Risk Low — additive to existing pipeline High — new scheduling primitive
Decision driver to v2 Measured leak rate in production

We instrument the leak (rows-emitted-before-Bloom-arrived vs. after) on Phase 7 so the v2 decision is data-driven.


4. Implementation plan

Total: 9 phases. PRs #195, #196, #197 are already up.

# Phase Status What lands
0 Broker config + join hint Done · #195 pinot.broker.mse.runtime.filter.enabled (default false), build-cardinality threshold, enable_runtime_filter hint.
1 BLOOM_MEMBERSHIP predicate primitive Done · #196 New predicate type, BloomMembershipPredicate, RuntimeBloomFilter, per-DataType evaluator factory, provider dispatch.
2 QueryContext map + leaf injection In review · #197 Map<String, RuntimeBloomFilter> on QueryContext; FilterPlanNode.combineWithRuntimeBloomFilters AND-injects BLOOM_MEMBERSHIP predicates. PR description to be revised to drop the OpChainExecutionContext framing.
3 RuntimeBloomFilterRegistry (new) To do Per-query, broker-resident registry keyed by (requestId, columnName). Producers call publish(...), consumers call get(...). Cleaned up at query end.
4 HashJoinOperator publish hook To do In finishBuildingRightTable(): iterate the PrimitiveLookupTable keys, build a RuntimeBloomFilter sized to the build cardinality, publish to the registry. Replaces the discarded "publish via OpChainExecutionContext" idea.
5 Leaf-scan consumer (best-effort poll) To do Between row batches in the leaf execution loop: poll the registry for columns touched by the scan, merge into QueryContext.runtimeBloomFilters. Phase 2's injection logic does the rest. Instrument rows-emitted-before-Bloom-arrived vs. after.
6 RuntimeFilterInsertRule planner rule To do Calcite rule detecting eligible inner equi-joins; inserts a physical node carrying the registry key. Mirrors PinotJoinToDynamicBroadcastRule's shape (sibling rule), registered in PhysicalOptRuleSet.
7 Snowflake integration test + benchmark To do FACT × 2-DIM test per @siddharthteotia's example. pinot-perf harness measures end-to-end latency and leak rate. Decision driver for Phase 8.
8 Spark-style synchronous barrier Tracked, not committed OpChain-level barrier so leaf scans wait on the registry. Closes the leak entirely but invasive. Only if Phase 7 numbers warrant.

4.1 Out of scope for v1 (follow-up issues to file once v1 lands)

  • Dictionary-encoded leaf path for BLOOM_MEMBERSHIP (pre-materialized matching dict-ID set)
  • Semi-join and anti-join runtime filters
  • Multi-stage runtime-filter chains beyond simple snowflake
  • Min/max range and IN-list filter forms (registry shape supports them — just deferring implementation)
  • Phase 8: Spark-style synchronous scheduling barrier

5. Testing strategy

  • Unit tests at every layer — predicate evaluator across SV types (Add RUNTIME_FILTER predicate primitive (Bloom) for MSE runtime filter #196); leaf injection no-op + injection cases (Inject runtime filters at the leaf-stage filter boundary #197); registry publish/get correctness; build-side publish (no false negatives); probe-side consumer polling; planner rule trigger / non-trigger matrix.
  • Planner golden plansPhysicalOptimizerPlans.json cases: INNER triggers, OUTER does not, build-too-large does not, hint-disabled does not, hint-forced does, snowflake (multi-producer) produces correct registry keys.
  • Snowflake integration test — TPC-H-shaped FACT × 2-DIM in pinot-integration-tests: two HashJoins in different intermediate stages both publish to the same fact-scan registry, leaf scan pulls both Blooms. Assert leaf-stage emitted row count drops vs. flag-off baseline.
  • Performance benchmark with leak instrumentationpinot-perf measures latency delta and the (rows-before-Bloom / total-rows) ratio. Target ≥ 20% latency improvement on a representative selective join; leak ratio is the v2 decision driver.

6. Rollout

  1. Land all phases behind pinot.broker.mse.runtime.filter.enabled=false (no behavior change for existing queries).
  2. Enable in a staging cluster, monitor latency, CPU, and the leak ratio on join-heavy workloads.
  3. Enable cluster-wide once stable; consider per-table overrides.
  4. If staging leak ratio is acceptable (TBD threshold), stay on async. If unacceptable on representative workloads, open the Phase 8 discussion.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Fields

    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions