
fix(scheduler): split scheduleOne into two functions for schedulingCycle and bindingCycle #111775

Merged
merged 1 commit on Aug 24, 2022

198 changes: 109 additions & 89 deletions pkg/scheduler/schedule_one.go
@@ -61,8 +61,6 @@ const (
minFeasibleNodesPercentageToFind = 5
)

var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}

// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
podInfo := sched.NextPod()
@@ -88,13 +86,36 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
start := time.Now()
state := framework.NewCycleState()
state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)

// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
podsToActivate := framework.NewPodsToActivate()
state.Write(framework.PodsToActivateKey, podsToActivate)

schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
scheduleResult, assumedPodInfo := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, podsToActivate, start)
if scheduleResult.FeasibleNodes == 0 {
return
}

// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()

metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
Member
It makes me think that we are not very diligent about incrementing these counters throughout the codebase every time we create goroutines. Could you follow up on that? Maybe we should have a helper method to start goroutines.

Member Author
Sure.
By the way, we often create goroutines with parallelizer.
Should we inc/dec the SchedulerGoroutines metric in parallelizer as well?
Currently, we only count SchedulerGoroutines here and in the following place:

metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
defer func() {
metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
wg.Done()
}()

Member
yes, we should

Member Author
Ok, I'll follow up on that as well in another PR.
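
A minimal sketch of the helper idea discussed above, assuming it would live in the scheduler package where the metrics package is already imported; the name startMetricedGoroutine and its signature are assumptions for illustration, not something this PR adds:

	// startMetricedGoroutine is a hypothetical wrapper around goroutine creation that keeps
	// the SchedulerGoroutines gauge consistent: it increments the gauge before the goroutine
	// starts and decrements it when the work function returns.
	func startMetricedGoroutine(operation string, fn func()) {
		metrics.SchedulerGoroutines.WithLabelValues(operation).Inc()
		go func() {
			defer metrics.SchedulerGoroutines.WithLabelValues(operation).Dec()
			fn()
		}()
	}

With such a helper, the binding goroutine in this hunk could be started as startMetricedGoroutine(metrics.Binding, func() { ... }), so the Inc/Dec pair cannot be forgotten at new call sites.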

defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()

sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, podsToActivate, start)
Member
this could also return an error and then call the FailureHandler here.
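
A sketch of how that suggestion might look, purely for illustration: bindingCycle would return a *framework.Status and scheduleOne would call the FailureHandler from the goroutine. The status-returning signature is an assumption (in this PR bindingCycle still handles its own failures), and the reason is hard-coded to SchedulerError only for brevity:

	go func() {
		bindingCycleCtx, cancel := context.WithCancel(ctx)
		defer cancel()

		metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
		defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()

		// Assumed signature: bindingCycle reports failures instead of handling them itself.
		if status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, podsToActivate, start); !status.IsSuccess() {
			sched.FailureHandler(ctx, fwk, assumedPodInfo, status.AsError(), SchedulerError, clearNominatedNode)
		}
	}()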

}()
}

var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}

// schedulingCycle tries to schedule a single Pod.
func (sched *Scheduler) schedulingCycle(ctx context.Context, state *framework.CycleState, fwk framework.Framework, podInfo *framework.QueuedPodInfo, podsToActivate *framework.PodsToActivate, start time.Time) (ScheduleResult, *framework.QueuedPodInfo) {
pod := podInfo.Pod
scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
if err != nil {
// SchedulePod() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
@@ -131,7 +152,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
}
sched.FailureHandler(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
return
return ScheduleResult{}, nil
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
@@ -148,23 +169,23 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
sched.FailureHandler(ctx, fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
return
return ScheduleResult{}, nil
}

// Run the Reserve method of reserve plugins.
if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
// trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
sched.FailureHandler(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
return
return ScheduleResult{}, nil
}

// Run "permit" plugins.
runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
var reason string
if runPermitStatus.IsUnschedulable() {
@@ -175,12 +196,12 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
reason = SchedulerError
}
// One of the plugins returned status different than success or wait.
fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
sched.FailureHandler(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
return
return ScheduleResult{}, nil
}

// At the end of a successful scheduling cycle, pop and move up Pods if needed.
@@ -190,92 +211,91 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
podsToActivate.Map = make(map[string]*v1.Pod)
}

// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
return scheduleResult, assumedPodInfo
}

waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
if !waitOnPermitStatus.IsSuccess() {
var reason string
if waitOnPermitStatus.IsUnschedulable() {
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
reason = SchedulerError
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
} else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
// TODO(#103853): de-duplicate the logic.
// Avoid moving the assumed Pod itself as it's always Unschedulable.
// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
return assumedPod.UID != pod.UID
})
}
sched.FailureHandler(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
return
}
// bindingCycle tries to bind an assumed Pod.
func (sched *Scheduler) bindingCycle(ctx context.Context, state *framework.CycleState, fwk framework.Framework, scheduleResult ScheduleResult, assumedPodInfo *framework.QueuedPodInfo, podsToActivate *framework.PodsToActivate, start time.Time) {
assumedPod := assumedPodInfo.Pod

// Run "prebind" plugins.
preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !preBindStatus.IsSuccess() {
waitOnPermitStatus := fwk.WaitOnPermit(ctx, assumedPod)
if !waitOnPermitStatus.IsSuccess() {
var reason string
if waitOnPermitStatus.IsUnschedulable() {
Member
The status judgement could also be placed after bindingCycle, which might reduce the number of return values. I didn't try it, but the pattern would look like this:

	if !abcStatus.IsSuccess() {
		var reason string
		if abcStatus.IsUnschedulable() {
			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(sched.startTime))
			reason = v1.PodReasonUnschedulable
		} else {
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(sched.startTime))
			reason = SchedulerError
		}
	}

Member
Let's leave this for a follow-up.

Member Author
👍
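
One possible shape for that follow-up, sketched with assumed names (recordAndReportFailure is hypothetical and not part of this PR): the repeated status judgement, metric update, and FailureHandler call could be folded into one helper invoked after schedulingCycle or bindingCycle returns a non-success status:

	// recordAndReportFailure is a hypothetical helper: it records the appropriate metric for
	// the failed attempt, derives the failure reason from the status, and reports the failure.
	func (sched *Scheduler) recordAndReportFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, start time.Time, status *framework.Status) {
		var reason string
		if status.IsUnschedulable() {
			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
			reason = v1.PodReasonUnschedulable
		} else {
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
			reason = SchedulerError
		}
		sched.FailureHandler(ctx, fwk, podInfo, status.AsError(), reason, clearNominatedNode)
	}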


metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
} else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
sched.FailureHandler(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
return
reason = SchedulerError
}

err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
if err != nil {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if err := sched.Cache.ForgetPod(assumedPod); err != nil {
klog.ErrorS(err, "scheduler cache ForgetPod failed")
} else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
sched.FailureHandler(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
return
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
Member
We can also put this after bindingCycle.

if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
} else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
// TODO(#103853): de-duplicate the logic.
// Avoid moving the assumed Pod itself as it's always Unschedulable.
// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
return assumedPod.UID != pod.UID
})
}
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
sched.FailureHandler(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
return
}

// Run "postbind" plugins.
fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
// Run "prebind" plugins.
preBindStatus := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if !preBindStatus.IsSuccess() {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
} else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
sched.FailureHandler(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
return
}

// At the end of a successful binding cycle, move up Pods if needed.
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(podsToActivate.Map)
// Unlike the logic in scheduling cycle, we don't bother deleting the entries
// as `podsToActivate.Map` is no longer consumed.
err := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
if err != nil {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if err := sched.Cache.ForgetPod(assumedPod); err != nil {
klog.ErrorS(err, "scheduler cache ForgetPod failed")
} else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
}()
sched.FailureHandler(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
return
}
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
metrics.PodSchedulingAttempts.Observe(float64(assumedPodInfo.Attempts))
metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(assumedPodInfo.InitialAttemptTimestamp))

// Run "postbind" plugins.
fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)

// At the end of a successful binding cycle, move up Pods if needed.
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(podsToActivate.Map)
// Unlike the logic in schedulingCycle(), we don't bother deleting the entries
// as `podsToActivate.Map` is no longer consumed.
}
}

func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {