Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensure scheduler preemptor behaves in an efficient/correct path #70898

Merged
merged 1 commit into from
Nov 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
// addNominatedPods adds pods with equal or greater priority which are nominated
// to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
// any pod was found, 2) augmented meta data, 3) augmented nodeInfo.
func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata,
func addNominatedPods(pod *v1.Pod, meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo, queue internalqueue.SchedulingQueue) (bool, algorithm.PredicateMetadata,
*schedulercache.NodeInfo) {
if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
Expand All @@ -511,7 +511,7 @@ func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata,
}
nodeInfoOut := nodeInfo.Clone()
for _, p := range nominatedPods {
if util.GetPodPriority(p) >= podPriority {
if util.GetPodPriority(p) >= util.GetPodPriority(pod) && p.UID != pod.UID {
Huang-Wei marked this conversation as resolved.
Show resolved Hide resolved
nodeInfoOut.AddPod(p)
if metaOut != nil {
metaOut.AddPod(p, nodeInfoOut)
Expand Down Expand Up @@ -569,7 +569,7 @@ func podFitsOnNode(
metaToUse := meta
nodeInfoToUse := info
if i == 0 {
podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue)
podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
} else if !podsAdded || len(failedPredicates) != 0 {
break
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ type Config struct {

// Disable pod preemption or not.
DisablePreemption bool

// SchedulingQueue holds pods to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
}

// PodPreemptor has methods needed to delete a pod and to update
Expand Down Expand Up @@ -1261,9 +1264,10 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
NextPod: func() *v1.Pod {
return c.getNextPod()
},
Error: c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder,
Error: c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder,
SchedulingQueue: c.podQueue,
}, nil
}

Expand Down
26 changes: 20 additions & 6 deletions pkg/scheduler/internal/queue/scheduling_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type SchedulingQueue interface {
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
Close()
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
DeleteNominatedPodIfExists(pod *v1.Pod)
}

// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
Expand Down Expand Up @@ -157,6 +159,9 @@ func (f *FIFO) Close() {
f.FIFO.Close()
}

// DeleteNominatedPodIfExists does nothing in FIFO.
func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {}

// NewFIFO creates a FIFO object.
func NewFIFO() *FIFO {
return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
Expand Down Expand Up @@ -219,7 +224,7 @@ func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
if len(nnn) > 0 {
for _, np := range p.nominatedPods[nnn] {
if np.UID == pod.UID {
klog.Errorf("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
return
}
}
Expand All @@ -228,6 +233,7 @@ func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
}

// deleteNominatedPodIfExists deletes a pod from the nominatedPods.
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) {
nnn := NominatedNodeName(pod)
if len(nnn) > 0 {
Expand Down Expand Up @@ -342,7 +348,6 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
return nil, err
}
pod := obj.(*v1.Pod)
p.deleteNominatedPodIfExists(pod)
p.receivedMoveRequest = false
return pod, err
}
Expand Down Expand Up @@ -411,13 +416,17 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
// AssignedPodAdded is called when a bound pod is added. Creation of this pod
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
p.lock.Lock()
p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
p.lock.Unlock()
}

// AssignedPodUpdated is called when a bound pod is updated. Change of labels
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
p.lock.Lock()
p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
p.lock.Unlock()
}

// MoveAllToActiveQueue moves all pods from unschedulableQ to activeQ. This
Expand All @@ -441,9 +450,8 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
p.cond.Broadcast()
}

// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
p.lock.Lock()
defer p.lock.Unlock()
Huang-Wei marked this conversation as resolved.
Show resolved Hide resolved
for _, pod := range pods {
if err := p.activeQ.Add(pod); err == nil {
p.unschedulableQ.delete(pod)
Expand All @@ -457,9 +465,8 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {

// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
// any affinity term that matches "pod".
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {
p.lock.RLock()
defer p.lock.RUnlock()
var podsToMove []*v1.Pod
for _, up := range p.unschedulableQ.pods {
affinity := up.Spec.Affinity
Expand Down Expand Up @@ -516,6 +523,13 @@ func (p *PriorityQueue) Close() {
p.cond.Broadcast()
}

// DeleteNominatedPodIfExists deletes pod from internal cache if it's a nominatedPod
func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
p.lock.Lock()
p.deleteNominatedPodIfExists(pod)
p.lock.Unlock()
}

// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {
Expand Down
12 changes: 6 additions & 6 deletions pkg/scheduler/internal/queue/scheduling_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func TestPriorityQueue_Add(t *testing.T) {
if p, err := q.Pop(); err != nil || p != &unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
}
if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
if len(q.nominatedPods["node1"]) != 2 {
Huang-Wei marked this conversation as resolved.
Show resolved Hide resolved
t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"])
}
}

Expand All @@ -135,8 +135,8 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
if p, err := q.Pop(); err != nil || p != &unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
}
if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
if len(q.nominatedPods["node1"]) != 2 {
t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"])
}
if q.unschedulableQ.get(&highPriNominatedPod) != &highPriNominatedPod {
t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name)
Expand Down Expand Up @@ -178,8 +178,8 @@ func TestPriorityQueue_Pop(t *testing.T) {
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
if len(q.nominatedPods["node1"]) != 1 {
t.Errorf("Expected medPriorityPod to be present in nomindatePods: %v", q.nominatedPods["node1"])
}
}()
q.Add(&medPriorityPod)
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,10 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
})
return err
}
// if "assumed" is a nominated pod, we should remove it from internal cache
if sched.config.SchedulingQueue != nil {
sched.config.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
}

// Optimistically assume that the binding will succeed, so we need to invalidate affected
// predicates in equivalence cache.
Expand Down