Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#7044
Browse files Browse the repository at this point in the history
close tikv#6988, close tikv#7016

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
HuSharp authored and ti-chi-bot committed Nov 14, 2023
1 parent 190d5e2 commit 081af40
Show file tree
Hide file tree
Showing 13 changed files with 919 additions and 28 deletions.
60 changes: 52 additions & 8 deletions pkg/core/region.go
Expand Up @@ -70,8 +70,26 @@ type RegionInfo struct {
queryStats *pdpb.QueryStats
flowRoundDivisor uint64
// buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version.
buckets unsafe.Pointer
fromHeartbeat bool
buckets unsafe.Pointer
// source is used to indicate region's source, such as Storage/Sync/Heartbeat.
source RegionSource
}

// RegionSource is the source of region.
type RegionSource uint32

const (
// Storage means this region's meta info might be stale.
Storage RegionSource = iota
// Sync means this region's meta info is relatively fresher.
Sync
// Heartbeat means this region's meta info is relatively fresher.
Heartbeat
)

// LoadedFromStorage means this region's meta info loaded from storage.
func (r *RegionInfo) LoadedFromStorage() bool {
return r.source == Storage
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -153,6 +171,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
}

region := &RegionInfo{
<<<<<<< HEAD
term: heartbeat.GetTerm(),
meta: heartbeat.GetRegion(),
leader: heartbeat.GetLeader(),
Expand All @@ -168,6 +187,29 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
interval: heartbeat.GetInterval(),
replicationStatus: heartbeat.GetReplicationStatus(),
queryStats: heartbeat.GetQueryStats(),
=======
term: heartbeat.GetTerm(),
meta: heartbeat.GetRegion(),
leader: heartbeat.GetLeader(),
downPeers: heartbeat.GetDownPeers(),
pendingPeers: heartbeat.GetPendingPeers(),
writtenBytes: heartbeat.GetBytesWritten(),
writtenKeys: heartbeat.GetKeysWritten(),
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
queryStats: heartbeat.GetQueryStats(),
source: Heartbeat,
}

// scheduling service doesn't need the following fields.
if h, ok := heartbeat.(*pdpb.RegionHeartbeatRequest); ok {
region.approximateKvSize = int64(h.GetApproximateKvSize() / units.MiB)
region.replicationStatus = h.GetReplicationStatus()
region.cpuUsage = h.GetCpuUsage()
>>>>>>> 2ba7ed4d5 (coordinator: use a healthy region count to start coordinator (#7044))
}

for _, opt := range opts {
Expand Down Expand Up @@ -630,11 +672,6 @@ func (r *RegionInfo) IsFlashbackChanged(l *RegionInfo) bool {
return r.meta.FlashbackStartTs != l.meta.FlashbackStartTs || r.meta.IsInFlashback != l.meta.IsInFlashback
}

// IsFromHeartbeat returns whether the region info is from the region heartbeat.
func (r *RegionInfo) IsFromHeartbeat() bool {
return r.fromHeartbeat
}

func (r *RegionInfo) isInvolved(startKey, endKey []byte) bool {
return bytes.Compare(r.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(r.GetEndKey()) > 0 && bytes.Compare(r.GetEndKey(), endKey) <= 0))
}
Expand Down Expand Up @@ -674,7 +711,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
saveKV, saveCache, isNew = true, true, true
} else {
if !origin.IsFromHeartbeat() {
if origin.LoadedFromStorage() {
isNew = true
}
r := region.GetRegionEpoch()
Expand Down Expand Up @@ -1272,6 +1309,13 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
return
}

// GetClusterNotFromStorageRegionsCnt gets the total count of regions that not loaded from storage anymore
func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
r.st.RLock()
defer r.st.RUnlock()
return r.tree.notFromStorageRegionsCnt
}

// GetMetaRegions gets a set of metapb.Region from regionMap
func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
r.t.RLock()
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/region_option.go
Expand Up @@ -374,10 +374,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
}
}

// SetFromHeartbeat sets if the region info comes from the region heartbeat.
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
// SetSource sets the region info's come from.
func SetSource(source RegionSource) RegionCreateOption {
return func(region *RegionInfo) {
region.fromHeartbeat = fromHeartbeat
region.source = source
}
}

Expand Down
29 changes: 25 additions & 4 deletions pkg/core/region_tree.go
Expand Up @@ -61,14 +61,17 @@ type regionTree struct {
totalSize int64
totalWriteBytesRate float64
totalWriteKeysRate float64
// count the number of regions that not loaded from storage.
notFromStorageRegionsCnt int
}

func newRegionTree() *regionTree {
return &regionTree{
tree: btree.NewG[*regionItem](defaultBTreeDegree),
totalSize: 0,
totalWriteBytesRate: 0,
totalWriteKeysRate: 0,
tree: btree.NewG[*regionItem](defaultBTreeDegree),
totalSize: 0,
totalWriteBytesRate: 0,
totalWriteKeysRate: 0,
notFromStorageRegionsCnt: 0,
}
}

Expand Down Expand Up @@ -112,6 +115,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
t.totalWriteBytesRate += regionWriteBytesRate
t.totalWriteKeysRate += regionWriteKeysRate
if !region.LoadedFromStorage() {
t.notFromStorageRegionsCnt++
}

if !withOverlaps {
overlaps = t.overlaps(item)
Expand All @@ -133,6 +139,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
if !old.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
}

return result
Expand All @@ -149,6 +158,15 @@ func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
regionWriteBytesRate, regionWriteKeysRate = origin.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate

// If the region meta information not loaded from storage anymore, decrease the counter.
if origin.LoadedFromStorage() && !region.LoadedFromStorage() {
t.notFromStorageRegionsCnt++
}
// If the region meta information updated to load from storage, increase the counter.
if !origin.LoadedFromStorage() && region.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
}

// remove removes a region if the region is in the tree.
Expand All @@ -168,6 +186,9 @@ func (t *regionTree) remove(region *RegionInfo) {
regionWriteBytesRate, regionWriteKeysRate := result.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
if !region.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
t.tree.Delete(item)
}

Expand Down

0 comments on commit 081af40

Please sign in to comment.