Merge pull request #112003 from sanposhiho/metrics-goroutine
feature(scheduler): add "goroutines" metric and deprecate the "scheduler_goroutines" metric
k8s-ci-robot committed Sep 12, 2022
2 parents 5858aa2 + 08bd123 commit 3ac752e
Showing 12 changed files with 40 additions and 18 deletions.
12 changes: 10 additions & 2 deletions pkg/scheduler/framework/parallelize/parallelism.go
@@ -21,6 +21,7 @@ import (
"math"

"k8s.io/client-go/util/workqueue"
+"k8s.io/kubernetes/pkg/scheduler/metrics"
)

// DefaultParallelism is the default parallelism used in scheduler.
@@ -51,6 +52,13 @@ func chunkSizeFor(n, parallelism int) int {
}

// Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms.
-func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) {
-workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism)))
+// A given operation will be a label that is recorded in the goroutine metric.
+func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc, operation string) {
+withMetrics := func(piece int) {
+metrics.Goroutines.WithLabelValues(operation).Inc()
+defer metrics.Goroutines.WithLabelValues(operation).Dec()
+doWorkPiece(piece)
+}
+
+workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, withMetrics, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism)))
}
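The heart of the change is the new operation argument on Parallelizer.Until: every worker goroutine now increments the goroutines gauge for that label while its work piece runs (with the scheduler subsystem prefix this should surface as scheduler_goroutines{operation="..."}). Below is a minimal, illustrative sketch of the new call pattern, assuming it lives inside the kubernetes/kubernetes tree (the parallelize and scheduler metrics packages are internal); countRunningPods and its "example" label are hypothetical, not part of this commit.

```go
package example

import (
	"context"
	"sync/atomic"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
)

// countRunningPods counts running pods in parallel chunks. While each worker
// goroutine runs a piece, Until increments the goroutines gauge for the
// "example" operation label and decrements it when the piece finishes.
func countRunningPods(ctx context.Context, pods []*v1.Pod) int64 {
	var running int64
	p := parallelize.NewParallelizer(parallelize.DefaultParallelism)
	p.Until(ctx, len(pods), func(i int) {
		if pods[i].Status.Phase == v1.PodRunning {
			atomic.AddInt64(&running, 1)
		}
	}, "example") // operation label recorded in the goroutines metric
	return running
}
```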
4 changes: 2 additions & 2 deletions pkg/scheduler/framework/plugins/interpodaffinity/filtering.go
@@ -170,7 +170,7 @@ func (pl *InterPodAffinity) getExistingAntiAffinityCounts(ctx context.Context, p
topoMaps[atomic.AddInt32(&index, 1)] = topoMap
}
}
-pl.parallelizer.Until(ctx, len(nodes), processNode)
+pl.parallelizer.Until(ctx, len(nodes), processNode, pl.Name())

result := make(topologyToMatchedTermCount)
for i := 0; i <= int(index); i++ {
@@ -216,7 +216,7 @@ func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(ctx context.Co
antiAffinityCountsList[k] = antiAffinity
}
}
-pl.parallelizer.Until(ctx, len(allNodes), processNode)
+pl.parallelizer.Until(ctx, len(allNodes), processNode, pl.Name())

for i := 0; i <= int(index); i++ {
affinityCounts.append(affinityCountsList[i])
@@ -203,7 +203,7 @@ func (pl *InterPodAffinity) PreScore(
topoScores[atomic.AddInt32(&index, 1)] = topoScore
}
}
-pl.parallelizer.Until(pCtx, len(allNodes), processNode)
+pl.parallelizer.Until(pCtx, len(allNodes), processNode, pl.Name())

for i := 0; i <= int(index); i++ {
state.topologyScore.append(topoScores[i])
@@ -304,7 +304,7 @@ func (pl *PodTopologySpread) calPreFilterState(ctx context.Context, pod *v1.Pod)
}
tpCountsByNode[i] = tpCounts
}
-pl.parallelizer.Until(ctx, len(allNodes), processNode)
+pl.parallelizer.Until(ctx, len(allNodes), processNode, pl.Name())

for _, tpCounts := range tpCountsByNode {
for tp, count := range tpCounts {
@@ -2013,7 +2013,7 @@ func BenchmarkFilter(b *testing.B) {
n, _ := p.sharedLister.NodeInfos().Get(allNodes[i].Name)
p.Filter(ctx, state, tt.pod, n)
}
-p.parallelizer.Until(ctx, len(allNodes), filterNode)
+p.parallelizer.Until(ctx, len(allNodes), filterNode, "")
}
})
b.Run(tt.name+"/Clone", func(b *testing.B) {
@@ -183,7 +183,7 @@ func (pl *PodTopologySpread) PreScore(
atomic.AddInt64(tpCount, int64(count))
}
}
-pl.parallelizer.Until(ctx, len(allNodes), processAllNode)
+pl.parallelizer.Until(ctx, len(allNodes), processAllNode, pl.Name())

cycleState.Write(preScoreStateKey, state)
return nil
@@ -1408,7 +1408,7 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
score, _ := p.Score(ctx, state, pod, n.Name)
gotList[i] = framework.NodeScore{Name: n.Name, Score: score}
}
-p.parallelizer.Until(ctx, len(filteredNodes), scoreNode)
+p.parallelizer.Until(ctx, len(filteredNodes), scoreNode, "")
status = p.NormalizeScore(ctx, state, pod, gotList)
if !status.IsSuccess() {
b.Fatal(status)
@@ -90,7 +90,7 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
gotList[i] = framework.NodeScore{Name: n.Name, Score: score}
}
parallelizer := parallelize.NewParallelizer(parallelize.DefaultParallelism)
-parallelizer.Until(ctx, len(filteredNodes), scoreNode)
+parallelizer.Until(ctx, len(filteredNodes), scoreNode, "")
status = plugin.NormalizeScore(ctx, state, pod, gotList)
if !status.IsSuccess() {
b.Fatal(status)
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/preemption/preemption.go
@@ -611,6 +611,6 @@ func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentia
nodeStatuses[nodeInfoCopy.Node().Name] = status
statusesLock.Unlock()
}
-fh.Parallelizer().Until(ctx, len(potentialNodes), checkNode)
+fh.Parallelizer().Until(ctx, len(potentialNodes), checkNode, ev.PluginName)
return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses, utilerrors.NewAggregate(errs)
}
6 changes: 3 additions & 3 deletions pkg/scheduler/framework/runtime/framework.go
@@ -925,7 +925,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
Score: s,
}
}
-})
+}, score)
if err := errCh.ReceiveError(); err != nil {
return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err))
}
@@ -943,7 +943,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
errCh.SendErrorWithCancel(err, cancel)
return
}
-})
+}, score)
if err := errCh.ReceiveError(); err != nil {
return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
}
@@ -964,7 +964,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
}
nodeScoreList[i].Score = nodeScore.Score * int64(weight)
}
-})
+}, score)
if err := errCh.ReceiveError(); err != nil {
return nil, framework.AsStatus(fmt.Errorf("applying score defaultWeights on Score plugins: %w", err))
}
16 changes: 13 additions & 3 deletions pkg/scheduler/metrics/metrics.go
@@ -95,14 +95,23 @@ var (
Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods.",
StabilityLevel: metrics.STABLE,
}, []string{"queue"})
+// SchedulerGoroutines isn't called in some parts where goroutines start.
+// Goroutines metric replaces SchedulerGoroutines metric. Goroutine metric tracks all goroutines.
SchedulerGoroutines = metrics.NewGaugeVec(
+&metrics.GaugeOpts{
+Subsystem: SchedulerSubsystem,
+DeprecatedVersion: "1.26.0",
+Name: "scheduler_goroutines",
+Help: "Number of running goroutines split by the work they do such as binding. This metric is replaced by the \"goroutines\" metric.",
+StabilityLevel: metrics.ALPHA,
+}, []string{"work"})
+Goroutines = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: SchedulerSubsystem,
-Name: "scheduler_goroutines",
+Name: "goroutines",
Help: "Number of running goroutines split by the work they do such as binding.",
StabilityLevel: metrics.ALPHA,
-}, []string{"work"})
-
+}, []string{"operation"})
PodSchedulingDuration = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: SchedulerSubsystem,
@@ -194,6 +203,7 @@ var (
PluginExecutionDuration,
SchedulerQueueIncomingPods,
SchedulerGoroutines,
+Goroutines,
PermitWaitDuration,
CacheSize,
unschedulableReasons,
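A rough test sketch, not part of this commit, for how the new gauge could be verified: it assumes the scheduler metrics package's Register() hook and the component-base testutil.GetGaugeMetricValue helper behave as in the scheduler's existing tests; the test name and "test-op" label are invented.

```go
package example

import (
	"context"
	"testing"

	"k8s.io/component-base/metrics/testutil"
	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
)

func TestGoroutinesMetricIsLabeledByOperation(t *testing.T) {
	metrics.Register() // gauges are inert until the scheduler metrics are registered

	observed := make(chan float64, 1)
	p := parallelize.NewParallelizer(parallelize.DefaultParallelism)
	p.Until(context.Background(), 1, func(_ int) {
		// While the piece runs inside a worker goroutine, the gauge for this
		// operation label should be at least 1.
		v, err := testutil.GetGaugeMetricValue(metrics.Goroutines.WithLabelValues("test-op"))
		if err != nil {
			t.Errorf("reading goroutines gauge: %v", err)
		}
		observed <- v
	}, "test-op")

	if got := <-observed; got < 1 {
		t.Errorf("expected goroutines{operation=%q} >= 1 while work ran, got %v", "test-op", got)
	}
}
```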
6 changes: 5 additions & 1 deletion pkg/scheduler/schedule_one.go
@@ -103,6 +103,8 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {

metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
+metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
+defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()

sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, podsToActivate, start)
}()
@@ -563,7 +565,7 @@ func (sched *Scheduler) findNodesThatPassFilters(

// Stops searching for more nodes once the configured number of feasible nodes
// are found.
-fwk.Parallelizer().Until(ctx, numAllNodes, checkNode)
+fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, frameworkruntime.Filter)
feasibleNodes = feasibleNodes[:feasibleNodesLen]
if err := errCh.ReceiveError(); err != nil {
statusCode = framework.Error
@@ -718,8 +720,10 @@ func prioritizeNodes(
wg.Add(1)
go func(extIndex int) {
metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
+metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
defer func() {
metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
+metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
wg.Done()
}()
prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
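As the schedule_one.go hunks above show, call sites keep the deprecated SchedulerGoroutines gauge and the new Goroutines gauge in step by hand during the deprecation window. A purely hypothetical wrapper (not in this commit; the name trackGoroutine is invented) could centralize that pattern:

```go
package example

import "k8s.io/kubernetes/pkg/scheduler/metrics"

// trackGoroutine increments both the deprecated and the replacement gauge for
// the given work label and returns a function that decrements them again.
//
// Usage at a call site such as the binding goroutine:
//
//	defer trackGoroutine(metrics.Binding)()
func trackGoroutine(work string) func() {
	metrics.SchedulerGoroutines.WithLabelValues(work).Inc()
	metrics.Goroutines.WithLabelValues(work).Inc()
	return func() {
		metrics.SchedulerGoroutines.WithLabelValues(work).Dec()
		metrics.Goroutines.WithLabelValues(work).Dec()
	}
}
```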
