Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve e2e retry logic with standard wait.Poll() #8399

Merged
1 commit merged into from
May 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 24 additions & 19 deletions test/e2e/es_cluster_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -72,12 +73,14 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
// being run as the first e2e test just after the e2e cluster has been created.
var err error
const graceTime = 10 * time.Minute
for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
start := time.Now()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can remove this and use graceTime instead in logging.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry - I'm confused. Ignore the above comment.

expectNoError(wait.Poll(5*time.Second, graceTime, func() (bool, error) {
if _, err = s.Get("elasticsearch-logging"); err == nil {
break
return true, nil
}
Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
}
return false, nil
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally you should return (true, err) here in cases of errors which are not worth retrying. That way you fail earlier, and get a more useful error message as output (e.g. "malformed request x" rather then "timed out").

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some test cases are worth retrying, right?
I wonder how you distinguish them:)

}))
Expect(err).NotTo(HaveOccurred())

// Wait for the Elasticsearch pods to enter the running state.
Expand All @@ -95,7 +98,8 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
var statusCode float64
var esResponse map[string]interface{}
err = nil
for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
start = time.Now()
expectNoError(wait.Poll(5*time.Second, graceTime, func() (bool, error) {
// Query against the root URL for Elasticsearch.
body, err := c.Get().
Namespace(api.NamespaceDefault).
Expand All @@ -105,26 +109,26 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
DoRaw()
if err != nil {
Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
continue
return false, nil
}
esResponse, err = bodyToJSON(body)
if err != nil {
Logf("After %v failed to convert Elasticsearch JSON response %v to map[string]interface{}: %v", time.Since(start), string(body), err)
continue
return false, nil
}
statusIntf, ok := esResponse["status"]
if !ok {
Logf("After %v Elasticsearch response has no status field: %v", time.Since(start), esResponse)
continue
return false, nil
}
statusCode, ok = statusIntf.(float64)
if !ok {
// Assume this is a string returning Failure. Retry.
Logf("After %v expected status to be a float64 but got %v of type %T", time.Since(start), statusIntf, statusIntf)
continue
return false, nil
}
break
}
return true, nil
}))
Expect(err).NotTo(HaveOccurred())
if int(statusCode) != 200 {
Failf("Elasticsearch cluster has a bad status: %v", statusCode)
Expand Down Expand Up @@ -233,7 +237,8 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
By("Checking all the log lines were ingested into Elasticsearch")
missing := 0
expected := nodeCount * countTo
for start := time.Now(); time.Since(start) < graceTime; time.Sleep(10 * time.Second) {
start = time.Now()
expectNoError(wait.Poll(10*time.Second, graceTime, func() (bool, error) {
// Ask Elasticsearch to return all the log lines that were tagged with the underscore
// verison of the name. Ask for twice as many log lines as we expect to check for
// duplication bugs.
Expand All @@ -248,13 +253,13 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
DoRaw()
if err != nil {
Logf("After %v failed to make proxy call to elasticsearch-logging: %v", time.Since(start), err)
continue
return false, nil
}

response, err := bodyToJSON(body)
if err != nil {
Logf("After %v failed to unmarshal response: %v", time.Since(start), err)
continue
return false, nil
}
hits, ok := response["hits"].(map[string]interface{})
if !ok {
Expand All @@ -263,17 +268,17 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
totalF, ok := hits["total"].(float64)
if !ok {
Logf("After %v hits[total] not of the expected type: %T", time.Since(start), hits["total"])
continue
return false, nil
}
total := int(totalF)
if total < expected {
Logf("After %v expecting to find %d log lines but saw only %d", time.Since(start), expected, total)
continue
return false, nil
}
h, ok := hits["hits"].([]interface{})
if !ok {
Logf("After %v hits not of the expected type: %T", time.Since(start), hits["hits"])
continue
return false, nil
}
// Initialize data-structure for observing counts.
observed := make([][]int, nodeCount)
Expand Down Expand Up @@ -329,10 +334,10 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
}
if missing != 0 {
Logf("After %v still missing %d log lines", time.Since(start), missing)
continue
return false, nil
}
Logf("After %s found all %d log lines", time.Since(start), expected)
return
}
return true, nil
}))
Failf("Failed to find all %d log lines", expected)
}
10 changes: 6 additions & 4 deletions test/e2e/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -144,13 +145,14 @@ func validateGuestbookApp(c *client.Client, ns string) {

// Returns whether received expected response from guestbook on time.
func waitForGuestbookResponse(c *client.Client, cmd, arg, expectedResponse string, timeout time.Duration, ns string) bool {
for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) {
expectNoError(wait.Poll(5*time.Second, timeout, func() (bool, error) {
res, err := makeRequestToGuestbook(c, cmd, arg, ns)
if err == nil && res == expectedResponse {
return true
return true, nil
}
}
return false
return false, nil
}))
return true
}

