diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go
index 7fcae39f81773..c48b09a420d6f 100644
--- a/cmd/kube-scheduler/app/server.go
+++ b/cmd/kube-scheduler/app/server.go
@@ -208,11 +208,17 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
         if cc.DynInformerFactory != nil {
             cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
         }
+
+        // Wait for all handlers to sync (all items in the initial list delivered) before scheduling.
+        if err := sched.WaitForHandlersSync(ctx); err != nil {
+            logger.Error(err, "waiting for handlers to sync")
+        }
+
+        logger.V(3).Info("Handlers synced")
     }
     if !cc.ComponentConfig.DelayCacheUntilActive || cc.LeaderElection == nil {
         startInformersAndWaitForSync(ctx)
     }
-
     // If leader election is enabled, runCommand via LeaderElector until done and exit.
     if cc.LeaderElection != nil {
         cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go
index 08adba8a0a9f9..5d1fe0dde47e5 100644
--- a/pkg/scheduler/eventhandlers.go
+++ b/pkg/scheduler/eventhandlers.go
@@ -17,14 +17,17 @@ limitations under the License.
 package scheduler
 
 import (
+    "context"
     "fmt"
     "reflect"
     "strings"
+    "time"
 
     v1 "k8s.io/api/core/v1"
     storagev1 "k8s.io/api/storage/v1"
     "k8s.io/apimachinery/pkg/runtime/schema"
     utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+    "k8s.io/apimachinery/pkg/util/wait"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/dynamic/dynamicinformer"
     "k8s.io/client-go/informers"
@@ -256,6 +259,24 @@ func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool {
     return profiles.HandlesSchedulerName(pod.Spec.SchedulerName)
 }
 
+const (
+    // syncedPollPeriod controls how often you look at the status of your sync funcs
+    syncedPollPeriod = 100 * time.Millisecond
+)
+
+// WaitForHandlersSync waits for EventHandlers to sync.
+// It returns an error if the context is cancelled before all registered handlers have synced.
+func (sched *Scheduler) WaitForHandlersSync(ctx context.Context) error {
+    return wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(ctx context.Context) (done bool, err error) {
+        for _, handler := range sched.registeredHandlers {
+            if !handler.HasSynced() {
+                return false, nil
+            }
+        }
+        return true, nil
+    })
+}
+
 // addAllEventHandlers is a helper function used in tests and in Scheduler
 // to add event handlers for various informers.
 func addAllEventHandlers(
@@ -263,9 +284,14 @@ func addAllEventHandlers(
     informerFactory informers.SharedInformerFactory,
     dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
     gvkMap map[framework.GVK]framework.ActionType,
-) {
+) error {
+    var (
+        handlerRegistration cache.ResourceEventHandlerRegistration
+        err                 error
+        handlers            []cache.ResourceEventHandlerRegistration
+    )
     // scheduled pod cache
-    informerFactory.Core().V1().Pods().Informer().AddEventHandler(
+    if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(
         cache.FilteringResourceEventHandler{
             FilterFunc: func(obj interface{}) bool {
                 switch t := obj.(type) {
@@ -290,9 +316,13 @@ func addAllEventHandlers(
                 DeleteFunc: sched.deletePodFromCache,
             },
         },
-    )
+    ); err != nil {
+        return err
+    }
+    handlers = append(handlers, handlerRegistration)
+
     // unscheduled pod queue
-    informerFactory.Core().V1().Pods().Informer().AddEventHandler(
+    if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(
         cache.FilteringResourceEventHandler{
             FilterFunc: func(obj interface{}) bool {
                 switch t := obj.(type) {
@@ -317,15 +347,21 @@
                 DeleteFunc: sched.deletePodFromSchedulingQueue,
             },
         },
-    )
+    ); err != nil {
+        return err
+    }
+    handlers = append(handlers, handlerRegistration)
 
-    informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
+    if handlerRegistration, err = informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
         cache.ResourceEventHandlerFuncs{
             AddFunc:    sched.addNodeToCache,
             UpdateFunc: sched.updateNodeInCache,
             DeleteFunc: sched.deleteNodeFromCache,
         },
-    )
+    ); err != nil {
+        return err
+    }
+    handlers = append(handlers, handlerRegistration)
 
     logger := sched.logger
     buildEvtResHandler := func(at framework.ActionType, gvk framework.GVK, shortGVK string) cache.ResourceEventHandlerFuncs {
@@ -356,17 +392,26 @@
         case framework.Node, framework.Pod:
             // Do nothing.
         case framework.CSINode:
-            informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
+            if handlerRegistration, err = informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
                 buildEvtResHandler(at, framework.CSINode, "CSINode"),
-            )
+            ); err != nil {
+                return err
+            }
+            handlers = append(handlers, handlerRegistration)
         case framework.CSIDriver:
-            informerFactory.Storage().V1().CSIDrivers().Informer().AddEventHandler(
+            if handlerRegistration, err = informerFactory.Storage().V1().CSIDrivers().Informer().AddEventHandler(
                 buildEvtResHandler(at, framework.CSIDriver, "CSIDriver"),
-            )
+            ); err != nil {
+                return err
+            }
+            handlers = append(handlers, handlerRegistration)
         case framework.CSIStorageCapacity:
-            informerFactory.Storage().V1().CSIStorageCapacities().Informer().AddEventHandler(
+            if handlerRegistration, err = informerFactory.Storage().V1().CSIStorageCapacities().Informer().AddEventHandler(
                 buildEvtResHandler(at, framework.CSIStorageCapacity, "CSIStorageCapacity"),
-            )
+            ); err != nil {
+                return err
+            }
+            handlers = append(handlers, handlerRegistration)
         case framework.PersistentVolume:
             // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
             //
@@ -381,42 +426,60 @@
             // bindings due to conflicts if PVs are updated by PV controller or other
             // parties, then scheduler will add pod back to unschedulable queue. We
             // need to move pods to active queue on PV update for this scenario.
-            informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
+            if handlerRegistration, err = informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
                 buildEvtResHandler(at, framework.PersistentVolume, "Pv"),
-            )
+            ); err != nil {
+                return err
+            }
+            handlers = append(handlers, handlerRegistration)
         case framework.PersistentVolumeClaim:
             // MaxPDVolumeCountPredicate: add/update PVC will affect counts of PV when it is bound.
-            informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
+            if handlerRegistration, err = informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
                 buildEvtResHandler(at, framework.PersistentVolumeClaim, "Pvc"),
-            )
+            ); err != nil {
+                return err
+            }
+            handlers = append(handlers, handlerRegistration)
         case framework.PodSchedulingContext:
             if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
-                _, _ = informerFactory.Resource().V1alpha2().PodSchedulingContexts().Informer().AddEventHandler(
+                if handlerRegistration, err = informerFactory.Resource().V1alpha2().PodSchedulingContexts().Informer().AddEventHandler(
                     buildEvtResHandler(at, framework.PodSchedulingContext, "PodSchedulingContext"),
-                )
+                ); err != nil {
+                    return err
+                }
             }
+            handlers = append(handlers, handlerRegistration)
         case framework.ResourceClaim:
             if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
-                _, _ = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
+                if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
                     buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"),
-                )
+                ); err != nil {
+                    return err
+                }
             }
+            handlers = append(handlers, handlerRegistration)
         case framework.StorageClass:
             if at&framework.Add != 0 {
-                informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
+                if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
                     cache.ResourceEventHandlerFuncs{
                         AddFunc: sched.onStorageClassAdd,
                     },
-                )
+                ); err != nil {
+                    return err
+                }
+                handlers = append(handlers, handlerRegistration)
             }
             if at&framework.Update != 0 {
-                informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
+                if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
                     cache.ResourceEventHandlerFuncs{
                         UpdateFunc: func(old, obj interface{}) {
                             sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, old, obj, nil)
                         },
                     },
-                )
+                ); err != nil {
+                    return err
+                }
+                handlers = append(handlers, handlerRegistration)
             }
         default:
             // Tests may not instantiate dynInformerFactory.
@@ -438,11 +501,16 @@ func addAllEventHandlers(
             // Fall back to try dynamic informers.
             gvr, _ := schema.ParseResourceArg(string(gvk))
             dynInformer := dynInformerFactory.ForResource(*gvr).Informer()
-            dynInformer.AddEventHandler(
+            if handlerRegistration, err = dynInformer.AddEventHandler(
                 buildEvtResHandler(at, gvk, strings.Title(gvr.Resource)),
-            )
+            ); err != nil {
+                return err
+            }
+            handlers = append(handlers, handlerRegistration)
         }
     }
+    sched.registeredHandlers = handlers
+    return nil
 }
 
 func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go
index 2dbef1fc961e8..7a67d8d6ce9ee 100644
--- a/pkg/scheduler/eventhandlers_test.go
+++ b/pkg/scheduler/eventhandlers_test.go
@@ -448,7 +448,9 @@ func TestAddAllEventHandlers(t *testing.T) {
             dynclient := dyfake.NewSimpleDynamicClient(scheme)
             dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0)
 
-            addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap)
+            if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap); err != nil {
+                t.Fatalf("Add event handlers failed, error = %v", err)
+            }
 
             informerFactory.Start(testSched.StopEverything)
             dynInformerFactory.Start(testSched.StopEverything)
diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go
index 712153f280860..c9fb77ba97a18 100644
--- a/pkg/scheduler/schedule_one_test.go
+++ b/pkg/scheduler/schedule_one_test.go
@@ -469,6 +469,9 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
     // Run scheduler.
     informerFactory.Start(ctx.Done())
     informerFactory.WaitForCacheSync(ctx.Done())
+    if err = sched.WaitForHandlersSync(ctx); err != nil {
+        t.Fatalf("Handlers failed to sync: %v: ", err)
+    }
     go sched.Run(ctx)
 
     // Send pods to be scheduled.
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index ac5f4873dfe0c..9809d548cd231 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -100,6 +100,9 @@ type Scheduler struct {
     // otherwise logging functions will access a nil sink and
     // panic.
     logger klog.Logger
+
+    // registeredHandlers contains the registrations of all handlers. It's used to check if all handlers have finished syncing before the scheduling cycles start.
+    registeredHandlers []cache.ResourceEventHandlerRegistration
 }
 
 func (sched *Scheduler) applyDefaultHandlers() {
@@ -349,7 +352,9 @@ func New(ctx context.Context,
     }
     sched.applyDefaultHandlers()
 
-    addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile))
+    if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
+        return nil, fmt.Errorf("adding event handlers: %w", err)
+    }
 
     return sched, nil
 }
diff --git a/test/integration/util/util.go b/test/integration/util/util.go
index 69f24213fa294..1f154ed1802a5 100644
--- a/test/integration/util/util.go
+++ b/test/integration/util/util.go
@@ -82,6 +82,8 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf
 
     evtBroadcaster.StartRecordingToSink(ctx.Done())
 
+    logger := klog.FromContext(ctx)
+
     sched, err := scheduler.New(
         ctx,
         clientSet,
@@ -96,11 +98,17 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf
         scheduler.WithExtenders(cfg.Extenders...),
         scheduler.WithParallelism(cfg.Parallelism))
     if err != nil {
-        klog.Fatalf("Error creating scheduler: %v", err)
+        logger.Error(err, "Error creating scheduler")
+        klog.FlushAndExit(klog.ExitFlushTimeout, 1)
     }
 
     informerFactory.Start(ctx.Done())
     informerFactory.WaitForCacheSync(ctx.Done())
+    if err = sched.WaitForHandlersSync(ctx); err != nil {
+        logger.Error(err, "Failed waiting for handlers to sync")
+        klog.FlushAndExit(klog.ExitFlushTimeout, 1)
+    }
+    logger.V(3).Info("Handlers synced")
 
     go sched.Run(ctx)
     return sched, informerFactory
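
For reference, below is a minimal standalone sketch of the pattern this patch introduces; it is not part of the patch itself. The idea: keep each cache.ResourceEventHandlerRegistration returned by AddEventHandler, then poll HasSynced() with wait.PollUntilContextCancel before starting work. WaitForCacheSync only reports that the informer's store is populated, while a registration's HasSynced() additionally reports that the initial list has been delivered to that specific handler, which is why the scheduler now waits on both before go sched.Run(ctx). The waitForRegistrations helper, the package main wrapper, and the fake clientset are illustrative assumptions (the scheduler's real equivalent is WaitForHandlersSync), and the sketch assumes a client-go version whose AddEventHandler returns a registration.

package main

import (
    "context"
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
    "k8s.io/client-go/tools/cache"
)

// waitForRegistrations polls until every registration reports HasSynced, i.e. the
// initial list has been delivered to each handler (mirrors Scheduler.WaitForHandlersSync).
func waitForRegistrations(ctx context.Context, regs []cache.ResourceEventHandlerRegistration) error {
    return wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
        for _, r := range regs {
            if !r.HasSynced() {
                return false, nil
            }
        }
        return true, nil
    })
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    client := fake.NewSimpleClientset()
    factory := informers.NewSharedInformerFactory(client, 0)

    // AddEventHandler returns a registration whose HasSynced reflects delivery of
    // the initial list to this particular handler, not just to the informer cache.
    reg, err := factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) { fmt.Printf("observed pod %s\n", obj.(*v1.Pod).Name) },
    })
    if err != nil {
        panic(err)
    }
    regs := []cache.ResourceEventHandlerRegistration{reg}

    factory.Start(ctx.Done())
    factory.WaitForCacheSync(ctx.Done()) // informer caches populated
    if err := waitForRegistrations(ctx, regs); err != nil {
        panic(err)
    }
    // At this point every handler has seen the initial state; safe to start work
    // that assumes no initial events are still pending delivery.
}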