Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pdctl: support top query in pdctl #7843

Merged
merged 12 commits into from
Apr 12, 2024
26 changes: 26 additions & 0 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,19 @@
h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() })
}

// @Tags region
// @Summary List regions with the highest write flow.
// @Param limit query integer false "Limit count" default(16)
// @Produce json
// @Success 200 {object} response.RegionsInfo
// @Failure 400 {string} string "The input is invalid."
// @Router /regions/writequery [get]
func (h *regionsHandler) GetTopWriteQueryRegions(w http.ResponseWriter, r *http.Request) {
h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool {
return a.GetWriteQueryNum() < b.GetWriteQueryNum()
})

Check warning on line 558 in server/api/region.go

View check run for this annotation

Codecov / codecov/patch

server/api/region.go#L556-L558

Added lines #L556 - L558 were not covered by tests
}

// @Tags region
// @Summary List regions with the highest read flow.
// @Param limit query integer false "Limit count" default(16)
Expand All @@ -556,6 +569,19 @@
h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() })
}

// @Tags region
// @Summary List regions with the highest write flow.
// @Param limit query integer false "Limit count" default(16)
// @Produce json
// @Success 200 {object} response.RegionsInfo
// @Failure 400 {string} string "The input is invalid."
// @Router /regions/readquery [get]
func (h *regionsHandler) GetTopReadQueryRegions(w http.ResponseWriter, r *http.Request) {
h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool {
return a.GetReadQueryNum() < b.GetReadQueryNum()
})

Check warning on line 582 in server/api/region.go

View check run for this annotation

Codecov / codecov/patch

server/api/region.go#L580-L582

Added lines #L580 - L582 were not covered by tests
}

