Add RUNTIME_FILTER predicate primitive (Bloom) for MSE runtime filter#196
Add RUNTIME_FILTER predicate primitive (Bloom) for MSE runtime filter#196UOETianleZhang wants to merge 7 commits into
Conversation
Lay the leaf-stage primitive used by the upcoming MSE hash-join runtime filter (dynamic filter pushdown). Independent of the broker configuration scaffolding in #195 -- this PR only adds the predicate plumbing and an in-process bloom filter wrapper; no rule registration, no operator wiring, and the predicate has no producer yet. Added: * Predicate.Type.BLOOM_MEMBERSHIP -- new enum value, inclusive semantics. * BloomMembershipPredicate (pinot-common) -- carries a RuntimeBloomFilter and an ExpressionContext. Reference equality on the filter; two distinct instances are conceptually different runtime filters even if their bit arrays match. * RuntimeBloomFilter (pinot-common) -- in-memory bloom wrapper mirroring the funnel choices in BloomFilterIdSet (INT/FLOAT share an int funnel, LONG/DOUBLE share a long funnel, plus STRING and BYTES funnels). Lives in pinot-common so predicate classes can reference it; serialization for cross-worker transport is intentionally deferred. * BloomMembershipPredicateEvaluatorFactory (pinot-core) -- per-DataType inner evaluators (INT/LONG/FLOAT/DOUBLE/STRING/BYTES), each delegating applySV to the matching RuntimeBloomFilter.mightContain overload. The factory fails fast on dataType mismatch so misconfigured wiring shows up at construction rather than producing garbage at query time. * PredicateEvaluatorProvider dispatch -- raw-value branch routes BLOOM_MEMBERSHIP to the new factory. Dictionary-based branch throws UnsupportedOperationException with a clear message; a dict-id pre-materialization variant is deferred until we see workloads needing it. Tests (TestNG): * RuntimeBloomFilterTest (12 cases) -- constructor rejects unsupported data types; no false negatives for INT/LONG/FLOAT/DOUBLE/STRING/BYTES; observed false-positive rate stays within 5x of configured fpp; byte[] content equality hashes identically; raw-bits float encoding documented. * BloomMembershipPredicateEvaluatorFactoryTest (13 cases) -- factory rejects type mismatch and unsupported types; evaluator metadata (predicate-type, data-type, dictionary-based, exclusive) is correct; no false negatives across all six SV types; INT false-positive rate within tolerance; base-class default applySV overloads throw for the wrong type so callers cannot accidentally call applySV(long) on an INT-typed bloom; MV inherits the correct semantics via the base loop; PredicateEvaluatorProvider routes raw-value BLOOM_MEMBERSHIP to the factory; provider rejects the dictionary-based path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds the BLOOM_MEMBERSHIP predicate primitive and in-process Bloom filter wrapper needed to support upcoming MSE hash-join runtime filtering, plus raw-value predicate evaluation plumbing and unit tests.
Changes:
- Introduces
Predicate.Type.BLOOM_MEMBERSHIP,BloomMembershipPredicate, andRuntimeBloomFilterinpinot-common. - Adds
BloomMembershipPredicateEvaluatorFactoryand routes raw-value evaluation throughPredicateEvaluatorProvider(dictionary-based explicitly unsupported for now). - Adds unit tests covering Bloom filter behavior and evaluator/provider wiring.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluatorProvider.java | Routes raw-value BLOOM_MEMBERSHIP to the new evaluator factory; rejects dictionary-based usage. |
| pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/BloomMembershipPredicateEvaluatorFactory.java | Implements per-type raw-value evaluators delegating to RuntimeBloomFilter.mightContain(...). |
| pinot-core/src/test/java/org/apache/pinot/core/operator/filter/predicate/BloomMembershipPredicateEvaluatorFactoryTest.java | Adds coverage for factory wiring, per-type membership, overload behavior, MV behavior, and provider dispatch. |
| pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/Predicate.java | Adds new Predicate.Type.BLOOM_MEMBERSHIP enum value and documents semantics. |
| pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/BloomMembershipPredicate.java | Adds a predicate that carries a RuntimeBloomFilter with reference-based equality semantics. |
| pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/RuntimeBloomFilter.java | Adds an in-memory Bloom wrapper mirroring BloomFilterIdSet funnel choices and raw-bits encoding. |
| pinot-common/src/test/java/org/apache/pinot/common/request/context/predicate/RuntimeBloomFilterTest.java | Adds tests for supported types, false-negative behavior, and bytes hashing semantics. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
* RuntimeBloomFilter class Javadoc: drop the inaccurate "calling the wrong overload does not throw" claim. The underlying Guava BloomFilter is constructed with a single Funnel matched to the DataType, so a wrong-typed call may throw ClassCastException when the boxed value cannot be passed to that funnel, or return a meaningless answer. Callers must not rely on either outcome. * FunnelType Javadoc: replace "equal float values hash identically" with the accurate raw-bits contract. Values with the same IEEE-754 bit pattern hash to the same slot; -0.0f vs 0.0f and distinct NaN payloads hash differently. * add(float) inline comment: same rewording -- raw-bits encoding, not semantic equality. * RuntimeBloomFilterTest.floatDifferentBitPatternsAreDistinct: replace the tautological assertTrue(x || !x) with a meaningful pin-down of the "same bits -> same hash" guarantee (including a non-canonical NaN). The negative-zero call is now a smoke-only API exercise. * Drop stale internal references from Javadoc/comments across the predicate, factory, and provider files; keep only the technical description that survives code review on its own. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Can this be reviewed and merged independently of PR 197. Is this a pre-cursor to that ? |
Yes this one is independent of 197 |
|
I think we should not strictly tie this to Bloom Filter.
So I suggest using something like |
| // Reference equality on the bloom filter is intentional: two distinct RuntimeBloomFilter | ||
| // instances are conceptually different runtime filters even if they happen to contain the | ||
| // same keys. There is no cheap structural equality on Guava BloomFilter that we want to lean on. | ||
| return Objects.equals(_lhs, that._lhs) && _bloomFilter == that._bloomFilter; |
There was a problem hiding this comment.
Where are we relying on reference equality check ?
There was a problem hiding this comment.
we weren't actually relying on it anywhere, the defensive equals / hashCode override here was boilerplate without a real consumer.
| // variant can be added once we have measured workloads that need it. Throw clearly so | ||
| // accidental wiring during planner work is caught early. | ||
| throw new UnsupportedOperationException( | ||
| "Dictionary-based BLOOM_MEMBERSHIP predicate is not yet supported"); |
There was a problem hiding this comment.
Is this call being made to intentionally keep the scope limited (while we know that it will certainly be a useful improvement) OR we don't know if dictionary will be useful for the runtime path.
I am ok with the former as we initially have a bigger fish to fry for the overall design of this feature. However if it's the latter then I am curious why we think it won't be useful generally.
We are building the bloom on raw values of the build side of the join and then sending that to leaf stage as a filter for the large fact table scan so that we read fewer fact rows into the JOIN.
Say the fact has 1 billion rows and the join key column (for which bloom has been pushed down) is dictionary encoded
The non dictionary based evaluator will do something like
for each of 1B rows {
dictId = column.readDictId(docID) --> 1B reads of forward index
rawValue = dictionary.lookup(dictId) --> 1B lookup of dictionary
if bloom.mightContain(rawValue). --> 1B reads of bloom
// do something ....
}
Now if you leverage the dictionary instead, let's assume cardinality of 1M for the join key column in fact, we can potentially do something like ..
matchingDictIds = empty bitmap
for each of 1M dict entries. --> 1M reads of dictionary
rawValue = dictionary.getValue(i). --> retrieve raw value from dictionary
if bloom.mightContain(rawValue): --> 1M reads of bloom
matchingDictIds.set(i)
project ....
The difference is obvious ....
FWIW, the basic building blocks to write a dictionary based predicate evaluator are already there for the usual predicates so it should not be that much of a net new work but I don't want to force include it in the scope.
There was a problem hiding this comment.
Yes it was planned in future PRs but since you have mentioned this - I have added the implementation so it can be self-contained. Feel free to review the latest version!
Refactor the v1 predicate primitive so the predicate type, the predicate
class, and the leaf-stage dispatch path are agnostic to the concrete
filter form (Bloom today; min-max range and IN-list later). Adding a
future filter form is one new RuntimeFilter implementation + one new
per-form evaluator factory + one new case arm in the dispatcher -- no
new Predicate.Type, no churn to FilterPlanNode or
PredicateEvaluatorProvider, no new wire format.
What changed:
* Predicate.Type.BLOOM_MEMBERSHIP -> RUNTIME_FILTER (umbrella enum value).
* New RuntimeFilter interface in pinot-common with Kind {BLOOM} and
DataType. Future Kinds (MIN_MAX, IN_LIST) slot in as additional enum
values without churning anything else.
* RuntimeBloomFilter -> BloomRuntimeFilter, now implements RuntimeFilter
and reports Kind.BLOOM.
* BloomMembershipPredicate -> RuntimeFilterPredicate; holds the abstract
RuntimeFilter rather than the concrete BloomRuntimeFilter. Dropped the
defensive equals/hashCode override -- no current caller relies on
structural equality, and Object identity is the right contract for a
predicate that wraps a freshly-built filter.
* BloomMembershipPredicateEvaluatorFactory -> BloomRuntimeFilterEvaluatorFactory
(Bloom-specific implementation, reached only after dispatch).
* New RuntimeFilterPredicateEvaluatorFactory in pinot-core -- the
top-level dispatcher that routes on Kind. Adding a new Kind = one new
arm here plus a sibling per-form factory.
* PredicateEvaluatorProvider switch arms updated to route
Predicate.Type.RUNTIME_FILTER to the new top-level factory; the
dictionary-based branch still throws (unchanged semantics, new message).
Tests:
* RuntimeBloomFilterTest -> BloomRuntimeFilterTest; added a new
getKindReportsBloom() case (13 cases total, +1).
* BloomMembershipPredicateEvaluatorFactoryTest ->
BloomRuntimeFilterEvaluatorFactoryTest; all 13 cases preserved,
including the two provider-dispatch cases (now testing
Predicate.Type.RUNTIME_FILTER routing).
* New RuntimeFilterPredicateEvaluatorFactoryTest covers the Kind-based
dispatcher (2 cases).
* Total: 28 tests, all passing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Fixes spotless check; alphabetizes the predicate-package imports introduced by the RUNTIME_FILTER rename. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The dict-based path probes the column dictionary once at construction time: every dictId whose value passes BloomRuntimeFilter#mightContain lands in an IntSet, and applySV(int) answers from that set thereafter. Standard _alwaysTrue / _alwaysFalse short-circuits are populated based on the count of matching dict ids. False-positive semantics carry over: a dictId may end up in the matching set due to a Bloom false positive, but no true match will be excluded. Cost is O(dictionary.length()) Bloom probes at construction; callers are responsible for choosing dict-based only when it pays off versus the raw scan path. Added five tests covering hit/miss mix, empty dictionary -> always-false, full-coverage dictionary -> always-true, dataType-mismatch guard, and sort-ascending invariant on the returned matching dict ids. Replaced the previous "provider rejects dict-based runtime filter" negative test with a positive routing test through PredicateEvaluatorProvider. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
BloomRuntimeFilter: drop the package-private FunnelType enum and _funnelType field. The field was declared "visible for tests" but never read; with the dataType already stored separately, this state has no remaining purpose. RuntimeFilterPredicateEvaluatorFactoryTest: replace the placeholder unsupportedKindThrows test (a tautology with only one Kind defined) with a direct test that the umbrella dispatcher routes BLOOM through the dictionary-based path. Renamed the existing test to make the raw-value path explicit in the name. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Sounds good, it totally makes sense. I have added some factories and interfaces to make it able to be extended. |
|
We should add a TODO for NULL Support. By "Support" I mean the contract should be documented in code and user docs. For now, it is fine to not worry about NULLs and focus on rest of the feature. |
|
Please update the PR description clearly calling out the types that are supported in V1 of this feature and add a TODO for types we are not planning as of now. Per my comment on #195 regarding planner rule, I think the type enforcement / check should happen in the rule. |
| // additional Kind values without a new Predicate.Type or new dispatch chain. Inclusive | ||
| // semantics: a row matches when the filter says "maybe present". False positives are allowed | ||
| // (downstream join still performs the exact match); false negatives must not occur. | ||
| RUNTIME_FILTER; |
There was a problem hiding this comment.
The enum itself does not go on the wire right ?
There was a problem hiding this comment.
Correct. Predicate.Type does not go on the wire. It's a server-local, query-engine-internal representation.
There was a problem hiding this comment.
RUNTIME_FILTER in particular is never parsed from a query at all. it's injected in-process into the leaf filter tree, so it has no wire representation by construction.
NULLs are out of scope for v1: the build side carries only non-null join keys and the probe side treats NULLs as non-matching, which is correct for inner equi-joins. Add the authoritative contract plus a TODO(NULL support) for the end-to-end definition (code + user docs) on the RuntimeFilter interface, and a pointer from Predicate.Type.RUNTIME_FILTER. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Done in db0960f. |
Updated the PR description with a "Supported data types (v1)" table ( |
Summary
Issue: #200
Leaf-stage primitive for the upcoming MSE hash-join runtime filter (dynamic filter pushdown). This PR adds only the predicate plumbing and an in-process Bloom filter wrapper — no rule registration, no operator wiring, and the predicate has no producer yet. Independent of the broker-config scaffolding PR; either order works.
What's added
pinot-commonPredicate.Type.RUNTIME_FILTER— umbrella predicate type for the runtime-filter feature, inclusive semantics, allows false positives but never false negatives. The concrete filter form is encoded inside the carriedRuntimeFilterso additional forms (min/max, IN-list) slot in without a newPredicate.Type.RuntimeFilterinterface +Kindenum — abstracts the filter form (Kind.BLOOMfor v1; min/max and IN-list are future Kinds). Carries the columnDataTypeso the evaluator can validate the wiring at construction.RuntimeFilterPredicate— carries aRuntimeFilterand anExpressionContext. Uses base-class reference equality (a runtime filter is identified by the build-side instance that produced it, not by the bit pattern).BloomRuntimeFilter— in-memoryRuntimeFilterimplementation backed by Guava'sBloomFilter, mirroring the funnel choices inBloomFilterIdSet(INT/FLOAT share an int funnel via raw-bits encoding, LONG/DOUBLE share a long funnel via raw-bits encoding, plus STRING and BYTES funnels). Lives in pinot-common so predicate classes can reference it. Serialization for cross-worker transport is intentionally deferred.pinot-coreRuntimeFilterPredicateEvaluatorFactory— umbrella dispatcher. Routes both raw-value and dictionary-based evaluation to a form-specific factory based onRuntimeFilter.Kind. Adding a new Kind means adding one case arm here plus the per-form factory — no churn toPredicateEvaluatorProviderorFilterPlanNode.BloomRuntimeFilterEvaluatorFactory— Bloom-specific factory. Raw-value path: per-DataTypeinner evaluators (INT/LONG/FLOAT/DOUBLE/STRING/BYTES), each delegatingapplySVto the matchingBloomRuntimeFilter.mightContainoverload. Dictionary-based path: probes the column dictionary once at construction time, populates anIntSetof matching dict ids, and answersapplySV(int)from that set; sets_alwaysTrue/_alwaysFalsebased on the count of matching dict ids. Fails fast on dataType mismatch on both paths.PredicateEvaluatorProvider— dispatch updated so both branches routeRUNTIME_FILTERto the umbrella factory.Supported data types (v1)
The Bloom filter form supports the following stored data types:
INT,FLOATLONG,DOUBLESTRINGBYTESAny other stored type (
BIG_DECIMAL,BOOLEAN,TIMESTAMP,JSON,MAP/LIST/complex) fails fast atBloomRuntimeFilterconstruction. TODO: evaluate whetherBOOLEAN/TIMESTAMP(stored asINT/LONG) andBIG_DECIMALare worth supporting once the end-to-end feature lands; they are intentionally out of scope for now.Type enforcement lives in the planner rule, not here. This primitive only fails fast as a backstop. The eligibility check that decides whether a join key's data type can carry a runtime filter belongs in the
RuntimeFilterInsertRule(per review on #195) so unsupported types never reach the leaf stage.NULL handling (v1)
NULLs are out of scope for v1 and are not modeled: the build side adds only non-null join keys and the probe side treats NULLs as non-matching (safe for inner equi-joins, where a NULL key never matches). The authoritative contract and a
TODO(NULL support)for the end-to-end definition (code + user docs) live on theRuntimeFilterinterface; the planner rule must not push a runtime filter where NULLs must survive the scan.Out of scope (follow-up changes)
QueryContextinjection at the leaf-stage boundaryHashJoinOperatorbuild-side Bloom constructionRuntimeFilterOperatorTesting Done
BloomRuntimeFilterTest(13 cases): constructor rejects unsupported data types; reportsKind.BLOOM; no false negatives for INT/LONG/FLOAT/DOUBLE/STRING/BYTES; observed false-positive rate within 5× of configured fpp; byte[] content equality hashes identically; raw-bits float encoding pinned down (including a non-canonical NaN).BloomRuntimeFilterEvaluatorFactoryTest(18 cases): factory rejects type mismatch and unsupported types on both raw-value and dictionary paths; evaluator metadata correct (predicate-type, data-type, dictionary-based, exclusive); no false negatives across all six SV types on the raw-value path; INT false-positive rate within tolerance; base-class defaultapplySVoverloads throw for the wrong type so callers cannot accidentally callapplySV(long)on an INT-typed bloom; MV inherits correct semantics via the base loop; dictionary-based evaluator returns the correct matching dict ids (sorted ascending), short-circuits to_alwaysFalseon an empty dictionary, short-circuits to_alwaysTruewhen every dict entry passes the Bloom;PredicateEvaluatorProviderroutes raw-value and dictionary-basedRUNTIME_FILTERto the factory.RuntimeFilterPredicateEvaluatorFactoryTest(2 cases): umbrella dispatcher routesKind.BLOOMto the Bloom factory on both raw-value and dictionary code paths.mvn -pl pinot-common test -Dtest=BloomRuntimeFilterTest→ 13/13mvn -pl pinot-core test -Dtest=BloomRuntimeFilterEvaluatorFactoryTest,RuntimeFilterPredicateEvaluatorFactoryTest→ 20/20