Merge pull request #116729 from AxeZhan/handlers_sync
[Scheduler] Make sure handlers have synced before scheduling
k8s-ci-robot committed Jun 28, 2023
2 parents 74bd77a + 9c7166f commit ddbf357
Showing 6 changed files with 123 additions and 31 deletions.
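The change relies on the ResourceEventHandlerRegistration that client-go's AddEventHandler has returned since v0.26: its HasSynced method reports whether that particular handler has been delivered every item of the informer's initial list, which is a stronger guarantee than the informer store itself being synced. A minimal, standalone sketch of that pattern (fake clientset, illustrative handler; not part of the diff):

package main

import (
	"fmt"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)

	// AddEventHandler returns a registration whose HasSynced reports whether
	// this specific handler has seen the informer's full initial list.
	reg, err := factory.Core().V1().Pods().Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) { fmt.Println("pod added") },
		})
	if err != nil {
		panic(err)
	}

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop) // informer stores are populated...

	// ...but only the registration says the handler itself has caught up.
	if !cache.WaitForCacheSync(stop, reg.HasSynced) {
		panic("event handler never synced")
	}
	fmt.Println("handler synced")
}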
8 changes: 7 additions & 1 deletion cmd/kube-scheduler/app/server.go
@@ -208,11 +208,17 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
}

// Wait for all handlers to sync (all items in the initial list delivered) before scheduling.
if err := sched.WaitForHandlersSync(ctx); err != nil {
logger.Error(err, "waiting for handlers to sync")
}

logger.V(3).Info("Handlers synced")
}
if !cc.ComponentConfig.DelayCacheUntilActive || cc.LeaderElection == nil {
startInformersAndWaitForSync(ctx)
}

// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
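Condensed, the ordering that Run now enforces is: start the informers, wait for their stores, wait for every registered handler, and only then begin scheduling. An illustrative sketch of that sequence (the startUp helper is hypothetical, not code from the patch):

// Sketch of the startup ordering enforced above; startUp is illustrative only.
package startup

import (
	"context"

	"k8s.io/client-go/informers"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler"
)

func startUp(ctx context.Context, sched *scheduler.Scheduler, factory informers.SharedInformerFactory) {
	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done()) // informer stores populated
	if err := sched.WaitForHandlersSync(ctx); err != nil {
		klog.FromContext(ctx).Error(err, "waiting for handlers to sync")
	}
	// Scheduling cycles start only after every registered handler has synced.
	go sched.Run(ctx)
}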
122 changes: 95 additions & 27 deletions pkg/scheduler/eventhandlers.go
@@ -17,14 +17,17 @@ limitations under the License.
package scheduler

import (
"context"
"fmt"
"reflect"
"strings"
"time"

v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
@@ -256,16 +259,39 @@ func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool {
return profiles.HandlesSchedulerName(pod.Spec.SchedulerName)
}

const (
// syncedPollPeriod controls how often you look at the status of your sync funcs
syncedPollPeriod = 100 * time.Millisecond
)

// WaitForHandlersSync waits for EventHandlers to sync.
// It returns an error if the context is cancelled before all registered handlers have synced.
func (sched *Scheduler) WaitForHandlersSync(ctx context.Context) error {
return wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(ctx context.Context) (done bool, err error) {
for _, handler := range sched.registeredHandlers {
if !handler.HasSynced() {
return false, nil
}
}
return true, nil
})
}

// addAllEventHandlers is a helper function used in tests and in Scheduler
// to add event handlers for various informers.
func addAllEventHandlers(
sched *Scheduler,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
gvkMap map[framework.GVK]framework.ActionType,
) {
) error {
var (
handlerRegistration cache.ResourceEventHandlerRegistration
err error
handlers []cache.ResourceEventHandlerRegistration
)
// scheduled pod cache
informerFactory.Core().V1().Pods().Informer().AddEventHandler(
if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
@@ -290,9 +316,13 @@ func addAllEventHandlers(
DeleteFunc: sched.deletePodFromCache,
},
},
)
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)

// unscheduled pod queue
informerFactory.Core().V1().Pods().Informer().AddEventHandler(
if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
@@ -317,15 +347,21 @@ func addAllEventHandlers(
DeleteFunc: sched.deletePodFromSchedulingQueue,
},
},
)
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)

informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
if handlerRegistration, err = informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.addNodeToCache,
UpdateFunc: sched.updateNodeInCache,
DeleteFunc: sched.deleteNodeFromCache,
},
)
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)

logger := sched.logger
buildEvtResHandler := func(at framework.ActionType, gvk framework.GVK, shortGVK string) cache.ResourceEventHandlerFuncs {
@@ -356,17 +392,26 @@ func addAllEventHandlers(
case framework.Node, framework.Pod:
// Do nothing.
case framework.CSINode:
informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
if handlerRegistration, err = informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
buildEvtResHandler(at, framework.CSINode, "CSINode"),
)
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)
case framework.CSIDriver:
informerFactory.Storage().V1().CSIDrivers().Informer().AddEventHandler(
if handlerRegistration, err = informerFactory.Storage().V1().CSIDrivers().Informer().AddEventHandler(
buildEvtResHandler(at, framework.CSIDriver, "CSIDriver"),
)
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)
case framework.CSIStorageCapacity:
informerFactory.Storage().V1().CSIStorageCapacities().Informer().AddEventHandler(
if handlerRegistration, err = informerFactory.Storage().V1().CSIStorageCapacities().Informer().AddEventHandler(
buildEvtResHandler(at, framework.CSIStorageCapacity, "CSIStorageCapacity"),
)
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)
case framework.PersistentVolume:
// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
//
@@ -381,42 +426,60 @@ func addAllEventHandlers(
// bindings due to conflicts if PVs are updated by PV controller or other
// parties, then scheduler will add pod back to unschedulable queue. We
// need to move pods to active queue on PV update for this scenario.
informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
if handlerRegistration, err = informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
buildEvtResHandler(at, framework.PersistentVolume, "Pv"),
)
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)
case framework.PersistentVolumeClaim:
// MaxPDVolumeCountPredicate: add/update PVC will affect counts of PV when it is bound.
informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
if handlerRegistration, err = informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
buildEvtResHandler(at, framework.PersistentVolumeClaim, "Pvc"),
)
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)
case framework.PodSchedulingContext:
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
_, _ = informerFactory.Resource().V1alpha2().PodSchedulingContexts().Informer().AddEventHandler(
if handlerRegistration, err = informerFactory.Resource().V1alpha2().PodSchedulingContexts().Informer().AddEventHandler(
buildEvtResHandler(at, framework.PodSchedulingContext, "PodSchedulingContext"),
)
); err != nil {
return err
}
}
handlers = append(handlers, handlerRegistration)
case framework.ResourceClaim:
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
_, _ = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"),
)
); err != nil {
return err
}
}
handlers = append(handlers, handlerRegistration)
case framework.StorageClass:
if at&framework.Add != 0 {
informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onStorageClassAdd,
},
)
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)
}
if at&framework.Update != 0 {
informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, old, obj, nil)
},
},
)
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)
}
default:
// Tests may not instantiate dynInformerFactory.
@@ -438,11 +501,16 @@ func addAllEventHandlers(
// Fall back to try dynamic informers.
gvr, _ := schema.ParseResourceArg(string(gvk))
dynInformer := dynInformerFactory.ForResource(*gvr).Informer()
dynInformer.AddEventHandler(
if handlerRegistration, err = dynInformer.AddEventHandler(
buildEvtResHandler(at, gvk, strings.Title(gvr.Resource)),
)
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)
}
}
sched.registeredHandlers = handlers
return nil
}

func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
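WaitForHandlersSync above polls every registration with wait.PollUntilContextCancel. For comparison only, the same wait could be phrased by adapting each registration's HasSynced to cache.InformerSynced and reusing cache.WaitForCacheSync; this sketch shows that alternative, not what the patch does (assumes "fmt" and "k8s.io/client-go/tools/cache" are imported):

// Sketch for comparison only; the patch uses wait.PollUntilContextCancel instead.
func waitForHandlers(stopCh <-chan struct{}, regs []cache.ResourceEventHandlerRegistration) error {
	synced := make([]cache.InformerSynced, 0, len(regs))
	for _, reg := range regs {
		synced = append(synced, reg.HasSynced)
	}
	if !cache.WaitForCacheSync(stopCh, synced...) {
		return fmt.Errorf("event handlers did not sync")
	}
	return nil
}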
4 changes: 3 additions & 1 deletion pkg/scheduler/eventhandlers_test.go
@@ -448,7 +448,9 @@ func TestAddAllEventHandlers(t *testing.T) {
dynclient := dyfake.NewSimpleDynamicClient(scheme)
dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0)

addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap)
if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap); err != nil {
t.Fatalf("Add event handlers failed, error = %v", err)
}

informerFactory.Start(testSched.StopEverything)
dynInformerFactory.Start(testSched.StopEverything)
3 changes: 3 additions & 0 deletions pkg/scheduler/schedule_one_test.go
@@ -469,6 +469,9 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
// Run scheduler.
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
if err = sched.WaitForHandlersSync(ctx); err != nil {
t.Fatalf("Handlers failed to sync: %v: ", err)
}
go sched.Run(ctx)

// Send pods to be scheduled.
7 changes: 6 additions & 1 deletion pkg/scheduler/scheduler.go
@@ -100,6 +100,9 @@ type Scheduler struct {
// otherwise logging functions will access a nil sink and
// panic.
logger klog.Logger

// registeredHandlers contains the registrations of all handlers. It's used to check if all handlers have finished syncing before the scheduling cycles start.
registeredHandlers []cache.ResourceEventHandlerRegistration
}

func (sched *Scheduler) applyDefaultHandlers() {
@@ -349,7 +352,9 @@ func New(ctx context.Context,
}
sched.applyDefaultHandlers()

addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile))
if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
return nil, fmt.Errorf("adding event handlers: %w", err)
}

return sched, nil
}
10 changes: 9 additions & 1 deletion test/integration/util/util.go
@@ -82,6 +82,8 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf

evtBroadcaster.StartRecordingToSink(ctx.Done())

logger := klog.FromContext(ctx)

sched, err := scheduler.New(
ctx,
clientSet,
@@ -96,11 +98,17 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf
scheduler.WithExtenders(cfg.Extenders...),
scheduler.WithParallelism(cfg.Parallelism))
if err != nil {
klog.Fatalf("Error creating scheduler: %v", err)
logger.Error(err, "Error creating scheduler")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
if err = sched.WaitForHandlersSync(ctx); err != nil {
logger.Error(err, "Failed waiting for handlers to sync")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
logger.V(3).Info("Handlers synced")
go sched.Run(ctx)

return sched, informerFactory
