Skip to content

Commit

Permalink
api: add region stats. (#840)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed Nov 9, 2017
1 parent 96db717 commit fca123a
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 21 deletions.
3 changes: 3 additions & 0 deletions server/api/router.go
Expand Up @@ -95,6 +95,9 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
classifierHandler := newClassifierHandler(svr, rd, classifierPrefix)
router.PathPrefix("/api/v1/classifier/").Handler(classifierHandler)

statsHandler := newStatsHandler(svr, rd)
router.HandleFunc("/api/v1/stats/region", statsHandler.Region).Methods("GET")

router.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {}).Methods("GET")
return router
}
44 changes: 44 additions & 0 deletions server/api/stats.go
@@ -0,0 +1,44 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"net/http"

"github.com/pingcap/pd/server"
"github.com/unrolled/render"
)

type statsHandler struct {
svr *server.Server
rd *render.Render
}

func newStatsHandler(svr *server.Server, rd *render.Render) *statsHandler {
return &statsHandler{
svr: svr,
rd: rd,
}
}

func (h *statsHandler) Region(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
}
startKey, endKey := r.URL.Query().Get("start_key"), r.URL.Query().Get("end_key")
stats := cluster.GetRegionStats([]byte(startKey), []byte(endKey))
h.rd.JSON(w, http.StatusOK, stats)
}
152 changes: 152 additions & 0 deletions server/api/stats_test.go
@@ -0,0 +1,152 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"fmt"
"net/http"
"net/url"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/pkg/apiutil"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/server/core"
)

var _ = Suite(&testStatsSuite{})

type testStatsSuite struct {
svr *server.Server
cleanup cleanUpFunc
urlPrefix string
}

func (s *testStatsSuite) SetUpSuite(c *C) {
s.svr, s.cleanup = mustNewServer(c)
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)

mustBootstrapCluster(c, s.svr)
}

func (s *testStatsSuite) TearDownSuite(c *C) {
s.cleanup()
}

func (s *testStatsSuite) TestRegionStats(c *C) {
statsURL := s.urlPrefix + "/stats/region"

regions := []*core.RegionInfo{
{
Region: &metapb.Region{
Id: 1,
StartKey: []byte(""),
EndKey: []byte("a"),
Peers: []*metapb.Peer{
{Id: 101, StoreId: 1},
{Id: 102, StoreId: 2},
{Id: 103, StoreId: 3},
},
},
Leader: &metapb.Peer{Id: 101, StoreId: 1},
ApproximateSize: 100,
},
{
Region: &metapb.Region{
Id: 2,
StartKey: []byte("a"),
EndKey: []byte("t"),
Peers: []*metapb.Peer{
{Id: 104, StoreId: 1},
{Id: 105, StoreId: 4},
{Id: 106, StoreId: 5},
},
},
Leader: &metapb.Peer{Id: 105, StoreId: 4},
ApproximateSize: 200,
},
{
Region: &metapb.Region{
Id: 3,
StartKey: []byte("t"),
EndKey: []byte("x"),
Peers: []*metapb.Peer{
{Id: 106, StoreId: 1},
{Id: 107, StoreId: 5},
},
},
Leader: &metapb.Peer{Id: 107, StoreId: 5},
ApproximateSize: 1,
},
{
Region: &metapb.Region{
Id: 4,
StartKey: []byte("x"),
EndKey: []byte(""),
Peers: []*metapb.Peer{
{Id: 108, StoreId: 4},
},
},
Leader: &metapb.Peer{Id: 108, StoreId: 4},
ApproximateSize: 50,
},
}

for _, r := range regions {
mustRegionHeartbeat(c, s.svr, r)
}

// Distribution (L for leader, F for follower):
// region range size store1 store2 store3 store4 store5
// 1 ["", "a") 100 L F F
// 2 ["a", "t") 200 F L F
// 3 ["t", "x") 1 F L
// 4 ["x", "") 50 L

statsAll := &core.RegionStats{
Count: 4,
EmptyCount: 1,
StorageSize: 351,
StoreLeaderCount: map[uint64]int{1: 1, 4: 2, 5: 1},
StorePeerCount: map[uint64]int{1: 3, 2: 1, 3: 1, 4: 2, 5: 2},
StoreLeaderSize: map[uint64]int64{1: 100, 4: 250, 5: 1},
StorePeerSize: map[uint64]int64{1: 301, 2: 100, 3: 100, 4: 250, 5: 201},
}
res, err := http.Get(statsURL)
c.Assert(err, IsNil)
stats := &core.RegionStats{}
err = apiutil.ReadJSON(res.Body, stats)
c.Assert(err, IsNil)
c.Assert(stats, DeepEquals, statsAll)

stats23 := &core.RegionStats{
Count: 2,
EmptyCount: 1,
StorageSize: 201,
StoreLeaderCount: map[uint64]int{4: 1, 5: 1},
StorePeerCount: map[uint64]int{1: 2, 4: 1, 5: 2},
StoreLeaderSize: map[uint64]int64{4: 200, 5: 1},
StorePeerSize: map[uint64]int64{1: 201, 4: 200, 5: 201},
}
args := fmt.Sprintf("?start_key=%s&end_key=%s", url.QueryEscape("\x01\x02"), url.QueryEscape("xyz\x00\x00"))
res, err = http.Get(statsURL + args)
c.Assert(err, IsNil)
stats = &core.RegionStats{}
err = apiutil.ReadJSON(res.Body, stats)
c.Assert(err, IsNil)
c.Assert(stats, DeepEquals, stats23)
}
6 changes: 6 additions & 0 deletions server/cache.go
Expand Up @@ -267,6 +267,12 @@ func (c *clusterInfo) getRegionCount() int {
return c.Regions.GetRegionCount()
}

