Skip to content

Commit

Permalink
LOG-3945: Collector pods in CrashLoopBackOff when ClusterLogForwarder…
Browse files Browse the repository at this point in the history
… pipeline has space in between the pipeline name.
  • Loading branch information
Clee2691 committed Apr 21, 2023
1 parent ea2d07a commit cf0ef97
Show file tree
Hide file tree
Showing 8 changed files with 535 additions and 23 deletions.
456 changes: 456 additions & 0 deletions internal/generator/vector/conf_test.go

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions internal/generator/vector/output/gcl/gcl.go
Expand Up @@ -2,7 +2,6 @@ package gcl

import (
"fmt"
"strings"

logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"

Expand Down Expand Up @@ -65,7 +64,7 @@ node_name = "{{"{{hostname}}"}}"
func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element {
if genhelper.IsDebugOutput(op) {
return []Element{
Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), vectorhelpers.MakeInputs(inputs...)),
Debug(vectorhelpers.FormatComponentID(o.Name), vectorhelpers.MakeInputs(inputs...)),
}
}
if o.GoogleCloudLogging == nil {
Expand Down
15 changes: 7 additions & 8 deletions internal/generator/vector/output/http/http.go
Expand Up @@ -2,7 +2,6 @@ package http

import (
"fmt"
"strings"

logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
"github.com/openshift/cluster-logging-operator/internal/constants"
Expand Down Expand Up @@ -111,12 +110,12 @@ func Normalize(componentID string, inputs []string) Element {

func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element {
outputName := helpers.FormatComponentID(o.Name)
component := strings.ToLower(vectorhelpers.Replacer.Replace(fmt.Sprintf("%s_%s", o.Name, NormalizeHttp)))
component := fmt.Sprintf("%s_%s", outputName, NormalizeHttp)
dedottedID := normalize.ID(outputName, "dedot")
if genhelper.IsDebugOutput(op) {
return []Element{
Normalize(component, inputs),
Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), component),
Debug(vectorhelpers.FormatComponentID(o.Name), component),
}
}
return MergeElements(
Expand All @@ -135,7 +134,7 @@ func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Optio

func Output(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) Element {
return Http{
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
ComponentID: vectorhelpers.FormatComponentID(o.Name),
Inputs: vectorhelpers.MakeInputs(inputs...),
URI: o.URL,
Method: Method(o.Http),
Expand Down Expand Up @@ -175,7 +174,7 @@ func Request(o logging.OutputSpec) Element {
timeout = o.Http.Timeout
}
return HttpRequest{
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
ComponentID: vectorhelpers.FormatComponentID(o.Name),
Timeout: timeout,
Headers: Headers(o),
}
Expand All @@ -190,7 +189,7 @@ func Headers(o logging.OutputSpec) Element {

func Encoding(o logging.OutputSpec) Element {
return HttpEncoding{
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
ComponentID: vectorhelpers.FormatComponentID(o.Name),
Codec: httpEncodingJson,
}
}
Expand All @@ -211,7 +210,7 @@ func BasicAuth(o logging.OutputSpec, secret *corev1.Secret) []Element {
hasBasicAuth := false
conf = append(conf, BasicAuthConf{
Desc: "Basic Auth Config",
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
ComponentID: vectorhelpers.FormatComponentID(o.Name),
})
if security.HasUsernamePassword(secret) {
hasBasicAuth = true
Expand All @@ -237,7 +236,7 @@ func BearerTokenAuth(o logging.OutputSpec, secret *corev1.Secret) []Element {
if security.HasBearerTokenFileKey(secret) {
conf = append(conf, BasicAuthConf{
Desc: "Bearer Auth Config",
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
ComponentID: vectorhelpers.FormatComponentID(o.Name),
}, BearerToken{
Token: security.GetFromSecret(secret, constants.BearerTokenFileKey),
})
Expand Down
2 changes: 1 addition & 1 deletion internal/generator/vector/output/kafka/kafka.go
Expand Up @@ -49,7 +49,7 @@ topic = {{.Topic}}
func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element {
if genhelper.IsDebugOutput(op) {
return []Element{
Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), vectorhelpers.MakeInputs(inputs...)),
Debug(vectorhelpers.FormatComponentID(o.Name), vectorhelpers.MakeInputs(inputs...)),
}
}

Expand Down
14 changes: 7 additions & 7 deletions internal/generator/vector/output/loki/loki.go
Expand Up @@ -109,7 +109,7 @@ func (l LokiLabels) Template() string {
func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element {
if genhelper.IsDebugOutput(op) {
return []Element{
Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), vectorhelpers.MakeInputs(inputs...)),
Debug(vectorhelpers.FormatComponentID(o.Name), vectorhelpers.MakeInputs(inputs...)),
}
}
outputName := helpers.FormatComponentID(o.Name)
Expand All @@ -131,7 +131,7 @@ func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Optio

func Output(o logging.OutputSpec, inputs []string) Element {
return Loki{
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
ComponentID: vectorhelpers.FormatComponentID(o.Name),
Inputs: vectorhelpers.MakeInputs(inputs...),
Endpoint: o.URL,
TenantID: Tenant(o.Loki),
Expand All @@ -140,7 +140,7 @@ func Output(o logging.OutputSpec, inputs []string) Element {

func Encoding(o logging.OutputSpec) Element {
return LokiEncoding{
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
ComponentID: vectorhelpers.FormatComponentID(o.Name),
Codec: lokiEncodingJson,
}
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func lokiLabels(lo *logging.Loki) []Label {

func Labels(o logging.OutputSpec) Element {
return LokiLabels{
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
ComponentID: vectorhelpers.FormatComponentID(o.Name),
Labels: lokiLabels(o.Loki),
}
}
Expand All @@ -199,7 +199,7 @@ func TLSConf(o logging.OutputSpec, secret *corev1.Secret, op Options) []Element
// Set CA from logcollector ServiceAccount for internal Loki
return []Element{
security.TLSConf{
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
ComponentID: vectorhelpers.FormatComponentID(o.Name),
CAFilePath: `"/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt"`,
},
}
Expand All @@ -214,7 +214,7 @@ func BasicAuth(o logging.OutputSpec, secret *corev1.Secret) []Element {
hasBasicAuth := false
conf = append(conf, BasicAuthConf{
Desc: "Basic Auth Config",
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
ComponentID: vectorhelpers.FormatComponentID(o.Name),
})
if security.HasUsernamePassword(secret) {
hasBasicAuth = true
Expand All @@ -240,7 +240,7 @@ func BearerTokenAuth(o logging.OutputSpec, secret *corev1.Secret) []Element {
if security.HasBearerTokenFileKey(secret) {
conf = append(conf, BasicAuthConf{
Desc: "Bearer Auth Config",
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
ComponentID: vectorhelpers.FormatComponentID(o.Name),
}, BearerToken{
Token: security.GetFromSecret(secret, constants.BearerTokenFileKey),
})
Expand Down
7 changes: 3 additions & 4 deletions internal/generator/vector/output/splunk/splunk.go
Expand Up @@ -2,7 +2,6 @@ package splunk

import (
"fmt"
"strings"

"github.com/openshift/cluster-logging-operator/internal/generator/vector/normalize"

Expand Down Expand Up @@ -61,7 +60,7 @@ codec = {{.Codec}}
func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element {
if genhelper.IsDebugOutput(op) {
return []Element{
Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), vectorhelpers.MakeInputs(inputs...)),
Debug(vectorhelpers.FormatComponentID(o.Name), vectorhelpers.MakeInputs(inputs...)),
}
}
outputName := vectorhelpers.FormatComponentID(o.Name)
Expand All @@ -78,7 +77,7 @@ func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Optio

func Output(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) Element {
return Splunk{
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
ComponentID: vectorhelpers.FormatComponentID(o.Name),
Inputs: vectorhelpers.MakeInputs(inputs...),
Endpoint: o.URL,
DefaultToken: security.GetFromSecret(secret, "hecToken"),
Expand All @@ -87,7 +86,7 @@ func Output(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Opt

func Encoding(o logging.OutputSpec) Element {
return SplunkEncoding{
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
ComponentID: vectorhelpers.FormatComponentID(o.Name),
Codec: splunkEncodingJson,
}
}
Expand Down
6 changes: 5 additions & 1 deletion internal/generator/vector/pipelines.go
Expand Up @@ -22,8 +22,12 @@ var (
func Pipelines(spec *logging.ClusterLogForwarderSpec, op generator.Options) []generator.Element {
el := []generator.Element{}
userDefined := spec.InputMap()
for _, p := range spec.Pipelines {
for i, p := range spec.Pipelines {
inputs := []string{}
// Sanitize pipeline name and persist it for config generation
p.Name = helpers.FormatComponentID(p.Name)
spec.Pipelines[i].Name = p.Name

for _, inputName := range p.InputRefs {
if _, ok := userDefined[inputName]; ok {
inputs = append(inputs, fmt.Sprintf(UserDefinedInput, inputName))
Expand Down
55 changes: 55 additions & 0 deletions internal/generator/vector/sources_to_pipelines_test.go
Expand Up @@ -378,6 +378,61 @@ inputs = ["detect_exceptions_pipeline"]
source = '''
.
'''
`,
}),
Entry("pipeline with spaces are sanitized", helpers.ConfGenerateTest{
CLFSpec: logging.ClusterLogForwarderSpec{
Pipelines: []logging.PipelineSpec{
{
InputRefs: []string{
logging.InputNameApplication,
logging.InputNameInfrastructure,
logging.InputNameAudit,
},
OutputRefs: []string{logging.OutputNameDefault},
Name: "pipeline with space",
},
},
},
ExpectedConf: `
[transforms.route_container_logs]
type = "route"
inputs = ["container_logs"]
route.app = '!((starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube"))'
route.infra = '(starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube")'
# Set log_type to "application"
[transforms.application]
type = "remap"
inputs = ["route_container_logs.app"]
source = '''
.log_type = "application"
'''
# Set log_type to "infrastructure"
[transforms.infrastructure]
type = "remap"
inputs = ["route_container_logs.infra","journal_logs"]
source = '''
.log_type = "infrastructure"
'''
# Set log_type to "audit"
[transforms.audit]
type = "remap"
inputs = ["host_audit_logs","k8s_audit_logs","openshift_audit_logs","ovn_audit_logs"]
source = '''
.log_type = "audit"
.hostname = get_env_var("VECTOR_SELF_NODE_NAME") ?? ""
ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
'''
[transforms.pipeline_with_space]
type = "remap"
inputs = ["application","infrastructure","audit"]
source = '''
.
'''
`,
}),
)
Expand Down

0 comments on commit cf0ef97

Please sign in to comment.