Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #51039 #51224 #51230 #52057 #52200

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 34 additions & 21 deletions test/e2e/framework/statefulset_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *StatefulSetTester) CheckMount(ss *apps.StatefulSet, mountPath string) e
func (s *StatefulSetTester) ExecInStatefulPods(ss *apps.StatefulSet, cmd string) error {
podList := s.GetPodList(ss)
for _, statefulPod := range podList.Items {
stdout, err := RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd)
stdout, err := RunHostCmdWithRetries(statefulPod.Namespace, statefulPod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
Logf("stdout of %v on %v: %v", cmd, statefulPod.Name, stdout)
if err != nil {
return err
Expand All @@ -163,7 +163,7 @@ func (s *StatefulSetTester) CheckHostname(ss *apps.StatefulSet) error {
cmd := "printf $(hostname)"
podList := s.GetPodList(ss)
for _, statefulPod := range podList.Items {
hostname, err := RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd)
hostname, err := RunHostCmdWithRetries(statefulPod.Namespace, statefulPod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
if err != nil {
return err
}
Expand Down Expand Up @@ -212,10 +212,12 @@ func getStatefulSetPodNameAtIndex(index int, ss *apps.StatefulSet) string {
}

// Scale scales ss to count replicas.
func (s *StatefulSetTester) Scale(ss *apps.StatefulSet, count int32) error {
func (s *StatefulSetTester) Scale(ss *apps.StatefulSet, count int32) (*apps.StatefulSet, error) {
name := ss.Name
ns := ss.Namespace
s.update(ns, name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = count })

Logf("Scaling statefulset %s to %d", name, count)
ss = s.update(ns, name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = count })

var statefulPodList *v1.PodList
pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
Expand All @@ -233,9 +235,9 @@ func (s *StatefulSetTester) Scale(ss *apps.StatefulSet, count int32) error {
unhealthy = append(unhealthy, fmt.Sprintf("%v: deletion %v, phase %v, readiness %v", statefulPod.Name, delTs, phase, readiness))
}
}
return fmt.Errorf("Failed to scale statefulset to %d in %v. Remaining pods:\n%v", count, StatefulSetTimeout, unhealthy)
return ss, fmt.Errorf("Failed to scale statefulset to %d in %v. Remaining pods:\n%v", count, StatefulSetTimeout, unhealthy)
}
return nil
return ss, nil
}

// UpdateReplicas updates the replicas of ss to count.
Expand All @@ -246,11 +248,16 @@ func (s *StatefulSetTester) UpdateReplicas(ss *apps.StatefulSet, count int32) {
// Restart scales ss to 0 and then back to its previous number of replicas.
func (s *StatefulSetTester) Restart(ss *apps.StatefulSet) {
oldReplicas := *(ss.Spec.Replicas)
ExpectNoError(s.Scale(ss, 0))
ss, err := s.Scale(ss, 0)
ExpectNoError(err)
// Wait for controller to report the desired number of Pods.
// This way we know the controller has observed all Pod deletions
// before we scale it back up.
s.WaitForStatusReplicas(ss, 0)
s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = oldReplicas })
}

func (s *StatefulSetTester) update(ns, name string, update func(ss *apps.StatefulSet)) {
func (s *StatefulSetTester) update(ns, name string, update func(ss *apps.StatefulSet)) *apps.StatefulSet {
for i := 0; i < 3; i++ {
ss, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{})
if err != nil {
Expand All @@ -259,13 +266,14 @@ func (s *StatefulSetTester) update(ns, name string, update func(ss *apps.Statefu
update(ss)
ss, err = s.c.Apps().StatefulSets(ns).Update(ss)
if err == nil {
return
return ss
}
if !apierrs.IsConflict(err) && !apierrs.IsServerTimeout(err) {
Failf("failed to update statefulset %q: %v", name, err)
}
}
Failf("too many retries draining statefulset %q", name)
return nil
}

// GetPodList gets the current Pods in ss.
Expand Down Expand Up @@ -523,7 +531,8 @@ func (s *StatefulSetTester) BreakHttpProbe(ss *apps.StatefulSet) error {
if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path)
}
cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/", path)
// Ignore 'mv' errors to make this idempotent.
cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/ || true", path)
return s.ExecInStatefulPods(ss, cmd)
}

Expand All @@ -533,8 +542,9 @@ func (s *StatefulSetTester) BreakPodHttpProbe(ss *apps.StatefulSet, pod *v1.Pod)
if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path)
}
cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/", path)
stdout, err := RunHostCmd(pod.Namespace, pod.Name, cmd)
// Ignore 'mv' errors to make this idempotent.
cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/ || true", path)
stdout, err := RunHostCmdWithRetries(pod.Namespace, pod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
Logf("stdout of %v on %v: %v", cmd, pod.Name, stdout)
return err
}
Expand All @@ -545,7 +555,8 @@ func (s *StatefulSetTester) RestoreHttpProbe(ss *apps.StatefulSet) error {
if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path)
}
cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/", path)
// Ignore 'mv' errors to make this idempotent.
cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/ || true", path)
return s.ExecInStatefulPods(ss, cmd)
}

Expand All @@ -555,15 +566,16 @@ func (s *StatefulSetTester) RestorePodHttpProbe(ss *apps.StatefulSet, pod *v1.Po
if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path)
}
cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/", path)
stdout, err := RunHostCmd(pod.Namespace, pod.Name, cmd)
// Ignore 'mv' errors to make this idempotent.
cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/ || true", path)
stdout, err := RunHostCmdWithRetries(pod.Namespace, pod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
Logf("stdout of %v on %v: %v", cmd, pod.Name, stdout)
return err
}

