Skip to content

Commit

Permalink
server/constraints: add replication constraints (#402)
Browse files Browse the repository at this point in the history
  • Loading branch information
huachaohuang committed Dec 7, 2016
1 parent 8a2fc58 commit 247fa4f
Show file tree
Hide file tree
Showing 14 changed files with 768 additions and 73 deletions.
83 changes: 46 additions & 37 deletions server/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *storageBalancer) GetResourceKind() ResourceKind {
}

func (s *storageBalancer) Schedule(cluster *clusterInfo) *balanceOperator {
region, source, target := scheduleStorage(cluster, s.selector)
region, source, target := scheduleStorage(cluster, s.opt, s.selector)
if region == nil {
return nil
}
Expand Down Expand Up @@ -130,24 +130,55 @@ func newReplicaChecker(cluster *clusterInfo, opt *scheduleOption) *replicaChecke
}

func (r *replicaChecker) Check(region *regionInfo) *balanceOperator {
var stores []*storeInfo

// Filter bad stores.
badPeers := r.collectBadPeers(region)
peerCount := len(region.GetPeers())
maxPeerCount := int(r.cluster.getMeta().GetMaxPeerCount())
for _, store := range r.cluster.getRegionStores(region) {
if _, ok := badPeers[store.GetId()]; !ok {
stores = append(stores, store)
}
}

if peerCount-len(badPeers) < maxPeerCount {
return r.addPeer(region)
constraints := r.opt.GetConstraints()

// Make sure all constraints will be satisfied.
result := constraints.Match(stores)
for _, matched := range result.constraints {
if len(matched.stores) < matched.constraint.Replicas {
if op := r.addPeer(region, matched.constraint); op != nil {
return op
}
}
}
if peerCount > maxPeerCount {
return r.removePeer(region, badPeers)
if len(stores) < constraints.MaxReplicas {
// No matter whether we can satisfy all constraints or not,
// we should at least ensure that the region has enough replicas.
return r.addPeer(region, nil)
}

// Now we can remove bad peers.
for _, peer := range badPeers {
return r.removePeer(region, peer)
}

// Now we have redundant replicas, we can remove unmatched peers.
if len(stores) > constraints.MaxReplicas {
for _, store := range stores {
if _, ok := result.stores[store.GetId()]; !ok {
return r.removePeer(region, region.GetStorePeer(store.GetId()))
}
}
}

return nil
}

func (r *replicaChecker) addPeer(region *regionInfo) *balanceOperator {
func (r *replicaChecker) addPeer(region *regionInfo, constraint *Constraint) *balanceOperator {
stores := r.cluster.getStores()

filter := newExcludedFilter(nil, region.GetStoreIds())
target := r.selector.SelectTarget(stores, filter)
excluded := newExcludedFilter(nil, region.GetStoreIds())
target := r.selector.SelectTarget(stores, excluded, newConstraintFilter(nil, constraint))
if target == nil {
return nil
}
Expand All @@ -162,41 +193,19 @@ func (r *replicaChecker) addPeer(region *regionInfo) *balanceOperator {
return newBalanceOperator(region, replicaOP, newOnceOperator(addPeer))
}

func (r *replicaChecker) removePeer(region *regionInfo, badPeers []*metapb.Peer) *balanceOperator {
var peer *metapb.Peer

if len(badPeers) >= 1 {
peer = badPeers[0]
} else {
stores := r.cluster.getFollowerStores(region)
source := r.selector.SelectSource(stores)
if source != nil {
peer = region.GetStorePeer(source.GetId())
}
}
if peer == nil {
return nil
}

func (r *replicaChecker) removePeer(region *regionInfo, peer *metapb.Peer) *balanceOperator {
removePeer := newRemovePeerOperator(region.GetId(), peer)
return newBalanceOperator(region, replicaOP, newOnceOperator(removePeer))
}

func (r *replicaChecker) collectBadPeers(region *regionInfo) []*metapb.Peer {
downPeers := r.collectDownPeers(region)

var badPeers []*metapb.Peer
func (r *replicaChecker) collectBadPeers(region *regionInfo) map[uint64]*metapb.Peer {
badPeers := r.collectDownPeers(region)
for _, peer := range region.GetPeers() {
if _, ok := downPeers[peer.GetId()]; ok {
badPeers = append(badPeers, peer)
continue
}
store := r.cluster.getStore(peer.GetStoreId())
if store == nil || !store.isUp() {
badPeers = append(badPeers, peer)
badPeers[peer.GetStoreId()] = peer
}
}

return badPeers
}

Expand All @@ -208,7 +217,7 @@ func (r *replicaChecker) collectDownPeers(region *regionInfo) map[uint64]*metapb
continue
}
if stats.GetDownSeconds() > uint64(r.opt.GetMaxStoreDownTime().Seconds()) {
downPeers[peer.GetId()] = peer
downPeers[peer.GetStoreId()] = peer
}
}
return downPeers
Expand Down
124 changes: 121 additions & 3 deletions server/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ func (c *testClusterInfo) addRegionStore(storeID uint64, regionCount int, storag
c.putStore(store)
}

func (c *testClusterInfo) addLabelsStore(storeID uint64, regionCount int, storageRatio float64, labels map[string]string) {
c.addRegionStore(storeID, regionCount, storageRatio)
store := c.getStore(storeID)
for k, v := range labels {
store.Labels = append(store.Labels, &metapb.StoreLabel{Key: k, Value: v})
}
c.putStore(store)
}

func (c *testClusterInfo) addLeaderRegion(regionID uint64, leaderID uint64, followerIds ...uint64) {
region := &metapb.Region{Id: regionID}
leader, _ := c.allocPeer(leaderID)
Expand Down Expand Up @@ -101,10 +110,10 @@ func (c *testClusterInfo) updateSnapshotCount(storeID uint64, snapshotCount int)
}

func newTestScheduleConfig() (*ScheduleConfig, *scheduleOption) {
cfg := newScheduleConfig()
cfg := NewConfig()
cfg.adjust()
opt := newScheduleOption(cfg)
return cfg, opt
return &cfg.ScheduleCfg, opt
}

var _ = Suite(&testLeaderBalancerSuite{})
Expand Down Expand Up @@ -195,6 +204,43 @@ func (s *testStorageBalancerSuite) Test(c *C) {
c.Assert(sb.Schedule(cluster), IsNil)
}

func (s *testStorageBalancerSuite) TestConstraints(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)

cfg, opt := newTestScheduleConfig()
cfg.MinRegionCount = 1
cfg.MinBalanceDiffRatio = 0.01
opt.constraints, _ = newConstraints(1, []*Constraint{
{
Labels: map[string]string{"zone": "cn"},
Replicas: 1,
},
})
sb := newStorageBalancer(opt)

// Add stores 1,2,3,4.
tc.addLabelsStore(1, 6, 0.1, nil)
tc.addLabelsStore(2, 7, 0.2, nil)
tc.addLabelsStore(3, 8, 0.3, nil)
tc.addLabelsStore(4, 9, 0.4, map[string]string{"zone": "cn", "disk": "ssd"})
// Add region 1 with leader in store 4.
tc.addLeaderRegion(1, 4)

// Store 4 has most regions, but no other stores can match the constraint.
c.Assert(sb.Schedule(cluster), IsNil)

// Now store 3 can match the constarint too,
// We can transfer peer from store 4 to store 3.
tc.addLabelsStore(3, 8, 0.3, map[string]string{"zone": "cn"})
checkTransferPeer(c, sb.Schedule(cluster), 4, 3)

// Now both store 2 and 3 can match the constraint.
// But store 2 has fewer regions than store 3, so transfer peer to store 2.
tc.addLabelsStore(2, 7, 0.2, map[string]string{"zone": "cn", "disk": "hdd"})
checkTransferPeer(c, sb.Schedule(cluster), 4, 2)
}

var _ = Suite(&testReplicaCheckerSuite{})

type testReplicaCheckerSuite struct{}
Expand All @@ -207,7 +253,6 @@ func (s *testReplicaCheckerSuite) Test(c *C) {
rc := newReplicaChecker(cluster, opt)

cfg.MaxSnapshotCount = 2
cluster.putMeta(&metapb.Cluster{Id: 1, MaxPeerCount: 3})

// Add stores 1,2,3,4.
tc.addRegionStore(1, 4, 0.4)
Expand Down Expand Up @@ -259,6 +304,79 @@ func (s *testReplicaCheckerSuite) Test(c *C) {
checkRemovePeer(c, rc.Check(region), 1)
}

func (s *testReplicaCheckerSuite) TestConstraints(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)

cfg, opt := newTestScheduleConfig()
cfg.MinRegionCount = 1
cfg.MinBalanceDiffRatio = 0.01
opt.constraints, _ = newConstraints(3, []*Constraint{
{
Labels: map[string]string{"zone": "us"},
Replicas: 2,
},
{
Labels: map[string]string{"zone": "cn", "disk": "ssd"},
Replicas: 1,
},
})
rc := newReplicaChecker(cluster, opt)

// Add stores 1,2,3,4.
tc.addLabelsStore(1, 6, 0.1, map[string]string{"zone": "us", "disk": "ssd"})
tc.addLabelsStore(2, 7, 0.2, nil)
tc.addLabelsStore(3, 8, 0.3, nil)
tc.addLabelsStore(4, 9, 0.4, nil)
// Add region 1 with leader in store 1.
tc.addLeaderRegion(1, 1)

// Although we can not satisfy any constraints, we will still ensure max replicas.
// So we add peers in store 2 and store 3, because they have smaller storage ratio.
region := cluster.getRegion(1)
checkAddPeer(c, rc.Check(region), 2)
peer2, _ := cluster.allocPeer(2)
region.Peers = append(region.Peers, peer2)
checkAddPeer(c, rc.Check(region), 3)
peer3, _ := cluster.allocPeer(3)
region.Peers = append(region.Peers, peer3)
c.Assert(rc.Check(region), IsNil)

// Add stores 1,2,3,4.
tc.addLabelsStore(1, 6, 0.1, map[string]string{"zone": "us", "disk": "ssd"})
tc.addLabelsStore(2, 7, 0.2, map[string]string{"zone": "us", "disk": "hdd"})
tc.addLabelsStore(3, 8, 0.3, nil)
tc.addLabelsStore(4, 9, 0.4, nil)
// Add region 1 with leader in store 1 and follower in store 2.
tc.addLeaderRegion(1, 1, 2)

// Now the first constraint has been satisfied, but the second hasn't.
// We still need to add peer in store 3 to ensure max replicas.
region = cluster.getRegion(1)
checkAddPeer(c, rc.Check(region), 3)
peer3, _ = cluster.allocPeer(3)
region.Peers = append(region.Peers, peer3)
c.Assert(rc.Check(region), IsNil)

// Add stores 1,2,3,4.
tc.addLabelsStore(1, 6, 0.1, map[string]string{"zone": "us", "disk": "ssd"})
tc.addLabelsStore(2, 7, 0.2, map[string]string{"zone": "us", "disk": "hdd"})
tc.addLabelsStore(3, 8, 0.3, nil)
tc.addLabelsStore(4, 9, 0.4, map[string]string{"zone": "cn", "disk": "ssd"})
// Add region 1 with leader in store 1 and followers in stores 2,3.
tc.addLeaderRegion(1, 1, 2, 3)

// Stores 1,2 satisfy the first constraint, we need to add peer in
// store 4 to satisfy the second constraint.
// Then we can remove peer in store 3 because we have satisfied
// all constraints and have enough replicas.
region = cluster.getRegion(1)
checkAddPeer(c, rc.Check(region), 4)
peer4, _ := cluster.allocPeer(4)
region.Peers = append(region.Peers, peer4)
checkRemovePeer(c, rc.Check(region), 3)
}

func checkAddPeer(c *C, bop *balanceOperator, storeID uint64) {
op := bop.Ops[0].(*onceOperator).Op.(*changePeerOperator)
c.Assert(op.ChangePeer.GetChangeType(), Equals, raftpb.ConfChangeType_AddNode)
Expand Down
12 changes: 12 additions & 0 deletions server/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,18 @@ func (c *clusterInfo) randFollowerRegion(storeID uint64) *regionInfo {
return c.regions.randFollowerRegion(storeID)
}

func (c *clusterInfo) getRegionStores(region *regionInfo) []*storeInfo {
c.RLock()
defer c.RUnlock()
var stores []*storeInfo
for id := range region.GetStoreIds() {
if store := c.stores.getStore(id); store != nil {
stores = append(stores, store)
}
}
return stores
}

func (c *clusterInfo) getFollowerStores(region *regionInfo) []*storeInfo {
c.RLock()
defer c.RUnlock()
Expand Down
3 changes: 3 additions & 0 deletions server/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ func (s *testClusterInfoSuite) testRegionHeartbeat(c *C, cache *clusterInfo) {
}

for _, region := range regions {
for _, store := range cache.getRegionStores(region) {
c.Assert(region.GetStorePeer(store.GetId()), NotNil)
}
for _, store := range cache.getFollowerStores(region) {
peer := region.GetStorePeer(store.GetId())
c.Assert(peer.GetId(), Not(Equals), region.Leader.GetId())
Expand Down
24 changes: 11 additions & 13 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,27 +318,25 @@ func (c *RaftCluster) putStore(store *metapb.Store) error {

cluster := c.cachedCluster

// There are 3 cases here:
// Case 1: store id exists with the same address - do nothing;
// Case 2: store id exists with different address - update address;
if s := cluster.getStore(store.GetId()); s != nil {
if s.GetAddress() == store.GetAddress() {
return nil
}
s.Address = store.Address
return cluster.putStore(s)
}

// Case 3: store id does not exist, check duplicated address.
// Store address can not be the same as other stores.
for _, s := range cluster.getStores() {
// It's OK to start a new store on the same address if the old store has been removed.
if s.isTombstone() {
continue
}
if s.GetAddress() == store.GetAddress() {
if s.GetId() != store.GetId() && s.GetAddress() == store.GetAddress() {
return errors.Errorf("duplicated store address: %v, already registered by %v", store, s.Store)
}
}

// Store exists, update store meta.
if s := cluster.getStore(store.GetId()); s != nil {
s.Address = store.Address
s.Labels = store.Labels
return cluster.putStore(s)
}

// Store does not exist, add a new store.
return cluster.putStore(newStoreInfo(store))
}

Expand Down
5 changes: 5 additions & 0 deletions server/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,11 @@ func (s *testClusterSuite) testPutStore(c *C, conn net.Conn, clusterID uint64, s
// Put a new store.
resp = putStore(c, conn, clusterID, s.newStore(c, 0, "127.0.0.1:12345"))
c.Assert(resp.PutStore, NotNil)

// Put an existed store with duplicated address with other old stores.
s.resetStoreState(c, store.GetId(), metapb.StoreState_Up)
resp = putStore(c, conn, clusterID, s.newStore(c, store.GetId(), "127.0.0.1:12345"))
c.Assert(resp.PutStore, IsNil)
}

func (s *testClusterSuite) resetStoreState(c *C, storeID uint64, state metapb.StoreState) {
Expand Down
Loading

0 comments on commit 247fa4f

Please sign in to comment.