Commit

Merge cbbfc66 into 2202e99
huachaohuang committed Dec 13, 2016
2 parents 2202e99 + cbbfc66 commit 8935033
Showing 8 changed files with 127 additions and 44 deletions.
12 changes: 6 additions & 6 deletions server/balancer_test.go
@@ -119,6 +119,10 @@ func (c *testClusterInfo) updateSnapshotCount(storeID uint64, snapshotCount int)
func newTestScheduleConfig() (*ScheduleConfig, *scheduleOption) {
cfg := NewConfig()
cfg.adjust()
cfg.ScheduleCfg.MinLeaderCount = 1
cfg.ScheduleCfg.MinRegionCount = 1
cfg.ScheduleCfg.LeaderScheduleInterval.Duration = 10 * time.Millisecond
cfg.ScheduleCfg.StorageScheduleInterval.Duration = 10 * time.Millisecond
opt := newScheduleOption(cfg)
return &cfg.ScheduleCfg, opt
}
@@ -219,9 +223,7 @@ func (s *testStorageBalancerSuite) TestConstraints(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)

cfg, opt := newTestScheduleConfig()
cfg.MinRegionCount = 1
cfg.MinBalanceDiffRatio = 0.01
_, opt := newTestScheduleConfig()
opt.constraints, _ = newConstraints(1, []*Constraint{
{
Labels: map[string]string{"zone": "cn"},
@@ -319,9 +321,7 @@ func (s *testReplicaCheckerSuite) TestConstraints(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)

cfg, opt := newTestScheduleConfig()
cfg.MinRegionCount = 1
cfg.MinBalanceDiffRatio = 0.01
_, opt := newTestScheduleConfig()
opt.constraints, _ = newConstraints(3, []*Constraint{
{
Labels: map[string]string{"zone": "us"},
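Note: the shared scheduling knobs (MinLeaderCount, MinRegionCount and the two schedule intervals) now live in newTestScheduleConfig, which is why the tests above drop their per-test overrides and discard the first return value. A test that still needs a non-default value can keep that return value and adjust it, as the removed lines used to do; a minimal sketch (the override value is illustrative, not part of this commit):

    cfg, opt := newTestScheduleConfig()
    cfg.MinRegionCount = 10 // hypothetical override of a default the helper sets
    _ = opt                 // used as before when building the balancer or coordinator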
22 changes: 13 additions & 9 deletions server/cache.go
@@ -206,19 +206,23 @@ func (r *regionsInfo) getStoreFollowerCount(storeID uint64) int {
}

func (r *regionsInfo) randLeaderRegion(storeID uint64) *regionInfo {
for _, region := range r.leaders[storeID] {
if region.Leader == nil {
log.Fatalf("rand leader region without leader: store %v region %v", storeID, region)
}
return region.clone()
}
return nil
return randRegion(r.leaders[storeID])
}

func (r *regionsInfo) randFollowerRegion(storeID uint64) *regionInfo {
for _, region := range r.followers[storeID] {
return randRegion(r.followers[storeID])
}

func randRegion(regions map[uint64]*regionInfo) *regionInfo {
for _, region := range regions {
if region.Leader == nil {
log.Fatalf("rand follower region without leader: store %v region %v", storeID, region)
log.Fatalf("rand region without leader: region %v", region)
}
if len(region.DownPeers) > 0 {
continue
}
if len(region.PendingPeers) > 0 {
continue
}
return region.clone()
}
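Note: both rand helpers now funnel through randRegion, which also skips any region with down or pending peers, so callers only ever see regions whose replicas are all healthy and caught up. A minimal caller sketch (the method name is the one above; the surrounding code is illustrative, not from this commit):

    // nil means the store currently has no schedulable region.
    if region := r.randFollowerRegion(storeID); region != nil {
        // safe to build an operator for a fully healthy region
    }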
13 changes: 13 additions & 0 deletions server/cache_test.go
@@ -135,6 +135,19 @@ func (s *testRegionsInfoSuite) Test(c *C) {

c.Assert(region.GetStorePeer(i), NotNil)
}

// All regions will be filtered out if they have pending peers.
for i := uint64(0); i < n; i++ {
for j := 0; j < cache.getStoreLeaderCount(i); j++ {
region := cache.randLeaderRegion(i)
region.PendingPeers = region.Peers
cache.setRegion(region)
}
c.Assert(cache.randLeaderRegion(i), IsNil)
}
for i := uint64(0); i < n; i++ {
c.Assert(cache.randFollowerRegion(i), IsNil)
}
}

func checkRegion(c *C, a *regionInfo, b *regionInfo) {
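Note: the new assertions only cover the PendingPeers filter. The DownPeers filter added in cache.go could be exercised the same way; a sketch of such a check (illustrative, not in this commit, and assuming a freshly populated cache rather than the one already mutated above):

    for i := uint64(0); i < n; i++ {
        for j := 0; j < cache.getStoreFollowerCount(i); j++ {
            region := cache.randFollowerRegion(i)
            region.DownPeers = []*pdpb.PeerStats{{Peer: region.Peers[0]}}
            cache.setRegion(region)
        }
        c.Assert(cache.randFollowerRegion(i), IsNil)
    }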
1 change: 1 addition & 0 deletions server/command.go
@@ -176,6 +176,7 @@ func (c *conn) handleRegionHeartbeat(req *pdpb.Request) (*pdpb.Response, error)

region := newRegionInfo(request.GetRegion(), request.GetLeader())
region.DownPeers = request.GetDownPeers()
region.PendingPeers = request.GetPendingPeers()
if region.GetId() == 0 {
return nil, errors.Errorf("invalid request region, %v", request)
}
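Note: the heartbeat handler now records which peers the leader reports as pending (added to the configuration but not yet caught up), alongside the down peers. On the wire this is just one more repeated field on the heartbeat; a rough sketch of a request carrying it (message and field names are inferred from the getters above, so treat them as assumptions):

    hb := &pdpb.RegionHeartbeatRequest{
        Region:       region,       // *metapb.Region
        Leader:       leader,       // *metapb.Peer
        DownPeers:    downPeers,    // []*pdpb.PeerStats
        PendingPeers: pendingPeers, // []*metapb.Peer, the new field
    }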
47 changes: 38 additions & 9 deletions server/coordinator_test.go
Expand Up @@ -64,12 +64,6 @@ func (s *testCoordinatorSuite) TestSchedule(c *C) {
tc := newTestClusterInfo(cluster)

cfg, opt := newTestScheduleConfig()
cfg.MinRegionCount = 1
cfg.MinLeaderCount = 1
cfg.MinBalanceDiffRatio = 0.1
cfg.LeaderScheduleInterval.Duration = 100 * time.Millisecond
cfg.StorageScheduleInterval.Duration = 100 * time.Millisecond

co := newCoordinator(cluster, opt)
co.run()
defer co.stop()
@@ -140,13 +134,48 @@ func (s *testCoordinatorSuite) TestSchedule(c *C) {
checkRemovePeerResp(c, resp, 4)
}

func (s *testCoordinatorSuite) TestAddScheduler(c *C) {
func (s *testCoordinatorSuite) TestPeerState(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)

cfg, opt := newTestScheduleConfig()
cfg.LeaderScheduleInterval.Duration = 10 * time.Millisecond
_, opt := newTestScheduleConfig()
co := newCoordinator(cluster, opt)
co.run()
defer co.stop()

// Transfer peer from store 4 to store 1.
tc.addRegionStore(1, 1, 0.1)
tc.addRegionStore(2, 2, 0.2)
tc.addRegionStore(3, 3, 0.3)
tc.addRegionStore(4, 4, 0.4)
tc.addLeaderRegion(1, 2, 3, 4)

// Wait for schedule.
time.Sleep(time.Second)
checkTransferPeer(c, co.getOperator(1), 4, 1)

region := cluster.getRegion(1)

// Add new peer.
resp := co.dispatch(region)
checkAddPeerResp(c, resp, 1)
newPeer := resp.GetChangePeer().GetPeer()
region.Peers = append(region.Peers, newPeer)

// If the new peer is pending, the operator will not finish.
region.PendingPeers = append(region.PendingPeers, newPeer)
checkAddPeerResp(c, co.dispatch(region), 1)

// The new peer is not pending now, so the operator will finish.
region.PendingPeers = nil
c.Assert(co.dispatch(region), IsNil)
}

func (s *testCoordinatorSuite) TestAddScheduler(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
co := newCoordinator(cluster, opt)
co.run()
defer co.stop()
21 changes: 17 additions & 4 deletions server/operator.go
@@ -26,9 +26,14 @@ import (
)

const (
maxOperatorWaitTime = 10 * time.Minute
maxTransferLeaderWaitCount = 3
)

var (
errOperatorTimeout = errors.New("operator timeout")
)

var baseID uint64

type callback func(op Operator)
@@ -127,6 +132,9 @@ func (bo *balanceOperator) Do(ctx *opContext, region *regionInfo) (bool, *pdpb.R
if bo.Start.IsZero() {
bo.Start = time.Now()
}
if time.Since(bo.Start) > maxOperatorWaitTime {
return false, nil, errors.Trace(errOperatorTimeout)
}

finished, res, err := bo.Ops[bo.Index].Do(ctx, region)
if err != nil {
@@ -216,16 +224,21 @@ func (co *changePeerOperator) String() string {

// check checks whether the operator has already finished or not.
func (co *changePeerOperator) check(region *regionInfo) (bool, error) {
peer := co.ChangePeer.GetPeer()
if co.ChangePeer.GetChangeType() == raftpb.ConfChangeType_AddNode {
if region.ContainsPeer(co.ChangePeer.GetPeer().GetId()) {
if region.GetPendingPeer(peer.GetId()) != nil {
// We don't know whether the added peer can work or not.
return false, nil
}
if region.GetPeer(peer.GetId()) != nil {
return true, nil
}
log.Infof("balance [%s], try to add peer %s", region, co.ChangePeer.GetPeer())
log.Infof("balance [%s], try to add peer %s", region, peer)
} else if co.ChangePeer.GetChangeType() == raftpb.ConfChangeType_RemoveNode {
if !region.ContainsPeer(co.ChangePeer.GetPeer().GetId()) {
if region.GetPeer(peer.GetId()) == nil {
return true, nil
}
log.Infof("balance [%s], try to remove peer %s", region, co.ChangePeer.GetPeer())
log.Infof("balance [%s], try to remove peer %s", region, peer)
}

return false, nil
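Note: two independent guards land here. Do stamps a start time on first entry and fails with errOperatorTimeout once maxOperatorWaitTime elapses, and the AddNode branch of check refuses to treat the operator as finished while the new peer is still pending. That second rule boils down to a small predicate; a self-contained restatement using the regionInfo accessors introduced in this commit (the helper itself is illustrative, not production code):

    func addPeerFinished(region *regionInfo, peerID uint64) bool {
        if region.GetPendingPeer(peerID) != nil {
            // The peer is in the conf but may still be applying a snapshot,
            // so this step cannot be called done yet.
            return false
        }
        return region.GetPeer(peerID) != nil
    }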
36 changes: 28 additions & 8 deletions server/region.go
@@ -25,8 +25,9 @@ import (
// TODO: Export this to API directly.
type regionInfo struct {
*metapb.Region
Leader *metapb.Peer
DownPeers []*pdpb.PeerStats
Leader *metapb.Peer
DownPeers []*pdpb.PeerStats
PendingPeers []*metapb.Peer
}

func newRegionInfo(region *metapb.Region, leader *metapb.Peer) *regionInfo {
@@ -37,14 +38,19 @@ func newRegionInfo(region *metapb.Region, leader *metapb.Peer) *regionInfo {
}

func (r *regionInfo) clone() *regionInfo {
downPeers := make([]*pdpb.PeerStats, len(r.DownPeers))
downPeers := make([]*pdpb.PeerStats, 0, len(r.DownPeers))
for _, peer := range r.DownPeers {
downPeers = append(downPeers, proto.Clone(peer).(*pdpb.PeerStats))
}
pendingPeers := make([]*metapb.Peer, 0, len(r.PendingPeers))
for _, peer := range r.PendingPeers {
pendingPeers = append(pendingPeers, proto.Clone(peer).(*metapb.Peer))
}
return &regionInfo{
Region: proto.Clone(r.Region).(*metapb.Region),
Leader: proto.Clone(r.Leader).(*metapb.Peer),
DownPeers: downPeers,
Region: proto.Clone(r.Region).(*metapb.Region),
Leader: proto.Clone(r.Leader).(*metapb.Peer),
DownPeers: downPeers,
PendingPeers: pendingPeers,
}
}

@@ -57,8 +63,22 @@ func (r *regionInfo) GetPeer(peerID uint64) *metapb.Peer {
return nil
}

func (r *regionInfo) ContainsPeer(peerID uint64) bool {
return r.GetPeer(peerID) != nil
func (r *regionInfo) GetDownPeer(peerID uint64) *metapb.Peer {
for _, down := range r.DownPeers {
if down.GetPeer().GetId() == peerID {
return down.GetPeer()
}
}
return nil
}

func (r *regionInfo) GetPendingPeer(peerID uint64) *metapb.Peer {
for _, peer := range r.PendingPeers {
if peer.GetId() == peerID {
return peer
}
}
return nil
}

func (r *regionInfo) GetStorePeer(storeID uint64) *metapb.Peer {
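Note: besides the new PendingPeers field, clone switches both slices to make(..., 0, len(...)) so the appends no longer leave nil placeholders at the front, and GetDownPeer/GetPendingPeer replace the old ContainsPeer membership test. Together they let callers classify a peer instead of merely checking that it exists; an illustrative helper built only from the accessors above (not part of this commit):

    func peerState(region *regionInfo, peerID uint64) string {
        switch {
        case region.GetPendingPeer(peerID) != nil:
            return "pending" // in the conf, still catching up
        case region.GetDownPeer(peerID) != nil:
            return "down" // reported down by the leader
        case region.GetPeer(peerID) != nil:
            return "healthy"
        default:
            return "absent"
        }
    }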
19 changes: 11 additions & 8 deletions server/region_test.go
@@ -37,23 +37,26 @@ func (s *testRegionSuite) TestRegionInfo(c *C) {
}
peers = append(peers, p)
}
downPeers := []*pdpb.PeerStats{
{Peer: peers[n-1], DownSeconds: new(uint64)},
}
region := &metapb.Region{
Peers: peers,
}
downPeer, pendingPeer := peers[0], peers[1]

info := newRegionInfo(region, peers[0])
info.DownPeers = []*pdpb.PeerStats{{Peer: downPeer}}
info.PendingPeers = []*metapb.Peer{pendingPeer}

r := newRegionInfo(region, peers[0])
r.DownPeers = downPeers
r = r.clone()
r := info.clone()
c.Assert(r, DeepEquals, info)

for i := uint64(0); i < n; i++ {
c.Assert(r.GetPeer(i), Equals, r.Peers[i])
c.Assert(r.ContainsPeer(i), IsTrue)
}
c.Assert(r.GetPeer(n), IsNil)
c.Assert(r.ContainsPeer(n), IsFalse)
c.Assert(r.GetDownPeer(n), IsNil)
c.Assert(r.GetDownPeer(downPeer.GetId()), DeepEquals, downPeer)
c.Assert(r.GetPendingPeer(n), IsNil)
c.Assert(r.GetPendingPeer(pendingPeer.GetId()), DeepEquals, pendingPeer)

for i := uint64(0); i < n; i++ {
c.Assert(r.GetStorePeer(i).GetStoreId(), Equals, i)
