add e2e test for network partition #19731

Merged: 1 commit, Jan 30, 2016
259 changes: 200 additions & 59 deletions test/e2e/resize_nodes.go
@@ -33,13 +33,21 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/client/cache"
awscloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
controllerframework "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)

const (
serveHostnameImage = "gcr.io/google_containers/serve_hostname:1.1"
resizeNodeReadyTimeout = 2 * time.Minute
resizeNodeNotReadyTimeout = 2 * time.Minute
nodeReadinessTimeout = 3 * time.Minute
podNotReadyTimeout = 1 * time.Minute
podReadyTimeout = 2 * time.Minute
testPort = 9376
)

@@ -287,35 +295,55 @@ func verifyPods(c *client.Client, ns, name string, wantName bool, replicas int)
return nil
}

// Blocks outgoing network traffic on 'node'. Then verifies that 'podNameToDisappear',
// that belongs to replication controller 'rcName', really disappeared.
// Finally, it checks that the replication controller recreates the
// pods on another node and that now the number of replicas is equal to 'replicas'.
// At the end (even in case of errors), the network traffic is brought back to normal.
// This function executes commands on a node so it will work only for some
// environments.
func performTemporaryNetworkFailure(c *client.Client, ns, rcName string, replicas int, podNameToDisappear string, node *api.Node) {
Logf("Getting external IP address for %s", node.Name)
host := ""
for _, a := range node.Status.Addresses {
if a.Type == api.NodeExternalIP {
host = a.Address + ":22"
break
}
func blockNetwork(from string, to string) {
Contributor

Can you move the block and unblock functions into e2e/utils?

type netsplitHelper struct { timeout time.Time }

func (n *netsplitHelper) Block() error {}
func (n *netsplitHelper) Unblock() error {}

I'm alright with a follow-up PR that does this; I gather many tests can use this functionality (I have one in mind already).

Contributor Author

I can do a follow up PR. No need to make this one too complex.
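
For reference, a rough sketch of what that shared helper might look like in a follow-up (hypothetical; the names are illustrative and it assumes the existing e2e SSH, LogSSHResult, testContext, and wait helpers are in scope):

// netsplitHelper wraps the block/unblock pair so that other e2e tests can reuse it.
type netsplitHelper struct {
	from, to string        // source host as "ip:22" and destination IP
	timeout  time.Duration // how long to keep retrying rule removal
}

// Block inserts an iptables REJECT rule on the source host.
func (n *netsplitHelper) Block() error {
	rule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", n.to)
	result, err := SSH(fmt.Sprintf("sudo iptables --insert %s", rule), n.from, testContext.Provider)
	if err != nil || result.Code != 0 {
		LogSSHResult(result)
		return fmt.Errorf("failed to insert rule %q on %s: %v", rule, n.from, err)
	}
	return nil
}

// Unblock deletes the rule, retrying until the timeout expires, since a failed
// delete would leave the node partitioned.
func (n *netsplitHelper) Unblock() error {
	rule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", n.to)
	return wait.Poll(100*time.Millisecond, n.timeout, func() (bool, error) {
		result, err := SSH(fmt.Sprintf("sudo iptables --delete %s", rule), n.from, testContext.Provider)
		return result.Code == 0 && err == nil, nil
	})
}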

Logf("block network traffic from %s to %s", from, to)
iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule)
if result, err := SSH(dropCmd, from, testContext.Provider); result.Code != 0 || err != nil {
LogSSHResult(result)
Failf("Unexpected error: %v", err)
}
if host == "" {
Failf("Couldn't get the external IP of host %s with addresses %v", node.Name, node.Status.Addresses)
}

func unblockNetwork(from string, to string) {
Logf("Unblock network traffic from %s to %s", from, to)
iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule)
// The undrop command may fail if the rule has never been created.
// In that case we just lose 30 seconds, but the cluster is healthy.
// But if the rule had been created and removing it failed, the node is broken and
// not coming back. Subsequent tests will run on fewer nodes (some of the tests
// may fail). Manual intervention is required in such a case (recreating the
// cluster also solves the problem).
err := wait.Poll(time.Millisecond*100, time.Second*30, func() (bool, error) {
result, err := SSH(undropCmd, from, testContext.Provider)
if result.Code == 0 && err == nil {
return true, nil
}
LogSSHResult(result)
if err != nil {
Logf("Unexpected error: %v", err)
}
return false, nil
})
if err != nil {
Failf("Failed to remove the iptable REJECT rule. Manual intervention is "+
"required on host %s: remove rule %s, if exists", from, iptablesRule)
}
}

By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
func getMaster(c *client.Client) string {
master := ""
switch testContext.Provider {
case "gce":
// TODO(#10085): The use of MasterName will cause iptables to do a DNS
// lookup to resolve the name to an IP address, which will slow down the
// test and cause it to fail if DNS is absent or broken. Use the
// internal IP address instead (i.e. NOT the one in testContext.Host).
master = testContext.CloudConfig.MasterName
eps, err := c.Endpoints(api.NamespaceDefault).Get("kubernetes")
if err != nil {
Failf("Fail to get kubernetes endpoinds: %v", err)
}
if len(eps.Subsets) != 1 || len(eps.Subsets[0].Addresses) != 1 {
Failf("There are more than 1 endpoints for kubernetes service: %+v", eps)
}
master = eps.Subsets[0].Addresses[0].IP
Contributor Author

Use the IP instead of the host name... This is to avoid an external DNS dependency for iptables.

case "gke":
master = strings.TrimPrefix(testContext.Host, "https://")
case "aws":
@@ -324,53 +352,51 @@ func performTemporaryNetworkFailure(c *client.Client, ns, rcName string, replica
default:
Failf("This test is not supported for provider %s and should be disabled", testContext.Provider)
}
iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", master)
return master
}

// getNodeExternalIP returns the node's external IP concatenated with port 22 for SSH,
// e.g. 1.2.3.4:22
func getNodeExternalIP(node *api.Node) string {
Logf("Getting external IP address for %s", node.Name)
host := ""
for _, a := range node.Status.Addresses {
if a.Type == api.NodeExternalIP {
host = a.Address + ":22"
break
}
}
if host == "" {
Failf("Couldn't get the external IP of host %s with addresses %v", node.Name, node.Status.Addresses)
}
return host
}

// Blocks outgoing network traffic on 'node'. Then verifies that 'podNameToDisappear',
// that belongs to replication controller 'rcName', really disappeared.
// Finally, it checks that the replication controller recreates the
// pods on another node and that now the number of replicas is equal to 'replicas'.
// At the end (even in case of errors), the network traffic is brought back to normal.
// This function executes commands on a node so it will work only for some
// environments.
func performTemporaryNetworkFailure(c *client.Client, ns, rcName string, replicas int, podNameToDisappear string, node *api.Node) {
host := getNodeExternalIP(node)
master := getMaster(c)
By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
defer func() {
// This code will execute even if setting the iptables rule failed.
// It is on purpose because we may have an error even if the new rule
// had been inserted. (yes, we could look at the error code and ssh error
// separately, but I prefer to stay on the safe side).

By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule)
// Undrop command may fail if the rule has never been created.
// In such case we just lose 30 seconds, but the cluster is healthy.
// But if the rule had been created and removing it failed, the node is broken and
// not coming back. Subsequent tests will run on fewer nodes (some of the tests
// may fail). Manual intervention is required in such case (recreating the
// cluster solves the problem too).
err := wait.Poll(time.Millisecond*100, time.Second*30, func() (bool, error) {
result, err := SSH(undropCmd, host, testContext.Provider)
if result.Code == 0 && err == nil {
return true, nil
}
LogSSHResult(result)
if err != nil {
Logf("Unexpected error: %v", err)
}
return false, nil
})
if err != nil {
Failf("Failed to remove the iptable REJECT rule. Manual intervention is "+
"required on node %s: remove rule %s, if exists", node.Name, iptablesRule)
}
unblockNetwork(host, master)
}()

Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
if !waitForNodeToBe(c, node.Name, api.NodeReady, true, resizeNodeReadyTimeout) {
Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}

// The command will block all outgoing network traffic from the node to the master
// When multi-master is implemented, this test will have to be improved to block
// network traffic to all masters.
// We could also block network traffic from the master(s) to this node,
// but blocking it one way is sufficient for this test.
dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule)
if result, err := SSH(dropCmd, host, testContext.Provider); result.Code != 0 || err != nil {
LogSSHResult(result)
Failf("Unexpected error: %v", err)
}
blockNetwork(host, master)

Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
if !waitForNodeToBe(c, node.Name, api.NodeReady, false, resizeNodeNotReadyTimeout) {
@@ -388,12 +414,32 @@ func performTemporaryNetworkFailure(c *client.Client, ns, rcName string, replica
// network traffic is unblocked in a deferred function
}

func expectNodeReadiness(isReady bool, newNode chan *api.Node) {
timeout := false
expected := false
timer := time.After(nodeReadinessTimeout)
for !expected && !timeout {
select {
case n := <-newNode:
if isNodeConditionSetAsExpected(n, api.NodeReady, isReady) {
expected = true
} else {
Logf("Observed node ready status is NOT %v as expected", isReady)
}
case <-timer:
timeout = true
}
}
if !expected {
Failf("Failed to observe node ready status change to %v", isReady)
}
}

var _ = Describe("Nodes [Disruptive]", func() {
framework := NewFramework("resize-nodes")
var systemPodsNo int
var c *client.Client
var ns string

BeforeEach(func() {
c = framework.Client
ns = framework.Namespace.Name
@@ -560,6 +606,101 @@ var _ = Describe("Nodes [Disruptive]", func() {
}
}
})

// What happens in this test:
// Network traffic from a node to the master is cut off to simulate a network partition.
// Expect to observe:
// 1. The node is marked NotReady by the nodecontroller after the timeout (40 seconds)
// 2. All pods on the node are marked NotReady shortly after #1
// 3. The node and pods return to Ready after connectivity recovers
It("All pods on the unreachable node should be marked as NotReady upon the node turn NotReady "+
"AND all pods should be mark back to Ready when the node get back to Ready before pod eviction timeout", func() {
By("choose a node - we will block all network traffic on this node")
var podOpts api.ListOptions
nodeOpts := api.ListOptions{}
nodes, err := c.Nodes().List(nodeOpts)
Expect(err).NotTo(HaveOccurred())
filterNodes(nodes, func(node api.Node) bool {
if !isNodeConditionSetAsExpected(&node, api.NodeReady, true) {
return false
}
podOpts = api.ListOptions{FieldSelector: fields.OneTermEqualSelector(client.PodHost, node.Name)}
pods, err := c.Pods(api.NamespaceAll).List(podOpts)
if err != nil || len(pods.Items) <= 0 {
return false
}
return true
})
if len(nodes.Items) <= 0 {
Failf("No eligible node were found: %d", len(nodes.Items))
}
node := nodes.Items[0]
podOpts = api.ListOptions{FieldSelector: fields.OneTermEqualSelector(client.PodHost, node.Name)}
if err = waitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, podRunningReady); err != nil {
Failf("Pods on node %s are not ready and running within %v: %v", node.Name, podReadyTimeout, err)
}

By("Set up watch on node status")
nodeSelector := fields.OneTermEqualSelector("metadata.name", node.Name)
stopCh := make(chan struct{})
newNode := make(chan *api.Node)
var controller *controllerframework.Controller
_, controller = controllerframework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.FieldSelector = nodeSelector
return framework.Client.Nodes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.FieldSelector = nodeSelector
return framework.Client.Nodes().Watch(options)
},
},
&api.Node{},
0,
controllerframework.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
n, ok := newObj.(*api.Node)
Expect(ok).To(Equal(true))
newNode <- n

},
},
)

defer func() {
// Will not explicitly close newNode channel here due to
// race condition where stopCh and newNode are closed but informer onUpdate still executes.
close(stopCh)
}()
go controller.Run(stopCh)

By(fmt.Sprintf("Block traffic from node %s to the master", node.Name))
host := getNodeExternalIP(&node)
master := getMaster(c)
defer func() {
By(fmt.Sprintf("Unblock traffic from node %s to the master", node.Name))
unblockNetwork(host, master)

if CurrentGinkgoTestDescription().Failed {
return
}

By("Expect to observe node and pod status change from NotReady to Ready after network connectivity recovers")
Contributor

The section between this By and the next feels like it can be isolated into a function (in one case it's notready->ready and in the other it's ready->notready). Is that easy?

expectNodeReadiness(true, newNode)
if err = waitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, podRunningReady); err != nil {
Failf("Pods on node %s did not become ready and running within %v: %v", node.Name, podReadyTimeout, err)
}
}()

blockNetwork(host, master)

By("Expect to observe node and pod status change from Ready to NotReady after network partition")
expectNodeReadiness(false, newNode)
if err = waitForMatchPodsCondition(c, podOpts, "NotReady", podNotReadyTimeout, podNotReady); err != nil {
Failf("Pods on node %s did not become NotReady within %v: %v", node.Name, podNotReadyTimeout, err)
}
})
})
})
})
38 changes: 38 additions & 0 deletions test/e2e/util.go
@@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
@@ -354,6 +355,16 @@ func podRunningReady(p *api.Pod) (bool, error) {
return true, nil
}

// podNotReady checks whether pod p has a Ready condition with status False.
func podNotReady(p *api.Pod) (bool, error) {
// Check the ready condition is false.
if podReady(p) {
return false, fmt.Errorf("pod '%s' on '%s' didn't have condition {%v %v}; conditions: %v",
p.ObjectMeta.Name, p.Spec.NodeName, api.PodReady, api.ConditionFalse, p.Status.Conditions)
}
return true, nil
}
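
For context, podNotReady calls podReady, which already lives in util.go and is not part of this diff; a rough reconstruction of that existing helper (details assumed) looks like:

// podReady returns whether the pod has a Ready condition with status True.
func podReady(pod *api.Pod) bool {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == api.PodReady && cond.Status == api.ConditionTrue {
			return true
		}
	}
	return false
}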

// check if a Pod is controlled by a Replication Controller in the List
func hasReplicationControllersForPod(rcs *api.ReplicationControllerList, pod api.Pod) bool {
for _, rc := range rcs.Items {
@@ -545,6 +556,33 @@ func waitForPodCondition(c *client.Client, ns, podName, desc string, timeout tim
return fmt.Errorf("gave up waiting for pod '%s' to be '%s' after %v", podName, desc, timeout)
}

// waitForMatchPodsCondition finds pods matching the input ListOptions, then
// waits and checks that all matching pods satisfy the given podCondition.
func waitForMatchPodsCondition(c *client.Client, opts api.ListOptions, desc string, timeout time.Duration, condition podCondition) error {
Logf("Waiting up to %v for matching pods' status to be %s", timeout, desc)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
pods, err := c.Pods(api.NamespaceAll).List(opts)
if err != nil {
return err
}
conditionNotMatch := []string{}
for _, pod := range pods.Items {
done, err := condition(&pod)
if done && err != nil {
return fmt.Errorf("Unexpected error: %v", err)
}
if !done {
conditionNotMatch = append(conditionNotMatch, format.Pod(&pod))
}
}
if len(conditionNotMatch) <= 0 {
return err
}
Logf("%d pods are not %s", len(conditionNotMatch), desc)
}
return fmt.Errorf("gave up waiting for matching pods to be '%s' after %v", desc, timeout)
}

// waitForDefaultServiceAccountInNamespace waits for the default service account to be provisioned
// the default service account is what is associated with pods when they do not specify a service account
// as a result, pods are not able to be provisioned in a namespace until the service account is provisioned