// @Tags region
// @Summary List regions with the largest conf version.
// @Param limit query integer false "Limit count" default(16)
Expand Down
2 changes: 2 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(clusterRouter, "/regions/store/{id}", regionsHandler.GetStoreRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/keyspace/id/{id}", regionsHandler.GetKeyspaceRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/writeflow", regionsHandler.GetTopWriteFlowRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/writequery", regionsHandler.GetTopWriteQueryRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/readflow", regionsHandler.GetTopReadFlowRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/readquery", regionsHandler.GetTopReadQueryRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/confver", regionsHandler.GetTopConfVerRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/version", regionsHandler.GetTopVersionRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/size", regionsHandler.GetTopSizeRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
Expand Down
40 changes: 34 additions & 6 deletions tools/pd-ctl/pdctl/command/region_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ var (
regionsCheckPrefix = "pd/api/v1/regions/check"
regionsWriteFlowPrefix = "pd/api/v1/regions/writeflow"
regionsReadFlowPrefix = "pd/api/v1/regions/readflow"
regionsWriteQueryPrefix = "pd/api/v1/regions/writequery"
regionsReadQueryPrefix = "pd/api/v1/regions/readquery"
regionsConfVerPrefix = "pd/api/v1/regions/confver"
regionsVersionPrefix = "pd/api/v1/regions/version"
regionsSizePrefix = "pd/api/v1/regions/size"
Expand Down Expand Up @@ -66,17 +68,17 @@ func NewRegionCommand() *cobra.Command {
r.AddCommand(NewRangesWithRangeHolesCommand())

topRead := &cobra.Command{
Use: `topread <limit> [--jq="<query string>"]`,
Short: "show regions with top read flow",
Run: showRegionsTopCommand(regionsReadFlowPrefix),
Use: `topread [byte|query] <limit> [--jq="<query string>"]`,
Short: "show regions with top read flow or query",
Run: showTopReadRegions,
}
topRead.Flags().String("jq", "", "jq query")
r.AddCommand(topRead)

topWrite := &cobra.Command{
Use: `topwrite <limit> [--jq="<query string>"]`,
Short: "show regions with top write flow",
Run: showRegionsTopCommand(regionsWriteFlowPrefix),
Use: `topwrite [byte|query] <limit> [--jq="<query string>"]`,
Short: "show regions with top write flow or query",
Run: showTopWriteRegions,
}
topWrite.Flags().String("jq", "", "jq query")
r.AddCommand(topWrite)
Expand Down Expand Up @@ -226,6 +228,32 @@ func showRegionsTopCommand(prefix string) run {
}
}

func showTopReadRegions(cmd *cobra.Command, args []string) {
if len(args) < 1 && args[0] != "query" && args[0] != "byte" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it break the compatibility?

cmd.Println(cmd.UsageString())
return
}
switch args[0] {
case "query":
showRegionsTopCommand(regionsReadQueryPrefix)(cmd, args[1:])
default: // byte
showRegionsTopCommand(regionsReadFlowPrefix)(cmd, args[1:])
}
}

func showTopWriteRegions(cmd *cobra.Command, args []string) {
if len(args) < 1 && args[0] != "query" && args[0] != "byte" {
cmd.Println(cmd.UsageString())
return
}
switch args[0] {
case "query":
showRegionsTopCommand(regionsWriteQueryPrefix)(cmd, args[1:])
default: // byte
showRegionsTopCommand(regionsWriteFlowPrefix)(cmd, args[1:])
}
}

// NewRegionWithKeyCommand return a region with key subcommand of regionCmd
func NewRegionWithKeyCommand() *cobra.Command {
r := &cobra.Command{
Expand Down
28 changes: 28 additions & 0 deletions tools/pd-ctl/tests/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,34 @@ func (suite *configTestSuite) checkMicroServiceConfig(cluster *pdTests.TestClust
re.False(svr.GetMicroServiceConfig().EnableSchedulingFallback)
}

func (suite *configTestSuite) TestRegionRules() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add it in this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From other unmerged pr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then, we should remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or opening a new pr for adding test to cover it? They are both acceptable for me.

suite.env.RunTestInTwoModes(suite.checkRegionRules)
}

func (suite *configTestSuite) checkRegionRules(cluster *pdTests.TestCluster) {
re := suite.Require()
leaderServer := cluster.GetLeaderServer()
pdAddr := leaderServer.GetAddr()
cmd := ctl.GetRootCmd()

storeID, regionID := uint64(1), uint64(2)
store := &metapb.Store{
Id: storeID,
State: metapb.StoreState_Up,
}
pdTests.MustPutStore(re, cluster, store)
pdTests.MustPutRegion(re, cluster, regionID, storeID, []byte{}, []byte{})

args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--region=" + strconv.Itoa(int(regionID)), "--detail"}
output, err := tests.ExecuteCommand(cmd, args...)
re.NoError(err)
fit := &placement.RegionFit{}
re.NoError(json.Unmarshal(output, fit))
re.Len(fit.RuleFits, 1)
re.Equal(placement.DefaultGroupID, fit.RuleFits[0].Rule.GroupID)
re.Equal(placement.DefaultRuleID, fit.RuleFits[0].Rule.ID)
}

func assertBundles(re *require.Assertions, a, b []placement.GroupBundle) {
re.Len(b, len(a))
for i := 0; i < len(a); i++ {
Expand Down
18 changes: 13 additions & 5 deletions tools/pd-ctl/tests/region/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestRegion(t *testing.T) {
r1 := pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"),
core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1),
core.SetRegionVersion(1), core.SetApproximateSize(1), core.SetApproximateKeys(100),
core.SetReadQuery(100), core.SetWrittenQuery(100),
core.SetPeers([]*metapb.Peer{
{Id: 1, StoreId: 1},
{Id: 5, StoreId: 2},
Expand All @@ -92,15 +93,18 @@ func TestRegion(t *testing.T) {
r2 := pdTests.MustPutRegion(re, cluster, 2, 1, []byte("b"), []byte("c"),
core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2),
core.SetRegionVersion(3), core.SetApproximateSize(144), core.SetApproximateKeys(14400),
core.SetReadQuery(200), core.SetWrittenQuery(200),
)
r3 := pdTests.MustPutRegion(re, cluster, 3, 1, []byte("c"), []byte("d"),
core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3),
core.SetRegionVersion(2), core.SetApproximateSize(30), core.SetApproximateKeys(3000),
core.SetReadQuery(300), core.SetWrittenQuery(300),
core.WithDownPeers([]*pdpb.PeerStats{{Peer: downPeer, DownSeconds: 3600}}),
core.WithPendingPeers([]*metapb.Peer{downPeer}), core.WithLearners([]*metapb.Peer{{Id: 3, StoreId: 1}}))
r4 := pdTests.MustPutRegion(re, cluster, 4, 1, []byte("d"), []byte("e"),
core.SetWrittenBytes(100), core.SetReadBytes(100), core.SetRegionConfVer(1),
core.SetRegionVersion(1), core.SetApproximateSize(10), core.SetApproximateKeys(1000),
core.SetReadQuery(400), core.SetWrittenQuery(400),
)
defer cluster.Destroy()

Expand All @@ -115,10 +119,14 @@ func TestRegion(t *testing.T) {
// region store <store_id> command
{[]string{"region", "store", "1"}, leaderServer.GetStoreRegions(1)},
{[]string{"region", "store", "1"}, []*core.RegionInfo{r1, r2, r3, r4}},
// region topread [limit] command
{[]string{"region", "topread", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }, 2)},
// region topwrite [limit] command
{[]string{"region", "topwrite", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }, 2)},
// region topread byte [limit] command
{[]string{"region", "topread", "byte", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }, 2)},
// region topwrite byte [limit] command
{[]string{"region", "topwrite", "byte", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }, 2)},
// region topread byte [limit] command
{[]string{"region", "topread", "query", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetReadQueryNum() < b.GetReadQueryNum() }, 2)},
// region topwrite byte [limit] command
{[]string{"region", "topwrite", "query", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetWriteQueryNum() < b.GetWriteQueryNum() }, 2)},
// region topconfver [limit] command
{[]string{"region", "topconfver", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
return a.GetMeta().GetRegionEpoch().GetConfVer() < b.GetMeta().GetRegionEpoch().GetConfVer()
Expand Down Expand Up @@ -172,7 +180,7 @@ func TestRegion(t *testing.T) {
output, err := tests.ExecuteCommand(cmd, args...)
re.NoError(err)
regions := &response.RegionsInfo{}
re.NoError(json.Unmarshal(output, regions))
re.NoError(json.Unmarshal(output, regions), string(output))
tests.CheckRegionsInfo(re, regions, testCase.expect)
}

Expand Down