Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hack/testing-olm/test-030-collection.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ os::test::junit::declare_suite_start "[ClusterLogging] Collection"

start_seconds=$(date +%s)

TEST_DIR=${TEST_DIR:-'./test/e2e/collection/*/'}
TEST_DIR=${TEST_DIR:-'./test/e2e/collection/*'}
ARTIFACT_DIR=${ARTIFACT_DIR:-"$repo_dir/_output"}
if [ ! -d $ARTIFACT_DIR ] ; then
mkdir -p $ARTIFACT_DIR
Expand Down
13 changes: 13 additions & 0 deletions test/e2e/collection/collector_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package collection

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestCollector(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "ClusterLogging E2E Suite - Collectors")
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package fluentd
package collection

import (
"fmt"
Expand Down
13 changes: 0 additions & 13 deletions test/e2e/collection/fluentd/fluentd_suite_test.go

This file was deleted.

124 changes: 0 additions & 124 deletions test/e2e/collection/fluentd/namespace_filtering_test.go

This file was deleted.

143 changes: 143 additions & 0 deletions test/e2e/collection/namespace_filtering_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package collection

import (
"errors"
"fmt"
"path/filepath"
"runtime"

"github.com/openshift/cluster-logging-operator/internal/constants"

framework "github.com/openshift/cluster-logging-operator/test/framework/e2e"

"github.com/ViaQ/logerr/log"
. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
"github.com/openshift/cluster-logging-operator/test/client"
"github.com/openshift/cluster-logging-operator/test/helpers"
elasticsearch "github.com/openshift/elasticsearch-operator/apis/logging/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = Describe("[Collection] Namespace filtering", func() {
_, filename, _, _ := runtime.Caller(0)
log.Info("Running ", "filename", filename)
var (
err error
pipelineSecret *corev1.Secret
elasticsearch *elasticsearch.Elasticsearch
e2e = framework.NewE2ETestFramework()
rootDir string
)
var appclient1 *client.Test
var appclient2 *client.Test

BeforeEach(func() {
appclient1 = client.NewTest()
appclient2 = client.NewTest()
})
BeforeEach(func() {
if err := e2e.DeployLogGeneratorWithNamespace(appclient1.NS.Name); err != nil {
Fail(fmt.Sprintf("Timed out waiting for the log generator 1 to deploy: %v", err))
}
if err := e2e.DeployLogGeneratorWithNamespace(appclient2.NS.Name); err != nil {
Fail(fmt.Sprintf("Timed out waiting for the log generator 2 to deploy: %v", err))
}
rootDir = filepath.Join(filepath.Dir(filename), "..", "..", "..", "/")
if elasticsearch, pipelineSecret, err = e2e.DeployAnElasticsearchCluster(rootDir); err != nil {
Fail(fmt.Sprintf("Unable to deploy an elastic instance: %v", err))
}

forwarder := &logging.ClusterLogForwarder{
TypeMeta: metav1.TypeMeta{
Kind: logging.ClusterLogForwarderKind,
APIVersion: logging.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: "instance",
},
Spec: logging.ClusterLogForwarderSpec{
Inputs: []logging.InputSpec{
{
Name: "application-logs",
Application: &logging.Application{
Namespaces: []string{appclient1.NS.Name},
},
},
},
Outputs: []logging.OutputSpec{
{
Name: elasticsearch.Name,
Secret: &logging.OutputSecretSpec{
Name: pipelineSecret.ObjectMeta.Name,
},
Type: logging.OutputTypeElasticsearch,
URL: fmt.Sprintf("https://%s.%s.svc:9200", elasticsearch.Name, elasticsearch.Namespace),
},
},
Pipelines: []logging.PipelineSpec{
{
Name: "test-app",
OutputRefs: []string{elasticsearch.Name},
InputRefs: []string{"application-logs"},
},
},
},
}
if err := e2e.CreateClusterLogForwarder(forwarder); err != nil {
Fail(fmt.Sprintf("Unable to create an instance of clusterlogforwarder: %v", err))
}
})

GetNamespaces := func() ([]string, error) {
logs, err := e2e.LogStores[elasticsearch.GetName()].ApplicationLogs(framework.DefaultWaitForLogsTimeout)
if err != nil {
return nil, errors.New("Getting error in application logs")
}
val := len(logs)
namespaceMap := make(map[string]bool)
var namespaceList []string
// Parse each document and extract namespace value
if val > 0 {
for _, document := range logs {
namespace_value := document.Kubernetes.NamespaceName
if _, found := namespaceMap[namespace_value]; !found {
namespaceMap[namespace_value] = true
namespaceList = append(namespaceList, namespace_value)
}
}
}
return namespaceList, nil
}

DescribeTable("Running collector tests",
func(collectorType helpers.LogComponentType) {
cr := helpers.NewClusterLogging(collectorType)
if err := e2e.CreateClusterLogging(cr); err != nil {
Fail(fmt.Sprintf("Unable to create an instance of cluster logging: %v", err))
}
if err := e2e.WaitFor(collectorType); err != nil {
Fail(fmt.Sprintf("Failed waiting for component %s to be ready: %v", collectorType, err))
}
Expect(e2e.LogStores[elasticsearch.GetName()].HasApplicationLogs(framework.DefaultWaitForLogsTimeout)).To(BeTrue(), "Expected to find stored application logs")

namespaceList, err := GetNamespaces()
Expect(err).To(BeNil(), fmt.Sprintf("Error fetching logs: %v", err))
Expect(namespaceList).NotTo(BeNil())
Expect(len(namespaceList)).To(Equal(1))
Expect(namespaceList[0]).To(Equal(appclient1.NS.Name))
},
Entry("for fluentd collector", helpers.ComponentTypeCollectorFluentd),
Entry("for vector collector", helpers.ComponentTypeCollectorVector),
)

AfterEach(func() {
appclient1.Close()
appclient2.Close()
e2e.Cleanup()
e2e.WaitForCleanupCompletion(constants.OpenshiftNS, []string{constants.CollectorName, "elasticsearch"})
}, framework.DefaultCleanUpTimeout)
})
54 changes: 46 additions & 8 deletions test/framework/e2e/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@ import (
"encoding/json"
"errors"
"fmt"

"github.com/openshift/cluster-logging-operator/internal/constants"
"github.com/openshift/cluster-logging-operator/test/helpers/types"

"strconv"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/openshift/cluster-logging-operator/test/helpers/types"

clolog "github.com/ViaQ/logerr/log"
k8shandler "github.com/openshift/cluster-logging-operator/internal/k8shandler"
"github.com/openshift/cluster-logging-operator/internal/k8shandler/indexmanagement"
"github.com/openshift/cluster-logging-operator/internal/utils"
elastichelper "github.com/openshift/cluster-logging-operator/test/helpers/elasticsearch"
elasticsearch "github.com/openshift/elasticsearch-operator/apis/logging/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -93,7 +96,42 @@ type ElasticLogStore struct {
}

