Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pdctl: support top query in pdctl #7843

Merged
merged 12 commits into from
Apr 12, 2024
36 changes: 32 additions & 4 deletions pkg/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,18 +248,23 @@

// SetReadQuery sets the read query for the region, only used for unit test.
func SetReadQuery(v uint64) RegionCreateOption {
q := RandomKindReadQuery(v)
return SetQueryStats(q)
return func(region *RegionInfo) {
resetReadQuery(region.queryStats)
region.queryStats = mergeQueryStat(region.queryStats, RandomKindReadQuery(v))
}
}

// SetWrittenQuery sets the write query for the region, only used for unit test.
func SetWrittenQuery(v uint64) RegionCreateOption {
q := RandomKindWriteQuery(v)
return SetQueryStats(q)
return func(region *RegionInfo) {
resetWriteQuery(region.queryStats)
region.queryStats = mergeQueryStat(region.queryStats, RandomKindWriteQuery(v))
}
}

// SetQueryStats sets the query stats for the region, it will cover previous statistic.
// This func is only used for unit test.
// It will cover previous statistic.
func SetQueryStats(v *pdpb.QueryStats) RegionCreateOption {
return func(region *RegionInfo) {
region.queryStats = v
Expand All @@ -268,6 +273,7 @@

// AddQueryStats sets the query stats for the region, it will preserve previous statistic.
// This func is only used for test and simulator.
// It will preserve previous statistic.
func AddQueryStats(v *pdpb.QueryStats) RegionCreateOption {
return func(region *RegionInfo) {
q := mergeQueryStat(region.queryStats, v)
Expand Down Expand Up @@ -469,3 +475,25 @@
q2.Rollback += q1.Rollback
return q2
}

func resetReadQuery(q *pdpb.QueryStats) {
if q == nil {
return
}
q.Get = 0
q.Scan = 0
q.Coprocessor = 0

Check warning on line 485 in pkg/core/region_option.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region_option.go#L483-L485

Added lines #L483 - L485 were not covered by tests
}

func resetWriteQuery(q *pdpb.QueryStats) {
if q == nil {
return
}
q.Put = 0
q.Delete = 0
q.DeleteRange = 0
q.AcquirePessimisticLock = 0
q.Rollback = 0
q.Prewrite = 0
q.Commit = 0
}
26 changes: 26 additions & 0 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,19 @@ func (h *regionsHandler) GetTopWriteFlowRegions(w http.ResponseWriter, r *http.R
h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() })
}

// @Tags region
// @Summary List regions with the highest write flow.
// @Param limit query integer false "Limit count" default(16)
// @Produce json
// @Success 200 {object} response.RegionsInfo
// @Failure 400 {string} string "The input is invalid."
// @Router /regions/writequery [get]
func (h *regionsHandler) GetTopWriteQueryRegions(w http.ResponseWriter, r *http.Request) {
h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool {
return a.GetWriteQueryNum() < b.GetWriteQueryNum()
})
}

// @Tags region
// @Summary List regions with the highest read flow.
// @Param limit query integer false "Limit count" default(16)
Expand All @@ -556,6 +569,19 @@ func (h *regionsHandler) GetTopReadFlowRegions(w http.ResponseWriter, r *http.Re
h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() })
}

// @Tags region
// @Summary List regions with the highest write flow.
// @Param limit query integer false "Limit count" default(16)
// @Produce json
// @Success 200 {object} response.RegionsInfo
// @Failure 400 {string} string "The input is invalid."
// @Router /regions/readquery [get]
func (h *regionsHandler) GetTopReadQueryRegions(w http.ResponseWriter, r *http.Request) {
h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool {
return a.GetReadQueryNum() < b.GetReadQueryNum()
})
}

