Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix scheduling can not immediately start after transfer leader (#4875) #4970

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 24 additions & 18 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,11 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
return nil
}

// IsPrepared return true if the prepare checker is ready.
func (c *RaftCluster) IsPrepared() bool {
return c.prepareChecker.isPrepared()
}

var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
Expand Down Expand Up @@ -700,7 +705,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if isNew {
if !c.IsPrepared() && isNew {
c.prepareChecker.collect(region)
}

Expand Down Expand Up @@ -755,13 +760,6 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize)
}

//nolint:unused
func (c *RaftCluster) getClusterID() uint64 {
c.RLock()
defer c.RUnlock()
return c.meta.GetId()
}

func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {
if c.storage != nil {
if err := c.storage.SaveMeta(meta); err != nil {
Expand Down Expand Up @@ -1495,13 +1493,6 @@ func (c *RaftCluster) GetComponentManager() *component.Manager {
return c.componentManager
}

// isPrepared if the cluster information is collected
func (c *RaftCluster) isPrepared() bool {
c.RLock()
defer c.RUnlock()
return c.prepareChecker.check(c)
}

// GetStoresLoads returns load stats of all stores.
func (c *RaftCluster) GetStoresLoads() map[uint64][]float64 {
c.RLock()
Expand Down Expand Up @@ -1556,10 +1547,11 @@ func (c *RaftCluster) GetRegionLabeler() *labeler.RegionLabeler {
}

type prepareChecker struct {
sync.RWMutex
reactiveRegions map[uint64]int
start time.Time
sum int
isPrepared bool
prepared bool
}

func newPrepareChecker() *prepareChecker {
Expand All @@ -1571,7 +1563,13 @@ func newPrepareChecker() *prepareChecker {

// Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration.
func (checker *prepareChecker) check(c *RaftCluster) bool {
if checker.isPrepared || time.Since(checker.start) > collectTimeout {
checker.Lock()
defer checker.Unlock()
if checker.prepared {
return true
}
if time.Since(checker.start) > collectTimeout {
checker.prepared = true
return true
}
// The number of active regions should be more than total region of all stores * collectFactor
Expand All @@ -1588,17 +1586,25 @@ func (checker *prepareChecker) check(c *RaftCluster) bool {
return false
}
}
checker.isPrepared = true
checker.prepared = true
return true
}

func (checker *prepareChecker) collect(region *core.RegionInfo) {
checker.Lock()
defer checker.Unlock()
for _, p := range region.GetPeers() {
checker.reactiveRegions[p.GetStoreId()]++
}
checker.sum++
}

func (checker *prepareChecker) isPrepared() bool {
checker.RLock()
defer checker.RUnlock()
return checker.prepared
}

// GetHotWriteRegions gets hot write regions' info.
func (c *RaftCluster) GetHotWriteRegions(storeIDs ...uint64) *statistics.StoreHotPeersInfos {
c.RLock()
Expand Down
5 changes: 4 additions & 1 deletion server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ func (c *coordinator) drivePushOperator() {

func (c *coordinator) run() {
ticker := time.NewTicker(runSchedulerCheckInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
ticker = time.NewTicker(100 * time.Millisecond)
})
defer ticker.Stop()
log.Info("coordinator starts to collect cluster information")
for {
Expand Down Expand Up @@ -617,7 +620,7 @@ func (c *coordinator) resetHotSpotMetrics() {
}

func (c *coordinator) shouldRun() bool {
return c.cluster.isPrepared()
return c.cluster.prepareChecker.check(c.cluster)
}

func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string) error {
Expand Down
5 changes: 2 additions & 3 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,8 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
}

func (s *testCoordinatorSuite) TestDispatch(c *C) {
tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.isPrepared = true }, nil, c)
tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.prepared = true }, nil, c)
defer cleanup()

// Transfer peer from store 4 to store 1.
c.Assert(tc.addRegionStore(4, 40), IsNil)
c.Assert(tc.addRegionStore(3, 30), IsNil)
Expand Down Expand Up @@ -285,7 +284,7 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run
setCfg(cfg)
}
tc := newTestCluster(ctx, opt)
hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.getClusterID(), tc, true /* need to run */)
hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */)
if setTc != nil {
setTc(tc)
}
Expand Down
9 changes: 9 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type RegionInfo struct {
replicationStatus *replication_modepb.RegionReplicationStatus
QueryStats *pdpb.QueryStats
flowRoundDivisor uint64
fromHeartbeat bool
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -493,6 +494,11 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio
return r.replicationStatus
}

// IsFromHeartbeat returns whether the region info is from the region heartbeat.
func (r *RegionInfo) IsFromHeartbeat() bool {
return r.fromHeartbeat
}

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
Expand Down Expand Up @@ -577,6 +583,9 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
saveCache = true
}
if !origin.IsFromHeartbeat() {
isNew = true
}
}
return
}
Expand Down
7 changes: 7 additions & 0 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
region.interval = interval
}
}

// SetFromHeartbeat sets if the region info comes from the region heartbeat.
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
return func(region *RegionInfo) {
region.fromHeartbeat = fromHeartbeat
}
}
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request, flowRoundOption)
region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true))
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
Expand Down
3 changes: 2 additions & 1 deletion server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
core.SetWrittenKeys(stats[i].KeysWritten),
core.SetReadBytes(stats[i].BytesRead),
core.SetReadKeys(stats[i].KeysRead),
core.SetFromHeartbeat(false),
)
} else {
region = core.NewRegionInfo(r, regionLeader)
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
}

origin, err := bc.PreCheckPutRegion(region)
Expand Down
45 changes: 45 additions & 0 deletions tests/server/region_syncer/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,51 @@ func (s *regionSyncerTestSuite) TestFullSyncWithAddMember(c *C) {
c.Assert(loadRegions, HasLen, regionLen)
}

func (s *regionSyncerTestSuite) TestPrepareChecker(c *C) {
c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`), IsNil)
defer failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker")
cluster, err := tests.NewTestCluster(s.ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true })
defer cluster.Destroy()
c.Assert(err, IsNil)

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer := cluster.GetServer(cluster.GetLeader())
c.Assert(leaderServer.BootstrapCluster(), IsNil)
rc := leaderServer.GetServer().GetRaftCluster()
c.Assert(rc, NotNil)
regionLen := 110
regions := initRegions(regionLen)
for _, region := range regions {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}

// ensure flush to region storage
time.Sleep(3 * time.Second)
c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)

// join new PD
pd2, err := cluster.Join(s.ctx)
c.Assert(err, IsNil)
err = pd2.Run()
c.Assert(err, IsNil)
// waiting for synchronization to complete
time.Sleep(3 * time.Second)
err = cluster.ResignLeader()
c.Assert(err, IsNil)
c.Assert(cluster.WaitLeader(), Equals, "pd2")
leaderServer = cluster.GetServer(cluster.GetLeader())
rc = leaderServer.GetServer().GetRaftCluster()
for _, region := range regions {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}
time.Sleep(time.Second)
c.Assert(rc.IsPrepared(), IsTrue)
}

func initRegions(regionLen int) []*core.RegionInfo {
allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
regions := make([]*core.RegionInfo, 0, regionLen)
Expand Down