diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 5332e00d6616..4e3cf6335978 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -441,6 +441,9 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool { // isPodBackingoff returns true if a pod is still waiting for its backoff timer. // If this returns true, the pod should not be re-tried. func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool { + if podInfo.Gated { + return false + } boTime := p.getBackoffTime(podInfo) return boTime.After(p.clock.Now()) } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 95062a43274f..e90ef8917730 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -512,14 +512,24 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) { defer cancel() m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins} - q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m)) - got, _ := q.addToActiveQ(newQueuedPodInfoForLookup(tt.pod)) + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m), + WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60)) + got, _ := q.addToActiveQ(q.newQueuedPodInfo(tt.pod)) if got != tt.wantSuccess { t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got) } if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) { t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap)) } + + // Simulate an update event. + clone := tt.pod.DeepCopy() + metav1.SetMetaDataAnnotation(&clone.ObjectMeta, "foo", "") + q.Update(tt.pod, clone) + // Ensure the pod is still located in unschedulablePods. + if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) { + t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap)) + } }) } } diff --git a/test/e2e/framework/pod/wait.go b/test/e2e/framework/pod/wait.go index 44d77b197fb0..e4e8de51829d 100644 --- a/test/e2e/framework/pod/wait.go +++ b/test/e2e/framework/pod/wait.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "reflect" "text/tabwriter" "time" @@ -253,7 +254,7 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN framework.Logf("Pod %s is Failed, but it's not controlled by a controller", pod.ObjectMeta.Name) badPods = append(badPods, pod) } - //ignore failed pods that are controlled by some controller + // ignore failed pods that are controlled by some controller } } @@ -326,7 +327,7 @@ func WaitForPodCondition(c clientset.Interface, ns, podName, conditionDesc strin return maybeTimeoutError(err, "waiting for pod %s to be %s", podIdentifier(ns, podName), conditionDesc) } -// WaitForPodsCondition waits for the listed pods to match the given condition. +// WaitForAllPodsCondition waits for the listed pods to match the given condition. // To succeed, at least minPods must be listed, and all listed pods must match the condition. func WaitForAllPodsCondition(c clientset.Interface, ns string, opts metav1.ListOptions, minPods int, conditionDesc string, timeout time.Duration, condition podCondition) (*v1.PodList, error) { framework.Logf("Waiting up to %v for at least %d pods in namespace %s to be %s", timeout, minPods, ns, conditionDesc) @@ -362,6 +363,78 @@ func WaitForAllPodsCondition(c clientset.Interface, ns string, opts metav1.ListO return pods, maybeTimeoutError(err, "waiting for at least %d pods to be %s (matched %d)", minPods, conditionDesc, matched) } +// WaitForPodsRunning waits for a given `timeout` to evaluate if a certain amount of pods in given `ns` are running. +func WaitForPodsRunning(c clientset.Interface, ns string, num int, timeout time.Duration) error { + matched := 0 + err := wait.PollImmediate(poll, timeout, func() (done bool, err error) { + pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return handleWaitingAPIError(err, true, "listing pods") + } + matched = 0 + for _, pod := range pods.Items { + if ready, _ := testutils.PodRunningReady(&pod); ready { + matched++ + } + } + if matched == num { + return true, nil + } + framework.Logf("expect %d pods are running, but got %v", num, matched) + return false, nil + }) + return maybeTimeoutError(err, "waiting for pods to be running (want %v, matched %d)", num, matched) +} + +// WaitForPodsSchedulingGated waits for a given `timeout` to evaluate if a certain amount of pods in given `ns` stay in scheduling gated state. +func WaitForPodsSchedulingGated(c clientset.Interface, ns string, num int, timeout time.Duration) error { + matched := 0 + err := wait.PollImmediate(poll, timeout, func() (done bool, err error) { + pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return handleWaitingAPIError(err, true, "listing pods") + } + matched = 0 + for _, pod := range pods.Items { + for _, condition := range pod.Status.Conditions { + if condition.Type == v1.PodScheduled && condition.Reason == v1.PodReasonSchedulingGated { + matched++ + } + } + } + if matched == num { + return true, nil + } + framework.Logf("expect %d pods in scheduling gated state, but got %v", num, matched) + return false, nil + }) + return maybeTimeoutError(err, "waiting for pods to be scheduling gated (want %d, matched %d)", num, matched) +} + +// WaitForPodsWithSchedulingGates waits for a given `timeout` to evaluate if a certain amount of pods in given `ns` +// match the given `schedulingGates`stay in scheduling gated state. +func WaitForPodsWithSchedulingGates(c clientset.Interface, ns string, num int, timeout time.Duration, schedulingGates []v1.PodSchedulingGate) error { + matched := 0 + err := wait.PollImmediate(poll, timeout, func() (done bool, err error) { + pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return handleWaitingAPIError(err, true, "listing pods") + } + matched = 0 + for _, pod := range pods.Items { + if reflect.DeepEqual(pod.Spec.SchedulingGates, schedulingGates) { + matched++ + } + } + if matched == num { + return true, nil + } + framework.Logf("expect %d pods carry the expected scheduling gates, but got %v", num, matched) + return false, nil + }) + return maybeTimeoutError(err, "waiting for pods to carry the expected scheduling gates (want %d, matched %d)", num, matched) +} + // WaitForPodTerminatedInNamespace returns an error if it takes too long for the pod to terminate, // if the pod Get api returns an error (IsNotFound or other), or if the pod failed (and thus did not // terminate) with an unexpected reason. Typically called to test that the passed-in pod is fully diff --git a/test/e2e/scheduling/predicates.go b/test/e2e/scheduling/predicates.go index 93957429d3ba..10204c9a552c 100644 --- a/test/e2e/scheduling/predicates.go +++ b/test/e2e/scheduling/predicates.go @@ -18,6 +18,7 @@ package scheduling import ( "context" + "encoding/json" "fmt" "time" @@ -25,8 +26,10 @@ import ( nodev1 "k8s.io/api/node/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/uuid" utilversion "k8s.io/apimachinery/pkg/util/version" clientset "k8s.io/client-go/kubernetes" @@ -70,6 +73,7 @@ type pausePodConfig struct { PriorityClassName string DeletionGracePeriodSeconds *int64 TopologySpreadConstraints []v1.TopologySpreadConstraint + SchedulingGates []v1.PodSchedulingGate } var _ = SIGDescribe("SchedulerPredicates [Serial]", func() { @@ -799,8 +803,75 @@ var _ = SIGDescribe("SchedulerPredicates [Serial]", func() { framework.ExpectEqual(numInNode2, expected, fmt.Sprintf("Pods are not distributed as expected on node %q", nodeNames[1])) }) }) + + ginkgo.It("validates Pods with non-empty schedulingGates are blocked on scheduling [Feature:PodSchedulingReadiness] [alpha]", func() { + podLabel := "e2e-scheduling-gates" + replicas := 3 + ginkgo.By(fmt.Sprintf("Creating a ReplicaSet with replicas=%v, carrying scheduling gates [foo bar]", replicas)) + rsConfig := pauseRSConfig{ + Replicas: int32(replicas), + PodConfig: pausePodConfig{ + Name: podLabel, + Namespace: ns, + Labels: map[string]string{podLabel: ""}, + SchedulingGates: []v1.PodSchedulingGate{ + {Name: "foo"}, + {Name: "bar"}, + }, + }, + } + createPauseRS(f, rsConfig) + + ginkgo.By("Expect all pods stay in pending state") + podList, err := e2epod.WaitForNumberOfPods(cs, ns, replicas, time.Minute) + framework.ExpectNoError(err) + framework.ExpectNoError(e2epod.WaitForPodsSchedulingGated(cs, ns, replicas, time.Minute)) + + ginkgo.By("Remove one scheduling gate") + want := []v1.PodSchedulingGate{{Name: "bar"}} + var pods []*v1.Pod + for _, pod := range podList.Items { + clone := pod.DeepCopy() + clone.Spec.SchedulingGates = want + live, err := patchPod(cs, &pod, clone) + framework.ExpectNoError(err) + pods = append(pods, live) + } + + ginkgo.By("Expect all pods carry one scheduling gate and are still in pending state") + framework.ExpectNoError(e2epod.WaitForPodsWithSchedulingGates(cs, ns, replicas, time.Minute, want)) + framework.ExpectNoError(e2epod.WaitForPodsSchedulingGated(cs, ns, replicas, time.Minute)) + + ginkgo.By("Remove the remaining scheduling gates") + for _, pod := range pods { + clone := pod.DeepCopy() + clone.Spec.SchedulingGates = nil + _, err := patchPod(cs, pod, clone) + framework.ExpectNoError(err) + } + + ginkgo.By("Expect all pods are scheduled and running") + framework.ExpectNoError(e2epod.WaitForPodsRunning(cs, ns, replicas, time.Minute)) + }) }) +func patchPod(cs clientset.Interface, old, new *v1.Pod) (*v1.Pod, error) { + oldData, err := json.Marshal(old) + if err != nil { + return nil, err + } + + newData, err := json.Marshal(new) + if err != nil { + return nil, err + } + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{}) + if err != nil { + return nil, fmt.Errorf("failed to create merge patch for Pod %q: %v", old.Name, err) + } + return cs.CoreV1().Pods(new.Namespace).Patch(context.TODO(), new.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) +} + // printAllPodsOnNode outputs status of all kubelet pods into log. func printAllPodsOnNode(c clientset.Interface, nodeName string) { podList, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{FieldSelector: "spec.nodeName=" + nodeName}) @@ -844,6 +915,7 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod { Tolerations: conf.Tolerations, PriorityClassName: conf.PriorityClassName, TerminationGracePeriodSeconds: &gracePeriod, + SchedulingGates: conf.SchedulingGates, }, } for key, value := range conf.Labels { diff --git a/test/integration/scheduler/plugins/plugins_test.go b/test/integration/scheduler/plugins/plugins_test.go index 80e6865c8af7..8760a47817f5 100644 --- a/test/integration/scheduler/plugins/plugins_test.go +++ b/test/integration/scheduler/plugins/plugins_test.go @@ -31,9 +31,12 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" listersv1 "k8s.io/client-go/listers/core/v1" + featuregatetesting "k8s.io/component-base/featuregate/testing" configv1 "k8s.io/kube-scheduler/config/v1" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" @@ -57,9 +60,15 @@ var ( podSchedulingError = testutils.PodSchedulingError createAndWaitForNodesInCache = testutils.CreateAndWaitForNodesInCache waitForPodUnschedulable = testutils.WaitForPodUnschedulable + waitForPodSchedulingGated = testutils.WaitForPodSchedulingGated waitForPodToScheduleWithTimeout = testutils.WaitForPodToScheduleWithTimeout ) +type PreEnqueuePlugin struct { + called int32 + admit bool +} + type PreFilterPlugin struct { numPreFilterCalled int failPreFilter bool @@ -146,6 +155,7 @@ type PermitPlugin struct { } const ( + enqueuePluginName = "enqueue-plugin" prefilterPluginName = "prefilter-plugin" postfilterPluginName = "postfilter-plugin" scorePluginName = "score-plugin" @@ -158,6 +168,7 @@ const ( permitPluginName = "permit-plugin" ) +var _ framework.PreEnqueuePlugin = &PreEnqueuePlugin{} var _ framework.PreFilterPlugin = &PreFilterPlugin{} var _ framework.PostFilterPlugin = &PostFilterPlugin{} var _ framework.ScorePlugin = &ScorePlugin{} @@ -184,6 +195,18 @@ func newPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory { } } +func (ep *PreEnqueuePlugin) Name() string { + return enqueuePluginName +} + +func (ep *PreEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status { + ep.called++ + if ep.admit { + return nil + } + return framework.NewStatus(framework.UnschedulableAndUnresolvable, "not ready for scheduling") +} + // Name returns name of the score plugin. func (sp *ScorePlugin) Name() string { return scorePluginName @@ -2089,6 +2112,72 @@ func TestPreScorePlugin(t *testing.T) { } } +// TestPreEnqueuePlugin tests invocation of enqueue plugins. +func TestPreEnqueuePlugin(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodSchedulingReadiness, true)() + + // Create a plugin registry for testing. Register only a filter plugin. + enqueuePlugin := &PreEnqueuePlugin{} + // Plumb a preFilterPlugin to verify if it's called or not. + preFilterPlugin := &PreFilterPlugin{} + registry, prof := initRegistryAndConfig(t, enqueuePlugin, preFilterPlugin) + + // Create the API server and the scheduler with the test plugin set. + testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "enqueue-plugin", nil), 1, + scheduler.WithProfiles(prof), + scheduler.WithFrameworkOutOfTreeRegistry(registry)) + defer testutils.CleanupTest(t, testCtx) + + tests := []struct { + name string + pod *v1.Pod + admitEnqueue bool + }{ + { + name: "pod is admitted to enqueue", + pod: st.MakePod().Name("p").Namespace(testCtx.NS.Name).Container("pause").Obj(), + admitEnqueue: true, + }, + { + name: "pod is not admitted to enqueue", + pod: st.MakePod().Name("p").Namespace(testCtx.NS.Name).SchedulingGates([]string{"foo"}).Container("pause").Obj(), + admitEnqueue: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + enqueuePlugin.admit = tt.admitEnqueue + // Create a best effort pod. + pod, err := createPausePod(testCtx.ClientSet, tt.pod) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + + if tt.admitEnqueue { + if err := waitForPodToScheduleWithTimeout(testCtx.ClientSet, pod, 10*time.Second); err != nil { + t.Errorf("Expected the pod to be schedulable, but got: %v", err) + } + // Also verify enqueuePlugin is called. + if enqueuePlugin.called == 0 { + t.Errorf("Expected the enqueuePlugin plugin to be called at least once, but got 0") + } + } else { + if err := waitForPodSchedulingGated(testCtx.ClientSet, pod, 10*time.Second); err != nil { + t.Errorf("Expected the pod to be scheduling waiting, but got: %v", err) + } + // Also verify preFilterPlugin is not called. + if preFilterPlugin.numPreFilterCalled != 0 { + t.Errorf("Expected the preFilter plugin not to be called, but got %v", preFilterPlugin.numPreFilterCalled) + } + } + + preFilterPlugin.reset() + testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) + }) + } +} + // TestPreemptWithPermitPlugin tests preempt with permit plugins. // It verifies how waitingPods behave in different scenarios: // - when waitingPods get preempted @@ -2450,6 +2539,8 @@ func initRegistryAndConfig(t *testing.T, plugins ...framework.Plugin) (framework plugin := configv1.Plugin{Name: p.Name()} switch p.(type) { + case *PreEnqueuePlugin: + pls.PreEnqueue.Enabled = append(pls.PreEnqueue.Enabled, plugin) case *PreFilterPlugin: pls.PreFilter.Enabled = append(pls.PreFilter.Enabled, plugin) case *FilterPlugin: diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index 338b61b3f9e2..4227f843028b 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -30,12 +30,16 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + featuregatetesting "k8s.io/component-base/featuregate/testing" configv1 "k8s.io/kube-scheduler/config/v1" apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -47,6 +51,128 @@ import ( "k8s.io/utils/pointer" ) +func TestSchedulingGates(t *testing.T) { + tests := []struct { + name string + pods []*v1.Pod + featureEnabled bool + want []string + rmPodsSchedulingGates []int + wantPostGatesRemoval []string + }{ + { + name: "feature disabled, regular pods", + pods: []*v1.Pod{ + st.MakePod().Name("p1").Container("pause").Obj(), + st.MakePod().Name("p2").Container("pause").Obj(), + }, + featureEnabled: false, + want: []string{"p1", "p2"}, + }, + { + name: "feature enabled, regular pods", + pods: []*v1.Pod{ + st.MakePod().Name("p1").Container("pause").Obj(), + st.MakePod().Name("p2").Container("pause").Obj(), + }, + featureEnabled: true, + want: []string{"p1", "p2"}, + }, + { + name: "feature disabled, one pod carrying scheduling gates", + pods: []*v1.Pod{ + st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(), + st.MakePod().Name("p2").Container("pause").Obj(), + }, + featureEnabled: false, + want: []string{"p1", "p2"}, + }, + { + name: "feature enabled, one pod carrying scheduling gates", + pods: []*v1.Pod{ + st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(), + st.MakePod().Name("p2").Container("pause").Obj(), + }, + featureEnabled: true, + want: []string{"p2"}, + }, + { + name: "feature enabled, two pod carrying scheduling gates, and remove gates of one pod", + pods: []*v1.Pod{ + st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(), + st.MakePod().Name("p2").SchedulingGates([]string{"bar"}).Container("pause").Obj(), + st.MakePod().Name("p3").Container("pause").Obj(), + }, + featureEnabled: true, + want: []string{"p3"}, + rmPodsSchedulingGates: []int{1}, // remove gates of 'p2' + wantPostGatesRemoval: []string{"p2"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodSchedulingReadiness, tt.featureEnabled)() + + // Use zero backoff seconds to bypass backoffQ. + // It's intended to not start the scheduler's queue, and hence to + // not start any flushing logic. We will pop and schedule the Pods manually later. + testCtx := testutils.InitTestSchedulerWithOptions( + t, + testutils.InitTestAPIServer(t, "pod-scheduling-gates", nil), + 0, + scheduler.WithPodInitialBackoffSeconds(0), + scheduler.WithPodMaxBackoffSeconds(0), + ) + testutils.SyncInformerFactory(testCtx) + defer testutils.CleanupTest(t, testCtx) + + cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx + for _, p := range tt.pods { + p.Namespace = ns + if _, err := cs.CoreV1().Pods(ns).Create(ctx, p, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create Pod %q: %v", p.Name, err) + } + } + + // Wait for the pods to be present in the scheduling queue. + if err := wait.Poll(time.Millisecond*200, wait.ForeverTestTimeout, func() (bool, error) { + pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods() + return len(pendingPods) == len(tt.pods), nil + }); err != nil { + t.Fatal(err) + } + + // Pop the expected pods out. They should be de-queueable. + for _, wantPod := range tt.want { + podInfo := nextPodOrDie(t, testCtx) + if got := podInfo.Pod.Name; got != wantPod { + t.Errorf("Want %v to be popped out, but got %v", wantPod, got) + } + } + + if len(tt.rmPodsSchedulingGates) == 0 { + return + } + // Remove scheduling gates from the pod spec. + for _, idx := range tt.rmPodsSchedulingGates { + patch := `{"spec": {"schedulingGates": null}}` + podName := tt.pods[idx].Name + if _, err := cs.CoreV1().Pods(ns).Patch(ctx, podName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil { + t.Fatalf("Failed to patch pod %v: %v", podName, err) + } + } + // Pop the expected pods out. They should be de-queueable. + for _, wantPod := range tt.wantPostGatesRemoval { + podInfo := nextPodOrDie(t, testCtx) + if got := podInfo.Pod.Name; got != wantPod { + t.Errorf("Want %v to be popped out, but got %v", wantPod, got) + } + } + }) + } +} + // TestCoreResourceEnqueue verify Pods failed by in-tree default plugins can be // moved properly upon their registered events. func TestCoreResourceEnqueue(t *testing.T) { diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 42a51976f934..afae84225365 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -776,7 +776,7 @@ func PodScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNam } // PodUnschedulable returns a condition function that returns true if the given pod -// gets unschedulable status. +// gets unschedulable status of reason 'Unschedulable'. func PodUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { return func() (bool, error) { pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) @@ -806,18 +806,39 @@ func PodSchedulingError(c clientset.Interface, podNamespace, podName string) wai } } -// waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns +// PodSchedulingGated returns a condition function that returns true if the given pod +// gets unschedulable status of reason 'SchedulingGated'. +func PodSchedulingGated(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { + return func() (bool, error) { + pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + // This could be a connection error so we want to retry. + return false, nil + } + _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) + return cond != nil && cond.Status == v1.ConditionFalse && + cond.Reason == v1.PodReasonSchedulingGated && pod.Spec.NodeName == "", nil + } +} + +// WaitForPodUnschedulableWithTimeout waits for a pod to fail scheduling and returns // an error if it does not become unschedulable within the given timeout. func WaitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { return wait.Poll(100*time.Millisecond, timeout, PodUnschedulable(cs, pod.Namespace, pod.Name)) } -// waitForPodUnschedule waits for a pod to fail scheduling and returns +// WaitForPodUnschedulable waits for a pod to fail scheduling and returns // an error if it does not become unschedulable within the timeout duration (30 seconds). func WaitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error { return WaitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second) } +// WaitForPodSchedulingGated waits for a pod to be in scheduling gated state +// and returns an error if it does not fall into this state within the given timeout. +func WaitForPodSchedulingGated(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { + return wait.Poll(100*time.Millisecond, timeout, PodSchedulingGated(cs, pod.Namespace, pod.Name)) +} + // WaitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to // the expected values. func WaitForPDBsStable(testCtx *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {