From a85b2c2642ab83ab620db6816e3f6c1371aefe7c Mon Sep 17 00:00:00 2001
From: Calvin Lee
Date: Fri, 21 Apr 2023 13:41:09 -0400
Subject: [PATCH] LOG-3945: Collector pods in CrashLoopBackOff when a ClusterLogForwarder pipeline name contains spaces.
---
 internal/generator/vector/conf_test.go        | 456 ++++++++++++++++++
 internal/generator/vector/output/gcl/gcl.go   |   3 +-
 internal/generator/vector/output/http/http.go |  15 +-
 .../generator/vector/output/kafka/kafka.go    |   2 +-
 internal/generator/vector/output/loki/loki.go |  14 +-
 .../generator/vector/output/splunk/splunk.go  |   7 +-
 internal/generator/vector/pipelines.go        |   6 +-
 .../vector/sources_to_pipelines_test.go       |  55 +++
 8 files changed, 535 insertions(+), 23 deletions(-)

diff --git a/internal/generator/vector/conf_test.go b/internal/generator/vector/conf_test.go
index d60eb8c60..b35b8d4d7 100644
--- a/internal/generator/vector/conf_test.go
+++ b/internal/generator/vector/conf_test.go
@@ -1781,6 +1781,462 @@ inputs = ["add_nodename_to_metric"]
 address = "[::]:24231"
 default_namespace = "collector"
 
+[sinks.prometheus_output.tls]
+enabled = true
+key_file = "/etc/collector/metrics/tls.key"
+crt_file = "/etc/collector/metrics/tls.crt"
+min_tls_version = "VersionTLS12"
+ciphersuites = "TLS_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,DHE-RSA-AES128-GCM-SHA256,DHE-RSA-AES256-GCM-SHA384"
+`,
+		}),
+
+		Entry("with complex spec for elasticsearch, pipeline name with spaces", testhelpers.ConfGenerateTest{
+			CLSpec:  logging.CollectionSpec{},
+			CLFSpec: logging.ClusterLogForwarderSpec{
+				Pipelines: []logging.PipelineSpec{
+					{
+						InputRefs: []string{
+							logging.InputNameApplication,
+							logging.InputNameInfrastructure,
+							logging.InputNameAudit},
+						OutputRefs: []string{"es-1"},
+						Name:       "pipeline with space",
+					},
+				},
+				Outputs: []logging.OutputSpec{
+					{
+						Type: logging.OutputTypeElasticsearch,
+						Name: "es-1",
+						URL:  "https://es-1.svc.messaging.cluster.local:9200",
+						Secret: &logging.OutputSecretSpec{
+							Name: "es-1",
+						},
+					},
+				},
+			},
+			Secrets: map[string]*corev1.Secret{
+				"es-1": {
+					Data: map[string][]byte{
+						"tls.key":       []byte("junk"),
+						"tls.crt":       []byte("junk"),
+						"ca-bundle.crt": []byte("junk"),
+					},
+				},
+			},
+			ExpectedConf: `
+# Logs from containers (including openshift containers)
+[sources.raw_container_logs]
+type = "kubernetes_logs"
+glob_minimum_cooldown_ms = 15000
+auto_partial_merge = true
+exclude_paths_glob_patterns = ["/var/log/pods/openshift-logging_collector-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_*/loki*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log", "/var/log/pods/openshift-logging_*/gateway/*.log", "/var/log/pods/openshift-logging_*/opa/*.log", "/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.tmp"]
+pod_annotation_fields.pod_labels = "kubernetes.labels"
+pod_annotation_fields.pod_namespace = "kubernetes.namespace_name"
+pod_annotation_fields.pod_annotations = "kubernetes.annotations"
+pod_annotation_fields.pod_uid = "kubernetes.pod_id"
+pod_annotation_fields.pod_node_name = "hostname"
+
+[sources.raw_journal_logs]
+type = "journald"
+journal_directory = "/var/log/journal"
+
+# Logs from host audit
+[sources.raw_host_audit_logs]
+type = "file"
+include = ["/var/log/audit/audit.log"]
+host_key = "hostname"
+glob_minimum_cooldown_ms = 15000
+
+# Logs from
kubernetes audit +[sources.raw_k8s_audit_logs] +type = "file" +include = ["/var/log/kube-apiserver/audit.log"] +host_key = "hostname" +glob_minimum_cooldown_ms = 15000 + +# Logs from openshift audit +[sources.raw_openshift_audit_logs] +type = "file" +include = ["/var/log/oauth-apiserver/audit.log","/var/log/openshift-apiserver/audit.log","/var/log/oauth-server/audit.log"] +host_key = "hostname" +glob_minimum_cooldown_ms = 15000 + +# Logs from ovn audit +[sources.raw_ovn_audit_logs] +type = "file" +include = ["/var/log/ovn/acl-audit-log.log"] +host_key = "hostname" +glob_minimum_cooldown_ms = 15000 + +[sources.internal_metrics] +type = "internal_metrics" + +[transforms.container_logs] +type = "remap" +inputs = ["raw_container_logs"] +source = ''' + .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}" + if !exists(.level) { + .level = "default" + if match!(.message, r'Info|INFO|^I[0-9]+|level=info|Value:info|"level":"info"|') { + .level = "info" + } else if match!(.message, r'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"|') { + .level = "warn" + } else if match!(.message, r'Error|ERROR|^E[0-9]+|level=error|Value:error|"level":"error"|') { + .level = "error" + } else if match!(.message, r'Critical|CRITICAL|^C[0-9]+|level=critical|Value:critical|"level":"critical"|') { + .level = "critical" + } else if match!(.message, r'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"|') { + .level = "debug" + } else if match!(.message, r'Notice|NOTICE|^N[0-9]+|level=notice|Value:notice|"level":"notice"|') { + .level = "notice" + } else if match!(.message, r'Alert|ALERT|^A[0-9]+|level=alert|Value:alert|"level":"alert"|') { + .level = "alert" + } else if match!(.message, r'Emergency|EMERGENCY|^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"|') { + .level = "emergency" + } + } + del(.source_type) + del(.stream) + del(.kubernetes.pod_ips) + ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts} +''' + +[transforms.drop_journal_logs] +type = "filter" +inputs = ["raw_journal_logs"] +condition = ".PRIORITY != \"7\" && .PRIORITY != 7" + +[transforms.journal_logs] +type = "remap" +inputs = ["drop_journal_logs"] +source = ''' + .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}" + + .tag = ".journal.system" + + del(.source_type) + del(._CPU_USAGE_NSEC) + del(.__REALTIME_TIMESTAMP) + del(.__MONOTONIC_TIMESTAMP) + del(._SOURCE_REALTIME_TIMESTAMP) + del(.JOB_RESULT) + del(.JOB_TYPE) + del(.TIMESTAMP_BOOTTIME) + del(.TIMESTAMP_MONOTONIC) + + if .PRIORITY == "8" || .PRIORITY == 8 { + .level = "trace" + } else { + priority = to_int!(.PRIORITY) + .level, err = to_syslog_level(priority) + if err != null { + log("Unable to determine level from PRIORITY: " + err, level: "error") + log(., level: "error") + .level = "unknown" + } else { + del(.PRIORITY) + } + } + + .hostname = del(.host) + + # systemd’s kernel-specific metadata. + # .systemd.k = {} + if exists(.KERNEL_DEVICE) { .systemd.k.KERNEL_DEVICE = del(.KERNEL_DEVICE) } + if exists(.KERNEL_SUBSYSTEM) { .systemd.k.KERNEL_SUBSYSTEM = del(.KERNEL_SUBSYSTEM) } + if exists(.UDEV_DEVLINK) { .systemd.k.UDEV_DEVLINK = del(.UDEV_DEVLINK) } + if exists(.UDEV_DEVNODE) { .systemd.k.UDEV_DEVNODE = del(.UDEV_DEVNODE) } + if exists(.UDEV_SYSNAME) { .systemd.k.UDEV_SYSNAME = del(.UDEV_SYSNAME) } + + # trusted journal fields, fields that are implicitly added by the journal and cannot be altered by client code. 
+ .systemd.t = {} + if exists(._AUDIT_LOGINUID) { .systemd.t.AUDIT_LOGINUID = del(._AUDIT_LOGINUID) } + if exists(._BOOT_ID) { .systemd.t.BOOT_ID = del(._BOOT_ID) } + if exists(._AUDIT_SESSION) { .systemd.t.AUDIT_SESSION = del(._AUDIT_SESSION) } + if exists(._CAP_EFFECTIVE) { .systemd.t.CAP_EFFECTIVE = del(._CAP_EFFECTIVE) } + if exists(._CMDLINE) { .systemd.t.CMDLINE = del(._CMDLINE) } + if exists(._COMM) { .systemd.t.COMM = del(._COMM) } + if exists(._EXE) { .systemd.t.EXE = del(._EXE) } + if exists(._GID) { .systemd.t.GID = del(._GID) } + if exists(._HOSTNAME) { .systemd.t.HOSTNAME = .hostname } + if exists(._LINE_BREAK) { .systemd.t.LINE_BREAK = del(._LINE_BREAK) } + if exists(._MACHINE_ID) { .systemd.t.MACHINE_ID = del(._MACHINE_ID) } + if exists(._PID) { .systemd.t.PID = del(._PID) } + if exists(._SELINUX_CONTEXT) { .systemd.t.SELINUX_CONTEXT = del(._SELINUX_CONTEXT) } + if exists(._SOURCE_REALTIME_TIMESTAMP) { .systemd.t.SOURCE_REALTIME_TIMESTAMP = del(._SOURCE_REALTIME_TIMESTAMP) } + if exists(._STREAM_ID) { .systemd.t.STREAM_ID = ._STREAM_ID } + if exists(._SYSTEMD_CGROUP) { .systemd.t.SYSTEMD_CGROUP = del(._SYSTEMD_CGROUP) } + if exists(._SYSTEMD_INVOCATION_ID) {.systemd.t.SYSTEMD_INVOCATION_ID = ._SYSTEMD_INVOCATION_ID} + if exists(._SYSTEMD_OWNER_UID) { .systemd.t.SYSTEMD_OWNER_UID = del(._SYSTEMD_OWNER_UID) } + if exists(._SYSTEMD_SESSION) { .systemd.t.SYSTEMD_SESSION = del(._SYSTEMD_SESSION) } + if exists(._SYSTEMD_SLICE) { .systemd.t.SYSTEMD_SLICE = del(._SYSTEMD_SLICE) } + if exists(._SYSTEMD_UNIT) { .systemd.t.SYSTEMD_UNIT = del(._SYSTEMD_UNIT) } + if exists(._SYSTEMD_USER_UNIT) { .systemd.t.SYSTEMD_USER_UNIT = del(._SYSTEMD_USER_UNIT) } + if exists(._TRANSPORT) { .systemd.t.TRANSPORT = del(._TRANSPORT) } + if exists(._UID) { .systemd.t.UID = del(._UID) } + + # fields that are directly passed from clients and stored in the journal. + .systemd.u = {} + if exists(.CODE_FILE) { .systemd.u.CODE_FILE = del(.CODE_FILE) } + if exists(.CODE_FUNC) { .systemd.u.CODE_FUNCTION = del(.CODE_FUNC) } + if exists(.CODE_LINE) { .systemd.u.CODE_LINE = del(.CODE_LINE) } + if exists(.ERRNO) { .systemd.u.ERRNO = del(.ERRNO) } + if exists(.MESSAGE_ID) { .systemd.u.MESSAGE_ID = del(.MESSAGE_ID) } + if exists(.SYSLOG_FACILITY) { .systemd.u.SYSLOG_FACILITY = del(.SYSLOG_FACILITY) } + if exists(.SYSLOG_IDENTIFIER) { .systemd.u.SYSLOG_IDENTIFIER = del(.SYSLOG_IDENTIFIER) } + if exists(.SYSLOG_PID) { .systemd.u.SYSLOG_PID = del(.SYSLOG_PID) } + if exists(.RESULT) { .systemd.u.RESULT = del(.RESULT) } + if exists(.UNIT) { .systemd.u.UNIT = del(.UNIT) } + + .time = format_timestamp!(.timestamp, format: "%FT%T%:z") + + ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts} +''' + +[transforms.host_audit_logs] +type = "remap" +inputs = ["raw_host_audit_logs"] +source = ''' + .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}" + .tag = ".linux-audit.log" + + match1 = parse_regex(.message, r'type=(?P[^ ]+)') ?? {} + envelop = {} + envelop |= {"type": match1.type} + + match2, err = parse_regex(.message, r'msg=audit\((?P[^ ]+)\):') + if err == null { + sp = split(match2.ts_record,":") + if length(sp) == 2 { + ts = parse_timestamp(sp[0],"%s.%3f") ?? "" + envelop |= {"record_id": sp[1]} + . |= {"audit.linux" : envelop} + . |= {"@timestamp" : format_timestamp(ts,"%+") ?? ""} + } + } else { + log("could not parse host audit msg. 
err=" + err, rate_limit_secs: 0) + } + + .level = "default" +''' + +[transforms.k8s_audit_logs] +type = "remap" +inputs = ["raw_k8s_audit_logs"] +source = ''' + .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}" + .tag = ".k8s-audit.log" + . = merge(., parse_json!(string!(.message))) ?? . + del(.message) + .k8s_audit_level = .level + .level = "default" +''' + +[transforms.openshift_audit_logs] +type = "remap" +inputs = ["raw_openshift_audit_logs"] +source = ''' + .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}" + .tag = ".openshift-audit.log" + . = merge(., parse_json!(string!(.message))) ?? . + del(.message) + .openshift_audit_level = .level + .level = "default" +''' + +[transforms.ovn_audit_logs] +type = "remap" +inputs = ["raw_ovn_audit_logs"] +source = ''' + .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}" + .tag = ".ovn-audit.log" + if !exists(.level) { + .level = "default" + if match!(.message, r'Info|INFO|^I[0-9]+|level=info|Value:info|"level":"info"|') { + .level = "info" + } else if match!(.message, r'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"|') { + .level = "warn" + } else if match!(.message, r'Error|ERROR|^E[0-9]+|level=error|Value:error|"level":"error"|') { + .level = "error" + } else if match!(.message, r'Critical|CRITICAL|^C[0-9]+|level=critical|Value:critical|"level":"critical"|') { + .level = "critical" + } else if match!(.message, r'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"|') { + .level = "debug" + } else if match!(.message, r'Notice|NOTICE|^N[0-9]+|level=notice|Value:notice|"level":"notice"|') { + .level = "notice" + } else if match!(.message, r'Alert|ALERT|^A[0-9]+|level=alert|Value:alert|"level":"alert"|') { + .level = "alert" + } else if match!(.message, r'Emergency|EMERGENCY|^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"|') { + .level = "emergency" + } + } +''' + +[transforms.route_container_logs] +type = "route" +inputs = ["container_logs"] +route.app = '!((starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube"))' +route.infra = '(starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube")' + +# Set log_type to "application" +[transforms.application] +type = "remap" +inputs = ["route_container_logs.app"] +source = ''' + .log_type = "application" +''' + +# Set log_type to "infrastructure" +[transforms.infrastructure] +type = "remap" +inputs = ["route_container_logs.infra","journal_logs"] +source = ''' + .log_type = "infrastructure" +''' + +# Set log_type to "audit" +[transforms.audit] +type = "remap" +inputs = ["host_audit_logs","k8s_audit_logs","openshift_audit_logs","ovn_audit_logs"] +source = ''' + .log_type = "audit" + .hostname = get_env_var("VECTOR_SELF_NODE_NAME") ?? "" + ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts} +''' + +[transforms.pipeline_with_space] +type = "remap" +inputs = ["application","infrastructure","audit"] +source = ''' + . 
+''' + +# Set Elasticsearch index +[transforms.es_1_add_es_index] +type = "remap" +inputs = ["pipeline_with_space"] +source = ''' + index = "default" + if (.log_type == "application"){ + index = "app" + } + if (.log_type == "infrastructure"){ + index = "infra" + } + if (.log_type == "audit"){ + index = "audit" + } + .write_index = index + "-write" + ._id = encode_base64(uuid_v4()) + del(.file) + del(.tag) + del(.source_type) +''' + +[transforms.es_1_dedot_and_flatten] +type = "lua" +inputs = ["es_1_add_es_index"] +version = "2" +hooks.init = "init" +hooks.process = "process" +source = ''' + function init() + count = 0 + end + function process(event, emit) + count = count + 1 + event.log.openshift.sequence = count + if event.log.kubernetes == nil then + emit(event) + return + end + if event.log.kubernetes.labels == nil then + emit(event) + return + end + dedot(event.log.kubernetes.namespace_labels) + dedot(event.log.kubernetes.labels) + flatten_labels(event) + prune_labels(event) + emit(event) + end + + function dedot(map) + if map == nil then + return + end + local new_map = {} + local changed_keys = {} + for k, v in pairs(map) do + local dedotted = string.gsub(k, "[./]", "_") + if dedotted ~= k then + new_map[dedotted] = v + changed_keys[k] = true + end + end + for k in pairs(changed_keys) do + map[k] = nil + end + for k, v in pairs(new_map) do + map[k] = v + end + end + + function flatten_labels(event) + -- create "flat_labels" key + event.log.kubernetes.flat_labels = {} + i = 1 + -- flatten the labels + for k,v in pairs(event.log.kubernetes.labels) do + event.log.kubernetes.flat_labels[i] = k.."="..v + i=i+1 + end + end + + function prune_labels(event) + local exclusions = {"app_kubernetes_io_name", "app_kubernetes_io_instance", "app_kubernetes_io_version", "app_kubernetes_io_component", "app_kubernetes_io_part-of", "app_kubernetes_io_managed-by", "app_kubernetes_io_created-by"} + local keys = {} + for k,v in pairs(event.log.kubernetes.labels) do + for index, e in pairs(exclusions) do + if k == e then + keys[k] = v + end + end + end + event.log.kubernetes.labels = keys + end +''' + +[sinks.es_1] +type = "elasticsearch" +inputs = ["es_1_dedot_and_flatten"] +endpoint = "https://es-1.svc.messaging.cluster.local:9200" +bulk.index = "{{ write_index }}" +bulk.action = "create" +encoding.except_fields = ["write_index"] +request.timeout_secs = 2147483648 +id_key = "_id" + +[sinks.es_1.tls] +enabled = true +key_file = "/var/run/ocp-collector/secrets/es-1/tls.key" +crt_file = "/var/run/ocp-collector/secrets/es-1/tls.crt" +ca_file = "/var/run/ocp-collector/secrets/es-1/ca-bundle.crt" + +[transforms.add_nodename_to_metric] +type = "remap" +inputs = ["internal_metrics"] +source = ''' +.tags.hostname = get_env_var!("VECTOR_SELF_NODE_NAME") +''' + +[sinks.prometheus_output] +type = "prometheus_exporter" +inputs = ["add_nodename_to_metric"] +address = "[::]:24231" +default_namespace = "collector" + [sinks.prometheus_output.tls] enabled = true key_file = "/etc/collector/metrics/tls.key" diff --git a/internal/generator/vector/output/gcl/gcl.go b/internal/generator/vector/output/gcl/gcl.go index 42abafdf1..d16bc2172 100644 --- a/internal/generator/vector/output/gcl/gcl.go +++ b/internal/generator/vector/output/gcl/gcl.go @@ -2,7 +2,6 @@ package gcl import ( "fmt" - "strings" logging "github.com/openshift/cluster-logging-operator/apis/logging/v1" @@ -65,7 +64,7 @@ node_name = "{{"{{hostname}}"}}" func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element { if 
genhelper.IsDebugOutput(op) { return []Element{ - Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), vectorhelpers.MakeInputs(inputs...)), + Debug(vectorhelpers.FormatComponentID(o.Name), vectorhelpers.MakeInputs(inputs...)), } } if o.GoogleCloudLogging == nil { diff --git a/internal/generator/vector/output/http/http.go b/internal/generator/vector/output/http/http.go index a2149cde7..f51a7a0a5 100644 --- a/internal/generator/vector/output/http/http.go +++ b/internal/generator/vector/output/http/http.go @@ -2,7 +2,6 @@ package http import ( "fmt" - "strings" logging "github.com/openshift/cluster-logging-operator/apis/logging/v1" "github.com/openshift/cluster-logging-operator/internal/constants" @@ -111,12 +110,12 @@ func Normalize(componentID string, inputs []string) Element { func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element { outputName := helpers.FormatComponentID(o.Name) - component := strings.ToLower(vectorhelpers.Replacer.Replace(fmt.Sprintf("%s_%s", o.Name, NormalizeHttp))) + component := fmt.Sprintf("%s_%s", outputName, NormalizeHttp) dedottedID := normalize.ID(outputName, "dedot") if genhelper.IsDebugOutput(op) { return []Element{ Normalize(component, inputs), - Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), component), + Debug(vectorhelpers.FormatComponentID(o.Name), component), } } return MergeElements( @@ -135,7 +134,7 @@ func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Optio func Output(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) Element { return Http{ - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), + ComponentID: vectorhelpers.FormatComponentID(o.Name), Inputs: vectorhelpers.MakeInputs(inputs...), URI: o.URL, Method: Method(o.Http), @@ -175,7 +174,7 @@ func Request(o logging.OutputSpec) Element { timeout = o.Http.Timeout } return HttpRequest{ - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), + ComponentID: vectorhelpers.FormatComponentID(o.Name), Timeout: timeout, Headers: Headers(o), } @@ -190,7 +189,7 @@ func Headers(o logging.OutputSpec) Element { func Encoding(o logging.OutputSpec) Element { return HttpEncoding{ - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), + ComponentID: vectorhelpers.FormatComponentID(o.Name), Codec: httpEncodingJson, } } @@ -211,7 +210,7 @@ func BasicAuth(o logging.OutputSpec, secret *corev1.Secret) []Element { hasBasicAuth := false conf = append(conf, BasicAuthConf{ Desc: "Basic Auth Config", - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), + ComponentID: vectorhelpers.FormatComponentID(o.Name), }) if security.HasUsernamePassword(secret) { hasBasicAuth = true @@ -237,7 +236,7 @@ func BearerTokenAuth(o logging.OutputSpec, secret *corev1.Secret) []Element { if security.HasBearerTokenFileKey(secret) { conf = append(conf, BasicAuthConf{ Desc: "Bearer Auth Config", - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), + ComponentID: vectorhelpers.FormatComponentID(o.Name), }, BearerToken{ Token: security.GetFromSecret(secret, constants.BearerTokenFileKey), }) diff --git a/internal/generator/vector/output/kafka/kafka.go b/internal/generator/vector/output/kafka/kafka.go index 766e97403..bf90853b8 100644 --- a/internal/generator/vector/output/kafka/kafka.go +++ b/internal/generator/vector/output/kafka/kafka.go @@ -49,7 +49,7 @@ topic = {{.Topic}} func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op 
Options) []Element { if genhelper.IsDebugOutput(op) { return []Element{ - Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), vectorhelpers.MakeInputs(inputs...)), + Debug(vectorhelpers.FormatComponentID(o.Name), vectorhelpers.MakeInputs(inputs...)), } } diff --git a/internal/generator/vector/output/loki/loki.go b/internal/generator/vector/output/loki/loki.go index 3aa6c5ee6..483516576 100644 --- a/internal/generator/vector/output/loki/loki.go +++ b/internal/generator/vector/output/loki/loki.go @@ -109,7 +109,7 @@ func (l LokiLabels) Template() string { func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element { if genhelper.IsDebugOutput(op) { return []Element{ - Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), vectorhelpers.MakeInputs(inputs...)), + Debug(vectorhelpers.FormatComponentID(o.Name), vectorhelpers.MakeInputs(inputs...)), } } outputName := helpers.FormatComponentID(o.Name) @@ -131,7 +131,7 @@ func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Optio func Output(o logging.OutputSpec, inputs []string) Element { return Loki{ - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), + ComponentID: vectorhelpers.FormatComponentID(o.Name), Inputs: vectorhelpers.MakeInputs(inputs...), Endpoint: o.URL, TenantID: Tenant(o.Loki), @@ -140,7 +140,7 @@ func Output(o logging.OutputSpec, inputs []string) Element { func Encoding(o logging.OutputSpec) Element { return LokiEncoding{ - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), + ComponentID: vectorhelpers.FormatComponentID(o.Name), Codec: lokiEncodingJson, } } @@ -177,7 +177,7 @@ func lokiLabels(lo *logging.Loki) []Label { func Labels(o logging.OutputSpec) Element { return LokiLabels{ - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), + ComponentID: vectorhelpers.FormatComponentID(o.Name), Labels: lokiLabels(o.Loki), } } @@ -198,7 +198,7 @@ func TLSConf(o logging.OutputSpec, secret *corev1.Secret, op Options) []Element } else if secret != nil { // Set CA from logcollector ServiceAccount for internal Loki tlsConf := security.TLSConf{ - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), + ComponentID: vectorhelpers.FormatComponentID(o.Name), CAFilePath: `"/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt"`, } tlsConf.SetTLSProfileFromOptions(op) @@ -216,7 +216,7 @@ func BasicAuth(o logging.OutputSpec, secret *corev1.Secret) []Element { hasBasicAuth := false conf = append(conf, BasicAuthConf{ Desc: "Basic Auth Config", - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), + ComponentID: vectorhelpers.FormatComponentID(o.Name), }) if security.HasUsernamePassword(secret) { hasBasicAuth = true @@ -242,7 +242,7 @@ func BearerTokenAuth(o logging.OutputSpec, secret *corev1.Secret) []Element { if security.HasBearerTokenFileKey(secret) { conf = append(conf, BasicAuthConf{ Desc: "Bearer Auth Config", - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), + ComponentID: vectorhelpers.FormatComponentID(o.Name), }, BearerToken{ Token: security.GetFromSecret(secret, constants.BearerTokenFileKey), }) diff --git a/internal/generator/vector/output/splunk/splunk.go b/internal/generator/vector/output/splunk/splunk.go index 1d2a78f0c..bc14e76a3 100644 --- a/internal/generator/vector/output/splunk/splunk.go +++ b/internal/generator/vector/output/splunk/splunk.go @@ -2,7 +2,6 @@ package splunk import ( "fmt" - "strings" 
"github.com/openshift/cluster-logging-operator/internal/generator/vector/normalize" @@ -61,7 +60,7 @@ codec = {{.Codec}} func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element { if genhelper.IsDebugOutput(op) { return []Element{ - Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), vectorhelpers.MakeInputs(inputs...)), + Debug(vectorhelpers.FormatComponentID(o.Name), vectorhelpers.MakeInputs(inputs...)), } } outputName := vectorhelpers.FormatComponentID(o.Name) @@ -78,7 +77,7 @@ func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Optio func Output(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) Element { return Splunk{ - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), + ComponentID: vectorhelpers.FormatComponentID(o.Name), Inputs: vectorhelpers.MakeInputs(inputs...), Endpoint: o.URL, DefaultToken: security.GetFromSecret(secret, "hecToken"), @@ -87,7 +86,7 @@ func Output(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Opt func Encoding(o logging.OutputSpec) Element { return SplunkEncoding{ - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), + ComponentID: vectorhelpers.FormatComponentID(o.Name), Codec: splunkEncodingJson, } } diff --git a/internal/generator/vector/pipelines.go b/internal/generator/vector/pipelines.go index 0d49cc85e..6facce484 100644 --- a/internal/generator/vector/pipelines.go +++ b/internal/generator/vector/pipelines.go @@ -22,8 +22,12 @@ var ( func Pipelines(spec *logging.ClusterLogForwarderSpec, op generator.Options) []generator.Element { el := []generator.Element{} userDefined := spec.InputMap() - for _, p := range spec.Pipelines { + for i, p := range spec.Pipelines { inputs := []string{} + // Sanitize pipeline name and persist it for config generation + p.Name = helpers.FormatComponentID(p.Name) + spec.Pipelines[i].Name = p.Name + for _, inputName := range p.InputRefs { if _, ok := userDefined[inputName]; ok { inputs = append(inputs, fmt.Sprintf(UserDefinedInput, inputName)) diff --git a/internal/generator/vector/sources_to_pipelines_test.go b/internal/generator/vector/sources_to_pipelines_test.go index 4bf907f75..39a4c8163 100644 --- a/internal/generator/vector/sources_to_pipelines_test.go +++ b/internal/generator/vector/sources_to_pipelines_test.go @@ -378,6 +378,61 @@ inputs = ["detect_exceptions_pipeline"] source = ''' . 
 '''
+`,
+	}),
+	Entry("pipeline with spaces is sanitized", helpers.ConfGenerateTest{
+		CLFSpec: logging.ClusterLogForwarderSpec{
+			Pipelines: []logging.PipelineSpec{
+				{
+					InputRefs: []string{
+						logging.InputNameApplication,
+						logging.InputNameInfrastructure,
+						logging.InputNameAudit,
+					},
+					OutputRefs: []string{logging.OutputNameDefault},
+					Name:       "pipeline with space",
+				},
+			},
+		},
+		ExpectedConf: `
+[transforms.route_container_logs]
+type = "route"
+inputs = ["container_logs"]
+route.app = '!((starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube"))'
+route.infra = '(starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube")'
+
+# Set log_type to "application"
+[transforms.application]
+type = "remap"
+inputs = ["route_container_logs.app"]
+source = '''
+  .log_type = "application"
+'''
+
+# Set log_type to "infrastructure"
+[transforms.infrastructure]
+type = "remap"
+inputs = ["route_container_logs.infra","journal_logs"]
+source = '''
+  .log_type = "infrastructure"
+'''
+
+# Set log_type to "audit"
+[transforms.audit]
+type = "remap"
+inputs = ["host_audit_logs","k8s_audit_logs","openshift_audit_logs","ovn_audit_logs"]
+source = '''
+  .log_type = "audit"
+  .hostname = get_env_var("VECTOR_SELF_NODE_NAME") ?? ""
+  ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
+'''
+
+[transforms.pipeline_with_space]
+type = "remap"
+inputs = ["application","infrastructure","audit"]
+source = '''
+  .
+'''
 `,
 	}),
 )
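
Note: the fix works because every pipeline name is now passed through vectorhelpers.FormatComponentID before it is written into the generated TOML, so a pipeline named "pipeline with space" yields the valid table name [transforms.pipeline_with_space] instead of one Vector cannot parse. The standalone sketch below illustrates that behavior; the exact replacement character set is an assumption inferred from the strings.ToLower(vectorhelpers.Replacer.Replace(...)) call sites this patch replaces and from the expected configs above ("pipeline with space" -> "pipeline_with_space", "es-1" -> "es_1"), not a copy of the real helper.

package main

import (
	"fmt"
	"strings"
)

// replacer stands in for vectorhelpers.Replacer. Spaces and dashes are
// confirmed by the expected configs in this patch; the dot and slash
// mappings are assumptions for illustration.
var replacer = strings.NewReplacer(" ", "_", "-", "_", ".", "_", "/", "_")

// formatComponentID sketches vectorhelpers.FormatComponentID as used above:
// replace characters that are not valid in a Vector/TOML component ID,
// then lowercase the result.
func formatComponentID(name string) string {
	return strings.ToLower(replacer.Replace(name))
}

func main() {
	fmt.Println(formatComponentID("pipeline with space")) // pipeline_with_space
	fmt.Println(formatComponentID("es-1"))                // es_1
}

Sanitizing once in Pipelines() and persisting the result back into spec.Pipelines[i].Name keeps the transform table name and every downstream inputs = [...] reference consistent, which is why the collector no longer crash-loops on such names.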