De-flaking E2E: PersistentVolumes[Disruptive] - restart kubelet via service manager CLI #44923
@@ -18,10 +18,12 @@ package storage

import (
    "fmt"
    "strings"
    "time"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
    apierrs "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/kubernetes/pkg/api/v1"
@@ -58,10 +60,11 @@ var _ = framework.KubeDescribe("PersistentVolumes [Volume][Disruptive][Flaky]",
        volLabel labels.Set
        selector *metav1.LabelSelector
    )

    BeforeEach(func() {
        // To protect the NFS volume pod from the kubelet restart, we isolate it on its own node.
        framework.SkipUnlessNodeCountIsAtLeast(MinNodes)
        framework.SkipIfProviderIs("local")

        c = f.ClientSet
        ns = f.Namespace.Name
        volLabel = labels.Set{framework.VolumeSelectorKey: ns}
@@ -70,7 +73,7 @@ var _ = framework.KubeDescribe("PersistentVolumes [Volume][Disruptive][Flaky]",
        // Start the NFS server pod.
        framework.Logf("[BeforeEach] Creating NFS Server Pod")
        nfsServerPod = initNFSserverPod(c, ns)

        framework.Logf("NFS server Pod %q created on Node %q", nfsServerPod.Name, nfsServerPod.Spec.NodeName)
        framework.Logf("[BeforeEach] Configuring PersistentVolume")
        nfsServerIP = nfsServerPod.Status.PodIP
        Expect(nfsServerIP).NotTo(BeEmpty())
@@ -105,44 +108,38 @@ var _ = framework.KubeDescribe("PersistentVolumes [Volume][Disruptive][Flaky]",
            Expect(clientNodeIP).NotTo(BeEmpty())
        }
    })

    AfterEach(func() {
        framework.DeletePodWithWait(f, c, nfsServerPod)
    })

    Context("when kubelet restarts", func() {

        var (
            clientPod *v1.Pod
            pv        *v1.PersistentVolume
            pvc       *v1.PersistentVolumeClaim
        )

        BeforeEach(func() {
            framework.Logf("Initializing test spec")
            clientPod, pv, pvc = initTestCase(f, c, nfsPVconfig, pvcConfig, ns, clientNode.Name)
        })

        AfterEach(func() {
            framework.Logf("Tearing down test spec")
            tearDownTestCase(c, f, ns, clientPod, pvc, pv)
            pv, pvc, clientPod = nil, nil, nil
        })

        // Test table housing the It() title string and test spec. runTest is type testBody, defined at
        // the start of this file. To add tests, define a function mirroring the testBody signature and assign
        // to runTest.
        disruptiveTestTable := []disruptiveTest{
            {
                testItStmt: "Should test that a file written to the mount before kubelet restart can be read after restart.",
                testItStmt: "Should test that a file written to the mount before kubelet restart is readable after restart.",
                runTest:    testKubeletRestartsAndRestoresMount,
            },
            {
                testItStmt: "Should test that a volume mounted to a pod that is deleted while the kubelet is down unmounts when the kubelet returns.",
                runTest:    testVolumeUnmountsFromDeletedPod,
            },
        }

        // Test loop executes each disruptiveTest iteratively.
        for _, test := range disruptiveTestTable {
            func(t disruptiveTest) {
@@ -159,14 +156,16 @@ var _ = framework.KubeDescribe("PersistentVolumes [Volume][Disruptive][Flaky]",
func testKubeletRestartsAndRestoresMount(c clientset.Interface, f *framework.Framework, clientPod *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
    By("Writing to the volume.")
    file := "/mnt/_SUCCESS"
    _, err := podExec(clientPod, fmt.Sprintf("touch %s", file))
    out, err := podExec(clientPod, fmt.Sprintf("touch %s", file))
    framework.Logf(out)
    Expect(err).NotTo(HaveOccurred())

    By("Restarting kubelet")
    kubeletCommand(kRestart, c, clientPod)

    By("Testing that written file is accessible.")
    _, err = podExec(clientPod, fmt.Sprintf("cat %s", file))
    out, err = podExec(clientPod, fmt.Sprintf("cat %s", file))
    framework.Logf(out)
    Expect(err).NotTo(HaveOccurred())
    framework.Logf("Volume mount detected on pod %s and written file %s is readable post-restart.", clientPod.Name, file)
}
@@ -178,36 +177,61 @@ func testVolumeUnmountsFromDeletedPod(c clientset.Interface, f *framework.Framew
    nodeIP = nodeIP + ":22"

    By("Expecting the volume mount to be found.")
    result, err := framework.SSH(fmt.Sprintf("mount| grep %s", string(clientPod.UID)), nodeIP, framework.TestContext.Provider)
    Expect(err).NotTo(HaveOccurred())
    Expect(result.Code).To(BeZero())
    result, err := framework.SSH(fmt.Sprintf("mount | grep %s", clientPod.UID), nodeIP, framework.TestContext.Provider)
    framework.LogSSHResult(result)
    Expect(err).NotTo(HaveOccurred(), "Encountered SSH error.")
    Expect(result.Code).To(BeZero(), fmt.Sprintf("Expected grep exit code of 0, got %s", result.Code))

    By("Restarting the kubelet.")
    By("Stopping the kubelet.")
    kubeletCommand(kStop, c, clientPod)
    framework.ExpectNoError(framework.DeletePodWithWait(f, c, clientPod), "Failed to delete pod ", clientPod.Name)
    defer func() {
        if err != nil {
            kubeletCommand(kStart, c, clientPod)
        }
    }()
    By(fmt.Sprintf("Deleting Pod %q", clientPod.Name))
    err = c.Core().Pods(clientPod.Namespace).Delete(clientPod.Name, &metav1.DeleteOptions{})
    Expect(err).NotTo(HaveOccurred())
    By("Starting the kubelet and waiting for pod to delete.")
    kubeletCommand(kStart, c, clientPod)
    err = f.WaitForPodTerminated(clientPod.Name, "")
    if !apierrs.IsNotFound(err) && err != nil {
        Expect(err).NotTo(HaveOccurred(), "Expected pod to terminate.")
    }

    By("Expecting the volume mount not to be found.")
    result, err = framework.SSH(fmt.Sprintf("mount| grep %s", string(clientPod.UID)), nodeIP, framework.TestContext.Provider)
    Expect(err).NotTo(HaveOccurred())
    Expect(result.Code).NotTo(BeZero())
    result, err = framework.SSH(fmt.Sprintf("mount | grep %s", clientPod.UID), nodeIP, framework.TestContext.Provider)
    framework.LogSSHResult(result)
    Expect(err).NotTo(HaveOccurred(), "Encountered SSH error.")
    Expect(result.Stdout).To(BeEmpty(), "Expected grep stdout to be empty (i.e. no mount found).")
    framework.Logf("Volume unmounted on node %s", clientPod.Spec.NodeName)
}

// initTestCase initializes spec resources (pv, pvc, and pod) and returns pointers to be consumed
// by the test.
func initTestCase(f *framework.Framework, c clientset.Interface, pvConfig framework.PersistentVolumeConfig, pvcConfig framework.PersistentVolumeClaimConfig, ns, nodeName string) (*v1.Pod, *v1.PersistentVolume, *v1.PersistentVolumeClaim) {

    pv, pvc, err := framework.CreatePVPVC(c, pvConfig, pvcConfig, ns, false)
    defer func() {
        if err != nil {
            framework.DeletePersistentVolumeClaim(c, pvc.Name, ns)
            framework.DeletePersistentVolume(c, pv.Name)

Review comment: Intentional to skip error checking above?
Reply: Yes, the reason being that an error will interrupt cleanup and orphan the PV. An alternative would be to return all resources on failure. I'm fine with that method, but I'd like to do that in a separate PR.
Reply: Agree.
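To make the trade-off above concrete, here is a minimal, self-contained sketch of the "ignore deletion errors during cleanup" pattern. deletePVC and deletePV are hypothetical stand-ins for the framework delete helpers, and the PVC error is simulated; this is an illustration, not the PR's code.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the framework's PVC/PV delete helpers.
func deletePVC(name string) error { return errors.New("simulated transient API error") }
func deletePV(name string) error  { fmt.Println("deleted PV", name); return nil }

// cleanup deliberately does not assert on deletion errors: failing fast on the
// PVC error would skip the PV delete and orphan the PV.
func cleanup(pvcName, pvName string) {
	if err := deletePVC(pvcName); err != nil {
		fmt.Println("ignoring PVC delete error:", err)
	}
	if err := deletePV(pvName); err != nil {
		fmt.Println("ignoring PV delete error:", err)
	}
}

func main() {
	cleanup("nfs-pvc", "nfs-pv")
}
```

With an assertion on each delete (as in the pre-PR tearDownTestCase), the first failure would abort the remaining deletes.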
        }
    }()
    Expect(err).NotTo(HaveOccurred())
    pod := framework.MakePod(ns, []*v1.PersistentVolumeClaim{pvc}, true, "")
    pod.Spec.NodeName = nodeName
    framework.Logf("Creating nfs client Pod %s on node %s", pod.Name, nodeName)
    framework.Logf("Creating NFS client pod.")
    pod, err = c.CoreV1().Pods(ns).Create(pod)
    framework.Logf("NFS client Pod %q created on Node %q", pod.Name, nodeName)
    Expect(err).NotTo(HaveOccurred())
    defer func() {
        if err != nil {
            framework.DeletePodWithWait(f, c, pod)

Review comment: No error check here.
Reply: Same reason as above.
Review comment: In a test setup where 2 [...] Only the function scope [...] e.g. [...] will only produce [...]
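The last comment above did not survive extraction intact, but it appears to concern Go's defer scoping: a deferred call runs when the enclosing function returns (in LIFO order), not when the block or loop iteration that registered it ends. Under that reading, a minimal, hypothetical illustration (not code from the PR):

```go
package main

import "fmt"

func demo() {
	for i := 0; i < 2; i++ {
		// Each deferred call is queued until demo() returns; it does not run
		// at the end of the loop iteration that registered it.
		defer fmt.Println("deferred cleanup", i)
	}
	fmt.Println("function body done")
}

func main() {
	demo()
	// Prints:
	//   function body done
	//   deferred cleanup 1
	//   deferred cleanup 0
}
```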
        }
    }()
    err = framework.WaitForPodRunningInNamespace(c, pod)
    Expect(err).NotTo(HaveOccurred())
    Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Pod %q timed out waiting for phase: Running", pod.Name))
    // Return created api objects
    pod, err = c.CoreV1().Pods(ns).Get(pod.Name, metav1.GetOptions{})
    Expect(err).NotTo(HaveOccurred())
    pvc, err = c.CoreV1().PersistentVolumeClaims(ns).Get(pvc.Name, metav1.GetOptions{})
@@ -218,22 +242,36 @@ func initTestCase(f *framework.Framework, c clientset.Interface, pvConfig framew
}

// tearDownTestCase destroys resources created by initTestCase.
func tearDownTestCase(c clientset.Interface, f *framework.Framework, ns string, pod *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
    framework.ExpectNoError(framework.DeletePodWithWait(f, c, pod), "tearDown: Failed to delete pod ", pod.Name)
    framework.ExpectNoError(framework.DeletePersistentVolumeClaim(c, pvc.Name, ns), "tearDown: Failed to delete PVC ", pvc.Name)
    framework.ExpectNoError(framework.DeletePersistentVolume(c, pv.Name), "tearDown: Failed to delete PV ", pv.Name)
func tearDownTestCase(c clientset.Interface, f *framework.Framework, ns string, client *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
    // Ignore deletion errors. Failing on them will interrupt test cleanup.
    framework.DeletePodWithWait(f, c, client)
    framework.DeletePersistentVolumeClaim(c, pvc.Name, ns)
    framework.DeletePersistentVolume(c, pv.Name)
}

// kubeletCommand performs `start`, `restart`, or `stop` on the kubelet running on the node of the target pod.
// kubeletCommand performs `start`, `restart`, or `stop` on the kubelet running on the node of the target pod and waits
// for the desired status.
// - First issues the command via `systemctl`
// - If `systemctl` returns stderr "command not found", issues the command via `service`
// - If `service` also returns stderr "command not found", the test is aborted.
// Allowed kubeletOps are `kStart`, `kStop`, and `kRestart`
func kubeletCommand(kOp kubeletOpt, c clientset.Interface, pod *v1.Pod) {
    nodeIP, err := framework.GetHostExternalAddress(c, pod)
    Expect(err).NotTo(HaveOccurred())
    nodeIP = nodeIP + ":22"
    sshResult, err := framework.SSH("sudo /etc/init.d/kubelet "+string(kOp), nodeIP, framework.TestContext.Provider)
    Expect(err).NotTo(HaveOccurred())
    systemctlCmd := fmt.Sprintf("sudo systemctl %s kubelet", string(kOp))
    framework.Logf("Attempting `%s`", systemctlCmd)
    sshResult, err := framework.SSH(systemctlCmd, nodeIP, framework.TestContext.Provider)
    Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))

Review comment: If error != nil, the test will stop here and return?
Review comment: So if err is non-nil, should we try another SSH with service kubelet, or just return?
Reply: If the err return from [...]
    framework.LogSSHResult(sshResult)

    if strings.Contains(sshResult.Stderr, "command not found") {
        serviceCmd := fmt.Sprintf("sudo service kubelet %s", string(kOp))
        framework.Logf("Attempting `%s`", serviceCmd)
        sshResult, err = framework.SSH(serviceCmd, nodeIP, framework.TestContext.Provider)
        Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
        framework.LogSSHResult(sshResult)
    }
    Expect(sshResult.Code).To(BeZero(), "Failed to [%s] kubelet:\n%#v", string(kOp), sshResult)
    // On restart, waiting for node NotReady prevents a race condition where the node takes a few moments to leave the
    // Ready state which in turn short circuits WaitForNodeToBeReady()
    if kOp == kStop || kOp == kRestart {

Review comment: Is it ok to start the kubelet twice?
Reply: In this instance, I think it's okay. Issuing `start` on a running kubelet will make it restart. This defer is primarily to protect against the kubelet being left stopped if an error happens while deleting the pod or waiting for the pod to terminate. In testing, timeouts here left the kubelet stopped after the test finished. The side effect is that if an error occurs after the kubelet is started (SSH fails or the volume is still attached), it will be restarted once more before the function returns. `kubeletCommand` waits for the node to come back online before returning, so this will add a few seconds to run time.
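A small, self-contained sketch of the guard being discussed, mirroring the defer pattern in the diff above. startKubelet and deleteAndWait are hypothetical stand-ins for kubeletCommand(kStart, c, clientPod) and the pod delete / wait-for-termination steps:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-in for kubeletCommand(kStart, c, clientPod).
func startKubelet() { fmt.Println("kubelet started") }

// Hypothetical stand-in for the pod delete / wait-for-termination steps,
// which can time out while the kubelet is stopped.
func deleteAndWait() error { return errors.New("timed out waiting for pod to terminate") }

// run stops the kubelet, then registers a deferred guard so any later failure
// still leaves the node with a running kubelet. Starting an already-running
// kubelet amounts to a restart, which the thread above considers acceptable.
func run() (err error) {
	fmt.Println("kubelet stopped")
	defer func() {
		if err != nil {
			startKubelet()
		}
	}()
	err = deleteAndWait()
	return err
}

func main() {
	fmt.Println("run returned:", run())
}
```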