Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add(scheduler): implement "plugin_execution_duration_seconds" metric in PreEnqueue #116201

Merged
merged 1 commit into from Mar 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 15 additions & 7 deletions pkg/scheduler/framework/runtime/framework.go
Expand Up @@ -230,6 +230,21 @@ func WithCaptureProfile(c CaptureProfile) Option {
}
}

// WithClusterEventMap returns an Option that stores m in the clusterEventMap
// field of the frameworkOptions it is applied to.
func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option {
	apply := func(opts *frameworkOptions) {
		opts.clusterEventMap = m
	}
	return apply
}

// WithMetricsRecorder returns an Option that stores the recorder pointer r in
// the metricsRecorder field of the frameworkOptions it is applied to.
func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option {
	apply := func(opts *frameworkOptions) {
		opts.metricsRecorder = r
	}
	return apply
}

// defaultFrameworkOptions are applied when no option corresponding to those fields exist.
func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
return frameworkOptions{
metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh),
Expand All @@ -238,13 +253,6 @@ func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
}
}

// WithClusterEventMap builds an Option which, when applied, assigns m to the
// clusterEventMap field of the target frameworkOptions.
func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option {
	setEventMap := func(target *frameworkOptions) {
		target.clusterEventMap = m
	}
	return setEventMap
}

var _ framework.Framework = &frameworkImpl{}

