Merge pull request kubernetes#98811 from damemi/1.20-balance-nodes-ubernetes

[release-1.20] Balance nodes in scheduling e2e
k8s-ci-robot committed Mar 12, 2021
2 parents 74d3baa + 2927f04 commit feae2a5
Showing 3 changed files with 64 additions and 10 deletions.
test/e2e/scheduling/predicates.go (10 changes: 8 additions & 2 deletions)
@@ -882,8 +882,8 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
 		ObjectMeta: metav1.ObjectMeta{
 			Name:            conf.Name,
 			Namespace:       conf.Namespace,
-			Labels:          conf.Labels,
-			Annotations:     conf.Annotations,
+			Labels:          map[string]string{},
+			Annotations:     map[string]string{},
 			OwnerReferences: conf.OwnerReferences,
 		},
 		Spec: v1.PodSpec{
@@ -903,6 +903,12 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
 			TerminationGracePeriodSeconds: &gracePeriod,
 		},
 	}
+	for key, value := range conf.Labels {
+		pod.ObjectMeta.Labels[key] = value
+	}
+	for key, value := range conf.Annotations {
+		pod.ObjectMeta.Annotations[key] = value
+	}
 	// TODO: setting the Pod's nodeAffinity instead of setting .spec.nodeName works around the
 	// Preemption e2e flake (#88441), but we should investigate deeper to get to the bottom of it.
 	if len(conf.NodeName) != 0 {
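A minimal standalone sketch of the map-copy pattern this hunk introduces (the mergeLabels helper and the sample values are illustrative, not part of the change): start from a non-nil map and then merge the caller's entries, so a nil conf.Labels or conf.Annotations is harmless and later writes to the pod's metadata cannot hit a nil map.

package main

import "fmt"

// mergeLabels mirrors the pattern initPausePod now uses for Labels and
// Annotations: allocate a fresh map, then copy the caller's entries into it.
// Ranging over a nil map is a no-op, so a nil src is safe.
func mergeLabels(src map[string]string) map[string]string {
	out := map[string]string{}
	for key, value := range src {
		out[key] = value
	}
	return out
}

func main() {
	var unset map[string]string // a caller that left conf.Labels nil
	labels := mergeLabels(unset)
	labels["name"] = "pod-with-label" // safe: labels is never nil
	fmt.Println(labels)               // map[name:pod-with-label]
}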
test/e2e/scheduling/priorities.go (48 changes: 40 additions & 8 deletions)
@@ -32,6 +32,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
@@ -52,7 +53,7 @@ type Resource struct {
 	Memory int64
 }
 
-var balancePodLabel = map[string]string{"name": "priority-balanced-memory"}
+var balancePodLabel = map[string]string{"podname": "priority-balanced-memory"}
 
 var podRequestedResource = &v1.ResourceRequirements{
 	Limits: v1.ResourceList{
@@ -187,7 +188,8 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
 		}
 
 		// make the nodes have balanced cpu,mem usage
-		err = createBalancedPodForNodes(f, cs, ns, nodeList.Items, podRequestedResource, 0.6)
+		cleanUp, err := createBalancedPodForNodes(f, cs, ns, nodeList.Items, podRequestedResource, 0.6)
+		defer cleanUp()
 		framework.ExpectNoError(err)
 		ginkgo.By("Trying to launch the pod with podAntiAffinity.")
 		labelPodName := "pod-with-pod-antiaffinity"
@@ -236,7 +238,8 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
 	ginkgo.It("Pod should avoid nodes that have avoidPod annotation", func() {
 		nodeName := nodeList.Items[0].Name
 		// make the nodes have balanced cpu,mem usage
-		err := createBalancedPodForNodes(f, cs, ns, nodeList.Items, podRequestedResource, 0.5)
+		cleanUp, err := createBalancedPodForNodes(f, cs, ns, nodeList.Items, podRequestedResource, 0.5)
+		defer cleanUp()
 		framework.ExpectNoError(err)
 		ginkgo.By("Create a RC, with 0 replicas")
 		rc := createRC(ns, "scheduler-priority-avoid-pod", int32(0), map[string]string{"name": "scheduler-priority-avoid-pod"}, f, podRequestedResource)
@@ -298,7 +301,8 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
 
 	ginkgo.It("Pod should be preferably scheduled to nodes pod can tolerate", func() {
 		// make the nodes have balanced cpu,mem usage ratio
-		err := createBalancedPodForNodes(f, cs, ns, nodeList.Items, podRequestedResource, 0.5)
+		cleanUp, err := createBalancedPodForNodes(f, cs, ns, nodeList.Items, podRequestedResource, 0.5)
+		defer cleanUp()
 		framework.ExpectNoError(err)
 		// Apply 10 taints to first node
 		nodeName := nodeList.Items[0].Name
@@ -360,7 +364,8 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
 		}
 
 		// Make the nodes have balanced cpu,mem usage.
-		err := createBalancedPodForNodes(f, cs, ns, nodes, podRequestedResource, 0.5)
+		cleanUp, err := createBalancedPodForNodes(f, cs, ns, nodes, podRequestedResource, 0.5)
+		defer cleanUp()
 		framework.ExpectNoError(err)
 
 		replicas := 4
@@ -425,7 +430,34 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
 	})
 
 // createBalancedPodForNodes creates a pod per node that asks for enough resources to make all nodes have the same mem/cpu usage ratio.
-func createBalancedPodForNodes(f *framework.Framework, cs clientset.Interface, ns string, nodes []v1.Node, requestedResource *v1.ResourceRequirements, ratio float64) error {
+func createBalancedPodForNodes(f *framework.Framework, cs clientset.Interface, ns string, nodes []v1.Node, requestedResource *v1.ResourceRequirements, ratio float64) (func(), error) {
+	cleanUp := func() {
+		// Delete all remaining pods
+		err := cs.CoreV1().Pods(ns).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{
+			LabelSelector: labels.SelectorFromSet(labels.Set(balancePodLabel)).String(),
+		})
+		if err != nil {
+			framework.Logf("Failed to delete memory balanced pods: %v.", err)
+		} else {
+			err := wait.PollImmediate(2*time.Second, time.Minute, func() (bool, error) {
+				podList, err := cs.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{
+					LabelSelector: labels.SelectorFromSet(labels.Set(balancePodLabel)).String(),
+				})
+				if err != nil {
+					framework.Logf("Failed to list memory balanced pods: %v.", err)
+					return false, nil
+				}
+				if len(podList.Items) > 0 {
+					return false, nil
+				}
+				return true, nil
+			})
+			if err != nil {
+				framework.Logf("Failed to wait until all memory balanced pods are deleted: %v.", err)
+			}
+		}
+	}
+
 	// find the max, if the node has the max,use the one, if not,use the ratio parameter
 	var maxCPUFraction, maxMemFraction float64 = ratio, ratio
 	var cpuFractionMap = make(map[string]float64)
@@ -485,7 +517,7 @@ func createBalancedPodForNodes(f *framework.Framework, cs clientset.Interface, ns string, nodes []v1.Node, requestedResource *v1.ResourceRequirements, ratio float64) (func(), error) {
 			*initPausePod(f, *podConfig), true, framework.Logf)
 
 		if err != nil {
-			return err
+			return cleanUp, err
 		}
 	}
 
@@ -494,7 +526,7 @@ func createBalancedPodForNodes(f *framework.Framework, cs clientset.Interface, ns string, nodes []v1.Node, requestedResource *v1.ResourceRequirements, ratio float64) (func(), error) {
 		computeCPUMemFraction(cs, node, requestedResource)
 	}
 
-	return nil
+	return cleanUp, nil
 }
 
 func computeCPUMemFraction(cs clientset.Interface, node v1.Node, resource *v1.ResourceRequirements) (float64, float64) {
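For reference, a small sketch (not part of the change) of the selector the new cleanUp closure builds: labels.SelectorFromSet over balancePodLabel renders a plain equality selector, so the DeleteCollection and List calls above target only the pods labeled podname=priority-balanced-memory.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Same construction as in cleanUp: a set-based selector over the
	// balance-pod label, rendered as the selector string used in ListOptions.
	balancePodLabel := map[string]string{"podname": "priority-balanced-memory"}
	selector := labels.SelectorFromSet(labels.Set(balancePodLabel))
	fmt.Println(selector.String()) // podname=priority-balanced-memory
}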
test/e2e/scheduling/ubernetes_lite.go (16 changes: 16 additions & 0 deletions)
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"time"
 
 	"github.com/onsi/ginkgo"
 	"github.com/onsi/gomega"
@@ -42,6 +43,7 @@ var _ = SIGDescribe("Multi-AZ Clusters", func() {
 	f := framework.NewDefaultFramework("multi-az")
 	var zoneCount int
 	var err error
+	var cleanUp func()
 	ginkgo.BeforeEach(func() {
 		e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws")
 		if zoneCount <= 0 {
@@ -52,6 +54,20 @@ var _ = SIGDescribe("Multi-AZ Clusters", func() {
 		msg := fmt.Sprintf("Zone count is %d, only run for multi-zone clusters, skipping test", zoneCount)
 		e2eskipper.SkipUnlessAtLeast(zoneCount, 2, msg)
 		// TODO: SkipUnlessDefaultScheduler() // Non-default schedulers might not spread
+
+		cs := f.ClientSet
+		e2enode.WaitForTotalHealthy(cs, time.Minute)
+		nodeList, err := e2enode.GetReadySchedulableNodes(cs)
+		framework.ExpectNoError(err)
+
+		// make the nodes have balanced cpu,mem usage
+		cleanUp, err = createBalancedPodForNodes(f, cs, f.Namespace.Name, nodeList.Items, podRequestedResource, 0.0)
+		framework.ExpectNoError(err)
 	})
+	ginkgo.AfterEach(func() {
+		if cleanUp != nil {
+			cleanUp()
+		}
+	})
 	ginkgo.It("should spread the pods of a service across zones", func() {
 		SpreadServiceOrFail(f, 5*zoneCount, imageutils.GetPauseImageName())
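The nil check in the new AfterEach matters because the BeforeEach can skip out (via the e2eskipper calls) before cleanUp is assigned, and calling a nil func would panic. A tiny self-contained sketch of that guard pattern, with illustrative setup/teardown names that are not part of the change:

package main

import "fmt"

func main() {
	var cleanUp func() // assigned only if setup gets far enough

	setup := func(skip bool) {
		if skip {
			return // mirrors a skipped BeforeEach: cleanUp stays nil
		}
		cleanUp = func() { fmt.Println("deleting balance pods") }
	}
	teardown := func() {
		if cleanUp != nil { // the guard used in the new AfterEach
			cleanUp()
		}
	}

	setup(true) // skipped run: teardown is a no-op instead of a nil-func panic
	teardown()
	setup(false) // normal run: teardown invokes the cleanup
	teardown()
}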
