Skip to content

Commit

Permalink
transfer leader before removing peer.
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed Mar 22, 2017
1 parent 7b66ebd commit e56b1a6
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 17 deletions.
9 changes: 8 additions & 1 deletion server/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,14 @@ func checkRemovePeer(c *C, bop Operator, storeID uint64) {
case *changePeerOperator:
op = bop.(*changePeerOperator)
case *regionOperator:
op = bop.(*regionOperator).Ops[0].(*changePeerOperator)
regionOp := bop.(*regionOperator)
if len(regionOp.Ops) == 1 {
op = regionOp.Ops[0].(*changePeerOperator)
} else {
transferLeader := regionOp.Ops[0].(*transferLeaderOperator)
c.Assert(transferLeader.OldLeader.GetStoreId(), Equals, storeID)
op = bop.(*regionOperator).Ops[1].(*changePeerOperator)
}
}
c.Assert(op.ChangePeer.GetChangeType(), Equals, raftpb.ConfChangeType_RemoveNode)
c.Assert(op.ChangePeer.GetPeer().GetStoreId(), Equals, storeID)
Expand Down
49 changes: 33 additions & 16 deletions server/cluster_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func splitRegion(c *C, old *metapb.Region, splitKey []byte, newRegionID uint64,
return newRegion
}