// NewFramework initializes plugins given the configuration and the registry.
Expand Down
36 changes: 35 additions & 1 deletion pkg/scheduler/internal/queue/scheduling_queue.go
Expand Up @@ -29,6 +29,7 @@ package queue
import (
"context"
"fmt"
"math/rand"
"reflect"
"sync"
"time"
Expand Down Expand Up @@ -182,6 +183,10 @@ type PriorityQueue struct {
closed bool

nsLister listersv1.NamespaceLister

metricsRecorder metrics.MetricAsyncRecorder
// pluginMetricsSamplePercent is the percentage of plugin metrics to be sampled.
pluginMetricsSamplePercent int
}

type priorityQueueOptions struct {
Expand All @@ -190,6 +195,8 @@ type priorityQueueOptions struct {
podMaxBackoffDuration time.Duration
podMaxInUnschedulablePodsDuration time.Duration
podLister listersv1.PodLister
metricsRecorder metrics.MetricAsyncRecorder
pluginMetricsSamplePercent int
clusterEventMap map[framework.ClusterEvent]sets.String
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
}
Expand Down Expand Up @@ -246,6 +253,20 @@ func WithPreEnqueuePluginMap(m map[string][]framework.PreEnqueuePlugin) Option {
}
}

// WithMetricsRecorder returns an Option that stores recorder in the
// metricsRecorder field of the priorityQueueOptions it is applied to.
// recorder is accepted by value, so the options keep a copy of the value
// supplied by the caller.
func WithMetricsRecorder(recorder metrics.MetricAsyncRecorder) Option {
	apply := func(opts *priorityQueueOptions) {
		opts.metricsRecorder = recorder
	}
	return apply
}

// WithPluginMetricsSamplePercent returns an Option that stores percent, the
// percentage of plugin metrics to be sampled, in the
// pluginMetricsSamplePercent field of the priorityQueueOptions it is applied to.
func WithPluginMetricsSamplePercent(percent int) Option {
	apply := func(opts *priorityQueueOptions) {
		opts.pluginMetricsSamplePercent = percent
	}
	return apply
}

var defaultPriorityQueueOptions = priorityQueueOptions{
clock: clock.RealClock{},
podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
Expand Down Expand Up @@ -298,6 +319,8 @@ func NewPriorityQueue(
moveRequestCycle: -1,
clusterEventMap: options.clusterEventMap,
preEnqueuePluginMap: options.preEnqueuePluginMap,
metricsRecorder: options.metricsRecorder,
pluginMetricsSamplePercent: options.pluginMetricsSamplePercent,
}
pq.cond.L = &pq.lock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
Expand Down Expand Up @@ -325,8 +348,9 @@ func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framewo
metrics.FrameworkExtensionPointDuration.WithLabelValues(preEnqueue, s.Code().String(), pod.Spec.SchedulerName).Observe(metrics.SinceInSeconds(startTime))
}()

shouldRecordMetric := rand.Intn(100) < p.pluginMetricsSamplePercent
for _, pl := range p.preEnqueuePluginMap[pod.Spec.SchedulerName] {
s = pl.PreEnqueue(ctx, pod)
s = p.runPreEnqueuePlugin(ctx, pl, pod, shouldRecordMetric)
if s.IsSuccess() {
continue
}
Expand All @@ -342,6 +366,16 @@ func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framewo
return true
}

// runPreEnqueuePlugin invokes pl.PreEnqueue for pod and returns the resulting
// status unchanged.
//
// When shouldRecordMetric is true, the call is timed with the queue's clock
// and the elapsed seconds are handed to the metrics recorder through
// ObservePluginDurationAsync, together with the preEnqueue extension point,
// the plugin name and the status code. When it is false, the plugin is
// called without any timing or recording.
func (p *PriorityQueue) runPreEnqueuePlugin(ctx context.Context, pl framework.PreEnqueuePlugin, pod *v1.Pod, shouldRecordMetric bool) *framework.Status {
	if shouldRecordMetric {
		begin := p.clock.Now()
		status := pl.PreEnqueue(ctx, pod)

		// Gather the labels first and the duration last, mirroring the
		// left-to-right order in which the values are needed by the recorder.
		pluginName := pl.Name()
		code := status.Code().String()
		elapsedSeconds := p.clock.Since(begin).Seconds()

		p.metricsRecorder.ObservePluginDurationAsync(preEnqueue, pluginName, code, elapsedSeconds)
		return status
	}
	return pl.PreEnqueue(ctx, pod)
}

// addToActiveQ tries to add pod to active queue. It returns 2 parameters:
// 1. a boolean flag to indicate whether the pod is added successfully.
// 2. an error for the caller to act on.
Expand Down
70 changes: 64 additions & 6 deletions pkg/scheduler/internal/queue/scheduling_queue_test.go
Expand Up @@ -1599,11 +1599,12 @@ func TestPendingPodsMetric(t *testing.T) {
pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, "z", queueable, timestamp.Add(2*time.Second))

tests := []struct {
name string
operations []operation
operands [][]*framework.QueuedPodInfo
metricsName string
wants string
name string
operations []operation
operands [][]*framework.QueuedPodInfo
metricsName string
pluginMetricsSamplePercent int
wants string
}{
{
name: "add pods to activeQ and unschedulablePods",
Expand Down Expand Up @@ -1765,13 +1766,67 @@ scheduler_pending_pods{queue="gated"} 5
scheduler_pending_pods{queue="unschedulable"} 20
`,
},
{
name: "the metrics should not be recorded (pluginMetricsSamplePercent=0)",
operations: []operation{
add,
},
operands: [][]*framework.QueuedPodInfo{
pInfos[:1],
},
metricsName: "scheduler_plugin_execution_duration_seconds",
pluginMetricsSamplePercent: 0,
wants: `
# HELP scheduler_plugin_execution_duration_seconds [ALPHA] Duration for running a plugin at a specific extension point.
# TYPE scheduler_plugin_execution_duration_seconds histogram
`, // nothing is observed here: with pluginMetricsSamplePercent=0 no plugin duration is recorded.
},
{
name: "the metrics should be recorded (pluginMetricsSamplePercent=100)",
operations: []operation{
add,
},
operands: [][]*framework.QueuedPodInfo{
pInfos[:1],
},
metricsName: "scheduler_plugin_execution_duration_seconds",
pluginMetricsSamplePercent: 100,
wants: `
# HELP scheduler_plugin_execution_duration_seconds [ALPHA] Duration for running a plugin at a specific extension point.
# TYPE scheduler_plugin_execution_duration_seconds histogram
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="1e-05"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="1.5000000000000002e-05"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="2.2500000000000005e-05"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="3.375000000000001e-05"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="5.062500000000001e-05"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="7.593750000000002e-05"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.00011390625000000003"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.00017085937500000006"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0002562890625000001"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.00038443359375000017"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0005766503906250003"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0008649755859375004"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0012974633789062506"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0019461950683593758"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0029192926025390638"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.004378938903808595"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.006568408355712893"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.009852612533569338"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.014778918800354007"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.02216837820053101"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="+Inf"} 1
scheduler_plugin_execution_duration_seconds_sum{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success"} 0
scheduler_plugin_execution_duration_seconds_count{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success"} 1
`, // the observed value will always be 0, because we don't advance the fake clock.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't make the test flaky, because the observed value will always be 0, as noted in the comment.

},
}

resetMetrics := func() {
metrics.ActivePods().Set(0)
metrics.BackoffPods().Set(0)
metrics.UnschedulablePods().Set(0)
metrics.GatedPods().Set(0)
metrics.PluginExecutionDuration.Reset()
}

for _, test := range tests {
Expand All @@ -1781,13 +1836,16 @@ scheduler_pending_pods{queue="unschedulable"} 20
defer cancel()

m := map[string][]framework.PreEnqueuePlugin{"": {&preEnqueuePlugin{allowlists: []string{queueable}}}}
queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)), WithPreEnqueuePluginMap(m))
recorder := metrics.NewMetricsAsyncRecorder(3, 20*time.Microsecond, ctx.Done())
queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)), WithPreEnqueuePluginMap(m), WithPluginMetricsSamplePercent(test.pluginMetricsSamplePercent), WithMetricsRecorder(*recorder))
for i, op := range test.operations {
for _, pInfo := range test.operands[i] {
op(queue, pInfo)
}
}

recorder.FlushMetrics()

if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(test.wants), test.metricsName); err != nil {
t.Fatal(err)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/scheduler.go
Expand Up @@ -283,6 +283,7 @@ func New(client clientset.Interface,

snapshot := internalcache.NewEmptySnapshot()
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh)

profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
Expand All @@ -292,8 +293,10 @@ func New(client clientset.Interface,
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
// clusterEventMap is registered once here; the same map is also passed to the scheduling queue options.
frameworkruntime.WithClusterEventMap(clusterEventMap),
frameworkruntime.WithParallelism(int(options.parallelism)),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithMetricsRecorder(metricsRecorder),
)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)
Expand All @@ -316,6 +319,8 @@ func New(client clientset.Interface,
internalqueue.WithClusterEventMap(clusterEventMap),
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
internalqueue.WithMetricsRecorder(*metricsRecorder),
)

for _, fwk := range profiles {
Expand Down