schedule: refactor diagnostic manager (#6771)
ref #5839

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
rleungx and ti-chi-bot[bot] committed Jul 10, 2023
1 parent b2a9768 commit dc0274a
Showing 8 changed files with 354 additions and 277 deletions.
200 changes: 18 additions & 182 deletions pkg/schedule/coordinator.go
@@ -20,7 +20,6 @@ import (
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
@@ -31,10 +30,9 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule/checker"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/diagnostic"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/scatter"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
@@ -51,7 +49,6 @@ const (
checkSuspectRangesInterval = 100 * time.Millisecond
collectFactor = 0.9
collectTimeout = 5 * time.Minute
maxScheduleRetries = 10
maxLoadConfigRetries = 10
// pushOperatorTickInterval is the interval to try to push the operator.
pushOperatorTickInterval = 500 * time.Millisecond
@@ -65,9 +62,8 @@ const (

var (
// WithLabelValues is a heavy operation, define variable to avoid calling it every time.
waitingListGauge = regionListGauge.WithLabelValues("waiting_list")
priorityListGauge = regionListGauge.WithLabelValues("priority_list")
denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("schedulers", "deny")
waitingListGauge = regionListGauge.WithLabelValues("waiting_list")
priorityListGauge = regionListGauge.WithLabelValues("priority_list")
)

// Coordinator is used to manage all schedulers and checkers to decide if the region needs to be scheduled.
@@ -82,18 +78,18 @@ type Coordinator struct {
checkers *checker.Controller
regionScatterer *scatter.RegionScatterer
regionSplitter *splitter.RegionSplitter
schedulers map[string]*scheduleController
schedulers map[string]*schedulers.ScheduleController
opController *operator.Controller
hbStreams *hbstream.HeartbeatStreams
pluginInterface *PluginInterface
diagnosticManager *diagnosticManager
diagnosticManager *diagnostic.Manager
}

// NewCoordinator creates a new Coordinator.
func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
ctx, cancel := context.WithCancel(ctx)
opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetPersistOptions(), hbStreams)
schedulers := make(map[string]*scheduleController)
schedulers := make(map[string]*schedulers.ScheduleController)
c := &Coordinator{
ctx: ctx,
cancel: cancel,
@@ -107,7 +103,7 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams
hbStreams: hbStreams,
pluginInterface: NewPluginInterface(),
}
c.diagnosticManager = newDiagnosticManager(c, cluster.GetPersistOptions())
c.diagnosticManager = diagnostic.NewManager(schedulers, cluster.GetPersistOptions())
return c
}

@@ -679,7 +675,7 @@ func (c *Coordinator) AddScheduler(scheduler schedulers.Scheduler, args ...strin
return errs.ErrSchedulerExisted.FastGenByArgs()
}

s := NewScheduleController(c, scheduler)
s := schedulers.NewScheduleController(c.ctx, c.cluster, c.opController, scheduler)
if err := s.Scheduler.Prepare(c.cluster); err != nil {
return err
}
@@ -756,7 +752,7 @@ func (c *Coordinator) PauseOrResumeScheduler(name string, t int64) error {
if c.cluster == nil {
return errs.ErrNotBootstrapped.FastGenByArgs()
}
var s []*scheduleController
var s []*schedulers.ScheduleController
if name != "all" {
sc, ok := c.schedulers[name]
if !ok {
@@ -775,8 +771,7 @@ func (c *Coordinator) PauseOrResumeScheduler(name string, t int64) error {
delayAt = time.Now().Unix()
delayUntil = delayAt + t
}
atomic.StoreInt64(&sc.delayAt, delayAt)
atomic.StoreInt64(&sc.delayUntil, delayUntil)
sc.SetDelay(delayAt, delayUntil)
}
return err
}
@@ -844,7 +839,7 @@ func (c *Coordinator) IsSchedulerExisted(name string) (bool, error) {
return true, nil
}

func (c *Coordinator) runScheduler(s *scheduleController) {
func (c *Coordinator) runScheduler(s *schedulers.ScheduleController) {
defer logutil.LogPanic()
defer c.wg.Done()
defer s.Scheduler.Cleanup(c.cluster)
@@ -854,7 +849,7 @@ func (c *Coordinator) runScheduler(s *scheduleController) {
for {
select {
case <-ticker.C:
diagnosable := s.diagnosticRecorder.isAllowed()
diagnosable := s.IsDiagnosticAllowed()
if !s.AllowSchedule(diagnosable) {
continue
}
@@ -948,157 +943,17 @@ func (c *Coordinator) GetCluster() sche.ClusterInformer {
}

// GetDiagnosticResult returns the diagnostic result.
func (c *Coordinator) GetDiagnosticResult(name string) (*DiagnosticResult, error) {
return c.diagnosticManager.getDiagnosticResult(name)
func (c *Coordinator) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult, error) {
c.RLock()
defer c.RUnlock()
return c.diagnosticManager.GetDiagnosticResult(name)
}

// RecordOpStepWithTTL records OpStep with TTL
func (c *Coordinator) RecordOpStepWithTTL(regionID uint64) {
c.GetRuleChecker().RecordRegionPromoteToNonWitness(regionID)
}

// scheduleController is used to manage a scheduler to schedule.
type scheduleController struct {
schedulers.Scheduler
cluster sche.ScheduleCluster
opController *operator.Controller
nextInterval time.Duration
ctx context.Context
cancel context.CancelFunc
delayAt int64
delayUntil int64
diagnosticRecorder *diagnosticRecorder
}

// NewScheduleController creates a new scheduleController.
func NewScheduleController(c *Coordinator, s schedulers.Scheduler) *scheduleController {
ctx, cancel := context.WithCancel(c.ctx)
return &scheduleController{
Scheduler: s,
cluster: c.cluster,
opController: c.opController,
nextInterval: s.GetMinInterval(),
ctx: ctx,
cancel: cancel,
diagnosticRecorder: c.diagnosticManager.getRecorder(s.GetName()),
}
}

func (s *scheduleController) Ctx() context.Context {
return s.ctx
}

func (s *scheduleController) Stop() {
s.cancel()
}

func (s *scheduleController) Schedule(diagnosable bool) []*operator.Operator {
for i := 0; i < maxScheduleRetries; i++ {
// no need to retry if the scheduler should stop, to speed up exit
select {
case <-s.ctx.Done():
return nil
default:
}
cacheCluster := newCacheCluster(s.cluster)
// we need only process diagnostic once in the retry loop
diagnosable = diagnosable && i == 0
ops, plans := s.Scheduler.Schedule(cacheCluster, diagnosable)
if diagnosable {
s.diagnosticRecorder.setResultFromPlans(ops, plans)
}
foundDisabled := false
for _, op := range ops {
if labelMgr := s.cluster.GetRegionLabeler(); labelMgr != nil {
region := s.cluster.GetRegion(op.RegionID())
if region == nil {
continue
}
if labelMgr.ScheduleDisabled(region) {
denySchedulersByLabelerCounter.Inc()
foundDisabled = true
break
}
}
}
if len(ops) > 0 {
// If we have schedule, reset interval to the minimal interval.
s.nextInterval = s.Scheduler.GetMinInterval()
// try regenerating operators
if foundDisabled {
continue
}
return ops
}
}
s.nextInterval = s.Scheduler.GetNextInterval(s.nextInterval)
return nil
}

func (s *scheduleController) DiagnoseDryRun() ([]*operator.Operator, []plan.Plan) {
cacheCluster := newCacheCluster(s.cluster)
return s.Scheduler.Schedule(cacheCluster, true)
}

// GetInterval returns the interval of scheduling for a scheduler.
func (s *scheduleController) GetInterval() time.Duration {
return s.nextInterval
}

// SetInterval sets the interval of scheduling for a scheduler. For test purposes only.
func (s *scheduleController) SetInterval(interval time.Duration) {
s.nextInterval = interval
}

// AllowSchedule returns if a scheduler is allowed to schedule.
func (s *scheduleController) AllowSchedule(diagnosable bool) bool {
if !s.Scheduler.IsScheduleAllowed(s.cluster) {
if diagnosable {
s.diagnosticRecorder.setResultFromStatus(pending)
}
return false
}
if s.isSchedulingHalted() {
if diagnosable {
s.diagnosticRecorder.setResultFromStatus(halted)
}
return false
}
if s.IsPaused() {
if diagnosable {
s.diagnosticRecorder.setResultFromStatus(paused)
}
return false
}
return true
}

func (s *scheduleController) isSchedulingHalted() bool {
return s.cluster.GetOpts().IsSchedulingHalted()
}

// IsPaused returns if a scheduler is paused.
func (s *scheduleController) IsPaused() bool {
delayUntil := atomic.LoadInt64(&s.delayUntil)
return time.Now().Unix() < delayUntil
}

// getDelayAt returns paused timestamp of a paused scheduler
func (s *scheduleController) getDelayAt() int64 {
if s.IsPaused() {
return atomic.LoadInt64(&s.delayAt)
}
return 0
}

// getDelayUntil returns resume timestamp of a paused scheduler
func (s *scheduleController) getDelayUntil() int64 {
if s.IsPaused() {
return atomic.LoadInt64(&s.delayUntil)
}
return 0
}

// GetPausedSchedulerDelayAt returns paused timestamp of a paused scheduler
func (c *Coordinator) GetPausedSchedulerDelayAt(name string) (int64, error) {
c.RLock()
@@ -1110,7 +965,7 @@ func (c *Coordinator) GetPausedSchedulerDelayAt(name string) (int64, error) {
if !ok {
return -1, errs.ErrSchedulerNotFound.FastGenByArgs()
}
return s.getDelayAt(), nil
return s.GetDelayAt(), nil
}

// GetPausedSchedulerDelayUntil returns the delay time until the scheduler is paused.
@@ -1124,7 +979,7 @@ func (c *Coordinator) GetPausedSchedulerDelayUntil(name string) (int64, error) {
if !ok {
return -1, errs.ErrSchedulerNotFound.FastGenByArgs()
}
return s.getDelayUntil(), nil
return s.GetDelayUntil(), nil
}

// CheckTransferWitnessLeader determines if transfer leader is required, then sends to the scheduler if needed
@@ -1142,22 +997,3 @@ func (c *Coordinator) CheckTransferWitnessLeader(region *core.RegionInfo) {
}
}
}

// cacheCluster includes cache info to improve the performance.
type cacheCluster struct {
sche.ScheduleCluster
stores []*core.StoreInfo
}

// GetStores returns store infos from cache
func (c *cacheCluster) GetStores() []*core.StoreInfo {
return c.stores
}

// newCacheCluster constructor for cache
func newCacheCluster(c sche.ScheduleCluster) *cacheCluster {
return &cacheCluster{
ScheduleCluster: c,
stores: c.GetStores(),
}
}
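
Editor's note: the pause/delay and diagnostic plumbing removed above now lives on schedulers.ScheduleController, which is defined in one of the six changed files not shown on this page. The following is only a minimal sketch of the surface implied by the call sites above (SetDelay, IsPaused, GetDelayAt, GetDelayUntil, IsDiagnosticAllowed, GetDiagnosticRecorder); the bodies mirror the removed in-file implementation, the DiagnosticRecorder.IsAllowed method is assumed as the exported counterpart of the old isAllowed, and the scheduling-loop methods are omitted. It is not the committed code.

package schedulers

import (
	"sync/atomic"
	"time"
)

// ScheduleController wraps a Scheduler with the pause/delay and diagnostic
// state that used to live inside the coordinator's scheduleController.
type ScheduleController struct {
	Scheduler
	delayAt            int64
	delayUntil         int64
	diagnosticRecorder *DiagnosticRecorder
}

// SetDelay pauses the scheduler from delayAt until delayUntil (Unix seconds).
func (s *ScheduleController) SetDelay(delayAt, delayUntil int64) {
	atomic.StoreInt64(&s.delayAt, delayAt)
	atomic.StoreInt64(&s.delayUntil, delayUntil)
}

// IsPaused reports whether the scheduler is still inside its delay window.
func (s *ScheduleController) IsPaused() bool {
	return time.Now().Unix() < atomic.LoadInt64(&s.delayUntil)
}

// GetDelayAt returns the pause timestamp, or 0 if the scheduler is not paused.
func (s *ScheduleController) GetDelayAt() int64 {
	if s.IsPaused() {
		return atomic.LoadInt64(&s.delayAt)
	}
	return 0
}

// GetDelayUntil returns the resume timestamp, or 0 if the scheduler is not paused.
func (s *ScheduleController) GetDelayUntil() int64 {
	if s.IsPaused() {
		return atomic.LoadInt64(&s.delayUntil)
	}
	return 0
}

// IsDiagnosticAllowed reports whether the recorder should capture this round.
func (s *ScheduleController) IsDiagnosticAllowed() bool {
	return s.diagnosticRecorder.IsAllowed()
}

// GetDiagnosticRecorder exposes the per-scheduler recorder consumed by the
// diagnostic.Manager introduced below.
func (s *ScheduleController) GetDiagnosticRecorder() *DiagnosticRecorder {
	return s.diagnosticRecorder
}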
79 changes: 79 additions & 0 deletions pkg/schedule/diagnostic/diagnostic_manager.go
@@ -0,0 +1,79 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package diagnostic

import (
"time"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/server/config"
)

// Manager is used to manage the diagnostic result of schedulers for now.
type Manager struct {
config *config.PersistOptions
schedulerController map[string]*schedulers.ScheduleController
}

// NewManager creates a new Manager.
func NewManager(schedulerController map[string]*schedulers.ScheduleController, config *config.PersistOptions) *Manager {
return &Manager{
config: config,
schedulerController: schedulerController,
}
}

// GetDiagnosticResult gets the diagnostic result of the scheduler.
func (d *Manager) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult, error) {
if !d.config.IsDiagnosticAllowed() {
return nil, errs.ErrDiagnosticDisabled
}

scheduler, isSchedulerExisted := d.schedulerController[name]
if !isSchedulerExisted {
ts := uint64(time.Now().Unix())
res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
return res, nil
}
var isDisabled bool
t := scheduler.Scheduler.GetType()
scheduleConfig := d.config.GetScheduleConfig()
for _, s := range scheduleConfig.Schedulers {
if t == s.Type {
isDisabled = s.Disable
break
}
}
if isDisabled {
ts := uint64(time.Now().Unix())
res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
return res, nil
}

recorder := d.getSchedulerRecorder(name)
if recorder == nil {
return nil, errs.ErrSchedulerUndiagnosable.FastGenByArgs(name)
}
result := recorder.GetLastResult()
if result == nil {
return nil, errs.ErrNoDiagnosticResult.FastGenByArgs(name)
}
return result, nil
}

func (d *Manager) getSchedulerRecorder(name string) *schedulers.DiagnosticRecorder {
return d.schedulerController[name].GetDiagnosticRecorder()
}
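
Editor's note: a hedged usage sketch of the new package, showing how a caller such as the Coordinator above could wire the manager up and query a scheduler's last result. The scheduler name, the empty controller map, and the config.NewConfig/NewPersistOptions constructors are illustrative assumptions used to keep the snippet self-contained; they are not part of this commit.

package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/schedule/diagnostic"
	"github.com/tikv/pd/pkg/schedule/schedulers"
	"github.com/tikv/pd/server/config"
)

func main() {
	// Both inputs normally come from the Coordinator; empty/default values
	// keep the sketch self-contained.
	controllers := make(map[string]*schedulers.ScheduleController)
	opts := config.NewPersistOptions(config.NewConfig())

	mgr := diagnostic.NewManager(controllers, opts)

	// Per the code above: if diagnostics are disabled in the config this
	// returns errs.ErrDiagnosticDisabled; otherwise an unknown scheduler name
	// is reported with the Disabled status rather than as an error.
	res, err := mgr.GetDiagnosticResult("balance-region-scheduler")
	if err != nil {
		fmt.Println("diagnostic unavailable:", err)
		return
	}
	fmt.Printf("scheduler=%s status=%s at=%d\n", res.Name, res.Status, res.Timestamp)
}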