var pauseProbe = &v1.Probe{
Handler: v1.Handler{
Exec: &v1.ExecAction{Command: []string{"test", "-f", "/tmp/statefulset-continue"}},
Exec: &v1.ExecAction{Command: []string{"test", "-f", "/data/statefulset-continue"}},
},
PeriodSeconds: 1,
SuccessThreshold: 1,
Expand Down Expand Up @@ -601,7 +613,7 @@ func (s *StatefulSetTester) ResumeNextPod(ss *apps.StatefulSet) {
if resumedPod != "" {
Failf("Found multiple paused stateful pods: %v and %v", pod.Name, resumedPod)
}
_, err := RunHostCmd(pod.Namespace, pod.Name, "touch /tmp/statefulset-continue")
_, err := RunHostCmdWithRetries(pod.Namespace, pod.Name, "touch /data/statefulset-continue; sync", StatefulSetPoll, StatefulPodTimeout)
ExpectNoError(err)
Logf("Resumed pod %v", pod.Name)
resumedPod = pod.Name
Expand Down Expand Up @@ -684,12 +696,13 @@ func DeleteAllStatefulSets(c clientset.Interface, ns string) {
// Scale down each statefulset, then delete it completely.
// Deleting a pvc without doing this will leak volumes, #25101.
errList := []string{}
for _, ss := range ssList.Items {
Logf("Scaling statefulset %v to 0", ss.Name)
if err := sst.Scale(&ss, 0); err != nil {
for i := range ssList.Items {
ss := &ssList.Items[i]
var err error
if ss, err = sst.Scale(ss, 0); err != nil {
errList = append(errList, fmt.Sprintf("%v", err))
}
sst.WaitForStatusReplicas(&ss, 0)
sst.WaitForStatusReplicas(ss, 0)
Logf("Deleting statefulset %v", ss.Name)
// Use OrphanDependents=false so it's deleted synchronously.
// We already made sure the Pods are gone inside Scale().
Expand Down
18 changes: 18 additions & 0 deletions test/e2e/framework/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3764,6 +3764,24 @@ func RunHostCmdOrDie(ns, name, cmd string) string {
return stdout
}

// RunHostCmdWithRetries calls RunHostCmd and retries errors it thinks may be transient
// until it succeeds or the specified timeout expires.
// This can be used with idempotent commands to deflake transient Node issues.
func RunHostCmdWithRetries(ns, name, cmd string, interval, timeout time.Duration) (string, error) {
	begin := time.Now()
	for {
		stdout, cmdErr := RunHostCmd(ns, name, cmd)
		if cmdErr == nil {
			// Command succeeded; hand back its output.
			return stdout, nil
		}
		// Give up once the total elapsed time exceeds the caller's budget,
		// reporting both how long we tried and the last failure.
		elapsed := time.Since(begin)
		if elapsed > timeout {
			return stdout, fmt.Errorf("RunHostCmd still failed after %v: %v", elapsed, cmdErr)
		}
		Logf("Waiting %v to retry failed RunHostCmd: %v", interval, cmdErr)
		time.Sleep(interval)
	}
}

// LaunchHostExecPod launches a hostexec pod in the given namespace and waits
// until it's Running
func LaunchHostExecPod(client clientset.Interface, ns, name string) *v1.Pod {
Expand Down
12 changes: 6 additions & 6 deletions test/e2e/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {

By("Restarting statefulset " + ss.Name)
sst.Restart(ss)
sst.Saturate(ss)
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)

By("Verifying statefulset mounted data directory is usable")
framework.ExpectNoError(sst.CheckMount(ss, "/data"))
Expand Down Expand Up @@ -234,13 +234,13 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
sst.DeleteStatefulPodAtIndex(0, ss)

By("Confirming stateful pod at index 0 is recreated.")
sst.WaitForRunning(2, 0, ss)
sst.WaitForRunning(2, 1, ss)

By("Deleting unhealthy stateful pod at index 1.")
sst.DeleteStatefulPodAtIndex(1, ss)
By("Resuming stateful pod at index 1.")
sst.ResumeNextPod(ss)

By("Confirming all stateful pods in statefulset are created.")
sst.Saturate(ss)
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
})

It("should perform rolling updates and roll backs of template modifications", func() {
Expand Down Expand Up @@ -758,7 +758,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
By("Scaling down stateful set " + ssName + " to 0 replicas and waiting until none of pods will run in namespace" + ns)
sst.RestoreHttpProbe(ss)
sst.Scale(ss, 0)
sst.WaitForStatusReadyReplicas(ss, 0)
sst.WaitForStatusReplicas(ss, 0)
})

It("Should recreate evicted statefulset", func() {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/upgrades/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,5 @@ func (t *StatefulSetUpgradeTest) verify() {
// restart scales the test StatefulSet down and back up via the tester, then
// waits until the configured replica count is running and ready.
func (t *StatefulSetUpgradeTest) restart() {
By("Restarting statefulset " + t.set.Name)
t.tester.Restart(t.set)
// NOTE(review): the next two lines are the old call and its replacement
// rendered together by the diff view (hunk "-109,5 +109,5" changes one
// line). Only the WaitForRunningAndReady call should remain — confirm
// against the merged file before relying on this text as source.
t.tester.Saturate(t.set)
t.tester.WaitForRunningAndReady(*t.set.Spec.Replicas, t.set)
}