resourcegroup, session, metrics: cherry-pick runaway watch improvements for v8.5.5#67352
📝 Walkthrough
Adds runaway flusher/syncer instrumentation and Grafana panels, implements a generic in-memory batch flusher with tests, refactors Manager to use atomic per-group counters and batch flushers, changes syncer to ID-based checkpoints, and bumps bootstrap version to 224 with index upgrades and related tests.
Sequence Diagram(s)
sequenceDiagram
participant RM as RunawayManager
participant BF as batchFlusher
participant T as Ticker
participant SQL as ExecRCRestrictedSQL
participant MET as Prometheus
rect rgba(200,200,255,0.5)
Note over RM,BF: Periodic batched flushing for runaway/quarantine/stale records
end
loop periodic
T->>BF: tick
BF->>BF: check buffer / threshold
alt flush needed
BF->>BF: dedupe/merge records
BF->>SQL: gen & ExecRCRestrictedSQL(batch SQL, params)
SQL-->>BF: success / error
BF->>MET: observe duration, batch size, counters
BF->>RM: reset buffer, update lastFlushTime
else no-op
BF-->>RM: continue
end
end
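The batching loop in the diagram can be sketched in Go roughly as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: the `batchFlusher` fields, the `add`/`tick` methods, and the callback signatures are hypothetical, and error handling and metrics are left out.

```go
package main

import (
	"fmt"
	"time"
)

// batchFlusher is a hypothetical, simplified stand-in for the flusher in the
// diagram. Field and method names are assumptions made for this sketch.
type batchFlusher[K comparable, V any] struct {
	buffer        map[K]V
	threshold     int
	interval      time.Duration
	lastFlushTime time.Time
	mergeFn       func(m map[K]V, k K, v V) // dedupe/merge step
	flushFn       func(m map[K]V)           // would build and run the batch SQL
}

func newBatchFlusher[K comparable, V any](
	threshold int, interval time.Duration,
	mergeFn func(map[K]V, K, V), flushFn func(map[K]V),
) *batchFlusher[K, V] {
	return &batchFlusher[K, V]{
		buffer:        make(map[K]V, threshold),
		threshold:     threshold,
		interval:      interval,
		lastFlushTime: time.Now(),
		mergeFn:       mergeFn,
		flushFn:       flushFn,
	}
}

// add merges one record into the buffer and flushes once the number of
// distinct keys reaches the threshold.
func (f *batchFlusher[K, V]) add(k K, v V) {
	f.mergeFn(f.buffer, k, v)
	if len(f.buffer) >= f.threshold {
		f.flush(time.Now())
	}
}

// tick is what a ticker-driven loop would call; it flushes when the
// interval has elapsed since the last flush.
func (f *batchFlusher[K, V]) tick(now time.Time) {
	if now.Sub(f.lastFlushTime) >= f.interval {
		f.flush(now)
	}
}

func (f *batchFlusher[K, V]) flush(now time.Time) {
	if len(f.buffer) == 0 {
		return
	}
	f.flushFn(f.buffer)
	f.lastFlushTime = now
	f.buffer = make(map[K]V, f.threshold)
}

// demoFlusher feeds four records through a flusher with threshold 2.
func demoFlusher() (flushedRows, stillBuffered int) {
	f := newBatchFlusher[string, int](2, time.Hour,
		func(m map[string]int, k string, v int) { m[k] += v },
		func(m map[string]int) { flushedRows += len(m) })
	f.add("a", 1)
	f.add("a", 1) // merged into the existing key, still below the threshold
	f.add("b", 1) // second distinct key reaches the threshold and flushes
	f.add("c", 1) // stays buffered until the next threshold hit or tick
	return flushedRows, len(f.buffer)
}

func main() {
	rows, buffered := demoFlusher()
	fmt.Println("flushed rows:", rows, "still buffered:", buffered)
}
```

Keeping the merge step as a callback lets each record type pick its own dedupe rule while the threshold and interval logic stays shared.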
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 passed | ❌ 1 failed (1 warning)
Warning: There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.
🔧 golangci-lint (2.11.4)
Error: can't load config: unsupported version of the configuration: "" See https://golangci-lint.run/docs/product/migration-guide for migration instructions
Actionable comments posted: 5
🧹 Nitpick comments (2)
pkg/session/bootstrap_test.go (1)
2612-2619: Close the recordset in getTableCreateSQLFn to avoid test-side resource leakage.
Each show create table call should close res after reading.
♻️ Proposed fix
getTableCreateSQLFn := func(se sessiontypes.Session, tableName string) string {
	res := MustExecToRecodeSet(t, se, fmt.Sprintf("show create table mysql.%s", tableName))
+	defer func() { require.NoError(t, res.Close()) }()
	chk := res.NewChunk(nil)
	err = res.Next(ctx, chk)
	require.NoError(t, err)
	require.Equal(t, 1, chk.NumRows())
	return string(chk.GetRow(0).GetBytes(1))
}
pkg/resourcegroup/runaway/record.go (1)
140-158: Factor the watch-row argument builder once.
genInsertionStmt and genBatchInsertWatchStmt now duplicate the watch-table field order, NullTime handling, and switch-group mapping. A shared helper would keep the single-row and batch paths in lockstep and make future backports less error-prone.
As per coding guidelines "Code SHOULD remain maintainable for future readers with basic TiDB familiarity, including readers who are not experts in the specific subsystem/feature."
Also applies to: 196-223
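One way to picture the shared helper is the sketch below. It is only an illustration: `watchRecord`, `rowParams`, the `watch_table` name, and the `?` placeholders are hypothetical stand-ins, not the real QuarantineRecord fields or the SQL the PR generates.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// watchRecord is a hypothetical subset of a watch row, used only for this sketch.
type watchRecord struct {
	ResourceGroup string
	StartTime     time.Time
	EndTime       time.Time // zero value means "no end time" and maps to NULL
	WatchText     string
}

// rowParams is the single place that fixes the column order and the
// zero-time -> NULL conversion for one row.
func rowParams(r *watchRecord) []any {
	var end any // stays nil (SQL NULL) when EndTime is unset
	if !r.EndTime.IsZero() {
		end = r.EndTime
	}
	return []any{r.ResourceGroup, r.StartTime, end, r.WatchText}
}

// genBatchInsert builds one multi-row statement and reuses rowParams per row.
func genBatchInsert(rs []*watchRecord) (string, []any) {
	var sb strings.Builder
	sb.WriteString("INSERT INTO watch_table VALUES ")
	params := make([]any, 0, len(rs)*4)
	for i, r := range rs {
		if i > 0 {
			sb.WriteString(", ")
		}
		sb.WriteString("(?, ?, ?, ?)")
		params = append(params, rowParams(r)...)
	}
	return sb.String(), params
}

// genInsert is the single-row path; it cannot drift from the batch path
// because it delegates to it.
func genInsert(r *watchRecord) (string, []any) {
	return genBatchInsert([]*watchRecord{r})
}

// demoBatch builds a two-row statement: one row without and one with an end time.
func demoBatch() (string, []any) {
	now := time.Now()
	return genBatchInsert([]*watchRecord{
		{ResourceGroup: "rg1", StartTime: now, WatchText: "select 1"},
		{ResourceGroup: "rg1", StartTime: now, EndTime: now.Add(time.Minute), WatchText: "select 2"},
	})
}

func main() {
	sql, params := demoBatch()
	fmt.Println(sql)
	fmt.Println(len(params), "params; first row end time is nil:", params[2] == nil)
}
```

With the ordering and NULL handling in one function, a new column only has to be added in one place for both statement builders.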
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/resourcegroup/runaway/flusher.go`:
- Around line 127-139: The current flush logic always clears f.buffer and
advances f.lastFlushTime regardless of flush outcome; change it so that after
calling f.flushFn(f.buffer) and recording metrics (f.batchSizeObserver,
f.durationObserver), you only reset f.buffer (make(map[K]V, f.threshold)) and
set f.lastFlushTime = now when err == nil (successful flush). If err != nil,
increment f.flushErrorCounter and leave f.buffer and f.lastFlushTime untouched
so buffered records are preserved for retry; keep the existing
f.flushSuccessCounter increment only on success.
In `@pkg/resourcegroup/runaway/manager.go`:
- Around line 170-177: The merge function passed to newBatchFlusher for
quarantineRecordFlusher currently keeps the first buffered QuarantineRecord per
key, which can preserve stale data; change it to keep the newest buffered record
instead—either always overwrite m[k] with v in that closure or compare an
age/ID/timestamp field on QuarantineRecord and replace m[k] when v is newer
(this logic should align with addWatchList’s rule that the latest local watch
(ID==0) is authoritative). Update the merge closure used when constructing
quarantineRecordFlusher (and any similar flusher closures) so the newer record
wins.
- Around line 194-205: The created ticker runawayRecordGCTicker is never stopped
causing a goroutine leak; after the line that constructs runawayRecordGCTicker
:= time.NewTicker(gcInterval) add a defer to stop it (e.g., defer
runawayRecordGCTicker.Stop()) so the ticker is cleaned up when the function
returns; locate the ticker creation near the existing calls to
rm.runawayRecordChan(), rm.quarantineRecordChan(), and
rm.staleQuarantineRecordChan() and insert the defer immediately after that
construction.
In `@pkg/resourcegroup/tests/resource_group_test.go`:
- Around line 550-559: After enabling the failpoint via
failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush",
...) immediately register a cleanup to always disable it (use t.Cleanup(func(){
failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush")
}) or a guarded defer) so the failpoint is turned off even if subsequent
assertions (the three tk.QueryToErr calls with resource_group hint) fail; update
the block around the failpoint.Enable call to add this cleanup and remove the
later unconditional failpoint.Disable call.
In `@pkg/session/bootstrap_test.go`:
- Line 2607: The helper getTableCreateSQLFn creates a result set via
MustExecToRecodeSet but never closes it; after the call that assigns the result
set (the variable named res from MustExecToRecodeSet) add a defer res.Close()
immediately so the result is always closed and connection handles are released;
update getTableCreateSQLFn to defer res.Close() right after MustExecToRecodeSet
returns to prevent resource leaks during tests.
---
Nitpick comments:
In `@pkg/resourcegroup/runaway/record.go`:
- Around line 140-158: genInsertionStmt and genBatchInsertWatchStmt duplicate
the same watch-table field ordering, NullTime handling, and switch-group
mapping; factor that logic into a single helper (e.g., buildWatchRowParams or
watchRowValues) that accepts a *QuarantineRecord and returns the prepared []any
for a single row (performing the NullTime -> nil conversion and calling
r.getSwitchGroupName()), then update genInsertionStmt and
genBatchInsertWatchStmt to reuse that helper for building params while keeping
the existing placeholder order produced by writeInsert/getRunawayWatchTableName;
ensure the helper is used for both single-row and batch paths so ordering and
conversions remain identical.
In `@pkg/session/bootstrap_test.go`:
- Around line 2612-2619: getTableCreateSQLFn opens a recordset via
MustExecToRecodeSet but never closes it, leaking resources; fix by closing the
recordset (res) after use—best add defer res.Close() immediately after obtaining
res in getTableCreateSQLFn (so the recordset is always closed even on errors)
while keeping the existing calls to res.NewChunk, res.Next, and returns intact.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: e9a1dbd1-aeaa-41b8-981e-cef789e1f423
📒 Files selected for processing (18)
- br/pkg/restore/snap_client/systable_restore_test.go
- pkg/metrics/grafana/tidb_resource_control.json
- pkg/metrics/grafana/tidb_resource_control.jsonnet
- pkg/metrics/metrics.go
- pkg/metrics/resource_group.go
- pkg/resourcegroup/runaway/BUILD.bazel
- pkg/resourcegroup/runaway/checker.go
- pkg/resourcegroup/runaway/checker_test.go
- pkg/resourcegroup/runaway/flusher.go
- pkg/resourcegroup/runaway/flusher_test.go
- pkg/resourcegroup/runaway/manager.go
- pkg/resourcegroup/runaway/record.go
- pkg/resourcegroup/runaway/record_test.go
- pkg/resourcegroup/runaway/syncer.go
- pkg/resourcegroup/tests/BUILD.bazel
- pkg/resourcegroup/tests/resource_group_test.go
- pkg/session/bootstrap.go
- pkg/session/bootstrap_test.go
💤 Files with no reviewable changes (1)
- pkg/resourcegroup/runaway/record_test.go
err := f.flushFn(f.buffer)
duration := time.Since(start)

f.batchSizeObserver.Observe(float64(batchSize))
f.durationObserver.Observe(duration.Seconds())
if err != nil {
	f.flushErrorCounter.Inc()
} else {
	f.flushSuccessCounter.Inc()
}

f.lastFlushTime = now
f.buffer = make(map[K]V, f.threshold)
Preserve buffered records on flush failure.
Lines 127-139 clear f.buffer even when flushFn returns an error. That turns a transient ExecRCRestrictedSQL failure into permanent loss of pending runaway records, watch inserts, or stale-watch deletes. Only advance lastFlushTime and reset the buffer after a successful flush.
🛠️ Minimal fix
f.batchSizeObserver.Observe(float64(batchSize))
f.durationObserver.Observe(duration.Seconds())
if err != nil {
f.flushErrorCounter.Inc()
- } else {
- f.flushSuccessCounter.Inc()
+ return
}
-
- f.lastFlushTime = now
- f.buffer = make(map[K]V, f.threshold)
+ f.flushSuccessCounter.Inc()
+ f.lastFlushTime = now
+ f.buffer = make(map[K]V, f.threshold)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
err := f.flushFn(f.buffer)
duration := time.Since(start)
f.batchSizeObserver.Observe(float64(batchSize))
f.durationObserver.Observe(duration.Seconds())
if err != nil {
	f.flushErrorCounter.Inc()
	return
}
f.flushSuccessCounter.Inc()
f.lastFlushTime = now
f.buffer = make(map[K]V, f.threshold)
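The retain-on-failure behavior this comment asks for can be illustrated with a small standalone sketch. The `flusher` type below is a hypothetical reduction of the real one, and its two integer counters merely stand in for flushSuccessCounter and flushErrorCounter.

```go
package main

import (
	"errors"
	"fmt"
)

// flusher is a hypothetical, trimmed-down stand-in for the batch flusher.
type flusher struct {
	buffer    map[string]int
	flushFn   func(map[string]int) error
	successes int // stands in for flushSuccessCounter
	failures  int // stands in for flushErrorCounter
}

// flush resets the buffer only after flushFn succeeds, so a failed batch
// stays buffered and is retried by the next flush.
func (f *flusher) flush() {
	if len(f.buffer) == 0 {
		return
	}
	if err := f.flushFn(f.buffer); err != nil {
		f.failures++
		return
	}
	f.successes++
	f.buffer = make(map[string]int)
}

// demoRetry fails the first flush and lets the second one through.
func demoRetry() (bufferedAfterFailure, bufferedAfterRetry, written int) {
	calls := 0
	f := &flusher{buffer: map[string]int{"q1": 1, "q2": 1}}
	f.flushFn = func(m map[string]int) error {
		calls++
		if calls == 1 {
			return errors.New("transient SQL error")
		}
		written += len(m)
		return nil
	}
	f.flush()
	bufferedAfterFailure = len(f.buffer)
	f.flush()
	bufferedAfterRetry = len(f.buffer)
	return bufferedAfterFailure, bufferedAfterRetry, written
}

func main() {
	afterFailure, afterRetry, written := demoRetry()
	fmt.Println("buffered after failure:", afterFailure)
	fmt.Println("buffered after retry:", afterRetry, "rows written:", written)
}
```

One trade-off to keep in mind: if the flush keeps failing, a retained buffer grows without bound, so a real implementation would probably also want a size cap or a drop policy.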
quarantineRecordFlusher := newBatchFlusher(
	"quarantine-record",
	watchFlushInterval,
	batchSize,
	func(m map[string]*QuarantineRecord, k string, v *QuarantineRecord) {
		if _, ok := m[k]; !ok {
			m[k] = v
		}
Keep the newest buffered quarantine record for each key.
addWatchList() treats the latest ID == 0 local watch as authoritative until the system row arrives, but this merge function keeps the first buffered value instead. Two marks for the same key within one flush interval can therefore persist stale TTL/action data and then overwrite the newer in-memory watch on the next sync.
🛠️ Minimal fix
func(m map[string]*QuarantineRecord, k string, v *QuarantineRecord) {
- if _, ok := m[k]; !ok {
- m[k] = v
- }
+ m[k] = v
},
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
quarantineRecordFlusher := newBatchFlusher(
	"quarantine-record",
	watchFlushInterval,
	batchSize,
	func(m map[string]*QuarantineRecord, k string, v *QuarantineRecord) {
		m[k] = v
	},
@rleungx — The fix is not strictly required, but it's a minor correctness improvement worth considering.
Here's the reasoning:
Why it's mostly fine as-is:
- quarantineRecordFlusher targets genBatchInsertWatchStmt, which generates a plain multi-row INSERT. Keeping the first record is actually a reasonable dedup strategy to avoid attempting to insert the same watch key twice.
- The local watchList (TTL cache) already keeps the newest record: in addWatchList the item.ID == 0 branch always replaces with the latest value. So query enforcement on the hot path is always using the most up-to-date settings.
- On the next UpdateNewAndDoneWatch cycle, the syncer reconciles DB state back into memory anyway, so any transient inconsistency is self-correcting.
Why the suggested fix has merit:
- If two runaway events for the same key (resource group + SQL pattern) fire within one 1-second flush window with different TTL or Action settings, the DB row would capture the first (potentially stale) settings rather than the latest. This is a minor observability/persistence gap, not a query-enforcement gap.
- The scenario is rare in practice (same query marked twice with different settings within 1 second), so the impact is low.
Verdict: It's a low-priority, low-risk cleanup. If you prefer to keep the code simple and avoid the edge case entirely, applying m[k] = v is fine. If the current behavior is intentional (write-once per flush to avoid duplicate INSERTs), it's also acceptable. Either way, query enforcement correctness is unaffected.
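The difference between the two merge policies discussed in this thread can be seen in a small sketch. The `record` type and the `"rg1/digest"` key are hypothetical; only the two closures mirror the shapes shown in the review.

```go
package main

import "fmt"

// record is a hypothetical stand-in for QuarantineRecord, reduced to the
// one field needed for the illustration.
type record struct{ action string }

// keepFirst mirrors the merge closure under review: a later record for an
// already-buffered key is dropped.
func keepFirst(m map[string]*record, k string, v *record) {
	if _, ok := m[k]; !ok {
		m[k] = v
	}
}

// keepLatest mirrors the suggested change: a later record replaces the
// earlier one.
func keepLatest(m map[string]*record, k string, v *record) {
	m[k] = v
}

// bufferedAction merges two marks for the same key within one flush window
// and reports which action would be written.
func bufferedAction(merge func(map[string]*record, string, *record)) string {
	m := make(map[string]*record)
	merge(m, "rg1/digest", &record{action: "cooldown"})
	merge(m, "rg1/digest", &record{action: "kill"})
	return m["rg1/digest"].action
}

func main() {
	fmt.Println("keepFirst writes:", bufferedAction(keepFirst))
	fmt.Println("keepLatest writes:", bufferedAction(keepLatest))
}
```

Both policies still produce one row per key per batch, so either one avoids inserting the same key twice in a single statement.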
runawayRecordGCTicker := time.NewTicker(gcInterval)
recordCh := rm.runawayRecordChan()
quarantineRecordCh := rm.quarantineRecordChan()
staleQuarantineRecordCh := rm.staleQuarantineRecordChan()
flushThreshold := flushThreshold()
// recordMap is used to deduplicate records which will be inserted into `mysql.tidb_runaway_queries`.
recordMap := make(map[recordKey]*Record, flushThreshold)

flushRunawayRecords := func() {
	if len(recordMap) == 0 {
		return
	}
	sql, params := genRunawayQueriesStmt(recordMap)
	if _, err := ExecRCRestrictedSQL(rm.sysSessionPool, sql, params); err != nil {
		logutil.BgLogger().Error("flush runaway records failed", zap.Error(err), zap.Int("count", len(recordMap)))
	}
	// reset the map.
	recordMap = make(map[recordKey]*Record, flushThreshold)
}

for {
	select {
	case <-rm.exit:
		runawayRecordFlusher.stop()
		quarantineRecordFlusher.stop()
		staleQuarantineFlusher.stop()
		logutil.BgLogger().Info("runaway record flush loop exit")
Stop the GC ticker when the flush loop exits.
The runawayRecordGCTicker created at line 194 is never stopped. Add defer runawayRecordGCTicker.Stop() immediately after construction to prevent the ticker's internal goroutine from leaking when the function returns.
🛠️ Minimal fix
runawayRecordGCTicker := time.NewTicker(gcInterval)
+defer runawayRecordGCTicker.Stop()
recordCh := rm.runawayRecordChan()
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
runawayRecordGCTicker := time.NewTicker(gcInterval)
defer runawayRecordGCTicker.Stop()
recordCh := rm.runawayRecordChan()
quarantineRecordCh := rm.quarantineRecordChan()
staleQuarantineRecordCh := rm.staleQuarantineRecordChan()
for {
	select {
	case <-rm.exit:
		runawayRecordFlusher.stop()
		quarantineRecordFlusher.stop()
		staleQuarantineFlusher.stop()
		logutil.BgLogger().Info("runaway record flush loop exit")
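The pattern behind this suggestion, reduced to a self-contained sketch: a loop that owns a ticker and releases it through a deferred Stop on every return path. `runLoop` and `demoLoop` are hypothetical names, not functions from the PR.

```go
package main

import (
	"fmt"
	"time"
)

// runLoop is a hypothetical reduction of the flush loop: it owns a ticker
// and returns when exit is closed. The deferred Stop runs on every return
// path, so the ticker is released no matter how the loop ends.
func runLoop(exit <-chan struct{}, interval time.Duration, onTick func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-exit:
			return
		case <-ticker.C:
			onTick()
		}
	}
}

// demoLoop lets the loop tick until the third tick asks it to exit, waits
// for it to return, and reports how many ticks were handled.
func demoLoop() int {
	exit := make(chan struct{})
	done := make(chan struct{})
	ticks := 0
	go func() {
		defer close(done)
		runLoop(exit, time.Millisecond, func() {
			ticks++
			if ticks == 3 {
				close(exit)
			}
		})
	}()
	<-done // the loop has returned, so the deferred Stop has already run
	return ticks
}

func main() {
	fmt.Println("ticks handled before exit:", demoLoop())
}
```

Since Go 1.23 the garbage collector can reclaim an unreferenced ticker even if Stop was never called, so the practical impact depends on the toolchain version; the deferred Stop is harmless either way and keeps the code correct on older releases.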
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush", `return(1)`))
// check thrice to make sure the runaway query be regarded as a repeated query.
err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.2) from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.3) from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
// using FastRunawayGC to trigger flush
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC", `return(1)`))
err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.4) from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
// resume the flush to ensure an aggregated record can be queried later.
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush"))
Register skipFlush cleanup immediately after enabling it.
If any require or eventual assertion before Line 559 fails, this failpoint stays enabled and can cascade into unrelated runaway tests. Please move the disable into t.Cleanup (or a guarded defer) right after Enable.
🧹 Suggested cleanup guard
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush", `return(1)`))
+skipFlushEnabled := true
+t.Cleanup(func() {
+ if skipFlushEnabled {
+ require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush"))
+ }
+})
...
// resume the flush to ensure an aggregated record can be queried later.
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush"))
+skipFlushEnabled = false
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush", `return(1)`))
skipFlushEnabled := true
t.Cleanup(func() {
	if skipFlushEnabled {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush"))
	}
})
// check thrice to make sure the runaway query be regarded as a repeated query.
err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.2) from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.3) from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.4) from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
// resume the flush to ensure an aggregated record can be queried later.
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush"))
skipFlushEnabled = false
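The guarded-cleanup idea can also be packaged as a small helper, sketched below with the standard library only. Everything here is an assumption for illustration: `enableWithCleanup` is a hypothetical helper, its `enable`/`disable` callbacks stand in for the failpoint calls, and `fakeT` imitates the one `Cleanup` method of `*testing.T` that the helper needs.

```go
package main

import "fmt"

// cleanupRegistrar is the single method of *testing.T this sketch relies on.
type cleanupRegistrar interface {
	Cleanup(func())
}

// enableWithCleanup turns a switch on and registers its disable immediately,
// so an assertion failure later in the test cannot leave it enabled. The
// returned function disables early; the active flag keeps the registered
// cleanup from disabling a second time.
func enableWithCleanup(t cleanupRegistrar, enable, disable func() error) (func() error, error) {
	if err := enable(); err != nil {
		return nil, err
	}
	active := true
	disableOnce := func() error {
		if !active {
			return nil
		}
		active = false
		return disable()
	}
	t.Cleanup(func() { _ = disableOnce() })
	return disableOnce, nil
}

// fakeT records cleanups and runs them in reverse order, the way the
// testing package does when a test finishes.
type fakeT struct{ cleanups []func() }

func (f *fakeT) Cleanup(fn func()) { f.cleanups = append(f.cleanups, fn) }

func (f *fakeT) finish() {
	for i := len(f.cleanups) - 1; i >= 0; i-- {
		f.cleanups[i]()
	}
}

// demoCleanup reports how many times disable ran, either when the test ends
// without an explicit disable or when it disables early and then ends.
func demoCleanup(disableEarly bool) int {
	disabled := 0
	t := &fakeT{}
	stop, err := enableWithCleanup(t,
		func() error { return nil },
		func() error { disabled++; return nil })
	if err != nil {
		return -1
	}
	if disableEarly {
		_ = stop()
	}
	t.finish()
	return disabled
}

func main() {
	fmt.Println("disable calls without early stop:", demoCleanup(false))
	fmt.Println("disable calls with early stop:", demoCleanup(true))
}
```

In both paths the switch is turned off exactly once, which is the property the `skipFlushEnabled` flag in the suggestion provides.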
m := meta.NewMutator(txn)
err = m.FinishBootstrap(int64(ver223))
require.NoError(t, err)
RevertVersionAndVariables(t, seV223, ver223)
Missing resource cleanup in getTableCreateSQLFn helper.
The local helper at line 2612 creates a result set via MustExecToRecodeSet but never closes it. This can leak connection handles and cause non-deterministic test behavior in repeated runs.
Add defer res.Close() after creating the result set:
Proposed fix
getTableCreateSQLFn := func(se sessiontypes.Session, tableName string) string {
res := MustExecToRecodeSet(t, se, fmt.Sprintf("show create table mysql.%s", tableName))
+ defer res.Close()
chk := res.NewChunk(nil)
err = res.Next(ctx, chk)
require.NoError(t, err)
require.Equal(t, 1, chk.NumRows())
return string(chk.GetRow(0).GetBytes(1))
}
📝 Committable suggestion
| RevertVersionAndVariables(t, seV223, ver223) | |
| getTableCreateSQLFn := func(se sessiontypes.Session, tableName string) string { | |
| res := MustExecToRecodeSet(t, se, fmt.Sprintf("show create table mysql.%s", tableName)) | |
| defer res.Close() | |
| chk := res.NewChunk(nil) | |
| err = res.Next(ctx, chk) | |
| require.NoError(t, err) | |
| require.Equal(t, 1, chk.NumRows()) | |
| return string(chk.GetRow(0).GetBytes(1)) | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/session/bootstrap_test.go` at line 2607, The helper getTableCreateSQLFn
creates a result set via MustExecToRecodeSet but never closes it; after the call
that assigns the result set (the variable named res from MustExecToRecodeSet)
add a defer res.Close() immediately so the result is always closed and
connection handles are released; update getTableCreateSQLFn to defer res.Close()
right after MustExecToRecodeSet returns to prevent resource leaks during tests.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## release-8.5-20260323-v8.5.5 #67352 +/- ##
================================================================
Coverage ? 55.4491%
================================================================
Files ? 1816
Lines ? 650811
Branches ? 0
================================================================
Hits ? 360869
Misses ? 263104
Partials ? 26838
|
♻️ Duplicate comments (1)
pkg/session/bootstrap_test.go (1)
2612-2619: ⚠️ Potential issue | 🟡 Minor
Close the RecordSet in getTableCreateSQLFn to avoid leaks.
At Line 2613, MustExecToRecodeSet returns a RecordSet that is never closed before return. Please close it in the helper to prevent leaked handles across tests.
Proposed fix
getTableCreateSQLFn := func(se sessiontypes.Session, tableName string) string {
res := MustExecToRecodeSet(t, se, fmt.Sprintf("show create table mysql.%s", tableName))
chk := res.NewChunk(nil)
err = res.Next(ctx, chk)
require.NoError(t, err)
require.Equal(t, 1, chk.NumRows())
- return string(chk.GetRow(0).GetBytes(1))
+ ddl := string(chk.GetRow(0).GetBytes(1))
+ require.NoError(t, res.Close())
+ return ddl
}
As per coding guidelines: "Keep error handling actionable and contextual; avoid silently swallowing errors."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/session/bootstrap_test.go` around lines 2612 - 2619, The helper getTableCreateSQLFn is leaking the RecordSet returned by MustExecToRecodeSet; after obtaining res (the RecordSet) you should close it before returning—e.g., call res.Close() (or defer res.Close() immediately after the call) and handle any error from Close (use require.NoError on the returned error) so the RecordSet is always released even if Next or assertions fail; keep references to res, chk, ctx and ensure Close runs before the function returns.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@pkg/session/bootstrap_test.go`:
- Around line 2612-2619: The helper getTableCreateSQLFn is leaking the RecordSet
returned by MustExecToRecodeSet; after obtaining res (the RecordSet) you should
close it before returning—e.g., call res.Close() (or defer res.Close()
immediately after the call) and handle any error from Close (use require.NoError
on the returned error) so the RecordSet is always released even if Next or
assertions fail; keep references to res, chk, ctx and ensure Close runs before
the function returns.
ℹ️ Review info
📒 Files selected for processing (1)
pkg/session/bootstrap_test.go
|
/test unit-test |
|
@EmmaDuDu: No presubmit jobs available for pingcap/tidb@release-8.5-20260323-v8.5.5 |
| } | ||
|
|
||
| start := time.Now() | ||
| err := f.flushFn(f.buffer) |
If the flush fails, will all records be dropped?
Yes. A failed flush currently drops the buffered records, and this case is not handled yet.
Since the flush only affects certain observability records, leaving it unhandled is acceptable for now.
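A minimal sketch of the behavior discussed in this thread, assuming a simplified flusher. The names flushFn and buffer follow the diff above; the generic type, add, dropped, errFlush, and failingFlush are hypothetical and exist only for illustration. The buffer is reset regardless of the flush result, so a failed flush discards the whole batch.

```go
package main

import (
	"errors"
	"fmt"
)

// errFlush is a hypothetical error used to simulate a failing flush.
var errFlush = errors.New("storage unavailable")

// failingFlush is a hypothetical flushFn that always fails.
func failingFlush([]string) error { return errFlush }

// batchFlusher is a simplified stand-in for the flusher in this PR.
// Only flushFn and buffer mirror names from the diff; the rest is assumed.
type batchFlusher[T any] struct {
	buffer  []T
	flushFn func([]T) error
	dropped int // records lost to failed flushes (illustrative counter)
}

// add appends one record to the in-memory buffer.
func (f *batchFlusher[T]) add(rec T) { f.buffer = append(f.buffer, rec) }

// flush hands the buffer to flushFn and then resets it unconditionally,
// so a failed flush drops the batch instead of retrying it.
func (f *batchFlusher[T]) flush() error {
	if len(f.buffer) == 0 {
		return nil
	}
	err := f.flushFn(f.buffer)
	if err != nil {
		f.dropped += len(f.buffer)
	}
	f.buffer = f.buffer[:0]
	return err
}

func main() {
	f := &batchFlusher[string]{flushFn: failingFlush}
	f.add("q1")
	f.add("q2")
	err := f.flush()
	fmt.Println(err, len(f.buffer), f.dropped)
}
```

Keeping the buffer on failure would bound data loss but lets the buffer grow while storage is down; dropping keeps memory bounded at the cost of losing observability records, which matches the trade-off accepted in the reply.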
|
/retest |
- Replace metricscommon.New*Vec with local New*Vec (metricscommon package does not exist on the release branch)
- Replace ast.NewCIStr/ast.CIStr with model.NewCIStr/model.CIStr (CIStr was moved from parser/ast to parser/model on this branch)
Signed-off-by: JmPotato <github@ipotato.me>

The test was adapted from master's TestTiDBUpgradeToVer254 but kept master-only exported symbols that don't exist on the release branch:
- RevertVersionAndVariables → revertVersionAndVariables (unexported)
- GetBootstrapVersion → getBootstrapVersion (unexported)
- store.SetOption(StoreBootstrappedKey, nil) → unsetStoreBootstrapped(store.UUID())
- mysql.ModeNone → 0 (avoid adding parser/mysql import for a single constant)
Signed-off-by: JmPotato <github@ipotato.me>

The addition of idx_start_time and idx_done_time on runaway watch tables increased the total system index count from 81 to 83.
Signed-off-by: JmPotato <github@ipotato.me>
f314dba to
55177e9
Compare
♻️ Duplicate comments (1)
pkg/resourcegroup/tests/resource_group_test.go (1)
550-559: ⚠️ Potential issue | 🟡 Minor
Register skipFlush cleanup immediately after enabling it.
If any assertion after Line 550 fails, skipFlush stays enabled and can bleed into later runaway tests.
🧹 Suggested cleanup
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush", `return(1)`))
+skipFlushEnabled := true
+t.Cleanup(func() {
+ if skipFlushEnabled {
+ require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush"))
+ }
+})
// check thrice to make sure the runaway query be regarded as a repeated query.
err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.2) from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.3) from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.4) from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
// resume the flush to ensure an aggregated record can be queried later.
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush"))
+skipFlushEnabled = false
As per coding guidelines "Unit tests in a package that uses failpoints: MUST enable failpoints before tests and disable afterward."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/resourcegroup/tests/resource_group_test.go` around lines 550 - 559, Enablement of the failpoint "github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush" must be paired with immediate cleanup to avoid leaking into other tests; after the failpoint.Enable(...) call, add a defer that calls failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush") (and assert/ignore its error appropriately) so the failpoint is always disabled even if subsequent requires/assertions fail — update the block around the current failpoint.Enable(...) and the later failpoint.Disable(...) to remove the standalone disable and rely on the deferred cleanup instead.
🧹 Nitpick comments (2)
pkg/resourcegroup/runaway/flusher_test.go (1)
26-45: Stop the helper ticker in test cleanup.
newTestBatchFlusher allocates a live time.Ticker for every test case and never stops it. Passing t into the helper and registering flusher.stop() (or ticker.Stop()) in t.Cleanup would keep these tests leak-free.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/resourcegroup/runaway/flusher_test.go` around lines 26 - 45, newTestBatchFlusher currently creates a live time.Ticker that is never stopped; change newTestBatchFlusher to accept a *testing.T parameter, create the batchFlusher as before, then call t.Cleanup to stop the flusher's ticker (either by calling flusher.stop() or ticker.Stop()) to avoid leaking goroutines; update all test call sites to pass the *testing.T and ensure batchFlusher.stop() (or equivalent) is implemented/used to stop the ticker.
pkg/resourcegroup/runaway/syncer.go (1)
177-179: Decouple checkpoint advancement from SELECT * column order.
The new ID checkpointing reads GetInt64(0) from raw rows, so it stays correct only while the key column remains the first physical column in both system tables. Selecting the key column explicitly would make the pagination change less fragile to future schema tweaks.
Also applies to: 216-218, 241-252
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/resourcegroup/runaway/syncer.go` around lines 177 - 179, The checkpoint advancement currently reads the ID from raw rows using GetInt64(0), which couples it to physical column order; update the code that sets reader.CheckPoint (the blocks that use rs[len(rs)-1].GetInt64(0) — including the occurrences around the reader.CheckPoint assignment and the similar blocks at the other locations mentioned) to select the key column explicitly in the SQL (e.g., SELECT id, ... -> SELECT key_column, ...) and then read the value by column name or by using a typed scanner into a struct/variable (e.g., scan into an explicit id/key variable) before assigning reader.CheckPoint, so checkpointing no longer depends on column position.
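For the first nitpick above (stopping the helper ticker), the following is a rough sketch under stated assumptions rather than the PR's actual helper. The helper name newTestBatchFlusher comes from the review; testFlusher, its fields, the cleanuper interface, and fakeT are hypothetical. The interface is the subset of testing.TB that the helper needs, which lets the sketch run outside `go test`; in a real test, a *testing.T would be passed instead.

```go
package main

import (
	"fmt"
	"time"
)

// cleanuper is the subset of testing.TB used here; *testing.T satisfies it.
type cleanuper interface{ Cleanup(func()) }

// testFlusher is a hypothetical stand-in for the batchFlusher under test.
type testFlusher struct {
	ticker  *time.Ticker
	stopped bool
}

// stop releases the ticker and records that it ran.
func (f *testFlusher) stop() {
	f.ticker.Stop()
	f.stopped = true
}

// newTestBatchFlusher registers stop() with the test so the ticker is
// released when the test finishes, even if an assertion fails earlier.
func newTestBatchFlusher(t cleanuper, interval time.Duration) *testFlusher {
	f := &testFlusher{ticker: time.NewTicker(interval)}
	t.Cleanup(f.stop)
	return f
}

// fakeT records cleanups so the sketch can run without the testing package.
type fakeT struct{ cleanups []func() }

func (t *fakeT) Cleanup(fn func()) { t.cleanups = append(t.cleanups, fn) }

// runCleanups mimics testing.T by running cleanups in last-added-first order.
func (t *fakeT) runCleanups() {
	for i := len(t.cleanups) - 1; i >= 0; i-- {
		t.cleanups[i]()
	}
}

func main() {
	t := &fakeT{}
	f := newTestBatchFlusher(t, time.Second)
	fmt.Println("stopped before cleanup:", f.stopped)
	t.runCleanups()
	fmt.Println("stopped after cleanup:", f.stopped)
}
```

Registering the cleanup inside the helper means call sites only change by passing t, and no test can forget to stop its ticker.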
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@pkg/resourcegroup/tests/resource_group_test.go`:
- Around line 550-559: Enablement of the failpoint
"github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush" must be paired
with immediate cleanup to avoid leaking into other tests; after the
failpoint.Enable(...) call, add a defer that calls
failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/skipFlush")
(and assert/ignore its error appropriately) so the failpoint is always disabled
even if subsequent requires/assertions fail — update the block around the
current failpoint.Enable(...) and the later failpoint.Disable(...) to remove the
standalone disable and rely on the deferred cleanup instead.
---
Nitpick comments:
In `@pkg/resourcegroup/runaway/flusher_test.go`:
- Around line 26-45: newTestBatchFlusher currently creates a live time.Ticker
that is never stopped; change newTestBatchFlusher to accept a *testing.T
parameter, create the batchFlusher as before, then call t.Cleanup to stop the
flusher's ticker (either by calling flusher.stop() or ticker.Stop()) to avoid
leaking goroutines; update all test call sites to pass the *testing.T and ensure
batchFlusher.stop() (or equivalent) is implemented/used to stop the ticker.
In `@pkg/resourcegroup/runaway/syncer.go`:
- Around line 177-179: The checkpoint advancement currently reads the ID from
raw rows using GetInt64(0), which couples it to physical column order; update
the code that sets reader.CheckPoint (the blocks that use
rs[len(rs)-1].GetInt64(0) — including the occurrences around the
reader.CheckPoint assignment and the similar blocks at the other locations
mentioned) to select the key column explicitly in the SQL (e.g., SELECT id, ...
-> SELECT key_column, ...) and then read the value by column name or by using a
typed scanner into a struct/variable (e.g., scan into an explicit id/key
variable) before assigning reader.CheckPoint, so checkpointing no longer depends
on column position.
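The syncer suggestion can be illustrated with a small, self-contained sketch. Everything here is hypothetical (watchRow, buildScanSQL, fetchPage, syncAll, and the table and column names); it does not use TiDB's executor or row types. It shows the two ideas in the comment: name the key column explicitly in the query, and advance the checkpoint from a named field rather than from column position 0.

```go
package main

import (
	"fmt"
	"strings"
)

// watchRow is a hypothetical decoded row; ID comes from the explicitly
// selected key column, not from whatever happens to be column 0.
type watchRow struct {
	ID   int64
	Text string
}

// buildScanSQL lists the key column by name instead of relying on SELECT *.
// The placeholder style and identifiers are illustrative only.
func buildScanSQL(table, keyCol string, otherCols ...string) string {
	cols := append([]string{keyCol}, otherCols...)
	return fmt.Sprintf("SELECT %s FROM %s WHERE %s > ? ORDER BY %s LIMIT ?",
		strings.Join(cols, ", "), table, keyCol, keyCol)
}

// fetchPage simulates that query against rows already sorted by ID.
func fetchPage(all []watchRow, checkpoint int64, limit int) []watchRow {
	var page []watchRow
	for _, r := range all {
		if r.ID > checkpoint && len(page) < limit {
			page = append(page, r)
		}
	}
	return page
}

// syncAll pages until an empty page, moving the checkpoint to the last ID seen.
func syncAll(all []watchRow, limit int) (checkpoint int64, seen int) {
	for {
		page := fetchPage(all, checkpoint, limit)
		if len(page) == 0 {
			return checkpoint, seen
		}
		seen += len(page)
		checkpoint = page[len(page)-1].ID
	}
}

func main() {
	fmt.Println(buildScanSQL("some_table", "id", "col_a", "col_b"))
	rows := []watchRow{{1, "a"}, {2, "b"}, {5, "c"}}
	checkpoint, seen := syncAll(rows, 2)
	fmt.Println(checkpoint, seen)
}
```

Because the checkpoint is read from a named field, reordering or adding columns in the table would not silently change which value drives pagination.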
ℹ️ Review info
📒 Files selected for processing (19)
- br/pkg/restore/snap_client/systable_restore_test.go
- pkg/executor/infoschema_reader_test.go
- pkg/metrics/grafana/tidb_resource_control.json
- pkg/metrics/grafana/tidb_resource_control.jsonnet
- pkg/metrics/metrics.go
- pkg/metrics/resource_group.go
- pkg/resourcegroup/runaway/BUILD.bazel
- pkg/resourcegroup/runaway/checker.go
- pkg/resourcegroup/runaway/checker_test.go
- pkg/resourcegroup/runaway/flusher.go
- pkg/resourcegroup/runaway/flusher_test.go
- pkg/resourcegroup/runaway/manager.go
- pkg/resourcegroup/runaway/record.go
- pkg/resourcegroup/runaway/record_test.go
- pkg/resourcegroup/runaway/syncer.go
- pkg/resourcegroup/tests/BUILD.bazel
- pkg/resourcegroup/tests/resource_group_test.go
- pkg/session/bootstrap.go
- pkg/session/bootstrap_test.go
💤 Files with no reviewable changes (1)
- pkg/resourcegroup/runaway/record_test.go
✅ Files skipped from review due to trivial changes (2)
- pkg/executor/infoschema_reader_test.go
- pkg/session/bootstrap_test.go
🚧 Files skipped from review as they are similar to previous changes (7)
- pkg/resourcegroup/runaway/checker.go
- br/pkg/restore/snap_client/systable_restore_test.go
- pkg/resourcegroup/tests/BUILD.bazel
- pkg/resourcegroup/runaway/BUILD.bazel
- pkg/metrics/resource_group.go
- pkg/session/bootstrap.go
- pkg/resourcegroup/runaway/flusher.go
|
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: yudongusa, YuJuncen |
bfce44e
into
pingcap:release-8.5-20260323-v8.5.5
What problem does this PR solve?
Issue Number: ref #65747, ref #65828, ref #66155, ref #65834, ref #66171, ref #66182
Problem Summary:
Cherry-pick a series of runaway watch improvements from master to the v8.5.5 release branch. These PRs collectively improve the performance, observability, and correctness of the runaway query watch mechanism.
What changed and how does it work?
Cherry-picked PRs (in order):
idx_start_time and idx_done_time indexes)
Adaptation fixes for the release branch:
- metricscommon.New*Vec → local New*Vec (metricscommon package does not exist on this branch)
- ast.NewCIStr/ast.CIStr → model.NewCIStr/model.CIStr (API location differs on this branch)
- version254 (master) to version224 (release branch reserved range)
Check List
Tests
Side effects
Documentation
Release note
Summary by CodeRabbit
New Features
Chores
Tests