Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: move hot region scheduler into schedulers directory. #730

Merged
merged 1 commit into from
Sep 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
389 changes: 0 additions & 389 deletions server/balancer.go

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions server/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ func (s *testReplicaCheckerSuite) TestBasic(c *C) {
tc.addLeaderRegion(1, 1, 2)

// Region has 2 peers, we need to add a new peer.
region := cluster.getRegion(1)
region := cluster.GetRegion(1)
checkAddPeer(c, rc.Check(region), 4)

// Test healthFilter.
Expand Down Expand Up @@ -649,7 +649,7 @@ func (s *testReplicaCheckerSuite) TestLostStore(c *C) {
// This happens only in recovering the PD cluster
// should not panic
tc.addLeaderRegion(1, 1, 2, 3)
region := cluster.getRegion(1)
region := cluster.GetRegion(1)
op := rc.Check(region)
c.Assert(op, IsNil)
}
Expand All @@ -669,7 +669,7 @@ func (s *testReplicaCheckerSuite) TestOffline(c *C) {
tc.addLabelsStore(4, 4, map[string]string{"zone": "z3", "rack": "r2", "host": "h1"})

tc.addLeaderRegion(1, 1)
region := cluster.getRegion(1)
region := cluster.GetRegion(1)

// Store 2 has different zone and smallest region score.
checkAddPeer(c, rc.Check(region), 2)
Expand Down Expand Up @@ -723,7 +723,7 @@ func (s *testReplicaCheckerSuite) TestDistinctScore(c *C) {

// We need 3 replicas.
tc.addLeaderRegion(1, 1)
region := tc.getRegion(1)
region := tc.GetRegion(1)
checkAddPeer(c, rc.Check(region), 2)
peer2, _ := cluster.AllocPeer(2)
region.Peers = append(region.Peers, peer2)
Expand Down Expand Up @@ -806,7 +806,7 @@ func (s *testReplicaCheckerSuite) TestDistinctScore2(c *C) {
tc.addLabelsStore(6, 1, map[string]string{"zone": "z3", "host": "h1"})

tc.addLeaderRegion(1, 1, 2, 4)
region := cluster.getRegion(1)
region := cluster.GetRegion(1)

checkAddPeer(c, rc.Check(region), 6)
peer6, _ := cluster.AllocPeer(6)
Expand Down Expand Up @@ -907,7 +907,7 @@ func (s *testBalanceHotRegionSchedulerSuite) TestBalance(c *C) {
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
hb := newBalanceHotRegionScheduler(opt)
hb := schedulers.NewBalanceHotRegionScheduler(opt)

// Add stores 1, 2, 3, 4, 5 with region counts 3, 2, 2, 2, 0.
tc.addRegionStore(1, 3)
Expand Down
31 changes: 21 additions & 10 deletions server/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,53 +499,64 @@ func (c *clusterInfo) getStoresWriteStat() map[uint64]uint64 {
return res
}

func (c *clusterInfo) getRegion(regionID uint64) *core.RegionInfo {
// GetRegion searches for a region by ID.
func (c *clusterInfo) GetRegion(regionID uint64) *core.RegionInfo {
	c.RLock()
	defer c.RUnlock()
	return c.regions.getRegion(regionID)
}

// updateWriteStatCache updates statistic for a region if it's hot, or remove it from statistics if it cools down
func (c *clusterInfo) updateWriteStatCache(region *core.RegionInfo, hotRegionThreshold uint64) {
var v *RegionStat
var v *core.RegionStat
key := region.GetId()
value, isExist := c.writeStatistics.Peek(key)
newItem := &RegionStat{
newItem := &core.RegionStat{
RegionID: region.GetId(),
WrittenBytes: region.WrittenBytes,
LastUpdateTime: time.Now(),
StoreID: region.Leader.GetStoreId(),
version: region.GetRegionEpoch().GetVersion(),
antiCount: hotRegionAntiCount,
Version: region.GetRegionEpoch().GetVersion(),
AntiCount: hotRegionAntiCount,
}

if isExist {
v = value.(*RegionStat)
v = value.(*core.RegionStat)
newItem.HotDegree = v.HotDegree + 1
}

if region.WrittenBytes < hotRegionThreshold {
if !isExist {
return
}
if v.antiCount <= 0 {
if v.AntiCount <= 0 {
c.writeStatistics.Remove(key)
return
}
// eliminate some noise
newItem.HotDegree = v.HotDegree - 1
newItem.antiCount = v.antiCount - 1
newItem.AntiCount = v.AntiCount - 1
newItem.WrittenBytes = v.WrittenBytes
}
c.writeStatistics.Put(key, newItem)
}

// RegionWriteStats returns the write statistics of all currently-tracked
// hot regions.
//
// It takes the cluster read lock before touching c.writeStatistics, for
// consistency with the other accessors of shared cluster state (e.g.
// GetRegion, IsRegionHot), which all guard their reads with RLock.
func (c *clusterInfo) RegionWriteStats() []*core.RegionStat {
	c.RLock()
	defer c.RUnlock()
	elements := c.writeStatistics.Elems()
	stats := make([]*core.RegionStat, len(elements))
	for i := range elements {
		stats[i] = elements[i].Value.(*core.RegionStat)
	}
	return stats
}

// IsRegionHot checks if a region is in hot state.
func (c *clusterInfo) IsRegionHot(id uint64) bool {
c.RLock()
defer c.RUnlock()
if stat, ok := c.writeStatistics.Peek(id); ok {
return stat.(*RegionStat).HotDegree >= hotRegionLowThreshold
return stat.(*core.RegionStat).HotDegree >= hotRegionLowThreshold
}
return false
}
Expand Down Expand Up @@ -764,7 +775,7 @@ func (c *clusterInfo) updateWriteStatus(region *core.RegionInfo) {
var WrittenBytesPerSec uint64
v, isExist := c.writeStatistics.Peek(region.GetId())
if isExist {
interval := time.Now().Sub(v.(*RegionStat).LastUpdateTime).Seconds()
interval := time.Now().Sub(v.(*core.RegionStat).LastUpdateTime).Seconds()
if interval < minHotRegionReportInterval {
return
}
Expand Down
4 changes: 2 additions & 2 deletions server/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func heartbeatRegions(c *C, cache *clusterInfo, regions []*metapb.Region) {

c.Assert(cache.handleRegionHeartbeat(r), IsNil)

checkRegion(c, cache.getRegion(r.GetId()), r)
checkRegion(c, cache.GetRegion(r.GetId()), r)
checkRegion(c, cache.searchRegion(r.StartKey), r)

if len(r.EndKey) > 0 {
Expand All @@ -481,7 +481,7 @@ func heartbeatRegions(c *C, cache *clusterInfo, regions []*metapb.Region) {
for _, region := range regions {
r := core.NewRegionInfo(region, nil)

checkRegion(c, cache.getRegion(r.GetId()), r)
checkRegion(c, cache.GetRegion(r.GetId()), r)
checkRegion(c, cache.searchRegion(r.StartKey), r)

if len(r.EndKey) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (c *RaftCluster) GetRegionInfoByKey(regionKey []byte) *core.RegionInfo {

// GetRegionByID gets region and leader peer by regionID from cluster.
func (c *RaftCluster) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer) {
region := c.cachedCluster.getRegion(regionID)
region := c.cachedCluster.GetRegion(regionID)
if region == nil {
return nil, nil
}
Expand All @@ -376,7 +376,7 @@ func (c *RaftCluster) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Pe

// GetRegionInfoByID gets regionInfo by regionID from cluster.
func (c *RaftCluster) GetRegionInfoByID(regionID uint64) *core.RegionInfo {
return c.cachedCluster.getRegion(regionID)
return c.cachedCluster.GetRegion(regionID)
}

// GetRegions gets regions from cluster.
Expand Down
4 changes: 4 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,10 @@ func (o *scheduleOption) persist(kv *kv) error {
return kv.saveScheduleOption(o)
}

// GetHotRegionLowThreshold returns the minimum hot degree at which a region
// is considered hot (compared against RegionStat.HotDegree by IsRegionHot).
func (o *scheduleOption) GetHotRegionLowThreshold() int {
	return hotRegionLowThreshold
}

// ParseUrls parse a string into multiple urls.
// Export for api.
func ParseUrls(s string) ([]url.URL, error) {
Expand Down
22 changes: 14 additions & 8 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ const (
scheduleIntervalFactor = 1.3

writeStatLRUMaxLen = 1000
storeHotRegionsDefaultLen = 100
hotRegionLimitFactor = 0.75
hotRegionScheduleFactor = 0.9
hotRegionMinWriteRate = 16 * 1024
regionHeartBeatReportInterval = 60
regionheartbeatSendChanCap = 1024
Expand Down Expand Up @@ -136,22 +133,31 @@ func (c *coordinator) run() {
log.Info("coordinator: Run scheduler")
c.addScheduler(schedulers.NewBalanceLeaderScheduler(c.opt), minScheduleInterval)
c.addScheduler(schedulers.NewBalanceRegionScheduler(c.opt), minScheduleInterval)
c.addScheduler(newBalanceHotRegionScheduler(c.opt), minSlowScheduleInterval)
c.addScheduler(schedulers.NewBalanceHotRegionScheduler(c.opt), minSlowScheduleInterval)
}

func (c *coordinator) stop() {
c.cancel()
c.wg.Wait()
}

func (c *coordinator) getHotWriteRegions() *StoreHotRegionInfos {
// Hack to retrive info from scheduler.
// TODO: remove it.
type hasHotStatus interface {
GetStatus() *core.StoreHotRegionInfos
}

func (c *coordinator) getHotWriteRegions() *core.StoreHotRegionInfos {
c.RLock()
defer c.RUnlock()
s, ok := c.schedulers[hotRegionScheduleName]
if !ok {
return nil
}
return s.Scheduler.(*balanceHotRegionScheduler).GetStatus()
if h, ok := s.Scheduler.(hasHotStatus); ok {
return h.GetStatus()
}
return nil
}

func (c *coordinator) getSchedulers() []string {
Expand Down Expand Up @@ -187,7 +193,7 @@ func (c *coordinator) collectHotSpotMetrics() {
if !ok {
return
}
status := s.Scheduler.(*balanceHotRegionScheduler).GetStatus()
status := s.Scheduler.(hasHotStatus).GetStatus()
for storeID, stat := range status.AsPeer {
store := fmt.Sprintf("store_%d", storeID)
totalWriteBytes := float64(stat.WrittenBytes)
Expand Down Expand Up @@ -289,7 +295,7 @@ func (c *coordinator) addOperator(op schedule.Operator) bool {
c.limiter.addOperator(op)
c.operators[regionID] = op

if region := c.cluster.getRegion(op.GetRegionID()); region != nil {
if region := c.cluster.GetRegion(op.GetRegionID()); region != nil {
if msg, _ := op.Do(region); msg != nil {
c.hbStreams.sendMsg(region, msg)
}
Expand Down
20 changes: 10 additions & 10 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) {
stream := newMockHeartbeatStream()

// Transfer peer.
region := cluster.getRegion(1)
region := cluster.GetRegion(1)
resp := dispatchAndRecvHeartbeat(c, co, region, stream)
checkAddPeerResp(c, resp, 1)
region.Peers = append(region.Peers, resp.GetChangePeer().GetPeer())
Expand All @@ -147,7 +147,7 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) {
dispatchHeartbeatNoResp(c, co, region, stream)

// Transfer leader.
region = cluster.getRegion(2)
region = cluster.GetRegion(2)
resp = dispatchAndRecvHeartbeat(c, co, region, stream)
checkTransferLeaderResp(c, resp, 2)
region.Leader = resp.GetTransferLeader().GetPeer()
Expand Down Expand Up @@ -197,7 +197,7 @@ func (s *testCoordinatorSuite) TestReplica(c *C) {

// Add peer to store 1.
tc.addLeaderRegion(1, 2, 3)
region := cluster.getRegion(1)
region := cluster.GetRegion(1)
resp := dispatchAndRecvHeartbeat(c, co, region, stream)
checkAddPeerResp(c, resp, 1)
region.Peers = append(region.Peers, resp.GetChangePeer().GetPeer())
Expand All @@ -221,7 +221,7 @@ func (s *testCoordinatorSuite) TestReplica(c *C) {

// Remove peer from store 4.
tc.addLeaderRegion(2, 1, 2, 3, 4)
region = cluster.getRegion(2)
region = cluster.GetRegion(2)
resp = dispatchAndRecvHeartbeat(c, co, region, stream)
checkRemovePeerResp(c, resp, 4)
region.RemoveStorePeer(4)
Expand Down Expand Up @@ -252,7 +252,7 @@ func (s *testCoordinatorSuite) TestPeerState(c *C) {
waitOperator(c, co, 1)
checkTransferPeer(c, co.getOperator(1), 4, 1)

region := cluster.getRegion(1)
region := cluster.GetRegion(1)

// Add new peer.
resp := dispatchAndRecvHeartbeat(c, co, region, stream)
Expand All @@ -271,7 +271,7 @@ func (s *testCoordinatorSuite) TestPeerState(c *C) {
resp = dispatchAndRecvHeartbeat(c, co, region, stream)
checkRemovePeerResp(c, resp, 4)
tc.addLeaderRegion(1, 1, 2, 3)
region = cluster.getRegion(1)
region = cluster.GetRegion(1)
dispatchHeartbeatNoResp(c, co, region, stream)
}

Expand Down Expand Up @@ -303,7 +303,7 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) {
}

for _, t := range tbl {
r := tc.getRegion(t.regionID)
r := tc.GetRegion(t.regionID)
r.Leader = r.Peers[0]
tc.handleRegionHeartbeat(r)
c.Assert(co.shouldRun(), Equals, t.shouldRun)
Expand Down Expand Up @@ -351,15 +351,15 @@ func (s *testCoordinatorSuite) TestAddScheduler(c *C) {

// Transfer all leaders to store 1.
waitOperator(c, co, 2)
region2 := cluster.getRegion(2)
region2 := cluster.GetRegion(2)
resp := dispatchAndRecvHeartbeat(c, co, region2, stream)
checkTransferLeaderResp(c, resp, 1)
region2.Leader = region2.GetStorePeer(1)
cluster.putRegion(region2)
dispatchHeartbeatNoResp(c, co, region2, stream)

waitOperator(c, co, 3)
region3 := cluster.getRegion(3)
region3 := cluster.GetRegion(3)
resp = dispatchAndRecvHeartbeat(c, co, region3, stream)
checkTransferLeaderResp(c, resp, 1)
region3.Leader = region3.GetStorePeer(1)
Expand All @@ -384,7 +384,7 @@ func (s *testCoordinatorSuite) TestRestart(c *C) {
tc.addRegionStore(3, 3)
tc.addLeaderRegion(1, 1)
cluster.activeRegions = 1
region := cluster.getRegion(1)
region := cluster.GetRegion(1)

// Add 1 replica on store 2.
co := newCoordinator(cluster, opt, hbStreams)
Expand Down
31 changes: 31 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package core

import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand Down Expand Up @@ -137,3 +139,32 @@ func (r *RegionInfo) GetFollower() *metapb.Peer {
}
return nil
}

// RegionStat records each hot region's statistics.
type RegionStat struct {
	RegionID     uint64 `json:"region_id"`
	WrittenBytes uint64 `json:"written_bytes"`
	// HotDegree records the hot region update times.
	HotDegree int `json:"hot_degree"`
	// LastUpdateTime is used to calculate the average write rate
	// between two reports.
	LastUpdateTime time.Time `json:"last_update_time"`
	// StoreID is the store of the region leader; excluded from JSON output.
	StoreID uint64 `json:"-"`
	// AntiCount is used to eliminate some noise when removing a region
	// from the statistics cache.
	AntiCount int
	// Version is used to check the region split times.
	Version uint64
}

// RegionsStat is a list of hot region statistics. It implements
// sort.Interface, ordering by WrittenBytes ascending.
type RegionsStat []RegionStat

// Len implements sort.Interface.
func (m RegionsStat) Len() int { return len(m) }

// Swap implements sort.Interface.
func (m RegionsStat) Swap(i, j int) { m[i], m[j] = m[j], m[i] }

// Less implements sort.Interface; regions with fewer written bytes sort first.
func (m RegionsStat) Less(i, j int) bool { return m[i].WrittenBytes < m[j].WrittenBytes }

// HotRegionsStat aggregates the statistics of a group of hot regions:
// the total bytes written, the number of regions, and the per-region stats.
type HotRegionsStat struct {
	WrittenBytes uint64      `json:"total_written_bytes"`
	RegionsCount int         `json:"regions_count"`
	RegionsStat  RegionsStat `json:"statistics"`
}
6 changes: 6 additions & 0 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,9 @@ L:
s.Labels = append(s.Labels, newLabel)
}
}

// StoreHotRegionInfos provides a human-readable description of hot regions,
// grouped per store (the map key is the store ID) by the role the store
// plays for those regions: as a peer or as the leader.
type StoreHotRegionInfos struct {
	AsPeer   map[uint64]*HotRegionsStat `json:"as_peer"`
	AsLeader map[uint64]*HotRegionsStat `json:"as_leader"`
}
Loading