add validation for fluentd pods #18448

Merged 1 commit on Dec 15, 2015
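
This change adds a k8s-app: fluentd-logging label to the fluentd-elasticsearch and fluentd-cloud-logging pod manifests (and to the matching example in the logging guide), then extends the Elasticsearch cluster-level-logging e2e test to use that label: the test now waits for the fluentd pods on healthy nodes to reach the running state and fails if any healthy node has no fluentd pod. It also improves the test's debugging output, logging Elasticsearch health responses and per-pod logs for nodes with missing log lines.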
cluster/saltbase/salt/fluentd-es/fluentd-es.yaml (2 additions, 0 deletions)

@@ -3,6 +3,8 @@ kind: Pod
 metadata:
   name: fluentd-elasticsearch
   namespace: kube-system
+  labels:
+    k8s-app: fluentd-logging
 spec:
   containers:
   - name: fluentd-elasticsearch
cluster/saltbase/salt/fluentd-gcp/fluentd-gcp.yaml (2 additions, 0 deletions)

@@ -3,6 +3,8 @@ kind: Pod
 metadata:
   name: fluentd-cloud-logging
   namespace: kube-system
+  labels:
+    k8s-app: fluentd-logging
 spec:
   containers:
   - name: fluentd-cloud-logging
docs/getting-started-guides/logging.md (2 additions, 0 deletions)

@@ -166,6 +166,8 @@ kind: Pod
 metadata:
   name: fluentd-cloud-logging
   namespace: kube-system
+  labels:
+    k8s-app: fluentd-logging
 spec:
   containers:
   - name: fluentd-cloud-logging
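
The label exists so that the fluentd pods can be found with an ordinary label selector, which is exactly what the test changes below do. As a minimal standalone sketch (the helper name and client variable are hypothetical, and the import paths are an assumption based on the pre-1.2 tree this PR targets), listing the newly labeled pods would look like:

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/labels"
)

// listFluentdPods is a hypothetical helper: it selects pods in kube-system
// carrying the k8s-app: fluentd-logging label added by this PR, using the
// same List call the e2e test below performs through its framework client.
func listFluentdPods(c *client.Client) error {
	selector := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "fluentd-logging"}))
	pods, err := c.Pods(api.NamespaceSystem).List(api.ListOptions{LabelSelector: selector})
	if err != nil {
		return err
	}
	for _, pod := range pods.Items {
		fmt.Printf("fluentd pod %s runs on node %s\n", pod.Name, pod.Spec.NodeName)
	}
	return nil
}
```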
test/e2e/es_cluster_logging.go (51 additions, 6 deletions)

@@ -46,8 +46,9 @@ var _ = Describe("Cluster level logging using Elasticsearch", func() {
 })

 const (
-    esKey   = "k8s-app"
-    esValue = "elasticsearch-logging"
+    k8sAppKey    = "k8s-app"
+    esValue      = "elasticsearch-logging"
+    fluentdValue = "fluentd-logging"
 )

 func bodyToJSON(body []byte) (map[string]interface{}, error) {
@@ -59,6 +60,15 @@ func bodyToJSON(body []byte) (map[string]interface{}, error) {
     return r, nil
 }

+func nodeInNodeList(nodeName string, nodeList *api.NodeList) bool {
+    for _, node := range nodeList.Items {
+        if nodeName == node.Name {
+            return true
+        }
+    }
+    return false
+}
+
 // ClusterLevelLoggingWithElasticsearch is an end to end test for cluster level logging.
 func ClusterLevelLoggingWithElasticsearch(f *Framework) {
     // graceTime is how long to keep retrying requests for status information.
@@ -83,7 +93,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {

     // Wait for the Elasticsearch pods to enter the running state.
     By("Checking to make sure the Elasticsearch pods are running")
-    label := labels.SelectorFromSet(labels.Set(map[string]string{esKey: esValue}))
+    label := labels.SelectorFromSet(labels.Set(map[string]string{k8sAppKey: esValue}))
     options := api.ListOptions{LabelSelector: label}
     pods, err := f.Client.Pods(api.NamespaceSystem).List(options)
     Expect(err).NotTo(HaveOccurred())
@@ -152,13 +162,14 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
             Resource("services").
             Name("elasticsearch-logging").
             Suffix("_cluster/health").
-            Param("health", "pretty").
+            Param("level", "indices").
             DoRaw()
         if err != nil {
             continue
         }
         health, err := bodyToJSON(body)
         if err != nil {
+            Logf("Bad json response from elasticsearch: %v", err)
             continue
         }
         statusIntf, ok := health["status"]
@@ -168,7 +179,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
         }
         status := statusIntf.(string)
         if status != "green" && status != "yellow" {
-            Logf("Cluster health has bad status: %s", status)
+            Logf("Cluster health has bad status: %v", health)
             continue
         }
         if err == nil && ok {
@@ -202,6 +213,33 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
     }
     Logf("Found %d healthy nodes.", len(nodes.Items))

+    // Wait for the Fluentd pods to enter the running state.
+    By("Checking to make sure the Fluentd pods are running on each healthy node")
+    label = labels.SelectorFromSet(labels.Set(map[string]string{k8sAppKey: fluentdValue}))
+    options = api.ListOptions{LabelSelector: label}
+    pods, err = f.Client.Pods(api.NamespaceSystem).List(options)
+    Expect(err).NotTo(HaveOccurred())
+    for _, pod := range pods.Items {
+        if nodeInNodeList(pod.Spec.NodeName, nodes) {
+            err = waitForPodRunningInNamespace(f.Client, pod.Name, api.NamespaceSystem)
+            Expect(err).NotTo(HaveOccurred())
+        }
+    }
+
+    // Check if each healthy node has fluentd running on it.
+    for _, node := range nodes.Items {
+        exists := false
+        for _, pod := range pods.Items {
+            if pod.Spec.NodeName == node.Name {
+                exists = true
+                break
+            }
+        }
+        if !exists {
+            Failf("Node %v does not have a fluentd pod running on it.", node.Name)
+        }
+    }
+
     // Create a unique root name for the resources in this test to permit
     // parallel executions of this test.
     // Use a unique namespace for the resources created in this test.
@@ -268,7 +306,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
     for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(10 * time.Second) {

         // Debugging code to report the status of the elasticsearch logging endpoints.
-        selector := labels.Set{esKey: esValue}.AsSelector()
+        selector := labels.Set{k8sAppKey: esValue}.AsSelector()
         options := api.ListOptions{LabelSelector: selector}
         esPods, err := f.Client.Pods(api.NamespaceSystem).List(options)
         if err != nil {
@@ -386,6 +424,13 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
     for n := range missingPerNode {
         if missingPerNode[n] > 0 {
             Logf("Node %d is missing %d logs", n, missingPerNode[n])
+            opts := &api.PodLogOptions{}
+            body, err = f.Client.Pods(ns).GetLogs(podNames[n], opts).DoRaw()
+            if err != nil {
+                Logf("Cannot get logs from pod %v", podNames[n])
+                continue
+            }
+            Logf("Pod %s has the following logs: %s", podNames[n], body)
         }
     }
     Failf("Failed to find all %d log lines", expected)
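
The new per-node coverage check is a nested loop over nodes and pods. For the handful of nodes an e2e cluster typically has that is perfectly fine; for illustration only, the same invariant (every healthy node hosts at least one fluentd pod) can be expressed in linear time with a set of node names. This is a hedged sketch, not the PR's code; the function name is hypothetical and it assumes the same api types and import ("k8s.io/kubernetes/pkg/api") the test already uses:

```go
// fluentdCoverageGaps returns the names of nodes that host none of the given
// fluentd pods. Sketch only: a map-based variant of the nested-loop check
// added in this PR, reporting all uncovered nodes instead of the first.
func fluentdCoverageGaps(nodes *api.NodeList, pods *api.PodList) []string {
	covered := make(map[string]bool, len(pods.Items))
	for _, pod := range pods.Items {
		covered[pod.Spec.NodeName] = true
	}
	var missing []string
	for _, node := range nodes.Items {
		if !covered[node.Name] {
			missing = append(missing, node.Name)
		}
	}
	return missing
}
```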