perf(sql): speed up keyed parallel GROUP BY in case of high cardinality count_distinct() #6432
Conversation
Walkthrough
This PR implements a group-by optimization initiative: lowering the parallel sharding threshold from 100,000 to 10,000, adding cardinality tracking to group-by functions, replacing list-based accumulation with set-based merging in count-distinct implementations, removing the workerCount parameter from distinct-count functions, and expanding AsyncGroupByAtom from 128 to 256 shards with enhanced sharding decision logic.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
[PR Coverage check] 😍 pass: 130 / 164 (79.27%)
@CodeRabbit review
✅ Actions performed: Review triggered.
Actionable comments posted: 0
🧹 Nitpick comments (8)
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctUuidGroupByFunction.java (1)
145-173: Review merge strategy and cardinality implications. The merge logic now uses eager set-based merging instead of lazy list accumulation. The size-based strategy (Line 163-171) merges the smaller set into the larger one for efficiency.
Question: Should `cardinality` be updated during merge operations? Currently, `cardinality` only tracks additions during `computeFirst`/`computeNext` but not during `merge`. If cardinality statistics are used for sharding decisions across parallel workers, the merge phase should not affect per-worker statistics. Please verify this is the intended behavior.
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLong256GroupByFunction.java (1)
148-176: Verify cardinality behavior during merge operations. The merge logic follows the same pattern as CountDistinctUuidGroupByFunction with size-based merging. The same question applies: should `cardinality` be updated during merge, or is it intentionally tracking only per-worker additions? Based on the PR context, this appears intentional for sharding decisions, but please confirm.
core/src/main/java/io/questdb/griffin/engine/table/AsyncGroupByAtom.java (3)
72-75: Shard constants and hash-to-shard mapping assume power-of-two `NUM_SHARDS`. Using `NUM_SHARDS = 256` with `NUM_SHARDS_SHR = Long.numberOfLeadingZeros(NUM_SHARDS) + 1` and `hashCode >>> NUM_SHARDS_SHR` gives 8 high bits and maps cleanly into 256 shards. This is correct as long as `NUM_SHARDS` stays a power of two; if someone changes it to a non-power-of-two later, the distribution becomes incorrect. Consider either documenting this assumption explicitly or adding a defensive check (e.g., `assert (NUM_SHARDS & (NUM_SHARDS - 1)) == 0;`) to prevent accidental misconfiguration in the future.
Also applies to: 674-676, 740-745
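For reference, here is a minimal standalone sketch of the mapping this comment describes, using the constants quoted above. The defensive check is the reviewer's suggestion, and the class shape is illustrative rather than QuestDB's actual AsyncGroupByAtom:

```java
// Sketch only: derives the shard index from the top bits of a 64-bit hash.
final class ShardMappingSketch {
    static final int NUM_SHARDS = 256; // must stay a power of two for the shift below to be valid
    static final int NUM_SHARDS_SHR = Long.numberOfLeadingZeros(NUM_SHARDS) + 1; // 56 for 256

    static {
        // Reviewer-suggested guard against accidental non-power-of-two values.
        assert (NUM_SHARDS & (NUM_SHARDS - 1)) == 0 : "NUM_SHARDS must be a power of two";
    }

    static int shardIndex(long hashCode) {
        // Keeps the top 8 bits of the 64-bit hash, i.e. a value in [0, 256).
        return (int) (hashCode >>> NUM_SHARDS_SHR);
    }

    public static void main(String[] args) {
        System.out.println(shardIndex(-1L));      // 255
        System.out.println(shardIndex(1L << 56)); // 1
    }
}
```

With the shift derived this way, the top eight hash bits select one of the 256 shards, which is only a uniform mapping when the shard count is a power of two.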
156-169: Make `updateShardedHint()` robust to partially initialized `destShards`.
`destShards` is sized via `setPos(NUM_SHARDS)` and lazily populated in `reopenDestShard()`. In `updateShardedHint()`, when `sharded == true` you do:

```java
for (int i = 0; i < NUM_SHARDS; i++) {
    totalShardSize += destShards.getQuick(i).size();
}
```

This assumes every shard index has gone through `reopenDestShard()` (i.e., `destShards[i]` is non-null and open). If any shard is never merged (e.g., future changes avoid scheduling merges for empty shards), this will NPE. Even if current scheduling guarantees all shards are merged, this is a subtle invariant. To make this code future-proof and tolerant of partial merges, you could guard against null/closed maps:

```diff
- if (sharded) {
-     for (int i = 0; i < NUM_SHARDS; i++) {
-         totalShardSize += destShards.getQuick(i).size();
-     }
- } else {
+ if (sharded) {
+     for (int i = 0; i < NUM_SHARDS; i++) {
+         final Map shard = destShards.getQuick(i);
+         if (shard != null && shard.isOpen()) {
+             totalShardSize += shard.size();
+         }
+     }
+ } else {
      totalShardSize = ownerFragment.map.size();
  }
```

This keeps the heuristic semantics while avoiding a fragile assumption about merge scheduling.
Also applies to: 241-255, 609-628
509-516: `reopen()` should probably synchronise `sharded` with `shardedHint` to avoid sticky sharded mode.
Currently:

```java
public void reopen() {
    if (shardedHint) {
        // Looks like we had to shard during previous execution, so let's do it ahead of time.
        sharded = true;
    }
    // maps opened lazily...
}
```

If `sharded` was set to `true` by `maybeEnableSharding()` in a previous run and `shardedHint` later becomes `false` (low cardinality run), `reopen()` never forces `sharded` back to `false` unless `clear()` is called in between. That can leave the atom permanently in sharded mode for subsequent executions, which is at least a performance concern. If the intended lifecycle is “reuse atom across queries and let `shardedHint` drive pre‑sharding”, then making `reopen()` do:

```diff
- public void reopen() {
-     if (shardedHint) {
-         sharded = true;
-     }
- }
+ public void reopen() {
+     // Next execution starts according to the last hint.
+     sharded = shardedHint;
+     // Maps will be opened lazily by worker threads.
+ }
```

would keep `sharded` aligned with the latest decision. If, on the other hand, callers always invoke `clear()` between executions, it would be good to document that expectation clearly.
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLongGroupByFunction.java (1)
156-212: Merge logic for long distinct sets looks correct; consider clarifying size‑based heuristic
The updated `merge` covers all combinations:
- Treats `srcCount == 0 || LONG_NULL` as “no contribution”, and `destCount == 0 || LONG_NULL` as “take src as‑is”, which aligns with `setEmpty`/`setNull` invariants.
- Correctly handles inline↔inline, inline↔set, and set↔set cases, always ending with a consistent `(count, ptr)` pair.
- Uses `setA.size() > (setB.size() >>> 1)` to decide whether to merge B into A or A into B, with the comment clarifying the “significantly smaller” case in the `else` branch.
Functionally this looks sound. For readability/maintainability you might want to:
- Explicitly document the invariant that `count == Numbers.LONG_NULL` implies the pointer is 0 (so skipping on `LONG_NULL` can never drop a non‑empty set).
- Optionally rephrase the condition to make the “merge smaller set into larger set” intention clearer (e.g., by computing both sizes once and naming them).
These are clarity tweaks only; no blocking issues from the current implementation.
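To illustrate the merge-direction heuristic in isolation, here is a minimal on-heap sketch with java.util.HashSet standing in for the off-heap long hash sets; the class, method names, and main method are illustrative only, not QuestDB code:

```java
import java.util.HashSet;
import java.util.Set;

final class MergeDirectionExample {
    // Merges the smaller set into the larger one and returns the set that now holds the union.
    static Set<Long> mergeDistinct(Set<Long> a, Set<Long> b) {
        // Same shape as the reviewed condition: if A holds more than half of B's size,
        // treat A as the "large" side and pour B into it; otherwise B absorbs A.
        if (a.size() > (b.size() >>> 1)) {
            a.addAll(b);
            return a;
        }
        b.addAll(a);
        return b;
    }

    public static void main(String[] args) {
        Set<Long> dest = new HashSet<>(Set.of(1L, 2L, 3L));
        Set<Long> src = new HashSet<>(Set.of(3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L));
        // dest is much smaller than src, so dest is merged into src.
        System.out.println(mergeDistinct(dest, src).size()); // 10 distinct values
    }
}
```

Rehashing cost is proportional to the number of inserted entries, so always inserting the smaller set into the larger one bounds the work of each merge.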
core/src/main/java/io/questdb/griffin/engine/functions/groupby/AbstractCountDistinctIntGroupByFunction.java (2)
43-50: Clarify lifecycle and ownership of `cardinality` stat
`cardinality` is only touched via `getCardinalityStat()` and `resetStats()`, but this base class doesn’t update it itself. That’s fine if subclasses are responsible for incrementing it and callers consistently invoke `resetStats()` between runs, but it’s not obvious from here. Consider:
- Adding a short Javadoc or comment documenting that subclasses must maintain `cardinality`, and when `resetStats()` is expected to be called, or
- Optionally tying `resetStats()` into `clear()` if the intended lifecycle is “one query run per clear”, to avoid stale stats if a caller forgets to reset.
This would make the new stat API harder to misuse by future implementations.
Also applies to: 63-66, 170-173
93-101: Merge logic for inline vs set-backed state looks correct; consider small documentation tweak
The updated `merge()` correctly handles the three representations:
- `count == 0`/`LONG_NULL`: treated as empty.
- `count == 1`: second column as inlined value.
- `count > 1`: second column as `GroupByIntHashSet` pointer, with `setA`/`setB` used to access and merge sets.
The early-return on empty `srcCount`, the fast path when `destCount` is empty, and the inline-to-set promotion paths all look consistent with that representation. For the set–set branch:

```java
setA.of(destPtr);
setB.of(srcPtr);
if (setA.size() > (setB.size() >>> 1)) {
    setA.merge(setB);
    ...
} else {
    // Set A is significantly smaller than set B, so we merge it into set B.
    setB.merge(setA);
    ...
}
```

The heuristic (“merge smaller into larger when dest is at most half the size of src”) is reasonable. To aid future readers, you might slightly expand the comment to spell out that the else branch is taken when `setA.size() <= setB.size() / 2` and in that case `setA` is the smaller set being merged into `setB`. Otherwise, the merge-direction logic and updates of `count` and pointer after each branch look sound.
Also applies to: 112-168
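For readers unfamiliar with the inline/set encoding, the following hypothetical on-heap model mirrors the three representations described above (count 0, 1, and greater than 1); the field names, NULL sentinel, and HashSet backing are assumptions for illustration, not the actual off-heap layout:

```java
import java.util.HashSet;
import java.util.Set;

final class DistinctStateSketch {
    static final int INT_NULL = Integer.MIN_VALUE; // assumed null sentinel for this sketch

    long count;       // 0 = empty, 1 = single inlined value, >1 = set-backed
    int inlined;      // meaningful only when count == 1 (the map's "second column")
    Set<Integer> set; // meaningful only when count > 1 (stands in for a GroupByIntHashSet pointer)

    void add(int value) {
        if (value == INT_NULL) {
            return; // nulls never change the state
        }
        if (count == 0) {
            inlined = value;
            count = 1;
        } else if (count == 1) {
            if (value != inlined) {
                // Inline-to-set promotion: the inlined value and the new one seed the set.
                set = new HashSet<>();
                set.add(inlined);
                set.add(value);
                count = 2;
            }
        } else if (set.add(value)) {
            count++;
        }
    }

    public static void main(String[] args) {
        DistinctStateSketch s = new DistinctStateSketch();
        s.add(7);                    // count == 1, value inlined
        s.add(7);                    // duplicate, still inlined
        s.add(9);                    // promotion to a set, count == 2
        System.out.println(s.count); // 2
    }
}
```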
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (29)
- core/src/main/java/io/questdb/PropServerConfiguration.java (1 hunks)
- core/src/main/java/io/questdb/cairo/map/Unordered2Map.java (2 hunks)
- core/src/main/java/io/questdb/cairo/map/Unordered2MapRecord.java (2 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/GroupByFunction.java (2 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/groupby/AbstractCountDistinctIntGroupByFunction.java (4 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctIPv4GroupByFunction.java (4 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctIPv4GroupByFunctionFactory.java (2 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctIntGroupByFunction.java (4 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctIntGroupByFunctionFactory.java (2 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLong256GroupByFunction.java (6 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLong256GroupByFunctionFactory.java (2 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLongGroupByFunction.java (8 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLongGroupByFunctionFactory.java (2 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctSymbolGroupByFunction.java (4 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctSymbolGroupByFunctionFactory.java (2 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctUuidGroupByFunction.java (6 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctUuidGroupByFunctionFactory.java (2 hunks)
- core/src/main/java/io/questdb/griffin/engine/table/AsyncGroupByAtom.java (17 hunks)
- core/src/main/java/io/questdb/griffin/engine/table/AsyncGroupByRecordCursor.java (0 hunks)
- core/src/main/java/io/questdb/griffin/engine/table/AsyncGroupByRecordCursorFactory.java (6 hunks)
- core/src/main/java/io/questdb/std/DirectLongLongAscList.java (1 hunks)
- core/src/main/java/io/questdb/std/DirectLongLongDescList.java (1 hunks)
- core/src/main/java/io/questdb/std/Hash.java (1 hunks)
- core/src/main/resources/io/questdb/site/conf/server.conf (1 hunks)
- core/src/test/java/io/questdb/test/PropServerConfigurationTest.java (1 hunks)
- core/src/test/java/io/questdb/test/ServerMainTest.java (1 hunks)
- core/src/test/java/io/questdb/test/std/DirectLongLongAscListTest.java (1 hunks)
- core/src/test/java/io/questdb/test/std/DirectLongLongDescListTest.java (1 hunks)
- pkg/ami/marketplace/assets/server.conf (1 hunks)
💤 Files with no reviewable changes (1)
- core/src/main/java/io/questdb/griffin/engine/table/AsyncGroupByRecordCursor.java
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-11-19T12:21:00.062Z
Learnt from: jerrinot
Repo: questdb/questdb PR: 6413
File: core/src/test/java/io/questdb/test/cutlass/pgwire/PGJobContextTest.java:11982-12002
Timestamp: 2025-11-19T12:21:00.062Z
Learning: QuestDB Java tests use a deterministic random seed. The test utilities (e.g., io.questdb.test.tools.TestUtils and io.questdb.std.Rnd) produce reproducible sequences, so rnd_* functions (including rnd_uuid4) yield deterministic outputs across runs. Do not flag tests in core/src/test/** that assert against values produced by rnd_* as flaky due to randomness.
Applied to files:
- core/src/test/java/io/questdb/test/std/DirectLongLongAscListTest.java
- core/src/test/java/io/questdb/test/std/DirectLongLongDescListTest.java
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (34)
- GitHub Check: New pull request (Coverage Report Coverage Report)
- GitHub Check: New pull request (SelfHosted Running tests with cover on linux-other)
- GitHub Check: New pull request (SelfHosted Running tests with cover on linux-pgwire)
- GitHub Check: New pull request (SelfHosted Running tests with cover on linux-cairo-sub)
- GitHub Check: New pull request (SelfHosted Running tests with cover on linux-cairo-root)
- GitHub Check: New pull request (SelfHosted Running tests with cover on linux-fuzz2)
- GitHub Check: New pull request (SelfHosted Running tests with cover on linux-fuzz1)
- GitHub Check: New pull request (SelfHosted Running tests with cover on linux-griffin-sub)
- GitHub Check: New pull request (SelfHosted Running tests with cover on linux-griffin-root)
- GitHub Check: New pull request (Rust Test and Lint on linux-jdk17)
- GitHub Check: New pull request (SelfHosted Griffin tests on linux-arm64)
- GitHub Check: New pull request (SelfHosted Other tests on linux-x86-graal)
- GitHub Check: New pull request (SelfHosted Griffin tests on linux-x64-zfs)
- GitHub Check: New pull request (SelfHosted Other tests on linux-x64-zfs)
- GitHub Check: New pull request (SelfHosted Griffin tests on linux-x86-graal)
- GitHub Check: New pull request (Hosted Running tests on windows-other-2)
- GitHub Check: New pull request (Hosted Running tests on windows-other-1)
- GitHub Check: New pull request (Hosted Running tests on windows-pgwire)
- GitHub Check: New pull request (Hosted Running tests on windows-cairo-2)
- GitHub Check: New pull request (Hosted Running tests on windows-cairo-1)
- GitHub Check: New pull request (Hosted Running tests on windows-fuzz2)
- GitHub Check: New pull request (Hosted Running tests on windows-fuzz1)
- GitHub Check: New pull request (Hosted Running tests on windows-griffin-sub)
- GitHub Check: New pull request (Hosted Running tests on windows-griffin-base)
- GitHub Check: New pull request (Hosted Running tests on mac-other)
- GitHub Check: New pull request (Hosted Running tests on mac-pgwire)
- GitHub Check: New pull request (Hosted Running tests on mac-cairo-fuzz)
- GitHub Check: New pull request (Hosted Running tests on mac-cairo)
- GitHub Check: New pull request (SelfHosted Other tests on linux-arm64)
- GitHub Check: New pull request (Hosted Running tests on mac-griffin)
- GitHub Check: New pull request (SelfHosted Cairo tests on linux-arm64)
- GitHub Check: New pull request (SelfHosted Cairo tests on linux-x64-zfs)
- GitHub Check: New pull request (SelfHosted Cairo tests on linux-x86-graal)
- GitHub Check: New pull request (Check Changes Check changes)
🔇 Additional comments (47)
core/src/main/java/io/questdb/std/DirectLongLongDescList.java (1)
49-51: LGTM! Defensive guard prevents out-of-bounds memory operations. This guard prevents insertion when the list is at capacity and the binary search returns a position equal to capacity. Without it, Line 53's `memmove` could operate on invalid memory bounds.
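A hypothetical on-heap analogue of the guarded insert, showing why a binary-search position equal to the capacity must be rejected before any element shifting; the real class operates on native memory with memmove, which this sketch only imitates with System.arraycopy:

```java
import java.util.Arrays;

final class BoundedSortedInsertSketch {
    private final long[] values;
    private int size;

    BoundedSortedInsertSketch(int capacity) {
        this.values = new long[capacity];
    }

    void add(long v) {
        int pos = binarySearchPos(v);
        if (pos == values.length) {
            return; // the guard: value falls beyond the last slot of a full buffer, drop it
        }
        int toMove = Math.min(size, values.length - 1) - pos;
        if (toMove > 0) {
            System.arraycopy(values, pos, values, pos + 1, toMove); // stands in for memmove
        }
        values[pos] = v;
        size = Math.min(size + 1, values.length);
    }

    private int binarySearchPos(long v) {
        int lo = 0, hi = size;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (values[mid] < v) lo = mid + 1; else hi = mid;
        }
        return lo;
    }

    public static void main(String[] args) {
        BoundedSortedInsertSketch list = new BoundedSortedInsertSketch(3);
        list.add(5); list.add(7); list.add(3);
        list.add(9); // position == capacity on a full buffer: guarded, silently dropped
        list.add(4); // fits: elements shift right and the largest value (7) falls off the end
        System.out.println(Arrays.toString(list.values)); // [3, 4, 5]
    }
}
```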
core/src/main/java/io/questdb/std/DirectLongLongAscList.java (1)
49-51: LGTM! Defensive guard prevents out-of-bounds memory operations. This guard mirrors the fix in DirectLongLongDescList.java and prevents out-of-bounds `memmove` operations when the list is at capacity.
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctUuidGroupByFunctionFactory.java (1)
54-58: LGTM! Factory updated to match simplified constructor. The removal of the `workerCount` parameter aligns with the refactored CountDistinctUuidGroupByFunction constructor.
core/src/main/java/io/questdb/griffin/engine/functions/GroupByFunction.java (2)
87-94: LGTM! Well-designed API addition for cardinality tracking. The default method pattern ensures backward compatibility while enabling implementations to expose cardinality statistics. Documentation clearly specifies the contract.
162-168: LGTM! Companion method for resetting statistics. The default no-op implementation maintains backward compatibility while allowing count_distinct implementations to reset their cardinality counters.
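A minimal sketch of the default-method pattern these two comments describe; the method names follow the review text, while the return type and the stripped-down interface are assumptions made here:

```java
// Backward-compatible interface additions, sketched.
interface GroupByFunctionSketch {

    // Implementations that track distinct-value cardinality override this; others inherit the default.
    default long getCardinalityStat() {
        return 0;
    }

    // No-op by default; count_distinct implementations reset their cardinality counter here.
    default void resetStats() {
    }
}

// Example implementor that actually tracks the stat.
final class TrackingFunctionSketch implements GroupByFunctionSketch {
    private long cardinality;

    void onNewDistinctValue() {
        cardinality++;
    }

    @Override
    public long getCardinalityStat() {
        return cardinality;
    }

    @Override
    public void resetStats() {
        cardinality = 0;
    }
}
```

Because both methods carry defaults, existing group-by functions compile unchanged and only the count_distinct family needs to opt in.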
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctUuidGroupByFunction.java (3)
44-51: LGTM! Simplified constructor and cardinality tracking. The removal of `workerCount` simplifies the API. The new `cardinality` field enables lightweight statistics collection for sharding decisions.
60-88: LGTM! Cardinality tracking correctly increments on new distinct values. The cardinality counter increments only when a new UUID is added (Line 67 in `computeFirst`, Line 85 in `computeNext` when `index >= 0`).
175-178: LGTM! Clean implementation of stats reset. The `resetStats()` implementation correctly resets the cardinality counter to zero.
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLong256GroupByFunction.java (3)
45-52: LGTM! Simplified constructor matches pattern across count_distinct implementations. The refactoring is consistent with other count_distinct functions, removing `workerCount` and adding `cardinality` tracking.
60-91: LGTM! Cardinality tracking correctly increments on distinct Long256 values. The implementation correctly increments cardinality only when new distinct values are added.
178-181: LGTM! Stats reset implemented correctly.
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLong256GroupByFunctionFactory.java (1)
54-58: LGTM! Factory updated consistently with constructor changes. The removal of the `workerCount` parameter aligns with the simplified CountDistinctLong256GroupByFunction constructor.
core/src/main/resources/io/questdb/site/conf/server.conf (1)
441-441: Code default value is consistent with server.conf configuration. The verification confirms that the code default in PropServerConfiguration.java (line 1829) sets the threshold to 10,000, which matches the server.conf setting. This consistency across both configuration sources is correct and intentional.
core/src/main/java/io/questdb/griffin/engine/table/AsyncGroupByAtom.java (4)
395-401: Cardinality-based sharding heuristic looks reasonable; confirm `GroupByFunction` stat semantics.
The new heuristic:
- Resets per-fragment stats at the start of each frame via `fragment.resetLocalStats()`.
- Uses `fragment.calculateLocalFunctionCardinality()` in `maybeEnableSharding()` to trigger sharding when `max(cardinalityStat)` for this fragment exceeds `10 * threshold`.
- Accumulates `totalFunctionCardinality` per fragment and then sums owner + workers in `updateShardedHint()` to drive `shardedHint` across executions.
Structurally this is sound and matches the PR description (switching to sharding when high `count_distinct()` cardinality makes merging hash sets expensive). The only thing to validate is that all `GroupByFunction` implementations used here:
- Implement `resetStats()` so that `getCardinalityStat()` for a fragment reflects “this frame’s contribution” (or at least a monotonic and meaningful measure per frame).
- Are safe to call `resetStats()`/`getCardinalityStat()` in the same concurrency pattern as their regular updates (one thread per fragment).
Assuming those contracts hold, the heuristic is good and nicely targeted at the high‑cardinality `count_distinct` cases.
Also applies to: 696-702, 727-735, 620-623
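The decision logic, as described above, boils down to something like the following sketch; the field and method names are assumptions, and the real AsyncGroupByAtom combines this with per-shard and per-worker bookkeeping:

```java
// Illustrative decision logic only, not QuestDB's actual code.
final class ShardingDecisionSketch {
    private final long shardingThreshold; // cairo.sql.parallel.groupby.sharding.threshold
    private volatile boolean sharded;

    ShardingDecisionSketch(long shardingThreshold) {
        this.shardingThreshold = shardingThreshold;
    }

    // Called per fragment after a frame has been aggregated.
    void maybeEnableSharding(long fragmentMapSize, long fragmentFunctionCardinality) {
        if (!sharded
                && (fragmentMapSize > shardingThreshold
                || fragmentFunctionCardinality > 10 * shardingThreshold)) {
            // Switch to radix-partitioned (sharded) maps so the merge phase can run in parallel.
            sharded = true;
        }
    }

    boolean isSharded() {
        return sharded;
    }

    public static void main(String[] args) {
        ShardingDecisionSketch atom = new ShardingDecisionSketch(10_000);
        atom.maybeEnableSharding(2_000, 150_000); // small map, but high count_distinct cardinality
        System.out.println(atom.isSharded());     // true: 150_000 > 10 * 10_000
    }
}
```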
301-305: Owner/per-worker function wiring for stats reset is consistent with existing allocator/updater setup.
`getOwnerGroupByFunctions()` plus `getGroupByFunctions(int slotId)` and `MapFragment.resetLocalStats()` correctly route:
- Slot `-1` (owner) and any configuration without per-worker functions to `ownerGroupByFunctions`.
- Non-owner slots with `perWorkerGroupByFunctions != null` to their respective per-worker lists.
This matches how `GroupByFunctionsUpdater` and allocators are wired and ensures cardinality stats are reset on the same function instances that are being updated. The “thread-unsafe” note on `getOwnerGroupByFunctions()` is appropriate; current usage from `AsyncGroupByRecordCursorFactory.toPlan()` is read-only and single-threaded, so no issues there.
Also applies to: 538-543, 646-657, 696-702
200-214: Clearing owner/per-worker group-by functions vs freeing them is consistent with factory lifecycle.
`clear()` now calls `Misc.clearObjList(ownerGroupByFunctions)` and clears each `perWorkerGroupByFunctions` list without freeing them, while `close()` frees them. Given `AsyncGroupByRecordCursorFactory._close()` frees `recordFunctions` (which includes group-by functions), this split between “reuse within the same factory” and “final teardown” looks correct and should avoid double-frees. No changes recommended here.
646-668: `MapFragment.close()` correctly resets cardinality state.
Adding:

```java
sharded = false;
totalFunctionCardinality = 0;
```

before closing the underlying map and shard maps ensures that cardinality stats and sharding state don’t leak across fragment lifecycles. This matches how `updateShardedHint()` relies on `totalFunctionCardinality` being per-run. Looks good as implemented.
core/src/main/java/io/questdb/griffin/engine/table/AsyncGroupByRecordCursorFactory.java (3)
107-139: Cursor construction and plan values now correctly delegate to `AsyncGroupByAtom`.
- The cursor is now created as `new AsyncGroupByRecordCursor(engine, recordFunctions, messageBus)`, aligning with the updated cursor API that no longer needs a separate `groupByFunctions` list.
- `toPlan()` now uses `frameSequence.getAtom().getOwnerGroupByFunctions()` for `"values"`, which is the authoritative set of group-by functions held by the atom.
This removes duplication of group-by function lists in the factory and keeps the plan representation tied to the actual atom state.
Looks good.
Also applies to: 187-189
216-238: Stats reset and sharding decisions are correctly integrated into aggregate paths.
Both `aggregate()` and `filterAndAggregate()` now:
- Call `fragment.resetLocalStats()` immediately after acquiring the slot and before touching data.
- Call `atom.maybeEnableSharding(fragment)` after processing the frame.
This ensures per-fragment cardinality stats are fresh for each frame and that sharding decisions take into account both row/map volume and function cardinality across owner and worker fragments. The symmetry between filtered and unfiltered paths avoids skew in the heuristic.
No issues spotted here.
Also applies to: 390-413
418-424: Freeing `recordFunctions` in `_close()` remains appropriate with new ownership model.
`_close()` still frees `recordFunctions` (which, per comment, includes group-by functions). With `AsyncGroupByAtom` now only clearing (not freeing) its group-by function lists in `clear()` and freeing them in `close()`, this matches the expected ownership: the factory is responsible for ultimate disposal. Looks consistent.
core/src/test/java/io/questdb/test/ServerMainTest.java (1)
403-408: Updated expected `cairo.sql.parallel.groupby.sharding.threshold` matches new default. Changing the expected value from `100000` to `10000` in the `testShowParameters` output string is consistent with the new default sharding threshold and keeps the integration test in sync with configuration. Looks correct.
core/src/main/java/io/questdb/PropServerConfiguration.java (1)
1826-1833: Default sharding threshold change looks consistent. Lowering the default `CAIRO_SQL_PARALLEL_GROUPBY_SHARDING_THRESHOLD` to `10_000` is in line with the PR’s goal (trigger sharding earlier) and remains just a tuning knob; callers still override via config when needed. No issues from this constructor-side change alone.
core/src/main/java/io/questdb/std/Hash.java (1)
123-125: hashShort64 implementation is correct and consistent. Using `fmix64(Short.toUnsignedLong(k))` matches the pattern of `hashInt64`/`hashLong64` and gives a well‑mixed 64‑bit hash for 16‑bit keys. No changes needed here.
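For context, the widely used MurmurHash3 64-bit finalizer is shown below as a stand-in for `fmix64`; whether QuestDB's `Hash.fmix64` matches this exact variant is an assumption here:

```java
final class HashShortSketch {
    // MurmurHash3 fmix64 finalizer: an avalanche step that spreads input bits over all 64 output bits.
    static long fmix64(long k) {
        k ^= k >>> 33;
        k *= 0xff51afd7ed558ccdL;
        k ^= k >>> 33;
        k *= 0xc4ceb9fe1a85ec53L;
        k ^= k >>> 33;
        return k;
    }

    // Mirrors the reviewed pattern: widen the 16-bit key without sign extension, then mix.
    static long hashShort64(short k) {
        return fmix64(Short.toUnsignedLong(k));
    }

    public static void main(String[] args) {
        // Adjacent 16-bit keys end up far apart in the 64-bit hash space,
        // so the high bits used for shard selection are well distributed.
        System.out.println(Long.toHexString(hashShort64((short) 1)));
        System.out.println(Long.toHexString(hashShort64((short) 2)));
    }
}
```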
pkg/ami/marketplace/assets/server.conf (1)
349-356: Sample config now matches new default threshold. The commented `cairo.sql.parallel.groupby.sharding.threshold=10000` aligns with the new Java default and keeps the AMI config documentation consistent. Looks good.
core/src/test/java/io/questdb/test/PropServerConfigurationTest.java (1)
312-312: LGTM! Test updated to reflect the new default threshold. The assertion correctly validates the lowered default parallel GROUP BY sharding threshold from 100,000 to 10,000, as described in the PR objectives.
core/src/main/java/io/questdb/cairo/map/Unordered2MapRecord.java (2)
33-33: LGTM! Import added to support hashing. The Hash import is required for the keyHashCode() implementation below.
330-335: LGTM! Proper hash implementation for map sharding. The keyHashCode() implementation correctly:
- Hashes the first 16-bit segment of the key using Hash.hashShort64()
- Includes clear documentation explaining the purpose (map sharding for high-cardinality count_distinct())
- Replaces the previous placeholder return of 0
This enables the AsyncGroupByRecordCursorFactory to distribute keys across multiple Unordered2Maps when cardinality is high, as described in the PR objectives.
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctIntGroupByFunctionFactory.java (2)
35-35: Minor formatting change. Added blank line after class declaration for consistency.
54-58: LGTM! Simplified constructor call. The CountDistinctIntGroupByFunction constructor now takes only capacity and loadFactor parameters, removing the workerCount parameter. This aligns with the PR's refactoring to make count_distinct() merge eager rather than lazy.
core/src/test/java/io/questdb/test/std/DirectLongLongDescListTest.java (2)
43-43: Enhanced test coverage with 10x more iterations. Increasing N from 10,000 to 100,000 provides more thorough testing of the DirectLongLongDescList implementation.
45-64: LGTM! Added duplicate value testing. The test now includes:
- Random duplicate probability to test handling of repeated values
- Conditional reassignment of `v` to create duplicates
- Correct assertion pattern using `vLong` polled from the oracle rather than the loop variable
This enhances test coverage for duplicate scenarios while maintaining deterministic behavior (QuestDB tests use deterministic random seeds).
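A hypothetical standalone re-creation of that test pattern, using a java.util.PriorityQueue as the oracle; the class under test is omitted, so this only demonstrates the duplicate injection and the assert-against-the-polled-value pattern:

```java
import java.util.PriorityQueue;
import java.util.Random;

final class SortedListDuplicateTestSketch {
    public static void main(String[] args) {
        final int n = 100_000;
        final Random rnd = new Random(42); // deterministic seed, as QuestDB tests use
        final PriorityQueue<Long> oracle = new PriorityQueue<>(); // ascending oracle
        long prev = 0;
        for (int i = 0; i < n; i++) {
            long v = rnd.nextLong();
            if (i > 0 && rnd.nextInt(10) == 0) {
                v = prev; // ~10% chance to reuse the previous value, creating a duplicate
            }
            oracle.add(v);
            prev = v;
        }
        // The real test would poll the list under test in lockstep; here we only drain the
        // oracle to show the pattern of checking the polled value (vLong), not the loop variable.
        long last = Long.MIN_VALUE;
        while (!oracle.isEmpty()) {
            long vLong = oracle.poll();
            if (vLong < last) {
                throw new AssertionError("oracle must produce ascending values");
            }
            last = vLong;
        }
        System.out.println("drained " + n + " values (with duplicates) in ascending order");
    }
}
```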
core/src/main/java/io/questdb/cairo/map/Unordered2Map.java (2)
38-38: LGTM! Import added to support hashing. The Hash import is required for the hash() method implementation below.
359-364: LGTM! Proper hash implementation for map sharding. The hash() method correctly:
- Computes a hash of the short key using Hash.hashShort64()
- Includes clear documentation explaining the purpose (map sharding for high-cardinality scenarios)
- Replaces the previous no-op return of 0
This enables the AsyncGroupByRecordCursorFactory to distribute keys across multiple Unordered2Maps when count_distinct() has high cardinality, consistent with the PR objectives.
core/src/test/java/io/questdb/test/std/DirectLongLongAscListTest.java (2)
43-43: Enhanced test coverage with 10x more iterations. Increasing N from 10,000 to 100,000 provides more thorough testing of the DirectLongLongAscList implementation.
45-64: LGTM! Added duplicate value testing. The test enhancements match the pattern in DirectLongLongDescListTest:
- Random duplicate probability to test handling of repeated values
- Conditional reassignment of `v` to create duplicates
- Correct assertion pattern using `vLong` polled from the oracle
This ensures consistent test coverage for both ascending and descending list implementations while maintaining deterministic behavior.
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLongGroupByFunctionFactory.java (2)
35-35: Minor formatting change. Added blank line after class declaration for consistency.
54-58: LGTM! Simplified constructor call. The CountDistinctLongGroupByFunction constructor now takes only capacity and loadFactor parameters, removing the workerCount parameter. This is consistent with the refactoring across all count_distinct() implementations to enable eager merging.
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctSymbolGroupByFunctionFactory.java (2)
35-35: Minor formatting change. Added blank line after class declaration for consistency.
54-58: LGTM! Simplified constructor call. The CountDistinctSymbolGroupByFunction constructor now takes only capacity and loadFactor parameters, removing the workerCount parameter. This completes the consistent refactoring pattern across all count_distinct() function factories.
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctIntGroupByFunction.java (2)
35-40: Constructor change matches new int count_distinct backing sets. Constructor now wires two `GroupByIntHashSet` instances with `Numbers.INT_NULL` sentinel into the abstract base, consistent with the updated parallel/merge strategy and avoiding `GroupByLongList`. No issues spotted.
43-85: Cardinality updates are correctly tied to new distinct int values
`cardinality++` only happens when:
- we see the first non-null value for a group,
- we upgrade from inlined value to a set with a genuinely different second value,
- or we insert a new key into an existing set (`keyIndex >= 0`).
Duplicates and NULLs do not affect the stat, which is what you want for a “sum of per-group distincts” metric.
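A simplified on-heap illustration of those semantics (cardinality as the sum of per-group distinct insertions, untouched by duplicates and NULLs); the NULL sentinel and the HashMap/HashSet backing are assumptions made for this sketch:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class CardinalityStatSketch {
    static final int INT_NULL = Integer.MIN_VALUE; // assumed null sentinel

    private final Map<Integer, Set<Integer>> perGroup = new HashMap<>();
    private long cardinality;

    void add(int groupKey, int value) {
        if (value == INT_NULL) {
            return; // nulls never contribute
        }
        if (perGroup.computeIfAbsent(groupKey, k -> new HashSet<>()).add(value)) {
            cardinality++; // only genuinely new distinct values per group count
        }
    }

    long getCardinalityStat() {
        return cardinality;
    }

    public static void main(String[] args) {
        CardinalityStatSketch s = new CardinalityStatSketch();
        s.add(1, 10); s.add(1, 10); s.add(1, 20); s.add(2, 10); s.add(2, INT_NULL);
        System.out.println(s.getCardinalityStat()); // 3
    }
}
```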
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctIPv4GroupByFunctionFactory.java (1)
36-58: Factory wiring now matches 3‑arg IPv4 count_distinct constructor
`newInstance` correctly drops the workerCount parameter and passes `(arg, countDistinctCapacity, countDistinctLoadFactor)` to `CountDistinctIPv4GroupByFunction`. Looks consistent with the other factories.
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctIPv4GroupByFunction.java (2)
35-41: IPv4 constructor update is consistent with Int/Symbol variants. Using two `GroupByIntHashSet` instances with `Numbers.IPv4_NULL` as the empty sentinel matches the new shared pattern and the in-code comment about zero-based nulls; nothing problematic here.
44-86: IPv4 cardinality tracking matches int implementation. The `cardinality++` sites line up with the same three “first-time distinct” cases as in the int implementation (first non-null, inline→set transition with a new value, and set insertion where `keyIndex >= 0`). This keeps distinct-count stats coherent across types.
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctSymbolGroupByFunction.java (2)
42-47: Symbol constructor aligned with shared Int-based abstraction. Dropping workerCount and wiring two `GroupByIntHashSet` instances keyed on `VALUE_IS_NULL` keeps the symbol variant in lockstep with the other int-backed count_distinct functions. No issues.
56-98: Symbol cardinality stat integrates cleanly with existing early-exit logic
`cardinality++` is only updated when:
- a non-null symbol is seen for an empty group,
- we upgrade from a single inlined symbol to a set with a different second symbol,
- we insert a new key into an existing set.
This leaves `earlyExit()` semantics untouched (they still use the per-group count vs `knownSymbolCount`) while providing a consistent “sum of per-group distincts” stat for heuristics.
core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLongGroupByFunction.java (1)
39-50: Cardinality stat wiring for long count_distinct looks correct; confirm intended semantics
The new `cardinality` field is:
- Initialised implicitly to 0 and reset via `resetStats()`.
- Incremented only when a genuinely new, non‑NULL long value is added to a group:
  - first non‑NULL value (computeFirst / `cnt == 0`),
  - inline→set upgrade when the second value differs,
  - insertion into an existing set when `keyIndex >= 0`.
- Exposed via `getCardinalityStat()`.
getCardinalityStat().This matches the int/IPv4/symbol patterns and effectively tracks “sum of per‑group distinct insertions for this function instance”, not the size of any particular backing set or a global de‑duplicated cardinality.
`merge()` does not adjust `cardinality`, so the stat is independent of inter‑map merges. If AsyncGroupBy’s heuristics are based on “total number of distinct entries stored across this worker’s hash sets” (i.e., a cost proxy), this implementation is appropriate. If they instead expect a post‑merge/global cardinality, the behaviour should be documented or adjusted accordingly.
Also applies to: 59-100, 107-110, 214-217
@puzpuzpuz is there a diff between graal vs corretto?
I only measured on OpenJDK. Will measure on Corretto and GraalVM CE and post the findings here.
@bluestreak01 the findings are somewhat interesting. GraalVM CE 17 is noticeably slower than both OpenJDK 17 and Corretto 17 in Cold Run:
In Hot Run, the total score is the same, but GraalVM is 35% slower (0.5s) in Q28 (the regexp one) and 30% faster (120ms) in Q35. This results in the following combined score:


Include the following changes around parallel keyed GROUP BY:
- Merging in `count_distinct()` functions is now eager, not lazy. That's for the optimization from the next item.
- `count_distinct()` functions now track cardinality: they're able to report how many distinct values in total they observed. The cardinality is considered by the parallel keyed GROUP BY factory (AsyncGroupByRecordCursorFactory) when it checks whether it should switch to map sharding (a.k.a. radix partitioning), along with per-worker map sizes. This way, when there are high-cardinality `count_distinct()` hash sets, we run the map merge phase in parallel.
- The default value of the `cairo.sql.parallel.groupby.sharding.threshold` configuration property is lowered to 10k instead of 100k. This means that we switch to map sharding (think, parallel merge) in more cases.
Benchmarks
ClickBench results on a Ryzen 7900x 64GB RAM box running Ubuntu 22.04 are below. The comparison is done against v9.1, for a fair comparison with what we had before #6268