diff --git a/server/balancer_test.go b/server/balancer_test.go index fffb1a51871a..97532f72486d 100644 --- a/server/balancer_test.go +++ b/server/balancer_test.go @@ -119,6 +119,10 @@ func (c *testClusterInfo) updateSnapshotCount(storeID uint64, snapshotCount int) func newTestScheduleConfig() (*ScheduleConfig, *scheduleOption) { cfg := NewConfig() cfg.adjust() + cfg.ScheduleCfg.MinLeaderCount = 1 + cfg.ScheduleCfg.MinRegionCount = 1 + cfg.ScheduleCfg.LeaderScheduleInterval.Duration = 10 * time.Millisecond + cfg.ScheduleCfg.StorageScheduleInterval.Duration = 10 * time.Millisecond opt := newScheduleOption(cfg) return &cfg.ScheduleCfg, opt } @@ -219,9 +223,7 @@ func (s *testStorageBalancerSuite) TestConstraints(c *C) { cluster := newClusterInfo(newMockIDAllocator()) tc := newTestClusterInfo(cluster) - cfg, opt := newTestScheduleConfig() - cfg.MinRegionCount = 1 - cfg.MinBalanceDiffRatio = 0.01 + _, opt := newTestScheduleConfig() opt.constraints, _ = newConstraints(1, []*Constraint{ { Labels: map[string]string{"zone": "cn"}, @@ -319,9 +321,7 @@ func (s *testReplicaCheckerSuite) TestConstraints(c *C) { cluster := newClusterInfo(newMockIDAllocator()) tc := newTestClusterInfo(cluster) - cfg, opt := newTestScheduleConfig() - cfg.MinRegionCount = 1 - cfg.MinBalanceDiffRatio = 0.01 + _, opt := newTestScheduleConfig() opt.constraints, _ = newConstraints(3, []*Constraint{ { Labels: map[string]string{"zone": "us"}, diff --git a/server/cache.go b/server/cache.go index a27a1292290f..8a9f7692047e 100644 --- a/server/cache.go +++ b/server/cache.go @@ -206,19 +206,23 @@ func (r *regionsInfo) getStoreFollowerCount(storeID uint64) int { } func (r *regionsInfo) randLeaderRegion(storeID uint64) *regionInfo { - for _, region := range r.leaders[storeID] { - if region.Leader == nil { - log.Fatalf("rand leader region without leader: store %v region %v", storeID, region) - } - return region.clone() - } - return nil + return randRegion(r.leaders[storeID]) } func (r *regionsInfo) randFollowerRegion(storeID uint64) *regionInfo { - for _, region := range r.followers[storeID] { + return randRegion(r.followers[storeID]) +} + +func randRegion(regions map[uint64]*regionInfo) *regionInfo { + for _, region := range regions { if region.Leader == nil { - log.Fatalf("rand follower region without leader: store %v region %v", storeID, region) + log.Fatalf("rand region without leader: region %v", region) + } + if len(region.DownPeers) > 0 { + continue + } + if len(region.PendingPeers) > 0 { + continue } return region.clone() } diff --git a/server/cache_test.go b/server/cache_test.go index 22dfbc3ed690..33e95b479b8a 100644 --- a/server/cache_test.go +++ b/server/cache_test.go @@ -135,6 +135,19 @@ func (s *testRegionsInfoSuite) Test(c *C) { c.Assert(region.GetStorePeer(i), NotNil) } + + // All regions will be filtered out if they have pending peers. 
+ for i := uint64(0); i < n; i++ { + for j := 0; j < cache.getStoreLeaderCount(i); j++ { + region := cache.randLeaderRegion(i) + region.PendingPeers = region.Peers + cache.setRegion(region) + } + c.Assert(cache.randLeaderRegion(i), IsNil) + } + for i := uint64(0); i < n; i++ { + c.Assert(cache.randFollowerRegion(i), IsNil) + } } func checkRegion(c *C, a *regionInfo, b *regionInfo) { diff --git a/server/command.go b/server/command.go index 7193f9ac93bf..839fc1f20620 100644 --- a/server/command.go +++ b/server/command.go @@ -176,6 +176,7 @@ func (c *conn) handleRegionHeartbeat(req *pdpb.Request) (*pdpb.Response, error) region := newRegionInfo(request.GetRegion(), request.GetLeader()) region.DownPeers = request.GetDownPeers() + region.PendingPeers = request.GetPendingPeers() if region.GetId() == 0 { return nil, errors.Errorf("invalid request region, %v", request) } diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 4c155a797fd6..dc2e14894043 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -64,12 +64,6 @@ func (s *testCoordinatorSuite) TestSchedule(c *C) { tc := newTestClusterInfo(cluster) cfg, opt := newTestScheduleConfig() - cfg.MinRegionCount = 1 - cfg.MinLeaderCount = 1 - cfg.MinBalanceDiffRatio = 0.1 - cfg.LeaderScheduleInterval.Duration = 100 * time.Millisecond - cfg.StorageScheduleInterval.Duration = 100 * time.Millisecond - co := newCoordinator(cluster, opt) co.run() defer co.stop() @@ -140,13 +134,48 @@ func (s *testCoordinatorSuite) TestSchedule(c *C) { checkRemovePeerResp(c, resp, 4) } -func (s *testCoordinatorSuite) TestAddScheduler(c *C) { +func (s *testCoordinatorSuite) TestPeerState(c *C) { cluster := newClusterInfo(newMockIDAllocator()) tc := newTestClusterInfo(cluster) - cfg, opt := newTestScheduleConfig() - cfg.LeaderScheduleInterval.Duration = 10 * time.Millisecond + _, opt := newTestScheduleConfig() + co := newCoordinator(cluster, opt) + co.run() + defer co.stop() + + // Transfer peer from store 4 to store 1. + tc.addRegionStore(1, 1, 0.1) + tc.addRegionStore(2, 2, 0.2) + tc.addRegionStore(3, 3, 0.3) + tc.addRegionStore(4, 4, 0.4) + tc.addLeaderRegion(1, 2, 3, 4) + + // Wait for schedule. + time.Sleep(time.Second) + checkTransferPeer(c, co.getOperator(1), 4, 1) + + region := cluster.getRegion(1) + + // Add new peer. + resp := co.dispatch(region) + checkAddPeerResp(c, resp, 1) + newPeer := resp.GetChangePeer().GetPeer() + region.Peers = append(region.Peers, newPeer) + + // If the new peer is pending, the operator will not finish. + region.PendingPeers = append(region.PendingPeers, newPeer) + checkAddPeerResp(c, co.dispatch(region), 1) + // The new peer is not pending now, the operator will finish. 
+ region.PendingPeers = nil + c.Assert(co.dispatch(region), IsNil) +} + +func (s *testCoordinatorSuite) TestAddScheduler(c *C) { + cluster := newClusterInfo(newMockIDAllocator()) + tc := newTestClusterInfo(cluster) + + _, opt := newTestScheduleConfig() co := newCoordinator(cluster, opt) co.run() defer co.stop() diff --git a/server/operator.go b/server/operator.go index ed10a9bf6159..a453303915b1 100644 --- a/server/operator.go +++ b/server/operator.go @@ -26,9 +26,14 @@ import ( ) const ( + maxOperatorWaitTime = 10 * time.Minute maxTransferLeaderWaitCount = 3 ) +var ( + errOperatorTimeout = errors.New("operator timeout") +) + var baseID uint64 type callback func(op Operator) @@ -127,6 +132,9 @@ func (bo *balanceOperator) Do(ctx *opContext, region *regionInfo) (bool, *pdpb.R if bo.Start.IsZero() { bo.Start = time.Now() } + if time.Since(bo.Start) > maxOperatorWaitTime { + return false, nil, errors.Trace(errOperatorTimeout) + } finished, res, err := bo.Ops[bo.Index].Do(ctx, region) if err != nil { @@ -216,16 +224,21 @@ func (co *changePeerOperator) String() string { // check checks whether operator already finished or not. func (co *changePeerOperator) check(region *regionInfo) (bool, error) { + peer := co.ChangePeer.GetPeer() if co.ChangePeer.GetChangeType() == raftpb.ConfChangeType_AddNode { - if region.ContainsPeer(co.ChangePeer.GetPeer().GetId()) { + if region.GetPendingPeer(peer.GetId()) != nil { + // We don't know whether the added peer can work or not. + return false, nil + } + if region.GetPeer(peer.GetId()) != nil { return true, nil } - log.Infof("balance [%s], try to add peer %s", region, co.ChangePeer.GetPeer()) + log.Infof("balance [%s], try to add peer %s", region, peer) } else if co.ChangePeer.GetChangeType() == raftpb.ConfChangeType_RemoveNode { - if !region.ContainsPeer(co.ChangePeer.GetPeer().GetId()) { + if region.GetPeer(peer.GetId()) == nil { return true, nil } - log.Infof("balance [%s], try to remove peer %s", region, co.ChangePeer.GetPeer()) + log.Infof("balance [%s], try to remove peer %s", region, peer) } return false, nil diff --git a/server/region.go b/server/region.go index c32b6815ce04..93e3a0f5de0f 100644 --- a/server/region.go +++ b/server/region.go @@ -25,8 +25,9 @@ import ( // TODO: Export this to API directly. 
type regionInfo struct { *metapb.Region - Leader *metapb.Peer - DownPeers []*pdpb.PeerStats + Leader *metapb.Peer + DownPeers []*pdpb.PeerStats + PendingPeers []*metapb.Peer } func newRegionInfo(region *metapb.Region, leader *metapb.Peer) *regionInfo { @@ -37,14 +38,19 @@ func newRegionInfo(region *metapb.Region, leader *metapb.Peer) *regionInfo { } func (r *regionInfo) clone() *regionInfo { - downPeers := make([]*pdpb.PeerStats, len(r.DownPeers)) + downPeers := make([]*pdpb.PeerStats, 0, len(r.DownPeers)) for _, peer := range r.DownPeers { downPeers = append(downPeers, proto.Clone(peer).(*pdpb.PeerStats)) } + pendingPeers := make([]*metapb.Peer, 0, len(r.PendingPeers)) + for _, peer := range r.PendingPeers { + pendingPeers = append(pendingPeers, proto.Clone(peer).(*metapb.Peer)) + } return ®ionInfo{ - Region: proto.Clone(r.Region).(*metapb.Region), - Leader: proto.Clone(r.Leader).(*metapb.Peer), - DownPeers: downPeers, + Region: proto.Clone(r.Region).(*metapb.Region), + Leader: proto.Clone(r.Leader).(*metapb.Peer), + DownPeers: downPeers, + PendingPeers: pendingPeers, } } @@ -57,8 +63,22 @@ func (r *regionInfo) GetPeer(peerID uint64) *metapb.Peer { return nil } -func (r *regionInfo) ContainsPeer(peerID uint64) bool { - return r.GetPeer(peerID) != nil +func (r *regionInfo) GetDownPeer(peerID uint64) *metapb.Peer { + for _, down := range r.DownPeers { + if down.GetPeer().GetId() == peerID { + return down.GetPeer() + } + } + return nil +} + +func (r *regionInfo) GetPendingPeer(peerID uint64) *metapb.Peer { + for _, peer := range r.PendingPeers { + if peer.GetId() == peerID { + return peer + } + } + return nil } func (r *regionInfo) GetStorePeer(storeID uint64) *metapb.Peer { diff --git a/server/region_test.go b/server/region_test.go index 8e09bc66a4d6..387c34618dff 100644 --- a/server/region_test.go +++ b/server/region_test.go @@ -37,23 +37,26 @@ func (s *testRegionSuite) TestRegionInfo(c *C) { } peers = append(peers, p) } - downPeers := []*pdpb.PeerStats{ - {Peer: peers[n-1], DownSeconds: new(uint64)}, - } region := &metapb.Region{ Peers: peers, } + downPeer, pendingPeer := peers[0], peers[1] + + info := newRegionInfo(region, peers[0]) + info.DownPeers = []*pdpb.PeerStats{{Peer: downPeer}} + info.PendingPeers = []*metapb.Peer{pendingPeer} - r := newRegionInfo(region, peers[0]) - r.DownPeers = downPeers - r = r.clone() + r := info.clone() + c.Assert(r, DeepEquals, info) for i := uint64(0); i < n; i++ { c.Assert(r.GetPeer(i), Equals, r.Peers[i]) - c.Assert(r.ContainsPeer(i), IsTrue) } c.Assert(r.GetPeer(n), IsNil) - c.Assert(r.ContainsPeer(n), IsFalse) + c.Assert(r.GetDownPeer(n), IsNil) + c.Assert(r.GetDownPeer(downPeer.GetId()), DeepEquals, downPeer) + c.Assert(r.GetPendingPeer(n), IsNil) + c.Assert(r.GetPendingPeer(pendingPeer.GetId()), DeepEquals, pendingPeer) for i := uint64(0); i < n; i++ { c.Assert(r.GetStorePeer(i).GetStoreId(), Equals, i)
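
A minimal standalone sketch (not part of the patch) of the selection rule the new randRegion helper in server/cache.go applies: a region with down peers or pending peers is skipped, so the schedulers never pick regions whose replicas are unreachable or still catching up. The peer and region types and the schedulable function below are simplified stand-ins invented for illustration; in the real code a region without a leader is treated as a fatal error rather than merely ineligible.

package main

import "fmt"

// peer and region are simplified stand-ins for metapb.Peer and regionInfo.
type peer struct{ id, storeID uint64 }

type region struct {
	id           uint64
	leader       *peer
	downPeers    []peer
	pendingPeers []peer
}

// schedulable mirrors the filter added to randRegion: a region is only
// eligible for scheduling when none of its peers are down (unreachable)
// or pending (added but not yet caught up, e.g. still applying a snapshot).
func schedulable(r *region) bool {
	return len(r.downPeers) == 0 && len(r.pendingPeers) == 0
}

func main() {
	healthy := &region{id: 1, leader: &peer{id: 1, storeID: 1}}
	pending := &region{
		id:           2,
		leader:       &peer{id: 2, storeID: 1},
		pendingPeers: []peer{{id: 3, storeID: 2}},
	}
	fmt.Println(schedulable(healthy)) // true: may be returned by randLeaderRegion/randFollowerRegion
	fmt.Println(schedulable(pending)) // false: filtered out until the pending peer catches up
}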