diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 40effb6fd34f2..4e73b88949dbf 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -494,7 +494,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v // addNominatedPods adds pods with equal or greater priority which are nominated // to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether // any pod was found, 2) augmented meta data, 3) augmented nodeInfo. -func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata, +func addNominatedPods(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo, queue internalqueue.SchedulingQueue) (bool, algorithm.PredicateMetadata, *schedulercache.NodeInfo) { if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil { @@ -511,7 +511,7 @@ func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata, } nodeInfoOut := nodeInfo.Clone() for _, p := range nominatedPods { - if util.GetPodPriority(p) >= podPriority { + if util.GetPodPriority(p) >= util.GetPodPriority(pod) && p.UID != pod.UID { nodeInfoOut.AddPod(p) if metaOut != nil { metaOut.AddPod(p, nodeInfoOut) @@ -569,7 +569,7 @@ func podFitsOnNode( metaToUse := meta nodeInfoToUse := info if i == 0 { - podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue) + podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue) } else if !podsAdded || len(failedPredicates) != 0 { break } diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index eaa9dee3f6bd2..aa420bcb173bf 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -135,6 +135,9 @@ type Config struct { // Disable pod preemption or not. 
DisablePreemption bool + + // SchedulingQueue holds pods to be scheduled. + SchedulingQueue internalqueue.SchedulingQueue } // PodPreemptor has methods needed to delete a pod and to update @@ -1261,9 +1264,10 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, NextPod: func() *v1.Pod { return c.getNextPod() }, - Error: c.MakeDefaultErrorFunc(podBackoff, c.podQueue), - StopEverything: c.StopEverything, - VolumeBinder: c.volumeBinder, + Error: c.MakeDefaultErrorFunc(podBackoff, c.podQueue), + StopEverything: c.StopEverything, + VolumeBinder: c.volumeBinder, + SchedulingQueue: c.podQueue, }, nil } diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index b9a1b5c8970ef..6f5aa682c98ff 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -67,6 +67,8 @@ type SchedulingQueue interface { // Close closes the SchedulingQueue so that the goroutine which is // waiting to pop items can exit gracefully. Close() + // DeleteNominatedPodIfExists removes pod from the queue's nominated pods, if it is present. + DeleteNominatedPodIfExists(pod *v1.Pod) } // NewSchedulingQueue initializes a new scheduling queue. If pod priority is @@ -157,6 +159,9 @@ func (f *FIFO) Close() { f.FIFO.Close() } +// DeleteNominatedPodIfExists does nothing in FIFO. +func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {} + // NewFIFO creates a FIFO object. 
func NewFIFO() *FIFO { return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)} @@ -219,7 +224,7 @@ func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) { if len(nnn) > 0 { for _, np := range p.nominatedPods[nnn] { if np.UID == pod.UID { - klog.Errorf("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name) + klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name) return } } @@ -228,6 +233,7 @@ func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) { } // deleteNominatedPodIfExists deletes a pod from the nominatedPods. +// NOTE: this function assumes lock has been acquired in caller. func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) { nnn := NominatedNodeName(pod) if len(nnn) > 0 { @@ -342,7 +348,6 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) { return nil, err } pod := obj.(*v1.Pod) - p.deleteNominatedPodIfExists(pod) p.receivedMoveRequest = false return pod, err } @@ -411,13 +416,17 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error { // AssignedPodAdded is called when a bound pod is added. Creation of this pod // may make pending pods with matching affinity terms schedulable. func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) { + p.lock.Lock() p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod)) + p.lock.Unlock() } // AssignedPodUpdated is called when a bound pod is updated. Change of labels // may make pending pods with matching affinity terms schedulable. func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) { + p.lock.Lock() p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod)) + p.lock.Unlock() } // MoveAllToActiveQueue moves all pods from unschedulableQ to activeQ. 
This @@ -441,9 +450,8 @@ func (p *PriorityQueue) MoveAllToActiveQueue() { p.cond.Broadcast() } +// NOTE: this function assumes lock has been acquired in caller. func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) { - p.lock.Lock() - defer p.lock.Unlock() for _, pod := range pods { if err := p.activeQ.Add(pod); err == nil { p.unschedulableQ.delete(pod) @@ -457,9 +465,8 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) { // getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have // any affinity term that matches "pod". +// NOTE: this function assumes lock has been acquired in caller. func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod { - p.lock.RLock() - defer p.lock.RUnlock() var podsToMove []*v1.Pod for _, up := range p.unschedulableQ.pods { affinity := up.Spec.Affinity @@ -516,6 +523,13 @@ func (p *PriorityQueue) Close() { p.cond.Broadcast() } +// DeleteNominatedPodIfExists deletes pod from nominatedPods if it is a nominated pod; it acquires the queue lock itself. +func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) { + p.lock.Lock() + p.deleteNominatedPodIfExists(pod) + p.lock.Unlock() +} + // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure // is used to implement unschedulableQ. 
type UnschedulablePodsMap struct { diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index cdce43adc6a5c..ca0e336965621 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -112,8 +112,8 @@ func TestPriorityQueue_Add(t *testing.T) { if p, err := q.Pop(); err != nil || p != &unschedulablePod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name) } - if len(q.nominatedPods) != 0 { - t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods) + if len(q.nominatedPods["node1"]) != 2 { + t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"]) } } @@ -135,8 +135,8 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) { if p, err := q.Pop(); err != nil || p != &unschedulablePod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name) } - if len(q.nominatedPods) != 0 { - t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods) + if len(q.nominatedPods["node1"]) != 2 { + t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"]) } if q.unschedulableQ.get(&highPriNominatedPod) != &highPriNominatedPod { t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name) @@ -178,8 +178,8 @@ func TestPriorityQueue_Pop(t *testing.T) { if p, err := q.Pop(); err != nil || p != &medPriorityPod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name) } - if len(q.nominatedPods) != 0 { - t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods) + if len(q.nominatedPods["node1"]) != 1 { + t.Errorf("Expected medPriorityPod to be present in nomindatePods: %v", q.nominatedPods["node1"]) } }() q.Add(&medPriorityPod) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 
b9b94d3970662..ae8db84e4af7b 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -433,6 +433,10 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { }) return err } + // If "assumed" is a nominated pod, remove it from the scheduling queue's nominated pods. + if sched.config.SchedulingQueue != nil { + sched.config.SchedulingQueue.DeleteNominatedPodIfExists(assumed) + } // Optimistically assume that the binding will succeed, so we need to invalidate affected // predicates in equivalence cache.