func (es *ElasticLogStore) ApplicationLogs(timeToWait time.Duration) (types.Logs, error) {
panic("Method not implemented")
app_index := "app-000001"
client, _ := NewKubeClient()
options := metav1.ListOptions{
LabelSelector: "component=elasticsearch",
}
pods, err := client.CoreV1().Pods(constants.OpenshiftNS).List(context.TODO(), options)
if err != nil {
return nil, err
}
if len(pods.Items) == 0 {
return nil, errors.New("No pods found for elasticsearch")
}

hits, err := elastichelper.GetDocsFromElasticSearch(constants.OpenshiftNS, &pods.Items[0], "elasticsearch", app_index, true)

if err != nil {
return nil, err
}
var allLogs types.Logs
var logDoc types.AllLog
for i := 0; i < len(hits); i++ {
hit := hits[i].(map[string]interface{})
jsonHit, err := json.Marshal(hit["_source"])
if err == nil {
err = json.Unmarshal(jsonHit, &logDoc)
if err == nil {
allLogs = append(allLogs, logDoc)
} else {
clolog.V(3).Info("UnMarshall failed", "err", err)
}
} else {
clolog.V(3).Info("Marshall failed", "err", err)
}
}
clolog.V(3).Info("Returning", "logs", allLogs)
return allLogs, err
}

func (es *ElasticLogStore) HasInfraStructureLogs(timeToWait time.Duration) (bool, error) {
Expand Down
Loading