Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #73309: Should move all unscheduable pods when we received move request to active queue #73567

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
66 changes: 45 additions & 21 deletions pkg/scheduler/core/scheduling_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,14 @@ import (
type SchedulingQueue interface {
Add(pod *v1.Pod) error
AddIfNotPresent(pod *v1.Pod) error
AddUnschedulableIfNotPresent(pod *v1.Pod) error
// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
// The podSchedulingCycle represents the current scheduling cycle number which can be
// returned by calling SchedulingCycle().
AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error
// SchedulingCycle returns the current number of scheduling cycle which is
// cached by scheduling queue. Normally, incrementing this number whenever
// a pod is popped (e.g. called Pop()) is enough.
SchedulingCycle() int64
Pop() (*v1.Pod, error)
Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
Expand Down Expand Up @@ -97,10 +104,15 @@ func (f *FIFO) AddIfNotPresent(pod *v1.Pod) error {

// AddUnschedulableIfNotPresent adds an unschedulable pod back to the queue. In
// FIFO it is added to the end of the queue.
func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
return f.FIFO.AddIfNotPresent(pod)
}

// SchedulingCycle implements SchedulingQueue.SchedulingCycle interface.
func (f *FIFO) SchedulingCycle() int64 {
return 0
}

// Update updates a pod in the FIFO.
func (f *FIFO) Update(oldPod, newPod *v1.Pod) error {
return f.FIFO.Update(newPod)
Expand Down Expand Up @@ -191,12 +203,14 @@ type PriorityQueue struct {
// nominatedPods is a structures that stores pods which are nominated to run
// on nodes.
nominatedPods *nominatedPodMap
// receivedMoveRequest is set to true whenever we receive a request to move a
// pod from the unschedulableQ to the activeQ, and is set to false, when we pop
// a pod from the activeQ. It indicates if we received a move request when a
// pod was in flight (we were trying to schedule it). In such a case, we put
// the pod back into the activeQ if it is determined unschedulable.
receivedMoveRequest bool
// schedulingCycle represents sequence number of scheduling cycle and is incremented
// when a pod is popped.
schedulingCycle int64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing moveRequestCycle here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, thanks!

// moveRequestCycle caches the sequence number of scheduling cycle when we
// received a move request. Unscheduable pods in and before this scheduling
// cycle will be put back to activeQueue if we were trying to schedule them
// when we received move request.
moveRequestCycle int64
}

// Making sure that PriorityQueue implements SchedulingQueue.
Expand Down Expand Up @@ -229,9 +243,10 @@ func activeQComp(pod1, pod2 interface{}) bool {
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue() *PriorityQueue {
pq := &PriorityQueue{
activeQ: newHeap(cache.MetaNamespaceKeyFunc, activeQComp),
unschedulableQ: newUnschedulablePodsMap(),
nominatedPods: newNominatedPodMap(),
activeQ: newHeap(cache.MetaNamespaceKeyFunc, activeQComp),
unschedulableQ: newUnschedulablePodsMap(),
nominatedPods: newNominatedPodMap(),
moveRequestCycle: -1,
}
pq.cond.L = &pq.lock
return pq
Expand Down Expand Up @@ -282,10 +297,19 @@ func isPodUnschedulable(pod *v1.Pod) bool {
return cond != nil && cond.Status == v1.ConditionFalse && cond.Reason == v1.PodReasonUnschedulable
}

// AddUnschedulableIfNotPresent does nothing if the pod is present in either
// queue. Otherwise it adds the pod to the unschedulable queue if
// p.receivedMoveRequest is false, and to the activeQ if p.receivedMoveRequest is true.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
// SchedulingCycle returns current scheduling cycle.
func (p *PriorityQueue) SchedulingCycle() int64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.schedulingCycle
}

// AddUnschedulableIfNotPresent does nothing if the pod is present in any
// queue. If pod is unschedulable, it adds pod to unschedulable queue if
// p.moveRequestCycle > podSchedulingCycle or to backoff queue if p.moveRequestCycle
// <= podSchedulingCycle but pod is subject to backoff. In other cases, it adds pod to
// active queue.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
p.lock.Lock()
defer p.lock.Unlock()
if p.unschedulableQ.get(pod) != nil {
Expand All @@ -294,7 +318,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
if _, exists, _ := p.activeQ.Get(pod); exists {
return fmt.Errorf("pod is already present in the activeQ")
}
if !p.receivedMoveRequest && isPodUnschedulable(pod) {
if podSchedulingCycle > p.moveRequestCycle && isPodUnschedulable(pod) {
p.unschedulableQ.addOrUpdate(pod)
p.nominatedPods.add(pod, "")
return nil
Expand All @@ -308,8 +332,8 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
}

// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It also
// clears receivedMoveRequest to mark the beginning of a new scheduling cycle.
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*v1.Pod, error) {
p.lock.Lock()
defer p.lock.Unlock()
Expand All @@ -321,7 +345,7 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
return nil, err
}
pod := obj.(*v1.Pod)
p.receivedMoveRequest = false
p.schedulingCycle++
return pod, err
}

Expand Down Expand Up @@ -419,7 +443,7 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
}
}
p.unschedulableQ.clear()
p.receivedMoveRequest = true
p.moveRequestCycle = p.schedulingCycle
p.cond.Broadcast()
}

Expand All @@ -432,7 +456,7 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
glog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
}
}
p.receivedMoveRequest = true
p.moveRequestCycle = p.schedulingCycle
p.cond.Broadcast()
}

