diff --git a/pkg/apis/logging/v1/cluster_log_forwarder_types.go b/pkg/apis/logging/v1/cluster_log_forwarder_types.go
index ce5e2a3793..5928693d79 100644
--- a/pkg/apis/logging/v1/cluster_log_forwarder_types.go
+++ b/pkg/apis/logging/v1/cluster_log_forwarder_types.go
@@ -85,17 +85,17 @@ type InputSpec struct {
// Application, if present, enables `application` logs.
//
// +optional
- Application *Application `json:"application"`
+ Application *Application `json:"application,omitempty"`
// Infrastructure, if present, enables `infrastructure` logs.
//
// +optional
- Infrastructure *Infrastructure `json:"infrastructure"`
+ Infrastructure *Infrastructure `json:"infrastructure,omitempty"`
// Audit, if present, enables `audit` logs.
//
// +optional
- Audit *Audit `json:"audit"`
+ Audit *Audit `json:"audit,omitempty"`
}
type Application struct {
@@ -141,7 +141,7 @@ type OutputSpec struct {
//
// +kubebuilder:validation:Pattern:=`^$|[a-zA-z]+:\/\/.*`
// +optional
- URL string `json:"url"`
+ URL string `json:"url,omitempty"`
OutputTypeSpec `json:",inline"`
diff --git a/pkg/generators/forwarding/fluentd/fluent_conf_test.go b/pkg/generators/forwarding/fluentd/fluent_conf_test.go
index 5b7f0dba96..7024de5337 100644
--- a/pkg/generators/forwarding/fluentd/fluent_conf_test.go
+++ b/pkg/generators/forwarding/fluentd/fluent_conf_test.go
@@ -71,7 +71,7 @@ var _ = Describe("Generating fluentd config", func() {
}
})
- It("should generats container source config for given namespaces only", func() {
+ It("should generate fluent config for sending given namespaces logs only to output", func() {
forwarder = &logging.ClusterLogForwarderSpec{
Outputs: []logging.OutputSpec{
{
@@ -99,38 +99,463 @@ var _ = Describe("Generating fluentd config", func() {
},
},
}
- inputs, namespaces := gatherSources(forwarder)
- results, err := generator.generateSource(inputs, namespaces)
+ results, err := generator.Generate(forwarder, forwarderSpec)
Expect(err).To(BeNil())
- Expect(results).To(HaveLen(1))
- Expect(results[0]).To(EqualTrimLines(`
-# container logs
-
- @type tail
- @id container-input
- path /var/log/containers/*_project1-namespace_*.log, /var/log/containers/*_project2-namespace_*.log
- exclude_path ["/var/log/containers/fluentd-*_openshift-logging_*.log", "/var/log/containers/elasticsearch-*_openshift-logging_*.log", "/var/log/containers/kibana-*_openshift-logging_*.log"]
- pos_file "/var/log/es-containers.log.pos"
- refresh_interval 5
- rotate_wait 5
- tag kubernetes.*
- read_from_head "true"
- @label @CONCAT
-
- @type multi_format
-
- format json
- time_format '%Y-%m-%dT%H:%M:%S.%N%Z'
- keep_time_key true
-
-
- format regexp
- expression /^(?
-
-
+ Expect(results).To(EqualTrimLines(`
+ ## CLO GENERATED CONFIGURATION ###
+ # This file is a copy of the fluentd configuration entrypoint
+ # which should normally be supplied in a configmap.
+
+
+ log_level "#{ENV['LOG_LEVEL'] || 'warn'}"
+
+
+ # In each section below, pre- and post- includes don't include anything initially;
+ # they exist to enable future additions to openshift conf as needed.
+
+ ## sources
+ ## ordered so that syslog always runs last...
+
+ @type prometheus
+ bind ''
+
+ enable true
+ certificate_path "#{ENV['METRICS_CERT'] || '/etc/fluent/metrics/tls.crt'}"
+ private_key_path "#{ENV['METRICS_KEY'] || '/etc/fluent/metrics/tls.key'}"
+
+
+
+
+ @type prometheus_monitor
+
+ hostname ${hostname}
+
+
+
+ # excluding prometheus_tail_monitor
+ # since it leaks namespace/pod info
+ # via file paths
+
+ # This is considered experimental by the repo
+
+ @type prometheus_output_monitor
+
+ hostname ${hostname}
+
+
+ # container logs
+
+ @type tail
+ @id container-input
+ path "/var/log/containers/*.log"
+ exclude_path ["/var/log/containers/fluentd-*_openshift-logging_*.log", "/var/log/containers/elasticsearch-*_openshift-logging_*.log", "/var/log/containers/kibana-*_openshift-logging_*.log"]
+ pos_file "/var/log/es-containers.log.pos"
+ refresh_interval 5
+ rotate_wait 5
+ tag kubernetes.*
+ read_from_head "true"
+ @label @CONCAT
+
+ @type multi_format
+
+ format json
+ time_format '%Y-%m-%dT%H:%M:%S.%N%Z'
+ keep_time_key true
+
+
+ format regexp
+ expression /^(?
+
+
+
+
+
+ #syslog input config here
+
+
+
+ # Relabel specific sources (e.g. logs.apps) to multiple pipelines
+
+
+
+ # Relabel specific pipelines to multiple, outputs (e.g. ES, kafka stores)
+
+
+ # Ship logs to specific outputs
+
`))
})
@@ -1429,7 +1854,7 @@ var _ = Describe("Generating fluentd config", func() {
@type tail
@id container-input
- path /var/log/containers/*_project1_*.log, /var/log/containers/*_project2_*.log
+ path "/var/log/containers/*.log"
exclude_path ["/var/log/containers/fluentd-*_openshift-logging_*.log", "/var/log/containers/elasticsearch-*_openshift-logging_*.log", "/var/log/containers/kibana-*_openshift-logging_*.log"]
pos_file "/var/log/es-containers.log.pos"
refresh_interval 5
@@ -1690,7 +2115,7 @@ var _ = Describe("Generating fluentd config", func() {
@type null
-
+
@type relabel
@label @_APPLICATION
diff --git a/pkg/generators/forwarding/fluentd/generators.go b/pkg/generators/forwarding/fluentd/generators.go
index 7ee355ed16..881e86f706 100644
--- a/pkg/generators/forwarding/fluentd/generators.go
+++ b/pkg/generators/forwarding/fluentd/generators.go
@@ -78,7 +78,7 @@ func (engine *ConfigGenerator) Generate(clfSpec *logging.ClusterLogForwarderSpec
routeMap = inputsToPipelines(clfSpec)
}
- sourceInputLabels, err = engine.generateSource(inputs, namespaces)
+ sourceInputLabels, err = engine.generateSource(inputs)
if err != nil {
logger.Tracef("Error generating source blocks: %v", err)
@@ -120,6 +120,7 @@ func (engine *ConfigGenerator) Generate(clfSpec *logging.ClusterLogForwarderSpec
SourceToPipelineLabels []string
PipelinesToOutputLabels []string
OutputLabels []string
+ AppNamespaces []string
}{
engine.includeLegacyForwardConfig,
engine.includeLegacySyslogConfig,
@@ -130,6 +131,7 @@ func (engine *ConfigGenerator) Generate(clfSpec *logging.ClusterLogForwarderSpec
sourceToPipelineLabels,
pipelineToOutputLabels,
outputLabels,
+ namespaces.List(),
}
result, err := engine.Execute("fluentConf", data)
if err != nil {
diff --git a/pkg/generators/forwarding/fluentd/source.go b/pkg/generators/forwarding/fluentd/source.go
index f5b2ba4570..d51bb361c8 100644
--- a/pkg/generators/forwarding/fluentd/source.go
+++ b/pkg/generators/forwarding/fluentd/source.go
@@ -2,7 +2,6 @@ package fluentd
import (
"fmt"
- "strings"
logging "github.com/openshift/cluster-logging-operator/pkg/apis/logging/v1"
"github.com/openshift/cluster-logging-operator/pkg/constants"
@@ -13,17 +12,14 @@ import (
// We need to also filter them per-user-SourceSpec since different SourceSpecs
// might request different namespaces.
-func (engine *ConfigGenerator) generateSource(sources sets.String, appNs sets.String) (results []string, err error) {
+func (engine *ConfigGenerator) generateSource(sources sets.String) (results []string, err error) {
// Order of templates matters.
- var templates, nsPaths []string
+ var templates []string
if sources.Has(logging.InputNameInfrastructure) {
templates = append(templates, "inputSourceJournalTemplate")
}
if sources.Has(logging.InputNameApplication) {
templates = append(templates, "inputSourceContainerTemplate")
- for _, ns := range appNs.List() {
- nsPaths = append(nsPaths, fmt.Sprintf("/var/log/containers/*_%s_*.log", ns))
- }
}
if sources.Has(logging.InputNameAudit) {
templates = append(templates, "inputSourceHostAuditTemplate")
@@ -38,13 +34,11 @@ func (engine *ConfigGenerator) generateSource(sources sets.String, appNs sets.St
CollectorPodNamePrefix string
LogStorePodNamePrefix string
VisualizationPodNamePrefix string
- AppNsPaths string
}{
constants.OpenshiftNS,
constants.FluentdName,
constants.ElasticsearchName,
constants.KibanaName,
- strings.Join(nsPaths, ", "),
}
for _, template := range templates {
result, err := engine.Execute(template, data)
diff --git a/pkg/generators/forwarding/fluentd/source_test.go b/pkg/generators/forwarding/fluentd/source_test.go
index bc172834a4..e312294fd7 100644
--- a/pkg/generators/forwarding/fluentd/source_test.go
+++ b/pkg/generators/forwarding/fluentd/source_test.go
@@ -23,7 +23,7 @@ var _ = Describe("generating source", func() {
Context("for only logs.app source", func() {
BeforeEach(func() {
- results, err = generator.generateSource(sets.NewString(logging.InputNameApplication), nil)
+ results, err = generator.generateSource(sets.NewString(logging.InputNameApplication))
Expect(err).To(BeNil())
Expect(len(results) == 1).To(BeTrue())
})
@@ -62,7 +62,7 @@ var _ = Describe("generating source", func() {
Context("for only logs.infra source", func() {
BeforeEach(func() {
- results, err = generator.generateSource(sets.NewString(logging.InputNameInfrastructure), nil)
+ results, err = generator.generateSource(sets.NewString(logging.InputNameInfrastructure))
Expect(err).To(BeNil())
Expect(len(results) == 1).To(BeTrue())
})
@@ -92,7 +92,7 @@ var _ = Describe("generating source", func() {
Context("for only logs.audit source", func() {
BeforeEach(func() {
- results, err = generator.generateSource(sets.NewString(logging.InputNameAudit), nil)
+ results, err = generator.generateSource(sets.NewString(logging.InputNameAudit))
Expect(err).To(BeNil())
Expect(len(results)).To(Equal(3))
})
@@ -154,7 +154,7 @@ var _ = Describe("generating source", func() {
Context("for all log sources", func() {
BeforeEach(func() {
- results, err = generator.generateSource(sets.NewString(logging.InputNameApplication, logging.InputNameInfrastructure, logging.InputNameAudit), nil)
+ results, err = generator.generateSource(sets.NewString(logging.InputNameApplication, logging.InputNameInfrastructure, logging.InputNameAudit))
Expect(err).To(BeNil())
Expect(len(results)).To(Equal(5))
})
diff --git a/pkg/generators/forwarding/fluentd/templates.go b/pkg/generators/forwarding/fluentd/templates.go
index 13fa642815..e0d30d5ecf 100644
--- a/pkg/generators/forwarding/fluentd/templates.go
+++ b/pkg/generators/forwarding/fluentd/templates.go
@@ -309,14 +309,23 @@ const fluentConfTemplate = `{{- define "fluentConf" -}}
@type null
{{- end}}
+{{- if .CollectAppLogs}}
+ {{- if .AppNamespaces}}
+
+ @type relabel
+ @label @_APPLICATION
+
+ {{- else}}
-{{- if .CollectAppLogs }}
@type relabel
@label @_APPLICATION
-{{- else }}
+
+ {{- end}}
+{{- else}}
+
@type null
-{{- end}}
+{{- end}}
{{- if .CollectAuditLogs }}
@type relabel
@@ -394,11 +403,7 @@ const inputSourceContainerTemplate = `{{- define "inputSourceContainerTemplate"
@type tail
@id container-input
- {{- if .AppNsPaths}}
- path {{.AppNsPaths}}
- {{else}}
path "/var/log/containers/*.log"
- {{end -}}
exclude_path ["/var/log/containers/{{.CollectorPodNamePrefix}}-*_{{.LoggingNamespace}}_*.log", "/var/log/containers/{{.LogStorePodNamePrefix}}-*_{{.LoggingNamespace}}_*.log", "/var/log/containers/{{.VisualizationPodNamePrefix}}-*_{{.LoggingNamespace}}_*.log"]
pos_file "/var/log/es-containers.log.pos"
refresh_interval 5
diff --git a/test/e2e/collection/fluentd/namespace_filtering_test.go b/test/e2e/collection/fluentd/namespace_filtering_test.go
new file mode 100644
index 0000000000..9dd9a2ea3f
--- /dev/null
+++ b/test/e2e/collection/fluentd/namespace_filtering_test.go
@@ -0,0 +1,121 @@
+package fluentd
+
+import (
+ "fmt"
+ "path/filepath"
+ "runtime"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ logging "github.com/openshift/cluster-logging-operator/pkg/apis/logging/v1"
+ "github.com/openshift/cluster-logging-operator/pkg/logger"
+ "github.com/openshift/cluster-logging-operator/test/helpers"
+ "github.com/openshift/cluster-logging-operator/test/helpers/oc"
+ apps "k8s.io/api/apps/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+var _ = Describe("[Collection] Namespace filtering", func() {
+ _, filename, _, _ := runtime.Caller(0)
+ logger.Infof("Running %s", filename)
+ var (
+ err error
+ fluentDeployment *apps.Deployment
+ e2e = helpers.NewE2ETestFramework()
+ rootDir string
+ )
+ appNamespace1 := "application1"
+ appNamespace2 := "application2"
+
+ BeforeEach(func() {
+ if _, err = oc.Literal().From(fmt.Sprintf("oc create ns %s", appNamespace1)).Run(); err != nil {
+ Fail("failed to create namespace")
+ }
+ if _, err = oc.Literal().From(fmt.Sprintf("oc create ns %s", appNamespace2)).Run(); err != nil {
+ Fail("failed to create namespace")
+ }
+ })
+ BeforeEach(func() {
+ if err := e2e.DeployLogGeneratorWithNamespace(appNamespace1); err != nil {
+ Fail(fmt.Sprintf("Timed out waiting for the log generator 1 to deploy: %v", err))
+ }
+ if err := e2e.DeployLogGeneratorWithNamespace(appNamespace2); err != nil {
+ Fail(fmt.Sprintf("Timed out waiting for the log generator 2 to deploy: %v", err))
+ }
+ rootDir = filepath.Join(filepath.Dir(filename), "..", "..", "..", "..", "/")
+ if fluentDeployment, err = e2e.DeployFluentdReceiver(rootDir, false); err != nil {
+ Fail(fmt.Sprintf("Unable to deploy fluent receiver: %v", err))
+ }
+
+ forwarder := &logging.ClusterLogForwarder{
+ TypeMeta: metav1.TypeMeta{
+ Kind: logging.ClusterLogForwarderKind,
+ APIVersion: logging.SchemeGroupVersion.String(),
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "instance",
+ },
+ Spec: logging.ClusterLogForwarderSpec{
+ Inputs: []logging.InputSpec{
+ {
+ Name: "application-logs",
+ Application: &logging.Application{
+ Namespaces: []string{appNamespace1},
+ },
+ },
+ },
+ Outputs: []logging.OutputSpec{
+ {
+ Name: fluentDeployment.ObjectMeta.Name,
+ Type: logging.OutputTypeFluentdForward,
+ URL: fmt.Sprintf("tcp://%s.%s.svc:24224", fluentDeployment.ObjectMeta.Name, fluentDeployment.Namespace),
+ },
+ },
+ Pipelines: []logging.PipelineSpec{
+ {
+ Name: "test-app",
+ OutputRefs: []string{fluentDeployment.ObjectMeta.Name},
+ InputRefs: []string{"application-logs"},
+ },
+ },
+ },
+ }
+ if err := e2e.CreateClusterLogForwarder(forwarder); err != nil {
+ Fail(fmt.Sprintf("Unable to create an instance of clusterlogforwarder: %v", err))
+ }
+ cr := helpers.NewClusterLogging(helpers.ComponentTypeCollector)
+ if err := e2e.CreateClusterLogging(cr); err != nil {
+ Fail(fmt.Sprintf("Unable to create an instance of cluster logging: %v", err))
+ }
+ if err := e2e.WaitFor(helpers.ComponentTypeCollector); err != nil {
+ Fail(fmt.Sprintf("Failed waiting for component %s to be ready: %v", helpers.ComponentTypeCollector, err))
+ }
+
+ })
+ It("should send logs from one namespace only", func() {
+ Expect(e2e.LogStores[fluentDeployment.GetName()].HasApplicationLogs(helpers.DefaultWaitForLogsTimeout)).To(BeTrue(), "Expected to find stored application logs")
+
+ logs, err := e2e.LogStores[fluentDeployment.GetName()].ApplicationLogs(helpers.DefaultWaitForLogsTimeout)
+ Expect(err).To(BeNil(), fmt.Sprintf("Error fetching logs: %v", err))
+ Expect(len(logs)).To(Not(Equal(0)), "There were no documents returned in the logs")
+
+ // verify only appNamespace1 logs appear in Application logs
+ for _, log := range logs {
+ Expect(log.Kubernetes.NamespaceName).To(Equal(appNamespace1), fmt.Sprintf("log %#v", log))
+ }
+ })
+
+ AfterEach(func() {
+ e2e.Cleanup()
+ if _, err = oc.Literal().From(fmt.Sprintf("oc delete ns %s", appNamespace1)).Run(); err != nil {
+ Fail("failed to create namespace")
+ }
+ if _, err = oc.Literal().From(fmt.Sprintf("oc delete ns %s", appNamespace2)).Run(); err != nil {
+ Fail("failed to create namespace")
+ }
+ e2e.WaitForCleanupCompletion(helpers.OpenshiftLoggingNS, []string{"fluent-receiver", "fluentd"})
+ e2e.WaitForCleanupCompletion(appNamespace1, []string{"test"})
+ e2e.WaitForCleanupCompletion(appNamespace2, []string{"test"})
+ }, helpers.DefaultCleanUpTimeout)
+
+})
diff --git a/test/helpers/framework.go b/test/helpers/framework.go
index 3d5f14a324..8a38786103 100644
--- a/test/helpers/framework.go
+++ b/test/helpers/framework.go
@@ -98,8 +98,12 @@ func (tc *E2ETestFramework) AddCleanup(fn func() error) {
}
func (tc *E2ETestFramework) DeployLogGenerator() error {
- opts := metav1.CreateOptions{}
namespace := tc.CreateTestNamespace()
+ return tc.DeployLogGeneratorWithNamespace(namespace)
+}
+
+func (tc *E2ETestFramework) DeployLogGeneratorWithNamespace(namespace string) error {
+ opts := metav1.CreateOptions{}
container := corev1.Container{
Name: "log-generator",
Image: "busybox",
diff --git a/vendor/github.com/onsi/gomega/go.mod b/vendor/github.com/onsi/gomega/go.mod
index 1eb0dfa682..fe28a23007 100644
--- a/vendor/github.com/onsi/gomega/go.mod
+++ b/vendor/github.com/onsi/gomega/go.mod
@@ -1,5 +1,7 @@
module github.com/onsi/gomega
+go 1.14
+
require (
github.com/fsnotify/fsnotify v1.4.7 // indirect
github.com/golang/protobuf v1.2.0
@@ -14,4 +16,3 @@ require (
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.2.4
)
-