diff --git a/internal/k8shandler/forwarding.go b/internal/k8shandler/forwarding.go index 7d7f02375..1e7eb88b5 100644 --- a/internal/k8shandler/forwarding.go +++ b/internal/k8shandler/forwarding.go @@ -123,31 +123,15 @@ func (clusterRequest *ClusterLoggingRequest) NormalizeForwarder() (*logging.Clus clusterRequest.OutputSecrets = make(map[string]*corev1.Secret, len(clusterRequest.ForwarderSpec.Outputs)) // Check for default configuration + logStore := clusterRequest.Cluster.Spec.LogStore if len(clusterRequest.ForwarderSpec.Pipelines) == 0 { - logStore := clusterRequest.Cluster.Spec.LogStore - if logStore != nil && - (logStore.Type == logging.LogStoreTypeElasticsearch || - logStore.Type == logging.LogStoreTypeLokiStack) { + if logStore != nil && logStore.Type != "" { log.V(2).Info("ClusterLogForwarder forwarding to default store") - switch logStore.Type { - case logging.LogStoreTypeElasticsearch: - clusterRequest.ForwarderSpec.Pipelines = []logging.PipelineSpec{ - { - InputRefs: []string{logging.InputNameApplication, logging.InputNameInfrastructure}, - OutputRefs: []string{logging.OutputNameDefault}, - }, - } - case logging.LogStoreTypeLokiStack: - clusterRequest.ForwarderSpec.Pipelines = []logging.PipelineSpec{ - { - InputRefs: []string{logging.InputNameApplication}, - OutputRefs: []string{logging.OutputNameDefault}, - }, - { - InputRefs: []string{logging.InputNameInfrastructure}, - OutputRefs: []string{logging.OutputNameDefault + "-infra"}, - }, - } + clusterRequest.ForwarderSpec.Pipelines = []logging.PipelineSpec{ + { + InputRefs: []string{logging.InputNameApplication, logging.InputNameInfrastructure}, + OutputRefs: []string{logging.OutputNameDefault}, + }, } // Continue with normalization to fill out spec and status. } else if clusterRequest.ForwarderRequest == nil { @@ -156,6 +140,13 @@ func (clusterRequest *ClusterLoggingRequest) NormalizeForwarder() (*logging.Clus } } + if logStore != nil && logStore.Type == logging.LogStoreTypeLokiStack { + outputs, pipelines := clusterRequest.processPipelinesForLokiStack(clusterRequest.ForwarderSpec.Pipelines) + + clusterRequest.ForwarderSpec.Outputs = outputs + clusterRequest.ForwarderSpec.Pipelines = pipelines + } + spec := &logging.ClusterLogForwarderSpec{} status := &logging.ClusterLogForwarderStatus{} @@ -400,17 +391,7 @@ func (clusterRequest *ClusterLoggingRequest) verifyOutputs(spec *logging.Cluster }) status.Outputs.Set(name, condReady) case logging.LogStoreTypeLokiStack: - spec.Outputs = append(spec.Outputs, - logging.OutputSpec{ - Name: logging.OutputNameDefault, - Type: logging.OutputTypeLoki, - URL: clusterRequest.LokiStackURL("application"), - }, - logging.OutputSpec{ - Name: logging.OutputNameDefault + "-infra", - Type: logging.OutputTypeLoki, - URL: clusterRequest.LokiStackURL("infrastructure"), - }) + // The outputs for LokiStack will already have been added at this point default: status.Outputs.Set(name, condInvalid(fmt.Sprintf("unknown log store type: %s", clusterRequest.Cluster.Spec.LogStore.Type))) } diff --git a/internal/k8shandler/forwarding_test.go b/internal/k8shandler/forwarding_test.go index 6ab074cdc..8f08ab338 100644 --- a/internal/k8shandler/forwarding_test.go +++ b/internal/k8shandler/forwarding_test.go @@ -641,10 +641,10 @@ pipelines: spec, _ := request.NormalizeForwarder() Expect(YAMLString(spec)).To(EqualLines(` outputs: -- name: default +- name: default-loki-apps type: loki url: https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/application -- name: default-infra +- name: default-loki-infra type: loki url: https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/infrastructure pipelines: @@ -652,12 +652,43 @@ pipelines: - application name: pipeline_0_ outputRefs: - - default + - default-loki-apps - inputRefs: - infrastructure name: pipeline_1_ outputRefs: - - default-infra + - default-loki-infra +`)) + }) + + It("processes custom pipelines to default LokiStack log store", func() { + cluster.Spec.LogStore = &logging.LogStoreSpec{ + Type: logging.LogStoreTypeLokiStack, + LokiStack: logging.LokiStackStoreSpec{ + Name: "lokistack-testing", + }, + } + request.ForwarderSpec = logging.ClusterLogForwarderSpec{ + Pipelines: []logging.PipelineSpec{ + { + InputRefs: []string{"audit"}, + OutputRefs: []string{"default"}, + }, + }, + } + + spec, _ := request.NormalizeForwarder() + Expect(YAMLString(spec)).To(EqualLines(` +outputs: +- name: default-loki-audit + type: loki + url: https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/audit +pipelines: +- inputRefs: + - audit + name: pipeline_0_ + outputRefs: + - default-loki-audit `)) }) diff --git a/internal/k8shandler/logstore_lokistack.go b/internal/k8shandler/logstore_lokistack.go index 37eb59ed1..0fd367f49 100644 --- a/internal/k8shandler/logstore_lokistack.go +++ b/internal/k8shandler/logstore_lokistack.go @@ -2,13 +2,17 @@ package k8shandler import ( "fmt" + "sort" + "strings" "github.com/ViaQ/logerr/v2/kverrors" log "github.com/ViaQ/logerr/v2/log/static" + loggingv1 "github.com/openshift/cluster-logging-operator/apis/logging/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/strings/slices" ) const ( @@ -126,6 +130,7 @@ func newLokiStackClusterRole() *rbacv1.ClusterRole { }, Resources: []string{ "application", + "audit", "infrastructure", }, ResourceNames: []string{ @@ -167,3 +172,70 @@ func compareLokiStackClusterRoleBinding(got, want *rbacv1.ClusterRoleBinding) bo return equality.Semantic.DeepEqual(got.RoleRef, want.RoleRef) && equality.Semantic.DeepEqual(got.Subjects, want.Subjects) } + +func (clusterRequest *ClusterLoggingRequest) processPipelinesForLokiStack(inPipelines []loggingv1.PipelineSpec) ([]loggingv1.OutputSpec, []loggingv1.PipelineSpec) { + needOutput := make(map[string]bool) + pipelines := []loggingv1.PipelineSpec{} + + for _, p := range inPipelines { + if !slices.Contains(p.OutputRefs, loggingv1.OutputNameDefault) { + // Skip pipelines that do not reference "default" output + pipelines = append(pipelines, p) + continue + } + + for _, i := range p.InputRefs { + needOutput[i] = true + } + + for i, input := range p.InputRefs { + pOut := p.DeepCopy() + pOut.InputRefs = []string{input} + + for i, output := range pOut.OutputRefs { + if output != loggingv1.OutputNameDefault { + // Leave non-default output names as-is + continue + } + + pOut.OutputRefs[i] = lokiStackOutput(input) + } + + if pOut.Name != "" && i > 0 { + // Generate new name for named pipelines as duplicate names are not allowed + pOut.Name = fmt.Sprintf("%s-%d", pOut.Name, i) + } + + pipelines = append(pipelines, *pOut) + } + } + + outputs := []loggingv1.OutputSpec{} + for input := range needOutput { + outputs = append(outputs, loggingv1.OutputSpec{ + Name: lokiStackOutput(input), + Type: loggingv1.OutputTypeLoki, + URL: clusterRequest.LokiStackURL(input), + }) + } + + // Sort outputs, because we have tests depending on the exact generated configuration + sort.Slice(outputs, func(i, j int) bool { + return strings.Compare(outputs[i].Name, outputs[j].Name) < 0 + }) + + return outputs, pipelines +} + +func lokiStackOutput(inputName string) string { + switch inputName { + case loggingv1.InputNameApplication: + return loggingv1.OutputNameDefault + "-loki-apps" + case loggingv1.InputNameInfrastructure: + return loggingv1.OutputNameDefault + "-loki-infra" + case loggingv1.InputNameAudit: + return loggingv1.OutputNameDefault + "-loki-audit" + } + + return "" +} diff --git a/internal/k8shandler/logstore_lokistack_test.go b/internal/k8shandler/logstore_lokistack_test.go index 25b0b0fc8..c3280f75e 100644 --- a/internal/k8shandler/logstore_lokistack_test.go +++ b/internal/k8shandler/logstore_lokistack_test.go @@ -3,7 +3,10 @@ package k8shandler import ( "testing" + "github.com/google/go-cmp/cmp" + loggingv1 "github.com/openshift/cluster-logging-operator/apis/logging/v1" rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestCompareLokiStackClusterRole(t *testing.T) { @@ -160,3 +163,227 @@ func TestCompareLokiStackClusterRoleBinding(t *testing.T) { }) } } + +func TestProcessPipelinesForLokiStack(t *testing.T) { + tests := []struct { + desc string + in []loggingv1.PipelineSpec + wantOutputs []loggingv1.OutputSpec + wantPipelines []loggingv1.PipelineSpec + }{ + { + desc: "no default output", + in: []loggingv1.PipelineSpec{ + { + OutputRefs: []string{"custom-output"}, + InputRefs: []string{loggingv1.InputNameApplication}, + }, + }, + wantOutputs: []loggingv1.OutputSpec{}, + wantPipelines: []loggingv1.PipelineSpec{ + { + OutputRefs: []string{"custom-output"}, + InputRefs: []string{loggingv1.InputNameApplication}, + }, + }, + }, + { + desc: "single tenant - single output", + in: []loggingv1.PipelineSpec{ + { + OutputRefs: []string{loggingv1.OutputNameDefault}, + InputRefs: []string{loggingv1.InputNameApplication}, + }, + }, + wantOutputs: []loggingv1.OutputSpec{ + { + Name: loggingv1.OutputNameDefault + "-loki-apps", + Type: loggingv1.OutputTypeLoki, + URL: "https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/application", + }, + }, + wantPipelines: []loggingv1.PipelineSpec{ + { + OutputRefs: []string{loggingv1.OutputNameDefault + "-loki-apps"}, + InputRefs: []string{loggingv1.InputNameApplication}, + }, + }, + }, + { + desc: "multiple tenants - single output", + in: []loggingv1.PipelineSpec{ + { + OutputRefs: []string{loggingv1.OutputNameDefault}, + InputRefs: []string{ + loggingv1.InputNameApplication, + loggingv1.InputNameInfrastructure, + }, + }, + }, + wantOutputs: []loggingv1.OutputSpec{ + { + Name: loggingv1.OutputNameDefault + "-loki-apps", + Type: loggingv1.OutputTypeLoki, + URL: "https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/application", + }, + { + Name: loggingv1.OutputNameDefault + "-loki-infra", + Type: loggingv1.OutputTypeLoki, + URL: "https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/infrastructure", + }, + }, + wantPipelines: []loggingv1.PipelineSpec{ + { + OutputRefs: []string{loggingv1.OutputNameDefault + "-loki-apps"}, + InputRefs: []string{loggingv1.InputNameApplication}, + }, + { + OutputRefs: []string{loggingv1.OutputNameDefault + "-loki-infra"}, + InputRefs: []string{loggingv1.InputNameInfrastructure}, + }, + }, + }, + { + desc: "multiple tenants - single output - named pipeline", + in: []loggingv1.PipelineSpec{ + { + Name: "named-pipeline", + OutputRefs: []string{loggingv1.OutputNameDefault}, + InputRefs: []string{ + loggingv1.InputNameApplication, + loggingv1.InputNameInfrastructure, + }, + }, + }, + wantOutputs: []loggingv1.OutputSpec{ + { + Name: loggingv1.OutputNameDefault + "-loki-apps", + Type: loggingv1.OutputTypeLoki, + URL: "https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/application", + }, + { + Name: loggingv1.OutputNameDefault + "-loki-infra", + Type: loggingv1.OutputTypeLoki, + URL: "https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/infrastructure", + }, + }, + wantPipelines: []loggingv1.PipelineSpec{ + { + Name: "named-pipeline", + OutputRefs: []string{loggingv1.OutputNameDefault + "-loki-apps"}, + InputRefs: []string{loggingv1.InputNameApplication}, + }, + { + Name: "named-pipeline-1", + OutputRefs: []string{loggingv1.OutputNameDefault + "-loki-infra"}, + InputRefs: []string{loggingv1.InputNameInfrastructure}, + }, + }, + }, + { + desc: "single tenant - multiple outputs", + in: []loggingv1.PipelineSpec{ + { + OutputRefs: []string{ + "custom-output", + loggingv1.OutputNameDefault, + }, + InputRefs: []string{ + loggingv1.InputNameInfrastructure, + }, + }, + }, + wantOutputs: []loggingv1.OutputSpec{ + { + Name: loggingv1.OutputNameDefault + "-loki-infra", + Type: loggingv1.OutputTypeLoki, + URL: "https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/infrastructure", + }, + }, + wantPipelines: []loggingv1.PipelineSpec{ + { + OutputRefs: []string{ + "custom-output", + loggingv1.OutputNameDefault + "-loki-infra", + }, + InputRefs: []string{loggingv1.InputNameInfrastructure}, + }, + }, + }, + { + desc: "multiple tenants - multiple outputs", + in: []loggingv1.PipelineSpec{ + { + OutputRefs: []string{ + "custom-output", + loggingv1.OutputNameDefault, + }, + InputRefs: []string{ + loggingv1.InputNameInfrastructure, + loggingv1.InputNameAudit, + }, + }, + }, + wantOutputs: []loggingv1.OutputSpec{ + { + Name: loggingv1.OutputNameDefault + "-loki-audit", + Type: loggingv1.OutputTypeLoki, + URL: "https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/audit", + }, + { + Name: loggingv1.OutputNameDefault + "-loki-infra", + Type: loggingv1.OutputTypeLoki, + URL: "https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/infrastructure", + }, + }, + wantPipelines: []loggingv1.PipelineSpec{ + { + OutputRefs: []string{ + "custom-output", + loggingv1.OutputNameDefault + "-loki-infra", + }, + InputRefs: []string{loggingv1.InputNameInfrastructure}, + }, + { + OutputRefs: []string{ + "custom-output", + loggingv1.OutputNameDefault + "-loki-audit", + }, + InputRefs: []string{loggingv1.InputNameAudit}, + }, + }, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + cr := &ClusterLoggingRequest{ + Cluster: &loggingv1.ClusterLogging{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: aNamespace, + }, + Spec: loggingv1.ClusterLoggingSpec{ + LogStore: &loggingv1.LogStoreSpec{ + Type: loggingv1.LogStoreTypeLokiStack, + LokiStack: loggingv1.LokiStackStoreSpec{ + Name: "lokistack-testing", + }, + }, + }, + }, + } + outputs, pipelines := cr.processPipelinesForLokiStack(tc.in) + + if diff := cmp.Diff(outputs, tc.wantOutputs); diff != "" { + t.Errorf("outputs differ: -got+want\n%s", diff) + } + + if diff := cmp.Diff(pipelines, tc.wantPipelines); diff != "" { + t.Errorf("pipelines differ: -got+want\n%s", diff) + } + }) + } +}