Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LOG-2864: Compatibility with ES default output #1587

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 15 additions & 34 deletions internal/k8shandler/forwarding.go
Expand Up @@ -123,31 +123,15 @@ func (clusterRequest *ClusterLoggingRequest) NormalizeForwarder() (*logging.Clus
clusterRequest.OutputSecrets = make(map[string]*corev1.Secret, len(clusterRequest.ForwarderSpec.Outputs))

// Check for default configuration
logStore := clusterRequest.Cluster.Spec.LogStore
if len(clusterRequest.ForwarderSpec.Pipelines) == 0 {
logStore := clusterRequest.Cluster.Spec.LogStore
if logStore != nil &&
(logStore.Type == logging.LogStoreTypeElasticsearch ||
logStore.Type == logging.LogStoreTypeLokiStack) {
if logStore != nil && logStore.Type != "" {
log.V(2).Info("ClusterLogForwarder forwarding to default store")
switch logStore.Type {
case logging.LogStoreTypeElasticsearch:
clusterRequest.ForwarderSpec.Pipelines = []logging.PipelineSpec{
{
InputRefs: []string{logging.InputNameApplication, logging.InputNameInfrastructure},
OutputRefs: []string{logging.OutputNameDefault},
},
}
case logging.LogStoreTypeLokiStack:
clusterRequest.ForwarderSpec.Pipelines = []logging.PipelineSpec{
{
InputRefs: []string{logging.InputNameApplication},
OutputRefs: []string{logging.OutputNameDefault},
},
{
InputRefs: []string{logging.InputNameInfrastructure},
OutputRefs: []string{logging.OutputNameDefault + "-infra"},
},
}
clusterRequest.ForwarderSpec.Pipelines = []logging.PipelineSpec{
{
InputRefs: []string{logging.InputNameApplication, logging.InputNameInfrastructure},
OutputRefs: []string{logging.OutputNameDefault},
},
}
// Continue with normalization to fill out spec and status.
} else if clusterRequest.ForwarderRequest == nil {
Expand All @@ -156,6 +140,13 @@ func (clusterRequest *ClusterLoggingRequest) NormalizeForwarder() (*logging.Clus
}
}

if logStore != nil && logStore.Type == logging.LogStoreTypeLokiStack {
outputs, pipelines := clusterRequest.processPipelinesForLokiStack(clusterRequest.ForwarderSpec.Pipelines)

clusterRequest.ForwarderSpec.Outputs = outputs
clusterRequest.ForwarderSpec.Pipelines = pipelines
}

spec := &logging.ClusterLogForwarderSpec{}
status := &logging.ClusterLogForwarderStatus{}

Expand Down Expand Up @@ -400,17 +391,7 @@ func (clusterRequest *ClusterLoggingRequest) verifyOutputs(spec *logging.Cluster
})
status.Outputs.Set(name, condReady)
case logging.LogStoreTypeLokiStack:
spec.Outputs = append(spec.Outputs,
logging.OutputSpec{
Name: logging.OutputNameDefault,
Type: logging.OutputTypeLoki,
URL: clusterRequest.LokiStackURL("application"),
},
logging.OutputSpec{
Name: logging.OutputNameDefault + "-infra",
Type: logging.OutputTypeLoki,
URL: clusterRequest.LokiStackURL("infrastructure"),
})
// The outputs for LokiStack will already have been added at this point
default:
status.Outputs.Set(name, condInvalid(fmt.Sprintf("unknown log store type: %s", clusterRequest.Cluster.Spec.LogStore.Type)))
}
Expand Down
39 changes: 35 additions & 4 deletions internal/k8shandler/forwarding_test.go
Expand Up @@ -641,23 +641,54 @@ pipelines:
spec, _ := request.NormalizeForwarder()
Expect(YAMLString(spec)).To(EqualLines(`
outputs:
- name: default
- name: default-loki-apps
type: loki
url: https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/application
- name: default-infra
- name: default-loki-infra
type: loki
url: https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/infrastructure
pipelines:
- inputRefs:
- application
name: pipeline_0_
outputRefs:
- default
- default-loki-apps
- inputRefs:
- infrastructure
name: pipeline_1_
outputRefs:
- default-infra
- default-loki-infra
`))
})

