Skip to content

Commit

Permalink
Add startWatcher into startServer
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Aug 3, 2023
1 parent b56a78b commit cf8bf5b
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 27 deletions.
File renamed without changes.
File renamed without changes.
39 changes: 34 additions & 5 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/mcs/scheduling/server/rule"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand Down Expand Up @@ -71,11 +72,12 @@ type Server struct {
serverLoopCancel func()
serverLoopWg sync.WaitGroup

cfg *config.Config
name string
clusterID uint64
listenURL *url.URL
backendUrls []url.URL
cfg *config.Config
name string
clusterID uint64
listenURL *url.URL
backendUrls []url.URL
persistConfig *config.PersistConfig

// etcd client
etcdClient *clientv3.Client
Expand Down Expand Up @@ -107,6 +109,10 @@ type Server struct {

cluster *Cluster
storage *endpoint.StorageEndpoint

// for watching the PD API server meta info updates that are related to the scheduling.
configWatcher *config.Watcher
ruleWatcher *rule.Watcher
}

// Name returns the unique etcd name for this server in etcd cluster.
Expand Down Expand Up @@ -494,6 +500,10 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
err = s.startWatcher()
if err != nil {
return err
}

serverReadyChan := make(chan struct{})
defer close(serverReadyChan)
Expand Down Expand Up @@ -524,12 +534,31 @@ func (s *Server) startServer() (err error) {
return nil
}

func (s *Server) startWatcher() (err error) {
s.configWatcher, err = config.NewWatcher(
s.ctx, s.etcdClient,
endpoint.ConfigPath(s.clusterID),
s.persistConfig,
)
if err != nil {
return err
}
s.ruleWatcher, err = rule.NewWatcher(
s.ctx, s.etcdClient,
endpoint.RulesPath(s.clusterID),
endpoint.RuleGroupPath(s.clusterID),
endpoint.RegionLabelPath(s.clusterID),
)
return err
}

// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *config.Config) *Server {
svr := &Server{
DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename),
startTimestamp: time.Now().Unix(),
cfg: cfg,
persistConfig: config.NewPersistConfig(cfg),
ctx: ctx,
}
return svr
Expand Down
42 changes: 27 additions & 15 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,15 @@ import (
)

const (
pdRootPath = "/pd"
clusterPath = "raft"
configPath = "config"
serviceMiddlewarePath = "service_middleware"
schedulePath = "schedule"
gcPath = "gc"
// RulesPath is the prefix of the Placement Rules storage endpoint.
RulesPath = "rules"
// RuleGroupPath is the prefix of the Placement Rule Groups storage endpoint.
RuleGroupPath = "rule_group"
// RegionLabelPath is the prefix of the Region Label storage endpoint.
RegionLabelPath = "region_label"
pdRootPath = "/pd"
clusterPath = "raft"
configPath = "config"
serviceMiddlewarePath = "service_middleware"
schedulePath = "schedule"
gcPath = "gc"
rulesPath = "rules"
ruleGroupPath = "rule_group"
regionLabelPath = "region_label"
replicationPath = "replication_mode"
customScheduleConfigPath = "scheduler_config"
// GCWorkerServiceSafePointID is the service id of GC worker.
Expand Down Expand Up @@ -95,6 +92,21 @@ func ConfigPath(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), configPath)
}

// RulesPath returns the path to save the placement rules.
func RulesPath(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), rulesPath)
}

// RuleGroupPath returns the path to save the placement rule groups.
func RuleGroupPath(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), ruleGroupPath)
}

// RegionLabelPath returns the path to save the region label.
func RegionLabelPath(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), regionLabelPath)
}

func scheduleConfigPath(scheduleName string) string {
return path.Join(customScheduleConfigPath, scheduleName)
}
Expand Down Expand Up @@ -146,15 +158,15 @@ func resourceGroupStateKeyPath(groupName string) string {
}

func ruleKeyPath(ruleKey string) string {
return path.Join(RulesPath, ruleKey)
return path.Join(rulesPath, ruleKey)
}

func ruleGroupIDPath(groupID string) string {
return path.Join(RuleGroupPath, groupID)
return path.Join(ruleGroupPath, groupID)
}

func regionLabelKeyPath(ruleKey string) string {
return path.Join(RegionLabelPath, ruleKey)
return path.Join(regionLabelPath, ruleKey)
}

func replicationModePath(mode string) string {
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/endpoint/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (se *StorageEndpoint) DeleteRule(ruleKey string) error {

// LoadRuleGroups loads all rule groups from storage.
func (se *StorageEndpoint) LoadRuleGroups(f func(k, v string)) error {
return se.loadRangeByPrefix(RuleGroupPath+"/", f)
return se.loadRangeByPrefix(ruleGroupPath+"/", f)
}

// SaveRuleGroup stores a rule group config to storage.
Expand All @@ -62,7 +62,7 @@ func (se *StorageEndpoint) DeleteRuleGroup(groupID string) error {

// LoadRegionRules loads region rules from storage.
func (se *StorageEndpoint) LoadRegionRules(f func(k, v string)) error {
return se.loadRangeByPrefix(RegionLabelPath+"/", f)
return se.loadRangeByPrefix(regionLabelPath+"/", f)
}

// SaveRegionRule saves a region rule to the storage.
Expand All @@ -77,7 +77,7 @@ func (se *StorageEndpoint) DeleteRegionRule(ruleKey string) error {

// LoadRules loads placement rules from storage.
func (se *StorageEndpoint) LoadRules(f func(k, v string)) error {
return se.loadRangeByPrefix(RulesPath+"/", f)
return se.loadRangeByPrefix(rulesPath+"/", f)
}

// loadRangeByPrefix iterates all key-value pairs in the storage that has the prefix.
Expand Down
8 changes: 4 additions & 4 deletions tests/integrations/mcs/scheduling/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ func (suite *ruleTestSuite) TestRuleWatch() {
re := suite.Require()

// Create a rule watcher.
rootPath := endpoint.PDRootPath(suite.cluster.GetCluster().GetId())
clusterID := suite.cluster.GetCluster().GetId()
watcher, err := rule.NewWatcher(
suite.ctx,
suite.pdLeaderServer.GetEtcdClient(),
endpoint.AppendToRootPath(rootPath, endpoint.RulesPath),
endpoint.AppendToRootPath(rootPath, endpoint.RuleGroupPath),
endpoint.AppendToRootPath(rootPath, endpoint.RegionLabelPath),
endpoint.RulesPath(clusterID),
endpoint.RuleGroupPath(clusterID),
endpoint.RegionLabelPath(clusterID),
)
re.NoError(err)
ruleStorage := watcher.GetRuleStorage()
Expand Down

0 comments on commit cf8bf5b

Please sign in to comment.