// @Tags region
// @Summary List regions with the largest conf version.
// @Param limit query integer false "Limit count" default(16)
Expand Down
2 changes: 2 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(clusterRouter, "/regions/store/{id}", regionsHandler.GetStoreRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/keyspace/id/{id}", regionsHandler.GetKeyspaceRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/writeflow", regionsHandler.GetTopWriteFlowRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/writequery", regionsHandler.GetTopWriteQueryRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/readflow", regionsHandler.GetTopReadFlowRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/readquery", regionsHandler.GetTopReadQueryRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/confver", regionsHandler.GetTopConfVerRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/version", regionsHandler.GetTopVersionRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/size", regionsHandler.GetTopSizeRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
Expand Down
51 changes: 45 additions & 6 deletions tools/pd-ctl/pdctl/command/region_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
regionsCheckPrefix = "pd/api/v1/regions/check"
regionsWriteFlowPrefix = "pd/api/v1/regions/writeflow"
regionsReadFlowPrefix = "pd/api/v1/regions/readflow"
regionsWriteQueryPrefix = "pd/api/v1/regions/writequery"
regionsReadQueryPrefix = "pd/api/v1/regions/readquery"
regionsConfVerPrefix = "pd/api/v1/regions/confver"
regionsVersionPrefix = "pd/api/v1/regions/version"
regionsSizePrefix = "pd/api/v1/regions/size"
Expand Down Expand Up @@ -66,17 +68,17 @@
r.AddCommand(NewRangesWithRangeHolesCommand())

topRead := &cobra.Command{
Use: `topread <limit> [--jq="<query string>"]`,
Short: "show regions with top read flow",
Run: showRegionsTopCommand(regionsReadFlowPrefix),
Use: `topread [byte|query] <limit> [--jq="<query string>"]`,
Short: "show regions with top read flow or query",
Run: showTopReadRegions,
}
topRead.Flags().String("jq", "", "jq query")
r.AddCommand(topRead)

topWrite := &cobra.Command{
Use: `topwrite <limit> [--jq="<query string>"]`,
Short: "show regions with top write flow",
Run: showRegionsTopCommand(regionsWriteFlowPrefix),
Use: `topwrite [byte|query] <limit> [--jq="<query string>"]`,
Short: "show regions with top write flow or query",
Run: showTopWriteRegions,
}
topWrite.Flags().String("jq", "", "jq query")
r.AddCommand(topWrite)
Expand Down Expand Up @@ -212,6 +214,9 @@
return
}
prefix += "?limit=" + args[0]
} else if len(args) > 1 {
cmd.Println(cmd.UsageString())
return

Check warning on line 219 in tools/pd-ctl/pdctl/command/region_command.go

View check run for this annotation

Codecov / codecov/patch

tools/pd-ctl/pdctl/command/region_command.go#L218-L219

Added lines #L218 - L219 were not covered by tests
}
r, err := doRequest(cmd, prefix, http.MethodGet, http.Header{})
if err != nil {
Expand All @@ -226,6 +231,40 @@
}
}

func showTopReadRegions(cmd *cobra.Command, args []string) {
// default to show top read flow
if len(args) == 0 {
showRegionsTopCommand(regionsReadFlowPrefix)(cmd, args)
return
}
// default to show top read flow with limit
switch args[0] {
case "query":
showRegionsTopCommand(regionsReadQueryPrefix)(cmd, args[1:])
case "byte":
showRegionsTopCommand(regionsReadFlowPrefix)(cmd, args[1:])
default:
showRegionsTopCommand(regionsReadFlowPrefix)(cmd, args)
}
}

func showTopWriteRegions(cmd *cobra.Command, args []string) {
// default to show top write flow
if len(args) == 0 {
showRegionsTopCommand(regionsWriteFlowPrefix)(cmd, args)
return
}
// default to show top write flow with limit
switch args[0] {
case "query":
showRegionsTopCommand(regionsWriteQueryPrefix)(cmd, args[1:])
case "byte":
showRegionsTopCommand(regionsWriteFlowPrefix)(cmd, args[1:])
default:
showRegionsTopCommand(regionsWriteFlowPrefix)(cmd, args)
}
}

