Skip to content

Commit

Permalink
reduce store lock
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Jul 21, 2023
1 parent dac6dad commit be26c11
Show file tree
Hide file tree
Showing 15 changed files with 260 additions and 590 deletions.
187 changes: 6 additions & 181 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,192 +14,24 @@

package core

import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/utils/syncutil"
)

// BasicCluster provides basic data member and interface for a tikv cluster.
type BasicCluster struct {
Stores struct {
mu syncutil.RWMutex
*StoresInfo
}

*StoresInfo
*RegionsInfo
}

// NewBasicCluster creates a BasicCluster.
func NewBasicCluster() *BasicCluster {
return &BasicCluster{
Stores: struct {
mu syncutil.RWMutex
*StoresInfo
}{StoresInfo: NewStoresInfo()},

StoresInfo: NewStoresInfo(),
RegionsInfo: NewRegionsInfo(),
}
}

/* Stores read operations */

// GetStores returns all Stores in the cluster.
func (bc *BasicCluster) GetStores() []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStores()
}

// GetMetaStores gets a complete set of metapb.Store.
func (bc *BasicCluster) GetMetaStores() []*metapb.Store {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetMetaStores()
}

// GetStore searches for a store by ID.
func (bc *BasicCluster) GetStore(storeID uint64) *StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStore(storeID)
}

// GetRegionStores returns all Stores that contains the region's peer.
func (bc *BasicCluster) GetRegionStores(region *RegionInfo) []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
var Stores []*StoreInfo
for id := range region.GetStoreIDs() {
if store := bc.Stores.GetStore(id); store != nil {
Stores = append(Stores, store)
}
}
return Stores
}

// GetNonWitnessVoterStores returns all Stores that contains the non-witness's voter peer.
func (bc *BasicCluster) GetNonWitnessVoterStores(region *RegionInfo) []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
var Stores []*StoreInfo
for id := range region.GetNonWitnessVoters() {
if store := bc.Stores.GetStore(id); store != nil {
Stores = append(Stores, store)
}
}
return Stores
}

// GetFollowerStores returns all Stores that contains the region's follower peer.
func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
var Stores []*StoreInfo
for id := range region.GetFollowers() {
if store := bc.Stores.GetStore(id); store != nil {
Stores = append(Stores, store)
}
}
return Stores
}

// GetLeaderStore returns all Stores that contains the region's leader peer.
func (bc *BasicCluster) GetLeaderStore(region *RegionInfo) *StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStore(region.GetLeader().GetStoreId())
}

// GetStoreCount returns the total count of storeInfo.
func (bc *BasicCluster) GetStoreCount() int {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStoreCount()
}

/* Stores Write operations */

// PauseLeaderTransfer prevents the store from been selected as source or
// target store of TransferLeader.
func (bc *BasicCluster) PauseLeaderTransfer(storeID uint64) error {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
return bc.Stores.PauseLeaderTransfer(storeID)
}

// ResumeLeaderTransfer cleans a store's pause state. The store can be selected
// as source or target of TransferLeader again.
func (bc *BasicCluster) ResumeLeaderTransfer(storeID uint64) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.ResumeLeaderTransfer(storeID)
}

// SlowStoreEvicted marks a store as a slow store and prevents transferring
// leader to the store
func (bc *BasicCluster) SlowStoreEvicted(storeID uint64) error {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
return bc.Stores.SlowStoreEvicted(storeID)
}

// SlowTrendEvicted marks a store as a slow store by trend and prevents transferring
// leader to the store
func (bc *BasicCluster) SlowTrendEvicted(storeID uint64) error {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
return bc.Stores.SlowTrendEvicted(storeID)
}

// SlowTrendRecovered cleans the evicted by slow trend state of a store.
func (bc *BasicCluster) SlowTrendRecovered(storeID uint64) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.SlowTrendRecovered(storeID)
}

// SlowStoreRecovered cleans the evicted state of a store.
func (bc *BasicCluster) SlowStoreRecovered(storeID uint64) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.SlowStoreRecovered(storeID)
}

// ResetStoreLimit resets the limit for a specific store.
func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Type, ratePerSec ...float64) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.ResetStoreLimit(storeID, limitType, ratePerSec...)
}

// UpdateStoreStatus updates the information of the store.
func (bc *BasicCluster) UpdateStoreStatus(storeID uint64) {
leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize := bc.RegionsInfo.GetStoreStats(storeID)
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize)
}

// PutStore put a store.
func (bc *BasicCluster) PutStore(store *StoreInfo) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.SetStore(store)
}

// ResetStores resets the store cache.
func (bc *BasicCluster) ResetStores() {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.StoresInfo = NewStoresInfo()
}

// DeleteStore deletes a store.
func (bc *BasicCluster) DeleteStore(store *StoreInfo) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.DeleteStore(store)
bc.StoresInfo.UpdateStoreStatus(storeID, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize)
}

/* Regions read operations */
Expand All @@ -211,21 +43,14 @@ func (bc *BasicCluster) GetLeaderStoreByRegionID(regionID uint64) *StoreInfo {
return nil
}

bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStore(region.GetLeader().GetStoreId())
return bc.StoresInfo.GetStore(region.GetLeader().GetStoreId())
}

func (bc *BasicCluster) getWriteRate(
f func(storeID uint64) (bytesRate, keysRate float64),
) (storeIDs []uint64, bytesRates, keysRates []float64) {
bc.Stores.mu.RLock()
count := len(bc.Stores.stores)
storeIDs = make([]uint64, 0, count)
for _, store := range bc.Stores.stores {
storeIDs = append(storeIDs, store.GetID())
}
bc.Stores.mu.RUnlock()
storeIDs = bc.GetStoreIDs()
count := len(storeIDs)
bytesRates = make([]float64, 0, count)
keysRates = make([]float64, 0, count)
for _, id := range storeIDs {
Expand Down
Loading

0 comments on commit be26c11

Please sign in to comment.