func heartbeatRegion(c *C, conn net.Conn, clusterID uint64, msgID uint64, region *metapb.Region, leader *metapb.Peer) *pdpb.ChangePeer {
func heartbeatRegion(c *C, conn net.Conn, clusterID uint64, msgID uint64, region *metapb.Region, leader *metapb.Peer) *pdpb.RegionHeartbeatResponse {
req := &pdpb.Request{
Header: newRequestHeader(clusterID),
CmdType: pdpb.CommandType_RegionHeartbeat,
Expand All @@ -302,7 +302,7 @@ func heartbeatRegion(c *C, conn net.Conn, clusterID uint64, msgID uint64, region
sendRequest(c, conn, msgID, req)
_, resp := recvResponse(c, conn)
c.Assert(resp.GetCmdType(), Equals, pdpb.CommandType_RegionHeartbeat)
return resp.GetRegionHeartbeat().GetChangePeer()
return resp.GetRegionHeartbeat()
}

func (s *testClusterWorkerSuite) heartbeatStore(c *C, conn net.Conn, msgID uint64, stats *pdpb.StoreStats) *pdpb.StoreHeartbeatResponse {
Expand Down Expand Up @@ -369,7 +369,8 @@ func (s *testClusterWorkerSuite) TestHeartbeatSplit(c *C) {
leaderPeer1 := s.chooseRegionLeader(c, r1)

resp := heartbeatRegion(c, conn, s.clusterID, 0, r1, leaderPeer1)
c.Assert(resp, IsNil)
c.Assert(resp.GetChangePeer(), IsNil)
c.Assert(resp.GetTransferLeader(), IsNil)
checkSearchRegions(c, cluster, []byte{})

mustGetRegion(c, cluster, []byte("a"), r1)
Expand All @@ -378,7 +379,8 @@ func (s *testClusterWorkerSuite) TestHeartbeatSplit(c *C) {

leaderPeer2 := s.chooseRegionLeader(c, r2)
resp = heartbeatRegion(c, conn, s.clusterID, 0, r2, leaderPeer2)
c.Assert(resp, IsNil)
c.Assert(resp.GetChangePeer(), IsNil)
c.Assert(resp.GetTransferLeader(), IsNil)
checkSearchRegions(c, cluster, []byte{}, []byte("m"))

mustGetRegion(c, cluster, []byte("z"), r2)
Expand All @@ -390,7 +392,8 @@ func (s *testClusterWorkerSuite) TestHeartbeatSplit(c *C) {
leaderPeer3 := s.chooseRegionLeader(c, r3)

resp = heartbeatRegion(c, conn, s.clusterID, 0, r3, leaderPeer3)
c.Assert(resp, IsNil)
c.Assert(resp.GetChangePeer(), IsNil)
c.Assert(resp.GetTransferLeader(), IsNil)
checkSearchRegions(c, cluster, []byte{}, []byte("q"))

mustGetRegion(c, cluster, []byte("z"), r3)
Expand All @@ -399,7 +402,8 @@ func (s *testClusterWorkerSuite) TestHeartbeatSplit(c *C) {
mustGetRegion(c, cluster, []byte("n"), nil)

resp = heartbeatRegion(c, conn, s.clusterID, 0, r2, leaderPeer2)
c.Assert(resp, IsNil)
c.Assert(resp.GetChangePeer(), IsNil)
c.Assert(resp.GetTransferLeader(), IsNil)
checkSearchRegions(c, cluster, []byte{}, []byte("m"), []byte("q"))

mustGetRegion(c, cluster, []byte("n"), r2)
Expand Down Expand Up @@ -427,18 +431,19 @@ func (s *testClusterWorkerSuite) TestHeartbeatSplit2(c *C) {
// Add peers until all stores are used up.
for {
resp := heartbeatRegion(c, conn, s.clusterID, 0, r1, leaderPeer)
if resp == nil {
if resp.GetChangePeer() == nil {
break
}
s.checkChangePeerRes(c, resp, raftpb.ConfChangeType_AddNode, r1)
s.checkChangePeerRes(c, resp.GetChangePeer(), raftpb.ConfChangeType_AddNode, r1)
}

// Split.
r2ID, r2PeerIDs := s.askSplit(c, conn, 0, r1)
r2 := splitRegion(c, r1, []byte("m"), r2ID, r2PeerIDs)
leaderPeer2 := s.chooseRegionLeader(c, r2)
resp := heartbeatRegion(c, conn, s.clusterID, 0, r2, leaderPeer2)
c.Assert(resp, IsNil)
c.Assert(resp.GetChangePeer(), IsNil)
c.Assert(resp.GetTransferLeader(), IsNil)

mustGetRegion(c, cluster, []byte("m"), r2)
}
Expand Down Expand Up @@ -472,7 +477,7 @@ func (s *testClusterWorkerSuite) TestHeartbeatChangePeer(c *C) {
for i := 0; i < 4; i++ {
resp := heartbeatRegion(c, conn, s.clusterID, 0, region, leaderPeer)
// Check RegionHeartbeat response.
s.checkChangePeerRes(c, resp, raftpb.ConfChangeType_AddNode, region)
s.checkChangePeerRes(c, resp.GetChangePeer(), raftpb.ConfChangeType_AddNode, region)
c.Logf("[add peer][region]:%v", region)

// Update region epoch and check region info.
Expand All @@ -488,17 +493,27 @@ func (s *testClusterWorkerSuite) TestHeartbeatChangePeer(c *C) {
opt.SetMaxReplicas(3)

// Remove 2 peers
for i := 0; i < 2; i++ {
peerCount := 5
for {
resp := heartbeatRegion(c, conn, s.clusterID, 0, region, leaderPeer)
if resp.GetTransferLeader() != nil {
leaderPeer = resp.GetTransferLeader().GetPeer()
continue
}
if resp.GetChangePeer() == nil {
break
}

// Check RegionHeartbeat response.
s.checkChangePeerRes(c, resp, raftpb.ConfChangeType_RemoveNode, region)
s.checkChangePeerRes(c, resp.GetChangePeer(), raftpb.ConfChangeType_RemoveNode, region)

// Update region epoch and check region info.
region.RegionEpoch.ConfVer = region.GetRegionEpoch().GetConfVer() + 1
heartbeatRegion(c, conn, s.clusterID, 0, region, leaderPeer)

// Check region peer count.
region = s.checkRegionPeerCount(c, regionKey, 4-i)
peerCount--
region = s.checkRegionPeerCount(c, regionKey, peerCount)
}

region = s.checkRegionPeerCount(c, regionKey, 3)
Expand All @@ -521,22 +536,24 @@ func (s *testClusterWorkerSuite) TestHeartbeatSplitAddPeer(c *C) {
// First sync, pd-server will return an AddPeer.
resp := heartbeatRegion(c, conn, s.clusterID, 0, r1, leaderPeer1)
// Apply the AddPeer ConfChange, but with no sync.
s.checkChangePeerRes(c, resp, raftpb.ConfChangeType_AddNode, r1)
s.checkChangePeerRes(c, resp.GetChangePeer(), raftpb.ConfChangeType_AddNode, r1)
// Split 1 to 1: [nil, m) 2: [m, nil).
r2ID, r2PeerIDs := s.askSplit(c, conn, 0, r1)
r2 := splitRegion(c, r1, []byte("m"), r2ID, r2PeerIDs)

// Sync r1 with both ConfVer and Version updated.
resp = heartbeatRegion(c, conn, s.clusterID, 0, r1, leaderPeer1)
c.Assert(resp, IsNil)
c.Assert(resp.GetChangePeer(), IsNil)
c.Assert(resp.GetTransferLeader(), IsNil)

mustGetRegion(c, cluster, []byte("a"), r1)
mustGetRegion(c, cluster, []byte("z"), nil)

// Sync r2.
leaderPeer2 := s.chooseRegionLeader(c, r2)
resp = heartbeatRegion(c, conn, s.clusterID, 0, r2, leaderPeer2)
c.Assert(resp, IsNil)
c.Assert(resp.GetChangePeer(), IsNil)
c.Assert(resp.GetTransferLeader(), IsNil)
}

func (s *testClusterWorkerSuite) TestStoreHeartbeat(c *C) {
Expand Down
9 changes: 9 additions & 0 deletions server/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ func newAddPeer(region *regionInfo, peer *metapb.Peer) Operator {

// newRemovePeer builds the operator that removes peer from region.
//
// If peer is not the region's current leader (or the region has no recorded
// leader), the result is a region operator wrapping only the remove-peer step.
//
// If peer is the current leader, the result is a region operator that first
// transfers leadership to the first entry in region.Peers whose ID differs
// from the leader's, and then removes the peer. When no such entry exists,
// nil is returned and no operator is produced.
func newRemovePeer(region *regionInfo, peer *metapb.Peer) Operator {
	removeOp := newRemovePeerOperator(region.GetId(), peer)

	leader := region.Leader
	if leader == nil || leader.GetId() != peer.GetId() {
		// The peer being removed does not hold leadership: remove it directly.
		return newRegionOperator(region, regionKind, removeOp)
	}

	// The peer being removed is the leader: move leadership away first.
	for _, candidate := range region.Peers {
		if candidate.GetId() == leader.GetId() {
			continue
		}
		transferOp := newTransferLeaderOperator(region.GetId(), leader, candidate)
		return newRegionOperator(region, regionKind, transferOp, removeOp)
	}

	// No other peer can take over leadership.
	return nil
}

Expand Down

0 comments on commit e56b1a6

Please sign in to comment.