topsql/reporter: add TopRU RU window aggregation and reporting pipeline #67089

ti-chi-bot[bot] merged 10 commits into master from
Conversation
Review complete. Findings: 0 issues.

Hi @zimulala. Thanks for your PR. PRs from untrusted users cannot be marked as trusted. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings, and use the following commands to manage reviews.
📝 Walkthrough

Adds TopRU collection and reporting: in-memory RU data model, a 15s-bucket sliding RU window aggregator with 60s reporting windows, reporter wiring (non-blocking RU queue, worker, API), RU included in ReportData, many unit tests/benchmarks, and BUILD/test dependency updates.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Reporter as RemoteTopSQLReporter
    participant Worker as collectRUWorker
    participant Aggregator as ruWindowAggregator
    participant Sink as DataSink

    Client->>Reporter: CollectRUIncrements(data)
    Reporter->>Reporter: enqueue to collectRUIncrementsChan (drop metric if full)
    activate Worker
    Worker->>Reporter: dequeue increments
    Worker->>Aggregator: addBatchToBucket(ruIncrements)
    deactivate Worker
    Reporter->>Aggregator: takeReportRecords(nowTs, itemInterval)
    Aggregator->>Aggregator: align windows, merge 15s buckets, apply top-N caps
    Aggregator-->>Reporter: []TopRURecord
    Reporter->>Reporter: attach RURecords to collectedData
    Reporter->>Sink: send ReportData (SQLMeta + RURecords)
    Sink-->>Sink: consume report
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
/ok-to-test

/retest
🧹 Nitpick comments (2)

pkg/util/topsql/topsql.go (1)

65-67: Capture the registered RU collector instance to keep lifecycle symmetric.

Line 65 registers the collector from the current `globalTopProfilingReport`, while line 86 unregisters from whatever instance `globalTopProfilingReport` points to at close time. If tests replace the global via `SetupTopProfilingForTest` between setup and close, the originally registered collector can remain registered.

♻️ Proposed refactor

```diff
 var (
 	globalTopProfilingReport reporter.TopSQLReporter
 	singleTargetDataSink     *reporter.SingleTargetDataSink
+	registeredRUCollector    stmtstats.RUCollector
 )
@@
 	stmtstats.RegisterCollector(globalTopProfilingReport)
 	if ruCollector, ok := globalTopProfilingReport.(stmtstats.RUCollector); ok {
 		stmtstats.RegisterRUCollector(ruCollector)
+		registeredRUCollector = ruCollector
 	}
 	stmtstats.SetupAggregator()
 }
@@
-	if ruCollector, ok := globalTopProfilingReport.(stmtstats.RUCollector); ok {
-		stmtstats.UnregisterRUCollector(ruCollector)
+	if registeredRUCollector != nil {
+		stmtstats.UnregisterRUCollector(registeredRUCollector)
+		registeredRUCollector = nil
 	}
```

Also applies to: 86-88
pkg/util/topsql/reporter/ru_datamodel.go (1)

123-142: Consider the O(n) timestamp lookup in high-throughput scenarios.

The `add` method uses a linear scan to find matching timestamps. For the current 15s-bucket design (max ~4 timestamps per 60s window), this is fine. If the bucket granularity ever changes to support many more timestamps per record, consider using a map-based lookup.
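The map-based index suggested above could be sketched as follows. The type names (`ruRecord`, `ruItem`, `itemsIndex`) mirror the review text but are assumptions for illustration, not TiDB's actual definitions:

```go
package main

import "fmt"

// ruItem accumulates RU for one aligned timestamp (assumed shape).
type ruItem struct {
	timestampSec uint64
	ru           float64
}

// ruRecord keeps a slice for ordered output plus a map index for O(1) lookup.
type ruRecord struct {
	items      []ruItem
	itemsIndex map[uint64]int // timestamp -> index into items
	totalRU    float64
}

// add accumulates RU for a timestamp via the map index instead of
// scanning items linearly; totalRU bookkeeping stays the same.
func (r *ruRecord) add(tsSec uint64, ru float64) {
	if r.itemsIndex == nil {
		r.itemsIndex = make(map[uint64]int)
	}
	if i, ok := r.itemsIndex[tsSec]; ok {
		r.items[i].ru += ru
	} else {
		r.itemsIndex[tsSec] = len(r.items)
		r.items = append(r.items, ruItem{timestampSec: tsSec, ru: ru})
	}
	r.totalRU += ru
}

func main() {
	var r ruRecord
	r.add(15, 1.5)
	r.add(15, 2.5) // merges into the existing 15s item
	r.add(30, 1.0)
	fmt.Println(len(r.items), r.totalRU) // 2 5
}
```

As the review notes, any method that removes, resets, or reorders `items` would have to keep `itemsIndex` consistent, which is why the linear scan is a reasonable default at the current bucket granularity.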
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 56de3fb6-e361-4709-8ae5-d05ac16af549
📒 Files selected for processing (11)
- pkg/util/topsql/reporter/BUILD.bazel
- pkg/util/topsql/reporter/reporter.go
- pkg/util/topsql/reporter/reporter_test.go
- pkg/util/topsql/reporter/ru_datamodel.go
- pkg/util/topsql/reporter/ru_datamodel_test.go
- pkg/util/topsql/reporter/ru_window_aggregator.go
- pkg/util/topsql/reporter/ru_window_aggregator_test.go
- pkg/util/topsql/reporter/topru_case_runner_test.go
- pkg/util/topsql/reporter/topru_generated_cases_test.go
- pkg/util/topsql/stmtstats/aggregator_bench_test.go
- pkg/util/topsql/topsql.go
XuHuaiyu
left a comment
PR review: TopRU aggregation and reporting pipeline. Two findings below.
Actionable comments posted: 1
♻️ Duplicate comments (1)
pkg/util/topsql/reporter/ru_window_aggregator.go (1)
178-199: ⚠️ Potential issue | 🟠 Major

Apply the final 100x100 cap after merging sub-intervals.

`intervalCompacted` limits each 15s/30s slice, but `mergedOutput.mergeFrom(...)` can still accumulate up to 4× the configured user/SQL count across the 60s window. Returning `mergedOutput.toTopRURecords(...)` directly therefore breaks the file's own 100x100 contract for `itemInterval=15` and `30`.

♻️ Proposed fix

```diff
-	// Convert to proto at output.
-	return mergedOutput.toTopRURecords(keyspaceName)
+	finalOutput := mergedOutput.compactWithLimits(ruReportTopNUsers, ruReportTopNSQLsPerUser)
+	if finalOutput == nil {
+		return nil
+	}
+	return finalOutput.toTopRURecords(keyspaceName)
```

Please also add a regression for the 15s/30s over-cap cases, not just the 60s path.
Inline comment (pkg/util/topsql/reporter/reporter.go, lines 172-185):

`CollectRUIncrements` currently enqueues un-timestamped RU batches onto `collectRUIncrementsChan`, and batches are only timestamped later in `collectRUWorker`, which can reorder attribution around the reporting tick. To fix, include the producer timestamp in the channel payload (e.g. wrap `stmtstats.RUIncrementMap` in a struct with a `time.Time` field), have `CollectRUIncrements` stamp the batch before sending, and update `collectRUWorker` to drain all pending entries and process them in timestamp order at each tick so RU is attributed to the correct window. Apply the same timestamped-envelope and drain-before-report pattern to the other enqueue points around `collectRUIncrementsChan`.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 645ec5ff-bab3-4bda-a805-13ac7ef3fe47
📒 Files selected for processing (3)
- pkg/util/topsql/reporter/reporter.go
- pkg/util/topsql/reporter/ru_datamodel.go
- pkg/util/topsql/reporter/ru_window_aggregator.go
Force-pushed from a6d1e3c to e760e1e (compare)
🧹 Nitpick comments (1)
pkg/util/topsql/reporter/ru_datamodel.go (1)
123-142: Consider: linear scan in `ruRecord.add` may become a bottleneck.

The `add` method performs a linear scan over `items` to find an existing timestamp. For typical 15s buckets with 1-4 timestamps this is acceptable, but if items grow larger (e.g., during merges), this could become O(n²). A map-based lookup could improve performance if profiling shows this as a hotspot.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 0ff2db95-dc25-4843-87f7-d766f4c1c89e
📒 Files selected for processing (5)
- pkg/util/topsql/reporter/BUILD.bazel
- pkg/util/topsql/reporter/reporter.go
- pkg/util/topsql/reporter/ru_datamodel.go
- pkg/util/topsql/reporter/ru_window_aggregator.go
- pkg/util/topsql/reporter/ru_window_aggregator_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/util/topsql/reporter/ru_window_aggregator_test.go
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             master    #67089      +/-   ##
================================================
- Coverage   77.7079%  77.3741%   -0.3339%
================================================
  Files          2013      1940        -73
  Lines        551161    549623      -1538
================================================
- Hits         428296    425266      -3030
- Misses       121134    124304      +3170
+ Partials       1731        53      -1678
```
Flags with carried forward coverage won't be shown. Click here to find out more.
If https://github.com/pingcap/tidb/pull/67089/changes#r2945836514 is resolved, I'll approve.
Re: ru_datamodel.go `ruRecord.add` linear scan (CodeRabbit nitpick): for the current 15s-bucket design we have at most a few timestamps per record per 60s window, so the linear scan is intentional and acceptable. If we change granularity or profiling shows this as a hotspot, we can add a map-based index (e.g. `itemsMap map[uint64]*ruItem`) in a follow-up.
Actionable comments posted: 2
🧹 Nitpick comments (1)
pkg/util/topsql/reporter/reporter_test.go (1)
1211-1236: Benchmarks should avoid unbounded sink accumulation.

`initializeCache` registers a sink that retains every `ReportData`; in benchmark loops this can dominate memory/time and blur reporter-path measurements. Prefer a no-op or bounded/drained sink for benchmark scenarios.
Inline comments:

In pkg/util/topsql/reporter/reporter_test.go (lines 247-285): record the original TopSQL enabled state at the start (e.g. `origTopSQLEnabled := topsqlstate.TopSQLEnabled()`) before calling `topsqlstate.DisableTopSQL()`, and in the existing `t.Cleanup` restore it via `EnableTopSQL()`/`DisableTopSQL()` based on that value, so `TestEffectiveReportIntervalSecondsTopSQLIndependentFromTopRU` returns the global TopSQL flag to its prior state and avoids cross-test leakage.

In pkg/util/topsql/reporter/ru_window_aggregator_test.go (lines 69-80): the helper `fillAggregatorSteadyState60sAt10kKeys` currently sets `numUsers=200` and `numSQLsPerUser=200`, producing 40,000 keys; change the constants so `numUsers * numSQLsPerUser == 10_000` (e.g., 100 × 100) and rebuild the batch via `makeRUBatch(numUsers, numSQLsPerUser)` so the function matches its name and the benchmark targets 10k keys.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 551cab77-6549-4904-be6b-280a0a58ebc3
📒 Files selected for processing (5)
pkg/util/topsql/reporter/reporter_test.gopkg/util/topsql/reporter/ru_datamodel_test.gopkg/util/topsql/reporter/ru_window_aggregator.gopkg/util/topsql/reporter/ru_window_aggregator_test.gopkg/util/topsql/reporter/topru_generated_cases_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/util/topsql/reporter/ru_window_aggregator.go
```go
if len(batch.data) == 0 {
	continue
}
tsr.ruAggregator.addBatchToBucket(batch.timestamp, batch.data)
```
High priority
Moving the timestamp to enqueue time is necessary, but this still leaves a cross-tick race because report flushing and RU draining are on separate goroutines.

Impact

- Suppose a batch is enqueued at t = 59, but `collectRUWorker` does not consume it until after `takeReportRecords(60)` has advanced `lastReportedEndTs` to 60.
- This line will then call `addBatchToBucket(59, ...)`, which aligns to bucket 45.
- `ruWindowAggregator.addBatchToBucket(...)` will treat that as late data (45 < 60) and drop it entirely.

So a batch produced before the report tick can still disappear from the closed window; this is not only a best-effort shift to the next window.
Test gap
TestTopRUBestEffortBoundaryShift covers a batch collected at t = 61 (already after the tick), but it does not cover the remaining problematic case: collected before the tick, drained after the tick.
Suggested direction
Before closing/reporting a window, drain pending RU batches into the aggregator, or serialize RU ingestion and report flushing on the same goroutine/event loop.
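The drain-before-report direction can be sketched as follows; the names and the simplified 15s bucket alignment are illustrative, not the actual reporter code:

```go
package main

import "fmt"

type batch struct {
	tsSec uint64
	ru    float64
}

type aggregator struct{ ruByBucket map[uint64]float64 }

// addBatch aligns the batch timestamp down to its 15s bucket.
func (a *aggregator) addBatch(b batch) {
	if a.ruByBucket == nil {
		a.ruByBucket = make(map[uint64]float64)
	}
	a.ruByBucket[b.tsSec-b.tsSec%15] += b.ru
}

// drainThenReport first drains every already-enqueued batch into the
// aggregator, then closes the window; pre-tick batches therefore cannot
// be dropped as late data by the report/drain race.
func drainThenReport(pending chan batch, agg *aggregator, reportEndTs uint64) float64 {
	for {
		select {
		case b := <-pending:
			agg.addBatch(b)
		default:
			total := 0.0
			for ts, ru := range agg.ruByBucket {
				if ts < reportEndTs {
					total += ru
					delete(agg.ruByBucket, ts)
				}
			}
			return total
		}
	}
}

func main() {
	pending := make(chan batch, 8)
	pending <- batch{tsSec: 59, ru: 2.0} // enqueued just before the t=60 tick
	agg := &aggregator{}
	fmt.Println(drainThenReport(pending, agg, 60)) // 2
}
```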
Force-pushed from 89b8852 to 0374ede (compare)
XuHuaiyu
left a comment
Re-reviewed the latest update. Serializing RU batch ingestion onto collectWorker and shifting late batches to the earliest still-open window addresses my previous concern about pre-tick batches being dropped by the report/drain race. The updated aggregator/reporter tests also look good from my side.
[APPROVALNOTIFIER] This PR is APPROVED.

This pull-request has been approved by: XuHuaiyu, yibin87. The full list of commands accepted by this bot can be found here. The pull request process is described here.

Details: needs approval from an approver in each of these files. Approvers can indicate their approval by writing `/approve` in a comment.
/retest

/retest

/retest

/retest

/retest

/retest
What problem does this PR solve?
Issue Number: close #67065
Problem Summary:
Add reporter-side TopRU aggregation/output path, while keeping PR2/PR3 responsibilities split and reviewable.
What changed and how does it work?
- New: ru_datamodel.go, ru_window_aggregator.go
- Reporter attaches RURecords on report tick: reporter.go, topsql.go
- Tests: ru_datamodel_test.go, ru_window_aggregator_test.go, topru_case_runner_test.go, topru_generated_cases_test.go, reporter_test.go
- Removed topru_structured_test.go from reporter test srcs (file no longer exists in current source branch).

Dependency note:
This PR depends on PR2 (RU delta collection in stmtstats/executor). RU collection is completed in PR2; this PR only handles reporter aggregation/output.
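The 15s-bucket / 60s-window alignment used by the aggregator boils down to simple modular arithmetic. The helper names below are assumptions; the constants match the bucket and window sizes described above:

```go
package main

import "fmt"

const (
	bucketIntervalSec = 15 // aggregation bucket size
	windowIntervalSec = 60 // reporting window size
)

// alignBucket rounds a timestamp down to its 15s bucket start.
func alignBucket(tsSec uint64) uint64 { return tsSec - tsSec%bucketIntervalSec }

// alignWindow rounds a timestamp down to its 60s reporting window start.
func alignWindow(tsSec uint64) uint64 { return tsSec - tsSec%windowIntervalSec }

func main() {
	// A sample at t=67s lands in the 60s..75s bucket of the 60s..120s window.
	fmt.Println(alignBucket(67), alignWindow(67)) // 60 60
	// A sample at t=59s lands in the 45s..60s bucket of the 0s..60s window.
	fmt.Println(alignBucket(59), alignWindow(59)) // 45 0
}
```

This is why a batch stamped at t=59 but drained after the t=60 tick aligns to bucket 45 and, without the drain-before-report handling, would be treated as late data for the already-closed window.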
Check List
Tests
Side effects
Documentation
Release note
Summary by CodeRabbit
New Features
Chores
Tests