diff --git a/internal/generator/vector/conf/outputs.go b/internal/generator/vector/conf/outputs.go index ba3288ea77..090d968f6f 100644 --- a/internal/generator/vector/conf/outputs.go +++ b/internal/generator/vector/conf/outputs.go @@ -3,13 +3,11 @@ package conf import ( "fmt" "github.com/openshift/cluster-logging-operator/internal/generator/framework" + "github.com/openshift/cluster-logging-operator/internal/generator/vector/output/common" + "github.com/openshift/cluster-logging-operator/internal/generator/vector/output/metrics" "github.com/openshift/cluster-logging-operator/internal/generator/vector/source" - "github.com/openshift/cluster-logging-operator/internal/generator/vector/normalize" - - log "github.com/ViaQ/logerr/v2/log/static" logging "github.com/openshift/cluster-logging-operator/apis/logging/v1" - "github.com/openshift/cluster-logging-operator/internal/constants" "github.com/openshift/cluster-logging-operator/internal/generator/vector/helpers" "github.com/openshift/cluster-logging-operator/internal/generator/vector/output/cloudwatch" "github.com/openshift/cluster-logging-operator/internal/generator/vector/output/elasticsearch" @@ -22,12 +20,6 @@ import ( corev1 "k8s.io/api/core/v1" ) -var ( - SinkTransformThrottle = "sink_throttle" - - UserDefinedSinkThrottle = fmt.Sprintf(`%s_%%s`, SinkTransformThrottle) -) - func OutputFromPipelines(spec *logging.ClusterLogForwarderSpec, op framework.Options) logging.RouteMap { r := logging.RouteMap{} for _, p := range spec.Pipelines { @@ -38,51 +30,19 @@ func OutputFromPipelines(spec *logging.ClusterLogForwarderSpec, op framework.Opt return r } -func AddThrottleForSink(spec *logging.OutputSpec, inputs []string) []framework.Element { - el := []framework.Element{} - - el = append(el, normalize.Throttle{ - ComponentID: fmt.Sprintf(UserDefinedSinkThrottle, spec.Name), - Inputs: helpers.MakeInputs(inputs...), - Threshold: spec.Limit.MaxRecordsPerSecond, - KeyField: "", - }) - - return el -} - func Outputs(clspec 
*logging.CollectionSpec, secrets map[string]*corev1.Secret, clfspec *logging.ClusterLogForwarderSpec, op framework.Options) []framework.Element { outputs := []framework.Element{} ofp := OutputFromPipelines(clfspec, op) for idx, o := range clfspec.Outputs { - var secret *corev1.Secret - if s, ok := secrets[o.Name]; ok { - secret = s - log.V(9).Info("Using secret configured in output: " + o.Name) - } else { - secret = secrets[constants.LogCollectorToken] - if secret != nil { - log.V(9).Info("Using secret configured in " + constants.LogCollectorToken) - } else { - log.V(9).Info("No Secret found in " + constants.LogCollectorToken) - } - } - - if o.Name == logging.OutputNameDefault && o.Type == logging.OutputTypeElasticsearch { - op[framework.MinTLSVersion] = "" - op[framework.Ciphers] = "" - } else { - outMinTlsVersion, outCiphers := op.TLSProfileInfo(o, ",") - op[framework.MinTLSVersion] = outMinTlsVersion - op[framework.Ciphers] = outCiphers - } + secret := helpers.GetOutputSecret(o, secrets) + helpers.SetTLSProfileOptions(o, op) inputs := ofp[o.Name].List() if o.HasPolicy() && o.GetMaxRecordsPerSecond() > 0 { // Vector Throttle component cannot have zero threshold - outputs = append(outputs, AddThrottleForSink(&clfspec.Outputs[idx], inputs)...) - inputs = []string{fmt.Sprintf(UserDefinedSinkThrottle, o.Name)} + outputs = append(outputs, common.AddThrottleForSink(&clfspec.Outputs[idx], inputs)...) 
+ inputs = []string{fmt.Sprintf(common.UserDefinedSinkThrottle, o.Name)} } if !o.HasPolicy() || (o.HasPolicy() && o.GetMaxRecordsPerSecond() > 0) { @@ -109,24 +69,7 @@ func Outputs(clspec *logging.CollectionSpec, secrets map[string]*corev1.Secret, minTlsVersion, cipherSuites := op.TLSProfileInfo(logging.OutputSpec{}, ",") outputs = append(outputs, - AddNodeNameToMetric(source.AddNodenameToMetricTransformName, []string{source.InternalMetricsSourceName}), - PrometheusOutput(source.PrometheusOutputSinkName, []string{source.AddNodenameToMetricTransformName}, minTlsVersion, cipherSuites)) + metrics.AddNodeNameToMetric(metrics.AddNodenameToMetricTransformName, []string{source.InternalMetricsSourceName}), + metrics.PrometheusOutput(metrics.PrometheusOutputSinkName, []string{metrics.AddNodenameToMetricTransformName}, minTlsVersion, cipherSuites)) return outputs } - -func PrometheusOutput(id string, inputs []string, minTlsVersion string, cipherSuites string) framework.Element { - return source.PrometheusExporter{ - ID: id, - Inputs: helpers.MakeInputs(inputs...), - Address: helpers.ListenOnAllLocalInterfacesAddress() + `:` + source.PrometheusExporterListenPort, - TlsMinVersion: minTlsVersion, - CipherSuites: cipherSuites, - } -} - -func AddNodeNameToMetric(id string, inputs []string) framework.Element { - return source.AddNodenameToMetric{ - ID: id, - Inputs: helpers.MakeInputs(inputs...), - } -} diff --git a/internal/generator/vector/helpers/secrets.go b/internal/generator/vector/helpers/secrets.go new file mode 100644 index 0000000000..61e4e42353 --- /dev/null +++ b/internal/generator/vector/helpers/secrets.go @@ -0,0 +1,21 @@ +package helpers + +import ( + log "github.com/ViaQ/logerr/v2/log/static" + logging "github.com/openshift/cluster-logging-operator/apis/logging/v1" + "github.com/openshift/cluster-logging-operator/internal/constants" + corev1 "k8s.io/api/core/v1" +) + +func GetOutputSecret(o logging.OutputSpec, secrets map[string]*corev1.Secret) *corev1.Secret { + 
if s, ok := secrets[o.Name]; ok { + log.V(9).Info("Using secret configured in output: " + o.Name) + return s + } + if s := secrets[constants.LogCollectorToken]; s != nil { + log.V(9).Info("Using secret configured in " + constants.LogCollectorToken) + return s + } + log.V(9).Info("No Secret found in " + constants.LogCollectorToken) + return nil +} diff --git a/internal/generator/vector/helpers/tls_options.go b/internal/generator/vector/helpers/tls_options.go new file mode 100644 index 0000000000..32a5448536 --- /dev/null +++ b/internal/generator/vector/helpers/tls_options.go @@ -0,0 +1,16 @@ +package helpers + +import ( + logging "github.com/openshift/cluster-logging-operator/apis/logging/v1" + "github.com/openshift/cluster-logging-operator/internal/generator/framework" +) + +func SetTLSProfileOptions(o logging.OutputSpec, op framework.Options) { + op[framework.MinTLSVersion], op[framework.Ciphers] = func() (string, string) { + if o.Name == logging.OutputNameDefault && o.Type == logging.OutputTypeElasticsearch { + return "", "" + } else { + return op.TLSProfileInfo(o, ",") + } + }() +} diff --git a/internal/generator/vector/input/inputs.go b/internal/generator/vector/input/inputs.go index 6d6d98e76b..9c893b8333 100644 --- a/internal/generator/vector/input/inputs.go +++ b/internal/generator/vector/input/inputs.go @@ -26,10 +26,12 @@ const ( InputContainerLogs = "container_logs" InputJournalLogs = "journal_logs" - RouteApplicationLogs = "route_application_logs" - SourceTransformThrottle = "source_throttle" + RouteApplicationLogs = "route_application_logs" SrcPassThrough = "." 
+ + UserDefinedSourceThrottle = `source_throttle_%s` + perContainerLimitKeyField = `"{{ file }}"` ) var ( @@ -45,9 +47,7 @@ var ( AddLogTypeInfra = fmt.Sprintf(".log_type = %q", logging.InputNameInfrastructure) AddLogTypeAudit = fmt.Sprintf(".log_type = %q", logging.InputNameAudit) - UserDefinedInput = fmt.Sprintf("%s.%%s", RouteApplicationLogs) - UserDefinedSourceThrottle = fmt.Sprintf("%s_%%s", SourceTransformThrottle) - perContainerLimitKeyField = `"{{ file }}"` + UserDefinedInput = fmt.Sprintf("%s.%%s", RouteApplicationLogs) MatchNS = func(ns string) string { return helpers.Eq(K8sNamespaceName, ns) @@ -76,12 +76,12 @@ func AddThrottle(spec *logging.InputSpec) []Element { threshold = spec.Application.GroupLimit.MaxRecordsPerSecond } - el = append(el, normalize.Throttle{ - ComponentID: fmt.Sprintf(UserDefinedSourceThrottle, spec.Name), - Inputs: helpers.MakeInputs([]string{input}...), - Threshold: threshold, - KeyField: throttle_key, - }) + el = append(el, normalize.NewThrottle( + fmt.Sprintf(UserDefinedSourceThrottle, spec.Name), + []string{input}, + threshold, + throttle_key, + )...) 
return el } diff --git a/internal/generator/vector/normalize/throttle.go b/internal/generator/vector/normalize/throttle.go index 011f1e743d..ceebb6903f 100644 --- a/internal/generator/vector/normalize/throttle.go +++ b/internal/generator/vector/normalize/throttle.go @@ -1,5 +1,10 @@ package normalize +import ( + "github.com/openshift/cluster-logging-operator/internal/generator/framework" + "github.com/openshift/cluster-logging-operator/internal/generator/vector/helpers" +) + type Throttle struct { ComponentID string Desc string @@ -8,6 +13,19 @@ type Throttle struct { KeyField string } +func NewThrottle(id string, inputs []string, threshold int64, throttleKey string) []framework.Element { + el := []framework.Element{} + + el = append(el, Throttle{ + ComponentID: id, + Inputs: helpers.MakeInputs(inputs...), + Threshold: threshold, + KeyField: throttleKey, + }) + + return el +} + func (t Throttle) Name() string { return "throttleTemplate" } diff --git a/internal/generator/vector/output/common/throttle.go b/internal/generator/vector/output/common/throttle.go new file mode 100644 index 0000000000..940106259b --- /dev/null +++ b/internal/generator/vector/output/common/throttle.go @@ -0,0 +1,17 @@ +package common + +import ( + "fmt" + logging "github.com/openshift/cluster-logging-operator/apis/logging/v1" + "github.com/openshift/cluster-logging-operator/internal/generator/framework" + "github.com/openshift/cluster-logging-operator/internal/generator/vector/normalize" +) + +const ( + UserDefinedSinkThrottle = `sink_throttle_%s` +) + +func AddThrottleForSink(spec *logging.OutputSpec, inputs []string) []framework.Element { + id := fmt.Sprintf(UserDefinedSinkThrottle, spec.Name) + return normalize.NewThrottle(id, inputs, spec.Limit.MaxRecordsPerSecond, "") +} diff --git a/internal/generator/vector/output/metrics/prometheus.go b/internal/generator/vector/output/metrics/prometheus.go new file mode 100644 index 0000000000..0599bcd66d --- /dev/null +++ 
b/internal/generator/vector/output/metrics/prometheus.go @@ -0,0 +1,77 @@ +package metrics + +import ( + "github.com/openshift/cluster-logging-operator/internal/generator/framework" + "github.com/openshift/cluster-logging-operator/internal/generator/vector/helpers" +) + +const ( + AddNodenameToMetricTransformName = "add_nodename_to_metric" + PrometheusOutputSinkName = "prometheus_output" + PrometheusExporterListenPort = `24231` +) + +type PrometheusExporter struct { + ID string + Inputs string + Address string + TlsMinVersion string + CipherSuites string +} + +func (p PrometheusExporter) Name() string { + return "PrometheusExporterTemplate" +} + +func (p PrometheusExporter) Template() string { + return `{{define "` + p.Name() + `" -}} +[sinks.{{.ID}}] +type = "prometheus_exporter" +inputs = {{.Inputs}} +address = "{{.Address}}" +default_namespace = "collector" +[sinks.{{.ID}}.tls] +enabled = true +key_file = "/etc/collector/metrics/tls.key" +crt_file = "/etc/collector/metrics/tls.crt" +min_tls_version = "{{.TlsMinVersion}}" +ciphersuites = "{{.CipherSuites}}" +{{end}}` +} + +type AddNodenameToMetric struct { + ID string + Inputs string +} + +func (a AddNodenameToMetric) Name() string { + return AddNodenameToMetricTransformName +} + +func (a AddNodenameToMetric) Template() string { + return `{{define "` + a.Name() + `" -}} +[transforms.{{.ID}}] +type = "remap" +inputs = {{.Inputs}} +source = ''' +.tags.hostname = get_env_var!("VECTOR_SELF_NODE_NAME") +''' +{{end}}` +} + +func PrometheusOutput(id string, inputs []string, minTlsVersion string, cipherSuites string) framework.Element { + return PrometheusExporter{ + ID: id, + Inputs: helpers.MakeInputs(inputs...), + Address: helpers.ListenOnAllLocalInterfacesAddress() + `:` + PrometheusExporterListenPort, + TlsMinVersion: minTlsVersion, + CipherSuites: cipherSuites, + } +} + +func AddNodeNameToMetric(id string, inputs []string) framework.Element { + return AddNodenameToMetric{ + ID: id, + Inputs: 
helpers.MakeInputs(inputs...), + } +} diff --git a/internal/generator/vector/source/metrics.go b/internal/generator/vector/source/metrics.go index 89dfe005c5..1528f0bd52 100644 --- a/internal/generator/vector/source/metrics.go +++ b/internal/generator/vector/source/metrics.go @@ -1,10 +1,7 @@ package source const ( - InternalMetricsSourceName = "internal_metrics" - PrometheusOutputSinkName = "prometheus_output" - PrometheusExporterListenPort = `24231` - AddNodenameToMetricTransformName = "add_nodename_to_metric" + InternalMetricsSourceName = "internal_metrics" ) type InternalMetrics struct { @@ -26,52 +23,3 @@ type = "internal_metrics" {{end}} ` } - -type PrometheusExporter struct { - ID string - Inputs string - Address string - TlsMinVersion string - CipherSuites string -} - -func (p PrometheusExporter) Name() string { - return "PrometheusExporterTemplate" -} - -func (p PrometheusExporter) Template() string { - return `{{define "` + p.Name() + `" -}} -[sinks.{{.ID}}] -type = "prometheus_exporter" -inputs = {{.Inputs}} -address = "{{.Address}}" -default_namespace = "collector" - -[sinks.{{.ID}}.tls] -enabled = true -key_file = "/etc/collector/metrics/tls.key" -crt_file = "/etc/collector/metrics/tls.crt" -min_tls_version = "{{.TlsMinVersion}}" -ciphersuites = "{{.CipherSuites}}" -{{end}}` -} - -type AddNodenameToMetric struct { - ID string - Inputs string -} - -func (a AddNodenameToMetric) Name() string { - return AddNodenameToMetricTransformName -} - -func (a AddNodenameToMetric) Template() string { - return `{{define "` + a.Name() + `" -}} -[transforms.{{.ID}}] -type = "remap" -inputs = {{.Inputs}} -source = ''' -.tags.hostname = get_env_var!("VECTOR_SELF_NODE_NAME") -''' -{{end}}` -}