[release-1.20] Balance nodes in scheduling e2e #98811

Merged
test/e2e/scheduling/predicates.go: 10 changes (8 additions, 2 deletions)
@@ -887,8 +887,8 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
ObjectMeta: metav1.ObjectMeta{
Name: conf.Name,
Namespace: conf.Namespace,
Labels: conf.Labels,
Annotations: conf.Annotations,
Labels: map[string]string{},
Annotations: map[string]string{},
OwnerReferences: conf.OwnerReferences,
},
Spec: v1.PodSpec{
@@ -908,6 +908,12 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
TerminationGracePeriodSeconds: &gracePeriod,
},
}
for key, value := range conf.Labels {
pod.ObjectMeta.Labels[key] = value
}
for key, value := range conf.Annotations {
pod.ObjectMeta.Annotations[key] = value
}
// TODO: setting the Pod's nodeAffinity instead of setting .spec.nodeName works around the
// Preemption e2e flake (#88441), but we should investigate deeper to get to the bottom of it.
if len(conf.NodeName) != 0 {
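In the hunk above, initPausePod stops assigning conf.Labels and conf.Annotations directly and instead copies the entries into freshly allocated maps, so each pod gets its own non-nil label and annotation maps rather than aliasing the caller's. A minimal standalone sketch of that copy pattern, using hypothetical names (podConfig, buildMeta) rather than the e2e framework's own types:

```go
package main

import "fmt"

// podConfig is a stand-in for the e2e pausePodConfig; the names here are illustrative.
type podConfig struct {
	Labels      map[string]string
	Annotations map[string]string
}

// buildMeta mirrors the pattern in the hunk: start from empty, non-nil maps and
// copy the caller's entries in, so later code can add keys without a nil-map
// panic and without mutating conf's maps.
func buildMeta(conf podConfig) (labels, annotations map[string]string) {
	labels = map[string]string{}
	annotations = map[string]string{}
	for key, value := range conf.Labels {
		labels[key] = value
	}
	for key, value := range conf.Annotations {
		annotations[key] = value
	}
	return labels, annotations
}

func main() {
	labels, annotations := buildMeta(podConfig{Labels: map[string]string{"app": "demo"}})
	labels["balance"] = "added-later" // extend without touching the original conf map
	annotations["note"] = "also-safe" // no nil-map panic even though conf.Annotations was unset
	fmt.Println(labels, annotations)
}
```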
test/e2e/scheduling/priorities.go: 48 changes (40 additions, 8 deletions)
@@ -32,6 +32,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
@@ -52,7 +53,7 @@ type Resource struct {
Memory int64
}

var balancePodLabel = map[string]string{"name": "priority-balanced-memory"}
var balancePodLabel = map[string]string{"podname": "priority-balanced-memory"}

var podRequestedResource = &v1.ResourceRequirements{
Limits: v1.ResourceList{
@@ -187,7 +188,8 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
}

// make the nodes have balanced cpu,mem usage
err = createBalancedPodForNodes(f, cs, ns, nodeList.Items, podRequestedResource, 0.6)
cleanUp, err := createBalancedPodForNodes(f, cs, ns, nodeList.Items, podRequestedResource, 0.6)
defer cleanUp()
framework.ExpectNoError(err)
ginkgo.By("Trying to launch the pod with podAntiAffinity.")
labelPodName := "pod-with-pod-antiaffinity"
@@ -236,7 +238,8 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
ginkgo.It("Pod should avoid nodes that have avoidPod annotation", func() {
nodeName := nodeList.Items[0].Name
// make the nodes have balanced cpu,mem usage
err := createBalancedPodForNodes(f, cs, ns, nodeList.Items, podRequestedResource, 0.5)
cleanUp, err := createBalancedPodForNodes(f, cs, ns, nodeList.Items, podRequestedResource, 0.5)
defer cleanUp()
framework.ExpectNoError(err)
ginkgo.By("Create a RC, with 0 replicas")
rc := createRC(ns, "scheduler-priority-avoid-pod", int32(0), map[string]string{"name": "scheduler-priority-avoid-pod"}, f, podRequestedResource)
@@ -298,7 +301,8 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {

ginkgo.It("Pod should be preferably scheduled to nodes pod can tolerate", func() {
// make the nodes have balanced cpu,mem usage ratio
err := createBalancedPodForNodes(f, cs, ns, nodeList.Items, podRequestedResource, 0.5)
cleanUp, err := createBalancedPodForNodes(f, cs, ns, nodeList.Items, podRequestedResource, 0.5)
defer cleanUp()
framework.ExpectNoError(err)
// Apply 10 taints to first node
nodeName := nodeList.Items[0].Name
@@ -360,7 +364,8 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
}

// Make the nodes have balanced cpu,mem usage.
err := createBalancedPodForNodes(f, cs, ns, nodes, podRequestedResource, 0.5)
cleanUp, err := createBalancedPodForNodes(f, cs, ns, nodes, podRequestedResource, 0.5)
defer cleanUp()
framework.ExpectNoError(err)

replicas := 4
@@ -425,7 +430,34 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
})

// createBalancedPodForNodes creates a pod per node that asks for enough resources to make all nodes have the same mem/cpu usage ratio.
func createBalancedPodForNodes(f *framework.Framework, cs clientset.Interface, ns string, nodes []v1.Node, requestedResource *v1.ResourceRequirements, ratio float64) error {
func createBalancedPodForNodes(f *framework.Framework, cs clientset.Interface, ns string, nodes []v1.Node, requestedResource *v1.ResourceRequirements, ratio float64) (func(), error) {
cleanUp := func() {
// Delete all remaining pods
err := cs.CoreV1().Pods(ns).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set(balancePodLabel)).String(),
})
if err != nil {
framework.Logf("Failed to delete memory balanced pods: %v.", err)
} else {
err := wait.PollImmediate(2*time.Second, time.Minute, func() (bool, error) {
podList, err := cs.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set(balancePodLabel)).String(),
})
if err != nil {
framework.Logf("Failed to list memory balanced pods: %v.", err)
return false, nil
}
if len(podList.Items) > 0 {
return false, nil
}
return true, nil
})
if err != nil {
framework.Logf("Failed to wait until all memory balanced pods are deleted: %v.", err)
}
}
}

// find the max, if the node has the max,use the one, if not,use the ratio parameter
var maxCPUFraction, maxMemFraction float64 = ratio, ratio
var cpuFractionMap = make(map[string]float64)
@@ -485,7 +517,7 @@ func createBalancedPodForNodes(f *framework.Framework, cs clientset.Interface, n
*initPausePod(f, *podConfig), true, framework.Logf)

if err != nil {
return err
return cleanUp, err
}
}

@@ -494,7 +526,7 @@ func createBalancedPodForNodes(f *framework.Framework, cs clientset.Interface, n
computeCPUMemFraction(cs, node, requestedResource)
}

return nil
return cleanUp, nil
}

func computeCPUMemFraction(cs clientset.Interface, node v1.Node, resource *v1.ResourceRequirements) (float64, float64) {
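The central change in priorities.go is the new signature: createBalancedPodForNodes now returns a cleanup func() together with the error, and every caller defers it immediately so the balancing pods are removed even when setup fails partway. A self-contained sketch of that return-a-cleanup-closure idiom, with a hypothetical setupBalancedPods helper and the API calls replaced by prints:

```go
package main

import (
	"errors"
	"fmt"
)

// setupBalancedPods stands in for createBalancedPodForNodes: it hands back a
// cleanup closure alongside the error so the caller can defer the cleanup
// unconditionally. The closure only touches what was actually created, so it
// is safe to run after a partial failure.
func setupBalancedPods(nodes []string) (func(), error) {
	var created []string
	cleanUp := func() {
		for _, name := range created {
			fmt.Println("deleting", name) // the real helper deletes by label and polls until the pods are gone
		}
	}
	for i, node := range nodes {
		if i == 1 {
			// Simulate a failure on the second node; the first pod still gets cleaned up.
			return cleanUp, errors.New("pod rejected on " + node)
		}
		created = append(created, "balanced-pod-"+node)
	}
	return cleanUp, nil
}

func main() {
	cleanUp, err := setupBalancedPods([]string{"node-1", "node-2"})
	defer cleanUp()
	if err != nil {
		fmt.Println("setup failed:", err)
	}
}
```

Returning the closure even on error, rather than only on success, is what lets the callers above write defer cleanUp() before checking err.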
test/e2e/scheduling/ubernetes_lite.go: 16 changes (16 additions, 0 deletions)
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math"
"time"

"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
@@ -42,6 +43,7 @@ var _ = SIGDescribe("Multi-AZ Clusters", func() {
f := framework.NewDefaultFramework("multi-az")
var zoneCount int
var err error
var cleanUp func()
ginkgo.BeforeEach(func() {
e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws")
if zoneCount <= 0 {
@@ -52,6 +54,20 @@
msg := fmt.Sprintf("Zone count is %d, only run for multi-zone clusters, skipping test", zoneCount)
e2eskipper.SkipUnlessAtLeast(zoneCount, 2, msg)
// TODO: SkipUnlessDefaultScheduler() // Non-default schedulers might not spread

cs := f.ClientSet
e2enode.WaitForTotalHealthy(cs, time.Minute)
nodeList, err := e2enode.GetReadySchedulableNodes(cs)
framework.ExpectNoError(err)

// make the nodes have balanced cpu,mem usage
cleanUp, err = createBalancedPodForNodes(f, cs, f.Namespace.Name, nodeList.Items, podRequestedResource, 0.0)
framework.ExpectNoError(err)
})
ginkgo.AfterEach(func() {
if cleanUp != nil {
cleanUp()
}
})
ginkgo.It("should spread the pods of a service across zones", func() {
SpreadServiceOrFail(f, 5*zoneCount, imageutils.GetPauseImageName())
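In the Multi-AZ BeforeEach above, createBalancedPodForNodes is called with ratio 0.0, which (given the maxCPUFraction/maxMemFraction initialization visible in priorities.go) makes the busiest node's current usage fraction the target rather than a fixed floor like the 0.5 or 0.6 used elsewhere. A rough worked example of that padding arithmetic; the exact upstream formula is not shown in this excerpt, so treat this as an editorial reconstruction:

```go
package main

import "fmt"

// paddingRequest sketches the balancing arithmetic implied by the excerpts above:
// each node gets a pause pod large enough to lift it to the target usage fraction,
// so all nodes end up at roughly the same cpu/mem fraction.
func paddingRequest(allocatableMilli, requestedMilli int64, targetFraction float64) int64 {
	need := int64(targetFraction*float64(allocatableMilli)) - requestedMilli
	if need < 0 {
		return 0 // already above the target; nothing to add
	}
	return need
}

func main() {
	// Two nodes with 4000m CPU allocatable: one at 30% requested, one at 60%.
	fractions := []float64{0.30, 0.60}
	ratio := 0.0 // as in the Multi-AZ BeforeEach: no floor, balance up to the busiest node
	target := ratio
	for _, f := range fractions {
		if f > target {
			target = f
		}
	}
	for i, f := range fractions {
		requested := int64(f * 4000)
		fmt.Printf("node-%d: balancing pod requests %dm CPU to reach %.0f%% usage\n",
			i+1, paddingRequest(4000, requested, target), target*100)
	}
}
```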