Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduling/config: watch StoreConfig in scheduling config watcher #6921

Merged
merged 1 commit into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/server/config"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -193,6 +194,7 @@ type PersistConfig struct {
clusterVersion unsafe.Pointer
schedule atomic.Value
replication atomic.Value
storeConfig atomic.Value
}

// NewPersistConfig creates a new PersistConfig instance.
Expand All @@ -201,6 +203,9 @@ func NewPersistConfig(cfg *Config) *PersistConfig {
o.SetClusterVersion(&cfg.ClusterVersion)
o.schedule.Store(&cfg.Schedule)
o.replication.Store(&cfg.Replication)
// storeConfig will be fetched from TiKV by PD API server,
// so we just set an empty value here first.
o.storeConfig.Store(&config.StoreConfig{})
return o
}

Expand Down Expand Up @@ -234,6 +239,19 @@ func (o *PersistConfig) SetReplicationConfig(cfg *sc.ReplicationConfig) {
o.replication.Store(cfg)
}

// SetStoreConfig sets the TiKV store configuration.
func (o *PersistConfig) SetStoreConfig(cfg *config.StoreConfig) {
// Some of the fields won't be persisted and watched,
// so we need to adjust it here before storing it.
cfg.Adjust()
o.storeConfig.Store(cfg)
}

// GetStoreConfig returns the TiKV store configuration.
func (o *PersistConfig) GetStoreConfig() *config.StoreConfig {
return o.storeConfig.Load().(*config.StoreConfig)
}

// GetMaxReplicas returns the max replicas.
func (o *PersistConfig) GetMaxReplicas() int {
return int(o.GetReplicationConfig().MaxReplicas)
Expand Down
7 changes: 5 additions & 2 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/server/config"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
Expand All @@ -42,9 +43,10 @@ type Watcher struct {
}

type persistedConfig struct {
ClusterVersion semver.Version `json:"cluster-version"`
Schedule sc.ScheduleConfig `json:"schedule"`
Replication sc.ReplicationConfig `json:"replication"`
ClusterVersion semver.Version `json:"cluster-version"`
Store config.StoreConfig `json:"store"`
}

// NewWatcher creates a new watcher to watch the config meta change from PD API server.
Expand All @@ -71,9 +73,10 @@ func NewWatcher(
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
cw.SetClusterVersion(&cfg.ClusterVersion)
cw.SetScheduleConfig(&cfg.Schedule)
cw.SetReplicationConfig(&cfg.Replication)
cw.SetClusterVersion(&cfg.ClusterVersion)
cw.SetStoreConfig(&cfg.Store)
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
Expand Down
21 changes: 18 additions & 3 deletions tests/integrations/mcs/scheduling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/versioninfo"
severcfg "github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
)

Expand Down Expand Up @@ -81,17 +82,31 @@ func (suite *configTestSuite) TestConfigWatch() {
re.Equal(sc.DefaultSplitMergeInterval, watcher.GetScheduleConfig().SplitMergeInterval.Duration)
re.Equal("0.0.0", watcher.GetClusterVersion().String())
// Update the config and check if the scheduling config watcher can get the latest value.
suite.pdLeaderServer.GetPersistOptions().SetMaxReplicas(5)
persistOpts := suite.pdLeaderServer.GetPersistOptions()
persistOpts.SetMaxReplicas(5)
persistConfig(re, suite.pdLeaderServer)
testutil.Eventually(re, func() bool {
return watcher.GetReplicationConfig().MaxReplicas == 5
})
suite.pdLeaderServer.GetPersistOptions().SetSplitMergeInterval(2 * sc.DefaultSplitMergeInterval)
persistOpts.SetSplitMergeInterval(2 * sc.DefaultSplitMergeInterval)
persistConfig(re, suite.pdLeaderServer)
testutil.Eventually(re, func() bool {
return watcher.GetScheduleConfig().SplitMergeInterval.Duration == 2*sc.DefaultSplitMergeInterval
})
suite.pdLeaderServer.GetPersistOptions().SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
persistOpts.SetStoreConfig(&severcfg.StoreConfig{
Coprocessor: severcfg.Coprocessor{
RegionMaxSize: "144MiB",
},
Storage: severcfg.Storage{
Engine: severcfg.RaftstoreV2,
},
})
persistConfig(re, suite.pdLeaderServer)
testutil.Eventually(re, func() bool {
return watcher.GetStoreConfig().GetRegionMaxSize() == 144 &&
watcher.GetStoreConfig().IsRaftKV2()
})
persistOpts.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
persistConfig(re, suite.pdLeaderServer)
testutil.Eventually(re, func() bool {
return watcher.GetClusterVersion().String() == "4.0.0"
Expand Down