Skip to content

Commit

Permalink
*: split into multiple runner for heartbeat (#8130)
Browse files Browse the repository at this point in the history
ref #7897

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] committed May 7, 2024
1 parent b41c897 commit a3c5950
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 141 deletions.
3 changes: 2 additions & 1 deletion pkg/core/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type MetaProcessContext struct {
context.Context
Tracer RegionHeartbeatProcessTracer
TaskRunner ratelimit.Runner
Limiter *ratelimit.ConcurrencyLimiter
LogRunner ratelimit.Runner
}

// NewMetaProcessContext creates a new MetaProcessContext.
Expand All @@ -35,6 +35,7 @@ func ContextTODO() *MetaProcessContext {
Context: context.TODO(),
Tracer: NewNoopHeartbeatProcessTracer(),
TaskRunner: ratelimit.NewSyncRunner(),
LogRunner: ratelimit.NewSyncRunner(),
// Limit default is nil
}
}
37 changes: 6 additions & 31 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,33 +744,26 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
taskRunner := ctx.TaskRunner
limiter := ctx.Limiter
logRunner := ctx.LogRunner
// print log asynchronously
debug, info := d, i
if taskRunner != nil {
if logRunner != nil {
debug = func(msg string, fields ...zap.Field) {
taskRunner.RunTask(
logRunner.RunTask(
ctx.Context,
ratelimit.TaskOpts{
TaskName: "Log",
Limit: limiter,
},
func(_ context.Context) {
d(msg, fields...)
},
ratelimit.WithTaskName("DebugLog"),
)
}
info = func(msg string, fields ...zap.Field) {
taskRunner.RunTask(
logRunner.RunTask(
ctx.Context,
ratelimit.TaskOpts{
TaskName: "Log",
Limit: limiter,
},
func(_ context.Context) {
i(msg, fields...)
},
ratelimit.WithTaskName("InfoLog"),
)
}
}
Expand Down Expand Up @@ -873,24 +866,6 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
}

// RegionHeartbeatStageName is the name of the stage of the region heartbeat.
const (
HandleStatsAsync = "HandleStatsAsync"
ObserveRegionStatsAsync = "ObserveRegionStatsAsync"
UpdateSubTree = "UpdateSubTree"
HandleOverlaps = "HandleOverlaps"
CollectRegionStatsAsync = "CollectRegionStatsAsync"
SaveRegionToKV = "SaveRegionToKV"
)

// ExtraTaskOpts returns the task options for the task.
func ExtraTaskOpts(ctx *MetaProcessContext, name string) ratelimit.TaskOpts {
return ratelimit.TaskOpts{
TaskName: name,
Limit: ctx.Limiter,
}
}

// RWLockStats is a read-write lock with statistics.
type RWLockStats struct {
syncutil.RWMutex
Expand Down
40 changes: 22 additions & 18 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ type Cluster struct {
clusterID uint64
running atomic.Bool

taskRunner ratelimit.Runner
hbConcurrencyLimiter *ratelimit.ConcurrencyLimiter
heartbeatRunnner ratelimit.Runner
logRunner ratelimit.Runner
}

const (
Expand All @@ -64,7 +64,8 @@ const (
collectWaitTime = time.Minute

// heartbeat relative const
hbConcurrentRunner = "heartbeat-concurrent-task-runner"
heartbeatTaskRunner = "heartbeat-task-runner"
logTaskRunner = "log-task-runner"
)

var syncRunner = ratelimit.NewSyncRunner()
Expand Down Expand Up @@ -92,8 +93,8 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
clusterID: clusterID,
checkMembershipCh: checkMembershipCh,

taskRunner: ratelimit.NewConcurrentRunner(hbConcurrentRunner, time.Minute),
hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)),
heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel())
Expand Down Expand Up @@ -530,7 +531,8 @@ func (c *Cluster) StartBackgroundJobs() {
go c.runUpdateStoreStats()
go c.runCoordinator()
go c.runMetricsCollectionJob()
c.taskRunner.Start()
c.heartbeatRunnner.Start()
c.logRunner.Start()
c.running.Store(true)
}

Expand All @@ -541,7 +543,8 @@ func (c *Cluster) StopBackgroundJobs() {
}
c.running.Store(false)
c.coordinator.Stop()
c.taskRunner.Stop()
c.heartbeatRunnner.Stop()
c.logRunner.Stop()
c.cancel()
c.wg.Wait()
}
Expand All @@ -557,16 +560,17 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics {
tracer = core.NewHeartbeatProcessTracer()
}
var runner ratelimit.Runner
runner = syncRunner
var taskRunner, logRunner ratelimit.Runner
taskRunner, logRunner = syncRunner, syncRunner
if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner {
runner = c.taskRunner
taskRunner = c.heartbeatRunnner
logRunner = c.logRunner
}
ctx := &core.MetaProcessContext{
Context: c.ctx,
Limiter: c.hbConcurrencyLimiter,
Tracer: tracer,
TaskRunner: runner,
TaskRunner: taskRunner,
LogRunner: logRunner,
}
tracer.Begin()
if err := c.processRegionHeartbeat(ctx, region); err != nil {
Expand All @@ -590,10 +594,10 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c

ctx.TaskRunner.RunTask(
ctx,
core.ExtraTaskOpts(ctx, core.HandleStatsAsync),
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
ratelimit.WithTaskName(ratelimit.HandleStatsAsync),
)
tracer.OnAsyncHotStatsFinished()
hasRegionStats := c.regionStats != nil
Expand All @@ -607,22 +611,22 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx,
core.ExtraTaskOpts(ctx, core.ObserveRegionStatsAsync),
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
},
ratelimit.WithTaskName(ratelimit.ObserveRegionStatsAsync),
)
}
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
core.ExtraTaskOpts(ctx, core.UpdateSubTree),
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
)
}
return nil
Expand All @@ -642,28 +646,28 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
}
ctx.TaskRunner.RunTask(
ctx,
core.ExtraTaskOpts(ctx, core.UpdateSubTree),
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
ctx,
core.ExtraTaskOpts(ctx, core.HandleOverlaps),
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
ratelimit.WithTaskName(ratelimit.HandleOverlaps),
)
}
tracer.OnSaveCacheFinished()
// handle region stats
ctx.TaskRunner.RunTask(
ctx,
core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync),
func(_ context.Context) {
cluster.Collect(c, region, hasRegionStats)
},
ratelimit.WithTaskName(ratelimit.CollectRegionStatsAsync),
)
tracer.OnCollectRegionStatsFinished()
return nil
Expand Down
Loading

0 comments on commit a3c5950

Please sign in to comment.