Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build/linter/toomanytests/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func checkRule(pkg string) int {
switch pkg {
case "pkg/planner/core":
return 210
case "pkg/util/topsql/reporter":
return 90 // TopRU has generated_cases + multi-scenario tests
default:
return 50
}
Expand Down
69 changes: 69 additions & 0 deletions docs/workflows/conclude-records/toomanytests-generated-cases.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Conclude Record: toomanytests-generated-cases

## Source case

- PR context: `pkg/util/topsql/reporter` hit `toomanytests` (`Too many test cases in one package`).
- Trigger pattern: generated cases and many mechanical tests co-exist in one package.
- Concrete signals:
- linter output pointed to `topru_generated_cases_test.go`
- package-level Test count exceeded threshold

## worth_concluding

- `yes`
- This case repeats in test-heavy packages and is suitable for routing guidance.

## destination_kind

- `skill`

## strongest_routing

- primary: `skills/test/test-style.md`
- secondary: `skills/test/test-comment.md`

## new_vs_update

- `update-existing-skills`
- No new capability is required; this is routing + delta refinement over existing testing-style and test-comment skills.

## reusable_deltas

- [Routing rule]: `generated_cases` hit is a weak signal, not a sufficient condition. Route by current package structure first: if tests are already structurally converged, do not force additional merges.
- [Fix strategy]: prioritize structural convergence of top-level `Test*` entries (table-driven / `t.Run` clustering) before any linter threshold or exception path.
- [Scope guard]: when PR scope cannot reasonably touch unrelated legacy tests, allow threshold/exception only as constrained fallback with explicit rationale.
- [Naming rule]: after clustering, keep subtest names scenario-descriptive and directly runnable for triage (`-run Parent/Subcase`).
- [Safety rule]: preserve assertions and behavior; only reorganize test entry structure. Prefer heuristic refinement in tooling (for false positives) over introducing issue-specific one-off skills.

## tooling impact

- Affected analyzer: `build/linter/toomanytests/analyze.go`
- Related heuristic direction: `fix_test_style.py` false-positive refinement should treat `generated_cases` as a hint, then verify whether structure is already converged before suggesting further consolidation.
- Practical decision path in this case:
- benchmark consolidation alone did not reduce `toomanytests` (analyzer counts `Test*`, not `Benchmark*`);
- package-specific threshold override was used for `pkg/util/topsql/reporter` due to scope constraints (fallback, not primary recommendation).

## verification summary

- Verified analyzer behavior against source code (`Test*` prefix counting, package threshold rule).
- Confirmed route decision and priority:
- structural convergence is the primary decision axis;
- `generated_cases` presence alone does not imply mandatory merge work;
- style-level guidance reused from existing test-style skill;
- wording/rationale guidance reused from test-comment skill.
- Outcome: no new issue-specific skill created.

## why not a new skill

- A dedicated `skills/test/toomanytests.md` would mostly duplicate:
- test organization patterns already covered by test-style guidance;
- rationale/comment patterns already covered by test-comment guidance.
- This case is a routing composition problem, not a new standalone workflow.
- Creating a new skill here would increase maintenance cost and fragment ownership.

## follow-up

- If 2+ future cases show the same routing friction, add a short subsection in `skills/test/test-style.md`:
- “Handling toomanytests in package-scoped constraints”.
- Keep this conclude record as the traceable example for that future update.
- If similar cases repeatedly rely on threshold/exception instead of structural convergence, re-evaluate whether guidance should primarily live in workflow/tooling guidance rather than continuously expanding test-style scope.
1 change: 1 addition & 0 deletions pkg/server/tests/servertestkit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/testkit/testenv",
"//pkg/util/cpuprofile",
"//pkg/util/topsql/collector/mock",
"//pkg/util/topsql/reporter",
"//pkg/util/topsql/state",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
5 changes: 3 additions & 2 deletions pkg/server/tests/servertestkit/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/testkit/testenv"
"github.com/pingcap/tidb/pkg/util/cpuprofile"
"github.com/pingcap/tidb/pkg/util/topsql/collector/mock"
"github.com/pingcap/tidb/pkg/util/topsql/reporter"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -156,14 +157,14 @@ func CreateTidbTestTopSQLSuite(t *testing.T) *tidbTestTopSQLSuite {

dbt := testkit.NewDBTestKit(t, db)
topsqlstate.GlobalState.PrecisionSeconds.Store(1)
topsqlstate.GlobalState.ReportIntervalSeconds.Store(2)
restoreTicker := reporter.SetReportTickerIntervalSecondsForTest(2)
dbt.MustExec("set @@global.tidb_top_sql_max_time_series_count=5;")

require.NoError(t, cpuprofile.StartCPUProfiler())
t.Cleanup(func() {
cpuprofile.StopCPUProfiler()
topsqlstate.GlobalState.PrecisionSeconds.Store(topsqlstate.DefTiDBTopSQLPrecisionSeconds)
topsqlstate.GlobalState.ReportIntervalSeconds.Store(topsqlstate.DefTiDBTopSQLReportIntervalSeconds)
restoreTicker()
view.Stop()
})
return ts
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/topsql/reporter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ go_library(
"datamodel.go",
"datasink.go",
"pubsub.go",
"report_ticker.go",
"reporter.go",
"ru_datamodel.go",
"ru_window_aggregator.go",
"single_target.go",
],
importpath = "github.com/pingcap/tidb/pkg/util/topsql/reporter",
Expand Down Expand Up @@ -43,7 +46,11 @@ go_test(
"main_test.go",
"pubsub_test.go",
"reporter_test.go",
"ru_datamodel_test.go",
"ru_window_aggregator_test.go",
"single_target_test.go",
"topru_case_runner_test.go",
"topru_generated_cases_test.go",
],
embed = [":reporter"],
flaky = True,
Expand All @@ -52,11 +59,13 @@ go_test(
"//pkg/config",
"//pkg/testkit/testsetup",
"//pkg/util/topsql/collector",
"//pkg/util/topsql/reporter/metrics",
"//pkg/util/topsql/reporter/mock",
"//pkg/util/topsql/state",
"//pkg/util/topsql/stmtstats",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//codes",
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/topsql/reporter/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ type record struct {
}

func newRecord(sqlDigest, planDigest []byte) *record {
listCap := min(topsqlstate.GlobalState.ReportIntervalSeconds.Load()/topsqlstate.GlobalState.PrecisionSeconds.Load()+1, maxTsItemsCapacity)
listCap := min(int64(topsqlstate.DefTiDBTopSQLReportIntervalSeconds)/topsqlstate.GlobalState.PrecisionSeconds.Load()+1, maxTsItemsCapacity)
return &record{
sqlDigest: sqlDigest,
planDigest: planDigest,
Expand Down
55 changes: 55 additions & 0 deletions pkg/util/topsql/reporter/report_ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reporter

import (
"sync"
"time"

topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
)

const defaultReportTickerInterval = time.Duration(topsqlstate.DefTiDBTopSQLReportIntervalSeconds) * time.Second

var reportTickerIntervalMu sync.Mutex
var reportTickerInterval = defaultReportTickerInterval

func newReportTicker() *time.Ticker {
reportTickerIntervalMu.Lock()
interval := reportTickerInterval
reportTickerIntervalMu.Unlock()
return time.NewTicker(interval)
}

// SetReportTickerIntervalSecondsForTest overrides report ticker interval in tests.
// A non-positive value resets to the default interval.
// The returned function restores the previous override.
func SetReportTickerIntervalSecondsForTest(seconds int) (restore func()) {
interval := defaultReportTickerInterval
if seconds > 0 {
interval = time.Duration(seconds) * time.Second
}

reportTickerIntervalMu.Lock()
prev := reportTickerInterval
reportTickerInterval = interval
reportTickerIntervalMu.Unlock()

return func() {
reportTickerIntervalMu.Lock()
reportTickerInterval = prev
reportTickerIntervalMu.Unlock()
}
}
67 changes: 57 additions & 10 deletions pkg/util/topsql/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
reporter_metrics "github.com/pingcap/tidb/pkg/util/topsql/reporter/metrics"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/pingcap/tidb/pkg/util/topsql/stmtstats"
"github.com/pingcap/tipb/go-tipb"
"github.com/wangjohn/quickselect"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -68,6 +69,7 @@ type TopSQLReporter interface {

var _ TopSQLReporter = &RemoteTopSQLReporter{}
var _ DataSinkRegisterer = &RemoteTopSQLReporter{}
var _ stmtstats.RUCollector = &RemoteTopSQLReporter{}

// RemoteTopSQLReporter implements TopSQLReporter that sends data to a remote agent.
// This should be called periodically to collect TopSQL resource usage metrics.
Expand All @@ -78,7 +80,9 @@ type RemoteTopSQLReporter struct {
sqlCPUCollector *collector.SQLCPUCollector
collectCPUTimeChan chan []collector.SQLCPUTimeRecord
collectStmtStatsChan chan stmtstats.StatementStatsMap
collectRUIncrementsChan chan ruBatch
collecting *collecting
ruAggregator *ruWindowAggregator // Online 15s RU aggregation (400->200->100 pipeline)
normalizedSQLMap *normalizedSQLMap
normalizedPlanMap *normalizedPlanMap
stmtStatsBuffer map[uint64]stmtstats.StatementStatsMap // timestamp => stmtstats.StatementStatsMap
Expand All @@ -90,6 +94,14 @@ type RemoteTopSQLReporter struct {
keyspaceName []byte
}

// ruBatch carries RU increments with producer-side timestamp.
// timestamping at enqueue side keeps RU bucket attribution independent
// from downstream scheduling delay.
type ruBatch struct {
data stmtstats.RUIncrementMap
timestamp uint64
}

// NewRemoteTopSQLReporter creates a new RemoteTopSQLReporter.
//
// decodePlan is a decoding function which will be called asynchronously to decode the plan binary to string.
Expand All @@ -101,8 +113,10 @@ func NewRemoteTopSQLReporter(decodePlan planBinaryDecodeFunc, compressPlan planB
cancel: cancel,
collectCPUTimeChan: make(chan []collector.SQLCPUTimeRecord, collectChanBufferSize),
collectStmtStatsChan: make(chan stmtstats.StatementStatsMap, collectChanBufferSize),
collectRUIncrementsChan: make(chan ruBatch, collectChanBufferSize),
reportCollectedDataChan: make(chan collectedData, 1),
collecting: newCollecting(),
ruAggregator: newRUWindowAggregator(),
normalizedSQLMap: newNormalizedSQLMap(),
normalizedPlanMap: newNormalizedPlanMap(),
stmtStatsBuffer: map[uint64]stmtstats.StatementStatsMap{},
Expand Down Expand Up @@ -162,6 +176,29 @@ func (tsr *RemoteTopSQLReporter) CollectStmtStatsMap(data stmtstats.StatementSta
}
}

// CollectRUIncrements implements stmtstats.RUCollector.
// It receives merged RU increments from aggregator every second.
// Best-effort window contract: RU is timestamped when collected here. For
// boundary cases, RU can be attributed to the next report window instead of
// backfilling an already closed window.
//
// WARN: It will drop the data if the processing is not in time.
// This function is thread-safe and efficient.
func (tsr *RemoteTopSQLReporter) CollectRUIncrements(data stmtstats.RUIncrementMap) {
if len(data) == 0 {
return
}
batch := ruBatch{
timestamp: uint64(nowFunc().Unix()),
data: data,
}
select {
case tsr.collectRUIncrementsChan <- batch:
default:
reporter_metrics.IgnoreCollectRUChannelFullCounter.Inc()
}
}

// RegisterSQL implements TopSQLReporter.
//
// This function is thread-safe and efficient.
Expand All @@ -187,8 +224,7 @@ func (tsr *RemoteTopSQLReporter) Close() {
func (tsr *RemoteTopSQLReporter) collectWorker() {
defer util.Recover("top-sql", "collectWorker", nil, false)

currentReportInterval := topsqlstate.GlobalState.ReportIntervalSeconds.Load()
reportTicker := time.NewTicker(time.Second * time.Duration(currentReportInterval))
reportTicker := newReportTicker()
defer reportTicker.Stop()
for {
select {
Expand All @@ -200,14 +236,15 @@ func (tsr *RemoteTopSQLReporter) collectWorker() {
case data := <-tsr.collectStmtStatsChan:
timestamp := uint64(nowFunc().Unix())
tsr.stmtStatsBuffer[timestamp] = data
case batch := <-tsr.collectRUIncrementsChan:
if len(batch.data) == 0 {
continue
}
tsr.ruAggregator.addBatchToBucket(batch.timestamp, batch.data)
case <-reportTicker.C:
timestamp := uint64(nowFunc().Unix())
tsr.processStmtStatsData()
tsr.takeDataAndSendToReportChan()
// Update `reportTicker` if report interval changed.
if newInterval := topsqlstate.GlobalState.ReportIntervalSeconds.Load(); newInterval != currentReportInterval {
currentReportInterval = newInterval
reportTicker.Reset(time.Second * time.Duration(currentReportInterval))
}
tsr.takeDataAndSendToReportChan(timestamp)
}
}
}
Expand Down Expand Up @@ -300,11 +337,19 @@ func findKthNetworkBytes(data stmtstats.StatementStatsMap, k int, u64Slice []uin
}

// takeDataAndSendToReportChan takes records data and then send to the report channel for reporting.
func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan() {
// TopRU extraction runs on the same report tick path.
// Each call emits at most one aligned closed 60s RU window.
func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan(timestamp uint64) {
ruRecords := tsr.ruAggregator.takeReportRecords(
timestamp,
uint64(topsqlstate.GetTopRUItemInterval()),
tsr.keyspaceName,
)
// Send to report channel. When channel is full, data will be dropped.
select {
case tsr.reportCollectedDataChan <- collectedData{
collected: tsr.collecting.take(),
ruRecords: ruRecords,
normalizedSQLMap: tsr.normalizedSQLMap.take(),
normalizedPlanMap: tsr.normalizedPlanMap.take(),
}:
Expand All @@ -330,6 +375,7 @@ func (tsr *RemoteTopSQLReporter) reportWorker() {
// Convert to protobuf data and do report.
tsr.doReport(&ReportData{
DataRecords: rs.toProto(tsr.keyspaceName),
RURecords: data.ruRecords,
SQLMetas: data.normalizedSQLMap.toProto(tsr.keyspaceName),
PlanMetas: data.normalizedPlanMap.toProto(tsr.keyspaceName, tsr.decodePlan, tsr.compressPlan),
})
Expand All @@ -349,7 +395,7 @@ func (tsr *RemoteTopSQLReporter) doReport(data *ReportData) {
timeout := reportTimeout
failpoint.Inject("resetTimeoutForTest", func(val failpoint.Value) {
if val.(bool) {
interval := time.Duration(topsqlstate.GlobalState.ReportIntervalSeconds.Load()) * time.Second
interval := time.Duration(topsqlstate.DefTiDBTopSQLReportIntervalSeconds) * time.Second
if interval < timeout {
timeout = interval
}
Expand Down Expand Up @@ -390,4 +436,5 @@ type collectedData struct {
collected *collecting
normalizedSQLMap *normalizedSQLMap
normalizedPlanMap *normalizedPlanMap
ruRecords []tipb.TopRURecord
}
Loading
Loading