feat(scheduler): use context in scheduler package #82072

Merged
47 changes: 16 additions & 31 deletions cmd/kube-scheduler/app/server.go
@@ -136,7 +136,6 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, regist
os.Exit(1)
}

stopCh := make(chan struct{})
// Get the completed config
cc := c.Complete()

@@ -151,11 +150,14 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, regist
return fmt.Errorf("unable to register configz: %s", err)
}

return Run(cc, stopCh, registryOptions...)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

return Run(ctx, cc, registryOptions...)
}

// Run executes the scheduler based on the given configuration. It only return on error or when stopCh is closed.
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, registryOptions ...Option) error {
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, registryOptions ...Option) error {
// To help debugging, immediately log version
klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())

@@ -172,7 +174,7 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis
cc.PodInformer,
cc.Recorder,
cc.ComponentConfig.AlgorithmSource,
stopCh,
ctx.Done(),
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
@@ -190,7 +192,7 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis

// Prepare the event broadcaster.
if cc.Broadcaster != nil && cc.EventClient != nil {
cc.Broadcaster.StartRecordingToSink(stopCh)
cc.Broadcaster.StartRecordingToSink(ctx.Done())
}
if cc.LeaderElectionBroadcaster != nil && cc.CoreEventClient != nil {
cc.LeaderElectionBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
@@ -205,53 +207,36 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis
if cc.InsecureServing != nil {
separateMetrics := cc.InsecureMetricsServing != nil
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
if err := cc.InsecureServing.Serve(handler, 0, stopCh); err != nil {
if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
return fmt.Errorf("failed to start healthz server: %v", err)
}
}
if cc.InsecureMetricsServing != nil {
handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
if err := cc.InsecureMetricsServing.Serve(handler, 0, stopCh); err != nil {
if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
return fmt.Errorf("failed to start metrics server: %v", err)
}
}
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil {
if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}
}

// Start all informers.
go cc.PodInformer.Informer().Run(stopCh)
cc.InformerFactory.Start(stopCh)
go cc.PodInformer.Informer().Run(ctx.Done())
cc.InformerFactory.Start(ctx.Done())

// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(stopCh)

// Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}

ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
defer cancel()

go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()
cc.InformerFactory.WaitForCacheSync(ctx.Done())

// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStartedLeading: sched.Run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
Expand All @@ -267,7 +252,7 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis
}

// Leader election is disabled, so runCommand inline until done.
run(ctx)
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}

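
Reviewer note (not part of the diff): with this change, Run no longer takes a stop channel; the caller owns the lifetime of the context. A minimal sketch of how a caller could tie cancellation to OS signals before invoking the new Run — the runWithSignals helper and the inline run func below are illustrative, not code from this PR:

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
)

// runWithSignals cancels the context on SIGINT/SIGTERM so everything wired to
// ctx.Done() (informers, serving, sched.Run) shuts down together.
func runWithSignals(run func(ctx context.Context) error) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-sigCh
		cancel() // the signal propagates to every ctx.Done() listener
	}()

	return run(ctx)
}

func main() {
	_ = runWithSignals(func(ctx context.Context) error {
		<-ctx.Done() // stand-in for app.Run(ctx, cc, registryOptions...)
		return ctx.Err()
	})
}
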
11 changes: 6 additions & 5 deletions cmd/kube-scheduler/app/testing/testserver.go
@@ -17,6 +17,7 @@ limitations under the License.
package testing

import (
"context"
"fmt"
"io/ioutil"
"net"
@@ -62,9 +63,9 @@ type Logger interface {
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
// enough time to remove temporary files.
func StartTestServer(t Logger, customFlags []string) (result TestServer, err error) {
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
tearDown := func() {
close(stopCh)
cancel()
if len(result.TmpDir) != 0 {
os.RemoveAll(result.TmpDir)
}
@@ -119,11 +120,11 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
}

errCh := make(chan error)
go func(stopCh <-chan struct{}) {
if err := app.Run(config.Complete(), stopCh); err != nil {
go func(ctx context.Context) {
if err := app.Run(ctx, config.Complete()); err != nil {
errCh <- err
}
}(stopCh)
}(ctx)

t.Logf("Waiting for /healthz to be ok...")
client, err := kubernetes.NewForConfig(config.LoopbackClientConfig)
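
Reviewer note (not part of the diff): in the test server, tearDown now calls cancel() instead of closing a channel, and the server goroutine receives the same ctx. A small self-contained sketch of that teardown pattern — startWorker and tearDown below are hypothetical names, not the test server's API:

package main

import (
	"context"
	"fmt"
	"time"
)

// startWorker stands in for the goroutine launched around app.Run: it runs
// until the caller-owned context is cancelled and then reports why it stopped.
func startWorker(ctx context.Context, errCh chan<- error) {
	go func() {
		<-ctx.Done()       // tearDown calls cancel(), which unblocks this receive
		errCh <- ctx.Err() // report the shutdown reason
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	errCh := make(chan error, 1)
	startWorker(ctx, errCh)

	tearDown := cancel // in the real test server, tearDown also removes TmpDir
	time.Sleep(10 * time.Millisecond)
	tearDown()

	fmt.Println("worker stopped:", <-errCh) // worker stopped: context canceled
}
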
3 changes: 2 additions & 1 deletion pkg/scheduler/core/extender_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package core

import (
"context"
"fmt"
"reflect"
"sort"
@@ -558,7 +559,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
schedulerapi.DefaultPercentageOfNodesToScore,
false)
podIgnored := &v1.Pod{}
result, err := scheduler.Schedule(framework.NewCycleState(), podIgnored)
result, err := scheduler.Schedule(context.Background(), framework.NewCycleState(), podIgnored)
if test.expectsErr {
if err == nil {
t.Errorf("Unexpected non-error, result %+v", result)
47 changes: 26 additions & 21 deletions pkg/scheduler/core/generic_scheduler.go
@@ -114,12 +114,12 @@ func (f *FitError) Error() string {
// onto machines.
// TODO: Rename this type.
type ScheduleAlgorithm interface {
Schedule(*framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
Schedule(context.Context, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, a
// list of pods whose nominated node name should be removed, and error if any.
Preempt(*framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
Preempt(context.Context, *framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
// Predicates() returns a pointer to a map of predicate functions. This is
// exposed for testing.
Predicates() map[string]predicates.FitPredicate
@@ -171,7 +171,7 @@ func (g *genericScheduler) snapshot() error {
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
defer trace.LogIfLong(100 * time.Millisecond)

@@ -181,7 +181,7 @@ func (g *genericScheduler) Schedule(state *framework.CycleState, pod *v1.Pod) (r
trace.Step("Basic checks done")

// Run "prefilter" plugins.
preFilterStatus := g.framework.RunPreFilterPlugins(state, pod)
preFilterStatus := g.framework.RunPreFilterPlugins(ctx, state, pod)
if !preFilterStatus.IsSuccess() {
return result, preFilterStatus.AsError()
}
@@ -198,14 +198,14 @@ func (g *genericScheduler) Schedule(state *framework.CycleState, pod *v1.Pod) (r
trace.Step("Snapshoting scheduler cache and node infos done")

startPredicateEvalTime := time.Now()
filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(state, pod)
filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
if err != nil {
return result, err
}
trace.Step("Computing predicates done")

// Run "postfilter" plugins.
postfilterStatus := g.framework.RunPostFilterPlugins(state, pod, filteredNodes, filteredNodesStatuses)
postfilterStatus := g.framework.RunPostFilterPlugins(ctx, state, pod, filteredNodes, filteredNodesStatuses)
if !postfilterStatus.IsSuccess() {
return result, postfilterStatus.AsError()
}
@@ -237,7 +237,7 @@ func (g *genericScheduler) Schedule(state *framework.CycleState, pod *v1.Pod) (r
}

metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, state)
priorityList, err := PrioritizeNodes(ctx, pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, state)
if err != nil {
return result, err
}
@@ -310,7 +310,7 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
// other pods with the same priority. The nominated pod prevents other pods from
// using the nominated resources and the nominated pod could take a long time
// before it is retried after many other pending pods.
func (g *genericScheduler) Preempt(state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
// Scheduler may return various types of errors. Consider preemption only if
// the error is of type FitError.
fitError, ok := scheduleErr.(*FitError)
@@ -334,7 +334,7 @@ func (g *genericScheduler) Preempt(state *framework.CycleState, pod *v1.Pod, sch
if err != nil {
return nil, nil, nil, err
}
nodeToVictims, err := g.selectNodesForPreemption(state, pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates,
nodeToVictims, err := g.selectNodesForPreemption(ctx, state, pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates,
g.predicateMetaProducer, g.schedulingQueue, pdbs)
if err != nil {
return nil, nil, nil, err
@@ -453,7 +453,7 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i

// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}
filteredNodesStatuses := framework.NodeToStatusMap{}
@@ -473,7 +473,7 @@ func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1
filteredLen int32
)

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)

// We can use the same metadata producer for all nodes.
meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
@@ -483,6 +483,7 @@ func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1
nodeName := g.cache.NodeTree().Next()

fits, failedPredicates, status, err := g.podFitsOnNode(
ctx,
state,
pod,
meta,
@@ -561,7 +562,7 @@ func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1
// addNominatedPods adds pods with equal or greater priority which are nominated
// to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
// any pod was added, 2) augmented metadata, 3) augmented CycleState 4) augmented nodeInfo.
func (g *genericScheduler) addNominatedPods(pod *v1.Pod, meta predicates.PredicateMetadata, state *framework.CycleState,
func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, meta predicates.PredicateMetadata, state *framework.CycleState,
nodeInfo *schedulernodeinfo.NodeInfo, queue internalqueue.SchedulingQueue) (bool, predicates.PredicateMetadata,
*framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
@@ -588,7 +589,7 @@ func (g *genericScheduler) addNominatedPods(pod *v1.Pod, meta predicates.Predica
return false, meta, state, nodeInfo, err
}
}
status := g.framework.RunPreFilterExtensionAddPod(stateOut, pod, p, nodeInfoOut)
status := g.framework.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
if !status.IsSuccess() {
return false, meta, state, nodeInfo, status.AsError()
}
@@ -609,6 +610,7 @@ func (g *genericScheduler) addNominatedPods(pod *v1.Pod, meta predicates.Predica
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
func (g *genericScheduler) podFitsOnNode(
ctx context.Context,
state *framework.CycleState,
pod *v1.Pod,
meta predicates.PredicateMetadata,
@@ -645,7 +647,7 @@ func (g *genericScheduler) podFitsOnNode(
nodeInfoToUse := info
if i == 0 {
var err error
podsAdded, metaToUse, stateToUse, nodeInfoToUse, err = g.addNominatedPods(pod, meta, state, info, queue)
podsAdded, metaToUse, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, pod, meta, state, info, queue)
if err != nil {
return false, []predicates.PredicateFailureReason{}, nil, err
}
@@ -680,7 +682,7 @@ func (g *genericScheduler) podFitsOnNode(
}
}

status = g.framework.RunFilterPlugins(stateToUse, pod, nodeInfoToUse)
status = g.framework.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
if !status.IsSuccess() && !status.IsUnschedulable() {
return false, failedPredicates, status, status.AsError()
}
@@ -696,6 +698,7 @@ func (g *genericScheduler) podFitsOnNode(
// The node scores returned by the priority function are multiplied by the weights to get weighted scores
// All scores are finally combined (added) to get the total weighted scores of all nodes
func PrioritizeNodes(
ctx context.Context,
pod *v1.Pod,
nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
meta interface{},
@@ -790,7 +793,7 @@ func PrioritizeNodes(

// Run the Score plugins.
state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta})
scoresMap, scoreStatus := fwk.RunScorePlugins(state, pod, nodes)
scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
if !scoreStatus.IsSuccess() {
return framework.NodeScoreList{}, scoreStatus.AsError()
}
@@ -1004,6 +1007,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *
// selectNodesForPreemption finds all the nodes with possible victims for
// preemption in parallel.
func (g *genericScheduler) selectNodesForPreemption(
ctx context.Context,
state *framework.CycleState,
pod *v1.Pod,
nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
@@ -1031,7 +1035,7 @@ func (g *genericScheduler) selectNodesForPreemption(
stateCopy := state.Clone()
stateCopy.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: metaCopy})
pods, numPDBViolations, fits := g.selectVictimsOnNode(
stateCopy, pod, metaCopy, nodeInfoCopy, fitPredicates, queue, pdbs)
ctx, stateCopy, pod, metaCopy, nodeInfoCopy, fitPredicates, queue, pdbs)
if fits {
resultLock.Lock()
victims := extenderv1.Victims{
Expand Down Expand Up @@ -1101,6 +1105,7 @@ func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudg
// due to pod affinity, node affinity, or node anti-affinity reasons. None of
// these predicates can be satisfied by removing more pods from the node.
func (g *genericScheduler) selectVictimsOnNode(
ctx context.Context,
state *framework.CycleState,
pod *v1.Pod,
meta predicates.PredicateMetadata,
@@ -1120,7 +1125,7 @@ func (g *genericScheduler) selectVictimsOnNode(
return err
}
}
status := g.framework.RunPreFilterExtensionRemovePod(state, pod, rp, nodeInfo)
status := g.framework.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
if !status.IsSuccess() {
return status.AsError()
}
@@ -1133,7 +1138,7 @@ func (g *genericScheduler) selectVictimsOnNode(
return err
}
}
status := g.framework.RunPreFilterExtensionAddPod(state, pod, ap, nodeInfo)
status := g.framework.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
if !status.IsSuccess() {
return status.AsError()
}
@@ -1156,7 +1161,7 @@ func (g *genericScheduler) selectVictimsOnNode(
// inter-pod affinity to one or more victims, but we have decided not to
// support this case for performance reasons. Having affinity to lower
// priority pods is not a recommended configuration anyway.
if fits, _, _, err := g.podFitsOnNode(state, pod, meta, nodeInfo, fitPredicates, queue, false); !fits {
if fits, _, _, err := g.podFitsOnNode(ctx, state, pod, meta, nodeInfo, fitPredicates, queue, false); !fits {
if err != nil {
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
@@ -1174,7 +1179,7 @@ func (g *genericScheduler) selectVictimsOnNode(
if err := addPod(p); err != nil {
return false, err
}
fits, _, _, _ := g.podFitsOnNode(state, pod, meta, nodeInfo, fitPredicates, queue, false)
fits, _, _, _ := g.podFitsOnNode(ctx, state, pod, meta, nodeInfo, fitPredicates, queue, false)
if !fits {
if err := removePod(p); err != nil {
return false, err
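
Reviewer note (not part of the diff): findNodesThatFit now derives its cancellable context from the caller's ctx instead of context.Background(), so cancelling the scheduling cycle also stops the parallel per-node checks. A simplified, self-contained sketch of that pattern — checkNodesInParallel, fits, and want are illustrative names, not the scheduler's actual code:

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// checkNodesInParallel stops outstanding per-node work as soon as `want`
// feasible nodes are found, or as soon as the parent context is cancelled.
func checkNodesInParallel(parent context.Context, nodes []string, fits func(context.Context, string) bool, want int32) []string {
	ctx, cancel := context.WithCancel(parent) // child of the scheduling cycle's context
	defer cancel()

	var (
		mu    sync.Mutex
		found []string
		count int32
	)
	var wg sync.WaitGroup
	for _, name := range nodes {
		name := name
		wg.Add(1)
		go func() {
			defer wg.Done()
			select {
			case <-ctx.Done(): // enough nodes found, or the cycle was cancelled
				return
			default:
			}
			if fits(ctx, name) {
				if atomic.AddInt32(&count, 1) >= want {
					cancel() // tell the remaining goroutines to stop early
				}
				mu.Lock()
				found = append(found, name)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return found
}

func main() {
	nodes := []string{"node-1", "node-2", "node-3", "node-4"}
	feasible := checkNodesInParallel(context.Background(), nodes,
		func(ctx context.Context, n string) bool { return true }, 2)
	fmt.Println(len(feasible) >= 2) // true
}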