// NewRegionWithKeyCommand return a region with key subcommand of regionCmd
func NewRegionWithKeyCommand() *cobra.Command {
r := &cobra.Command{
Expand Down
28 changes: 28 additions & 0 deletions tools/pd-ctl/tests/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,34 @@ func (suite *configTestSuite) checkMicroServiceConfig(cluster *pdTests.TestClust
re.False(svr.GetMicroServiceConfig().EnableSchedulingFallback)
}

func (suite *configTestSuite) TestRegionRules() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add it in this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From other unmerged pr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then, we should remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or opening a new pr for adding test to cover it? They are both acceptable for me.

suite.env.RunTestInTwoModes(suite.checkRegionRules)
}

func (suite *configTestSuite) checkRegionRules(cluster *pdTests.TestCluster) {
re := suite.Require()
leaderServer := cluster.GetLeaderServer()
pdAddr := leaderServer.GetAddr()
cmd := ctl.GetRootCmd()

storeID, regionID := uint64(1), uint64(2)
store := &metapb.Store{
Id: storeID,
State: metapb.StoreState_Up,
}
pdTests.MustPutStore(re, cluster, store)
pdTests.MustPutRegion(re, cluster, regionID, storeID, []byte{}, []byte{})

args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--region=" + strconv.Itoa(int(regionID)), "--detail"}
output, err := tests.ExecuteCommand(cmd, args...)
re.NoError(err)
fit := &placement.RegionFit{}
re.NoError(json.Unmarshal(output, fit))
re.Len(fit.RuleFits, 1)
re.Equal(placement.DefaultGroupID, fit.RuleFits[0].Rule.GroupID)
re.Equal(placement.DefaultRuleID, fit.RuleFits[0].Rule.ID)
}

func assertBundles(re *require.Assertions, a, b []placement.GroupBundle) {
re.Len(b, len(a))
for i := 0; i < len(a); i++ {
Expand Down
9 changes: 9 additions & 0 deletions tools/pd-ctl/tests/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,12 @@ func CheckRegionsInfo(re *require.Assertions, output *response.RegionsInfo, expe
CheckRegionInfo(re, &got[i], region)
}
}

// CheckRegionsInfoWithoutSort is used to check the test results without sort.
func CheckRegionsInfoWithoutSort(re *require.Assertions, output *response.RegionsInfo, expected []*core.RegionInfo) {
re.Len(expected, output.Count)
got := output.Regions
for i, region := range expected {
CheckRegionInfo(re, &got[i], region)
}
}
85 changes: 62 additions & 23 deletions tools/pd-ctl/tests/region/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestRegion(t *testing.T) {
r1 := pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"),
core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1),
core.SetRegionVersion(1), core.SetApproximateSize(1), core.SetApproximateKeys(100),
core.SetReadQuery(100), core.SetWrittenQuery(100),
core.SetPeers([]*metapb.Peer{
{Id: 1, StoreId: 1},
{Id: 5, StoreId: 2},
Expand All @@ -92,15 +93,18 @@ func TestRegion(t *testing.T) {
r2 := pdTests.MustPutRegion(re, cluster, 2, 1, []byte("b"), []byte("c"),
core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2),
core.SetRegionVersion(3), core.SetApproximateSize(144), core.SetApproximateKeys(14400),
core.SetReadQuery(200), core.SetWrittenQuery(200),
)
r3 := pdTests.MustPutRegion(re, cluster, 3, 1, []byte("c"), []byte("d"),
core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3),
core.SetRegionVersion(2), core.SetApproximateSize(30), core.SetApproximateKeys(3000),
core.SetReadQuery(300), core.SetWrittenQuery(300),
core.WithDownPeers([]*pdpb.PeerStats{{Peer: downPeer, DownSeconds: 3600}}),
core.WithPendingPeers([]*metapb.Peer{downPeer}), core.WithLearners([]*metapb.Peer{{Id: 3, StoreId: 1}}))
r4 := pdTests.MustPutRegion(re, cluster, 4, 1, []byte("d"), []byte("e"),
core.SetWrittenBytes(100), core.SetReadBytes(100), core.SetRegionConfVer(1),
core.SetRegionVersion(1), core.SetApproximateSize(10), core.SetApproximateKeys(1000),
core.SetWrittenBytes(100), core.SetReadBytes(100), core.SetRegionConfVer(4),
core.SetRegionVersion(4), core.SetApproximateSize(10), core.SetApproximateKeys(1000),
core.SetReadQuery(400), core.SetWrittenQuery(400),
)
defer cluster.Destroy()

Expand All @@ -115,26 +119,6 @@ func TestRegion(t *testing.T) {
// region store <store_id> command
{[]string{"region", "store", "1"}, leaderServer.GetStoreRegions(1)},
{[]string{"region", "store", "1"}, []*core.RegionInfo{r1, r2, r3, r4}},
// region topread [limit] command
{[]string{"region", "topread", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }, 2)},
// region topwrite [limit] command
{[]string{"region", "topwrite", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }, 2)},
// region topconfver [limit] command
{[]string{"region", "topconfver", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
return a.GetMeta().GetRegionEpoch().GetConfVer() < b.GetMeta().GetRegionEpoch().GetConfVer()
}, 2)},
// region topversion [limit] command
{[]string{"region", "topversion", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
return a.GetMeta().GetRegionEpoch().GetVersion() < b.GetMeta().GetRegionEpoch().GetVersion()
}, 2)},
// region topsize [limit] command
{[]string{"region", "topsize", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
return a.GetApproximateSize() < b.GetApproximateSize()
}, 2)},
// region topkeys [limit] command
{[]string{"region", "topkeys", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
return a.GetApproximateKeys() < b.GetApproximateKeys()
}, 2)},
// region check extra-peer command
{[]string{"region", "check", "extra-peer"}, []*core.RegionInfo{r1}},
// region check miss-peer command
Expand Down Expand Up @@ -172,10 +156,65 @@ func TestRegion(t *testing.T) {
output, err := tests.ExecuteCommand(cmd, args...)
re.NoError(err)
regions := &response.RegionsInfo{}
re.NoError(json.Unmarshal(output, regions))
re.NoError(json.Unmarshal(output, regions), string(output))
tests.CheckRegionsInfo(re, regions, testCase.expect)
}