func makeRequestToGuestbook(c *client.Client, cmd, value string, ns string) (string, error) {
Expand Down
6 changes: 4 additions & 2 deletions test/e2e/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -118,7 +119,7 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
defer wg.Done()
rcExist := false
// Once every 1-2 minutes perform resize of RC.
for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) {
expectNoError(wait.Poll(time.Duration(60+rand.Intn(60))*time.Second, simulationTime, func() (bool, error) {
if !rcExist {
expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
rcExist = true
Expand All @@ -131,7 +132,8 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
expectNoError(DeleteRC(c, ns, name), fmt.Sprintf("deleting rc %s in namespace %s", name, ns))
rcExist = false
}
}
return false, nil
}))
if rcExist {
expectNoError(DeleteRC(c, ns, name), fmt.Sprintf("deleting rc %s in namespace %s after test completion", name, ns))
}
Expand Down
14 changes: 6 additions & 8 deletions test/e2e/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
influxdb "github.com/influxdb/influxdb/client"

. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -223,15 +224,12 @@ func testMonitoringUsingHeapsterInfluxdb(c *client.Client) {

expectedNodes, err := getAllNodesInCluster(c)
expectNoError(err)
startTime := time.Now()
for {

expectNoError(wait.Poll(sleepBetweenAttempts, testTimeout, func() (bool, error) {
if validatePodsAndNodes(influxdbClient, expectedPods, expectedNodes) {
return
return true, nil
}
if time.Since(startTime) >= testTimeout {
break
}
time.Sleep(sleepBetweenAttempts)
}
return false, nil
}))
Failf("monitoring using heapster and influxdb test failed")
}
19 changes: 11 additions & 8 deletions test/e2e/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
Expand Down Expand Up @@ -108,14 +110,14 @@ var _ = Describe("PD", func() {
expectNoError(podClient.Delete(host1Pod.Name, nil), "Failed to delete host1Pod")

By(fmt.Sprintf("deleting PD %q", diskName))
for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
expectNoError(wait.Poll(5*time.Second, 180*time.Second, func() (bool, error) {
if err = deletePD(diskName); err != nil {
Logf("Couldn't delete PD. Sleeping 5 seconds (%v)", err)
continue
return false, nil
}
Logf("Deleted PD %v", diskName)
break
}
return true, nil
}))
expectNoError(err, "Error deleting PD")

return
Expand Down Expand Up @@ -176,13 +178,14 @@ var _ = Describe("PD", func() {
expectNoError(podClient.Delete(host1ROPod.Name, nil), "Failed to delete host1ROPod")

By(fmt.Sprintf("deleting PD %q", diskName))
for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {

expectNoError(wait.Poll(5*time.Second, 180*time.Second, func() (bool, error) {
if err = deletePD(diskName); err != nil {
Logf("Couldn't delete PD. Sleeping 5 seconds")
continue
return false, nil
}
break
}
return true, nil
}))
expectNoError(err, "Error deleting PD")
})
})
Expand Down
27 changes: 13 additions & 14 deletions test/e2e/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,21 @@ func testHostIP(c *client.Client, pod *api.Pod) {
err = waitForPodRunningInNamespace(c, pod.Name, ns)
Expect(err).NotTo(HaveOccurred())
// Try to make sure we get a hostIP for each pod.
hostIPTimeout := 2 * time.Minute
t := time.Now()
for {
p, err := podClient.Get(pod.Name)

var (
hostIPTimeout = 2 * time.Minute
pods *api.Pod
)
expectNoError(wait.Poll(5*time.Second, hostIPTimeout, func() (bool, error) {
pods, err = podClient.Get(pod.Name)
Expect(err).NotTo(HaveOccurred())
if p.Status.HostIP != "" {
Logf("Pod %s has hostIP: %s", p.Name, p.Status.HostIP)
break
}
if time.Since(t) >= hostIPTimeout {
Failf("Gave up waiting for hostIP of pod %s after %v seconds",
p.Name, time.Since(t).Seconds())
if pods.Status.HostIP != "" {
Logf("Pod %s has hostIP: %s", pods.Name, pods.Status.HostIP)
return true, nil
}
Logf("Retrying to get the hostIP of pod %s", p.Name)
time.Sleep(5 * time.Second)
}
Logf("Retrying to get the hostIP of pod %s", pods.Name)
return false, nil
}))
}

var _ = Describe("Pods", func() {
Expand Down
20 changes: 7 additions & 13 deletions test/e2e/rc.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,22 +111,16 @@ func ServeImageOrFail(c *client.Client, test string, image string) {
// List the pods, making sure we observe all the replicas.
listTimeout := time.Minute
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
pods, err := c.Pods(ns).List(label, fields.Everything())
Expect(err).NotTo(HaveOccurred())
t := time.Now()
for {

var pods *api.PodList
expectNoError(wait.Poll(5*time.Second, listTimeout, func() (bool, error) {
pods, err = c.Pods(ns).List(label, fields.Everything())
Logf("Controller %s: Found %d pods out of %d", name, len(pods.Items), replicas)
if len(pods.Items) == replicas {
break
}
if time.Since(t) > listTimeout {
Failf("Controller %s: Gave up waiting for %d pods to come up after seeing only %d pods after %v seconds",
name, replicas, len(pods.Items), time.Since(t).Seconds())
return true, nil
}
time.Sleep(5 * time.Second)
pods, err = c.Pods(ns).List(label, fields.Everything())
Expect(err).NotTo(HaveOccurred())
}
return false, nil
}))

By("Ensuring each pod is running")

Expand Down
14 changes: 8 additions & 6 deletions test/e2e/reboot.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -255,11 +256,11 @@ func waitForNodeToBeNotReady(c *client.Client, name string, timeout time.Duratio
// ready or unknown).
func waitForNodeToBe(c *client.Client, name string, wantReady bool, timeout time.Duration) bool {
Logf("Waiting up to %v for node %s readiness to be %t", timeout, name, wantReady)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
expectNoError(wait.Poll(poll, timeout, func() (bool, error) {
node, err := c.Nodes().Get(name)
if err != nil {
Logf("Couldn't get node %s", name)
continue
return false, nil
}

// Check the node readiness condition (logging all).
Expand All @@ -270,10 +271,11 @@ func waitForNodeToBe(c *client.Client, name string, wantReady bool, timeout time
// matches as desired.
if cond.Type == api.NodeReady && (cond.Status == api.ConditionTrue) == wantReady {
Logf("Successfully found node %s readiness to be %t", name, wantReady)
return true
return true, nil
}

}
}
Logf("Node %s didn't reach desired readiness (%t) within %v", name, wantReady, timeout)
return false
return false, nil
}))
return true
}