Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

poc: implement ClusterEventWithHint for efficient enqueueing #8

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions pkg/scheduler/eventhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
// We don't need to invalidate cached results because results will not be
// cached for pod that has unbound immediate PVCs.
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassAdd, nil)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassAdd, nil, sc, nil)
}
}

Expand All @@ -71,7 +71,7 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) {

nodeInfo := sched.Cache.AddNode(logger, node)
logger.V(3).Info("Add event for node", "node", klog.KObj(node))
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.NodeAdd, preCheckForNode(nodeInfo))
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.NodeAdd, nil, node, preCheckForNode(nodeInfo))
}

func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
Expand All @@ -90,7 +90,7 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
nodeInfo := sched.Cache.UpdateNode(logger, oldNode, newNode)
// Only requeue unschedulable pods if the node became more schedulable.
if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, *event, preCheckForNode(nodeInfo))
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, *event, oldNode, newNode, preCheckForNode(nodeInfo))
}
}

Expand Down Expand Up @@ -180,7 +180,7 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
// removing it from the scheduler cache. In this case, signal a AssignedPodDelete
// event to immediately retry some unscheduled Pods.
if fwk.RejectWaitingPod(pod.UID) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, nil)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, pod, nil, nil)
}
}

Expand Down Expand Up @@ -218,7 +218,7 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
logger.Error(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod))
}

sched.SchedulingQueue.AssignedPodUpdated(logger, newPod)
sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod)
}

func (sched *Scheduler) deletePodFromCache(obj interface{}) {
Expand All @@ -243,7 +243,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) {
logger.Error(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod))
}

sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, nil)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, pod, nil, nil)
}

// assignedPod selects pods that are assigned (scheduled and running).
Expand Down Expand Up @@ -332,20 +332,20 @@ func addAllEventHandlers(
funcs := cache.ResourceEventHandlerFuncs{}
if at&framework.Add != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Add, Label: fmt.Sprintf("%vAdd", shortGVK)}
funcs.AddFunc = func(_ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil)
funcs.AddFunc = func(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil, obj, nil)
}
}
if at&framework.Update != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Update, Label: fmt.Sprintf("%vUpdate", shortGVK)}
funcs.UpdateFunc = func(_, _ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil)
funcs.UpdateFunc = func(old, obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, old, obj, nil)
}
}
if at&framework.Delete != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Delete, Label: fmt.Sprintf("%vDelete", shortGVK)}
funcs.DeleteFunc = func(_ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil)
funcs.DeleteFunc = func(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, obj, nil, nil)
}
}
return funcs
Expand Down Expand Up @@ -412,8 +412,8 @@ func addAllEventHandlers(
if at&framework.Update != 0 {
informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, _ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, nil)
UpdateFunc: func(old, obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, old, obj, nil)
},
},
)
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,14 @@ type QueueSortPlugin interface {
// move unschedulable Pods in internal scheduling queues. Plugins
// that fail pod scheduling (e.g., Filter plugins) are expected to implement this interface.
type EnqueueExtensions interface {
Plugin
// EventsToRegister returns a series of possible events that may cause a Pod
// failed by this plugin schedulable.
// The events will be registered when instantiating the internal scheduling queue,
// and leveraged to build event handlers dynamically.
// Note: the returned list needs to be static (not depend on configuration parameters);
// otherwise it would lead to undefined behavior.
EventsToRegister() []ClusterEvent
EventsToRegister() []ClusterEventWithHint
}

// PreFilterExtensions is an interface that is included in plugins that allow specifying
Expand Down Expand Up @@ -513,6 +514,9 @@ type Framework interface {
// PreEnqueuePlugins returns the registered preEnqueue plugins.
PreEnqueuePlugins() []PreEnqueuePlugin

// EnqueueExtensions returns the registered Enqueue extensions.
EnqueueExtensions() []EnqueueExtensions

// QueueSortFunc returns the function to sort pods in scheduling queue
QueueSortFunc() LessFunc

Expand Down
Loading