Skip to content

Commit

Permalink
*: implement the scheduling rule watcher (#6878)
Browse files Browse the repository at this point in the history
ref #5839

- Implement the scheduling rule watcher.
- Add the tests.
- Refine some code.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Aug 3, 2023
1 parent 6377b26 commit 27a3cc5
Show file tree
Hide file tree
Showing 15 changed files with 604 additions and 59 deletions.
2 changes: 1 addition & 1 deletion pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (er
})

start := time.Now()
keyspaceRule := makeLabelRule(id)
keyspaceRule := MakeLabelRule(id)
cl, ok := manager.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler })
if !ok {
return errors.New("cluster does not support region label")
Expand Down
4 changes: 2 additions & 2 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ func getRegionLabelID(id uint32) string {
return regionLabelIDPrefix + strconv.FormatUint(uint64(id), endpoint.SpaceIDBase)
}

// makeLabelRule makes the label rule for the given keyspace id.
func makeLabelRule(id uint32) *labeler.LabelRule {
// MakeLabelRule makes the label rule for the given keyspace id.
func MakeLabelRule(id uint32) *labeler.LabelRule {
return &labeler.LabelRule{
ID: getRegionLabelID(id),
Index: 0,
Expand Down
2 changes: 1 addition & 1 deletion pkg/keyspace/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,6 @@ func TestMakeLabelRule(t *testing.T) {
},
}
for _, testCase := range testCases {
re.Equal(testCase.expectedLabelRule, makeLabelRule(testCase.id))
re.Equal(testCase.expectedLabelRule, MakeLabelRule(testCase.id))
}
}
File renamed without changes.
227 changes: 227 additions & 0 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rule

import (
"context"
"sync"

"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
)

// ruleStorage is an in-memory storage for Placement Rules,
// which will implement the `endpoint.RuleStorage` interface.
type ruleStorage struct {
// Rule key -> rule value.
rules sync.Map
// GroupID -> rule group value.
groups sync.Map
// Region rule key -> rule value.
regionRules sync.Map
}

// LoadRules loads Placement Rules from storage.
func (rs *ruleStorage) LoadRules(f func(k, v string)) error {
rs.rules.Range(func(k, v interface{}) bool {
f(k.(string), v.(string))
return true
})
return nil
}

// SaveRule stores a rule cfg to the rulesPath.
func (rs *ruleStorage) SaveRule(ruleKey string, rule interface{}) error {
rs.rules.Store(ruleKey, rule)
return nil
}

// DeleteRule removes a rule from storage.
func (rs *ruleStorage) DeleteRule(ruleKey string) error {
rs.rules.Delete(ruleKey)
return nil
}

// LoadRuleGroups loads all rule groups from storage.
func (rs *ruleStorage) LoadRuleGroups(f func(k, v string)) error {
rs.groups.Range(func(k, v interface{}) bool {
f(k.(string), v.(string))
return true
})
return nil
}

// SaveRuleGroup stores a rule group config to storage.
func (rs *ruleStorage) SaveRuleGroup(groupID string, group interface{}) error {
rs.groups.Store(groupID, group)
return nil
}

// DeleteRuleGroup removes a rule group from storage.
func (rs *ruleStorage) DeleteRuleGroup(groupID string) error {
rs.groups.Delete(groupID)
return nil
}

// LoadRegionRules loads region rules from storage.
func (rs *ruleStorage) LoadRegionRules(f func(k, v string)) error {
rs.regionRules.Range(func(k, v interface{}) bool {
f(k.(string), v.(string))
return true
})
return nil
}

// SaveRegionRule saves a region rule to the storage.
func (rs *ruleStorage) SaveRegionRule(ruleKey string, rule interface{}) error {
rs.regionRules.Store(ruleKey, rule)
return nil
}

// DeleteRegionRule removes a region rule from storage.
func (rs *ruleStorage) DeleteRegionRule(ruleKey string) error {
rs.regionRules.Delete(ruleKey)
return nil
}

// Watcher is used to watch the PD API server for any Placement Rule changes.
type Watcher struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

etcdClient *clientv3.Client
ruleStore *ruleStorage

ruleWatcher *etcdutil.LoopWatcher
groupWatcher *etcdutil.LoopWatcher
labelWatcher *etcdutil.LoopWatcher
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD API server.
// Please use `GetRuleStorage` to get the underlying storage to access the Placement Rules.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
// rulePath:
// - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id}
// - Value: placement.Rule
// ruleGroupPath:
// - Key: /pd/{cluster_id}/rule_group/{group_id}
// - Value: placement.RuleGroup
// regionLabelPath:
// - Key: /pd/{cluster_id}/region_label/{rule_id}
// - Value: labeler.LabelRule
rulesPath, ruleGroupPath, regionLabelPath string,
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
rw := &Watcher{
ctx: ctx,
cancel: cancel,
etcdClient: etcdClient,
ruleStore: &ruleStorage{},
}
err := rw.initializeRuleWatcher(rulesPath)
if err != nil {
return nil, err
}
err = rw.initializeGroupWatcher(ruleGroupPath)
if err != nil {
return nil, err
}
err = rw.initializeRegionLabelWatcher(regionLabelPath)
if err != nil {
return nil, err
}
return rw, nil
}

func (rw *Watcher) initializeRuleWatcher(rulePath string) error {
putFn := func(kv *mvccpb.KeyValue) error {
// Since the PD API server will validate the rule before saving it to etcd,
// so we could directly save the string rule in JSON to the storage here.
return rw.ruleStore.SaveRule(string(kv.Key), string(kv.Value))
}
deleteFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.DeleteRule(string(kv.Key))
}
postEventFn := func() error {
return nil
}
rw.ruleWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-watcher", rulePath,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
rw.ruleWatcher.StartWatchLoop()
return rw.ruleWatcher.WaitLoad()
}

