diff --git a/server/api/admin.go b/server/api/admin.go index 1fa63c8ad9a..42aac1247bb 100644 --- a/server/api/admin.go +++ b/server/api/admin.go @@ -58,6 +58,17 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request) h.rd.JSON(w, http.StatusOK, "The region is removed from server cache.") } +// @Tags admin +// @Summary Drop all regions from cache. +// @Produce json +// @Success 200 {string} string "All regions are removed from server cache." +// @Router /admin/cache/regions [delete] +func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Request) { + rc := getCluster(r) + rc.DropCacheAllRegion() + h.rd.JSON(w, http.StatusOK, "All regions are removed from server cache.") +} + // FIXME: details of input json body params // @Tags admin // @Summary Reset the ts. diff --git a/server/api/admin_test.go b/server/api/admin_test.go index f8fd0bcf74f..1ea5a4f9ec7 100644 --- a/server/api/admin_test.go +++ b/server/api/admin_test.go @@ -94,6 +94,74 @@ func (suite *adminTestSuite) TestDropRegion() { suite.Equal(uint64(50), region.GetRegionEpoch().Version) } +func (suite *adminTestSuite) TestDropRegions() { + cluster := suite.svr.GetRaftCluster() + + n := uint64(10000) + np := uint64(3) + + regions := make([]*core.RegionInfo, 0, n) + for i := uint64(0); i < n; i++ { + peers := make([]*metapb.Peer, 0, np) + for j := uint64(0); j < np; j++ { + peer := &metapb.Peer{ + Id: i*np + j, + } + peer.StoreId = (i + j) % n + peers = append(peers, peer) + } + // initialize region's epoch to (100, 100). + region := cluster.GetRegionByKey([]byte(fmt.Sprintf("%d", i))).Clone( + core.SetPeers(peers), + core.SetRegionConfVer(100), + core.SetRegionVersion(100), + ) + regions = append(regions, region) + + err := cluster.HandleRegionHeartbeat(region) + suite.NoError(err) + } + + // Region epoch cannot decrease. + for i := uint64(0); i < n; i++ { + region := regions[i].Clone( + core.SetRegionConfVer(50), + core.SetRegionVersion(50), + ) + regions[i] = region + err := cluster.HandleRegionHeartbeat(region) + suite.Error(err) + } + + for i := uint64(0); i < n; i++ { + region := cluster.GetRegionByKey([]byte(fmt.Sprintf("%d", i))) + + suite.Equal(uint64(100), region.GetRegionEpoch().ConfVer) + suite.Equal(uint64(100), region.GetRegionEpoch().Version) + } + + // After dropping all regions from the cache, a lower version is accepted.
+ url := fmt.Sprintf("%s/admin/cache/regions", suite.urlPrefix) + req, err := http.NewRequest(http.MethodDelete, url, nil) + suite.NoError(err) + res, err := testDialClient.Do(req) + suite.NoError(err) + suite.Equal(http.StatusOK, res.StatusCode) + res.Body.Close() + + for _, region := range regions { + err := cluster.HandleRegionHeartbeat(region) + suite.NoError(err) + } + + for i := uint64(0); i < n; i++ { + region := cluster.GetRegionByKey([]byte(fmt.Sprintf("%d", i))) + + suite.Equal(uint64(50), region.GetRegionEpoch().ConfVer) + suite.Equal(uint64(50), region.GetRegionEpoch().Version) + } +} + func (suite *adminTestSuite) TestPersistFile() { data := []byte("#!/bin/sh\nrm -rf /") re := suite.Require() diff --git a/server/api/router.go b/server/api/router.go index 81b8f9d83bc..b062ff78f65 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -286,6 +286,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { adminHandler := newAdminHandler(svr, rd) registerFunc(clusterRouter, "/admin/cache/region/{id}", adminHandler.DeleteRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog)) + registerFunc(clusterRouter, "/admin/cache/regions", adminHandler.DeleteAllRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog)) registerFunc(clusterRouter, "/admin/reset-ts", adminHandler.ResetTS, setMethods(http.MethodPost), setAuditBackend(localLog)) registerFunc(apiRouter, "/admin/persist-file/{file_name}", adminHandler.SavePersistFile, setMethods(http.MethodPost), setAuditBackend(localLog)) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 105eaa529fd..5ebf4718632 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -357,7 +357,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { start = time.Now() // used to load region from kv storage to cache storage. - if err := c.storage.LoadRegionsOnce(c.ctx, c.core.CheckAndPutRegion); err != nil { + if err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil { return nil, err } log.Info("load regions", diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index a118681e6a7..34983ccef62 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -36,6 +36,7 @@ import ( "github.com/tikv/pd/server/schedule/checker" "github.com/tikv/pd/server/schedule/hbstream" "github.com/tikv/pd/server/schedule/operator" + "github.com/tikv/pd/server/schedule/plan" "github.com/tikv/pd/server/statistics" "github.com/tikv/pd/server/storage" "go.uber.org/zap" @@ -72,12 +73,14 @@ type coordinator struct { opController *schedule.OperatorController hbStreams *hbstream.HeartbeatStreams pluginInterface *schedule.PluginInterface + diagnosis *diagnosisManager } // newCoordinator creates a new coordinator. 
func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams *hbstream.HeartbeatStreams) *coordinator { ctx, cancel := context.WithCancel(ctx) opController := schedule.NewOperatorController(ctx, cluster, hbStreams) + schedulers := make(map[string]*scheduleController) return &coordinator{ ctx: ctx, cancel: cancel, @@ -86,10 +89,11 @@ func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams *hbstre checkers: checker.NewController(ctx, cluster, cluster.ruleManager, cluster.regionLabeler, opController), regionScatterer: schedule.NewRegionScatterer(ctx, cluster), regionSplitter: schedule.NewRegionSplitter(cluster, schedule.NewSplitRegionsHandler(cluster, opController)), - schedulers: make(map[string]*scheduleController), + schedulers: schedulers, opController: opController, hbStreams: hbStreams, pluginInterface: schedule.NewPluginInterface(), + diagnosis: newDiagnosisManager(cluster, schedulers), } } @@ -901,6 +905,11 @@ func (s *scheduleController) Schedule() []*operator.Operator { return nil } +func (s *scheduleController) DiagnoseDryRun() ([]*operator.Operator, []plan.Plan) { + cacheCluster := newCacheCluster(s.cluster) + return s.Scheduler.Schedule(cacheCluster, true) +} + // GetInterval returns the interval of scheduling for a scheduler. func (s *scheduleController) GetInterval() time.Duration { return s.nextInterval @@ -933,6 +942,60 @@ func (s *scheduleController) GetDelayUntil() int64 { return 0 } +const maxDiagnosisResultNum = 6 + +// diagnosisManager is used to manage the diagnosis mechanism, which shares the actual schedulers with the coordinator. +type diagnosisManager struct { + cluster *RaftCluster + schedulers map[string]*scheduleController + dryRunResult map[string]*cache.FIFO +} + +func newDiagnosisManager(cluster *RaftCluster, schedulerControllers map[string]*scheduleController) *diagnosisManager { + return &diagnosisManager{ + cluster: cluster, + schedulers: schedulerControllers, + dryRunResult: make(map[string]*cache.FIFO), + } +} + +func (d *diagnosisManager) diagnosisDryRun(name string) error { + if _, ok := d.schedulers[name]; !ok { + return errs.ErrSchedulerNotFound.FastGenByArgs() + } + ops, plans := d.schedulers[name].DiagnoseDryRun() + result := newDiagnosisResult(ops, plans) + if _, ok := d.dryRunResult[name]; !ok { + d.dryRunResult[name] = cache.NewFIFO(maxDiagnosisResultNum) + } + queue := d.dryRunResult[name] + queue.Put(result.timestamp, result) + return nil +} + +type diagnosisResult struct { + timestamp uint64 + unschedulablePlans []plan.Plan + schedulablePlans []plan.Plan +} + +func newDiagnosisResult(ops []*operator.Operator, result []plan.Plan) *diagnosisResult { + index := len(ops) + if len(ops) > 0 { + if ops[0].Kind()&operator.OpMerge != 0 { + index /= 2 + } + } + if index > len(result) { + return nil + } + return &diagnosisResult{ + timestamp: uint64(time.Now().Unix()), + unschedulablePlans: result[index:], + schedulablePlans: result[:index], + } +} + func (c *coordinator) getPausedSchedulerDelayAt(name string) (int64, error) { c.RLock() defer c.RUnlock() diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index 0cca1420ce0..01dfa63e6ef 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -301,6 +301,17 @@ func checkRegionAndOperator(re *require.Assertions, tc *testCluster, co *coordin } } +func TestDiagnosisDryRun(t *testing.T) { + re := require.New(t) + + _, co, cleanup := prepare(nil, nil, func(co *coordinator) { co.run() }, re) + defer cleanup() + err :=
co.diagnosis.diagnosisDryRun(schedulers.EvictLeaderName) + re.Error(err) + err = co.diagnosis.diagnosisDryRun(schedulers.BalanceRegionName) + re.NoError(err) +} + func TestCheckRegion(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) diff --git a/server/core/store.go b/server/core/store.go index 01d1ba0ddd2..f7cb7dd92fa 100644 --- a/server/core/store.go +++ b/server/core/store.go @@ -19,7 +19,6 @@ import ( "strings" "time" - "github.com/gogo/protobuf/proto" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" @@ -79,11 +78,17 @@ func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo { return storeInfo } +func (s *StoreInfo) cloneMetaStore() *metapb.Store { + b, _ := s.meta.Marshal() + store := &metapb.Store{} + store.Unmarshal(b) + return store +} + // Clone creates a copy of current StoreInfo. func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo { - meta := proto.Clone(s.meta).(*metapb.Store) store := &StoreInfo{ - meta: meta, + meta: s.cloneMetaStore(), storeStats: s.storeStats, pauseLeaderTransfer: s.pauseLeaderTransfer, slowStoreEvicted: s.slowStoreEvicted, diff --git a/server/core/store_option.go b/server/core/store_option.go index b445e9da735..e9537455179 100644 --- a/server/core/store_option.go +++ b/server/core/store_option.go @@ -17,7 +17,6 @@ package core import ( "time" - "github.com/gogo/protobuf/proto" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/server/core/storelimit" @@ -29,7 +28,7 @@ type StoreCreateOption func(region *StoreInfo) // SetStoreAddress sets the address for the store. func SetStoreAddress(address, statusAddress, peerAddress string) StoreCreateOption { return func(store *StoreInfo) { - meta := proto.Clone(store.meta).(*metapb.Store) + meta := store.cloneMetaStore() meta.Address = address meta.StatusAddress = statusAddress meta.PeerAddress = peerAddress @@ -40,7 +39,7 @@ func SetStoreAddress(address, statusAddress, peerAddress string) StoreCreateOpti // SetStoreLabels sets the labels for the store. func SetStoreLabels(labels []*metapb.StoreLabel) StoreCreateOption { return func(store *StoreInfo) { - meta := proto.Clone(store.meta).(*metapb.Store) + meta := store.cloneMetaStore() meta.Labels = labels store.meta = meta } @@ -49,7 +48,7 @@ func SetStoreLabels(labels []*metapb.StoreLabel) StoreCreateOption { // SetStoreStartTime sets the start timestamp for the store. func SetStoreStartTime(startTS int64) StoreCreateOption { return func(store *StoreInfo) { - meta := proto.Clone(store.meta).(*metapb.Store) + meta := store.cloneMetaStore() meta.StartTimestamp = startTS store.meta = meta } @@ -58,7 +57,7 @@ func SetStoreStartTime(startTS int64) StoreCreateOption { // SetStoreVersion sets the version for the store. func SetStoreVersion(githash, version string) StoreCreateOption { return func(store *StoreInfo) { - meta := proto.Clone(store.meta).(*metapb.Store) + meta := store.cloneMetaStore() meta.Version = version meta.GitHash = githash store.meta = meta @@ -68,7 +67,7 @@ func SetStoreVersion(githash, version string) StoreCreateOption { // SetStoreDeployPath sets the deploy path for the store. 
func SetStoreDeployPath(deployPath string) StoreCreateOption { return func(store *StoreInfo) { - meta := proto.Clone(store.meta).(*metapb.Store) + meta := store.cloneMetaStore() meta.DeployPath = deployPath store.meta = meta } @@ -77,7 +76,7 @@ func SetStoreDeployPath(deployPath string) StoreCreateOption { // OfflineStore offline a store func OfflineStore(physicallyDestroyed bool) StoreCreateOption { return func(store *StoreInfo) { - meta := proto.Clone(store.meta).(*metapb.Store) + meta := store.cloneMetaStore() meta.State = metapb.StoreState_Offline meta.NodeState = metapb.NodeState_Removing meta.PhysicallyDestroyed = physicallyDestroyed @@ -88,7 +87,7 @@ func OfflineStore(physicallyDestroyed bool) StoreCreateOption { // UpStore up a store func UpStore() StoreCreateOption { return func(store *StoreInfo) { - meta := proto.Clone(store.meta).(*metapb.Store) + meta := store.cloneMetaStore() meta.State = metapb.StoreState_Up meta.NodeState = metapb.NodeState_Serving store.meta = meta @@ -98,7 +97,7 @@ func UpStore() StoreCreateOption { // TombstoneStore set a store to tombstone. func TombstoneStore() StoreCreateOption { return func(store *StoreInfo) { - meta := proto.Clone(store.meta).(*metapb.Store) + meta := store.cloneMetaStore() meta.State = metapb.StoreState_Tombstone meta.NodeState = metapb.NodeState_Removed store.meta = meta diff --git a/server/core/store_stats.go b/server/core/store_stats.go index 90d524d02d5..2cc8099ce49 100644 --- a/server/core/store_stats.go +++ b/server/core/store_stats.go @@ -67,6 +67,16 @@ func (ss *storeStats) GetStoreStats() *pdpb.StoreStats { return ss.rawStats } +// CloneStoreStats returns the statistics information cloned from the store. +func (ss *storeStats) CloneStoreStats() *pdpb.StoreStats { + ss.mu.RLock() + b, _ := ss.rawStats.Marshal() + ss.mu.RUnlock() + stats := &pdpb.StoreStats{} + stats.Unmarshal(b) + return stats +} + // GetCapacity returns the capacity size of the store. 
func (ss *storeStats) GetCapacity() uint64 { ss.mu.RLock() diff --git a/server/core/store_test.go b/server/core/store_test.go index b311315a29e..e93d0e80eaf 100644 --- a/server/core/store_test.go +++ b/server/core/store_test.go @@ -90,6 +90,26 @@ func TestCloneStore(t *testing.T) { wg.Wait() } +func TestCloneMetaStore(t *testing.T) { + re := require.New(t) + store := &metapb.Store{Id: 1, Address: "mock://tikv-1", Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}, {Key: "host", Value: "h1"}}} + store2 := NewStoreInfo(store).cloneMetaStore() + re.Equal(store2.Labels, store.Labels) + store2.Labels[0].Value = "changed value" + re.NotEqual(store2.Labels, store.Labels) +} + +func BenchmarkStoreClone(b *testing.B) { + meta := &metapb.Store{Id: 1, + Address: "mock://tikv-1", + Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}, {Key: "host", Value: "h1"}}} + store := NewStoreInfo(meta) + b.ResetTimer() + for t := 0; t < b.N; t++ { + store.Clone(SetLeaderCount(t)) + } +} + func TestRegionScore(t *testing.T) { re := require.New(t) stats := &pdpb.StoreStats{} diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index ba9fd2edcd6..debf39f556f 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -25,6 +25,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/grpcutil" "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/storage" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -119,10 +120,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { defer s.wg.Done() // used to load region from kv storage to cache storage. bc := s.server.GetBasicCluster() - storage := s.server.GetStorage() + regionStorage := s.server.GetStorage() log.Info("region syncer start load region") start := time.Now() - err := storage.LoadRegionsOnce(ctx, bc.CheckAndPutRegion) + err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion) log.Info("region syncer finished load region", zap.Duration("time-cost", time.Since(start))) if err != nil { log.Warn("failed to load regions.", errs.ZapError(err)) @@ -224,13 +225,13 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { } } if saveKV { - err = storage.SaveRegion(r) + err = regionStorage.SaveRegion(r) } if err == nil { s.history.Record(region) } for _, old := range overlaps { - _ = storage.DeleteRegion(old.GetMeta()) + _ = regionStorage.DeleteRegion(old.GetMeta()) } } } diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index 723cef879b5..d2053b46cef 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -31,7 +31,6 @@ import ( "github.com/tikv/pd/pkg/syncutil" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/storage" - "github.com/tikv/pd/server/storage/endpoint" "github.com/tikv/pd/server/storage/kv" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -92,15 +91,13 @@ type RegionSyncer struct { // Usually open the region syncer in huge cluster and the server // no longer etcd but go-leveldb. 
func NewRegionSyncer(s Server) *RegionSyncer { - regionStorageGetter, ok := s.GetStorage().(interface { - GetRegionStorage() endpoint.RegionStorage - }) - if !ok { + localRegionStorage := storage.TryGetLocalRegionStorage(s.GetStorage()) + if localRegionStorage == nil { return nil } syncer := &RegionSyncer{ server: s, - history: newHistoryBuffer(defaultHistoryBufferSize, regionStorageGetter.GetRegionStorage().(kv.Base)), + history: newHistoryBuffer(defaultHistoryBufferSize, localRegionStorage.(kv.Base)), limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity), tlsConfig: s.GetTLSConfig(), } diff --git a/server/schedule/placement/fit.go b/server/schedule/placement/fit.go index 8b7dac1c922..9879d434720 100644 --- a/server/schedule/placement/fit.go +++ b/server/schedule/placement/fit.go @@ -16,6 +16,7 @@ package placement import ( "math" + "math/bits" "sort" "github.com/pingcap/kvproto/pkg/metapb" @@ -227,27 +228,29 @@ func (w *fitWorker) fitRule(index int) bool { if len(candidates) < count { count = len(candidates) } - return w.enumPeers(candidates, nil, index, count) + + return w.fixRuleWithCandidates(candidates, index, count) } -// Recursively traverses all feasible peer combinations. -// For each combination, call `compareBest` to determine whether it is better -// than the existing option. +// Pick the most suitable peer combination for the rule with candidates. // Returns true if it replaces `bestFit` with a better alternative. -func (w *fitWorker) enumPeers(candidates, selected []*fitPeer, index int, count int) bool { - if len(selected) == count { - // We collect enough peers. End recursive. - return w.compareBest(selected, index) - } +func (w *fitWorker) fixRuleWithCandidates(candidates []*fitPeer, index int, count int) bool { + // map the candidates to binary numbers with len(candidates) bits, + // each bit can be 1 or 0, 1 means a picked candidate, + // a binary number with `count` ones represents one choice of peers for the current rule. var better bool - // make sure the left number of candidates should be enough. - indexLimit := len(candidates) - (count - len(selected)) - for i := 0; i <= indexLimit; i++ { - p := candidates[i] - p.selected = true - better = w.enumPeers(candidates[i+1:], append(selected, p), index, count) || better - p.selected = false + limit := uint(1 << len(candidates)) + for binaryNumber := uint(0); binaryNumber < limit; binaryNumber++ { + // a combination is valid only when exactly `count` candidates are picked. + if bits.OnesCount(binaryNumber) != count { + continue + } + selected := pickPeersFromBinaryInt(candidates, binaryNumber) + better = w.compareBest(selected, index) || better + // reset the selected flag for the next combination. + unSelectPeers(selected) + } + return better +} + +// pickPeersFromBinaryInt picks the candidates whose corresponding bit in binaryNumber is `1`, +// e.g. binaryNumber = 5 (binary 101) picks candidates[0] and candidates[2]. +func pickPeersFromBinaryInt(candidates []*fitPeer, binaryNumber uint) []*fitPeer { + selected := make([]*fitPeer, 0) + for _, p := range candidates { + if binaryNumber&1 == 1 { + p.selected = true + selected = append(selected, p) + } + binaryNumber >>= 1 + if binaryNumber == 0 { + break + } + } + return selected +} + +func unSelectPeers(selected []*fitPeer) { + for _, p := range selected { + p.selected = false + } +} + // compareBest checks if the selected peers is better then previous best. // Returns true if it replaces `bestFit` with a better alternative.
func (w *fitWorker) compareBest(selected []*fitPeer, index int) bool { diff --git a/server/schedule/placement/fit_test.go b/server/schedule/placement/fit_test.go index 6325dadb30a..7c4c9147ac5 100644 --- a/server/schedule/placement/fit_test.go +++ b/server/schedule/placement/fit_test.go @@ -187,3 +187,35 @@ func TestIsolationScore(t *testing.T) { testCase.checker(score1, score2) } } + +func TestPickPeersFromBinaryInt(t *testing.T) { + re := require.New(t) + var candidates []*fitPeer + for id := uint64(1); id <= 10; id++ { + candidates = append(candidates, &fitPeer{ + Peer: &metapb.Peer{Id: id}, + }) + } + testCases := []struct { + binary string + expectedPeers []uint64 + }{ + {"0", []uint64{}}, + {"1", []uint64{1}}, + {"101", []uint64{1, 3}}, + {"111", []uint64{1, 2, 3}}, + {"1011", []uint64{1, 2, 4}}, + {"100011", []uint64{1, 2, 6}}, + {"1000001111", []uint64{1, 2, 3, 4, 10}}, + } + + for _, c := range testCases { + binaryNumber, err := strconv.ParseUint(c.binary, 2, 64) + re.NoError(err) + selected := pickPeersFromBinaryInt(candidates, uint(binaryNumber)) + re.Len(selected, len(c.expectedPeers)) + for id := 0; id < len(selected); id++ { + re.Equal(selected[id].Id, c.expectedPeers[id]) + } + } +} diff --git a/server/schedule/range_cluster.go b/server/schedule/range_cluster.go index 573cc7fd0d6..f6a247d149a 100644 --- a/server/schedule/range_cluster.go +++ b/server/schedule/range_cluster.go @@ -15,8 +15,6 @@ package schedule import ( - "github.com/gogo/protobuf/proto" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/server/core" ) @@ -53,7 +51,7 @@ func (r *RangeCluster) updateStoreInfo(s *core.StoreInfo) *core.StoreInfo { regionCount := r.subCluster.GetStoreRegionCount(id) regionSize := r.subCluster.GetStoreRegionSize(id) pendingPeerCount := r.subCluster.GetStorePendingPeerCount(id) - newStats := proto.Clone(s.GetStoreStats()).(*pdpb.StoreStats) + newStats := s.CloneStoreStats() newStats.UsedSize = uint64(float64(regionSize)/amplification) * (1 << 20) newStats.Available = s.GetCapacity() - newStats.UsedSize newStore := s.Clone( diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 58e755813e4..862e753b38c 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -105,7 +105,8 @@ type hotScheduler struct { stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail // config of hot scheduler - conf *hotRegionSchedulerConfig + conf *hotRegionSchedulerConfig + searchRevertRegions [resourceTypeLen]bool // Whether to search revert regions. } func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler { @@ -120,6 +121,7 @@ func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionS } for ty := resourceType(0); ty < resourceTypeLen; ty++ { ret.stLoadInfos[ty] = map[uint64]*statistics.StoreLoadDetail{} + ret.searchRevertRegions[ty] = false } return ret } @@ -329,10 +331,15 @@ func (h *hotScheduler) balanceHotWriteRegions(cluster schedule.Cluster) []*opera } type solution struct { - srcStore *statistics.StoreLoadDetail - srcPeerStat *statistics.HotPeerStat - region *core.RegionInfo - dstStore *statistics.StoreLoadDetail + srcStore *statistics.StoreLoadDetail + region *core.RegionInfo // The region of the main balance effect. Relate mainPeerStat. srcStore -> dstStore + mainPeerStat *statistics.HotPeerStat + + dstStore *statistics.StoreLoadDetail + revertRegion *core.RegionInfo // The regions to hedge back effects. Relate revertPeerStat. 
dstStore -> srcStore + revertPeerStat *statistics.HotPeerStat + + cachedPeersRate []float64 // progressiveRank measures the contribution for balance. // The smaller the rank, the better this solution is. @@ -344,8 +351,12 @@ type solution struct { } // getExtremeLoad returns the min load of the src store and the max load of the dst store. +// If peersRate is negative, the direction is reversed. func (s *solution) getExtremeLoad(dim int) (src float64, dst float64) { - return s.srcStore.LoadPred.Min().Loads[dim], s.dstStore.LoadPred.Max().Loads[dim] + if s.getPeersRateFromCache(dim) >= 0 { + return s.srcStore.LoadPred.Min().Loads[dim], s.dstStore.LoadPred.Max().Loads[dim] + } + return s.srcStore.LoadPred.Max().Loads[dim], s.dstStore.LoadPred.Min().Loads[dim] } // getCurrentLoad returns the current load of the src store and the dst store. @@ -358,9 +369,29 @@ func (s *solution) getPendingLoad(dim int) (src float64, dst float64) { return s.srcStore.LoadPred.Pending().Loads[dim], s.dstStore.LoadPred.Pending().Loads[dim] } -// getPeerRate returns the load of the peer. -func (s *solution) getPeerRate(rw statistics.RWType, dim int) float64 { - return s.srcPeerStat.GetLoad(statistics.GetRegionStatKind(rw, dim)) +// calcPeersRate precomputes the peer rate and stores it in cachedPeersRate. +func (s *solution) calcPeersRate(rw statistics.RWType, dims ...int) { + s.cachedPeersRate = make([]float64, statistics.DimLen) + for _, dim := range dims { + kind := statistics.GetRegionStatKind(rw, dim) + peersRate := s.mainPeerStat.GetLoad(kind) + if s.revertPeerStat != nil { + peersRate -= s.revertPeerStat.GetLoad(kind) + } + s.cachedPeersRate[dim] = peersRate + } +} + +// getPeersRateFromCache returns the load of the peer. calcPeersRate must be called first. +func (s *solution) getPeersRateFromCache(dim int) float64 { + return s.cachedPeersRate[dim] +} + +// isAvailable returns whether the solution is available. +// If the solution has no revertRegion, its progressiveRank should be < 0. +// If the solution has a revertRegion, its progressiveRank should be < -1. +func (s *solution) isAvailable() bool { + return s.progressiveRank < -1 || (s.progressiveRank < 0 && s.revertRegion == nil) } type balanceSolver struct { @@ -369,11 +400,12 @@ type balanceSolver struct { stLoadDetail map[uint64]*statistics.StoreLoadDetail rwTy statistics.RWType opTy opType + resourceTy resourceType + + cur *solution - cur *solution best *solution ops []*operator.Operator - infl statistics.Influence maxSrc *statistics.StoreLoad minDst *statistics.StoreLoad @@ -394,7 +426,8 @@ type balanceSolver struct { func (bs *balanceSolver) init() { // Init store load detail according to the type.
- bs.stLoadDetail = bs.sche.stLoadInfos[toResourceType(bs.rwTy, bs.opTy)] + bs.resourceTy = toResourceType(bs.rwTy, bs.opTy) + bs.stLoadDetail = bs.sche.stLoadInfos[bs.resourceTy] bs.maxSrc = &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)} bs.minDst = &statistics.StoreLoad{ @@ -444,7 +477,7 @@ func (bs *balanceSolver) getPriorities() []string { querySupport := bs.sche.conf.checkQuerySupport(bs.Cluster) // For read, transfer-leader and move-peer have the same priority config // For write, they are different - switch toResourceType(bs.rwTy, bs.opTy) { + switch bs.resourceTy { case readLeader, readPeer: return adjustConfig(querySupport, bs.sche.conf.GetReadPriorities(), getReadPriorities) case writeLeader: @@ -471,8 +504,6 @@ func (bs *balanceSolver) isValid() bool { if bs.Cluster == nil || bs.sche == nil || bs.stLoadDetail == nil { return false } - // ignore the return value because it will panic if the type is not correct. - _ = toResourceType(bs.rwTy, bs.opTy) return true } @@ -482,8 +513,8 @@ func (bs *balanceSolver) solve() []*operator.Operator { if !bs.isValid() { return nil } - bs.cur = &solution{} + bs.cur = &solution{} tryUpdateBestSolution := func(isUniformFirstPriority bool) { if bs.cur.progressiveRank == -1 && isUniformFirstPriority { // Because region is available for src and dst, so stddev is the same for both, only need to calcurate one. @@ -491,43 +522,94 @@ func (bs *balanceSolver) solve() []*operator.Operator { hotSchedulerResultCounter.WithLabelValues("skip-uniform-store", strconv.FormatUint(bs.cur.dstStore.GetID(), 10)).Inc() return } - if bs.cur.progressiveRank < 0 && bs.betterThan(bs.best) { - if newOps, newInfl := bs.buildOperators(); len(newOps) > 0 { + if bs.cur.isAvailable() && bs.betterThan(bs.best) { + if newOps := bs.buildOperators(); len(newOps) > 0 { bs.ops = newOps - bs.infl = *newInfl clone := *bs.cur bs.best = &clone } } } + // Whether to allow move region peer from dstStore to srcStore + searchRevertRegions := bs.sche.searchRevertRegions[bs.resourceTy] && !bs.sche.conf.IsStrictPickingStoreEnabled() + var allowRevertRegion func(region *core.RegionInfo, srcStoreID uint64) bool + if bs.opTy == transferLeader { + allowRevertRegion = func(region *core.RegionInfo, srcStoreID uint64) bool { + return region.GetStorePeer(srcStoreID) != nil + } + } else { + allowRevertRegion = func(region *core.RegionInfo, srcStoreID uint64) bool { + return region.GetStorePeer(srcStoreID) == nil + } + } + for _, srcStore := range bs.filterSrcStores() { bs.cur.srcStore = srcStore srcStoreID := srcStore.GetID() - isUniformFirstPriority, isUniformSecondPriority := bs.isUniformFirstPriority(bs.cur.srcStore), bs.isUniformSecondPriority(bs.cur.srcStore) + isUniformFirstPriority, isUniformSecondPriority := bs.isUniformFirstPriority(srcStore), bs.isUniformSecondPriority(srcStore) if isUniformFirstPriority && isUniformSecondPriority { - hotSchedulerResultCounter.WithLabelValues("skip-uniform-store", strconv.FormatUint(bs.cur.srcStore.GetID(), 10)).Inc() + hotSchedulerResultCounter.WithLabelValues("skip-uniform-store", strconv.FormatUint(srcStore.GetID(), 10)).Inc() continue } - for _, srcPeerStat := range bs.filterHotPeers(srcStore) { - if bs.cur.region = bs.getRegion(srcPeerStat, srcStoreID); bs.cur.region == nil { + for _, mainPeerStat := range bs.filterHotPeers(srcStore) { + if bs.cur.region = bs.getRegion(mainPeerStat, srcStoreID); bs.cur.region == nil { continue } else if bs.opTy == movePeer && bs.cur.region.GetApproximateSize() > bs.GetOpts().GetMaxMovableHotPeerSize() { 
- schedulerCounter.WithLabelValues(fmt.Sprintf("hot-region-%s", bs.rwTy), "hot_region_split").Inc() + schedulerCounter.WithLabelValues(bs.sche.GetName(), "need_split_before_move_peer").Inc() continue } - bs.cur.srcPeerStat = srcPeerStat + bs.cur.mainPeerStat = mainPeerStat for _, dstStore := range bs.filterDstStores() { bs.cur.dstStore = dstStore bs.calcProgressiveRank() tryUpdateBestSolution(isUniformFirstPriority) + + if searchRevertRegions && (bs.cur.progressiveRank >= -1 && bs.cur.progressiveRank <= 0) && + (bs.best == nil || bs.best.progressiveRank >= -1 || bs.best.revertRegion != nil) { + // The search-revert-regions is performed only when the following conditions are met to improve performance. + // * `searchRevertRegions` is true. It depends on the result of the last `solve`. + // * `IsStrictPickingStoreEnabled` is false. + // * The current solution is not good enough: -1 <= progressiveRank <= 0. + // * The current best solution is not good enough, i.e. one of the following holds: + // * there is no best solution yet, + // * the best solution's progressiveRank is >= -1, + // * the best solution contains revert regions. + schedulerCounter.WithLabelValues(bs.sche.GetName(), "search-revert-regions").Inc() + dstStoreID := dstStore.GetID() + for _, revertPeerStat := range bs.filterHotPeers(bs.cur.dstStore) { + revertRegion := bs.getRegion(revertPeerStat, dstStoreID) + if revertRegion == nil || revertRegion.GetID() == bs.cur.region.GetID() || + !allowRevertRegion(revertRegion, srcStoreID) { + continue + } + bs.cur.revertPeerStat = revertPeerStat + bs.cur.revertRegion = revertRegion + bs.calcProgressiveRank() + tryUpdateBestSolution(isUniformFirstPriority) + } + bs.cur.revertPeerStat = nil + bs.cur.revertRegion = nil + } } } } + searchRevertRegions = bs.allowSearchRevertRegions() + bs.sche.searchRevertRegions[bs.resourceTy] = searchRevertRegions + if searchRevertRegions { + schedulerCounter.WithLabelValues(bs.sche.GetName(), "allow-search-revert-regions").Inc() + } return bs.ops } +func (bs *balanceSolver) allowSearchRevertRegions() bool { + // The next solve is allowed to search revert regions only when one of the following conditions is met. + // * No best solution was found this time. + // * The progressiveRank of the best solution is >= -1. + // * The best solution contains revert regions. + return bs.best == nil || bs.best.progressiveRank >= -1 || bs.best.revertRegion != nil +} + func (bs *balanceSolver) tryAddPendingInfluence() bool { if bs.best == nil || len(bs.ops) == 0 { return false @@ -539,7 +621,7 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool { // Depending on the source of the statistics used, a different ZombieDuration will be used. // If the statistics are from the sum of Regions, there will be a longer ZombieDuration. var maxZombieDur time.Duration - switch toResourceType(bs.rwTy, bs.opTy) { + switch bs.resourceTy { case writeLeader: maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration() case writePeer: @@ -551,7 +633,24 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool { default: maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration() } - return bs.sche.tryAddPendingInfluence(bs.ops[0], bs.best.srcStore.GetID(), bs.best.dstStore.GetID(), bs.infl, maxZombieDur) + + // TODO: Process operators atomically.
+ // main peer + srcStoreID := bs.best.srcStore.GetID() + dstStoreID := bs.best.dstStore.GetID() + infl := statistics.Influence{Loads: bs.best.mainPeerStat.Loads, Count: 1} + if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreID, dstStoreID, infl, maxZombieDur) { + return false + } + // revert peers + if bs.best.revertPeerStat != nil { + infl = statistics.Influence{Loads: bs.best.revertPeerStat.Loads, Count: 1} + if !bs.sche.tryAddPendingInfluence(bs.ops[1], dstStoreID, srcStoreID, infl, maxZombieDur) { + return false + } + } + bs.logBestSolution() + return true } // filterSrcStores compare the min rate and the ratio * expectation rate, if two dim rate is greater than @@ -814,11 +913,18 @@ func (bs *balanceSolver) isUniformSecondPriority(store *statistics.StoreLoadDeta // | Worsened | 0 | 1 | 1 | func (bs *balanceSolver) calcProgressiveRank() { bs.cur.progressiveRank = 1 + bs.cur.calcPeersRate(bs.rwTy, bs.firstPriority, bs.secondPriority) + if bs.cur.getPeersRateFromCache(bs.firstPriority) < bs.getMinRate(bs.firstPriority) && + bs.cur.getPeersRateFromCache(bs.secondPriority) < bs.getMinRate(bs.secondPriority) { + return + } - if toResourceType(bs.rwTy, bs.opTy) == writeLeader { + if bs.resourceTy == writeLeader { // For write leader, only compare the first priority. + // If the first priority is better, the progressiveRank is -3. + // Because it is not a solution that needs to be optimized. if bs.isBetterForWriteLeader() { - bs.cur.progressiveRank = -1 + bs.cur.progressiveRank = -3 } return } @@ -859,26 +965,26 @@ func (bs *balanceSolver) isTolerance(dim int) bool { return srcRate-pendingAmp*srcPending > dstRate+pendingAmp*dstPending } -func (bs *balanceSolver) getHotDecRatioByPriorities(dim int) (bool, float64) { +func (bs *balanceSolver) getHotDecRatioByPriorities(dim int) (isHot bool, decRatio float64) { // we use DecRatio(Decline Ratio) to expect that the dst store's rate should still be less // than the src store's rate after scheduling one peer. - getSrcDecRate := func(a, b float64) float64 { - if a-b <= 0 { - return 1 - } - return a - b - } srcRate, dstRate := bs.cur.getExtremeLoad(dim) - peerRate := bs.cur.getPeerRate(bs.rwTy, dim) - isHot := peerRate >= bs.getMinRate(dim) - decRatio := (dstRate + peerRate) / getSrcDecRate(srcRate, peerRate) - return isHot, decRatio + peersRate := bs.cur.getPeersRateFromCache(dim) + // Rate may be negative after adding revertRegion, which should be regarded as moving from dst to src. + if peersRate >= 0 { + isHot = peersRate >= bs.getMinRate(dim) + decRatio = (dstRate + peersRate) / math.Max(srcRate-peersRate, 1) + } else { + isHot = -peersRate >= bs.getMinRate(dim) + decRatio = (srcRate - peersRate) / math.Max(dstRate+peersRate, 1) + } + return } func (bs *balanceSolver) isBetterForWriteLeader() bool { srcRate, dstRate := bs.cur.getExtremeLoad(bs.firstPriority) - peerRate := bs.cur.getPeerRate(bs.rwTy, bs.firstPriority) - return srcRate-peerRate >= dstRate+peerRate && bs.isTolerance(bs.firstPriority) + peersRate := bs.cur.getPeersRateFromCache(bs.firstPriority) + return srcRate-peersRate >= dstRate+peersRate && bs.isTolerance(bs.firstPriority) } func (bs *balanceSolver) isBetter(dim int) bool { @@ -909,12 +1015,13 @@ func (bs *balanceSolver) betterThan(old *solution) bool { if old == nil { return true } - - switch { - case bs.cur.progressiveRank < old.progressiveRank: - return true - case bs.cur.progressiveRank > old.progressiveRank: - return false + if bs.cur.progressiveRank != old.progressiveRank { + // Smaller rank is better.
+ return bs.cur.progressiveRank < old.progressiveRank + } + if (bs.cur.revertRegion == nil) != (old.revertRegion == nil) { + // Fewer revertRegions are better. + return bs.cur.revertRegion == nil } if r := bs.compareSrcStore(bs.cur.srcStore, old.srcStore); r < 0 { @@ -929,11 +1036,10 @@ func (bs *balanceSolver) betterThan(old *solution) bool { return false } - if bs.cur.srcPeerStat != old.srcPeerStat { + if bs.cur.mainPeerStat != old.mainPeerStat { // compare region - if toResourceType(bs.rwTy, bs.opTy) == writeLeader { - kind := statistics.GetRegionStatKind(statistics.Write, bs.firstPriority) - return bs.cur.srcPeerStat.GetLoad(kind) > old.srcPeerStat.GetLoad(kind) + if bs.resourceTy == writeLeader { + return bs.cur.getPeersRateFromCache(bs.firstPriority) > old.getPeersRateFromCache(bs.firstPriority) } // We will firstly consider ensuring converge faster, secondly reduce oscillation @@ -966,7 +1072,6 @@ func (bs *balanceSolver) betterThan(old *solution) bool { } func (bs *balanceSolver) getRkCmpPriorities(old *solution) (firstCmp int, secondCmp int) { - fk, sk := statistics.GetRegionStatKind(bs.rwTy, bs.firstPriority), statistics.GetRegionStatKind(bs.rwTy, bs.secondPriority) dimToStep := func(priority int) float64 { switch priority { case statistics.ByteDim: @@ -978,9 +1083,9 @@ func (bs *balanceSolver) getRkCmpPriorities(old *solution) (firstCmp int, second } return 100 } - fkRkCmp := rankCmp(bs.cur.srcPeerStat.GetLoad(fk), old.srcPeerStat.GetLoad(fk), stepRank(0, dimToStep(bs.firstPriority))) - skRkCmp := rankCmp(bs.cur.srcPeerStat.GetLoad(sk), old.srcPeerStat.GetLoad(sk), stepRank(0, dimToStep(bs.secondPriority))) - return fkRkCmp, skRkCmp + firstCmp = rankCmp(bs.cur.getPeersRateFromCache(bs.firstPriority), old.getPeersRateFromCache(bs.firstPriority), stepRank(0, dimToStep(bs.firstPriority))) + secondCmp = rankCmp(bs.cur.getPeersRateFromCache(bs.secondPriority), old.getPeersRateFromCache(bs.secondPriority), stepRank(0, dimToStep(bs.secondPriority))) + return } // smaller is better @@ -988,7 +1093,7 @@ func (bs *balanceSolver) compareSrcStore(detail1, detail2 *statistics.StoreLoadD if detail1 != detail2 { // compare source store var lpCmp storeLPCmp - if toResourceType(bs.rwTy, bs.opTy) == writeLeader { + if bs.resourceTy == writeLeader { lpCmp = sliceLPCmp( minLPCmp(negLoadCmp(sliceLoadCmp( stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.maxSrc.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])), @@ -1021,7 +1126,7 @@ func (bs *balanceSolver) compareDstStore(detail1, detail2 *statistics.StoreLoadD if detail1 != detail2 { // compare destination store var lpCmp storeLPCmp - if toResourceType(bs.rwTy, bs.opTy) == writeLeader { + if bs.resourceTy == writeLeader { lpCmp = sliceLPCmp( maxLPCmp(sliceLoadCmp( stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.minDst.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])), @@ -1056,17 +1161,24 @@ func stepRank(rk0 float64, step float64) func(float64) int64 { // Once we are ready to build the operator, we must ensure the following things: // 1. the source store and destination store in the current solution are not nil -// 2. the peer we choose as a source in the current solution is not nil and it belongs to the source store -// 3. the region which owns the peer in the current solution is not nil and its ID should equal to the peer's region ID +// 2. the peer we choose as a source in the current solution is not nil, and it belongs to the source store +// 3. 
the region which owns the peer in the current solution is not nil, and its ID should equal to the peer's region ID func (bs *balanceSolver) isReadyToBuild() bool { - return bs.cur.srcStore != nil && bs.cur.dstStore != nil && - bs.cur.srcPeerStat != nil && bs.cur.srcPeerStat.StoreID == bs.cur.srcStore.GetID() && - bs.cur.region != nil && bs.cur.region.GetID() == bs.cur.srcPeerStat.ID() + if !(bs.cur.srcStore != nil && bs.cur.dstStore != nil && + bs.cur.mainPeerStat != nil && bs.cur.mainPeerStat.StoreID == bs.cur.srcStore.GetID() && + bs.cur.region != nil && bs.cur.region.GetID() == bs.cur.mainPeerStat.ID()) { + return false + } + if bs.cur.revertPeerStat == nil { + return bs.cur.revertRegion == nil + } + return bs.cur.revertPeerStat.StoreID == bs.cur.dstStore.GetID() && + bs.cur.revertRegion != nil && bs.cur.revertRegion.GetID() == bs.cur.revertPeerStat.ID() } -func (bs *balanceSolver) buildOperators() (ops []*operator.Operator, infl *statistics.Influence) { +func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) { if !bs.isReadyToBuild() { - return nil, nil + return nil } srcStoreID := bs.cur.srcStore.GetID() @@ -1095,18 +1207,21 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator, infl *stati currentOp, typ, err := createOperator(bs.cur.region, srcStoreID, dstStoreID) if err == nil { - bs.decorateOperator(currentOp, sourceLabel, targetLabel, typ, dim) + bs.decorateOperator(currentOp, false, sourceLabel, targetLabel, typ, dim) ops = []*operator.Operator{currentOp} - infl = &statistics.Influence{ - Loads: append(bs.cur.srcPeerStat.Loads[:0:0], bs.cur.srcPeerStat.Loads...), - Count: 1, + if bs.cur.revertRegion != nil { + currentOp, typ, err = createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID) + if err == nil { + bs.decorateOperator(currentOp, true, targetLabel, sourceLabel, typ, dim) + ops = append(ops, currentOp) + } } } if err != nil { log.Debug("fail to create operator", zap.Stringer("rw-type", bs.rwTy), zap.Stringer("op-type", bs.opTy), errs.ZapError(err)) schedulerCounter.WithLabelValues(bs.sche.GetName(), "create-operator-fail").Inc() - return nil, nil + return nil } return @@ -1175,7 +1290,7 @@ func (bs *balanceSolver) createWriteOperator(region *core.RegionInfo, srcStoreID return } -func (bs *balanceSolver) decorateOperator(op *operator.Operator, sourceLabel, targetLabel, typ, dim string) { +func (bs *balanceSolver) decorateOperator(op *operator.Operator, isRevert bool, sourceLabel, targetLabel, typ, dim string) { op.SetPriorityLevel(core.HighPriority) op.FinishedCounters = append(op.FinishedCounters, hotDirectionCounter.WithLabelValues(typ, bs.rwTy.String(), sourceLabel, "out", dim), @@ -1184,6 +1299,39 @@ func (bs *balanceSolver) decorateOperator(op *operator.Operator, sourceLabel, ta op.Counters = append(op.Counters, schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"), schedulerCounter.WithLabelValues(bs.sche.GetName(), typ)) + if isRevert { + op.FinishedCounters = append(op.FinishedCounters, + hotDirectionCounter.WithLabelValues(typ, bs.rwTy.String(), sourceLabel, "out-for-revert", dim), + hotDirectionCounter.WithLabelValues(typ, bs.rwTy.String(), targetLabel, "in-for-revert", dim)) + } +} + +func (bs *balanceSolver) logBestSolution() { + best := bs.best + if best == nil { + return + } + + if best.revertRegion != nil { + // Log more information on solutions containing revertRegion + srcFirstRate, dstFirstRate := best.getExtremeLoad(bs.firstPriority) + srcSecondRate, dstSecondRate := best.getExtremeLoad(bs.secondPriority) + 
mainFirstRate := best.mainPeerStat.GetLoad(statistics.GetRegionStatKind(bs.rwTy, bs.firstPriority)) + mainSecondRate := best.mainPeerStat.GetLoad(statistics.GetRegionStatKind(bs.rwTy, bs.secondPriority)) + log.Info("use solution with revert regions", + zap.Uint64("src-store", best.srcStore.GetID()), + zap.Float64("src-first-rate", srcFirstRate), + zap.Float64("src-second-rate", srcSecondRate), + zap.Uint64("dst-store", best.dstStore.GetID()), + zap.Float64("dst-first-rate", dstFirstRate), + zap.Float64("dst-second-rate", dstSecondRate), + zap.Uint64("main-region", best.region.GetID()), + zap.Float64("main-first-rate", mainFirstRate), + zap.Float64("main-second-rate", mainSecondRate), + zap.Uint64("revert-regions", best.revertRegion.GetID()), + zap.Float64("peers-first-rate", best.getPeersRateFromCache(bs.firstPriority)), + zap.Float64("peers-second-rate", best.getPeersRateFromCache(bs.secondPriority))) + } } // calcPendingInfluence return the calculate weight of one Operator, the value will between [0,1] diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index cbef2db194d..949a5d0aafe 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -1979,6 +1979,207 @@ func TestHotScheduleWithStddev(t *testing.T) { clearPendingInfluence(hb.(*hotScheduler)) } +func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { + // This is a test that searchRevertRegions finds a solution of rank -2. + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + statistics.Denoising = false + opt := config.NewTestOptions() + sche, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil) + re.NoError(err) + hb := sche.(*hotScheduler) + hb.conf.SetDstToleranceRatio(0.0) + hb.conf.SetSrcToleranceRatio(0.0) + tc := mockcluster.NewCluster(ctx, opt) + tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) + tc.SetHotRegionCacheHitsThreshold(0) + tc.AddRegionStore(1, 20) + tc.AddRegionStore(2, 20) + tc.AddRegionStore(3, 20) + tc.AddRegionStore(4, 20) + tc.AddRegionStore(5, 20) + hb.conf.WritePeerPriorities = []string{BytePriority, KeyPriority} + + tc.UpdateStorageWrittenStats(1, 15*MB*statistics.StoreHeartBeatReportInterval, 15*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(2, 16*MB*statistics.StoreHeartBeatReportInterval, 20*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(3, 15*MB*statistics.StoreHeartBeatReportInterval, 15*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(4, 15*MB*statistics.StoreHeartBeatReportInterval, 15*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(5, 14*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval) + addRegionInfo(tc, statistics.Write, []testRegionInfo{ + {6, []uint64{3, 2, 4}, 2 * MB, 3 * MB, 0}, + {7, []uint64{1, 4, 5}, 2 * MB, 0.1 * MB, 0}, + }) + // No operators can be generated when StrictPickingStore is true. + ops, _ := hb.Schedule(tc, false) + re.Empty(ops) + re.True(hb.searchRevertRegions[writePeer]) + // Two operators can be generated when StrictPickingStore is false. 
+ hb.conf.StrictPickingStore = false + ops, _ = hb.Schedule(tc, false) + re.Len(ops, 2) + testutil.CheckTransferPeer(re, ops[0], operator.OpHotRegion, 2, 5) + testutil.CheckTransferPeer(re, ops[1], operator.OpHotRegion, 5, 2) + re.True(hb.searchRevertRegions[writePeer]) + clearPendingInfluence(hb) + // When there is a better solution, there will only be one operator. + addRegionInfo(tc, statistics.Write, []testRegionInfo{ + {8, []uint64{3, 2, 4}, 0.5 * MB, 3 * MB, 0}, + }) + ops, _ = hb.Schedule(tc, false) + re.Len(ops, 1) + testutil.CheckTransferPeer(re, ops[0], operator.OpHotRegion, 2, 5) + re.False(hb.searchRevertRegions[writePeer]) + clearPendingInfluence(hb) +} + +func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { + // This is a test that searchRevertRegions finds a solution of rank -3. + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + statistics.Denoising = false + opt := config.NewTestOptions() + sche, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil) + re.NoError(err) + hb := sche.(*hotScheduler) + hb.conf.SetDstToleranceRatio(0.0) + hb.conf.SetSrcToleranceRatio(0.0) + tc := mockcluster.NewCluster(ctx, opt) + tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) + tc.SetHotRegionCacheHitsThreshold(0) + tc.AddRegionStore(1, 20) + tc.AddRegionStore(2, 20) + tc.AddRegionStore(3, 20) + tc.AddRegionStore(4, 20) + tc.AddRegionStore(5, 20) + hb.conf.WritePeerPriorities = []string{BytePriority, KeyPriority} + + tc.UpdateStorageWrittenStats(1, 15*MB*statistics.StoreHeartBeatReportInterval, 15*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(2, 20*MB*statistics.StoreHeartBeatReportInterval, 14*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(3, 15*MB*statistics.StoreHeartBeatReportInterval, 15*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(4, 15*MB*statistics.StoreHeartBeatReportInterval, 15*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(5, 10*MB*statistics.StoreHeartBeatReportInterval, 16*MB*statistics.StoreHeartBeatReportInterval) + addRegionInfo(tc, statistics.Write, []testRegionInfo{ + {6, []uint64{3, 2, 4}, 3 * MB, 1.8 * MB, 0}, + {7, []uint64{1, 4, 5}, 0.1 * MB, 2 * MB, 0}, + }) + // One operator can be generated when StrictPickingStore is true. + ops, _ := hb.Schedule(tc, false) + re.Len(ops, 1) + testutil.CheckTransferPeer(re, ops[0], operator.OpHotRegion, 2, 5) + re.True(hb.searchRevertRegions[writePeer]) + clearPendingInfluence(hb) + // Two operators can be generated when StrictPickingStore is false. + hb.conf.StrictPickingStore = false + ops, _ = hb.Schedule(tc, false) + re.Len(ops, 2) + testutil.CheckTransferPeer(re, ops[0], operator.OpHotRegion, 2, 5) + testutil.CheckTransferPeer(re, ops[1], operator.OpHotRegion, 5, 2) + re.True(hb.searchRevertRegions[writePeer]) + clearPendingInfluence(hb) +} + +func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) { + // This is a test that searchRevertRegions finds a solution of rank -1. 
+ re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + statistics.Denoising = false + opt := config.NewTestOptions() + sche, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil) + re.NoError(err) + hb := sche.(*hotScheduler) + hb.conf.SetDstToleranceRatio(0.0) + hb.conf.SetSrcToleranceRatio(0.0) + tc := mockcluster.NewCluster(ctx, opt) + tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) + tc.SetHotRegionCacheHitsThreshold(0) + tc.AddRegionStore(1, 20) + tc.AddRegionStore(2, 20) + tc.AddRegionStore(3, 20) + tc.AddRegionStore(4, 20) + tc.AddRegionStore(5, 20) + hb.conf.WritePeerPriorities = []string{BytePriority, KeyPriority} + + tc.UpdateStorageWrittenStats(1, 15*MB*statistics.StoreHeartBeatReportInterval, 15*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(2, 20*MB*statistics.StoreHeartBeatReportInterval, 14*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(3, 15*MB*statistics.StoreHeartBeatReportInterval, 15*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(4, 15*MB*statistics.StoreHeartBeatReportInterval, 15*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(5, 10*MB*statistics.StoreHeartBeatReportInterval, 16*MB*statistics.StoreHeartBeatReportInterval) + addRegionInfo(tc, statistics.Write, []testRegionInfo{ + {6, []uint64{3, 2, 4}, 3 * MB, 0.1 * MB, 0}, + {7, []uint64{1, 4, 5}, 0.1 * MB, 3 * MB, 0}, + }) + // searchRevertRegions becomes true after the first `Schedule`. + hb.conf.StrictPickingStore = false + ops, _ := hb.Schedule(tc, false) + re.Len(ops, 1) + testutil.CheckTransferPeer(re, ops[0], operator.OpHotRegion, 2, 5) + re.True(hb.searchRevertRegions[writePeer]) + clearPendingInfluence(hb) + // There is still the solution with one operator after that. + ops, _ = hb.Schedule(tc, false) + re.Len(ops, 1) + testutil.CheckTransferPeer(re, ops[0], operator.OpHotRegion, 2, 5) + re.True(hb.searchRevertRegions[writePeer]) + clearPendingInfluence(hb) +} + +func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { + // This is a test that searchRevertRegions finds a solution of rank -2. 
+ re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + statistics.Denoising = false + opt := config.NewTestOptions() + sche, err := schedule.CreateScheduler(statistics.Read.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil) + re.NoError(err) + hb := sche.(*hotScheduler) + hb.conf.SetDstToleranceRatio(0.0) + hb.conf.SetSrcToleranceRatio(0.0) + tc := mockcluster.NewCluster(ctx, opt) + tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) + tc.SetHotRegionCacheHitsThreshold(0) + tc.AddRegionStore(1, 20) + tc.AddRegionStore(2, 20) + tc.AddRegionStore(3, 20) + tc.AddRegionStore(4, 20) + tc.AddRegionStore(5, 20) + hb.conf.ReadPriorities = []string{BytePriority, KeyPriority} + + tc.UpdateStorageReadStats(1, 15*MB*statistics.StoreHeartBeatReportInterval, 15*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadStats(2, 16*MB*statistics.StoreHeartBeatReportInterval, 20*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadStats(3, 15*MB*statistics.StoreHeartBeatReportInterval, 15*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadStats(4, 15*MB*statistics.StoreHeartBeatReportInterval, 15*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadStats(5, 14*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval) + addRegionInfo(tc, statistics.Read, []testRegionInfo{ + {6, []uint64{2, 1, 5}, 2 * MB, 3 * MB, 0}, + {7, []uint64{5, 4, 2}, 2 * MB, 0.1 * MB, 0}, + }) + // No operators can be generated when StrictPickingStore is true. + ops, _ := hb.Schedule(tc, false) + re.Empty(ops) + re.True(hb.searchRevertRegions[readLeader]) + // Two operators can be generated when StrictPickingStore is false. + hb.conf.StrictPickingStore = false + ops, _ = hb.Schedule(tc, false) + re.Len(ops, 2) + testutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 2, 5) + testutil.CheckTransferLeader(re, ops[1], operator.OpHotRegion, 5, 2) + re.True(hb.searchRevertRegions[readLeader]) + clearPendingInfluence(hb) + // When there is a better solution, there will only be one operator. 
+ addRegionInfo(tc, statistics.Read, []testRegionInfo{ + {8, []uint64{2, 1, 5}, 0.5 * MB, 3 * MB, 0}, + }) + ops, _ = hb.Schedule(tc, false) + re.Len(ops, 1) + testutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 2, 5) + re.False(hb.searchRevertRegions[readLeader]) + clearPendingInfluence(hb) +} + func TestHotWriteLeaderScheduleWithPriority(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) diff --git a/server/server.go b/server/server.go index 6e900580d66..9b92b424e88 100644 --- a/server/server.go +++ b/server/server.go @@ -1517,19 +1517,14 @@ func (s *Server) reloadConfigFromKV() error { return err } s.loadRateLimitConfig() - switchableStorage, ok := s.storage.(interface { - SwitchToRegionStorage() - SwitchToDefaultStorage() - }) - if !ok { - return nil - } - if s.persistOptions.IsUseRegionStorage() { - switchableStorage.SwitchToRegionStorage() - log.Info("server enable region storage") - } else { - switchableStorage.SwitchToDefaultStorage() - log.Info("server disable region storage") + useRegionStorage := s.persistOptions.IsUseRegionStorage() + regionStorage := storage.TrySwitchRegionStorage(s.storage, useRegionStorage) + if regionStorage != nil { + if useRegionStorage { + log.Info("server enable region storage") + } else { + log.Info("server disable region storage") + } } return nil } diff --git a/server/storage/endpoint/meta.go b/server/storage/endpoint/meta.go index d16687f57d9..bb848485c39 100644 --- a/server/storage/endpoint/meta.go +++ b/server/storage/endpoint/meta.go @@ -44,7 +44,6 @@ type MetaStorage interface { type RegionStorage interface { LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error - LoadRegionsOnce(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error SaveRegion(region *metapb.Region) error DeleteRegion(region *metapb.Region) error Flush() error @@ -218,11 +217,6 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core. } } -// LoadRegionsOnce loads all regions from storage to RegionsInfo. -func (se *StorageEndpoint) LoadRegionsOnce(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error { - return se.LoadRegions(ctx, f) -} - // SaveRegion saves one region to storage. func (se *StorageEndpoint) SaveRegion(region *metapb.Region) error { region, err := encryption.EncryptRegion(region, se.encryptionKeyManager) diff --git a/server/storage/storage.go b/server/storage/storage.go index af65dfe4a7b..97e1f8dc06e 100644 --- a/server/storage/storage.go +++ b/server/storage/storage.go @@ -83,47 +83,51 @@ func NewCoreStorage(defaultStorage Storage, regionStorage endpoint.RegionStorage } } -// GetRegionStorage gets the internal region storage. -func (ps *coreStorage) GetRegionStorage() endpoint.RegionStorage { - return ps.regionStorage -} - -// SwitchToRegionStorage switches the region storage to regionStorage, -// after calling this, all region info will be read/saved by the internal -// regionStorage, and in most cases it's LevelDB-backend. -func (ps *coreStorage) SwitchToRegionStorage() { - atomic.StoreInt32(&ps.useRegionStorage, 1) +// TryGetLocalRegionStorage gets the local region storage. Returns nil if not present. 
+func TryGetLocalRegionStorage(s Storage) endpoint.RegionStorage {
+    switch ps := s.(type) {
+    case *coreStorage:
+        return ps.regionStorage
+    case *levelDBBackend, *memoryStorage:
+        return ps
+    default:
+        return nil
+    }
 }
 
-// SwitchToDefaultStorage switches the region storage to defaultStorage,
-// after calling this, all region info will be read/saved by the internal
-// defaultStorage, and in most cases it's etcd-backend.
-func (ps *coreStorage) SwitchToDefaultStorage() {
-    atomic.StoreInt32(&ps.useRegionStorage, 0)
-}
+// TrySwitchRegionStorage tries to switch whether the RegionStorage uses the local storage or not,
+// and returns the RegionStorage used after the switch.
+// Returns nil if it cannot be switched.
+func TrySwitchRegionStorage(s Storage, useLocalRegionStorage bool) endpoint.RegionStorage {
+    ps, ok := s.(*coreStorage)
+    if !ok {
+        return nil
+    }
 
-// LoadRegion loads one region from storage.
-func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error) {
-    if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
-        return ps.regionStorage.LoadRegion(regionID, region)
+    if useLocalRegionStorage {
+        // Switch the region storage to regionStorage, all region info will be read/saved by the internal
+        // regionStorage, and in most cases it's LevelDB-backend.
+        atomic.StoreInt32(&ps.useRegionStorage, 1)
+        return ps.regionStorage
     }
-    return ps.Storage.LoadRegion(regionID, region)
+    // Switch the region storage to defaultStorage, all region info will be read/saved by the internal
+    // defaultStorage, and in most cases it's etcd-backend.
+    atomic.StoreInt32(&ps.useRegionStorage, 0)
+    return ps.Storage
 }
 
-// LoadRegions loads all regions from storage to RegionsInfo.
-func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
-    if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
-        return ps.regionStorage.LoadRegions(ctx, f)
+// TryLoadRegionsOnce loads all regions from storage to RegionsInfo.
+// If the underlying storage is the local region storage, it will only load once.
+func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.RegionInfo) []*core.RegionInfo) error {
+    ps, ok := s.(*coreStorage)
+    if !ok {
+        return s.LoadRegions(ctx, f)
     }
-    return ps.Storage.LoadRegions(ctx, f)
-}
 
-// LoadRegionsOnce loads all regions from storage to RegionsInfo. If the underlying storage is the region storage,
-// it will only load once.
-func (ps *coreStorage) LoadRegionsOnce(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
     if atomic.LoadInt32(&ps.useRegionStorage) == 0 {
         return ps.Storage.LoadRegions(ctx, f)
     }
+
     ps.mu.Lock()
     defer ps.mu.Unlock()
     if !ps.regionLoaded {
@@ -135,6 +139,22 @@ func (ps *coreStorage) LoadRegionsOnce(ctx context.Context, f func(region *core.
     return nil
 }
 
+// LoadRegion loads one region from storage.
+func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error) {
+    if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
+        return ps.regionStorage.LoadRegion(regionID, region)
+    }
+    return ps.Storage.LoadRegion(regionID, region)
+}
+
+// LoadRegions loads all regions from storage to RegionsInfo.
+func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
+    if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
+        return ps.regionStorage.LoadRegions(ctx, f)
+    }
+    return ps.Storage.LoadRegions(ctx, f)
+}
+
 // SaveRegion saves one region to storage.
 func (ps *coreStorage) SaveRegion(region *metapb.Region) error {
     if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
diff --git a/server/storage/storage_test.go b/server/storage/storage_test.go
index fbcc8e528a1..8793fbfc8c6 100644
--- a/server/storage/storage_test.go
+++ b/server/storage/storage_test.go
@@ -253,7 +253,7 @@ func TestLoadRegionsToCache(t *testing.T) {
     n := 10
     regions := mustSaveRegions(re, storage, n)
-    re.NoError(storage.LoadRegionsOnce(context.Background(), cache.SetRegion))
+    re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.SetRegion))
     re.Equal(n, cache.GetRegionCount())
 
     for _, region := range cache.GetMetaRegions() {
@@ -262,7 +262,7 @@ func TestLoadRegionsToCache(t *testing.T) {
 
     n = 20
     mustSaveRegions(re, storage, n)
-    re.NoError(storage.LoadRegionsOnce(context.Background(), cache.SetRegion))
+    re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.SetRegion))
     re.Equal(n, cache.GetRegionCount())
 }
 
@@ -282,6 +282,38 @@ func TestLoadRegionsExceedRangeLimit(t *testing.T) {
     re.NoError(failpoint.Disable("github.com/tikv/pd/server/storage/kv/withRangeLimit"))
 }
 
+func TestTrySwitchRegionStorage(t *testing.T) {
+    re := require.New(t)
+    defaultStorage := NewStorageWithMemoryBackend()
+    localStorage := NewStorageWithMemoryBackend()
+    storage := NewCoreStorage(defaultStorage, localStorage)
+    defaultCache := core.NewRegionsInfo()
+    localCache := core.NewRegionsInfo()
+
+    TrySwitchRegionStorage(storage, false)
+    regions10 := mustSaveRegions(re, storage, 10)
+    re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.SetRegion))
+    re.NoError(localStorage.LoadRegions(context.Background(), localCache.SetRegion))
+    re.Empty(localCache.GetMetaRegions())
+    re.Len(defaultCache.GetMetaRegions(), 10)
+    for _, region := range defaultCache.GetMetaRegions() {
+        re.Equal(regions10[region.GetId()], region)
+    }
+
+    TrySwitchRegionStorage(storage, true)
+    regions20 := mustSaveRegions(re, storage, 20)
+    re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.SetRegion))
+    re.NoError(localStorage.LoadRegions(context.Background(), localCache.SetRegion))
+    re.Len(defaultCache.GetMetaRegions(), 10)
+    re.Len(localCache.GetMetaRegions(), 20)
+    for _, region := range defaultCache.GetMetaRegions() {
+        re.Equal(regions10[region.GetId()], region)
+    }
+    for _, region := range localCache.GetMetaRegions() {
+        re.Equal(regions20[region.GetId()], region)
+    }
+}
+
 const (
     keyChars = "abcdefghijklmnopqrstuvwxyz"
     keyLen   = 20
diff --git a/server/tso/tso.go b/server/tso/tso.go
index 8e1f7009657..b7a00c3f9ab 100644
--- a/server/tso/tso.go
+++ b/server/tso/tso.go
@@ -123,7 +123,7 @@ func (t *timestampOracle) generateTSO(count int64, suffixBits int) (physical int
 // For example, we have three DCs: dc-1, dc-2 and dc-3. The bits of suffix is defined by
 // the const suffixBits. Then, for dc-2, the suffix may be 1 because it's persisted
 // in etcd with the value of 1.
-// Once we get a noramal TSO like this (18 bits): xxxxxxxxxxxxxxxxxx. We will make the TSO's
+// Once we get a normal TSO like this (18 bits): xxxxxxxxxxxxxxxxxx. We will make the TSO's
 // low bits of logical part from each DC looks like:
 // global: xxxxxxxxxx00000000
 // dc-1:   xxxxxxxxxx00000001
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index b16099ddda0..edc642495d2 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -756,13 +756,13 @@ func TestLoadClusterInfo(t *testing.T) {
     re.NoError(err)
     re.Nil(raftCluster)
 
-    storage := rc.GetStorage()
+    testStorage := rc.GetStorage()
     basicCluster := rc.GetBasicCluster()
     opt := rc.GetOpts()
     // Save meta, stores and regions.
     n := 10
     meta := &metapb.Cluster{Id: 123}
-    re.NoError(storage.SaveMeta(meta))
+    re.NoError(testStorage.SaveMeta(meta))
     stores := make([]*metapb.Store, 0, n)
     for i := 0; i < n; i++ {
         store := &metapb.Store{Id: uint64(i)}
@@ -770,7 +770,7 @@ func TestLoadClusterInfo(t *testing.T) {
     }
 
     for _, store := range stores {
-        re.NoError(storage.SaveStore(store))
+        re.NoError(testStorage.SaveStore(store))
     }
 
     regions := make([]*metapb.Region, 0, n)
@@ -785,12 +785,12 @@ func TestLoadClusterInfo(t *testing.T) {
     }
 
     for _, region := range regions {
-        re.NoError(storage.SaveRegion(region))
+        re.NoError(testStorage.SaveRegion(region))
     }
-    re.NoError(storage.Flush())
+    re.NoError(testStorage.Flush())
 
     raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
-    raftCluster.InitCluster(mockid.NewIDAllocator(), opt, storage, basicCluster)
+    raftCluster.InitCluster(mockid.NewIDAllocator(), opt, testStorage, basicCluster)
     raftCluster, err = raftCluster.LoadClusterInfo()
     re.NoError(err)
     re.NotNil(raftCluster)
@@ -819,9 +819,9 @@ func TestLoadClusterInfo(t *testing.T) {
     }
 
     for _, region := range regions {
-        re.NoError(storage.SaveRegion(region))
+        re.NoError(testStorage.SaveRegion(region))
     }
-    raftCluster.GetStorage().LoadRegionsOnce(ctx, raftCluster.GetBasicCluster().PutRegion)
+    re.NoError(storage.TryLoadRegionsOnce(ctx, testStorage, raftCluster.GetBasicCluster().PutRegion))
     re.Equal(n, raftCluster.GetRegionCount())
 }
diff --git a/tools/pd-simulator/main.go b/tools/pd-simulator/main.go
index 12729f7f772..496551b90b0 100644
--- a/tools/pd-simulator/main.go
+++ b/tools/pd-simulator/main.go
@@ -16,8 +16,8 @@ package main
 import (
     "context"
-    "flag"
     "fmt"
+    "net/http"
     "os"
     "os/signal"
     "syscall"
@@ -25,6 +25,8 @@ import (
     "github.com/BurntSushi/toml"
     "github.com/pingcap/log"
+    "github.com/prometheus/client_golang/prometheus/promhttp"
+    flag "github.com/spf13/pflag"
     "github.com/tikv/pd/server"
     "github.com/tikv/pd/server/api"
     "github.com/tikv/pd/server/config"
@@ -49,9 +51,12 @@ var (
     regionNum                   = flag.Int("regionNum", 0, "regionNum of one store")
     storeNum                    = flag.Int("storeNum", 0, "storeNum")
     enableTransferRegionCounter = flag.Bool("enableTransferRegionCounter", false, "enableTransferRegionCounter")
+    statusAddress               = flag.String("status-addr", "0.0.0.0:20180", "status address")
 )
 
 func main() {
+    // ignore undefined flags
+    flag.CommandLine.ParseErrorsWhitelist.UnknownFlags = true
     flag.Parse()
 
     simutil.InitLogger(*simLogLevel, *simLogFile)
@@ -61,19 +66,6 @@ func main() {
         analysis.GetTransferCounter().Init(simutil.CaseConfigure.StoreNum, simutil.CaseConfigure.RegionNum)
     }
 
-    if *caseName == "" {
-        if *pdAddr != "" {
-            simutil.Logger.Fatal("need to specify one config name")
-        }
-        for simCase := range cases.CaseMap {
-            run(simCase)
-        }
-    } else {
-        run(*caseName)
-    }
-}
-
-func run(simCase string) {
     simConfig := simulator.NewSimConfig(*serverLogLevel)
     var meta toml.MetaData
     var err error
@@ -85,12 +77,29 @@ func run(simCase string) {
     if err = simConfig.Adjust(&meta); err != nil {
         simutil.Logger.Fatal("failed to adjust simulator configuration", zap.Error(err))
     }
+    if len(*caseName) == 0 {
+        *caseName = simConfig.CaseName
+    }
+
+    if *caseName == "" {
+        if *pdAddr != "" {
+            simutil.Logger.Fatal("need to specify one config name")
+        }
+        for simCase := range cases.CaseMap {
+            run(simCase, simConfig)
+        }
+    } else {
+        run(*caseName, simConfig)
+    }
+}
 
+func run(simCase string, simConfig *simulator.SimConfig) {
     if *pdAddr != "" {
+        go runMetrics()
         simStart(*pdAddr, simCase, simConfig)
     } else {
         local, clean := NewSingleServer(context.Background(), simConfig)
-        err = local.Run()
+        err := local.Run()
         if err != nil {
             simutil.Logger.Fatal("run server error", zap.Error(err))
         }
@@ -104,6 +113,11 @@ func run(simCase string) {
     }
 }
 
+func runMetrics() {
+    http.Handle("/metrics", promhttp.Handler())
+    http.ListenAndServe(*statusAddress, nil)
+}
+
 // NewSingleServer creates a pd server for simulator.
 func NewSingleServer(ctx context.Context, simConfig *simulator.SimConfig) (*server.Server, server.CleanupFunc) {
     err := simConfig.ServerConfig.SetupLogger()
@@ -146,7 +160,6 @@ func simStart(pdAddr string, simCase string, simConfig *simulator.SimConfig, cle
     if err != nil {
         simutil.Logger.Fatal("simulator prepare error", zap.Error(err))
     }
-
     tickInterval := simConfig.SimTickInterval.Duration
     tick := time.NewTicker(tickInterval)
diff --git a/tools/pd-simulator/simulator/config.go b/tools/pd-simulator/simulator/config.go
index ad027a81c24..1b82a288895 100644
--- a/tools/pd-simulator/simulator/config.go
+++ b/tools/pd-simulator/simulator/config.go
@@ -44,6 +44,7 @@ const (
 // SimConfig is the simulator configuration.
 type SimConfig struct {
     // tick
+    CaseName        string            `toml:"case-name"`
     SimTickInterval typeutil.Duration `toml:"sim-tick-interval"`
     // store
     StoreCapacityGB uint64 `toml:"store-capacity"`
diff --git a/tools/pd-simulator/simulator/metrics.go b/tools/pd-simulator/simulator/metrics.go
new file mode 100644
index 00000000000..5f06b2cbb65
--- /dev/null
+++ b/tools/pd-simulator/simulator/metrics.go
@@ -0,0 +1,18 @@
+package simulator
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var (
+    snapDuration = prometheus.NewHistogramVec(
+        prometheus.HistogramOpts{
+            Namespace: "tikv",
+            Subsystem: "raftstore",
+            Name:      "snapshot_duration_seconds",
+            Help:      "Bucketed histogram of processing time (s) of handled snap requests.",
+            Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 20),
+        }, []string{"store", "type"})
+)
+
+func init() {
+    prometheus.MustRegister(snapDuration)
+}
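Usage note (not part of the patch): the region-storage refactor in server/storage/storage.go replaces the coreStorage methods with three package-level helpers. The sketch below shows how they fit together; it is a minimal example that assumes the standard github.com/tikv/pd/server/storage and github.com/tikv/pd/server/core import paths and reuses only constructors and callbacks that already appear in the tests above (NewStorageWithMemoryBackend, NewCoreStorage, core.NewRegionsInfo, SetRegion).

package main

import (
	"context"
	"fmt"

	"github.com/tikv/pd/server/core"
	"github.com/tikv/pd/server/storage"
)

func main() {
	// A core storage wrapping a default backend and a local region backend,
	// both memory-backed here, as in TestTrySwitchRegionStorage.
	defaultStorage := storage.NewStorageWithMemoryBackend()
	localStorage := storage.NewStorageWithMemoryBackend()
	s := storage.NewCoreStorage(defaultStorage, localStorage)

	// Route region reads/writes to the local region storage. The return value is the
	// RegionStorage now in use, or nil when s is not a core storage and cannot be switched.
	if rs := storage.TrySwitchRegionStorage(s, true); rs == nil {
		fmt.Println("storage cannot be switched")
	}

	// Fill a region cache. For a core storage that currently uses the local region storage
	// this loads only once and later calls are no-ops; otherwise it falls back to LoadRegions.
	cache := core.NewRegionsInfo()
	if err := storage.TryLoadRegionsOnce(context.Background(), s, cache.SetRegion); err != nil {
		fmt.Println("load regions failed:", err)
	}
}

In this setup TryGetLocalRegionStorage(s) would return localStorage, filling the role of the removed GetRegionStorage method for callers that need the underlying region storage directly.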
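The new metrics.go only defines and registers the snapDuration histogram; nothing in this patch records samples into it yet. A typical way for simulator code to feed it later would be the usual client_golang pattern. The helper below is hypothetical (not in this patch) and only illustrates the label order ("store", then "type") declared above:

package simulator

import (
	"strconv"
	"time"
)

// observeSnapshot is a hypothetical helper showing how snapshot-handling code could
// record into snapDuration, which init() in metrics.go has already registered.
func observeSnapshot(storeID uint64, kind string, start time.Time) {
	// Label values must match the declared order: "store", "type".
	snapDuration.WithLabelValues(strconv.FormatUint(storeID, 10), kind).
		Observe(time.Since(start).Seconds())
}

When a PD address is supplied, main.go now starts runMetrics in a goroutine, so whatever is recorded this way becomes visible at /metrics on the address given by the new status-addr flag (0.0.0.0:20180 by default).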