scheduler: revert "Filter gated pods before calling isPodWorthRequeue…
Browse files Browse the repository at this point in the history
…ing"

The main part of PR kubernetes#124618 was
adding this if check:

pkg/scheduler/internal/queue/scheduling_queue.go:

    movePodsToActiveOrBackoffQueue

         if pInfo.Gated {
              continue
         }

This was supposed to short-circuit expensive work. But if a pod is gated
because a plugin's PreEnqueue returned false, then the event that caused
movePodsToActiveOrBackoffQueue to be called must not be ignored for that
pod: PreEnqueue has to be called again for the pod to check whether it is
now schedulable.
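
To make the gating side concrete, here is a hypothetical PreEnqueue plugin.
It is only a sketch of how a pod ends up with Gated=true, not the real DRA
plugin; the claimExists lookup is a stand-in for an informer-backed check.

    // Hypothetical plugin, for illustration only.
    package example

    import (
        "context"

        v1 "k8s.io/api/core/v1"
        "k8s.io/kubernetes/pkg/scheduler/framework"
    )

    type claimGate struct {
        // claimExists stands in for a real lookup of the pod's ResourceClaims.
        claimExists func(pod *v1.Pod) bool
    }

    var _ framework.PreEnqueuePlugin = &claimGate{}

    func (g *claimGate) Name() string { return "ClaimGateExample" }

    // PreEnqueue runs before a pod may enter the active queue. A non-success
    // status keeps the pod in the unschedulable pool with Gated=true, which
    // is exactly the state affected by the reverted shortcut above.
    func (g *claimGate) PreEnqueue(ctx context.Context, pod *v1.Pod) *framework.Status {
        if !g.claimExists(pod) {
            return framework.NewStatus(framework.UnschedulableAndUnresolvable, "waiting for ResourceClaim")
        }
        return nil
    }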

This affects DRA when using claim templates. It is independent of whether
classic DRA or structured parameters are used: in both cases the pod gets
created first, then the claim, and pod scheduling can only start once the
claim exists.

Depending on timing, the scheduler sees either the pod update first (because
the generated claim name is recorded in the pod status) or the claim add
first. If it sees the pod update first, the pod gets stuck because the claim
is still unknown. When the claim add event is processed later, the pod gets
skipped because of the check above and remains stuck.
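
The effect of that check on this race can be seen in a self-contained toy
model (not the real queue code; names and event strings are invented for
illustration):

    package main

    import "fmt"

    // podInfo is a toy stand-in for framework.QueuedPodInfo.
    type podInfo struct {
        name  string
        gated bool // set when a PreEnqueue plugin said "not ready yet"
    }

    // worthRequeuing stands in for isPodWorthRequeuing: the queueing hints
    // decide whether this event could make the pod schedulable again.
    func worthRequeuing(p *podInfo, event string) bool {
        return event == "claim-add" // the gated pod is waiting for its ResourceClaim
    }

    // moveAll mimics movePodsToActiveOrBackoffQueue, with and without the
    // reverted "skip gated pods" shortcut.
    func moveAll(pods []*podInfo, event string, skipGated bool) []string {
        var moved []string
        for _, p := range pods {
            if skipGated && p.gated {
                continue // the reverted shortcut: the event is dropped for this pod
            }
            if worthRequeuing(p, event) {
                p.gated = false // in reality PreEnqueue runs again; here we just ungate
                moved = append(moved, p.name)
            }
        }
        return moved
    }

    func main() {
        // The bad ordering: the pod update was seen first, so the pod is
        // already gated when the claim add event arrives.
        pod := &podInfo{name: "pod-with-claim-template", gated: true}
        fmt.Println("with shortcut:   ", moveAll([]*podInfo{pod}, "claim-add", true))  // [] -> pod stays stuck
        pod.gated = true
        fmt.Println("without shortcut:", moveAll([]*podInfo{pod}, "claim-add", false)) // [pod-with-claim-template]
    }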
pohly committed Jun 15, 2024
1 parent c3689b9 commit c06272c
Showing 3 changed files with 25 additions and 73 deletions.
5 changes: 0 additions & 5 deletions pkg/scheduler/internal/queue/scheduling_queue.go
@@ -1193,11 +1193,6 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn

activated := false
for _, pInfo := range podInfoList {
// Since there may be many gated pods and they will not move from the
// unschedulable pool, we skip calling the expensive isPodWorthRequeueing.
if pInfo.Gated {
continue
}
schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj)
if schedulingHint == queueSkip {
// QueueingHintFn determined that this Pod isn't worth putting to activeQ or backoffQ by this event.
15 changes: 1 addition & 14 deletions pkg/scheduler/internal/queue/scheduling_queue_test.go
@@ -95,11 +95,6 @@ var (
}
)

func setQueuedPodInfoGated(queuedPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
queuedPodInfo.Gated = true
return queuedPodInfo
}

func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
pInfo := p.unschedulablePods.get(pod)
if pInfo != nil {
@@ -1498,14 +1493,6 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
hint: queueHintReturnSkip,
expectedQ: unschedulablePods,
},
{
name: "QueueHintFunction is not called when Pod is gated",
podInfo: setQueuedPodInfoGated(&framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p), UnschedulablePlugins: sets.New("foo")}),
hint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
return framework.Queue, fmt.Errorf("QueueingHintFn should not be called as pod is gated")
},
expectedQ: unschedulablePods,
},
}

for _, test := range tests {
@@ -2787,7 +2774,7 @@ func TestPendingPodsMetric(t *testing.T) {
gated := makeQueuedPodInfos(total-queueableNum, "y", failme, timestamp)
// Manually mark them as gated=true.
for _, pInfo := range gated {
setQueuedPodInfoGated(pInfo)
pInfo.Gated = true
}
pInfos = append(pInfos, gated...)
totalWithDelay := 20
78 changes: 24 additions & 54 deletions test/integration/scheduler/queue_test.go
@@ -57,27 +57,27 @@ import (

func TestSchedulingGates(t *testing.T) {
tests := []struct {
name string
pods []*v1.Pod
schedule []string
delete []string
rmGates []string
name string
pods []*v1.Pod
want []string
rmPodsSchedulingGates []int
wantPostGatesRemoval []string
}{
{
name: "regular pods",
pods: []*v1.Pod{
st.MakePod().Name("p1").Container("pause").Obj(),
st.MakePod().Name("p2").Container("pause").Obj(),
},
schedule: []string{"p1", "p2"},
want: []string{"p1", "p2"},
},
{
name: "one pod carrying scheduling gates",
pods: []*v1.Pod{
st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
st.MakePod().Name("p2").Container("pause").Obj(),
},
schedule: []string{"p2"},
want: []string{"p2"},
},
{
name: "two pod carrying scheduling gates, and remove gates of one pod",
@@ -86,18 +86,9 @@ func TestSchedulingGates(t *testing.T) {
st.MakePod().Name("p2").SchedulingGates([]string{"bar"}).Container("pause").Obj(),
st.MakePod().Name("p3").Container("pause").Obj(),
},
schedule: []string{"p3"},
rmGates: []string{"p2"},
},
{
name: "gated pod schedulable after deleting the scheduled pod and removing gate",
pods: []*v1.Pod{
st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
st.MakePod().Name("p2").Container("pause").Obj(),
},
schedule: []string{"p2"},
delete: []string{"p2"},
rmGates: []string{"p1"},
want: []string{"p3"},
rmPodsSchedulingGates: []int{1}, // remove gates of 'p2'
wantPostGatesRemoval: []string{"p2"},
},
}

@@ -116,15 +107,6 @@ func TestSchedulingGates(t *testing.T) {
testutils.SyncSchedulerInformerFactory(testCtx)

cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx

// Create node, so we can schedule pods.
node := st.MakeNode().Name("node").Obj()
if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
t.Fatal("Failed to create node")

}

// Create pods.
for _, p := range tt.pods {
p.Namespace = ns
if _, err := cs.CoreV1().Pods(ns).Create(ctx, p, metav1.CreateOptions{}); err != nil {
@@ -140,42 +122,30 @@ func TestSchedulingGates(t *testing.T) {
t.Fatal(err)
}

// Schedule pods.
for _, podName := range tt.schedule {
testCtx.Scheduler.ScheduleOne(testCtx.Ctx)
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, podName)); err != nil {
t.Fatalf("Failed to schedule %s", podName)
}
}

// Delete pods, which triggers AssignedPodDelete event in the scheduling queue.
for _, podName := range tt.delete {
if err := cs.CoreV1().Pods(ns).Delete(ctx, podName, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Error calling Delete on %s", podName)
}
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, testutils.PodDeleted(ctx, cs, ns, podName)); err != nil {
t.Fatalf("Failed to delete %s", podName)
// Pop the expected pods out. They should be de-queueable.
for _, wantPod := range tt.want {
podInfo := testutils.NextPodOrDie(t, testCtx)
if got := podInfo.Pod.Name; got != wantPod {
t.Errorf("Want %v to be popped out, but got %v", wantPod, got)
}
}

// Ensure gated pods are not in ActiveQ
if len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) > 0 {
t.Fatal("Expected no schedulable pods")
if len(tt.rmPodsSchedulingGates) == 0 {
return
}

// Remove scheduling gates from the pod spec.
for _, podName := range tt.rmGates {
for _, idx := range tt.rmPodsSchedulingGates {
patch := `{"spec": {"schedulingGates": null}}`
podName := tt.pods[idx].Name
if _, err := cs.CoreV1().Pods(ns).Patch(ctx, podName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil {
t.Fatalf("Failed to patch pod %v: %v", podName, err)
}
}

// Schedule pods which no longer have gates.
for _, podName := range tt.rmGates {
testCtx.Scheduler.ScheduleOne(testCtx.Ctx)
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, podName)); err != nil {
t.Fatalf("Failed to schedule %s", podName)
// Pop the expected pods out. They should be de-queueable.
for _, wantPod := range tt.wantPostGatesRemoval {
podInfo := testutils.NextPodOrDie(t, testCtx)
if got := podInfo.Pod.Name; got != wantPod {
t.Errorf("Want %v to be popped out, but got %v", wantPod, got)
}
}
})
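
For reference, a hypothetical extra entry for the restored table-driven test
could look like the following; it only mirrors the fields visible in the diff
above (want, rmPodsSchedulingGates, wantPostGatesRemoval) and is not part of
the actual commit:

    {
        name: "both pods gated, then gates removed from both",
        pods: []*v1.Pod{
            st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
            st.MakePod().Name("p2").SchedulingGates([]string{"bar"}).Container("pause").Obj(),
        },
        want:                  nil,                  // nothing is de-queueable while both pods are gated
        rmPodsSchedulingGates: []int{0, 1},          // remove gates of 'p1' and 'p2'
        wantPostGatesRemoval:  []string{"p1", "p2"}, // assumed pop order after ungating
    },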
