Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 8 additions & 65 deletions internal/generator/vector/conf/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package conf
import (
"fmt"
"github.com/openshift/cluster-logging-operator/internal/generator/framework"
"github.com/openshift/cluster-logging-operator/internal/generator/vector/output/common"
"github.com/openshift/cluster-logging-operator/internal/generator/vector/output/metrics"
"github.com/openshift/cluster-logging-operator/internal/generator/vector/source"

"github.com/openshift/cluster-logging-operator/internal/generator/vector/normalize"

log "github.com/ViaQ/logerr/v2/log/static"
logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
"github.com/openshift/cluster-logging-operator/internal/constants"
"github.com/openshift/cluster-logging-operator/internal/generator/vector/helpers"
"github.com/openshift/cluster-logging-operator/internal/generator/vector/output/cloudwatch"
"github.com/openshift/cluster-logging-operator/internal/generator/vector/output/elasticsearch"
Expand All @@ -22,12 +20,6 @@ import (
corev1 "k8s.io/api/core/v1"
)

var (
SinkTransformThrottle = "sink_throttle"

UserDefinedSinkThrottle = fmt.Sprintf(`%s_%%s`, SinkTransformThrottle)
)

func OutputFromPipelines(spec *logging.ClusterLogForwarderSpec, op framework.Options) logging.RouteMap {
r := logging.RouteMap{}
for _, p := range spec.Pipelines {
Expand All @@ -38,51 +30,19 @@ func OutputFromPipelines(spec *logging.ClusterLogForwarderSpec, op framework.Opt
return r
}

func AddThrottleForSink(spec *logging.OutputSpec, inputs []string) []framework.Element {
el := []framework.Element{}

el = append(el, normalize.Throttle{
ComponentID: fmt.Sprintf(UserDefinedSinkThrottle, spec.Name),
Inputs: helpers.MakeInputs(inputs...),
Threshold: spec.Limit.MaxRecordsPerSecond,
KeyField: "",
})

return el
}

func Outputs(clspec *logging.CollectionSpec, secrets map[string]*corev1.Secret, clfspec *logging.ClusterLogForwarderSpec, op framework.Options) []framework.Element {
outputs := []framework.Element{}
ofp := OutputFromPipelines(clfspec, op)

for idx, o := range clfspec.Outputs {
var secret *corev1.Secret
if s, ok := secrets[o.Name]; ok {
secret = s
log.V(9).Info("Using secret configured in output: " + o.Name)
} else {
secret = secrets[constants.LogCollectorToken]
if secret != nil {
log.V(9).Info("Using secret configured in " + constants.LogCollectorToken)
} else {
log.V(9).Info("No Secret found in " + constants.LogCollectorToken)
}
}

if o.Name == logging.OutputNameDefault && o.Type == logging.OutputTypeElasticsearch {
op[framework.MinTLSVersion] = ""
op[framework.Ciphers] = ""
} else {
outMinTlsVersion, outCiphers := op.TLSProfileInfo(o, ",")
op[framework.MinTLSVersion] = outMinTlsVersion
op[framework.Ciphers] = outCiphers
}
secret := helpers.GetOutputSecret(o, secrets)
helpers.SetTLSProfileOptions(o, op)

inputs := ofp[o.Name].List()
if o.HasPolicy() && o.GetMaxRecordsPerSecond() > 0 {
// Vector Throttle component cannot have zero threshold
outputs = append(outputs, AddThrottleForSink(&clfspec.Outputs[idx], inputs)...)
inputs = []string{fmt.Sprintf(UserDefinedSinkThrottle, o.Name)}
outputs = append(outputs, common.AddThrottleForSink(&clfspec.Outputs[idx], inputs)...)
inputs = []string{fmt.Sprintf(common.UserDefinedSinkThrottle, o.Name)}
}

if !o.HasPolicy() || (o.HasPolicy() && o.GetMaxRecordsPerSecond() > 0) {
Expand All @@ -109,24 +69,7 @@ func Outputs(clspec *logging.CollectionSpec, secrets map[string]*corev1.Secret,

minTlsVersion, cipherSuites := op.TLSProfileInfo(logging.OutputSpec{}, ",")
outputs = append(outputs,
AddNodeNameToMetric(source.AddNodenameToMetricTransformName, []string{source.InternalMetricsSourceName}),
PrometheusOutput(source.PrometheusOutputSinkName, []string{source.AddNodenameToMetricTransformName}, minTlsVersion, cipherSuites))
metrics.AddNodeNameToMetric(metrics.AddNodenameToMetricTransformName, []string{source.InternalMetricsSourceName}),
metrics.PrometheusOutput(metrics.PrometheusOutputSinkName, []string{metrics.AddNodenameToMetricTransformName}, minTlsVersion, cipherSuites))
return outputs
}

func PrometheusOutput(id string, inputs []string, minTlsVersion string, cipherSuites string) framework.Element {
return source.PrometheusExporter{
ID: id,
Inputs: helpers.MakeInputs(inputs...),
Address: helpers.ListenOnAllLocalInterfacesAddress() + `:` + source.PrometheusExporterListenPort,
TlsMinVersion: minTlsVersion,
CipherSuites: cipherSuites,
}
}

func AddNodeNameToMetric(id string, inputs []string) framework.Element {
return source.AddNodenameToMetric{
ID: id,
Inputs: helpers.MakeInputs(inputs...),
}
}
21 changes: 21 additions & 0 deletions internal/generator/vector/helpers/secrets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package helpers

import (
log "github.com/ViaQ/logerr/v2/log/static"
logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
"github.com/openshift/cluster-logging-operator/internal/constants"
corev1 "k8s.io/api/core/v1"
)

// GetOutputSecret resolves the secret to use for output o: a secret keyed by
// the output's name takes precedence; otherwise the shared log-collector
// token secret is used if present, and nil is returned when neither exists.
func GetOutputSecret(o logging.OutputSpec, secrets map[string]*corev1.Secret) *corev1.Secret {
	if perOutput, found := secrets[o.Name]; found {
		log.V(9).Info("Using secret configured in output: " + o.Name)
		return perOutput
	}
	fallback := secrets[constants.LogCollectorToken]
	if fallback == nil {
		log.V(9).Info("No Secret found in " + constants.LogCollectorToken)
		return nil
	}
	log.V(9).Info("Using secret configured in " + constants.LogCollectorToken)
	return fallback
}
16 changes: 16 additions & 0 deletions internal/generator/vector/helpers/tls_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package helpers

import (
logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
"github.com/openshift/cluster-logging-operator/internal/generator/framework"
)

// SetTLSProfileOptions records the TLS profile (minimum version and cipher
// list) for output o in the shared generator options. The default
// Elasticsearch output is exempt and gets empty values for both settings.
func SetTLSProfileOptions(o logging.OutputSpec, op framework.Options) {
	minTLSVersion, ciphers := "", ""
	if o.Name != logging.OutputNameDefault || o.Type != logging.OutputTypeElasticsearch {
		minTLSVersion, ciphers = op.TLSProfileInfo(o, ",")
	}
	op[framework.MinTLSVersion] = minTLSVersion
	op[framework.Ciphers] = ciphers
}
Copy link
Contributor

@syedriko syedriko Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How 'bout

func SetTLSProfileOptions(o logging.OutputSpec, op framework.Options) {
    op[framework.MinTLSVersion], op[framework.Ciphers] = func()(string, string) {
        if o.Name == logging.OutputNameDefault && o.Type == logging.OutputTypeElasticsearch {
            return "", ""
        } else {
            return op.TLSProfileInfo(o, ",")
        }
    }()
}

22 changes: 11 additions & 11 deletions internal/generator/vector/input/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ const (
InputContainerLogs = "container_logs"
InputJournalLogs = "journal_logs"

RouteApplicationLogs = "route_application_logs"
SourceTransformThrottle = "source_throttle"
RouteApplicationLogs = "route_application_logs"

SrcPassThrough = "."

UserDefinedSourceThrottle = `source_throttle_%s`
perContainerLimitKeyField = `"{{ file }}"`
)

var (
Expand All @@ -45,9 +47,7 @@ var (
AddLogTypeInfra = fmt.Sprintf(".log_type = %q", logging.InputNameInfrastructure)
AddLogTypeAudit = fmt.Sprintf(".log_type = %q", logging.InputNameAudit)

UserDefinedInput = fmt.Sprintf("%s.%%s", RouteApplicationLogs)
UserDefinedSourceThrottle = fmt.Sprintf("%s_%%s", SourceTransformThrottle)
perContainerLimitKeyField = `"{{ file }}"`
UserDefinedInput = fmt.Sprintf("%s.%%s", RouteApplicationLogs)

MatchNS = func(ns string) string {
return helpers.Eq(K8sNamespaceName, ns)
Expand Down Expand Up @@ -76,12 +76,12 @@ func AddThrottle(spec *logging.InputSpec) []Element {
threshold = spec.Application.GroupLimit.MaxRecordsPerSecond
}

el = append(el, normalize.Throttle{
ComponentID: fmt.Sprintf(UserDefinedSourceThrottle, spec.Name),
Inputs: helpers.MakeInputs([]string{input}...),
Threshold: threshold,
KeyField: throttle_key,
})
el = append(el, normalize.NewThrottle(
fmt.Sprintf(UserDefinedSourceThrottle, spec.Name),
[]string{input},
threshold,
throttle_key,
)...)

return el
}
Expand Down
18 changes: 18 additions & 0 deletions internal/generator/vector/normalize/throttle.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package normalize

import (
"github.com/openshift/cluster-logging-operator/internal/generator/framework"
"github.com/openshift/cluster-logging-operator/internal/generator/vector/helpers"
)

type Throttle struct {
ComponentID string
Desc string
Expand All @@ -8,6 +13,19 @@ type Throttle struct {
KeyField string
}

// NewThrottle returns the framework elements for a single Vector throttle
// transform with the given component id, input component ids, per-window
// record threshold, and optional key field used to partition the throttle
// (empty string means throttle globally).
func NewThrottle(id string, inputs []string, threshold int64, throttleKey string) []framework.Element {
	// Fixed typo in the parameter name (was "threshhold") and dropped the
	// needless append to an empty slice for a single element.
	return []framework.Element{
		Throttle{
			ComponentID: id,
			Inputs:      helpers.MakeInputs(inputs...),
			Threshold:   threshold,
			KeyField:    throttleKey,
		},
	}
}

func (t Throttle) Name() string {
return "throttleTemplate"
}
Expand Down
17 changes: 17 additions & 0 deletions internal/generator/vector/output/common/throttle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package common

import (
"fmt"
logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
"github.com/openshift/cluster-logging-operator/internal/generator/framework"
"github.com/openshift/cluster-logging-operator/internal/generator/vector/normalize"
)

const (
	// UserDefinedSinkThrottle is the printf pattern for the component id of a
	// per-output throttle transform; the verb is filled with the output name.
	UserDefinedSinkThrottle = `sink_throttle_%s`
)

// AddThrottleForSink generates a throttle transform placed ahead of the sink
// for the given output spec, limiting it to spec.Limit.MaxRecordsPerSecond.
// NOTE(review): dereferences spec.Limit without a nil check — assumes callers
// only invoke this when the output has a rate-limit policy; confirm at call sites.
func AddThrottleForSink(spec *logging.OutputSpec, inputs []string) []framework.Element {
	id := fmt.Sprintf(UserDefinedSinkThrottle, spec.Name)
	return normalize.NewThrottle(id, inputs, spec.Limit.MaxRecordsPerSecond, "")
}
77 changes: 77 additions & 0 deletions internal/generator/vector/output/metrics/prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package metrics

import (
"github.com/openshift/cluster-logging-operator/internal/generator/framework"
"github.com/openshift/cluster-logging-operator/internal/generator/vector/helpers"
)

const (
	// AddNodenameToMetricTransformName is the component id of the remap
	// transform that stamps the node name onto internal metrics.
	AddNodenameToMetricTransformName = "add_nodename_to_metric"
	// PrometheusOutputSinkName is the component id of the prometheus_exporter sink.
	PrometheusOutputSinkName = "prometheus_output"
	// PrometheusExporterListenPort is the TCP port the metrics exporter listens on.
	PrometheusExporterListenPort = `24231`
)

// PrometheusExporter renders a Vector "prometheus_exporter" sink that serves
// collector metrics over a TLS-enabled endpoint.
type PrometheusExporter struct {
	ID            string // sink component id
	Inputs        string // pre-rendered list of input component ids
	Address       string // listen address (host:port) for the exporter
	TlsMinVersion string // minimum TLS version for the metrics endpoint
	CipherSuites  string // cipher suites offered by the metrics endpoint
}

// Name returns the Go template name this sink's configuration is defined under.
func (p PrometheusExporter) Name() string {
	return "PrometheusExporterTemplate"
}

// Template returns the TOML template for the prometheus_exporter sink,
// including its TLS section with fixed key/cert paths under /etc/collector/metrics.
func (p PrometheusExporter) Template() string {
	return `{{define "` + p.Name() + `" -}}
[sinks.{{.ID}}]
type = "prometheus_exporter"
inputs = {{.Inputs}}
address = "{{.Address}}"
default_namespace = "collector"
[sinks.{{.ID}}.tls]
enabled = true
key_file = "/etc/collector/metrics/tls.key"
crt_file = "/etc/collector/metrics/tls.crt"
min_tls_version = "{{.TlsMinVersion}}"
ciphersuites = "{{.CipherSuites}}"
{{end}}`
}

type AddNodenameToMetric struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AddNodeNameToMetric is only used in one place, together with the PrometheusOutput, may be we could just inline it into func (p PrometheusExporter) Template() string { and remove all the AddNodenameToMetric code?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And even merge internal_metrics into that same template ^^^?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds like a future refactoring

ID string
Inputs string
}

// Name returns the Go template name for this transform; it reuses the
// transform's component-id constant as the template name.
func (a AddNodenameToMetric) Name() string {
	return AddNodenameToMetricTransformName
}

// Template returns the TOML template for a remap transform that sets the
// hostname tag from the VECTOR_SELF_NODE_NAME environment variable.
func (a AddNodenameToMetric) Template() string {
	return `{{define "` + a.Name() + `" -}}
[transforms.{{.ID}}]
type = "remap"
inputs = {{.Inputs}}
source = '''
.tags.hostname = get_env_var!("VECTOR_SELF_NODE_NAME")
'''
{{end}}`
}

// PrometheusOutput builds the prometheus_exporter sink element with the given
// component id, input component ids, and TLS settings. The sink listens on
// all local interfaces at the fixed exporter port.
func PrometheusOutput(id string, inputs []string, minTlsVersion string, cipherSuites string) framework.Element {
	listenAddress := helpers.ListenOnAllLocalInterfacesAddress() + `:` + PrometheusExporterListenPort
	return PrometheusExporter{
		ID:            id,
		Inputs:        helpers.MakeInputs(inputs...),
		Address:       listenAddress,
		TlsMinVersion: minTlsVersion,
		CipherSuites:  cipherSuites,
	}
}

// AddNodeNameToMetric builds the remap transform element that adds the local
// node's name to metrics flowing from the given inputs.
func AddNodeNameToMetric(id string, inputs []string) framework.Element {
	return AddNodenameToMetric{
		ID:     id,
		Inputs: helpers.MakeInputs(inputs...),
	}
}
54 changes: 1 addition & 53 deletions internal/generator/vector/source/metrics.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package source

const (
InternalMetricsSourceName = "internal_metrics"
PrometheusOutputSinkName = "prometheus_output"
PrometheusExporterListenPort = `24231`
AddNodenameToMetricTransformName = "add_nodename_to_metric"
InternalMetricsSourceName = "internal_metrics"
)

type InternalMetrics struct {
Expand All @@ -26,52 +23,3 @@ type = "internal_metrics"
{{end}}
`
}

type PrometheusExporter struct {
ID string
Inputs string
Address string
TlsMinVersion string
CipherSuites string
}

func (p PrometheusExporter) Name() string {
return "PrometheusExporterTemplate"
}

func (p PrometheusExporter) Template() string {
return `{{define "` + p.Name() + `" -}}
[sinks.{{.ID}}]
type = "prometheus_exporter"
inputs = {{.Inputs}}
address = "{{.Address}}"
default_namespace = "collector"

[sinks.{{.ID}}.tls]
enabled = true
key_file = "/etc/collector/metrics/tls.key"
crt_file = "/etc/collector/metrics/tls.crt"
min_tls_version = "{{.TlsMinVersion}}"
ciphersuites = "{{.CipherSuites}}"
{{end}}`
}

type AddNodenameToMetric struct {
ID string
Inputs string
}

func (a AddNodenameToMetric) Name() string {
return AddNodenameToMetricTransformName
}

func (a AddNodenameToMetric) Template() string {
return `{{define "` + a.Name() + `" -}}
[transforms.{{.ID}}]
type = "remap"
inputs = {{.Inputs}}
source = '''
.tags.hostname = get_env_var!("VECTOR_SELF_NODE_NAME")
'''
{{end}}`
}