Skip to content

Commit

Permalink
checkers: reduce the duplicated logic (#5560)
Browse files Browse the repository at this point in the history
ref #4399

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
rleungx and ti-chi-bot committed Oct 8, 2022
1 parent c2b32e0 commit b326e4f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 71 deletions.
128 changes: 57 additions & 71 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ func (c *coordinator) patrolRegions() {

log.Info("coordinator starts patrol regions")
start := time.Now()
var key []byte
var (
key []byte
regions []*core.RegionInfo
)
for {
select {
case <-timer.C:
Expand All @@ -137,34 +140,10 @@ func (c *coordinator) patrolRegions() {
// Check regions in the waiting list
c.checkWaitingRegions()

regions := c.cluster.ScanRegions(key, nil, patrolScanRegionLimit)
key, regions = c.checkRegions(key)
if len(regions) == 0 {
// Resets the scan key.
key = nil
continue
}

for _, region := range regions {
// Skips the region if there is already a pending operator.
if c.opController.GetOperator(region.GetID()) != nil {
continue
}

ops := c.checkers.CheckRegion(region)

key = region.GetEndKey()
if len(ops) == 0 {
continue
}

if !c.opController.ExceedStoreLimit(ops...) {
c.opController.AddWaitingOperator(ops...)
c.checkers.RemoveWaitingRegion(region.GetID())
c.checkers.RemoveSuspectRegion(region.GetID())
} else {
c.checkers.AddWaitingRegion(region)
}
}
// Updates the label level isolation statistics.
c.cluster.updateRegionsLabelLevelStats(regions)
if len(key) == 0 {
Expand All @@ -177,6 +156,37 @@ func (c *coordinator) patrolRegions() {
}
}

func (c *coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit)
if len(regions) == 0 {
// Resets the scan key.
key = nil
return
}

for _, region := range regions {
c.tryAddOperators(region)
key = region.GetEndKey()
}
return
}

func (c *coordinator) checkSuspectRegions() {
for _, id := range c.checkers.GetSuspectRegions() {
region := c.cluster.GetRegion(id)
c.tryAddOperators(region)
}
}

func (c *coordinator) checkWaitingRegions() {
items := c.checkers.GetWaitingRegions()
regionListGauge.WithLabelValues("waiting_list").Set(float64(len(items)))
for _, item := range items {
region := c.cluster.GetRegion(item.Key)
c.tryAddOperators(region)
}
}

// checkPriorityRegions checks priority regions
func (c *coordinator) checkPriorityRegions() {
items := c.checkers.GetPriorityRegions()
Expand All @@ -202,29 +212,6 @@ func (c *coordinator) checkPriorityRegions() {
}
}

func (c *coordinator) checkSuspectRegions() {
for _, id := range c.checkers.GetSuspectRegions() {
region := c.cluster.GetRegion(id)
if region == nil {
// the region could be recent split, continue to wait.
continue
}
if c.opController.GetOperator(id) != nil {
c.checkers.RemoveSuspectRegion(id)
continue
}
ops := c.checkers.CheckRegion(region)
if len(ops) == 0 {
continue
}

if !c.opController.ExceedStoreLimit(ops...) {
c.opController.AddWaitingOperator(ops...)
c.checkers.RemoveSuspectRegion(region.GetID())
}
}
}

// checkSuspectRanges would pop one suspect key range group
// The regions of new version key range and old version key range would be placed into
// the suspect regions map
Expand Down Expand Up @@ -264,29 +251,28 @@ func (c *coordinator) checkSuspectRanges() {
}
}

func (c *coordinator) checkWaitingRegions() {
items := c.checkers.GetWaitingRegions()
regionListGauge.WithLabelValues("waiting_list").Set(float64(len(items)))
for _, item := range items {
id := item.Key
region := c.cluster.GetRegion(id)
if region == nil {
// the region could be recent split, continue to wait.
continue
}
if c.opController.GetOperator(id) != nil {
c.checkers.RemoveWaitingRegion(id)
continue
}
ops := c.checkers.CheckRegion(region)
if len(ops) == 0 {
continue
}
func (c *coordinator) tryAddOperators(region *core.RegionInfo) {
if region == nil {
// the region could be recent split, continue to wait.
return
}
id := region.GetID()
if c.opController.GetOperator(id) != nil {
c.checkers.RemoveWaitingRegion(id)
c.checkers.RemoveSuspectRegion(id)
return
}
ops := c.checkers.CheckRegion(region)
if len(ops) == 0 {
return
}

if !c.opController.ExceedStoreLimit(ops...) {
c.opController.AddWaitingOperator(ops...)
c.checkers.RemoveWaitingRegion(region.GetID())
}
if !c.opController.ExceedStoreLimit(ops...) {
c.opController.AddWaitingOperator(ops...)
c.checkers.RemoveWaitingRegion(id)
c.checkers.RemoveSuspectRegion(id)
} else {
c.checkers.AddWaitingRegion(region)
}
}

Expand Down
1 change: 1 addition & 0 deletions tools/pd-simulator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func run(simCase string, simConfig *simulator.SimConfig) {

func runMetrics() {
http.Handle("/metrics", promhttp.Handler())
// nolint
http.ListenAndServe(*statusAddress, nil)
}

Expand Down

0 comments on commit b326e4f

Please sign in to comment.