func (rw *Watcher) initializeGroupWatcher(ruleGroupPath string) error {
putFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.SaveRuleGroup(string(kv.Key), string(kv.Value))
}
deleteFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.DeleteRuleGroup(string(kv.Key))
}
postEventFn := func() error {
return nil
}
rw.groupWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-group-watcher", ruleGroupPath,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
rw.groupWatcher.StartWatchLoop()
return rw.groupWatcher.WaitLoad()
}

func (rw *Watcher) initializeRegionLabelWatcher(regionLabelPath string) error {
putFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.SaveRegionRule(string(kv.Key), string(kv.Value))
}
deleteFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.DeleteRegionRule(string(kv.Key))
}
postEventFn := func() error {
return nil
}
rw.labelWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-region-label-watcher", regionLabelPath,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
rw.labelWatcher.StartWatchLoop()
return rw.labelWatcher.WaitLoad()
}

// Close closes the watcher.
func (rw *Watcher) Close() {
rw.cancel()
rw.wg.Wait()
}

// GetRuleStorage returns the rule storage.
func (rw *Watcher) GetRuleStorage() endpoint.RuleStorage {
return rw.ruleStore
}
39 changes: 34 additions & 5 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/mcs/scheduling/server/rule"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand Down Expand Up @@ -71,11 +72,12 @@ type Server struct {
serverLoopCancel func()
serverLoopWg sync.WaitGroup

cfg *config.Config
name string
clusterID uint64
listenURL *url.URL
backendUrls []url.URL
cfg *config.Config
name string
clusterID uint64
listenURL *url.URL
backendUrls []url.URL
persistConfig *config.PersistConfig

// etcd client
etcdClient *clientv3.Client
Expand Down Expand Up @@ -107,6 +109,10 @@ type Server struct {

cluster *Cluster
storage *endpoint.StorageEndpoint

// for watching the PD API server meta info updates that are related to the scheduling.
configWatcher *config.Watcher
ruleWatcher *rule.Watcher
}

// Name returns the unique etcd name for this server in etcd cluster.
Expand Down Expand Up @@ -494,6 +500,10 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
err = s.startWatcher()
if err != nil {
return err
}

serverReadyChan := make(chan struct{})
defer close(serverReadyChan)
Expand Down Expand Up @@ -524,12 +534,31 @@ func (s *Server) startServer() (err error) {
return nil
}

func (s *Server) startWatcher() (err error) {
s.configWatcher, err = config.NewWatcher(
s.ctx, s.etcdClient,
endpoint.ConfigPath(s.clusterID),
s.persistConfig,
)
if err != nil {
return err
}
s.ruleWatcher, err = rule.NewWatcher(
s.ctx, s.etcdClient,
endpoint.RulesPath(s.clusterID),
endpoint.RuleGroupPath(s.clusterID),
endpoint.RegionLabelPath(s.clusterID),
)
return err
}

// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *config.Config) *Server {
svr := &Server{
DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename),
startTimestamp: time.Now().Unix(),
cfg: cfg,
persistConfig: config.NewPersistConfig(cfg),
ctx: ctx,
}
return svr
Expand Down
21 changes: 15 additions & 6 deletions pkg/schedule/labeler/labeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package labeler

import (
"context"
"encoding/json"
"strings"
"time"

Expand Down Expand Up @@ -107,18 +106,19 @@ func (l *RegionLabeler) checkAndClearExpiredLabels() {
func (l *RegionLabeler) loadRules() error {
var toDelete []string
err := l.storage.LoadRegionRules(func(k, v string) {
var r LabelRule
if err := json.Unmarshal([]byte(v), &r); err != nil {
r, err := NewLabelRuleFromJSON([]byte(v))
if err != nil {
log.Error("failed to unmarshal label rule value", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
toDelete = append(toDelete, k)
return
}
if err := r.checkAndAdjust(); err != nil {
err = r.checkAndAdjust()
if err != nil {
log.Error("failed to adjust label rule", zap.String("rule-key", k), zap.String("rule-value", v), zap.Error(err))
toDelete = append(toDelete, k)
return
}
l.labelRules[r.ID] = &r
l.labelRules[r.ID] = r
})
if err != nil {
return err
Expand Down Expand Up @@ -298,7 +298,7 @@ func (l *RegionLabeler) GetRegionLabel(region *core.RegionInfo, key string) stri
// ScheduleDisabled returns true if the region is lablelld with schedule-disabled.
func (l *RegionLabeler) ScheduleDisabled(region *core.RegionInfo) bool {
v := l.GetRegionLabel(region, scheduleOptionLabel)
return strings.EqualFold(v, scheduleOptioonValueDeny)
return strings.EqualFold(v, scheduleOptionValueDeny)
}

// GetRegionLabels returns the labels of the region.
Expand Down Expand Up @@ -335,3 +335,12 @@ func (l *RegionLabeler) GetRegionLabels(region *core.RegionInfo) []*RegionLabel
}
return result
}

// MakeKeyRanges is a helper function to make key ranges.
func MakeKeyRanges(keys ...string) []interface{} {
var res []interface{}
for i := 0; i < len(keys); i += 2 {
res = append(res, map[string]interface{}{"start_key": keys[i], "end_key": keys[i+1]})
}
return res
}

0 comments on commit 27a3cc5

Please sign in to comment.