Skip to content

Commit

Permalink
Merge pull request #71884 from Huang-Wei/automated-cherry-pick-of-#70…
Browse files Browse the repository at this point in the history
…898-#71281-upstream-release-1.11

Automated cherry pick of #70898 (ensure scheduler preemptor behaves in an efficient/correct path) and #71281 (add an e2e test to verify preemption running path)
  • Loading branch information
k8s-ci-robot committed Dec 10, 2018
2 parents 4600add + 64bc50a commit df34354
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 26 deletions.
6 changes: 3 additions & 3 deletions pkg/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
// addNominatedPods adds pods with equal or greater priority which are nominated
// to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
// any pod was found, 2) augmented meta data, 3) augmented nodeInfo.
func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata,
func addNominatedPods(pod *v1.Pod, meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo, queue SchedulingQueue) (bool, algorithm.PredicateMetadata,
*schedulercache.NodeInfo) {
if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
Expand All @@ -435,7 +435,7 @@ func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata,
}
nodeInfoOut := nodeInfo.Clone()
for _, p := range nominatedPods {
if util.GetPodPriority(p) >= podPriority {
if util.GetPodPriority(p) >= util.GetPodPriority(pod) && p.UID != pod.UID {
nodeInfoOut.AddPod(p)
if metaOut != nil {
metaOut.AddPod(p, nodeInfoOut)
Expand Down Expand Up @@ -494,7 +494,7 @@ func podFitsOnNode(
metaToUse := meta
nodeInfoToUse := info
if i == 0 {
podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue)
podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
} else if !podsAdded || len(failedPredicates) != 0 {
break
}
Expand Down
26 changes: 20 additions & 6 deletions pkg/scheduler/core/scheduling_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type SchedulingQueue interface {
AssignedPodUpdated(pod *v1.Pod)
WaitingPodsForNode(nodeName string) []*v1.Pod
WaitingPods() []*v1.Pod
// DeleteNominatedPodIfExists deletes the pod from the internal cache if it is a nominated pod.
DeleteNominatedPodIfExists(pod *v1.Pod)
}

// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
Expand Down Expand Up @@ -144,6 +146,9 @@ func (f *FIFO) WaitingPodsForNode(nodeName string) []*v1.Pod {
return nil
}

// DeleteNominatedPodIfExists is intentionally a no-op for FIFO: the method
// body is empty, so calling it with any pod has no effect.
// NOTE(review): presumably present so that FIFO satisfies the SchedulingQueue
// interface, which declares this method — confirm.
func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {}

// NewFIFO creates a FIFO object.
func NewFIFO() *FIFO {
return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
Expand Down Expand Up @@ -223,7 +228,7 @@ func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
if len(nnn) > 0 {
for _, np := range p.nominatedPods[nnn] {
if np.UID == pod.UID {
glog.Errorf("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
glog.V(4).Infof("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
return
}
}
Expand All @@ -232,6 +237,7 @@ func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
}

// deleteNominatedPodIfExists deletes a pod from the nominatedPods.
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) {
nnn := NominatedNodeName(pod)
if len(nnn) > 0 {
Expand Down Expand Up @@ -340,7 +346,6 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
return nil, err
}
pod := obj.(*v1.Pod)
p.deleteNominatedPodIfExists(pod)
p.receivedMoveRequest = false
return pod, err
}
Expand Down Expand Up @@ -409,13 +414,17 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
// AssignedPodAdded is called when a bound pod is added. Creation of this pod
// may make pending pods with matching affinity terms schedulable, so the
// unschedulable pods that have an affinity term matching "pod" are handed to
// movePodsToActiveQueue.
//
// The queue lock is held for the whole operation because both helpers are
// documented as assuming the caller has acquired it. The unlock is deferred so
// that the lock is released even if one of the helpers panics.
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
}

// AssignedPodUpdated is called when a bound pod is updated. Change of labels
// may make pending pods with matching affinity terms schedulable, so the
// unschedulable pods that have an affinity term matching "pod" are handed to
// movePodsToActiveQueue.
//
// The queue lock is held for the whole operation because both helpers are
// documented as assuming the caller has acquired it. The unlock is deferred so
// that the lock is released even if one of the helpers panics.
func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
}

// MoveAllToActiveQueue moves all pods from unschedulableQ to activeQ. This
Expand All @@ -439,9 +448,8 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
p.cond.Broadcast()
}

// NOTE: this function assumes the lock has been acquired by the caller.
func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
p.lock.Lock()
defer p.lock.Unlock()
for _, pod := range pods {
if err := p.activeQ.Add(pod); err == nil {
p.unschedulableQ.delete(pod)
Expand All @@ -455,9 +463,8 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {

// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
// any affinity term that matches "pod".
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {
p.lock.RLock()
defer p.lock.RUnlock()
var podsToMove []*v1.Pod
for _, up := range p.unschedulableQ.pods {
affinity := up.Spec.Affinity
Expand Down Expand Up @@ -506,6 +513,13 @@ func (p *PriorityQueue) WaitingPods() []*v1.Pod {
return result
}

// DeleteNominatedPodIfExists deletes pod from the internal cache if it is a
// nominated pod. It is the exported, locking wrapper around
// deleteNominatedPodIfExists, which itself assumes the lock is already held.
//
// The unlock is deferred so that the queue lock is released even if the
// underlying deletion panics.
func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.deleteNominatedPodIfExists(pod)
}

// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {
Expand Down
12 changes: 6 additions & 6 deletions pkg/scheduler/core/scheduling_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func TestPriorityQueue_Add(t *testing.T) {
if p, err := q.Pop(); err != nil || p != &unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
}
if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
if len(q.nominatedPods["node1"]) != 2 {
t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"])
}
}

Expand All @@ -136,8 +136,8 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
if p, err := q.Pop(); err != nil || p != &unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
}
if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
if len(q.nominatedPods["node1"]) != 2 {
t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"])
}
if q.unschedulableQ.get(&highPriNominatedPod) != &highPriNominatedPod {
t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name)
Expand Down Expand Up @@ -179,8 +179,8 @@ func TestPriorityQueue_Pop(t *testing.T) {
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
if len(q.nominatedPods["node1"]) != 1 {
t.Errorf("Expected medPriorityPod to be present in nomindatePods: %v", q.nominatedPods["node1"])
}
}()
q.Add(&medPriorityPod)
Expand Down
7 changes: 4 additions & 3 deletions pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,9 +1109,10 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
NextPod: func() *v1.Pod {
return c.getNextPod()
},
Error: c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder,
Error: c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder,
SchedulingQueue: c.podQueue,
}, nil
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"fmt"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -140,6 +140,9 @@ type Config struct {

// Disable pod preemption or not.
DisablePreemption bool

// SchedulingQueue holds pods to be scheduled
SchedulingQueue core.SchedulingQueue
}

// NewFromConfigurator returns a new scheduler that is created entirely by the Configurator. Assumes Create() is implemented.
Expand Down Expand Up @@ -394,6 +397,10 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
})
return err
}
// if "assumed" is a nominated pod, we should remove it from internal cache
if sched.config.SchedulingQueue != nil {
sched.config.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
}

// Optimistically assume that the binding will succeed, so we need to invalidate affected
// predicates in equivalence cache.
Expand Down
3 changes: 3 additions & 0 deletions test/e2e/scheduling/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,22 @@ go_library(
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/google.golang.org/api/compute/v1:go_default_library",
"//vendor/k8s.io/api/apps/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/api/scheduling/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)

Expand Down
Loading

0 comments on commit df34354

Please sign in to comment.