It("processes custom pipelines to default LokiStack log store", func() {
cluster.Spec.LogStore = &logging.LogStoreSpec{
Type: logging.LogStoreTypeLokiStack,
LokiStack: logging.LokiStackStoreSpec{
Name: "lokistack-testing",
},
}
request.ForwarderSpec = logging.ClusterLogForwarderSpec{
Pipelines: []logging.PipelineSpec{
{
InputRefs: []string{"audit"},
OutputRefs: []string{"default"},
},
},
}

spec, _ := request.NormalizeForwarder()
Expect(YAMLString(spec)).To(EqualLines(`
outputs:
- name: default-loki-audit
type: loki
url: https://lokistack-testing-gateway-http.aNamespace.svc:8080/api/logs/v1/audit
pipelines:
- inputRefs:
- audit
name: pipeline_0_
outputRefs:
- default-loki-audit
`))
})

Expand Down
72 changes: 72 additions & 0 deletions internal/k8shandler/logstore_lokistack.go
Expand Up @@ -2,13 +2,17 @@ package k8shandler

import (
"fmt"
"sort"
"strings"

"github.com/ViaQ/logerr/v2/kverrors"
log "github.com/ViaQ/logerr/v2/log/static"
loggingv1 "github.com/openshift/cluster-logging-operator/apis/logging/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/strings/slices"
)

const (
Expand Down Expand Up @@ -126,6 +130,7 @@ func newLokiStackClusterRole() *rbacv1.ClusterRole {
},
Resources: []string{
"application",
"audit",
"infrastructure",
},
ResourceNames: []string{
Expand Down Expand Up @@ -167,3 +172,70 @@ func compareLokiStackClusterRoleBinding(got, want *rbacv1.ClusterRoleBinding) bo
return equality.Semantic.DeepEqual(got.RoleRef, want.RoleRef) &&
equality.Semantic.DeepEqual(got.Subjects, want.Subjects)
}

func (clusterRequest *ClusterLoggingRequest) processPipelinesForLokiStack(inPipelines []loggingv1.PipelineSpec) ([]loggingv1.OutputSpec, []loggingv1.PipelineSpec) {
needOutput := make(map[string]bool)
pipelines := []loggingv1.PipelineSpec{}

for _, p := range inPipelines {
if !slices.Contains(p.OutputRefs, loggingv1.OutputNameDefault) {
// Skip pipelines that do not reference "default" output
pipelines = append(pipelines, p)
continue
}

for _, i := range p.InputRefs {
needOutput[i] = true
}

for i, input := range p.InputRefs {
pOut := p.DeepCopy()
pOut.InputRefs = []string{input}

for i, output := range pOut.OutputRefs {
if output != loggingv1.OutputNameDefault {
// Leave non-default output names as-is
continue
}

pOut.OutputRefs[i] = lokiStackOutput(input)
}

if pOut.Name != "" && i > 0 {
// Generate new name for named pipelines as duplicate names are not allowed
pOut.Name = fmt.Sprintf("%s-%d", pOut.Name, i)
}

pipelines = append(pipelines, *pOut)
}
}

outputs := []loggingv1.OutputSpec{}
for input := range needOutput {
outputs = append(outputs, loggingv1.OutputSpec{
Name: lokiStackOutput(input),
Type: loggingv1.OutputTypeLoki,
URL: clusterRequest.LokiStackURL(input),
})
}

// Sort outputs, because we have tests depending on the exact generated configuration
sort.Slice(outputs, func(i, j int) bool {
return strings.Compare(outputs[i].Name, outputs[j].Name) < 0
})

return outputs, pipelines
}

func lokiStackOutput(inputName string) string {
switch inputName {
case loggingv1.InputNameApplication:
return loggingv1.OutputNameDefault + "-loki-apps"
case loggingv1.InputNameInfrastructure:
return loggingv1.OutputNameDefault + "-loki-infra"
case loggingv1.InputNameAudit:
return loggingv1.OutputNameDefault + "-loki-audit"
}

return ""
}