[KEP-3521] Part 3: Bug fixes, integration & E2E Test #113442

Merged
merged 3 commits on Nov 8, 2022
3 changes: 3 additions & 0 deletions pkg/scheduler/internal/queue/scheduling_queue.go
@@ -441,6 +441,9 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool {
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
// If this returns true, the pod should not be re-tried.
func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
if podInfo.Gated {
return false
Member

This is good to have for sure, but I assume that the podInfo for a gated pod should never have its Timestamp set beyond the first time we observed the pod (on Add(...)), because we should never have attempted to schedule the pod (i.e., Attempts should always be zero), and so Timestamp shouldn't be reset.

Member Author

Yes, Attempts is always zero, so duration := p.calculateBackoffDuration(podInfo) would always return podInitialBackoffDuration. In other words, if the time between when the pod is added and when it is updated is less than podInitialBackoffDuration, isPodBackingoff would return true.

This bug can be observed by running hack/local-up-cluster.sh with the following diff:

diff --git a/hack/local-up-cluster.sh b/hack/local-up-cluster.sh
index 6ec7f1ded28..e2c67dd0e34 100755
--- a/hack/local-up-cluster.sh
+++ b/hack/local-up-cluster.sh
@@ -869,6 +869,8 @@ clientConnection:
   kubeconfig: ${CERT_DIR}/scheduler.kubeconfig
 leaderElection:
   leaderElect: false
+podInitialBackoffSeconds: 120
+podMaxBackoffSeconds: 200
 EOF
     ${CONTROLPLANE_SUDO} "${GO_OUT}/kube-scheduler" \
       --v="${LOG_LEVEL}" \

}
boTime := p.getBackoffTime(podInfo)
return boTime.After(p.clock.Now())
}
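To make the exchange above concrete, here is a minimal, self-contained Go sketch of the behaviour being described. It is not the scheduler's actual code: queuedPodInfo, calculateBackoff, and the two isPodBackingoff variants are simplified stand-ins for QueuedPodInfo, calculateBackoffDuration, and getBackoffTime, and the 120-second value mirrors the podInitialBackoffSeconds override from the diff in the comment.

package main

import (
	"fmt"
	"time"
)

// queuedPodInfo is a simplified stand-in for the scheduler's QueuedPodInfo;
// only the fields relevant to the discussion above are kept.
type queuedPodInfo struct {
	timestamp time.Time // when the pod was first added to the queue
	attempts  int       // scheduling attempts so far; always 0 for a gated pod
	gated     bool      // whether the pod still carries scheduling gates
}

const initialBackoff = 120 * time.Second // mirrors podInitialBackoffSeconds: 120 above

// calculateBackoff sketches the per-attempt exponential backoff: with zero
// attempts the loop never runs, so the result is just the initial backoff.
func calculateBackoff(p queuedPodInfo) time.Duration {
	d := initialBackoff
	for i := 1; i < p.attempts; i++ {
		d *= 2
	}
	return d
}

// isPodBackingoffWithoutFix shows the pre-fix behaviour: a gated pod that was
// never attempted still looks like it is backing off for the whole
// initial-backoff window after it was added.
func isPodBackingoffWithoutFix(p queuedPodInfo, now time.Time) bool {
	boTime := p.timestamp.Add(calculateBackoff(p))
	return boTime.After(now)
}

// isPodBackingoffWithFix mirrors the short-circuit added in this PR:
// gated pods are never considered to be backing off.
func isPodBackingoffWithFix(p queuedPodInfo, now time.Time) bool {
	if p.gated {
		return false
	}
	return isPodBackingoffWithoutFix(p, now)
}

func main() {
	added := time.Now()
	gated := queuedPodInfo{timestamp: added, attempts: 0, gated: true}

	// An update arriving 30s after the pod was added, i.e. inside the 120s window:
	now := added.Add(30 * time.Second)
	fmt.Println(isPodBackingoffWithoutFix(gated, now)) // true: the pod is wrongly treated as backing off
	fmt.Println(isPodBackingoffWithFix(gated, now))    // false: gated pods skip the backoff check
}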
14 changes: 12 additions & 2 deletions pkg/scheduler/internal/queue/scheduling_queue_test.go
@@ -512,14 +512,24 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
defer cancel()

m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins}
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m))
got, _ := q.addToActiveQ(newQueuedPodInfoForLookup(tt.pod))
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m),
WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60))
got, _ := q.addToActiveQ(q.newQueuedPodInfo(tt.pod))
if got != tt.wantSuccess {
t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got)
}
if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) {
t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap))
}

// Simulate an update event.
clone := tt.pod.DeepCopy()
metav1.SetMetaDataAnnotation(&clone.ObjectMeta, "foo", "")
q.Update(tt.pod, clone)
// Ensure the pod is still located in unschedulablePods.
if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) {
t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap))
}
})
}
}
77 changes: 75 additions & 2 deletions test/e2e/framework/pod/wait.go
@@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"text/tabwriter"
"time"

@@ -253,7 +254,7 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
framework.Logf("Pod %s is Failed, but it's not controlled by a controller", pod.ObjectMeta.Name)
badPods = append(badPods, pod)
}
//ignore failed pods that are controlled by some controller
// ignore failed pods that are controlled by some controller
}
}

@@ -326,7 +327,7 @@ func WaitForPodCondition(c clientset.Interface, ns, podName, conditionDesc strin
return maybeTimeoutError(err, "waiting for pod %s to be %s", podIdentifier(ns, podName), conditionDesc)
}

// WaitForPodsCondition waits for the listed pods to match the given condition.
// WaitForAllPodsCondition waits for the listed pods to match the given condition.
// To succeed, at least minPods must be listed, and all listed pods must match the condition.
func WaitForAllPodsCondition(c clientset.Interface, ns string, opts metav1.ListOptions, minPods int, conditionDesc string, timeout time.Duration, condition podCondition) (*v1.PodList, error) {
framework.Logf("Waiting up to %v for at least %d pods in namespace %s to be %s", timeout, minPods, ns, conditionDesc)
@@ -362,6 +363,78 @@ func WaitForAllPodsCondition(c clientset.Interface, ns string, opts metav1.ListO
return pods, maybeTimeoutError(err, "waiting for at least %d pods to be %s (matched %d)", minPods, conditionDesc, matched)
}

// WaitForPodsRunning waits up to `timeout` for `num` pods in namespace `ns` to be running and ready.
func WaitForPodsRunning(c clientset.Interface, ns string, num int, timeout time.Duration) error {
matched := 0
err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
}
matched = 0
for _, pod := range pods.Items {
if ready, _ := testutils.PodRunningReady(&pod); ready {
matched++
}
}
if matched == num {
return true, nil
}
framework.Logf("expect %d pods are running, but got %v", num, matched)
return false, nil
})
return maybeTimeoutError(err, "waiting for pods to be running (want %v, matched %d)", num, matched)
}

// WaitForPodsSchedulingGated waits up to `timeout` for `num` pods in namespace `ns` to stay in the scheduling gated state.
func WaitForPodsSchedulingGated(c clientset.Interface, ns string, num int, timeout time.Duration) error {
matched := 0
err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
}
matched = 0
for _, pod := range pods.Items {
for _, condition := range pod.Status.Conditions {
if condition.Type == v1.PodScheduled && condition.Reason == v1.PodReasonSchedulingGated {
matched++
}
}
}
if matched == num {
return true, nil
}
framework.Logf("expect %d pods in scheduling gated state, but got %v", num, matched)
return false, nil
})
return maybeTimeoutError(err, "waiting for pods to be scheduling gated (want %d, matched %d)", num, matched)
}

// WaitForPodsWithSchedulingGates waits up to `timeout` for `num` pods in namespace `ns` to
// carry the given `schedulingGates`.
func WaitForPodsWithSchedulingGates(c clientset.Interface, ns string, num int, timeout time.Duration, schedulingGates []v1.PodSchedulingGate) error {
matched := 0
err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
}
matched = 0
for _, pod := range pods.Items {
if reflect.DeepEqual(pod.Spec.SchedulingGates, schedulingGates) {
matched++
}
}
if matched == num {
return true, nil
}
framework.Logf("expect %d pods carry the expected scheduling gates, but got %v", num, matched)
return false, nil
})
return maybeTimeoutError(err, "waiting for pods to carry the expected scheduling gates (want %d, matched %d)", num, matched)
}

// WaitForPodTerminatedInNamespace returns an error if it takes too long for the pod to terminate,
// if the pod Get api returns an error (IsNotFound or other), or if the pod failed (and thus did not
// terminate) with an unexpected reason. Typically called to test that the passed-in pod is fully
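For orientation, here is a short usage sketch of how the three helpers added above are intended to be combined from a test. The clientset cs, namespace ns, replica count, and gate name are placeholder assumptions, and the imports (v1, clientset, framework, e2epod, time) are the same ones used by the e2e spec in test/e2e/scheduling/predicates.go below, which follows this exact pattern against real pods.

// Sketch only: cs, ns, replicas, and the gate name are placeholder assumptions.
func waitForGatedPodsThenRunning(cs clientset.Interface, ns string, replicas int) {
	gates := []v1.PodSchedulingGate{{Name: "example.com/gate"}}

	// Pods created with non-empty schedulingGates should stay gated and carry the expected gates.
	framework.ExpectNoError(e2epod.WaitForPodsSchedulingGated(cs, ns, replicas, time.Minute))
	framework.ExpectNoError(e2epod.WaitForPodsWithSchedulingGates(cs, ns, replicas, time.Minute, gates))

	// Once the gates are removed (e.g. by patching the pods), they should start running.
	framework.ExpectNoError(e2epod.WaitForPodsRunning(cs, ns, replicas, time.Minute))
}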
72 changes: 72 additions & 0 deletions test/e2e/scheduling/predicates.go
@@ -18,15 +18,18 @@ package scheduling

import (
"context"
"encoding/json"
"fmt"
"time"

v1 "k8s.io/api/core/v1"
nodev1 "k8s.io/api/node/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/uuid"
utilversion "k8s.io/apimachinery/pkg/util/version"
clientset "k8s.io/client-go/kubernetes"
@@ -70,6 +73,7 @@ type pausePodConfig struct {
PriorityClassName string
DeletionGracePeriodSeconds *int64
TopologySpreadConstraints []v1.TopologySpreadConstraint
SchedulingGates []v1.PodSchedulingGate
}

var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
@@ -799,8 +803,75 @@ var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
framework.ExpectEqual(numInNode2, expected, fmt.Sprintf("Pods are not distributed as expected on node %q", nodeNames[1]))
})
})

ginkgo.It("validates Pods with non-empty schedulingGates are blocked on scheduling [Feature:PodSchedulingReadiness] [alpha]", func() {
podLabel := "e2e-scheduling-gates"
replicas := 3
ginkgo.By(fmt.Sprintf("Creating a ReplicaSet with replicas=%v, carrying scheduling gates [foo bar]", replicas))
rsConfig := pauseRSConfig{
Replicas: int32(replicas),
PodConfig: pausePodConfig{
Name: podLabel,
Namespace: ns,
Labels: map[string]string{podLabel: ""},
SchedulingGates: []v1.PodSchedulingGate{
{Name: "foo"},
{Name: "bar"},
},
},
}
createPauseRS(f, rsConfig)

ginkgo.By("Expect all pods stay in pending state")
podList, err := e2epod.WaitForNumberOfPods(cs, ns, replicas, time.Minute)
framework.ExpectNoError(err)
framework.ExpectNoError(e2epod.WaitForPodsSchedulingGated(cs, ns, replicas, time.Minute))

ginkgo.By("Remove one scheduling gate")
want := []v1.PodSchedulingGate{{Name: "bar"}}
var pods []*v1.Pod
for _, pod := range podList.Items {
clone := pod.DeepCopy()
clone.Spec.SchedulingGates = want
live, err := patchPod(cs, &pod, clone)
framework.ExpectNoError(err)
pods = append(pods, live)
}

ginkgo.By("Expect all pods carry one scheduling gate and are still in pending state")
framework.ExpectNoError(e2epod.WaitForPodsWithSchedulingGates(cs, ns, replicas, time.Minute, want))
framework.ExpectNoError(e2epod.WaitForPodsSchedulingGated(cs, ns, replicas, time.Minute))

ginkgo.By("Remove the remaining scheduling gates")
for _, pod := range pods {
clone := pod.DeepCopy()
clone.Spec.SchedulingGates = nil
_, err := patchPod(cs, pod, clone)
framework.ExpectNoError(err)
}

ginkgo.By("Expect all pods are scheduled and running")
framework.ExpectNoError(e2epod.WaitForPodsRunning(cs, ns, replicas, time.Minute))
})
})

func patchPod(cs clientset.Interface, old, new *v1.Pod) (*v1.Pod, error) {
oldData, err := json.Marshal(old)
if err != nil {
return nil, err
}

newData, err := json.Marshal(new)
if err != nil {
return nil, err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
if err != nil {
return nil, fmt.Errorf("failed to create merge patch for Pod %q: %v", old.Name, err)
}
return cs.CoreV1().Pods(new.Namespace).Patch(context.TODO(), new.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
}

// printAllPodsOnNode outputs status of all kubelet pods into log.
func printAllPodsOnNode(c clientset.Interface, nodeName string) {
podList, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{FieldSelector: "spec.nodeName=" + nodeName})
@@ -844,6 +915,7 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
Tolerations: conf.Tolerations,
PriorityClassName: conf.PriorityClassName,
TerminationGracePeriodSeconds: &gracePeriod,
SchedulingGates: conf.SchedulingGates,
},
}
for key, value := range conf.Labels {
91 changes: 91 additions & 0 deletions test/integration/scheduler/plugins/plugins_test.go
@@ -31,9 +31,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
featuregatetesting "k8s.io/component-base/featuregate/testing"
configv1 "k8s.io/kube-scheduler/config/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
@@ -57,9 +60,15 @@ var (
podSchedulingError = testutils.PodSchedulingError
createAndWaitForNodesInCache = testutils.CreateAndWaitForNodesInCache
waitForPodUnschedulable = testutils.WaitForPodUnschedulable
waitForPodSchedulingGated = testutils.WaitForPodSchedulingGated
waitForPodToScheduleWithTimeout = testutils.WaitForPodToScheduleWithTimeout
)

type PreEnqueuePlugin struct {
called int32
admit bool
}

type PreFilterPlugin struct {
numPreFilterCalled int
failPreFilter bool
@@ -146,6 +155,7 @@ type PermitPlugin struct {
}

const (
enqueuePluginName = "enqueue-plugin"
prefilterPluginName = "prefilter-plugin"
postfilterPluginName = "postfilter-plugin"
scorePluginName = "score-plugin"
@@ -158,6 +168,7 @@ const (
permitPluginName = "permit-plugin"
)

var _ framework.PreEnqueuePlugin = &PreEnqueuePlugin{}
var _ framework.PreFilterPlugin = &PreFilterPlugin{}
var _ framework.PostFilterPlugin = &PostFilterPlugin{}
var _ framework.ScorePlugin = &ScorePlugin{}
@@ -184,6 +195,18 @@ func newPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory {
}
}

func (ep *PreEnqueuePlugin) Name() string {
return enqueuePluginName
}

func (ep *PreEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
ep.called++
if ep.admit {
return nil
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "not ready for scheduling")
}

// Name returns name of the score plugin.
func (sp *ScorePlugin) Name() string {
return scorePluginName
@@ -2089,6 +2112,72 @@ func TestPreScorePlugin(t *testing.T) {
}
}

// TestPreEnqueuePlugin tests invocation of enqueue plugins.
func TestPreEnqueuePlugin(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodSchedulingReadiness, true)()

// Create a plugin registry for testing. Register a PreEnqueue plugin.
enqueuePlugin := &PreEnqueuePlugin{}
// Plumb a preFilterPlugin to verify if it's called or not.
preFilterPlugin := &PreFilterPlugin{}
registry, prof := initRegistryAndConfig(t, enqueuePlugin, preFilterPlugin)

// Create the API server and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "enqueue-plugin", nil), 1,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer testutils.CleanupTest(t, testCtx)

tests := []struct {
name string
pod *v1.Pod
admitEnqueue bool
}{
{
name: "pod is admitted to enqueue",
pod: st.MakePod().Name("p").Namespace(testCtx.NS.Name).Container("pause").Obj(),
admitEnqueue: true,
},
{
name: "pod is not admitted to enqueue",
pod: st.MakePod().Name("p").Namespace(testCtx.NS.Name).SchedulingGates([]string{"foo"}).Container("pause").Obj(),
Member

Is it a problem that both test cases use a pod with the same name?

Member Author

It's fine: in an integration test, each sub-test's context/env is destroyed and is supposed to run statelessly, and the pod is cleaned up at the end of each sub-test:

testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})

admitEnqueue: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
enqueuePlugin.admit = tt.admitEnqueue
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet, tt.pod)
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}

if tt.admitEnqueue {
if err := waitForPodToScheduleWithTimeout(testCtx.ClientSet, pod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be schedulable, but got: %v", err)
}
// Also verify enqueuePlugin is called.
if enqueuePlugin.called == 0 {
t.Errorf("Expected the enqueuePlugin plugin to be called at least once, but got 0")
}
} else {
if err := waitForPodSchedulingGated(testCtx.ClientSet, pod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be scheduling waiting, but got: %v", err)
}
// Also verify preFilterPlugin is not called.
if preFilterPlugin.numPreFilterCalled != 0 {
t.Errorf("Expected the preFilter plugin not to be called, but got %v", preFilterPlugin.numPreFilterCalled)
}
}

preFilterPlugin.reset()
testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
})
}
}

// TestPreemptWithPermitPlugin tests preempt with permit plugins.
// It verifies how waitingPods behave in different scenarios:
// - when waitingPods get preempted
@@ -2450,6 +2539,8 @@ func initRegistryAndConfig(t *testing.T, plugins ...framework.Plugin) (framework
plugin := configv1.Plugin{Name: p.Name()}

switch p.(type) {
case *PreEnqueuePlugin:
pls.PreEnqueue.Enabled = append(pls.PreEnqueue.Enabled, plugin)
case *PreFilterPlugin:
pls.PreFilter.Enabled = append(pls.PreFilter.Enabled, plugin)
case *FilterPlugin: