planner: cache mpp info to avoid frequent grpc calling#65694
planner: cache mpp info to avoid frequent grpc calling#65694xzhangxian1008 wants to merge 9 commits into
Conversation
|
Hi @xzhangxian1008. Thanks for your PR. PRs from untrusted users cannot be marked as trusted with I understand the commands that are listed here. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
/run-check-issue-triage-complete |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #65694 +/- ##
================================================
- Coverage 77.6914% 77.3365% -0.3549%
================================================
Files 2016 1945 -71
Lines 551900 540986 -10914
================================================
- Hits 428779 418380 -10399
- Misses 121375 122199 +824
+ Partials 1746 407 -1339
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
|
/cc @copilot |
|
/cc @windtalker @solotzg |
| } | ||
|
|
||
| if len(uncachedServersInfo) > 0 { | ||
| ch := make(chan [][]types.Datum, len(uncachedServersInfo)) |
There was a problem hiding this comment.
Why use chan?
deleted
| waitWg := &sync.WaitGroup{} | ||
| waitWg.Add(len(uncachedServersInfo)) | ||
| for i := range uncachedServersInfo { | ||
| go func(info []infoschema.ServerInfo) { |
There was a problem hiding this comment.
Consider fetching once for the full slice, or pass the single-server slice into the call and use the row’s address only when the fetch is truly per-server.
The extra goroutine layer may be unnecessary since FetchClusterServerInfoWithoutPrivilegeCheck already fans out internally.
There was a problem hiding this comment.
Consider fetching once for the full slice, or pass the single-server slice into the call and use the row’s address only when the fetch is truly per-server.
The extra goroutine layer may be unnecessary since
FetchClusterServerInfoWithoutPrivilegeCheckalready fans out internally.
fixed
|
/hold |
|
@solotzg: adding LGTM is restricted to approvers and reviewers in OWNERS files. DetailsIn response to this: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
| infos := infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx.GetSessionVars(), serversInfo, e.serverInfoType, true) | ||
| rowCount := 0 | ||
| for _, info := range infos { | ||
| if info.Err != nil { |
There was a problem hiding this comment.
if info.Err == nil {?
fixed
|
|
||
| results := make([][]types.Datum, 0, rowCount) | ||
| for _, info := range infos { | ||
| if info.Err != nil { |
| var uncachedServersInfo []infoschema.ServerInfo | ||
| var minLogicalCores = initialMaxCores | ||
| for _, info := range serversInfo { | ||
| mppInfo := copr.GlobalMPPInfoManager.Get(info.Address) |
There was a problem hiding this comment.
Consider monitoring the StartTimestamp for any change in the event that a TiFlash instance's CPU configuration is altered.
There was a problem hiding this comment.
Consider monitoring the
StartTimestampfor any change in the event that a TiFlash instance's CPU configuration is altered.
fixed
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: solotzg The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
📝 WalkthroughWalkthroughRefactors cluster server info fetching to return per-server results with errors preserved, adds a global MPP info manager to cache per-store CPU/start-timestamp, and updates the optimizer and executor to use/cache that MPP info when computing TiFlash min logical cores. Changes
Sequence Diagram(s)sequenceDiagram
participant Optimizer
participant Cache as GlobalMPPInfoManager
participant Fetcher as FetchClusterServerInfoWithoutPrivilegeCheck
participant Executor as clusterServerInfoRetriever
Optimizer->>Cache: Get(address) for each TiFlash
Cache-->>Optimizer: MPPInfo or nil
Optimizer->>Fetcher: Request info for uncached addresses
Fetcher->>Executor: Gather ServerInfoResult[] (per-server Rows, Err, Idx)
Executor-->>Fetcher: Return flattened [][]types.Datum (skipping Err)
Fetcher-->>Optimizer: []ServerInfoResult
Optimizer->>Cache: Add(address, LogicalCPUCount) for fresh entries
Cache-->>Optimizer: Ack (cache updated)
Optimizer->>Optimizer: Compute min logical cores from cached + fresh data
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
📝 Coding Plan
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 golangci-lint (2.11.3)Command failed Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment Tip You can disable the changed files summary in the walkthrough.Disable the |
|
/ok-to-test |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
pkg/store/copr/mpp_probe_test.go (1)
194-210: Cover the failed-store eviction path, not only CRUD.This exercises the map wrapper, but the behavior this change relies on is that
MPPFailedStoreProber.Add()evicts an existing cache entry for the same address. A regression that seedsGlobalMPPInfoManager, callsGlobalMPPFailedStoreProber.Add(), and assertsGet(address) == nilwould protect the actual integration path.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/copr/mpp_probe_test.go` around lines 194 - 210, Add a test that exercises the failed-store eviction path by seeding GlobalMPPInfoManager with an MPPInfo for the target address, then calling GlobalMPPFailedStoreProber.Add(address) and asserting that GlobalMPPInfoManager.Get(address) returns nil; specifically, create an MPPInfo and insert it into GlobalMPPInfoManager (use GlobalMPPInfoManager.Add or assign into its cachedStores), call GlobalMPPFailedStoreProber.Add with the same Address, and assert GlobalMPPInfoManager.Get(Address) == nil to ensure MPPFailedStoreProber.Add evicts the cache entry.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/infoschema/tables.go`:
- Around line 2788-2790: The partial-fetch error returned from
getServerInfoByGRPC(ctx, remote, infoTp) is sent to the channel without node
context; wrap or annotate it with the target node (remote and/or
infoTp/serverTP) before sending so logs include which node failed. Update the
code that sends ServerInfoResult (the ch <- ServerInfoResult{Idx: index, Err:
err} lines) to either wrap err with context (e.g., fmt.Errorf("node %s(%s): %w",
remote, infoTp, err)) or add node identity fields to ServerInfoResult and
populate them (e.g., Address: remote, ServerTP: infoTp) so downstream logging
can show the failing node. Ensure the same change is applied at the other
occurrence around lines 2803-2807.
In `@pkg/planner/core/optimizer.go`:
- Around line 643-665: The current Get → RPC → Add is non-atomic and can revive
a store that was marked failed in between; change the cache-fill to perform a
conditional write: after the RPC returns but before calling
copr.GlobalMPPInfoManager.Add, re-check the cache and failure state (e.g., call
copr.GlobalMPPInfoManager.Get(address) and/or
copr.GlobalMPPFailedStoreProber.IsFailed(address)) and only call Add if the
cache entry is still absent and the store is not marked failed; alternatively
implement/use an AddIfNotFailed/AddWithGeneration API on GlobalMPPInfoManager
that compares a generation/tombstone from the initial Get and only stores when
unchanged so failure invalidation wins. Ensure you reference the same address
used when creating the copr.MPPInfo and avoid unconditional Add in the loop that
processes infos.Rows.
---
Nitpick comments:
In `@pkg/store/copr/mpp_probe_test.go`:
- Around line 194-210: Add a test that exercises the failed-store eviction path
by seeding GlobalMPPInfoManager with an MPPInfo for the target address, then
calling GlobalMPPFailedStoreProber.Add(address) and asserting that
GlobalMPPInfoManager.Get(address) returns nil; specifically, create an MPPInfo
and insert it into GlobalMPPInfoManager (use GlobalMPPInfoManager.Add or assign
into its cachedStores), call GlobalMPPFailedStoreProber.Add with the same
Address, and assert GlobalMPPInfoManager.Get(Address) == nil to ensure
MPPFailedStoreProber.Add evicts the cache entry.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 71bdce6f-f92a-461d-ab57-6573cb21d63d
📒 Files selected for processing (7)
pkg/executor/memtable_reader.gopkg/infoschema/tables.gopkg/planner/core/BUILD.bazelpkg/planner/core/optimizer.gopkg/store/copr/BUILD.bazelpkg/store/copr/mpp_probe.gopkg/store/copr/mpp_probe_test.go
| items, err := getServerInfoByGRPC(ctx, remote, infoTp) | ||
| if err != nil { | ||
| ch <- result{idx: index, err: err} | ||
| ch <- ServerInfoResult{Idx: index, Err: err} |
There was a problem hiding this comment.
Wrap partial-fetch errors with the target node.
These errors are later appended/logged without address or serverTP, so warnings degrade to generic transport failures with no clue which node failed. Please annotate the error before sending it to the channel, or carry the node identity on ServerInfoResult.
Suggested change
- ch <- ServerInfoResult{Idx: index, Err: err}
+ ch <- ServerInfoResult{
+ Idx: index,
+ Err: errors.Annotatef(err, "fetch cluster server info from %s(%s)", serverTP, address),
+ }As per coding guidelines, "Keep error handling actionable and contextual; avoid silently swallowing errors".
Also applies to: 2803-2807
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/infoschema/tables.go` around lines 2788 - 2790, The partial-fetch error
returned from getServerInfoByGRPC(ctx, remote, infoTp) is sent to the channel
without node context; wrap or annotate it with the target node (remote and/or
infoTp/serverTP) before sending so logs include which node failed. Update the
code that sends ServerInfoResult (the ch <- ServerInfoResult{Idx: index, Err:
err} lines) to either wrap err with context (e.g., fmt.Errorf("node %s(%s): %w",
remote, infoTp, err)) or add node identity fields to ServerInfoResult and
populate them (e.g., Address: remote, ServerTP: infoTp) so downstream logging
can show the failing node. Ensure the same change is applied at the other
occurrence around lines 2803-2807.
| var uncachedServersInfo []infoschema.ServerInfo | ||
| var minLogicalCores = initialMaxCores | ||
| for _, info := range serversInfo { | ||
| mppInfo := copr.GlobalMPPInfoManager.Get(info.Address) | ||
| if mppInfo == nil { | ||
| uncachedServersInfo = append(uncachedServersInfo, info) | ||
| continue | ||
| } | ||
| minLogicalCores = min(minLogicalCores, mppInfo.LogicalCPUCount) | ||
| } | ||
|
|
||
| if len(uncachedServersInfo) > 0 { | ||
| infos := infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx.GetSessionVars(), uncachedServersInfo, diagnosticspb.ServerInfoType_HardwareInfo, false) | ||
| for _, info := range infos { | ||
| for _, row := range info.Rows { | ||
| if row[4].GetString() == "cpu-logical-cores" { | ||
| logicalCpus, err := strconv.Atoi(row[5].GetString()) | ||
| if err == nil && logicalCpus > 0 { | ||
| copr.GlobalMPPInfoManager.Add(&copr.MPPInfo{ | ||
| Address: uncachedServersInfo[info.Idx].Address, | ||
| LogicalCPUCount: uint64(logicalCpus), | ||
| }) | ||
| minLogicalCores = min(minLogicalCores, uint64(logicalCpus)) |
There was a problem hiding this comment.
Don't revive a failed store between cache miss and cache fill.
This is a non-atomic Get → RPC → Add sequence. If another goroutine marks the store failed while the RPC is in flight, GlobalMPPFailedStoreProber.Add() deletes the cache entry and this unconditional Add puts stale CPU info straight back. That breaks the new eviction guarantee and can keep planning off a store already marked failed. The cache fill needs a generation/tombstone or other conditional write so failure invalidation wins.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/planner/core/optimizer.go` around lines 643 - 665, The current Get → RPC
→ Add is non-atomic and can revive a store that was marked failed in between;
change the cache-fill to perform a conditional write: after the RPC returns but
before calling copr.GlobalMPPInfoManager.Add, re-check the cache and failure
state (e.g., call copr.GlobalMPPInfoManager.Get(address) and/or
copr.GlobalMPPFailedStoreProber.IsFailed(address)) and only call Add if the
cache entry is still absent and the store is not marked failed; alternatively
implement/use an AddIfNotFailed/AddWithGeneration API on GlobalMPPInfoManager
that compares a generation/tombstone from the initial Get and only stores when
unchanged so failure invalidation wins. Ensure you reference the same address
used when creating the copr.MPPInfo and avoid unconditional Add in the loop that
processes infos.Rows.
There was a problem hiding this comment.
♻️ Duplicate comments (1)
pkg/planner/core/optimizer.go (1)
658-673:⚠️ Potential issue | 🟠 MajorAvoid reviving failed-store cache entries in the refresh write path.
The refresh flow is still non-atomic (
split/cache-check→ remote fetch → unconditionalAdd). If a store is marked failed and evicted during fetch, thisAddcan repopulate stale data and undo the invalidation guarantee. Please make cache fill conditional (e.g., generation/tombstone-based write where invalidation wins).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/planner/core/optimizer.go` around lines 658 - 673, The refresh path currently unconditionally repopulates MPPInfo via copr.GlobalMPPInfoManager.Add using serversNeedingRefresh[info.Idx], which can resurrect stores that were concurrently marked failed/evicted; before calling Add in the loop inside splitTiFlashLogicalCoreCache's refresh logic, verify the target server is still valid by checking a stable generation/tombstone or matching StartTimestamp/address against the current cache/store list (e.g., compare serversNeedingRefresh[info.Idx].StartTimestamp and Address or a generation field), and only call Add when they still match; if your cache supports a generation/tombstone, use that to make the write conditional so invalidation wins.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@pkg/planner/core/optimizer.go`:
- Around line 658-673: The refresh path currently unconditionally repopulates
MPPInfo via copr.GlobalMPPInfoManager.Add using serversNeedingRefresh[info.Idx],
which can resurrect stores that were concurrently marked failed/evicted; before
calling Add in the loop inside splitTiFlashLogicalCoreCache's refresh logic,
verify the target server is still valid by checking a stable
generation/tombstone or matching StartTimestamp/address against the current
cache/store list (e.g., compare serversNeedingRefresh[info.Idx].StartTimestamp
and Address or a generation field), and only call Add when they still match; if
your cache supports a generation/tombstone, use that to make the write
conditional so invalidation wins.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 3838bf54-8b2e-407f-a842-76a96b47c29b
📒 Files selected for processing (4)
pkg/planner/core/optimizer.gopkg/planner/core/optimizer_test.gopkg/store/copr/mpp_probe.gopkg/store/copr/mpp_probe_test.go
|
@xzhangxian1008: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
What problem does this PR solve?
Issue Number: close #65701
Problem Summary:
What changed and how does it work?
We define a new struct called
MPPInfoManagerand set a global variableGlobalMPPInfoManagerwithMPPInfoManagertype. mpp info are all cached in this variable. IngetTiFlashServerMinLogicalCores, we will check the cache first then call grpc for uncached tiflash stores. When we get a failed tiflash store infilterAliveStoresHelper, we will remove it's cache.Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.
Summary by CodeRabbit
Refactor
Tests