From 6f1103acb2e9074bb1ed6c32b903902e4b636dee Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 17 Apr 2024 21:59:45 +0800 Subject: [PATCH 1/7] tests: avoid some test branches not being reached Signed-off-by: lhy1024 --- pkg/schedule/schedulers/hot_region.go | 24 +++-- pkg/schedule/schedulers/hot_region_test.go | 98 +++++++------------ pkg/schedule/schedulers/hot_region_v2_test.go | 7 +- pkg/schedule/schedulers/scheduler_test.go | 12 ++- pkg/statistics/store_hot_peers_infos.go | 5 +- pkg/utils/operatorutil/operator_check.go | 10 +- 6 files changed, 72 insertions(+), 84 deletions(-) diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 5e5e254596a..284cf07032c 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -47,25 +47,29 @@ const ( // HotRegionName is balance hot region scheduler name. HotRegionName = "balance-hot-region-scheduler" // HotRegionType is balance hot region scheduler type. - HotRegionType = "hot-region" - splitHotReadBuckets = "split-hot-read-region" - splitHotWriteBuckets = "split-hot-write-region" - splitProgressiveRank = int64(-5) - minHotScheduleInterval = time.Second - maxHotScheduleInterval = 20 * time.Second + HotRegionType = "hot-region" + splitHotReadBuckets = "split-hot-read-region" + splitHotWriteBuckets = "split-hot-write-region" + splitProgressiveRank = int64(-5) + minHotScheduleInterval = time.Second + maxHotScheduleInterval = 20 * time.Second + defaultSchedulePeerPr = 0.66 + defaultPendingAmpFactor = 2.0 + defaultStddevThreshold = 0.1 + defaultTopnPosition = 10 ) var ( // schedulePeerPr the probability of schedule the hot peer. - schedulePeerPr = 0.66 + schedulePeerPr = defaultSchedulePeerPr // pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when two stores are close together - pendingAmpFactor = 2.0 + pendingAmpFactor = defaultPendingAmpFactor // If the distribution of a dimension is below the corresponding stddev threshold, then scheduling will no longer be based on this dimension, // as it implies that this dimension is sufficiently uniform. - stddevThreshold = 0.1 + stddevThreshold = defaultStddevThreshold // topnPosition is the position of the topn peer in the hot peer list. // We use it to judge whether to schedule the hot peer in some cases. - topnPosition = 10 + topnPosition = defaultTopnPosition // statisticsInterval is the interval to update statistics information. statisticsInterval = time.Second ) diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 304698c915e..401090d099f 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -41,9 +41,6 @@ import ( ) func init() { - // TODO: remove this global variable in the future. - // And use a function to create hot schduler for test. - schedulePeerPr = 1.0 // disable denoising in test. 
statistics.Denoising = false statisticsInterval = 0 @@ -204,6 +201,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) { func TestSplitIfRegionTooHot(t *testing.T) { re := require.New(t) + schedulePeerPr = 1.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) @@ -392,6 +390,8 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) { } func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules bool) { + // This test is used to test move leader and move peer. + schedulePeerPr = 1.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tc.SetEnableUseJointConsensus(true) @@ -433,6 +433,7 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b ops, _ := hb.Schedule(tc, false) re.NotEmpty(ops) re.NotContains(ops[0].Step(1).String(), "transfer leader") + operatorutil.CheckTransferPeerWithLeaderTransfer(re, ops[0], operator.OpHotRegion, 1, 2) clearPendingInfluence(hb.(*hotScheduler)) tc.RuleManager.SetRule(&placement.Rule{ @@ -442,6 +443,7 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b ops, _ = hb.Schedule(tc, false) re.NotEmpty(ops) re.NotContains(ops[0].Step(1).String(), "transfer leader") + operatorutil.CheckTransferPeerWithLeaderTransfer(re, ops[0], operator.OpHotRegion, 1, 2) } func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlacementRules bool) { @@ -451,10 +453,12 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...) + tc.SetHotRegionCacheHitsThreshold(0) + hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.SetHistorySampleDuration(0) - tc.SetHotRegionCacheHitsThreshold(0) + hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.BytePriority, utils.KeyPriority} // Add stores 1, 2, 3, 4, 5, 6 with region counts 3, 2, 2, 2, 0, 0. tc.AddLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"}) @@ -498,7 +502,8 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace // Will transfer a hot region from store 1, because the total count of peers // which is hot for store 1 is larger than other stores. 
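// The hunk just below replaces the fixed 20-iteration loop with one that keeps
// scheduling until both the leader-transfer branch and the peer-move branch of
// the switch have been hit, which is the point of this patch's subject
// ("avoid some test branches not being reached"). A minimal, self-contained
// sketch of that pattern with hypothetical stand-ins: scheduleOnce plays the
// role of hb.Schedule plus the operator checks done inside the test.
package sketch

import "testing"

func runUntilBothBranches(t *testing.T, scheduleOnce func() (isLeaderOp, isPeerOp bool)) {
	hasLeaderOperator, hasPeerOperator := false, false
	// Run at least 20 rounds and keep going until both branches were observed.
	// A real test would also want an upper bound (or a timeout) so a scheduler
	// that never produces one of the operator kinds fails instead of spinning.
	for i := 0; i < 20 || !(hasLeaderOperator && hasPeerOperator); i++ {
		if i > 10000 {
			t.Fatal("scheduler never produced both operator kinds")
		}
		leader, peer := scheduleOnce()
		hasLeaderOperator = hasLeaderOperator || leader
		hasPeerOperator = hasPeerOperator || peer
	}
}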
- for i := 0; i < 20; i++ { + hasLeaderOperator, hasPeerOperator := false, false + for i := 0; i < 20 || !(hasLeaderOperator && hasPeerOperator); i++ { ops, _ = hb.Schedule(tc, false) op := ops[0] clearPendingInfluence(hb.(*hotScheduler)) @@ -507,6 +512,7 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace // balance by leader selected re.Equal("transfer-hot-write-leader", op.Desc()) operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1) + hasLeaderOperator = true case 5: // balance by peer selected re.Equal("move-hot-write-leader", op.Desc()) @@ -517,6 +523,7 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace // peer in store 1 of the region 1,3 can only transfer to store 6 operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6) } + hasPeerOperator = true default: re.FailNow("wrong op: " + op.String()) } @@ -528,25 +535,12 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace clearPendingInfluence(hb.(*hotScheduler)) tc.SetHotRegionScheduleLimit(int(opt.GetHotRegionScheduleLimit())) - for i := 0; i < 20; i++ { - ops, _ := hb.Schedule(tc, false) - op := ops[0] - clearPendingInfluence(hb.(*hotScheduler)) - re.Equal(5, op.Len()) - if op.RegionID() == 2 { - // peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label - operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0) - } else { - // peer in store 1 of the region 1,3 can only transfer to store 6 - operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6) - } - } - // hot region scheduler is not affect by `balance-region-schedule-limit`. tc.SetRegionScheduleLimit(0) ops, _ = hb.Schedule(tc, false) re.Len(ops, 1) clearPendingInfluence(hb.(*hotScheduler)) + // Always produce operator ops, _ = hb.Schedule(tc, false) re.Len(ops, 1) @@ -589,7 +583,6 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace // Assuming different operators have the same possibility, // if code has bug, at most 6/7 possibility to success, // test 30 times, possibility of success < 0.1%. - // Cannot transfer leader because store 2 and store 3 are hot. // Source store is 1 or 3. // Region 1 and 2 are the same, cannot move peer to store 5 due to the label. // Region 3 can only move peer to store 5. @@ -600,11 +593,14 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace clearPendingInfluence(hb.(*hotScheduler)) switch op.RegionID() { case 1, 2: - if op.Len() == 3 { + switch op.Len() { + case 1: + operatorutil.CheckTransferLeader(re, op, operator.OpHotRegion, 1, 2) + case 3: operatorutil.CheckTransferPeer(re, op, operator.OpHotRegion, 3, 6) - } else if op.Len() == 4 { + case 4: operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6) - } else { + default: re.FailNow("wrong operator: " + op.String()) } case 3: @@ -716,7 +712,10 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { } // Will transfer a hot learner from store 8, because the total count of peers // which is hot for store 8 is larger than other TiFlash stores. 
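// The change just below raises PDServerConfig.FlowRoundByDigit while both
// scheduling branches are exercised and restores it afterwards. As understood
// here (an assumption, not a statement about PD internals), the option rounds
// reported region flow to a granularity of 10^FlowRoundByDigit, so a coarse
// setting blurs the differences between candidate stores; that is presumably
// why the expected learner target is relaxed to "any store" in this patch and
// tightened again later in the series. A hedged sketch of the assumed rounding:
package sketch

import "math"

// roundFlowByDigit rounds a byte-flow value down to a multiple of 10^digit,
// mirroring what flow-round-by-digit is assumed to do to region flow.
func roundFlowByDigit(flow uint64, digit int) uint64 {
	unit := uint64(math.Pow10(digit))
	if unit == 0 {
		return flow
	}
	return flow / unit * unit
}

// For example, a 512 KiB/s flow (524288) keeps most of its precision with
// digit=3 (524000), but rounds all the way to 0 with digit=8, making lightly
// loaded stores indistinguishable to the scheduler.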
- for i := 0; i < 20; i++ { + pdServerCfg := tc.GetPDServerConfig() + pdServerCfg.FlowRoundByDigit = 8 + hasLeaderOperator, hasPeerOperator := false, false + for i := 0; i < 20 || !(hasLeaderOperator && hasPeerOperator); i++ { clearPendingInfluence(hb) ops, _ := hb.Schedule(tc, false) op := ops[0] @@ -725,14 +724,17 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { // balance by leader selected re.Equal("transfer-hot-write-leader", op.Desc()) operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1) + hasLeaderOperator = true case 2: // balance by peer selected - re.Equal("move-hot-write-leader", op.Desc()) - operatorutil.CheckTransferLearner(re, op, operator.OpHotRegion, 8, 10) + re.Equal("move-hot-write-peer", op.Desc()) + operatorutil.CheckTransferLearner(re, op, operator.OpHotRegion, 8, 0) + hasPeerOperator = true default: re.FailNow("wrong op: " + op.String()) } } + pdServerCfg.FlowRoundByDigit = 3 // Disable for TiFlash hb.conf.SetEnableForTiFlash(false) for i := 0; i < 20; i++ { @@ -838,11 +840,6 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { func TestHotWriteRegionScheduleWithQuery(t *testing.T) { re := require.New(t) - // TODO: add schedulePeerPr,Denoising,statisticsInterval to prepare function - originValue := schedulePeerPr - defer func() { - schedulePeerPr = originValue - }() cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -866,7 +863,6 @@ func TestHotWriteRegionScheduleWithQuery(t *testing.T) { {2, []uint64{1, 2, 3}, 500, 0, 500}, {3, []uint64{2, 1, 3}, 500, 0, 500}, }) - schedulePeerPr = 0.0 for i := 0; i < 100; i++ { clearPendingInfluence(hb.(*hotScheduler)) ops, _ := hb.Schedule(tc, false) @@ -876,8 +872,9 @@ func TestHotWriteRegionScheduleWithQuery(t *testing.T) { } func TestHotWriteRegionScheduleWithKeyRate(t *testing.T) { + // This test is used to test move peer. 
+ schedulePeerPr = 1.0 re := require.New(t) - cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) @@ -1076,18 +1073,13 @@ func TestHotWriteRegionScheduleWithPendingInfluence(t *testing.T) { } func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim int) { + pendingAmpFactor = 0.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) - hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority} hb.(*hotScheduler).conf.RankFormulaVersion = "v1" hb.(*hotScheduler).conf.SetHistorySampleDuration(0) - old := pendingAmpFactor - pendingAmpFactor = 0.0 - defer func() { - pendingAmpFactor = old - }() tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) @@ -1105,6 +1097,7 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim updateStore(4, 4*units.MiB*utils.StoreHeartBeatReportInterval) if dim == 0 { // byte rate + hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority} addRegionInfo(tc, utils.Write, []testRegionInfo{ {1, []uint64{1, 2, 3}, 512 * units.KiB, 0, 0}, {2, []uint64{1, 2, 3}, 512 * units.KiB, 0, 0}, @@ -1114,6 +1107,7 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim {6, []uint64{1, 2, 3}, 512 * units.KiB, 0, 0}, }) } else if dim == 1 { // key rate + hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.BytePriority, utils.KeyPriority} addRegionInfo(tc, utils.Write, []testRegionInfo{ {1, []uint64{1, 2, 3}, 0, 512 * units.KiB, 0}, {2, []uint64{1, 2, 3}, 0, 512 * units.KiB, 0}, @@ -1174,11 +1168,7 @@ func TestHotWriteRegionScheduleWithRuleEnabled(t *testing.T) { key, err := hex.DecodeString("") re.NoError(err) // skip stddev check - origin := stddevThreshold stddevThreshold = -1.0 - defer func() { - stddevThreshold = origin - }() tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -1472,11 +1462,7 @@ func checkHotReadRegionScheduleWithPendingInfluence(re *require.Assertions, dim hb.(*hotScheduler).conf.DstToleranceRatio = 1 hb.(*hotScheduler).conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority} hb.(*hotScheduler).conf.SetHistorySampleDuration(0) - old := pendingAmpFactor pendingAmpFactor = 0.0 - defer func() { - pendingAmpFactor = old - }() tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) @@ -1527,7 +1513,7 @@ func checkHotReadRegionScheduleWithPendingInfluence(re *require.Assertions, dim operatorutil.CheckTransferPeer(re, op1, operator.OpHotRegion, 1, 4) // After move-peer, store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5) - pendingAmpFactor = old + pendingAmpFactor = defaultPendingAmpFactor ops, _ = hb.Schedule(tc, false) re.Empty(ops) pendingAmpFactor = 0.0 @@ -2042,11 +2028,6 @@ func checkSortResult(re *require.Assertions, regions []uint64, hotPeers []*stati func TestInfluenceByRWType(t *testing.T) { re := require.New(t) - originValue := schedulePeerPr - defer func() { - schedulePeerPr = originValue - }() - cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) @@ -2176,11 +2157,7 @@ func TestHotScheduleWithPriority(t *testing.T) { 
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.05) hb.(*hotScheduler).conf.SetHistorySampleDuration(0) // skip stddev check - origin := stddevThreshold stddevThreshold = -1.0 - defer func() { - stddevThreshold = origin - }() tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) @@ -2273,6 +2250,7 @@ func TestHotScheduleWithPriority(t *testing.T) { func TestHotScheduleWithStddev(t *testing.T) { re := require.New(t) + schedulePeerPr = 1.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) @@ -2330,6 +2308,7 @@ func TestHotScheduleWithStddev(t *testing.T) { func TestHotWriteLeaderScheduleWithPriority(t *testing.T) { re := require.New(t) + pendingAmpFactor = 0.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) @@ -2353,11 +2332,6 @@ func TestHotWriteLeaderScheduleWithPriority(t *testing.T) { {4, []uint64{2, 1, 3}, 10 * units.MiB, 0 * units.MiB, 0}, {5, []uint64{3, 2, 1}, 0 * units.MiB, 10 * units.MiB, 0}, }) - old1, old2 := schedulePeerPr, pendingAmpFactor - schedulePeerPr, pendingAmpFactor = 0.0, 0.0 - defer func() { - schedulePeerPr, pendingAmpFactor = old1, old2 - }() hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority} ops, _ := hb.Schedule(tc, false) re.Len(ops, 1) diff --git a/pkg/schedule/schedulers/hot_region_v2_test.go b/pkg/schedule/schedulers/hot_region_v2_test.go index 25d6d94f7b1..9913faeae5c 100644 --- a/pkg/schedule/schedulers/hot_region_v2_test.go +++ b/pkg/schedule/schedulers/hot_region_v2_test.go @@ -30,6 +30,7 @@ import ( func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { // This is a test that searchRevertRegions finds a solution of rank -1. re := require.New(t) + schedulePeerPr = 1.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) @@ -90,6 +91,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { // This is a test that searchRevertRegions finds a solution of rank -3. re := require.New(t) + schedulePeerPr = 1.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) @@ -141,6 +143,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) { // This is a test that searchRevertRegions finds a solution of rank -2. re := require.New(t) + schedulePeerPr = 1.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) @@ -352,11 +355,9 @@ func TestHotReadRegionScheduleWithSmallHotRegion(t *testing.T) { // Case1: Before #6827, we only use minHotRatio, so cannot schedule small hot region in this case. // Because 10000 is larger than the length of hotRegions, so `filterHotPeers` will skip the topn calculation. - origin := topnPosition topnPosition = 10000 ops := checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, emptyFunc) re.Empty(ops) - topnPosition = origin // Case2: After #6827, we use top10 as the threshold of minHotPeer. 
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, emptyFunc) @@ -401,7 +402,6 @@ func TestHotReadRegionScheduleWithSmallHotRegion(t *testing.T) { tc.AddRegionWithReadInfo(hotRegionID+1, 1, bigHotRegionByte, 0, bigHotRegionQuery, utils.StoreHeartBeatReportInterval, []uint64{2, 3}) }) re.Empty(ops) - topnPosition = origin // Case7: If there are more than topnPosition hot regions, but them are pending, // we will schedule large hot region rather than small hot region, so there is no operator. @@ -413,7 +413,6 @@ func TestHotReadRegionScheduleWithSmallHotRegion(t *testing.T) { hb.regionPendings[hotRegionID+1] = &pendingInfluence{} }) re.Empty(ops) - topnPosition = origin } func checkHotReadRegionScheduleWithSmallHotRegion(re *require.Assertions, highLoad, lowLoad uint64, diff --git a/pkg/schedule/schedulers/scheduler_test.go b/pkg/schedule/schedulers/scheduler_test.go index 1480d76b75b..90f14d90fb0 100644 --- a/pkg/schedule/schedulers/scheduler_test.go +++ b/pkg/schedule/schedulers/scheduler_test.go @@ -35,9 +35,17 @@ import ( "github.com/tikv/pd/pkg/versioninfo" ) -func prepareSchedulersTest(needToRunStream ...bool) (context.CancelFunc, config.SchedulerConfigProvider, *mockcluster.Cluster, *operator.Controller) { +func prepareSchedulersTest(needToRunStream ...bool) (func(), config.SchedulerConfigProvider, *mockcluster.Cluster, *operator.Controller) { Register() ctx, cancel := context.WithCancel(context.Background()) + clean := func() { + cancel() + // reset some config to avoid affecting other tests + schedulePeerPr = defaultSchedulePeerPr + pendingAmpFactor = defaultPendingAmpFactor + stddevThreshold = defaultStddevThreshold + topnPosition = defaultTopnPosition + } opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) var stream *hbstream.HeartbeatStreams @@ -48,7 +56,7 @@ func prepareSchedulersTest(needToRunStream ...bool) (context.CancelFunc, config. 
} oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetSchedulerConfig(), stream) tc.SetHotRegionCacheHitsThreshold(1) - return cancel, opt, tc, oc + return clean, opt, tc, oc } func TestShuffleLeader(t *testing.T) { diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go index 59ee3c20b6f..a65366c41bf 100644 --- a/pkg/statistics/store_hot_peers_infos.go +++ b/pkg/statistics/store_hot_peers_infos.go @@ -288,9 +288,10 @@ func summaryStoresLoadByEngine( func filterHotPeers(kind constant.ResourceKind, peers []*HotPeerStat) []*HotPeerStat { ret := make([]*HotPeerStat, 0, len(peers)) for _, peer := range peers { - if kind != constant.LeaderKind || peer.IsLeader() { - ret = append(ret, peer) + if kind == constant.LeaderKind && !peer.IsLeader() { + continue } + ret = append(ret, peer) } return ret } diff --git a/pkg/utils/operatorutil/operator_check.go b/pkg/utils/operatorutil/operator_check.go index 61efd84ef1a..0c512974445 100644 --- a/pkg/utils/operatorutil/operator_check.go +++ b/pkg/utils/operatorutil/operator_check.go @@ -82,7 +82,7 @@ func CheckTransferPeer(re *require.Assertions, op *operator.Operator, kind opera addLearnerTo = steps[0].(operator.AddLearner).ToStore removePeerFrom = steps[3].(operator.RemovePeer).FromStore default: - re.FailNow("unexpected operator steps") + re.FailNow("unexpected operator steps: " + op.String()) } re.Equal(sourceID, removePeerFrom) re.Equal(targetID, addLearnerTo) @@ -95,7 +95,9 @@ func CheckTransferLearner(re *require.Assertions, op *operator.Operator, kind op re.NotNil(op) steps, _ := trimTransferLeaders(op) re.Len(steps, 2) - re.Equal(targetID, steps[0].(operator.AddLearner).ToStore) + if targetID != 0 { + re.Equal(targetID, steps[0].(operator.AddLearner).ToStore) + } re.Equal(sourceID, steps[1].(operator.RemovePeer).FromStore) kind |= operator.OpRegion re.Equal(kind, op.Kind()&kind) @@ -124,7 +126,7 @@ func CheckTransferPeerWithLeaderTransfer(re *require.Assertions, op *operator.Op addLearnerTo = steps[0].(operator.AddLearner).ToStore removePeerFrom = steps[3].(operator.RemovePeer).FromStore default: - re.FailNow("unexpected operator steps") + re.FailNow("unexpected operator steps: " + op.String()) } re.NotZero(lastLeader) re.NotEqual(sourceID, lastLeader) @@ -177,7 +179,7 @@ func CheckSteps(re *require.Assertions, op *operator.Operator, steps []operator. 
case operator.MergeRegion: re.Equal(steps[i].(operator.MergeRegion).IsPassive, op.Step(i).(operator.MergeRegion).IsPassive) default: - re.FailNow("unknown operator step type") + re.FailNow("unexpected operator steps: " + op.String()) } } } From a91615b8c9af48473f376edf7feca72d1b716741 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 17 Apr 2024 22:52:10 +0800 Subject: [PATCH 2/7] make test stable Signed-off-by: lhy1024 --- pkg/schedule/schedulers/hot_region_test.go | 12 ++++++++---- pkg/schedule/schedulers/scheduler_test.go | 1 + 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 401090d099f..641f40b94f8 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -781,8 +781,10 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { allowLeaderTiKVCount := aliveTiKVCount - 1 // store 5 with evict leader aliveTiFlashCount := float64(aliveTiFlashLastID - aliveTiFlashStartID + 1) tc.ObserveRegionsStats() - ops, _ := hb.Schedule(tc, false) - re.NotEmpty(ops) + testutil.Eventually(re, func() bool { + ops, _ := hb.Schedule(tc, false) + return len(ops) != 0 + }) re.True( loadsEqual( hb.stLoadInfos[writeLeader][1].LoadPred.Expect.Loads, @@ -801,8 +803,10 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { pdServerCfg.FlowRoundByDigit = 8 tc.SetPDServerConfig(pdServerCfg) clearPendingInfluence(hb) - ops, _ = hb.Schedule(tc, false) - re.NotEmpty(ops) + testutil.Eventually(re, func() bool { + ops, _ := hb.Schedule(tc, false) + return len(ops) != 0 + }) re.True( loadsEqual( hb.stLoadInfos[writePeer][8].LoadPred.Expect.Loads, diff --git a/pkg/schedule/schedulers/scheduler_test.go b/pkg/schedule/schedulers/scheduler_test.go index 90f14d90fb0..d503931f0b2 100644 --- a/pkg/schedule/schedulers/scheduler_test.go +++ b/pkg/schedule/schedulers/scheduler_test.go @@ -317,6 +317,7 @@ func TestShuffleRegionRole(t *testing.T) { } func TestSpecialUseHotRegion(t *testing.T) { + schedulePeerPr = 1.0 re := require.New(t) cancel, _, tc, oc := prepareSchedulersTest() defer cancel() From ccbbef161d090fce9f2d6473732f00305b30754c Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 25 Jun 2024 20:45:19 +0800 Subject: [PATCH 3/7] remove schedulePeerPr Signed-off-by: lhy1024 --- pkg/schedule/schedulers/grant_hot_region.go | 10 +-- pkg/schedule/schedulers/hot_region.go | 76 ++++++++++--------- pkg/schedule/schedulers/hot_region_test.go | 47 +++++++----- pkg/schedule/schedulers/hot_region_v2_test.go | 6 +- pkg/schedule/schedulers/scheduler_test.go | 12 +-- pkg/schedule/schedulers/shuffle_hot_region.go | 6 +- 6 files changed, 86 insertions(+), 71 deletions(-) diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 56ed7cd730e..905462fb55a 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -258,13 +258,13 @@ func newGrantHotRegionHandler(config *grantHotRegionSchedulerConfig) http.Handle func (s *grantHotRegionScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { grantHotRegionCounter.Inc() - rw := s.randomRWType() - s.prepareForBalance(rw, cluster) - return s.dispatch(rw, cluster), nil + typ := s.randomType() + s.prepareForBalance(typ, cluster) + return s.dispatch(typ, cluster), nil } -func (s *grantHotRegionScheduler) dispatch(typ utils.RWType, cluster sche.SchedulerCluster) 
[]*operator.Operator { - stLoadInfos := s.stLoadInfos[buildResourceType(typ, constant.RegionKind)] +func (s *grantHotRegionScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator { + stLoadInfos := s.stLoadInfos[typ] infos := make([]*statistics.StoreLoadDetail, len(stLoadInfos)) index := 0 for _, info := range stLoadInfos { diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 284cf07032c..9189741d01d 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -53,15 +53,12 @@ const ( splitProgressiveRank = int64(-5) minHotScheduleInterval = time.Second maxHotScheduleInterval = 20 * time.Second - defaultSchedulePeerPr = 0.66 defaultPendingAmpFactor = 2.0 defaultStddevThreshold = 0.1 defaultTopnPosition = 10 ) var ( - // schedulePeerPr the probability of schedule the hot peer. - schedulePeerPr = defaultSchedulePeerPr // pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when two stores are close together pendingAmpFactor = defaultPendingAmpFactor // If the distribution of a dimension is below the corresponding stddev threshold, then scheduling will no longer be based on this dimension, @@ -125,7 +122,7 @@ type baseHotScheduler struct { // this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't // be selected if its owner region is tracked in this attribute. regionPendings map[uint64]*pendingInfluence - types []utils.RWType + types []resourceType r *rand.Rand updateReadTime time.Time updateWriteTime time.Time @@ -135,12 +132,12 @@ func newBaseHotScheduler(opController *operator.Controller, sampleDuration time. base := NewBaseScheduler(opController) ret := &baseHotScheduler{ BaseScheduler: base, - types: []utils.RWType{utils.Write, utils.Read}, regionPendings: make(map[uint64]*pendingInfluence), stHistoryLoads: statistics.NewStoreHistoryLoads(utils.DimLen, sampleDuration, sampleInterval), r: rand.New(rand.NewSource(time.Now().UnixNano())), } for ty := resourceType(0); ty < resourceTypeLen; ty++ { + ret.types = append(ret.types, ty) ret.stLoadInfos[ty] = map[uint64]*statistics.StoreLoadDetail{} } return ret @@ -148,13 +145,13 @@ func newBaseHotScheduler(opController *operator.Controller, sampleDuration time. 
// prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for // each store, only update read or write load detail -func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.SchedulerCluster) { +func (h *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.SchedulerCluster) { storeInfos := statistics.SummaryStoreInfos(cluster.GetStores()) h.summaryPendingInfluence(storeInfos) storesLoads := cluster.GetStoresLoads() isTraceRegionFlow := cluster.GetSchedulerConfig().IsTraceRegionFlow() - prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, resource constant.ResourceKind) { + prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, rw utils.RWType, resource constant.ResourceKind) { ty := buildResourceType(rw, resource) h.stLoadInfos[ty] = statistics.SummaryStoresLoad( storeInfos, @@ -164,23 +161,25 @@ func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.Sched isTraceRegionFlow, rw, resource) } - switch rw { - case utils.Read: + switch typ { + case readLeader, readPeer: // update read statistics if time.Since(h.updateReadTime) >= statisticsInterval { regionRead := cluster.RegionReadStats() - prepare(regionRead, constant.LeaderKind) - prepare(regionRead, constant.RegionKind) + prepare(regionRead, utils.Read, constant.LeaderKind) + prepare(regionRead, utils.Read, constant.RegionKind) h.updateReadTime = time.Now() } - case utils.Write: + case writeLeader, writePeer: // update write statistics if time.Since(h.updateWriteTime) >= statisticsInterval { regionWrite := cluster.RegionWriteStats() - prepare(regionWrite, constant.LeaderKind) - prepare(regionWrite, constant.RegionKind) + prepare(regionWrite, utils.Write, constant.LeaderKind) + prepare(regionWrite, utils.Write, constant.RegionKind) h.updateWriteTime = time.Now() } + default: + log.Error("invalid resource type", zap.String("type", typ.String())) } } @@ -227,7 +226,7 @@ func setHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64) { HotPendingSum.WithLabelValues(storeLabel, rwTy, dim).Set(load) } -func (h *baseHotScheduler) randomRWType() utils.RWType { +func (h *baseHotScheduler) randomType() resourceType { return h.types[h.r.Int()%len(h.types)] } @@ -328,24 +327,32 @@ func (h *hotScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { func (h *hotScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { hotSchedulerCounter.Inc() - rw := h.randomRWType() - return h.dispatch(rw, cluster), nil + typ := h.randomType() + return h.dispatch(typ, cluster), nil } -func (h *hotScheduler) dispatch(typ utils.RWType, cluster sche.SchedulerCluster) []*operator.Operator { +func (h *hotScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator { h.Lock() defer h.Unlock() h.updateHistoryLoadConfig(h.conf.GetHistorySampleDuration(), h.conf.GetHistorySampleInterval()) h.prepareForBalance(typ, cluster) - // it can not move earlier to support to use api and metrics. - if h.conf.IsForbidRWType(typ) { - return nil - } + // IsForbidRWType can not be move earlier to support to use api and metrics. 
switch typ { - case utils.Read: + case readLeader, readPeer: + if h.conf.IsForbidRWType(utils.Read) { + return nil + } return h.balanceHotReadRegions(cluster) - case utils.Write: - return h.balanceHotWriteRegions(cluster) + case writePeer: + if h.conf.IsForbidRWType(utils.Write) { + return nil + } + return h.balanceHotWritePeers(cluster) + case writeLeader: + if h.conf.IsForbidRWType(utils.Write) { + return nil + } + return h.balanceHotWriteLeaders(cluster) } return nil } @@ -410,19 +417,16 @@ func (h *hotScheduler) balanceHotReadRegions(cluster sche.SchedulerCluster) []*o return nil } -func (h *hotScheduler) balanceHotWriteRegions(cluster sche.SchedulerCluster) []*operator.Operator { - // prefer to balance by peer - s := h.r.Intn(100) - switch { - case s < int(schedulePeerPr*100): - peerSolver := newBalanceSolver(h, cluster, utils.Write, movePeer) - ops := peerSolver.solve() - if len(ops) > 0 && peerSolver.tryAddPendingInfluence() { - return ops - } - default: +func (h *hotScheduler) balanceHotWritePeers(cluster sche.SchedulerCluster) []*operator.Operator { + peerSolver := newBalanceSolver(h, cluster, utils.Write, movePeer) + ops := peerSolver.solve() + if len(ops) > 0 && peerSolver.tryAddPendingInfluence() { + return ops } + return nil +} +func (h *hotScheduler) balanceHotWriteLeaders(cluster sche.SchedulerCluster) []*operator.Operator { leaderSolver := newBalanceSolver(h, cluster, utils.Write, transferLeader) ops := leaderSolver.solve() if len(ops) > 0 && leaderSolver.tryAddPendingInfluence() { diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 641f40b94f8..df20cd0180e 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -56,14 +56,14 @@ func init() { func newHotReadScheduler(opController *operator.Controller, conf *hotRegionSchedulerConfig) *hotScheduler { ret := newHotScheduler(opController, conf) ret.name = "" - ret.types = []utils.RWType{utils.Read} + ret.types = []resourceType{readLeader, readPeer} return ret } func newHotWriteScheduler(opController *operator.Controller, conf *hotRegionSchedulerConfig) *hotScheduler { ret := newHotScheduler(opController, conf) ret.name = "" - ret.types = []utils.RWType{utils.Write} + ret.types = []resourceType{writeLeader, writePeer} return ret } @@ -201,7 +201,6 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) { func TestSplitIfRegionTooHot(t *testing.T) { re := require.New(t) - schedulePeerPr = 1.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) @@ -260,6 +259,7 @@ func TestSplitIfRegionTooHot(t *testing.T) { {1, []uint64{1, 2, 3}, 4 * units.MiB, 0, 0}, }) hb, _ = CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb.(*hotScheduler).types = []resourceType{writePeer} ops, _ = hb.Schedule(tc, false) re.Len(ops, 1) expectOp, _ = operator.CreateSplitRegionOperator(splitHotReadBuckets, tc.GetRegion(1), operator.OpSplit, @@ -391,7 +391,6 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) { func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules bool) { // This test is used to test move leader and move peer. 
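// The hunks above and below replace the schedulePeerPr coin flip with explicit
// resource types: Schedule picks a resourceType at random from h.types and
// dispatches on it, and tests now force a particular branch by pinning
// hb.types to a single entry (for example []resourceType{writePeer}) instead
// of tweaking schedulePeerPr. A self-contained sketch of that selection
// pattern, using local stand-ins rather than the real scheduler types:
package sketch

import "math/rand"

type resourceType int

const (
	writeLeader resourceType = iota
	writePeer
	readLeader
	readPeer
)

type fakeScheduler struct {
	types []resourceType // tests may pin this to a single entry
	r     *rand.Rand
}

func (s *fakeScheduler) randomType() resourceType {
	return s.types[s.r.Intn(len(s.types))]
}

// dispatch mirrors the shape of the refactored hotScheduler.dispatch: read
// types share one path, while write leaders and write peers get their own.
func (s *fakeScheduler) dispatch(typ resourceType) string {
	switch typ {
	case readLeader, readPeer:
		return "balance-hot-read"
	case writeLeader:
		return "balance-hot-write-leaders"
	case writePeer:
		return "balance-hot-write-peers"
	}
	return ""
}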
- schedulePeerPr = 1.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tc.SetEnableUseJointConsensus(true) @@ -401,6 +400,7 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...) hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) + hb.(*hotScheduler).types = []resourceType{writePeer} hb.(*hotScheduler).conf.SetHistorySampleDuration(0) tc.AddLabelsStore(1, 2, map[string]string{"zone": "z1", "host": "h1"}) @@ -740,8 +740,10 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { for i := 0; i < 20; i++ { clearPendingInfluence(hb) ops, _ := hb.Schedule(tc, false) - op := ops[0] - operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1) + if len(ops) == 0 { + continue + } + operatorutil.CheckTransferLeaderFrom(re, ops[0], operator.OpHotRegion, 1) } // | store_id | write_bytes_rate | // |----------|------------------| @@ -870,19 +872,21 @@ func TestHotWriteRegionScheduleWithQuery(t *testing.T) { for i := 0; i < 100; i++ { clearPendingInfluence(hb.(*hotScheduler)) ops, _ := hb.Schedule(tc, false) - op := ops[0] - operatorutil.CheckTransferLeader(re, op, operator.OpHotRegion, 1, 3) + if len(ops) == 0 { + continue + } + operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 1, 3) } } func TestHotWriteRegionScheduleWithKeyRate(t *testing.T) { // This test is used to test move peer. - schedulePeerPr = 1.0 re := require.New(t) cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) + hb.(*hotScheduler).types = []resourceType{writePeer} hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority} @@ -1016,6 +1020,7 @@ func TestHotWriteRegionScheduleWithLeader(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb.(*hotScheduler).types = []resourceType{writeLeader} hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority} hb.(*hotScheduler).conf.SetHistorySampleDuration(0) re.NoError(err) @@ -1132,7 +1137,7 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim ops, _ := hb.Schedule(tc, false) for len(ops) == 0 { emptyCnt++ - if emptyCnt >= 10 { + if emptyCnt >= 100 { break testLoop } ops, _ = hb.Schedule(tc, false) @@ -1229,9 +1234,11 @@ func TestHotWriteRegionScheduleWithRuleEnabled(t *testing.T) { for i := 0; i < 100; i++ { clearPendingInfluence(hb.(*hotScheduler)) ops, _ := hb.Schedule(tc, false) - op := ops[0] + if len(ops) == 0 { + continue + } // The targetID should always be 1 as leader is only allowed to be placed in store1 or store2 by placement rule - operatorutil.CheckTransferLeader(re, op, operator.OpHotRegion, 2, 1) + operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 2, 1) ops, _ = hb.Schedule(tc, false) re.Empty(ops) } @@ -1908,7 +1915,8 @@ func checkHotCacheCheckRegionFlow(re *require.Assertions, testCase testHotCacheC if testCase.DegreeAfterTransferLeader >= 3 { // try schedule - hb.prepareForBalance(testCase.kind, tc) + typ := toResourceType(testCase.kind, transferLeader) + hb.prepareForBalance(typ, tc) leaderSolver := 
newBalanceSolver(hb, tc, testCase.kind, transferLeader) leaderSolver.cur = &solution{srcStore: hb.stLoadInfos[toResourceType(testCase.kind, transferLeader)][2]} re.Empty(leaderSolver.filterHotPeers(leaderSolver.cur.srcStore)) // skip schedule @@ -2036,6 +2044,7 @@ func TestInfluenceByRWType(t *testing.T) { defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) + hb.(*hotScheduler).types = []resourceType{writePeer} hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) hb.(*hotScheduler).conf.SetHistorySampleDuration(0) @@ -2054,8 +2063,6 @@ func TestInfluenceByRWType(t *testing.T) { addRegionInfo(tc, utils.Read, []testRegionInfo{ {1, []uint64{2, 1, 3}, 0.5 * units.MiB, 0.5 * units.MiB, 0}, }) - // must move peer - schedulePeerPr = 1.0 // must move peer from 1 to 4 ops, _ := hb.Schedule(tc, false) op := ops[0] @@ -2080,7 +2087,7 @@ func TestInfluenceByRWType(t *testing.T) { } // must transfer leader - schedulePeerPr = 0 + hb.(*hotScheduler).types = []resourceType{writeLeader} // must transfer leader from 1 to 3 ops, _ = hb.Schedule(tc, false) op = ops[0] @@ -2157,6 +2164,7 @@ func TestHotScheduleWithPriority(t *testing.T) { defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) + hb.(*hotScheduler).types = []resourceType{writePeer} hb.(*hotScheduler).conf.SetDstToleranceRatio(1.05) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.05) hb.(*hotScheduler).conf.SetHistorySampleDuration(0) @@ -2175,8 +2183,7 @@ func TestHotScheduleWithPriority(t *testing.T) { tc.UpdateStorageWrittenStats(3, 6*units.MiB*utils.StoreHeartBeatReportInterval, 6*units.MiB*utils.StoreHeartBeatReportInterval) tc.UpdateStorageWrittenStats(4, 9*units.MiB*utils.StoreHeartBeatReportInterval, 10*units.MiB*utils.StoreHeartBeatReportInterval) tc.UpdateStorageWrittenStats(5, 1*units.MiB*utils.StoreHeartBeatReportInterval, 1*units.MiB*utils.StoreHeartBeatReportInterval) - // must transfer peer - schedulePeerPr = 1.0 + addRegionInfo(tc, utils.Write, []testRegionInfo{ {1, []uint64{1, 2, 3}, 2 * units.MiB, 1 * units.MiB, 0}, {6, []uint64{4, 2, 3}, 1 * units.MiB, 2 * units.MiB, 0}, @@ -2215,6 +2222,7 @@ func TestHotScheduleWithPriority(t *testing.T) { operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 1, 3) hb, err = CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb.(*hotScheduler).types = []resourceType{writePeer} hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority} hb.(*hotScheduler).conf.RankFormulaVersion = "v1" hb.(*hotScheduler).conf.SetHistorySampleDuration(0) @@ -2254,11 +2262,11 @@ func TestHotScheduleWithPriority(t *testing.T) { func TestHotScheduleWithStddev(t *testing.T) { re := require.New(t) - schedulePeerPr = 1.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) + hb.(*hotScheduler).types = []resourceType{writePeer} hb.(*hotScheduler).conf.SetDstToleranceRatio(1.0) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.0) hb.(*hotScheduler).conf.RankFormulaVersion = "v1" @@ -2317,6 +2325,7 @@ func TestHotWriteLeaderScheduleWithPriority(t *testing.T) { defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) + hb.(*hotScheduler).types = 
[]resourceType{writeLeader} hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) hb.(*hotScheduler).conf.SetHistorySampleDuration(0) diff --git a/pkg/schedule/schedulers/hot_region_v2_test.go b/pkg/schedule/schedulers/hot_region_v2_test.go index 9913faeae5c..dd1d99fc01d 100644 --- a/pkg/schedule/schedulers/hot_region_v2_test.go +++ b/pkg/schedule/schedulers/hot_region_v2_test.go @@ -30,12 +30,12 @@ import ( func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { // This is a test that searchRevertRegions finds a solution of rank -1. re := require.New(t) - schedulePeerPr = 1.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb := sche.(*hotScheduler) + hb.types = []resourceType{writePeer} hb.conf.SetDstToleranceRatio(0.0) hb.conf.SetSrcToleranceRatio(0.0) hb.conf.SetRankFormulaVersion("v1") @@ -91,12 +91,12 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { // This is a test that searchRevertRegions finds a solution of rank -3. re := require.New(t) - schedulePeerPr = 1.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb := sche.(*hotScheduler) + hb.types = []resourceType{writePeer} hb.conf.SetDstToleranceRatio(0.0) hb.conf.SetSrcToleranceRatio(0.0) hb.conf.SetRankFormulaVersion("v1") @@ -143,12 +143,12 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) { // This is a test that searchRevertRegions finds a solution of rank -2. 
re := require.New(t) - schedulePeerPr = 1.0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb := sche.(*hotScheduler) + hb.types = []resourceType{writePeer} hb.conf.SetDstToleranceRatio(0.0) hb.conf.SetSrcToleranceRatio(0.0) hb.conf.SetRankFormulaVersion("v1") diff --git a/pkg/schedule/schedulers/scheduler_test.go b/pkg/schedule/schedulers/scheduler_test.go index d503931f0b2..5a603515942 100644 --- a/pkg/schedule/schedulers/scheduler_test.go +++ b/pkg/schedule/schedulers/scheduler_test.go @@ -41,7 +41,6 @@ func prepareSchedulersTest(needToRunStream ...bool) (func(), config.SchedulerCon clean := func() { cancel() // reset some config to avoid affecting other tests - schedulePeerPr = defaultSchedulePeerPr pendingAmpFactor = defaultPendingAmpFactor stddevThreshold = defaultStddevThreshold topnPosition = defaultTopnPosition @@ -317,7 +316,6 @@ func TestShuffleRegionRole(t *testing.T) { } func TestSpecialUseHotRegion(t *testing.T) { - schedulePeerPr = 1.0 re := require.New(t) cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -363,9 +361,13 @@ func TestSpecialUseHotRegion(t *testing.T) { tc.AddLeaderRegionWithWriteInfo(5, 3, 512*units.KiB*utils.RegionHeartBeatReportInterval, 0, 0, utils.RegionHeartBeatReportInterval, []uint64{1, 2}) hs, err := CreateScheduler(utils.Write.String(), oc, storage, cd) re.NoError(err) - ops, _ = hs.Schedule(tc, false) - re.Len(ops, 1) - operatorutil.CheckTransferPeer(re, ops[0], operator.OpHotRegion, 1, 4) + for i := 0; i < 100; i++ { + ops, _ = hs.Schedule(tc, false) + if len(ops) == 0 { + continue + } + operatorutil.CheckTransferPeer(re, ops[0], operator.OpHotRegion, 1, 4) + } } func TestSpecialUseReserved(t *testing.T) { diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go index 0b9021267cb..5e6600e531f 100644 --- a/pkg/schedule/schedulers/shuffle_hot_region.go +++ b/pkg/schedule/schedulers/shuffle_hot_region.go @@ -159,9 +159,9 @@ func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerClus func (s *shuffleHotRegionScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { shuffleHotRegionCounter.Inc() - rw := s.randomRWType() - s.prepareForBalance(rw, cluster) - operators := s.randomSchedule(cluster, s.stLoadInfos[buildResourceType(rw, constant.LeaderKind)]) + typ := s.randomType() + s.prepareForBalance(typ, cluster) + operators := s.randomSchedule(cluster, s.stLoadInfos[typ]) return operators, nil } From 73aab7053aa913f9bd301b1e5ba73439aabc0750 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 25 Jun 2024 21:55:20 +0800 Subject: [PATCH 4/7] remove comments Signed-off-by: lhy1024 --- pkg/schedule/schedulers/hot_region_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index df20cd0180e..2c94da0f315 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -390,7 +390,6 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) { } func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules bool) { - // This test is used to test move leader and move peer. 
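// Because the scheduler now chooses randomly between leader and peer resource
// types, several tests in this series (TestHotWriteRegionScheduleWithQuery,
// TestHotWriteRegionScheduleWithRuleEnabled, TestSpecialUseHotRegion) wrap
// Schedule in a bounded retry loop and skip rounds that return no operator.
// A sketch of that pattern as a reusable helper; scheduleFunc is a
// hypothetical stand-in for hb.Schedule, and failing when nothing is ever
// produced is a slightly stricter variant of the inline loops in the patch.
package sketch

import "testing"

func scheduleUntilOperator(t *testing.T, maxRetry int, scheduleFunc func() []string) []string {
	for i := 0; i < maxRetry; i++ {
		if ops := scheduleFunc(); len(ops) > 0 {
			return ops
		}
	}
	t.Fatalf("no operator produced after %d retries", maxRetry)
	return nil
}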
cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tc.SetEnableUseJointConsensus(true) From f4f4e0406b96ae2e3ea80e374bdb1086c6124632 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 27 Jun 2024 16:20:43 +0800 Subject: [PATCH 5/7] add more tests Signed-off-by: lhy1024 --- pkg/statistics/hot_cache_test.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/statistics/hot_cache_test.go b/pkg/statistics/hot_cache_test.go index 9794a4a8968..fbd28c94683 100644 --- a/pkg/statistics/hot_cache_test.go +++ b/pkg/statistics/hot_cache_test.go @@ -26,11 +26,13 @@ func TestIsHot(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cache := NewHotCache(ctx) - region := buildRegion(utils.Read, 3, 60) - stats := cache.CheckReadPeerSync(region, region.GetPeers(), []float64{100000000, 1000, 1000}, 60) - cache.Update(stats[0], utils.Read) - for i := 0; i < 100; i++ { - re.True(cache.IsRegionHot(region, 1)) + for i := utils.RWType(0); i < utils.RWTypeLen; i++ { + cache := NewHotCache(ctx) + region := buildRegion(i, 3, 60) + stats := cache.CheckReadPeerSync(region, region.GetPeers(), []float64{100000000, 1000, 1000}, 60) + cache.Update(stats[0], i) + for i := 0; i < 100; i++ { + re.True(cache.IsRegionHot(region, 1)) + } } } From f7d8c25a9256d7c1809a05b2bdebd751ffc5187f Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 2 Jul 2024 18:03:35 +0800 Subject: [PATCH 6/7] address comments Signed-off-by: lhy1024 --- pkg/schedule/schedulers/hot_region_test.go | 12 +++++------- pkg/utils/operatorutil/operator_check.go | 4 +--- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 2c94da0f315..587e920894f 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -655,12 +655,12 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { hb.conf.SetHistorySampleDuration(0) // Add TiKV stores 1, 2, 3, 4, 5, 6, 7 (Down) with region counts 3, 3, 2, 2, 0, 0, 0. - // Add TiFlash stores 8, 9, 10, 11 with region counts 3, 1, 1, 0. - storeCount := uint64(11) + // Add TiFlash stores 8, 9, 10 with region counts 2, 1, 1. + storeCount := uint64(10) aliveTiKVStartID := uint64(1) aliveTiKVLastID := uint64(6) aliveTiFlashStartID := uint64(8) - aliveTiFlashLastID := uint64(11) + aliveTiFlashLastID := uint64(10) downStoreID := uint64(7) tc.AddLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"}) tc.AddLabelsStore(2, 3, map[string]string{"zone": "z2", "host": "h2"}) @@ -672,7 +672,6 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { tc.AddLabelsStore(8, 3, map[string]string{"zone": "z1", "host": "h8", "engine": "tiflash"}) tc.AddLabelsStore(9, 1, map[string]string{"zone": "z2", "host": "h9", "engine": "tiflash"}) tc.AddLabelsStore(10, 1, map[string]string{"zone": "z5", "host": "h10", "engine": "tiflash"}) - tc.AddLabelsStore(11, 0, map[string]string{"zone": "z3", "host": "h11", "engine": "tiflash"}) tc.SetStoreDown(downStoreID) for i := uint64(1); i <= storeCount; i++ { if i != downStoreID { @@ -712,7 +711,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { // Will transfer a hot learner from store 8, because the total count of peers // which is hot for store 8 is larger than other TiFlash stores. 
pdServerCfg := tc.GetPDServerConfig() - pdServerCfg.FlowRoundByDigit = 8 + pdServerCfg.FlowRoundByDigit = 6 hasLeaderOperator, hasPeerOperator := false, false for i := 0; i < 20 || !(hasLeaderOperator && hasPeerOperator); i++ { clearPendingInfluence(hb) @@ -727,7 +726,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { case 2: // balance by peer selected re.Equal("move-hot-write-peer", op.Desc()) - operatorutil.CheckTransferLearner(re, op, operator.OpHotRegion, 8, 0) + operatorutil.CheckTransferLearner(re, op, operator.OpHotRegion, 8, 10) hasPeerOperator = true default: re.FailNow("wrong op: " + op.String()) @@ -756,7 +755,6 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { // | 8 | n/a | <- TiFlash is always 0. // | 9 | n/a | // | 10 | n/a | - // | 11 | n/a | storesBytes := map[uint64]uint64{ 1: 7.5 * units.MiB * utils.StoreHeartBeatReportInterval, 2: 4.5 * units.MiB * utils.StoreHeartBeatReportInterval, diff --git a/pkg/utils/operatorutil/operator_check.go b/pkg/utils/operatorutil/operator_check.go index 0c512974445..b9428369109 100644 --- a/pkg/utils/operatorutil/operator_check.go +++ b/pkg/utils/operatorutil/operator_check.go @@ -95,9 +95,7 @@ func CheckTransferLearner(re *require.Assertions, op *operator.Operator, kind op re.NotNil(op) steps, _ := trimTransferLeaders(op) re.Len(steps, 2) - if targetID != 0 { - re.Equal(targetID, steps[0].(operator.AddLearner).ToStore) - } + re.Equal(targetID, steps[0].(operator.AddLearner).ToStore) re.Equal(sourceID, steps[1].(operator.RemovePeer).FromStore) kind |= operator.OpRegion re.Equal(kind, op.Kind()&kind) From f434c5c9e6d8e3c7d0ca9891b9bdc8d40105b101 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 2 Jul 2024 18:16:16 +0800 Subject: [PATCH 7/7] update Signed-off-by: lhy1024 --- pkg/schedule/schedulers/hot_region.go | 3 ++- pkg/schedule/schedulers/shuffle_hot_region.go | 8 ++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 9189741d01d..d20473fb010 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -121,7 +121,8 @@ type baseHotScheduler struct { // regionPendings stores regionID -> pendingInfluence, // this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't // be selected if its owner region is tracked in this attribute. - regionPendings map[uint64]*pendingInfluence + regionPendings map[uint64]*pendingInfluence + // types is the resource types that the scheduler considers. types []resourceType r *rand.Rand updateReadTime time.Time diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go index 5e6600e531f..91708dc89bc 100644 --- a/pkg/schedule/schedulers/shuffle_hot_region.go +++ b/pkg/schedule/schedulers/shuffle_hot_region.go @@ -161,8 +161,12 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster sche.SchedulerCluster, _ bo shuffleHotRegionCounter.Inc() typ := s.randomType() s.prepareForBalance(typ, cluster) - operators := s.randomSchedule(cluster, s.stLoadInfos[typ]) - return operators, nil + switch typ { + case readLeader, writeLeader: + return s.randomSchedule(cluster, s.stLoadInfos[typ]), nil + default: + } + return nil, nil } func (s *shuffleHotRegionScheduler) randomSchedule(cluster sche.SchedulerCluster, loadDetail map[uint64]*statistics.StoreLoadDetail) []*operator.Operator {