Skip to content

Commit

Permalink
sched: integration test to cover event registration
Browse files Browse the repository at this point in the history
  • Loading branch information
Huang-Wei committed Oct 6, 2021
1 parent 04f747d commit 3283e6b
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 5 deletions.
93 changes: 90 additions & 3 deletions test/integration/scheduler/queue_test.go
Expand Up @@ -47,7 +47,93 @@ import (
"k8s.io/utils/pointer"
)

// TODO(#105303): Add a test case to cover event registration for core API resources
// TestCoreResourceEnqueue verify Pods failed by in-tree default plugins can be
// moved properly upon their registered events.
func TestCoreResourceEnqueue(t *testing.T) {
// Use zero backoff seconds to bypass backoffQ.
// It's intended to not start the scheduler's queue, and hence to
// not start any flushing logic. We will pop and schedule the Pods manually later.
testCtx := testutils.InitTestSchedulerWithOptions(
t,
testutils.InitTestAPIServer(t, "core-res-enqueue", nil),
nil,
scheduler.WithPodInitialBackoffSeconds(0),
scheduler.WithPodMaxBackoffSeconds(0),
)
testutils.SyncInformerFactory(testCtx)

defer testutils.CleanupTest(t, testCtx)

cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
// Create one Node with a taint.
node := st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj()
node.Spec.Taints = []v1.Taint{{Key: "foo", Effect: v1.TaintEffectNoSchedule}}
if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Node %q: %v", node.Name, err)
}

// Create two Pods that are both unschedulable.
// - Pod1 is a best-effort Pod, but doesn't have the required toleration.
// - Pod2 requests a large amount of CPU resource that the node cannot fit.
// Note: Pod2 will fail the tainttoleration plugin b/c that's ordered prior to noderesources.
pod1 := st.MakePod().Namespace(ns).Name("pod1").Container("image").Obj()
pod2 := st.MakePod().Namespace(ns).Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj()
for _, pod := range []*v1.Pod{pod1, pod2} {
if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
}
}

// Wait for the two pods to be present in the scheduling queue.
if err := wait.Poll(time.Millisecond*200, wait.ForeverTestTimeout, func() (bool, error) {
return len(testCtx.Scheduler.SchedulingQueue.PendingPods()) == 2, nil
}); err != nil {
t.Fatal(err)
}

// Pop the two pods out. They should be unschedulable.
for i := 0; i < 2; i++ {
podInfo := nextPodOrDie(t, testCtx)
fwk, ok := testCtx.Scheduler.Profiles[podInfo.Pod.Spec.SchedulerName]
if !ok {
t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
}
// Schedule the Pod manually.
_, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, nil, fwk, framework.NewCycleState(), podInfo.Pod)
if fitError == nil {
t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
}
testCtx.Scheduler.Error(podInfo, fitError)

// Scheduling cycle is incremented by one after NextPod() is called, so
// pass a number larger than i to move Pod to unschedulableQ.
testCtx.Scheduler.SchedulingQueue.AddUnschedulableIfNotPresent(podInfo, int64(i+10))
}

// Trigger a NodeTaintChange event.
// We expect this event to trigger moving the test Pod from unschedulableQ to activeQ.
node.Spec.Taints = nil
if _, err := cs.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Failed to remove taints off the node: %v", err)
}

// Now we should be able to pop the Pod from activeQ again.
podInfo := nextPodOrDie(t, testCtx)
if podInfo.Attempts != 2 {
t.Fatalf("Expected the Pod to be attempted 2 times, but got %v", podInfo.Attempts)
}
if got := podInfo.Pod.Name; got != "pod1" {
t.Fatalf("Exepcted pod1 to be popped, but got %v", got)
}

// Pod2 is not expected to be popped out.
// Although the failure reason has been lifted, it still won't be moved to active due to
// the node event's preCheckForNode().
podInfo = nextPod(t, testCtx)
if podInfo != nil {
t.Fatalf("Unexpected pod %v get popped out", podInfo.Pod.Name)
}
}

var _ framework.FilterPlugin = &fakeCRPlugin{}
var _ framework.EnqueueExtensions = &fakeCRPlugin{}
Expand Down Expand Up @@ -147,6 +233,8 @@ func TestCustomResourceEnqueue(t *testing.T) {
}

// Use zero backoff seconds to bypass backoffQ.
// It's intended to not start the scheduler's queue, and hence to
// not start any flushing logic. We will pop and schedule the Pods manually later.
testCtx = testutils.InitTestSchedulerWithOptions(
t,
testCtx,
Expand All @@ -157,8 +245,7 @@ func TestCustomResourceEnqueue(t *testing.T) {
scheduler.WithPodMaxBackoffSeconds(0),
)
testutils.SyncInformerFactory(testCtx)
// It's intended to not start the scheduler's queue, and hence to
// not start any flushing logic. We will pop and schedule the Pods manually later.

defer testutils.CleanupTest(t, testCtx)

cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
Expand Down
19 changes: 17 additions & 2 deletions test/integration/scheduler/util.go
Expand Up @@ -534,17 +534,32 @@ func timeout(ctx context.Context, d time.Duration, f func()) error {
}

// nextPodOrDie returns the next Pod in the scheduler queue.
// The operation needs to be completed within 15 seconds; otherwise the test gets aborted.
// The operation needs to be completed within 5 seconds; otherwise the test gets aborted.
func nextPodOrDie(t *testing.T, testCtx *testutils.TestContext) *framework.QueuedPodInfo {
t.Helper()

var podInfo *framework.QueuedPodInfo
// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
// default go testing timeout (10m) to abort.
if err := timeout(testCtx.Ctx, time.Second*15, func() {
if err := timeout(testCtx.Ctx, time.Second*5, func() {
podInfo = testCtx.Scheduler.NextPod()
}); err != nil {
t.Fatalf("Timed out waiting for the Pod to be popped: %v", err)
}
return podInfo
}

// nextPod returns the next Pod in the scheduler queue, with a 5 seconds timeout.
func nextPod(t *testing.T, testCtx *testutils.TestContext) *framework.QueuedPodInfo {
t.Helper()

var podInfo *framework.QueuedPodInfo
// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
// default go testing timeout (10m) to abort.
if err := timeout(testCtx.Ctx, time.Second*5, func() {
podInfo = testCtx.Scheduler.NextPod()
}); err != nil {
return nil
}
return podInfo
}

0 comments on commit 3283e6b

Please sign in to comment.