func (c *clusterInfo) getRegionStats(startKey, endKey []byte) *core.RegionStats {
c.RLock()
defer c.RUnlock()
return c.Regions.GetRegionStats(startKey, endKey)
}

func (c *clusterInfo) getStoreRegionCount(storeID uint64) int {
c.RLock()
defer c.RUnlock()
Expand Down
5 changes: 5 additions & 0 deletions server/cluster.go
Expand Up @@ -391,6 +391,11 @@ func (c *RaftCluster) GetRegions() []*metapb.Region {
return c.cachedCluster.getMetaRegions()
}

// GetRegionStats returns region statistics from cluster.
func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *core.RegionStats {
return c.cachedCluster.getRegionStats(startKey, endKey)
}

// GetStores gets stores from cluster.
func (c *RaftCluster) GetStores() []*metapb.Store {
return c.cachedCluster.getMetaStores()
Expand Down
82 changes: 76 additions & 6 deletions server/core/region.go
Expand Up @@ -16,6 +16,7 @@ package core
import (
"bytes"
"fmt"
"math"
"math/rand"
"reflect"
"strings"
Expand Down Expand Up @@ -45,6 +46,23 @@ func NewRegionInfo(region *metapb.Region, leader *metapb.Peer) *RegionInfo {
}
}

// EmptyRegionApproximateSize is the region approximate size of an empty region
// (heartbeat size <= 1MB).
const EmptyRegionApproximateSize = 1

// RegionFromHeartbeat constructs a Region from region heartbeat.
func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest) *RegionInfo {
return &RegionInfo{
Region: heartbeat.GetRegion(),
Leader: heartbeat.GetLeader(),
DownPeers: heartbeat.GetDownPeers(),
PendingPeers: heartbeat.GetPendingPeers(),
WrittenBytes: heartbeat.GetBytesWritten(),
ReadBytes: heartbeat.GetBytesRead(),
ApproximateSize: int64(math.Ceil(float64(heartbeat.GetApproximateSize()) / 1e6)), // use size of MB as unit
}
}

// Clone returns a copy of current regionInfo.
func (r *RegionInfo) Clone() *RegionInfo {
downPeers := make([]*pdpb.PeerStats, 0, len(r.DownPeers))
Expand Down Expand Up @@ -475,15 +493,67 @@ func (r *RegionsInfo) GetFollower(storeID uint64, regionID uint64) *RegionInfo {

// ScanRange scans region with start key, until number greater than limit.
func (r *RegionsInfo) ScanRange(startKey []byte, limit int) []*RegionInfo {
metaRegions := r.tree.scanRange(startKey, limit)
res := make([]*RegionInfo, 0, len(metaRegions))
for _, m := range metaRegions {
region := r.GetRegion(m.region.GetId())
res = append(res, region)
}
res := make([]*RegionInfo, 0, limit)
r.tree.scanRange(startKey, func(region *metapb.Region) bool {
res = append(res, r.GetRegion(region.GetId()))
return len(res) < limit
})
return res
}

// RegionStats records a list of regions' statistics and distribution status.
type RegionStats struct {
Count int `json:"count"`
EmptyCount int `json:"empty_count"`
StorageSize int64 `json:"storage_size"`
StoreLeaderCount map[uint64]int `json:"store_leader_count"`
StorePeerCount map[uint64]int `json:"store_peer_count"`
StoreLeaderSize map[uint64]int64 `json:"store_leader_size"`
StorePeerSize map[uint64]int64 `json:"store_peer_size"`
}

func newRegionStats() *RegionStats {
return &RegionStats{
StoreLeaderCount: make(map[uint64]int),
StorePeerCount: make(map[uint64]int),
StoreLeaderSize: make(map[uint64]int64),
StorePeerSize: make(map[uint64]int64),
}
}

// Observe adds a region's statistics into RegionStats.
func (s *RegionStats) Observe(r *RegionInfo) {
s.Count++
if r.ApproximateSize <= EmptyRegionApproximateSize {
s.EmptyCount++
}
s.StorageSize += r.ApproximateSize
if r.Leader != nil {
s.StoreLeaderCount[r.Leader.GetStoreId()]++
s.StoreLeaderSize[r.Leader.GetStoreId()] += r.ApproximateSize
}
for _, p := range r.Peers {
s.StorePeerCount[p.GetStoreId()]++
s.StorePeerSize[p.GetStoreId()] += r.ApproximateSize
}
}

// GetRegionStats scans regions that inside range [startKey, endKey) and sums up
// their statistics.
func (r *RegionsInfo) GetRegionStats(startKey, endKey []byte) *RegionStats {
stats := newRegionStats()
r.tree.scanRange(startKey, func(meta *metapb.Region) bool {
if len(endKey) > 0 && (len(meta.EndKey) == 0 || bytes.Compare(meta.EndKey, endKey) >= 0) {
return false
}
if region := r.GetRegion(meta.GetId()); region != nil {
stats.Observe(region)
}
return true
})
return stats
}

const randomRegionMaxRetry = 10

func randRegion(regions *regionMap) *RegionInfo {
Expand Down
12 changes: 4 additions & 8 deletions server/core/region_tree.go
Expand Up @@ -123,13 +123,9 @@ func (t *regionTree) find(region *metapb.Region) *regionItem {
return result
}

func (t *regionTree) scanRange(startKey []byte, limit int) []*regionItem {
region := &metapb.Region{StartKey: startKey}
item := &regionItem{region: region}
res := make([]*regionItem, 0, limit)
t.tree.DescendLessOrEqual(item, func(i btree.Item) bool {
res = append(res, i.(*regionItem))
return len(res) < int(limit)
func (t *regionTree) scanRange(startKey []byte, f func(*metapb.Region) bool) {
startItem := &regionItem{region: &metapb.Region{StartKey: startKey}}
t.tree.DescendLessOrEqual(startItem, func(item btree.Item) bool {
return f(item.(*regionItem).region)
})
return res
}
8 changes: 1 addition & 7 deletions server/grpc_service.go
Expand Up @@ -16,7 +16,6 @@ package server
import (
"fmt"
"io"
"math"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -317,12 +316,7 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
isNew = false
}

region := core.NewRegionInfo(request.GetRegion(), request.GetLeader())
region.DownPeers = request.GetDownPeers()
region.PendingPeers = request.GetPendingPeers()
region.WrittenBytes = request.GetBytesWritten()
region.ReadBytes = request.GetBytesRead()
region.ApproximateSize = int64(math.Ceil(float64(request.GetApproximateSize()) / 1e6)) // use size of Mb as unit
region := core.RegionFromHeartbeat(request)
if region.GetId() == 0 {
msg := fmt.Sprintf("invalid request region, %v", request)
hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeLabel)
Expand Down

0 comments on commit fca123a

Please sign in to comment.