
Implement Stackdriver Logging e2e tests using PubSub #45255

Merged
4 changes: 4 additions & 0 deletions Godeps/Godeps.json

(Generated file; diff not rendered.)

35 changes: 35 additions & 0 deletions Godeps/LICENSES

(Generated file; diff not rendered.)
2 changes: 2 additions & 0 deletions test/e2e/cluster-logging/BUILD
@@ -27,10 +27,12 @@ go_library(
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/golang.org/x/oauth2/google:go_default_library",
"//vendor/google.golang.org/api/logging/v2beta1:go_default_library",
+"//vendor/google.golang.org/api/pubsub/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
+"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/util/integer:go_default_library",
],
)
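The two new vendored packages above (pubsub/v1 and util/wait) hint at how the GCL provider is now expected to collect log entries: instead of repeatedly querying the Logging API, the test can export a Stackdriver log sink to a Pub/Sub topic and pull the entries from a subscription, waiting for ingestion to catch up. The sketch below shows that flow with the discovery-based google.golang.org/api/pubsub/v1 client; it is only an illustration of the approach, and the resource names, project ID, and the pullLogEntries helper are hypothetical rather than taken from this PR.

package main

import (
	"encoding/base64"
	"fmt"

	"golang.org/x/net/context"
	"golang.org/x/oauth2/google"
	pubsub "google.golang.org/api/pubsub/v1"
)

// pullLogEntries is a hypothetical helper: it creates a topic and a
// subscription, then pulls one batch of exported log entries. The real test
// would also create a Stackdriver Logging sink pointing at the topic (via the
// logging/v2beta1 API, omitted here) and ack/verify entries in a loop.
func pullLogEntries(projectID string) error {
	ctx := context.Background()
	client, err := google.DefaultClient(ctx, pubsub.PubsubScope)
	if err != nil {
		return err
	}
	service, err := pubsub.New(client)
	if err != nil {
		return err
	}

	// Illustrative names; a real test would derive them from its namespace.
	topic := fmt.Sprintf("projects/%s/topics/e2e-logging", projectID)
	sub := fmt.Sprintf("projects/%s/subscriptions/e2e-logging", projectID)

	if _, err := service.Projects.Topics.Create(topic, &pubsub.Topic{}).Do(); err != nil {
		return err
	}
	if _, err := service.Projects.Subscriptions.Create(sub, &pubsub.Subscription{Topic: topic}).Do(); err != nil {
		return err
	}

	// Each pulled message carries a JSON-encoded LogEntry in its Data field,
	// base64-encoded by the API.
	resp, err := service.Projects.Subscriptions.Pull(sub, &pubsub.PullRequest{
		MaxMessages:       100,
		ReturnImmediately: true,
	}).Do()
	if err != nil {
		return err
	}
	var ackIDs []string
	for _, m := range resp.ReceivedMessages {
		data, err := base64.StdEncoding.DecodeString(m.Message.Data)
		if err != nil {
			continue
		}
		fmt.Printf("log entry payload: %s\n", data)
		ackIDs = append(ackIDs, m.AckId)
	}
	if len(ackIDs) == 0 {
		return nil
	}
	_, err = service.Projects.Subscriptions.Acknowledge(sub, &pubsub.AcknowledgeRequest{AckIds: ackIDs}).Do()
	return err
}

func main() {
	if err := pullLogEntries("my-gcp-project"); err != nil {
		fmt.Println("error:", err)
	}
}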
7 changes: 4 additions & 3 deletions test/e2e/cluster-logging/es.go
@@ -39,10 +39,11 @@ var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Featu
It("should check that logs from containers are ingested into Elasticsearch", func() {
podName := "synthlogger"
esLogsProvider, err := newEsLogsProvider(f)
-framework.ExpectNoError(err, "Failed to create GCL logs provider")
+framework.ExpectNoError(err, "Failed to create Elasticsearch logs provider")

-err = esLogsProvider.EnsureWorking()
-framework.ExpectNoError(err, "Elasticsearch is not working")
+err = esLogsProvider.Init()
+defer esLogsProvider.Cleanup()
+framework.ExpectNoError(err, "Failed to init Elasticsearch logs provider")

err = ensureSingleFluentdOnEachNode(f, esLogsProvider.FluentdApplicationName())
framework.ExpectNoError(err, "Fluentd deployed incorrectly")
34 changes: 12 additions & 22 deletions test/e2e/cluster-logging/es_utils.go
@@ -46,12 +46,8 @@ func newEsLogsProvider(f *framework.Framework) (*esLogsProvider, error) {
return &esLogsProvider{Framework: f}, nil
}

-func (logsProvider *esLogsProvider) FluentdApplicationName() string {
-return "fluentd-es"
-}
-
// Ensures that elasticsearch is running and ready to serve requests
-func (logsProvider *esLogsProvider) EnsureWorking() error {
+func (logsProvider *esLogsProvider) Init() error {
f := logsProvider.Framework
// Check for the existence of the Elasticsearch service.
By("Checking the Elasticsearch service exists.")
@@ -157,7 +153,11 @@ func (logsProvider *esLogsProvider) EnsureWorking() error {
return nil
}

-func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
+func (logsProvider *esLogsProvider) Cleanup() {
+// Nothing to do
+}
+
+func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
f := logsProvider.Framework

proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
@@ -202,7 +202,7 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
return nil
}

-entries := []*logEntry{}
+entries := []logEntry{}
// Iterate over the hits and populate the observed array.
for _, e := range h {
l, ok := e.(map[string]interface{})
@@ -223,22 +223,12 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
continue
}

-timestampString, ok := source["@timestamp"].(string)
-if !ok {
-framework.Logf("Timestamp not of the expected type: %T", source["@timestamp"])
-continue
-}
-timestamp, err := time.Parse(time.RFC3339, timestampString)
-if err != nil {
-framework.Logf("Timestamp was not in correct format: %s", timestampString)
-continue
-}
-
-entries = append(entries, &logEntry{
-Payload: msg,
-Timestamp: timestamp,
-})
+entries = append(entries, logEntry{Payload: msg})
}

return entries
}
+
+func (logsProvider *esLogsProvider) FluentdApplicationName() string {
+return "fluentd-es"
+}
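Taken together, the es_utils.go changes above (EnsureWorking renamed to Init, a new no-op Cleanup, ReadEntries returning values instead of pointers, FluentdApplicationName moved to the end of the file) suggest that both the Elasticsearch and the GCL providers now satisfy one common provider contract used by the shared test config. A rough reconstruction of that contract from the call sites visible in this diff follows; the interface name, and any detail of logEntry beyond its Payload field, are assumptions rather than code from the PR.

// logsProvider is reconstructed from the call sites in this diff; the name is assumed.
type logsProvider interface {
	// Init brings the backend (Elasticsearch, or GCL plus its Pub/Sub plumbing) into a usable state.
	Init() error
	// Cleanup tears down whatever Init created; a no-op for Elasticsearch.
	Cleanup()
	// ReadEntries returns the entries already ingested for one synthetic logger pod.
	ReadEntries(pod *loggingPod) []logEntry
	// FluentdApplicationName is the label used to verify that one fluentd pod runs per node.
	FluentdApplicationName() string
}

// After this change logEntry carries only the payload; the Elasticsearch
// provider no longer parses the @timestamp field.
type logEntry struct {
	Payload string
}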
5 changes: 3 additions & 2 deletions test/e2e/cluster-logging/sd.go
@@ -39,8 +39,9 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL", func() {
gclLogsProvider, err := newGclLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider")

-err = gclLogsProvider.EnsureWorking()
-framework.ExpectNoError(err, "GCL is not working")
+err = gclLogsProvider.Init()
+defer gclLogsProvider.Cleanup()
+framework.ExpectNoError(err, "Failed to init GCL logs provider")

err = ensureSingleFluentdOnEachNode(f, gclLogsProvider.FluentdApplicationName())
framework.ExpectNoError(err, "Fluentd deployed incorrectly")
18 changes: 10 additions & 8 deletions test/e2e/cluster-logging/sd_load.go
@@ -38,13 +38,17 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr
gclLogsProvider, err := newGclLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider")

+err = gclLogsProvider.Init()
+defer gclLogsProvider.Cleanup()
+framework.ExpectNoError(err, "Failed to init GCL logs provider")
+
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
nodeCount := len(nodes)
podCount := 30 * nodeCount
loggingDuration := 10 * time.Minute
linesPerSecond := 1000 * nodeCount
linesPerPod := linesPerSecond * int(loggingDuration.Seconds()) / podCount
-ingestionTimeout := 60 * time.Minute
+ingestionTimeout := 20 * time.Minute
Member:

Is this enough?

Author:

Should be, since we start ingesting log entries as soon as they start flowing. From the manual runs that's more than enough, but I don't know of any guarantees/SLOs here (the docs say log entries should appear "right away"). I suggest leaving this as it is and then applying a fix if that's not enough, though I think timing out would indicate a problem with fluentd not being performant enough, rather than the test being too restrictive on time.

By("Running logs generator pods")
pods := []*loggingPod{}
@@ -56,9 +60,6 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr
defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
}

-By("Waiting for pods to succeed")
-time.Sleep(loggingDuration)
-

Member:

I really like this :D

By("Waiting for all log lines to be ingested")
config := &loggingTestConfig{
LogsProvider: gclLogsProvider,
@@ -79,12 +80,16 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr
gclLogsProvider, err := newGclLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider")

+err = gclLogsProvider.Init()
+defer gclLogsProvider.Cleanup()
+framework.ExpectNoError(err, "Failed to init GCL logs provider")
+
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
maxPodCount := 10
jobDuration := 1 * time.Minute
linesPerPodPerSecond := 100
testDuration := 10 * time.Minute
-ingestionTimeout := 60 * time.Minute
+ingestionTimeout := 20 * time.Minute

podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1
@@ -102,9 +107,6 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr
time.Sleep(podRunDelay)
}

-By("Waiting for the last pods to finish")
-time.Sleep(jobDuration)
-
By("Waiting for all log lines to be ingested")
config := &loggingTestConfig{
LogsProvider: gclLogsProvider,
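To make the load parameters above concrete: on a hypothetical 3-node cluster, the constant-load case runs 90 pods emitting 3,000 lines per second cluster-wide, which works out to 20,000 lines per pod over the 10-minute generation window; the reduced 20-minute ingestionTimeout then only has to cover the tail of ingestion, since (per the review discussion above) entries are read while the pods are still logging. A small sketch of that arithmetic, with the node count being illustrative:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative node count; the test reads this from the live cluster.
	nodeCount := 3

	podCount := 30 * nodeCount          // 90 pods
	loggingDuration := 10 * time.Minute // generation window
	linesPerSecond := 1000 * nodeCount  // 3000 lines/s cluster-wide
	linesPerPod := linesPerSecond * int(loggingDuration.Seconds()) / podCount // 20000
	ingestionTimeout := 20 * time.Minute // measured from the start of generation

	fmt.Println(podCount, linesPerSecond, linesPerPod, ingestionTimeout)
}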