e2e for cluster-autoscaler unhealthy cluster handling #43557

Merged
78 changes: 73 additions & 5 deletions test/e2e/autoscaling/cluster_size_autoscaling.go
@@ -20,8 +20,10 @@ import (
"bytes"
"fmt"
"io/ioutil"
"math"
"net/http"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
@@ -46,17 +48,19 @@ import (
)

const (
defaultTimeout = 3 * time.Minute
resizeTimeout = 5 * time.Minute
scaleUpTimeout = 5 * time.Minute
scaleDownTimeout = 15 * time.Minute
podTimeout = 2 * time.Minute
nodesRecoverTimeout = 5 * time.Minute

gkeEndpoint = "https://test-container.sandbox.googleapis.com"
gkeUpdateTimeout = 15 * time.Minute

disabledTaint = "DisabledForAutoscalingTest"
newNodesForScaledownTests = 2
unhealthyClusterThreshold = 4
)

var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
@@ -354,6 +358,48 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
})
})

It("Shouldn't perform scale up operation and should list unhealthy status if most of the cluster is broken[Feature:ClusterSizeAutoscalingScaleUp]", func() {
clusterSize := nodeCount
for clusterSize < unhealthyClusterThreshold+1 {
clusterSize = manuallyIncreaseClusterSize(f, originalSizes)
}

By("Block network connectivity to some nodes to simulate unhealthy cluster")
nodesToBreakCount := int(math.Floor(math.Max(float64(unhealthyClusterThreshold), 0.5*float64(clusterSize))))
nodes, err := f.ClientSet.Core().Nodes().List(metav1.ListOptions{FieldSelector: fields.Set{
"spec.unschedulable": "false",
}.AsSelector().String()})
framework.ExpectNoError(err)
Expect(nodesToBreakCount <= len(nodes.Items)).To(BeTrue())
nodesToBreak := nodes.Items[:nodesToBreakCount]

// TestUnderTemporaryNetworkFailure only removes connectivity to a single node
// and accepts a func() callback. The loop over nodesToBreak is therefore expressed
// as a recursive call, so that TestUnderTemporaryNetworkFailure does not have to be duplicated.
var testFunction func()
testFunction = func() {
if len(nodesToBreak) > 0 {
ntb := &nodesToBreak[0]
nodesToBreak = nodesToBreak[1:]
framework.TestUnderTemporaryNetworkFailure(c, "default", ntb, testFunction)
} else {
ReserveMemory(f, "memory-reservation", 100, nodeCount*memCapacityMb, false)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation")
time.Sleep(scaleUpTimeout)
currentNodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
framework.Logf("Currently available nodes: %v, nodes available at the start of test: %v, disabled nodes: %v", len(currentNodes.Items), len(nodes.Items), nodesToBreakCount)
Expect(len(currentNodes.Items)).Should(Equal(len(nodes.Items) - nodesToBreakCount))
status, err := getClusterwideStatus(c)
framework.Logf("Clusterwide status: %v", status)
framework.ExpectNoError(err)
Expect(status).Should(Equal("Unhealthy"))
}
}
testFunction()
// Give nodes time to recover from network failure
framework.ExpectNoError(framework.WaitForClusterSize(c, len(nodes.Items), nodesRecoverTimeout))
})

})

func runDrainTest(f *framework.Framework, migSizes map[string]int, podsPerNode, pdbSize int, verifyFunction func(int)) {
@@ -828,3 +874,25 @@ func manuallyIncreaseClusterSize(f *framework.Framework, originalSizes map[strin
func(size int) bool { return size >= increasedSize }, scaleUpTimeout))
return increasedSize
}

// Try to get clusterwide health from CA status configmap.
// Status configmap is not parsing-friendly, so evil regexpery follows.
func getClusterwideStatus(c clientset.Interface) (string, error) {
configMap, err := c.CoreV1().ConfigMaps("kube-system").Get("cluster-autoscaler-status", metav1.GetOptions{})
if err != nil {
return "", err
}
status, ok := configMap.Data["status"]
if !ok {
return "", fmt.Errorf("Status information not found in configmap")
}
matcher, err := regexp.Compile("Cluster-wide:\\s*\n\\s*Health:\\s*([A-Za-z]+)")
if err != nil {
return "", err
}
result := matcher.FindStringSubmatch(status)
if len(result) < 2 {
return "", fmt.Errorf("Failed to parse CA status configmap")
}
return result[1], nil
}
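
For reference, a minimal sketch of what this regexp pulls out of the status payload. The sample text below only illustrates the general shape of the configmap's "status" field; it is not an exact copy of any cluster-autoscaler version's output:

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Illustrative excerpt of the "status" key; real payloads carry more
	// fields and per-node-group sections.
	status := "Cluster-autoscaler status at 2017-03-22 12:00:00 +0000 UTC:\n" +
		"Cluster-wide:\n" +
		"  Health:      Unhealthy (ready=2 unready=3 notStarted=0 registered=5)\n" +
		"  ScaleUp:     NoActivity\n"

	// Same pattern as getClusterwideStatus: capture the word after "Health:"
	// on the line following "Cluster-wide:".
	matcher := regexp.MustCompile("Cluster-wide:\\s*\n\\s*Health:\\s*([A-Za-z]+)")
	fmt.Println(matcher.FindStringSubmatch(status)[1]) // prints "Unhealthy"
}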
33 changes: 33 additions & 0 deletions test/e2e/framework/networking_utils.go
@@ -39,6 +39,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
coreclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
)

@@ -826,3 +827,35 @@ func TestHitNodesFromOutsideWithCount(externalIP string, httpPort int32, timeout
}
return nil
}

// Blocks outgoing network traffic on 'node'. Then runs testFunc and, at the end
// (even in case of errors), brings the network traffic back to normal.
// This function executes commands on a node, so it will only work in some
// environments.
func TestUnderTemporaryNetworkFailure(c clientset.Interface, ns string, node *v1.Node, testFunc func()) {
host := GetNodeExternalIP(node)
master := GetMasterAddress(c)
By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
defer func() {
// This code will execute even if setting the iptables rule failed.
// It is on purpose because we may have an error even if the new rule
// had been inserted. (yes, we could look at the error code and ssh error
// separately, but I prefer to stay on the safe side).
By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
UnblockNetwork(host, master)
}()

Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
if !WaitForNodeToBe(c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}
BlockNetwork(host, master)

Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
if !WaitForNodeToBe(c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
}

testFunc()
// network traffic is unblocked in a deferred function
}
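
As a rough usage sketch (hypothetical wiring: the framework object f and the nodes list are assumed to come from the caller's own e2e setup, as in the autoscaling test above), a test wraps its assertions in the callback and relies on the helper to restore connectivity afterwards:

// Hypothetical caller: partition a single node from the master, run checks
// while it is NotReady, and let TestUnderTemporaryNetworkFailure unblock the
// network when the callback returns.
node := &nodes.Items[0]
framework.TestUnderTemporaryNetworkFailure(f.ClientSet, f.Namespace.Name, node, func() {
	framework.Logf("node %s is now partitioned from the master", node.Name)
	// ... assertions that should hold while the node is unreachable ...
})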
40 changes: 4 additions & 36 deletions test/e2e/network_partition.go
@@ -41,38 +41,6 @@ import (
. "github.com/onsi/gomega"
)

// Blocks outgoing network traffic on 'node'. Then runs testFunc and returns its status.
// At the end (even in case of errors), the network traffic is brought back to normal.
// This function executes commands on a node so it will work only for some
// environments.
func testUnderTemporaryNetworkFailure(c clientset.Interface, ns string, node *v1.Node, testFunc func()) {
host := framework.GetNodeExternalIP(node)
master := framework.GetMasterAddress(c)
By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
defer func() {
// This code will execute even if setting the iptables rule failed.
// It is on purpose because we may have an error even if the new rule
// had been inserted. (yes, we could look at the error code and ssh error
// separately, but I prefer to stay on the safe side).
By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
framework.UnblockNetwork(host, master)
}()

framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
if !framework.WaitForNodeToBe(c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}
framework.BlockNetwork(host, master)

framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
if !framework.WaitForNodeToBe(c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
}

testFunc()
// network traffic is unblocked in a deferred function
}

func expectNodeReadiness(isReady bool, newNode chan *v1.Node) {
timeout := false
expected := false
@@ -281,7 +249,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
// Finally, it checks that the replication controller recreates the
// pods on another node and that now the number of replicas is equal 'replicas'.
By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
testUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
err := framework.WaitForRCPodToDisappear(c, ns, name, pods.Items[0].Name)
Expect(err).NotTo(HaveOccurred())
@@ -346,7 +314,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
// Finally, it checks that the replication controller recreates the
// pods on another node and that now the number of replicas is equal 'replicas + 1'.
By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
testUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
err := framework.WaitForRCPodToDisappear(c, ns, name, pods.Items[0].Name)
Expect(err).To(Equal(wait.ErrWaitTimeout), "Pod was not deleted during network partition.")
@@ -421,7 +389,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
// Blocks outgoing network traffic on 'node'. Then verifies that 'podNameToDisappear',
// that belongs to StatefulSet 'statefulSetName', **does not** disappear due to forced deletion from the apiserver.
// The grace period on the stateful pods is set to a value > 0.
testUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.Logf("Checking that the NodeController does not force delete stateful pods %v", pod.Name)
err := framework.WaitTimeoutForPodNoLongerRunningInNamespace(c, pod.Name, ns, 10*time.Minute)
Expect(err).To(Equal(wait.ErrWaitTimeout), "Pod was not deleted during network partition.")
@@ -464,7 +432,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
// This creates a temporary network partition, verifies that the job has 'parallelism' number of
// running pods after the node-controller detects node unreachable.
By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
testUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
err := framework.WaitForPodToDisappear(c, ns, pods.Items[0].Name, label, 20*time.Second, 10*time.Minute)
Expect(err).To(Equal(wait.ErrWaitTimeout), "Pod was not deleted during network partition.")