Skip to content

Commit

Permalink
Merge branch 'master' into api-version
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] committed May 18, 2023
2 parents 9129cf7 + 6f6bf01 commit cd2a546
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 13 deletions.
4 changes: 2 additions & 2 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,10 @@ func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Typ

// UpdateStoreStatus updates the information of the store.
func (bc *BasicCluster) UpdateStoreStatus(storeID uint64) {
leaderCount, regionCount, witnessCount, pendingPeerCount, leaderRegionSize, regionSize := bc.RegionsInfo.GetStoreStats(storeID)
leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize := bc.RegionsInfo.GetStoreStats(storeID)
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize, witnessCount)
bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize)
}

// PutStore put a store.
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,11 +1271,11 @@ func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
}

// GetStoreStats returns the store stats.
func (r *RegionsInfo) GetStoreStats(storeID uint64) (leader, region, witness, pending int, leaderSize, regionSize int64) {
func (r *RegionsInfo) GetStoreStats(storeID uint64) (leader, region, witness, learner, pending int, leaderSize, regionSize int64) {
r.st.RLock()
defer r.st.RUnlock()
return r.leaders[storeID].length(), r.getStoreRegionCountLocked(storeID), r.witnesses[storeID].length(),
r.pendingPeers[storeID].length(), r.leaders[storeID].TotalSize(), r.getStoreRegionSizeLocked(storeID)
r.learners[storeID].length(), r.pendingPeers[storeID].length(), r.leaders[storeID].TotalSize(), r.getStoreRegionSizeLocked(storeID)
}

// GetRegionCount gets the total count of RegionInfo of regionMap
Expand Down
9 changes: 8 additions & 1 deletion pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type StoreInfo struct {
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
Expand Down Expand Up @@ -275,6 +276,11 @@ func (s *StoreInfo) GetRegionCount() int {
return s.regionCount
}

// GetLearnerCount returns the learner count of the store.
func (s *StoreInfo) GetLearnerCount() int {
return s.learnerCount
}

// GetWitnessCount returns the witness count of the store.
func (s *StoreInfo) GetWitnessCount() int {
return s.witnessCount
Expand Down Expand Up @@ -789,11 +795,12 @@ func (s *StoresInfo) SetRegionSize(storeID uint64, regionSize int64) {
}

// UpdateStoreStatus updates the information of the store.
func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount int, regionCount int, pendingPeerCount int, leaderSize int64, regionSize int64, witnessCount int) {
func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount int, leaderSize int64, regionSize int64) {
if store, ok := s.stores[storeID]; ok {
newStore := store.ShallowClone(SetLeaderCount(leaderCount),
SetRegionCount(regionCount),
SetWitnessCount(witnessCount),
SetLearnerCount(learnerCount),
SetPendingPeerCount(pendingPeerCount),
SetLeaderSize(leaderSize),
SetRegionSize(regionSize))
Expand Down
7 changes: 7 additions & 0 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,13 @@ func SetRegionCount(regionCount int) StoreCreateOption {
}
}

// SetLearnerCount sets the learner count for the store.
func SetLearnerCount(learnerCount int) StoreCreateOption {
return func(store *StoreInfo) {
store.learnerCount = learnerCount
}
}

// SetWitnessCount sets the witness count for the store.
func SetWitnessCount(witnessCount int) StoreCreateOption {
return func(store *StoreInfo) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,13 @@ func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[st
mc.PutStore(store)
}

// AddLabersStoreWithLearnerCount adds store with specified count of region, learner and labels.
func (mc *Cluster) AddLabersStoreWithLearnerCount(storeID uint64, regionCount int, learnerCount int, labels map[string]string) {
mc.AddLabelsStore(storeID, regionCount, labels)
store := mc.GetStore(storeID).Clone(core.SetLearnerCount(learnerCount))
mc.PutStore(store)
}

// AddLeaderRegion adds region with specified leader and followers.
func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderStoreID uint64, otherPeerStoreIDs ...uint64) *core.RegionInfo {
origin := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
Expand Down
5 changes: 5 additions & 0 deletions pkg/statistics/store_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type storeStatistics struct {
StorageCapacity uint64
RegionCount int
LeaderCount int
LearnerCount int
WitnessCount int
LabelCounter map[string]int
Preparing int
Expand Down Expand Up @@ -113,6 +114,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
s.RegionCount += store.GetRegionCount()
s.LeaderCount += store.GetLeaderCount()
s.WitnessCount += store.GetWitnessCount()
s.LearnerCount += store.GetLearnerCount()
limit, ok := store.GetStoreLimit().(*storelimit.SlidingWindows)
if ok {
cap := limit.GetCap()
Expand All @@ -131,6 +133,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
storeStatusGauge.WithLabelValues(storeAddress, id, "leader_size").Set(float64(store.GetLeaderSize()))
storeStatusGauge.WithLabelValues(storeAddress, id, "leader_count").Set(float64(store.GetLeaderCount()))
storeStatusGauge.WithLabelValues(storeAddress, id, "witness_count").Set(float64(store.GetWitnessCount()))
storeStatusGauge.WithLabelValues(storeAddress, id, "learner_count").Set(float64(store.GetLearnerCount()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_available").Set(float64(store.GetAvailable()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_used").Set(float64(store.GetUsedSize()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_capacity").Set(float64(store.GetCapacity()))
Expand Down Expand Up @@ -189,6 +192,7 @@ func (s *storeStatistics) Collect() {
metrics["region_count"] = float64(s.RegionCount)
metrics["leader_count"] = float64(s.LeaderCount)
metrics["witness_count"] = float64(s.WitnessCount)
metrics["learner_count"] = float64(s.LearnerCount)
metrics["storage_size"] = float64(s.StorageSize)
metrics["storage_capacity"] = float64(s.StorageCapacity)

Expand Down Expand Up @@ -260,6 +264,7 @@ func (s *storeStatistics) resetStoreStatistics(storeAddress string, id string) {
"leader_size",
"leader_count",
"witness_count",
"learner_count",
"store_available",
"store_used",
"store_capacity",
Expand Down
12 changes: 7 additions & 5 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ type StoreStatus struct {
RegionWeight float64 `json:"region_weight"`
RegionScore float64 `json:"region_score"`
RegionSize int64 `json:"region_size"`
WitnessCount int `json:"witness_count"`
SlowScore uint64 `json:"slow_score"`
SlowTrend SlowTrend `json:"slow_trend"`
LearnerCount int `json:"learner_count,omitempty"`
WitnessCount int `json:"witness_count,omitempty"`
SlowScore uint64 `json:"slow_score,omitempty"`
SlowTrend *SlowTrend `json:"slow_trend,omitempty"`
SendingSnapCount uint32 `json:"sending_snap_count,omitempty"`
ReceivingSnapCount uint32 `json:"receiving_snap_count,omitempty"`
IsBusy bool `json:"is_busy,omitempty"`
Expand All @@ -98,10 +99,10 @@ const (
)

func newStoreInfo(opt *config.ScheduleConfig, store *core.StoreInfo) *StoreInfo {
var slowTrend SlowTrend
var slowTrend *SlowTrend
coreSlowTrend := store.GetSlowTrend()
if coreSlowTrend != nil {
slowTrend = SlowTrend{coreSlowTrend.CauseValue, coreSlowTrend.CauseRate, coreSlowTrend.ResultValue, coreSlowTrend.ResultRate}
slowTrend = &SlowTrend{coreSlowTrend.CauseValue, coreSlowTrend.CauseRate, coreSlowTrend.ResultValue, coreSlowTrend.ResultRate}
}
s := &StoreInfo{
Store: &MetaStore{
Expand All @@ -120,6 +121,7 @@ func newStoreInfo(opt *config.ScheduleConfig, store *core.StoreInfo) *StoreInfo
RegionWeight: store.GetRegionWeight(),
RegionScore: store.RegionScore(opt.RegionScoreFormulaVersion, opt.HighSpaceRatio, opt.LowSpaceRatio, 0),
RegionSize: store.GetRegionSize(),
LearnerCount: store.GetLearnerCount(),
WitnessCount: store.GetWitnessCount(),
SlowScore: store.GetSlowScore(),
SlowTrend: slowTrend,
Expand Down
4 changes: 4 additions & 0 deletions server/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,10 @@ func (m *ModeManager) checkStoreStatus() [][]uint64 {
if s.IsRemoved() {
continue
}
// learner peers do not participate in major commit or vote, so it should not count in primary/dr as a normal store.
if s.GetRegionCount() == s.GetLearnerCount() {
continue
}
down := s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
if labelValue == m.config.DRAutoSync.Primary {
Expand Down
10 changes: 7 additions & 3 deletions server/replication/replication_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestStateSwitch(t *testing.T) {
Primary: "zone1",
DR: "zone2",
PrimaryReplicas: 4,
DRReplicas: 1,
DRReplicas: 2,
WaitStoreTimeout: typeutil.Duration{Duration: time.Minute},
}}
cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions())
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestStateSwitch(t *testing.T) {

// add new store in dr zone.
cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"})
cluster.AddLabelsStore(6, 1, map[string]string{"zone": "zone2"})
cluster.AddLabersStoreWithLearnerCount(6, 1, 1, map[string]string{"zone": "zone2"})
// async -> sync
rep.tickDR()
re.Equal(drStateSyncRecover, rep.drGetState())
Expand All @@ -234,10 +234,14 @@ func TestStateSwitch(t *testing.T) {
rep.tickDR()
re.Equal(drStateSync, rep.drGetState()) // cannot guarantee majority, keep sync.

setStoreState(cluster, "up", "up", "up", "up", "up", "down")
rep.tickDR()
re.Equal(drStateSync, rep.drGetState())

// once the voter node down, even learner node up, swith to async state.
setStoreState(cluster, "up", "up", "up", "up", "down", "up")
rep.tickDR()
re.Equal(drStateAsyncWait, rep.drGetState())
assertStateIDUpdate()

rep.drSwitchToSync()
replicator.errors[2] = errors.New("fail to replicate")
Expand Down

0 comments on commit cd2a546

Please sign in to comment.