testRegionsCases = []struct {
args []string
expect []*core.RegionInfo
}{
// region topread [limit] command
{[]string{"region", "topread"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }, 4)},
// region topwrite [limit] command
{[]string{"region", "topwrite"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }, 4)},
// region topread [limit] command
{[]string{"region", "topread", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }, 2)},
// region topwrite [limit] command
{[]string{"region", "topwrite", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }, 2)},
// region topread byte [limit] command
{[]string{"region", "topread", "byte"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }, 4)},
// region topwrite byte [limit] command
{[]string{"region", "topwrite", "byte"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }, 4)},
// region topread byte [limit] command
{[]string{"region", "topread", "query"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetReadQueryNum() < b.GetReadQueryNum() }, 4)},
// region topwrite byte [limit] command
{[]string{"region", "topwrite", "query"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetWriteQueryNum() < b.GetWriteQueryNum() }, 4)},
// region topread byte [limit] command
{[]string{"region", "topread", "byte", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }, 2)},
// region topwrite byte [limit] command
{[]string{"region", "topwrite", "byte", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }, 2)},
// region topread byte [limit] command
{[]string{"region", "topread", "query", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetReadQueryNum() < b.GetReadQueryNum() }, 2)},
// region topwrite byte [limit] command
{[]string{"region", "topwrite", "query", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetWriteQueryNum() < b.GetWriteQueryNum() }, 2)},
// region topconfver [limit] command
{[]string{"region", "topconfver", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
return a.GetMeta().GetRegionEpoch().GetConfVer() < b.GetMeta().GetRegionEpoch().GetConfVer()
}, 2)},
// region topversion [limit] command
{[]string{"region", "topversion", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
return a.GetMeta().GetRegionEpoch().GetVersion() < b.GetMeta().GetRegionEpoch().GetVersion()
}, 2)},
// region topsize [limit] command
{[]string{"region", "topsize", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
return a.GetApproximateSize() < b.GetApproximateSize()
}, 2)},
// region topkeys [limit] command
{[]string{"region", "topkeys", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
return a.GetApproximateKeys() < b.GetApproximateKeys()
}, 2)},
}

for _, testCase := range testRegionsCases {
args := append([]string{"-u", pdAddr}, testCase.args...)
output, err := tests.ExecuteCommand(cmd, args...)
re.NoError(err)
regions := &response.RegionsInfo{}
re.NoError(json.Unmarshal(output, regions), string(output))
tests.CheckRegionsInfoWithoutSort(re, regions, testCase.expect)
}

var testRegionCases = []struct {
args []string
expect *core.RegionInfo
Expand Down