Expand Down
84 changes: 78 additions & 6 deletions pkg/scheduler/core/scheduling_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q := NewPriorityQueue()
q.Add(&highPriNominatedPod)
q.AddUnschedulableIfNotPresent(&highPriNominatedPod) // Must not add anything.
q.AddUnschedulableIfNotPresent(&medPriorityPod) // This should go to activeQ.
q.AddUnschedulableIfNotPresent(&unschedulablePod)
q.AddUnschedulableIfNotPresent(&highPriNominatedPod, q.SchedulingCycle()) // Must not add anything.
q.AddUnschedulableIfNotPresent(&medPriorityPod, q.SchedulingCycle()) // This should go to activeQ.
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
expectedNominatedPods := &nominatedPodMap{
nominatedPodToNode: map[types.UID]string{
medPriorityPod.UID: "node1",
Expand Down Expand Up @@ -189,6 +189,78 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
}
}

// TestPriorityQueue_AddUnschedulableIfNotPresent_Async tests scenario when
// AddUnschedulableIfNotPresent is called asynchronously pods in and before
// current scheduling cycle will be put back to activeQueue if we were trying
// to schedule them when we received move request.
func TestPriorityQueue_AddUnschedulableIfNotPresent_Async(t *testing.T) {
q := NewPriorityQueue()
totalNum := 10
expectedPods := make([]v1.Pod, 0, totalNum)
for i := 0; i < totalNum; i++ {
priority := int32(i)
p := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod%d", i),
Namespace: fmt.Sprintf("ns%d", i),
UID: types.UID(fmt.Sprintf("upns%d", i)),
},
Spec: v1.PodSpec{
Priority: &priority,
},
}
expectedPods = append(expectedPods, p)
// priority is to make pods ordered in the PriorityQueue
q.Add(&p)
}

// Pop all pods except for the first one
for i := totalNum - 1; i > 0; i-- {
p, _ := q.Pop()
if !reflect.DeepEqual(&expectedPods[i], p) {
t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[i], p)
}
}

// move all pods to active queue when we were trying to schedule them
q.MoveAllToActiveQueue()
moveReqChan := make(chan struct{})
var wg sync.WaitGroup
wg.Add(totalNum - 1)
// mark pods[1] ~ pods[totalNum-1] as unschedulable, fire goroutines to add them back later
for i := 1; i < totalNum; i++ {
unschedulablePod := expectedPods[i].DeepCopy()
unschedulablePod.Status = v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
},
},
}
cycle := q.SchedulingCycle()
go func() {
<-moveReqChan
q.AddUnschedulableIfNotPresent(unschedulablePod, cycle)
wg.Done()
}()
}
firstPod, _ := q.Pop()
if !reflect.DeepEqual(&expectedPods[0], firstPod) {
t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[0], firstPod)
}
// close moveReqChan here to make sure q.AddUnschedulableIfNotPresent is called after another pod is popped
close(moveReqChan)
wg.Wait()
// all other pods should be in active queue again
for i := 1; i < totalNum; i++ {
if _, exists, _ := q.activeQ.Get(&expectedPods[i]); !exists {
t.Errorf("Expected %v to be added to activeQ.", expectedPods[i].Name)
}
}
}

func TestPriorityQueue_Pop(t *testing.T) {
q := NewPriorityQueue()
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -603,7 +675,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
LastProbeTime: metav1.Now(),
})
// Put in the unschedulable queue.
q.AddUnschedulableIfNotPresent(p1)
q.AddUnschedulableIfNotPresent(p1, q.SchedulingCycle())
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveQueue()
// Simulation is over. Now let's pop all pods. The pod popped first should be
Expand Down Expand Up @@ -651,7 +723,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
LastProbeTime: metav1.Now(),
})
// Put in the unschedulable queue
q.AddUnschedulableIfNotPresent(&unschedulablePod)
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveQueue()

Expand Down Expand Up @@ -692,7 +764,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
LastProbeTime: metav1.Now(),
})
// And then, put unschedulable pod to the unschedulable queue
q.AddUnschedulableIfNotPresent(&unschedulablePod)
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveQueue()

Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
}

backoff.Gc()
podSchedulingCycle := podQueue.SchedulingCycle()
// Retry asynchronously.
// Note that this is extremely rudimentary and we need a more real error handling path.
go func() {
Expand Down Expand Up @@ -1512,7 +1513,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
pod, err := c.client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
if err == nil {
if len(pod.Spec.NodeName) == 0 {
podQueue.AddUnschedulableIfNotPresent(pod)
podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle)
} else {
if c.volumeBinder != nil {
// Volume binder only wants to keep unassigned pods
Expand Down