Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: refactor store info #6830

Merged
merged 5 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 10 additions & 185 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,218 +14,43 @@

package core

import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/utils/syncutil"
)

// BasicCluster provides basic data member and interface for a tikv cluster.
type BasicCluster struct {
Stores struct {
mu syncutil.RWMutex
*StoresInfo
}

*StoresInfo
*RegionsInfo
}

// NewBasicCluster creates a BasicCluster.
func NewBasicCluster() *BasicCluster {
return &BasicCluster{
Stores: struct {
mu syncutil.RWMutex
*StoresInfo
}{StoresInfo: NewStoresInfo()},

StoresInfo: NewStoresInfo(),
RegionsInfo: NewRegionsInfo(),
}
}

/* Stores read operations */

// GetStores returns all Stores in the cluster.
func (bc *BasicCluster) GetStores() []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStores()
}

// GetMetaStores gets a complete set of metapb.Store.
func (bc *BasicCluster) GetMetaStores() []*metapb.Store {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetMetaStores()
}

// GetStore searches for a store by ID.
func (bc *BasicCluster) GetStore(storeID uint64) *StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStore(storeID)
}

// GetRegionStores returns all Stores that contains the region's peer.
func (bc *BasicCluster) GetRegionStores(region *RegionInfo) []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
var Stores []*StoreInfo
for id := range region.GetStoreIDs() {
if store := bc.Stores.GetStore(id); store != nil {
Stores = append(Stores, store)
}
}
return Stores
}

// GetNonWitnessVoterStores returns all Stores that contains the non-witness's voter peer.
func (bc *BasicCluster) GetNonWitnessVoterStores(region *RegionInfo) []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
var Stores []*StoreInfo
for id := range region.GetNonWitnessVoters() {
if store := bc.Stores.GetStore(id); store != nil {
Stores = append(Stores, store)
}
}
return Stores
}

// GetFollowerStores returns all Stores that contains the region's follower peer.
func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
var Stores []*StoreInfo
for id := range region.GetFollowers() {
if store := bc.Stores.GetStore(id); store != nil {
Stores = append(Stores, store)
}
}
return Stores
}

// GetLeaderStore returns all Stores that contains the region's leader peer.
func (bc *BasicCluster) GetLeaderStore(region *RegionInfo) *StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStore(region.GetLeader().GetStoreId())
}

// GetStoreCount returns the total count of storeInfo.
func (bc *BasicCluster) GetStoreCount() int {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStoreCount()
}

/* Stores Write operations */

// PauseLeaderTransfer prevents the store from been selected as source or
// target store of TransferLeader.
func (bc *BasicCluster) PauseLeaderTransfer(storeID uint64) error {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
return bc.Stores.PauseLeaderTransfer(storeID)
}

// ResumeLeaderTransfer cleans a store's pause state. The store can be selected
// as source or target of TransferLeader again.
func (bc *BasicCluster) ResumeLeaderTransfer(storeID uint64) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.ResumeLeaderTransfer(storeID)
}

// SlowStoreEvicted marks a store as a slow store and prevents transferring
// leader to the store
func (bc *BasicCluster) SlowStoreEvicted(storeID uint64) error {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
return bc.Stores.SlowStoreEvicted(storeID)
}

// SlowTrendEvicted marks a store as a slow store by trend and prevents transferring
// leader to the store
func (bc *BasicCluster) SlowTrendEvicted(storeID uint64) error {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
return bc.Stores.SlowTrendEvicted(storeID)
}

// SlowTrendRecovered cleans the evicted by slow trend state of a store.
func (bc *BasicCluster) SlowTrendRecovered(storeID uint64) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.SlowTrendRecovered(storeID)
}

// SlowStoreRecovered cleans the evicted state of a store.
func (bc *BasicCluster) SlowStoreRecovered(storeID uint64) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.SlowStoreRecovered(storeID)
}

// ResetStoreLimit resets the limit for a specific store.
func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Type, ratePerSec ...float64) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.ResetStoreLimit(storeID, limitType, ratePerSec...)
}

// UpdateStoreStatus updates the information of the store.
func (bc *BasicCluster) UpdateStoreStatus(storeID uint64) {
leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize := bc.RegionsInfo.GetStoreStats(storeID)
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize)
}

// PutStore put a store.
func (bc *BasicCluster) PutStore(store *StoreInfo) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.SetStore(store)
}

// ResetStores resets the store cache.
func (bc *BasicCluster) ResetStores() {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.StoresInfo = NewStoresInfo()
}

// DeleteStore deletes a store.
func (bc *BasicCluster) DeleteStore(store *StoreInfo) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.DeleteStore(store)
leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize := bc.GetStoreStats(storeID)
bc.StoresInfo.UpdateStoreStatus(storeID, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize)
}

/* Regions read operations */

// GetLeaderStoreByRegionID returns the leader store of the given region.
func (bc *BasicCluster) GetLeaderStoreByRegionID(regionID uint64) *StoreInfo {
region := bc.RegionsInfo.GetRegion(regionID)
region := bc.GetRegion(regionID)
if region == nil || region.GetLeader() == nil {
return nil
}

bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStore(region.GetLeader().GetStoreId())
return bc.GetStore(region.GetLeader().GetStoreId())
}

func (bc *BasicCluster) getWriteRate(
f func(storeID uint64) (bytesRate, keysRate float64),
) (storeIDs []uint64, bytesRates, keysRates []float64) {
bc.Stores.mu.RLock()
count := len(bc.Stores.stores)
storeIDs = make([]uint64, 0, count)
for _, store := range bc.Stores.stores {
storeIDs = append(storeIDs, store.GetID())
}
bc.Stores.mu.RUnlock()
storeIDs = bc.GetStoreIDs()
count := len(storeIDs)
bytesRates = make([]float64, 0, count)
keysRates = make([]float64, 0, count)
for _, id := range storeIDs {
Expand All @@ -238,12 +63,12 @@

// GetStoresLeaderWriteRate get total write rate of each store's leaders.
func (bc *BasicCluster) GetStoresLeaderWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) {
return bc.getWriteRate(bc.RegionsInfo.GetStoreLeaderWriteRate)
return bc.getWriteRate(bc.GetStoreLeaderWriteRate)

Check warning on line 66 in pkg/core/basic_cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/basic_cluster.go#L66

Added line #L66 was not covered by tests
}

// GetStoresWriteRate get total write rate of each store's regions.
func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) {
return bc.getWriteRate(bc.RegionsInfo.GetStoreWriteRate)
return bc.getWriteRate(bc.GetStoreWriteRate)
}

// UpdateAllStoreStatus updates the information of all stores.
Expand Down
Loading