Cluster Verification Framework #23771

Merged
90 changes: 41 additions & 49 deletions test/e2e/examples.go
@@ -27,7 +27,6 @@ import (

"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/test/e2e/framework"

. "github.com/onsi/ginkgo"
@@ -40,6 +39,14 @@ const (

var _ = framework.KubeDescribe("[Feature:Example]", func() {
f := framework.NewDefaultFramework("examples")
+ // Customized ForEach wrapper for this test.
+ forEachPod := func(selectorKey string, selectorValue string, fn func(api.Pod)) {
+ f.NewClusterVerification(
+ framework.PodStateVerification{
+ Selectors: map[string]string{selectorKey: selectorValue},
+ ValidPhases: []api.PodPhase{api.PodRunning},
+ }).ForEach(fn)
+ }
var c *client.Client
var ns string
BeforeEach(func() {
@@ -85,13 +92,13 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {

By("checking up the services")
checkAllLogs := func() {
forEachPod(c, ns, "name", "redis", func(pod api.Pod) {
forEachPod("name", "redis", func(pod api.Pod) {
Review comment (Member): Why were c & ns trimmed out of the signature? It seems like they are still leveraged in the function and are now relying on scope.

Reply (Member Author):
  • Entirely on scope, yes. This is a local wrapper for backward compatibility with the forEach that incorrectly depended on examples.go. So this fixes forEachPod cleanup #23730.
  • The new framework basically allows anyone to build wrapper functions in 2 lines of code (if they want them; I think we really don't need them now unless it's a long test like kubectl.go), so there is no need for a global forEachPod wrapper with hidden semantics.
  • To answer the original question (why was ns removed?): the framework knows how to iterate itself; there is no need to pass a client/ns, as those are embedded 1-1 in the framework itself.
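For instance, a test can now build its own two-line helper out of the framework primitives. An illustrative sketch (waitForNPods is a hypothetical name, not part of this diff; it assumes the framework.Framework f and the serverStartTimeout constant from examples.go):

waitForNPods := func(selectorKey, selectorValue string, atLeast int) {
f.NewClusterVerification(framework.PodStateVerification{
Selectors: map[string]string{selectorKey: selectorValue},
ValidPhases: []api.PodPhase{api.PodRunning, api.PodPending},
}).WaitForOrFail(atLeast, serverStartTimeout)
}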

if pod.Name != bootstrapPodName {
_, err := framework.LookForStringInLog(ns, pod.Name, "redis", expectedOnServer, serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
}
})
forEachPod(c, ns, "name", "redis-sentinel", func(pod api.Pod) {
forEachPod("name", "redis-sentinel", func(pod api.Pod) {
if pod.Name != bootstrapPodName {
_, err := framework.LookForStringInLog(ns, pod.Name, "sentinel", expectedOnSentinel, serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
@@ -124,7 +131,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("starting rabbitmq")
framework.RunKubectlOrDie("create", "-f", rabbitmqServiceYaml, nsFlag)
framework.RunKubectlOrDie("create", "-f", rabbitmqControllerYaml, nsFlag)
forEachPod(c, ns, "component", "rabbitmq", func(pod api.Pod) {
forEachPod("component", "rabbitmq", func(pod api.Pod) {
_, err := framework.LookForStringInLog(ns, pod.Name, "rabbitmq", "Server startup complete", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
})
@@ -133,22 +140,24 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {

By("starting celery")
framework.RunKubectlOrDie("create", "-f", celeryControllerYaml, nsFlag)
forEachPod(c, ns, "component", "celery", func(pod api.Pod) {
forEachPod("component", "celery", func(pod api.Pod) {
_, err := framework.LookForStringInFile(ns, pod.Name, "celery", "/data/celery.log", " ready.", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
})

By("starting flower")
framework.RunKubectlOrDie("create", "-f", flowerServiceYaml, nsFlag)
framework.RunKubectlOrDie("create", "-f", flowerControllerYaml, nsFlag)
forEachPod(c, ns, "component", "flower", func(pod api.Pod) {
// Do nothing. just wait for it to be up and running.
forEachPod("component", "flower", func(pod api.Pod) {

})
forEachPod("component", "flower", func(pod api.Pod) {
content, err := makeHttpRequestToService(c, ns, "flower-service", "/", framework.EndpointRegisterTimeout)
Expect(err).NotTo(HaveOccurred())
if !strings.Contains(content, "<title>Celery Flower</title>") {
framework.Failf("Flower HTTP request failed")
}
})
content, err := makeHttpRequestToService(c, ns, "flower-service", "/", framework.EndpointRegisterTimeout)
Expect(err).NotTo(HaveOccurred())
if !strings.Contains(content, "<title>Celery Flower</title>") {
framework.Failf("Flower HTTP request failed")
}
})
})

@@ -172,7 +181,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
framework.Logf("Now polling for Master startup...")

// Only one master pod: but it's a natural way to look up pod names.
forEachPod(c, ns, "component", "spark-master", func(pod api.Pod) {
forEachPod("component", "spark-master", func(pod api.Pod) {
framework.Logf("Now waiting for master to startup in %v", pod.Name)
_, err := framework.LookForStringInLog(ns, pod.Name, "spark-master", "Starting Spark master at", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
@@ -181,6 +190,12 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("waiting for master endpoint")
err := framework.WaitForEndpoint(c, ns, "spark-master")
Expect(err).NotTo(HaveOccurred())
forEachPod("component", "spark-master", func(pod api.Pod) {
_, maErr := framework.LookForStringInLog(f.Namespace.Name, pod.Name, "spark-master", "Starting Spark master at", serverStartTimeout)
if maErr != nil {
framework.Failf("Didn't find target string. error:", maErr)
}
})
}
worker := func() {
By("starting workers")
@@ -191,10 +206,13 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
// framework.ScaleRC(c, ns, "spark-worker-controller", 2, true)

framework.Logf("Now polling for worker startup...")
forEachPod(c, ns, "component", "spark-worker", func(pod api.Pod) {
_, err := framework.LookForStringInLog(ns, pod.Name, "spark-worker", "Successfully registered with master", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
})
// ScaleRC(c, ns, "spark-worker-controller", 2, true)
framework.Logf("Now polling for worker startup...")
forEachPod("component", "spark-worker",
func(pod api.Pod) {
_, slaveErr := framework.LookForStringInLog(ns, pod.Name, "spark-worker", "Successfully registered with master", serverStartTimeout)
Expect(slaveErr).NotTo(HaveOccurred())
})
}
// Run the worker verification after we turn up the master.
defer worker()
@@ -231,7 +249,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
// Create an RC with n nodes in it. Each node will then be verified.
By("Creating a Cassandra RC")
framework.RunKubectlOrDie("create", "-f", controllerYaml, nsFlag)
forEachPod(c, ns, "app", "cassandra", func(pod api.Pod) {
forEachPod("app", "cassandra", func(pod api.Pod) {
framework.Logf("Verifying pod %v ", pod.Name)
_, err = framework.LookForStringInLog(ns, pod.Name, "cassandra", "Listening for thrift clients", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
@@ -241,7 +259,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {

By("Finding each node in the nodetool status lines")
output := framework.RunKubectlOrDie("exec", "cassandra", nsFlag, "--", "nodetool", "status")
forEachPod(c, ns, "app", "cassandra", func(pod api.Pod) {
forEachPod("app", "cassandra", func(pod api.Pod) {
if !strings.Contains(output, pod.Status.PodIP) {
framework.Failf("Pod ip %s not found in nodetool status", pod.Status.PodIP)
}
@@ -285,7 +303,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {

By("starting workers")
framework.RunKubectlOrDie("create", "-f", workerControllerJson, nsFlag)
forEachPod(c, ns, "name", "storm-worker", func(pod api.Pod) {
forEachPod("name", "storm-worker", func(pod api.Pod) {
//do nothing, just wait for the pod to be running
})
// TODO: Add logging configuration to nimbus & workers images and then
@@ -412,7 +430,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
framework.RunKubectlOrDie("create", "-f", driverServiceYaml, nsFlag)
framework.RunKubectlOrDie("create", "-f", rethinkDbControllerYaml, nsFlag)
checkDbInstances := func() {
forEachPod(c, ns, "db", "rethinkdb", func(pod api.Pod) {
forEachPod("db", "rethinkdb", func(pod api.Pod) {
_, err := framework.LookForStringInLog(ns, pod.Name, "rethinkdb", "Server ready", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
})
@@ -451,7 +469,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("starting hazelcast")
framework.RunKubectlOrDie("create", "-f", serviceYaml, nsFlag)
framework.RunKubectlOrDie("create", "-f", controllerYaml, nsFlag)
forEachPod(c, ns, "name", "hazelcast", func(pod api.Pod) {
forEachPod("name", "hazelcast", func(pod api.Pod) {
_, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [1]", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
_, err = framework.LookForStringInLog(ns, pod.Name, "hazelcast", "is STARTED", serverStartTimeout)
@@ -463,7 +481,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {

By("scaling hazelcast")
framework.ScaleRC(c, ns, "hazelcast", 2, true)
forEachPod(c, ns, "name", "hazelcast", func(pod api.Pod) {
forEachPod("name", "hazelcast", func(pod api.Pod) {
_, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [2]", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
})
@@ -501,29 +519,3 @@ func prepareResourceWithReplacedString(inputFile, old, new string) string {
podYaml := strings.Replace(string(data), old, new, 1)
return podYaml
}

- func forEachPod(c *client.Client, ns, selectorKey, selectorValue string, fn func(api.Pod)) {
- pods := []*api.Pod{}
- for t := time.Now(); time.Since(t) < framework.PodListTimeout; time.Sleep(framework.Poll) {
- selector := labels.SelectorFromSet(labels.Set(map[string]string{selectorKey: selectorValue}))
- options := api.ListOptions{LabelSelector: selector}
- podList, err := c.Pods(ns).List(options)
- Expect(err).NotTo(HaveOccurred())
- for _, pod := range podList.Items {
- if pod.Status.Phase == api.PodPending || pod.Status.Phase == api.PodRunning {
- pods = append(pods, &pod)
- }
- }
- if len(pods) > 0 {
- break
- }
- }
- if pods == nil || len(pods) == 0 {
- framework.Failf("No pods found")
- }
- for _, pod := range pods {
- err := framework.WaitForPodRunningInNamespace(c, pod.Name, ns)
- Expect(err).NotTo(HaveOccurred())
- fn(*pod)
- }
- }
173 changes: 173 additions & 0 deletions test/e2e/framework/framework.go
@@ -36,6 +36,8 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/wait"
)

const (
@@ -423,3 +425,174 @@ func kubectlExec(namespace string, podName, containerName string, args ...string
func KubeDescribe(text string, body func()) bool {
return Describe("[k8s.io] "+text, body)
}

// PodStateVerification represents a verification of pod state.
// Any time you have a set of pods that you want to operate against or query,
// this struct can be used to declaratively identify those pods.
type PodStateVerification struct {
// Optional: only pods that have k=v labels will pass this filter.
Selectors map[string]string

// Required: The phases which are valid for your pod.
ValidPhases []api.PodPhase

// Optional: only pods passing this function will pass the filter.
// In addition to the boolean filter result, this function may also return an error;
// a non-nil error indicates that polling of the pods should stop.
Verify func(api.Pod) (bool, error)

// Optional: only pods with this name will pass the filter.
PodName string
}
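// Illustrative example (not part of this diff): a PodStateVerification, as a
// test would construct it, that matches Running pods labeled name=hazelcast
// whose pod IP has been assigned. The label key/value here are hypothetical.
//
//	framework.PodStateVerification{
//		Selectors:   map[string]string{"name": "hazelcast"},
//		ValidPhases: []api.PodPhase{api.PodRunning},
//		Verify:      func(p api.Pod) (bool, error) { return p.Status.PodIP != "", nil },
//	}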

type ClusterVerification struct {
client *client.Client
namespace *api.Namespace // pointer rather than string, since the namespace isn't created until BeforeEach runs.
podState PodStateVerification
}

func (f *Framework) NewClusterVerification(filter PodStateVerification) *ClusterVerification {
return &ClusterVerification{
f.Client,
f.Namespace,
filter,
}
}

func passesPodNameFilter(pod api.Pod, name string) bool {
return name == "" || strings.Contains(pod.Name, name)
}

func passesVerifyFilter(pod api.Pod, verify func(p api.Pod) (bool, error)) (bool, error) {
if verify == nil {
return true, nil
} else {
verified, err := verify(pod)
// If an error is returned, by definition, pod verification fails
if err != nil {
return false, err
} else {
return verified, nil
}
}
}

func passesPhasesFilter(pod api.Pod, validPhases []api.PodPhase) bool {
passesPhaseFilter := false
for _, phase := range validPhases {
if pod.Status.Phase == phase {
passesPhaseFilter = true
}
}
return passesPhaseFilter
}

// filterLabels returns the pods matching the given label selectors, or all pods in the namespace if no selectors are given.
func filterLabels(selectors map[string]string, cli *client.Client, ns string) (*api.PodList, error) {
var err error
var selector labels.Selector
var pl *api.PodList
// List pods based on selectors. This might be a tiny optimization rather than
// filtering everything manually.
if len(selectors) > 0 {
selector = labels.SelectorFromSet(labels.Set(selectors))
options := api.ListOptions{LabelSelector: selector}
pl, err = cli.Pods(ns).List(options)
} else {
pl, err = cli.Pods(ns).List(api.ListOptions{})
}
return pl, err
}

// filter returns the pods that pass the PodStateVerification. It can be used to compose
// the more useful abstractions like ForEach, WaitFor, and so on, which
// can be used directly by tests.
func (p *PodStateVerification) filter(c *client.Client, namespace *api.Namespace) ([]api.Pod, error) {
if len(p.ValidPhases) == 0 || namespace == nil {
panic(fmt.Errorf("Need to specify valid pod phases (%v) and a namespace (%v)", p.ValidPhases, namespace))
}

ns := namespace.Name
pl, err := filterLabels(p.Selectors, c, ns) // Build an api.PodList to operate against.
if err != nil {
return nil, err
}
Logf("Selector matched %v pods for %v", len(pl.Items), p.Selectors)
if len(pl.Items) == 0 {
return pl.Items, nil
}

unfilteredPods := pl.Items
filteredPods := []api.Pod{}
ReturnPodsSoFar:
// Next: Pod must match at least one of the states that the user specified
for _, pod := range unfilteredPods {
if !(passesPhasesFilter(pod, p.ValidPhases) && passesPodNameFilter(pod, p.PodName)) {
continue
}
passesVerify, err := passesVerifyFilter(pod, p.Verify)
if err != nil {
Logf("Error detected on %v : %v !", pod.Name, err)
break ReturnPodsSoFar
}
if passesVerify {
filteredPods = append(filteredPods, pod)
}
}
return filteredPods, err
}

// WaitFor waits for some minimum number of pods to be verified, according to the PodStateVerification
// definition.
func (cl *ClusterVerification) WaitFor(atLeast int, timeout time.Duration) ([]api.Pod, error) {
pods := []api.Pod{}
var returnedErr error

err := wait.Poll(1*time.Second, timeout, func() (bool, error) {
pods, returnedErr = cl.podState.filter(cl.client, cl.namespace)

// Failure
if returnedErr != nil {
Logf("Cutting polling short: We got an error from the pod filtering layer.")
// stop polling if the pod filtering returns an error. that should never happen.
// it indicates, for example, that the client is broken or something non-pod related.
return false, returnedErr
}
Logf("Found %v / %v", len(pods), atLeast)

// Success
if len(pods) >= atLeast {
return true, nil
}
// Keep trying...
return false, nil
})
Logf("WaitFor completed. Pods found = %v out of %v", timeout, len(pods), atLeast)
return pods, err
}
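// Illustrative usage (not part of this diff), given cv := f.NewClusterVerification(...)
// in a test: block until at least three matching pods pass verification, or give up
// after two minutes.
//
//	pods, err := cv.WaitFor(3, 2*time.Minute)
//	if err != nil || len(pods) < 3 {
//		framework.Failf("only verified %v pods: %v", len(pods), err)
//	}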

// WaitForOrFail is a shorthand for WaitFor that fails the test if anything goes wrong.
func (cl *ClusterVerification) WaitForOrFail(atLeast int, timeout time.Duration) {
pods, err := cl.WaitFor(atLeast, timeout)
if err != nil || len(pods) < atLeast {
Failf("Verified %v of %v pods , error : %v", len(pods), atLeast, err)
}
}

// ForEach runs a function against every verifiable pod. Be warned that this doesn't wait for "n" pods to verify,
// so it may return very quickly if you have strict pod state requirements.
//
// For example, if you require at least 5 pods to be running before your test will pass,
// it's smart to first call "clusterVerification.WaitFor(5)" before you call clusterVerification.ForEach.
func (cl *ClusterVerification) ForEach(podFunc func(api.Pod)) error {
pods, err := cl.podState.filter(cl.client, cl.namespace)
if err == nil {
Logf("ForEach: Found %v pods from the filter. Now looping through them.", len(pods))
for _, p := range pods {
podFunc(p)
}
} else {
Logf("ForEach: Something went wrong when filtering pods to execute against: %v", err)
}

return err
}
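Taken together, the intended pattern is: declare the desired pod state once, gate on a minimum verified count, then act on each pod. A minimal sketch under the same assumptions as examples.go above (a test with a framework.Framework f; the app=cassandra selector and the serverStartTimeout constant are borrowed from that file):

cv := f.NewClusterVerification(framework.PodStateVerification{
Selectors: map[string]string{"app": "cassandra"},
ValidPhases: []api.PodPhase{api.PodRunning},
})
// Require at least 2 verified pods before iterating, per the ForEach caveat above.
cv.WaitForOrFail(2, serverStartTimeout)
cv.ForEach(func(pod api.Pod) {
framework.Logf("Verified pod %v on node %v", pod.Name, pod.Spec.NodeName)
})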