diff --git a/apis/logging/v1/output_types.go b/apis/logging/v1/output_types.go
index 0c30d516e..c0f4f6b2a 100644
--- a/apis/logging/v1/output_types.go
+++ b/apis/logging/v1/output_types.go
@@ -286,7 +286,7 @@ type Http struct {
 	// Timeout specifies the Http request timeout in seconds. If not set, 10secs is used.
 	// +optional
-	Timeout string `json:"timeout,omitempty"`
+	Timeout int `json:"timeout,omitempty"`
 
 	// Method specifies the Http method to be used for sending logs. If not set, 'POST' is used.
 	// +kubebuilder:validation:Enum:=GET;HEAD;POST;PUT;DELETE;OPTIONS;TRACE;PATCH
diff --git a/bundle/manifests/clusterlogging.clusterserviceversion.yaml b/bundle/manifests/clusterlogging.clusterserviceversion.yaml
index 460e761bd..e990e5a7d 100644
--- a/bundle/manifests/clusterlogging.clusterserviceversion.yaml
+++ b/bundle/manifests/clusterlogging.clusterserviceversion.yaml
@@ -118,7 +118,7 @@ metadata:
     certified: "false"
     console.openshift.io/plugins: '["logging-view-plugin"]'
     containerImage: quay.io/openshift-logging/cluster-logging-operator:latest
-    createdAt: "2023-10-11T17:54:55Z"
+    createdAt: "2023-10-12T13:12:40Z"
     description: The Red Hat OpenShift Logging Operator for OCP provides a means
      for configuring and managing your aggregated logging stack.
     olm.skipRange: '>=5.6.0-0 <5.8.0'
diff --git a/bundle/manifests/logging.openshift.io_clusterlogforwarders.yaml b/bundle/manifests/logging.openshift.io_clusterlogforwarders.yaml
index c48a91325..3ea05a708 100644
--- a/bundle/manifests/logging.openshift.io_clusterlogforwarders.yaml
+++ b/bundle/manifests/logging.openshift.io_clusterlogforwarders.yaml
@@ -495,7 +495,7 @@ spec:
                   timeout:
                     description: Timeout specifies the Http request timeout in seconds.
                       If not set, 10secs is used.
-                    type: string
+                    type: integer
                 type: object
               kafka:
                 description: 'Kafka provides optional extra properties for `type:
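Review note on the API change above: with `Timeout` now an `int` carrying `omitempty`, an explicit `0` is indistinguishable from "unset", so the generator treats the zero value as "use the documented 10-second default" (this is why the vector http generator below checks `o.Http.Timeout != 0`). A minimal sketch with a local stand-in type, not the real v1 package:

```go
package main

import "fmt"

// Http is a local stand-in mirroring apis/logging/v1.Http after this change.
type Http struct {
	Timeout int `json:"timeout,omitempty"`
}

func main() {
	spec := Http{} // timeout omitted in the CR
	timeout := spec.Timeout
	if timeout == 0 {
		timeout = 10 // zero value doubles as "unset": fall back to the 10s default
	}
	fmt.Println("effective timeout_secs:", timeout) // 10
}
```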
diff --git a/config/crd/bases/logging.openshift.io_clusterlogforwarders.yaml b/config/crd/bases/logging.openshift.io_clusterlogforwarders.yaml
index 835ac6f26..f191095bf 100644
--- a/config/crd/bases/logging.openshift.io_clusterlogforwarders.yaml
+++ b/config/crd/bases/logging.openshift.io_clusterlogforwarders.yaml
@@ -496,7 +496,7 @@ spec:
                   timeout:
                     description: Timeout specifies the Http request timeout in seconds.
                       If not set, 10secs is used.
-                    type: string
+                    type: integer
                 type: object
               kafka:
                 description: 'Kafka provides optional extra properties for `type:
diff --git a/internal/generator/fluentd/output/http/http_test.go b/internal/generator/fluentd/output/http/http_test.go
index 04f66cdca..ec1f073ad 100644
--- a/internal/generator/fluentd/output/http/http_test.go
+++ b/internal/generator/fluentd/output/http/http_test.go
@@ -98,7 +98,7 @@ var _ = Describe("Generate fluentd config", func() {
 			Name: "http-receiver",
 			URL:  "https://my-logstore.com/logs/app-logs",
 			OutputTypeSpec: v1.OutputTypeSpec{Http: &v1.Http{
-				Timeout: "50",
+				Timeout: 50,
 				Headers: map[string]string{
 					"k1": "v1",
 					"k2": "v2",
@@ -170,7 +170,7 @@ var _ = Describe("Generate fluentd config", func() {
 			Name: "http-receiver",
 			URL:  "https://my-logstore.com/logs/app-logs",
 			OutputTypeSpec: v1.OutputTypeSpec{Http: &v1.Http{
-				Timeout: "50",
+				Timeout: 50,
 				Headers: map[string]string{
 					"k1": "v1",
 					"k2": "v2",
@@ -253,7 +253,7 @@ var _ = Describe("Generate fluentd config", func() {
 			Name: "http-receiver",
 			URL:  "https://my-logstore.com/logs/app-logs",
 			OutputTypeSpec: v1.OutputTypeSpec{Http: &v1.Http{
-				Timeout: "50",
+				Timeout: 50,
 				Headers: map[string]string{
 					"Content-Type": "application/json",
 					"k1": "v1",
diff --git a/internal/generator/helpers/optional_pair.go b/internal/generator/helpers/optional_pair.go
new file mode 100644
index 000000000..dc833319a
--- /dev/null
+++ b/internal/generator/helpers/optional_pair.go
@@ -0,0 +1,29 @@
+package helpers
+
+import (
+	"fmt"
+	"reflect"
+)
+
+type OptionalPair struct {
+	key   string
+	Value interface{}
+}
+
+func NewOptionalPair(key string, value interface{}) OptionalPair {
+	return OptionalPair{
+		key,
+		value,
+	}
+}
+
+func (op OptionalPair) String() string {
+	if op.Value == nil {
+		return ""
+	}
+	format := "%s = %v"
+	if reflect.TypeOf(op.Value).Kind() == reflect.String {
+		format = "%s = %q"
+	}
+	return fmt.Sprintf(format, op.key, op.Value)
+}
diff --git a/internal/generator/helpers/optional_pair_test.go b/internal/generator/helpers/optional_pair_test.go
new file mode 100644
index 000000000..418a6119e
--- /dev/null
+++ b/internal/generator/helpers/optional_pair_test.go
@@ -0,0 +1,34 @@
+package helpers
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+var _ = Describe("OptionalPair", func() {
+	Context("#Template", func() {
+		var (
+			pair OptionalPair
+		)
+		It("should return an empty template when the value is nil", func() {
+			pair = NewOptionalPair("abc", nil)
+			Expect(pair.String()).To(BeEmpty())
+		})
+		It("should return a formatted string config when value is a string", func() {
+			pair = NewOptionalPair("abc", "xyz")
+			Expect(pair.String()).To(Equal(`abc = "xyz"`))
+		})
+		It("should return a formatted numerical config when value is an int", func() {
+			pair = NewOptionalPair("abc", 123)
+			Expect(pair.String()).To(Equal(`abc = 123`))
+		})
+		It("should return a formatted bool config when value is bool", func() {
+			pair = NewOptionalPair("abc", true)
+			Expect(pair.String()).To(Equal(`abc = true`))
+		})
+		It("should return a formatted false bool config when value is bool", func() {
+			pair = NewOptionalPair("abc", false)
+			Expect(pair.String()).To(Equal(`abc = false`))
+		})
+	})
+})
diff --git a/internal/generator/helpers/suite_test.go b/internal/generator/helpers/suite_test.go
new file mode 100644
index 000000000..443596bb0
--- /dev/null
+++ b/internal/generator/helpers/suite_test.go
@@ -0,0 +1,13 @@
+package helpers
+
+import (
+	"testing"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+func TestSuite(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "[internal][generator][helpers] Suite")
+}
diff --git a/internal/generator/utils/utils.go b/internal/generator/utils/utils.go
index 67f8a8bb0..497343959 100644
--- a/internal/generator/utils/utils.go
+++ b/internal/generator/utils/utils.go
@@ -7,6 +7,9 @@ import (
 )
 
 func ToHeaderStr(h map[string]string, formatStr string) string {
+	if len(h) == 0 {
+		return ""
+	}
 	sortedKeys := make([]string, len(h))
 	i := 0
 	for k := range h {
diff --git a/internal/generator/vector/conf_test.go b/internal/generator/vector/conf_test.go
index f66e9b335..2c6729f65 100644
--- a/internal/generator/vector/conf_test.go
+++ b/internal/generator/vector/conf_test.go
@@ -73,15 +73,6 @@ var _ = Describe("Testing Complete Config Generation", func() {
 			Options: generator.Options{
 				generator.ClusterTLSProfileSpec: tls.GetClusterTLSProfileSpec(nil),
 			},
-			CLSpec: logging.CollectionSpec{
-				Fluentd: &logging.FluentdForwarderSpec{
-					Buffer: &logging.FluentdBufferSpec{
-						ChunkLimitSize: "8m",
-						TotalLimitSize: "800000000",
-						OverflowAction: "throw_exception",
-					},
-				},
-			},
 			CLFSpec: logging.ClusterLogForwarderSpec{
 				Inputs: []logging.InputSpec{
 					{
diff --git a/internal/generator/vector/conf_test/complex.toml b/internal/generator/vector/conf_test/complex.toml
index 67b8b5322..5c0bf06e1 100644
--- a/internal/generator/vector/conf_test/complex.toml
+++ b/internal/generator/vector/conf_test/complex.toml
@@ -381,6 +381,9 @@ topic = "topic"
 codec = "json"
 timestamp_format = "rfc3339"
 
+[sinks.kafka_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.kafka_receiver.tls]
 enabled = true
 min_tls_version = "VersionTLS12"
diff --git a/internal/generator/vector/conf_test/complex_custom_data_dir.toml b/internal/generator/vector/conf_test/complex_custom_data_dir.toml
index 876c192a7..fef898bd2 100644
--- a/internal/generator/vector/conf_test/complex_custom_data_dir.toml
+++ b/internal/generator/vector/conf_test/complex_custom_data_dir.toml
@@ -382,6 +382,9 @@ topic = "topic"
 codec = "json"
 timestamp_format = "rfc3339"
 
+[sinks.kafka_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.kafka_receiver.tls]
 enabled = true
 min_tls_version = "VersionTLS12"
diff --git a/internal/generator/vector/conf_test/complex_es_no_ver.toml b/internal/generator/vector/conf_test/complex_es_no_ver.toml
index 9cd9657de..a9d347eb2 100644
--- a/internal/generator/vector/conf_test/complex_es_no_ver.toml
+++ b/internal/generator/vector/conf_test/complex_es_no_ver.toml
@@ -421,10 +421,16 @@ endpoints = ["https://es-1.svc.messaging.cluster.local:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
+
 [sinks.es_1.tls]
 min_tls_version = "VersionTLS12"
 ciphersuites = "TLS_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,DHE-RSA-AES128-GCM-SHA256,DHE-RSA-AES256-GCM-SHA384"
@@ -535,10 +541,17 @@ endpoints = ["https://es-2.svc.messaging.cluster.local:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
+
 id_key = "_id"
 api_version = "v6"
+[sinks.es_2.buffer]
+when_full = "drop_newest"
+
+[sinks.es_2.request]
+retry_attempts = 17
+timeout_secs = 2147483648
+
 [sinks.es_2.tls]
 min_tls_version = "VersionTLS12"
 ciphersuites = "TLS_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,DHE-RSA-AES128-GCM-SHA256,DHE-RSA-AES256-GCM-SHA384"
diff --git a/internal/generator/vector/conf_test/complex_es_v6.toml b/internal/generator/vector/conf_test/complex_es_v6.toml
index 64eb27a7d..89051791a 100644
--- a/internal/generator/vector/conf_test/complex_es_v6.toml
+++ b/internal/generator/vector/conf_test/complex_es_v6.toml
@@ -420,10 +420,16 @@ endpoints = ["https://elasticsearch:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.default.buffer]
+when_full = "drop_newest"
+
+[sinks.default.request]
+retry_attempts = 17
+timeout_secs = 2147483648
+
 [sinks.default.tls]
 key_file = "/var/run/ocp-collector/secrets/collector/tls.key"
 crt_file = "/var/run/ocp-collector/secrets/collector/tls.crt"
@@ -537,10 +543,16 @@ endpoints = ["https://es-1.svc.messaging.cluster.local:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
+
 [sinks.es_1.tls]
 min_tls_version = "VersionTLS12"
 ciphersuites = "TLS_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,DHE-RSA-AES128-GCM-SHA256,DHE-RSA-AES256-GCM-SHA384"
@@ -655,10 +667,16 @@ endpoints = ["https://es-2.svc.messaging.cluster.local:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v8"
+[sinks.es_2.buffer]
+when_full = "drop_newest"
+
+[sinks.es_2.request]
+retry_attempts = 17
+timeout_secs = 2147483648
+
 [sinks.es_2.tls]
 min_tls_version = "VersionTLS12"
 ciphersuites = "TLS_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,DHE-RSA-AES128-GCM-SHA256,DHE-RSA-AES256-GCM-SHA384"
diff --git a/internal/generator/vector/conf_test/complex_otel.toml b/internal/generator/vector/conf_test/complex_otel.toml
index 33e17576d..80e86a9e5 100644
--- a/internal/generator/vector/conf_test/complex_otel.toml
+++ b/internal/generator/vector/conf_test/complex_otel.toml
@@ -427,7 +427,11 @@ method = "post"
 [sinks.http_receiver.encoding]
 codec = "json"
+[sinks.http_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.http_receiver.request]
+retry_attempts = 17
 timeout_secs = 10
 headers = {"h1"="v1","h2"="v2"}
diff --git a/internal/generator/vector/conf_test/es_pipeline_w_spaces.toml b/internal/generator/vector/conf_test/es_pipeline_w_spaces.toml
index 21c21a65d..cdb1def24 100644
--- a/internal/generator/vector/conf_test/es_pipeline_w_spaces.toml
+++ b/internal/generator/vector/conf_test/es_pipeline_w_spaces.toml
@@ -421,10 +421,16 @@ endpoints = ["https://es-1.svc.messaging.cluster.local:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
+
 [sinks.es_1.tls]
 min_tls_version = "VersionTLS12"
 ciphersuites = "TLS_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,DHE-RSA-AES128-GCM-SHA256,DHE-RSA-AES256-GCM-SHA384"
diff --git a/internal/generator/vector/elements/keyval.go b/internal/generator/vector/elements/keyval.go
index aa3e42a7c..736d37de9 100644
--- a/internal/generator/vector/elements/keyval.go
+++ b/internal/generator/vector/elements/keyval.go
@@ -13,6 +13,9 @@ func (kv KeyVal) Name() string {
 }
 
 func (kv KeyVal) Template() string {
+	if kv.Val == "" {
+		return `{{define "` + kv.Name() + `" -}}{{end -}}`
+	}
 	return `{{define "` + kv.Name() + `" -}}
 {{.Key}} = {{.Val}}
 {{end -}}`
diff --git a/internal/generator/vector/output/buffer.go b/internal/generator/vector/output/buffer.go
new file mode 100644
index 000000000..0b32a5427
--- /dev/null
+++ b/internal/generator/vector/output/buffer.go
@@ -0,0 +1,24 @@
+package output
+
+type Buffer struct {
+	ComponentID string
+	WhenFull    string
+}
+
+func NewBuffer(id string) Buffer {
+	return Buffer{
+		ComponentID: id,
+		WhenFull:    "drop_newest",
+	}
+}
+
+func (b Buffer) Name() string {
+	return "buffer"
+}
+
+func (b Buffer) Template() string {
+	return `{{define "` + b.Name() + `" -}}
+[sinks.{{.ComponentID}}.buffer]
+when_full = "{{.WhenFull}}"
+{{end}}`
+}
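A sketch of how the new `Buffer` element renders through Go's text/template machinery — hand-wired here with a local copy of the type; the generator framework normally parses and executes these named templates itself:

```go
package main

import (
	"os"
	"text/template"
)

// Buffer mirrors the element defined in output/buffer.go above.
type Buffer struct{ ComponentID, WhenFull string }

func main() {
	b := Buffer{ComponentID: "kafka_receiver", WhenFull: "drop_newest"}
	tmpl := `{{define "buffer" -}}
[sinks.{{.ComponentID}}.buffer]
when_full = "{{.WhenFull}}"
{{end}}`
	t := template.Must(template.New("buffer").Parse(tmpl))
	// Prints the exact stanza the test fixtures in this patch now expect:
	//   [sinks.kafka_receiver.buffer]
	//   when_full = "drop_newest"
	_ = t.ExecuteTemplate(os.Stdout, "buffer", b)
}
```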
diff --git a/internal/generator/vector/output/cloudwatch/cloudwatch.go b/internal/generator/vector/output/cloudwatch/cloudwatch.go
index 6bc71bd70..93390de56 100644
--- a/internal/generator/vector/output/cloudwatch/cloudwatch.go
+++ b/internal/generator/vector/output/cloudwatch/cloudwatch.go
@@ -2,6 +2,7 @@ package cloudwatch
 
 import (
 	"fmt"
+	"github.com/openshift/cluster-logging-operator/internal/generator/vector/output"
 	"regexp"
 	"strings"
 
@@ -61,7 +62,6 @@ group_name = "{{"{{ group_name }}"}}"
 stream_name = "{{"{{ stream_name }}"}}"
 {{compose_one .SecurityConfig}}
 encoding.codec = "json"
-request.concurrency = 2
 healthcheck.enabled = false
 {{compose_one .EndpointConfig}}
 {{- end}}
@@ -69,29 +69,33 @@ healthcheck.enabled = false
 }
 
 func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element {
-	outputName := helpers.FormatComponentID(o.Name)
-	componentID := fmt.Sprintf("%s_%s", outputName, "normalize_group_and_streams")
-	dedottedID := normalize.ID(outputName, "dedot")
+	id := helpers.FormatComponentID(o.Name)
+	componentID := fmt.Sprintf("%s_%s", id, "normalize_group_and_streams")
+	dedottedID := normalize.ID(id, "dedot")
 	if genhelper.IsDebugOutput(op) {
 		return []Element{
 			NormalizeGroupAndStreamName(LogGroupNameField(o), LogGroupPrefix(o), componentID, inputs),
-			Debug(outputName, helpers.MakeInputs([]string{componentID}...)),
+			Debug(id, helpers.MakeInputs([]string{componentID}...)),
 		}
 	}
+	request := output.NewRequest(id)
+	request.Concurrency.Value = 2
 	return MergeElements(
 		[]Element{
 			NormalizeGroupAndStreamName(LogGroupNameField(o), LogGroupPrefix(o), componentID, inputs),
 			normalize.DedotLabels(dedottedID, []string{componentID}),
-			OutputConf(o, []string{dedottedID}, secret, op, o.Cloudwatch.Region),
+			OutputConf(id, o, []string{dedottedID}, secret, op, o.Cloudwatch.Region),
+			output.NewBuffer(id),
+			request,
 		},
 		TLSConf(o, secret, op),
 	)
 }
 
-func OutputConf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options, region string) Element {
+func OutputConf(id string, o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options, region string) Element {
 	return CloudWatch{
 		Desc:        "Cloudwatch Logs",
-		ComponentID: helpers.FormatComponentID(o.Name),
+		ComponentID: id,
 		Inputs:      helpers.MakeInputs(inputs...),
 		Region:      region,
 		SecurityConfig: SecurityConfig(secret),
diff --git a/internal/generator/vector/output/cloudwatch/output_cloudwatch_test.go b/internal/generator/vector/output/cloudwatch/output_cloudwatch_test.go
index 6f7e358dd..c32a0feef 100644
--- a/internal/generator/vector/output/cloudwatch/output_cloudwatch_test.go
+++ b/internal/generator/vector/output/cloudwatch/output_cloudwatch_test.go
@@ -104,8 +104,15 @@ stream_name = "{{ stream_name }}"
 auth.access_key_id = "` + keyId + `"
 auth.secret_access_key = "` + keySecret + `"
 encoding.codec = "json"
-request.concurrency = 2
 healthcheck.enabled = false
+`
+	cwBufferAndRequest = `
+[sinks.cw.buffer]
+when_full = "drop_newest"
+
+[sinks.cw.request]
+retry_attempts = 17
+concurrency = 2
 `
 	cwSinkRole = `
 # Cloudwatch Logs
@@ -118,8 +125,14 @@ group_name = "{{ group_name }}"
 stream_name = "{{ stream_name }}"
 # role_arn and identity token set via env vars
 encoding.codec = "json"
-request.concurrency = 2
 healthcheck.enabled = false
+
+[sinks.cw.buffer]
+when_full = "drop_newest"
+
+[sinks.cw.request]
+retry_attempts = 17
+concurrency = 2
 `
 	cwSinkKeyIdTLS = `
 # Cloudwatch Logs
@@ -133,8 +146,15 @@ stream_name = "{{ stream_name }}"
 auth.access_key_id = "` + keyId + `"
 auth.secret_access_key = "` + keySecret + `"
 encoding.codec = "json"
-request.concurrency = 2
 healthcheck.enabled = false
+
+[sinks.cw.buffer]
+when_full = "drop_newest"
+
+[sinks.cw.request]
+retry_attempts = 17
+concurrency = 2
+
 [sinks.cw.tls]
 min_tls_version = "` + defaultTLS + `"
 ciphersuites = "` + defaultCiphers + `"
@@ -154,8 +174,14 @@ stream_name = "{{ stream_name }}"
 auth.access_key_id = "` + keyId + `"
 auth.secret_access_key = "` + keySecret + `"
 encoding.codec = "json"
-request.concurrency = 2
 healthcheck.enabled = false
+
+[sinks.cw.buffer]
+when_full = "drop_newest"
+
+[sinks.cw.request]
+retry_attempts = 17
+concurrency = 2
 [sinks.cw.tls]
 min_tls_version = "` + defaultTLS + `"
 ciphersuites = "` + defaultCiphers + `"
@@ -173,8 +199,14 @@ stream_name = "{{ stream_name }}"
 auth.access_key_id = "` + keyId + `"
 auth.secret_access_key = "` + keySecret + `"
 encoding.codec = "json"
-request.concurrency = 2
 healthcheck.enabled = false
+
+[sinks.cw.buffer]
+when_full = "drop_newest"
+
+[sinks.cw.request]
+retry_attempts = 17
+concurrency = 2
 [sinks.cw.tls]
 min_tls_version = "` + defaultTLS + `"
 ciphersuites = "` + defaultCiphers + `"
@@ -195,8 +227,14 @@ group_name = "{{ group_name }}"
 stream_name = "{{ stream_name }}"
 # role_arn and identity token set via env vars
 encoding.codec = "json"
-request.concurrency = 2
 healthcheck.enabled = false
+
+[sinks.cw.buffer]
+when_full = "drop_newest"
+
+[sinks.cw.request]
+retry_attempts = 17
+concurrency = 2
 [sinks.cw.tls]
 min_tls_version = "` + defaultTLS + `"
 ciphersuites = "` + defaultCiphers + `"
@@ -215,8 +253,14 @@ group_name = "{{ group_name }}"
 stream_name = "{{ stream_name }}"
 # role_arn and identity token set via env vars
 encoding.codec = "json"
-request.concurrency = 2
 healthcheck.enabled = false
+
+[sinks.cw.buffer]
+when_full = "drop_newest"
+
+[sinks.cw.request]
+retry_attempts = 17
+concurrency = 2
 [sinks.cw.tls]
 min_tls_version = "` + defaultTLS + `"
 ciphersuites = "` + defaultCiphers + `"
@@ -237,8 +281,14 @@ group_name = "{{ group_name }}"
 stream_name = "{{ stream_name }}"
 # role_arn and identity token set via env vars
 encoding.codec = "json"
-request.concurrency = 2
 healthcheck.enabled = false
+
+[sinks.cw.buffer]
+when_full = "drop_newest"
+
+[sinks.cw.request]
+retry_attempts = 17
+concurrency = 2
 [sinks.cw.tls]
 min_tls_version = "` + defaultTLS + `"
 ciphersuites = "` + defaultCiphers + `"
@@ -257,8 +307,14 @@ group_name = "{{ group_name }}"
 stream_name = "{{ stream_name }}"
 # role_arn and identity token set via env vars
 encoding.codec = "json"
-request.concurrency = 2
 healthcheck.enabled = false
+
+[sinks.cw.buffer]
+when_full = "drop_newest"
+
+[sinks.cw.request]
+retry_attempts = 17
+concurrency = 2
 [sinks.cw.tls]
 min_tls_version = "` + defaultTLS + `"
 ciphersuites = "` + defaultCiphers + `"
@@ -349,6 +405,7 @@ var _ = Describe("Generating vector config for cloudwatch output", func() {
 ` + dedotted + `
 ` + cwSinkKeyId + `
+` + cwBufferAndRequest + `
 `
 			element := Conf(output, pipelineName, secrets[output.Secret.Name], nil)
 			results, err := g.GenerateConf(element...)
@@ -383,6 +440,7 @@ var _ = Describe("Generating vector config for cloudwatch output", func() {
 ` + dedotted + `
 ` + cwSinkKeyId + `
+` + cwBufferAndRequest + `
 `
 			element := Conf(output, pipelineName, secrets[output.Secret.Name], nil)
 			results, err := g.GenerateConf(element...)
@@ -417,6 +475,7 @@ var _ = Describe("Generating vector config for cloudwatch output", func() {
 ` + dedotted + `
 ` + cwSinkKeyId + `
+` + cwBufferAndRequest + `
 `
 			element := Conf(output, pipelineName, secrets[output.Secret.Name], nil)
 			results, err := g.GenerateConf(element...)
@@ -458,6 +517,7 @@ var _ = Describe("Generating vector config for cloudwatch output", func() {
 ` + dedotted + `
 ` + cwSinkKeyId + `
+` + cwBufferAndRequest + `
 `
 			element := Conf(output, pipelineName, secrets[output.Secret.Name], nil)
 			results, err := g.GenerateConf(element...)
@@ -494,7 +554,8 @@ var _ = Describe("Generating vector config for cloudwatch output", func() {
 ` + dedotted + `
 ` + cwSinkKeyId + `
-endpoint = "` + endpoint + `"`
+endpoint = "` + endpoint + `"` + cwBufferAndRequest
 			element := Conf(output, pipelineName, secrets[output.Secret.Name], nil)
 			results, err := g.GenerateConf(element...)
 			Expect(err).To(BeNil())
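Note how the hard-coded `request.concurrency = 2` leaves the cloudwatch sink template and is set programmatically instead, so the output keeps its old behavior while gaining the shared `retry_attempts` default. A sketch mirroring `Conf` above (the `output.NewRequest` type is introduced later in this patch):

```go
package main

import "github.com/openshift/cluster-logging-operator/internal/generator/vector/output"

func main() {
	// Mirrors cloudwatch.Conf: concurrency moves off the sink template and
	// onto the shared request element; pairs left nil simply vanish from
	// the rendered [sinks.cw.request] stanza.
	request := output.NewRequest("cw")
	request.Concurrency.Value = 2
	_ = request // composed into the config alongside output.NewBuffer("cw")
}
```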
diff --git a/internal/generator/vector/output/elasticsearch/elasticsearch.go b/internal/generator/vector/output/elasticsearch/elasticsearch.go
index d22d31e60..f93ca30fb 100644
--- a/internal/generator/vector/output/elasticsearch/elasticsearch.go
+++ b/internal/generator/vector/output/elasticsearch/elasticsearch.go
@@ -2,6 +2,7 @@ package elasticsearch
 
 import (
 	"fmt"
+	"github.com/openshift/cluster-logging-operator/internal/generator/vector/output"
 	"strings"
 
 	logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
@@ -37,7 +38,6 @@ endpoints = ["{{.Endpoint}}"]
 bulk.index = "{{ "{{ write_index }}" }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 {{- if ne .Version 0 }}
 api_version = "v{{ .Version }}"
@@ -218,19 +218,23 @@ func ID(id1, id2 string) string {
 
 func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element {
 	outputs := []Element{}
-	outputName := helpers.FormatComponentID(o.Name)
+	id := helpers.FormatComponentID(o.Name)
 	if genhelper.IsDebugOutput(op) {
 		return []Element{
-			SetESIndex(ID(outputName, "add_es_index"), inputs, o, op),
-			FlattenLabels(ID(outputName, "dedot_and_flatten"), []string{ID(outputName, "add_es_index")}),
-			Debug(outputName, helpers.MakeInputs([]string{ID(outputName, "dedot_and_flatten")}...)),
+			SetESIndex(ID(id, "add_es_index"), inputs, o, op),
+			FlattenLabels(ID(id, "dedot_and_flatten"), []string{ID(id, "add_es_index")}),
+			Debug(id, helpers.MakeInputs([]string{ID(id, "dedot_and_flatten")}...)),
 		}
 	}
+	request := output.NewRequest(id)
+	request.TimeoutSecs.Value = 2147483648
 	outputs = MergeElements(outputs,
 		[]Element{
-			SetESIndex(ID(outputName, "add_es_index"), inputs, o, op),
-			FlattenLabels(ID(outputName, "dedot_and_flatten"), []string{ID(outputName, "add_es_index")}),
-			Output(o, []string{ID(outputName, "dedot_and_flatten")}, secret, op),
+			SetESIndex(ID(id, "add_es_index"), inputs, o, op),
+			FlattenLabels(ID(id, "dedot_and_flatten"), []string{ID(id, "add_es_index")}),
+			Output(o, []string{ID(id, "dedot_and_flatten")}, secret, op),
+			output.NewBuffer(id),
+			request,
 		},
 		TLSConf(o, secret, op),
 		BasicAuth(o, secret),
diff --git a/internal/generator/vector/output/elasticsearch/elasticsearch_test.go b/internal/generator/vector/output/elasticsearch/elasticsearch_test.go
index 5545098c2..67c9ea414 100644
--- a/internal/generator/vector/output/elasticsearch/elasticsearch_test.go
+++ b/internal/generator/vector/output/elasticsearch/elasticsearch_test.go
@@ -150,10 +150,16 @@ endpoints = ["https://es.svc.infra.cluster:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
+
 # Basic Auth Config
 [sinks.es_1.auth]
 strategy = "basic"
@@ -287,10 +293,16 @@ endpoints = ["https://es.svc.infra.cluster:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
+
 [sinks.es_1.tls]
 key_file = "/var/run/ocp-collector/secrets/es-1/tls.key"
 crt_file = "/var/run/ocp-collector/secrets/es-1/tls.crt"
@@ -413,9 +425,15 @@ endpoints = ["http://es.svc.infra.cluster:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
 `,
 		}),
 		Entry("with multiple pipelines for elastic-search", helpers.ConfGenerateTest{
@@ -576,10 +594,14 @@ endpoints = ["https://es-1.svc.messaging.cluster.local:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
 [sinks.es_1.tls]
 key_file = "/var/run/ocp-collector/secrets/es-1/tls.key"
 crt_file = "/var/run/ocp-collector/secrets/es-1/tls.crt"
@@ -688,9 +710,14 @@ endpoints = ["https://es-2.svc.messaging.cluster.local:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.es_2.buffer]
+when_full = "drop_newest"
+
+[sinks.es_2.request]
+retry_attempts = 17
+timeout_secs = 2147483648
 [sinks.es_2.tls]
 key_file = "/var/run/ocp-collector/secrets/es-2/tls.key"
@@ -832,9 +859,14 @@ endpoints = ["http://es.svc.infra.cluster:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
 `,
 		}),
 		Entry("with StructuredTypeName", helpers.ConfGenerateTest{
@@ -967,9 +999,14 @@ endpoints = ["http://es.svc.infra.cluster:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
 `,
 		}),
 		Entry("with both StructuredTypeKey and StructuredTypeName", helpers.ConfGenerateTest{
@@ -1108,9 +1145,14 @@ endpoints = ["http://es.svc.infra.cluster:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
 `,
 		}),
 		Entry("with StructuredTypeKey, StructuredTypeName, container annotations enabled", helpers.ConfGenerateTest{
@@ -1262,9 +1304,14 @@ endpoints = ["http://es.svc.infra.cluster:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
 `,
 		}),
 		Entry("without an Elasticsearch version", helpers.ConfGenerateTest{
@@ -1383,9 +1430,14 @@ endpoints = ["http://es.svc.infra.cluster:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
 `,
 		}),
 		Entry("with an Elasticsearch version less than our default", helpers.ConfGenerateTest{
@@ -1513,9 +1565,14 @@ endpoints = ["http://es.svc.infra.cluster:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v5"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
 `,
 		}),
 		Entry("with our default Elasticsearch version", helpers.ConfGenerateTest{
@@ -1643,9 +1700,14 @@ endpoints = ["http://es.svc.infra.cluster:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v6"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
 `,
 		}),
 		Entry("with Elasticsearch version 7", helpers.ConfGenerateTest{
@@ -1773,9 +1835,14 @@ endpoints = ["http://es.svc.infra.cluster:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v7"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
 `,
 		}),
 		Entry("with an Elasticsearch version greater than latest version", helpers.ConfGenerateTest{
@@ -1903,9 +1970,14 @@ endpoints = ["http://es.svc.infra.cluster:9200"]
 bulk.index = "{{ write_index }}"
 bulk.action = "create"
 encoding.except_fields = ["write_index"]
-request.timeout_secs = 2147483648
 id_key = "_id"
 api_version = "v9"
+[sinks.es_1.buffer]
+when_full = "drop_newest"
+
+[sinks.es_1.request]
+retry_attempts = 17
+timeout_secs = 2147483648
 `,
 		}),
 	)
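Same pattern for Elasticsearch: the long-standing `2147483648`-second timeout (2^31, effectively "never time out") moves from the sink template onto the shared request element, next to the new `retry_attempts = 17` default visible throughout these fixtures. A sketch mirroring `Conf` above:

```go
package main

import "github.com/openshift/cluster-logging-operator/internal/generator/vector/output"

func main() {
	// 2147483648 = 2^31 seconds — an effectively unbounded timeout the ES
	// sink has always used, now carried by the shared request element.
	request := output.NewRequest("es_1")
	request.TimeoutSecs.Value = 2147483648
	_ = request
}
```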
"github.com/openshift/cluster-logging-operator/internal/generator" genhelper "github.com/openshift/cluster-logging-operator/internal/generator/helpers" @@ -63,19 +62,19 @@ node_name = "{{"{{hostname}}"}}" } func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element { + id := vectorhelpers.FormatComponentID(o.Name) if genhelper.IsDebugOutput(op) { return []Element{ - Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), vectorhelpers.MakeInputs(inputs...)), + Debug(id, vectorhelpers.MakeInputs(inputs...)), } } if o.GoogleCloudLogging == nil { return []Element{} } g := o.GoogleCloudLogging - outputName := helpers.FormatComponentID(o.Name) - dedottedID := normalize.ID(outputName, "dedot") + dedottedID := normalize.ID(id, "dedot") gcl := GoogleCloudLogging{ - ComponentID: helpers.FormatComponentID(o.Name), + ComponentID: id, Inputs: helpers.MakeInputs(inputs...), LogDestination: LogDestination(g), LogID: g.LogID, @@ -84,7 +83,12 @@ func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Optio } setInput(&gcl, []string{dedottedID}) return MergeElements( - []Element{normalize.DedotLabels(dedottedID, inputs), gcl}, + []Element{ + normalize.DedotLabels(dedottedID, inputs), + gcl, + output.NewBuffer(id), + output.NewRequest(id), + }, TLSConf(o, secret, op), ) } diff --git a/internal/generator/vector/output/gcl/gcl_test.go b/internal/generator/vector/output/gcl/gcl_test.go index 1397410ab..c36602524 100644 --- a/internal/generator/vector/output/gcl/gcl_test.go +++ b/internal/generator/vector/output/gcl/gcl_test.go @@ -109,10 +109,16 @@ credentials_path = "/var/run/ocp-collector/secrets/junk/google-application-crede log_id = "vector-1" severity_key = "level" - [sinks.gcl_1.resource] type = "k8s_node" node_name = "{{hostname}}" + +[sinks.gcl_1.buffer] +when_full = "drop_newest" + +[sinks.gcl_1.request] +retry_attempts = 17 + `, }), Entry("with TLS config with default minTLSVersion & ciphers", helpers.ConfGenerateTest{ @@ -207,11 +213,16 @@ credentials_path = "/var/run/ocp-collector/secrets/junk/google-application-crede log_id = "vector-1" severity_key = "level" - [sinks.gcl_tls.resource] type = "k8s_node" node_name = "{{hostname}}" +[sinks.gcl_tls.buffer] +when_full = "drop_newest" + +[sinks.gcl_tls.request] +retry_attempts = 17 + [sinks.gcl_tls.tls] min_tls_version = "` + defaultTLS + `" ciphersuites = "` + defaultCiphers + `" @@ -310,12 +321,17 @@ billing_account_id = "billing-1" credentials_path = "/var/run/ocp-collector/secrets/junk/google-application-credentials.json" log_id = "vector-1" severity_key = "level" - - [sinks.gcl_tls.resource] type = "k8s_node" node_name = "{{hostname}}" +[sinks.gcl_tls.buffer] +when_full = "drop_newest" + +[sinks.gcl_tls.request] +retry_attempts = 17 + + [sinks.gcl_tls.tls] min_tls_version = "` + defaultTLS + `" ciphersuites = "` + defaultCiphers + `" diff --git a/internal/generator/vector/output/http/http.go b/internal/generator/vector/output/http/http.go index bd153edcb..21a829a46 100644 --- a/internal/generator/vector/output/http/http.go +++ b/internal/generator/vector/output/http/http.go @@ -2,13 +2,13 @@ package http import ( "fmt" + "github.com/openshift/cluster-logging-operator/internal/generator/vector/output" "strings" logging "github.com/openshift/cluster-logging-operator/apis/logging/v1" "github.com/openshift/cluster-logging-operator/internal/constants" . 
"github.com/openshift/cluster-logging-operator/internal/generator" genhelper "github.com/openshift/cluster-logging-operator/internal/generator/helpers" - "github.com/openshift/cluster-logging-operator/internal/generator/utils" . "github.com/openshift/cluster-logging-operator/internal/generator/vector/elements" "github.com/openshift/cluster-logging-operator/internal/generator/vector/helpers" vectorhelpers "github.com/openshift/cluster-logging-operator/internal/generator/vector/helpers" @@ -111,9 +111,9 @@ func Normalize(componentID string, inputs []string) Element { } func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element { - outputName := helpers.FormatComponentID(o.Name) + id := helpers.FormatComponentID(o.Name) component := strings.ToLower(vectorhelpers.Replacer.Replace(fmt.Sprintf("%s_%s", o.Name, NormalizeHttp))) - dedottedID := normalize.ID(outputName, "dedot") + dedottedID := normalize.ID(id, "dedot") if genhelper.IsDebugOutput(op) { return []Element{ Normalize(component, inputs), @@ -121,11 +121,12 @@ func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Optio } } return MergeElements( - Schema(o, outputName, component, inputs, op), + Schema(o, id, component, inputs, op), []Element{ normalize.DedotLabels(dedottedID, []string{component}), Output(o, []string{dedottedID}, secret, op), Encoding(o), + output.NewBuffer(id), Request(o), }, TLSConf(o, secret, op), @@ -181,25 +182,17 @@ func Method(h *logging.Http) string { return "post" } -func Request(o logging.OutputSpec) Element { - var timeout string - if o.Http == nil || o.Http.Timeout == "" { - timeout = fmt.Sprintf("%d", DefaultHttpTimeoutSecs) - } else { +func Request(o logging.OutputSpec) *output.Request { + timeout := DefaultHttpTimeoutSecs + if o.Http != nil && o.Http.Timeout != 0 { timeout = o.Http.Timeout } - return HttpRequest{ - ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), - Timeout: timeout, - Headers: Headers(o), - } -} - -func Headers(o logging.OutputSpec) Element { - if o.Http == nil || len(o.Http.Headers) == 0 { - return Nil + req := output.NewRequest(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name))) + req.TimeoutSecs.Value = timeout + if o.Http != nil && len(o.Http.Headers) != 0 { + req.SetHeaders(o.Http.Headers) } - return KV("headers", utils.ToHeaderStr(o.Http.Headers, "%q=%q")) + return req } func Encoding(o logging.OutputSpec) Element { diff --git a/internal/generator/vector/output/http/http_test.go b/internal/generator/vector/output/http/http_test.go index efd661e17..9e3b329a3 100644 --- a/internal/generator/vector/output/http/http_test.go +++ b/internal/generator/vector/output/http/http_test.go @@ -116,7 +116,11 @@ method = "post" [sinks.http_receiver.encoding] codec = "json" +[sinks.http_receiver.buffer] +when_full = "drop_newest" + [sinks.http_receiver.request] +retry_attempts = 17 timeout_secs = 10 headers = {"h1"="v1","h2"="v2"} @@ -221,7 +225,11 @@ method = "post" [sinks.http_receiver.encoding] codec = "json" +[sinks.http_receiver.buffer] +when_full = "drop_newest" + [sinks.http_receiver.request] +retry_attempts = 17 timeout_secs = 10 headers = {"h1"="v1","h2"="v2"} @@ -239,7 +247,7 @@ token = "token-for-custom-http" Name: "http-receiver", URL: "https://my-logstore.com", OutputTypeSpec: logging.OutputTypeSpec{Http: &logging.Http{ - Timeout: "50", + Timeout: 50, Headers: map[string]string{ "k1": "v1", "k2": "v2", @@ -323,7 +331,11 @@ method = "post" [sinks.http_receiver.encoding] codec = "json" 
diff --git a/internal/generator/vector/output/http/http_test.go b/internal/generator/vector/output/http/http_test.go
index efd661e17..9e3b329a3 100644
--- a/internal/generator/vector/output/http/http_test.go
+++ b/internal/generator/vector/output/http/http_test.go
@@ -116,7 +116,11 @@ method = "post"
 [sinks.http_receiver.encoding]
 codec = "json"
+[sinks.http_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.http_receiver.request]
+retry_attempts = 17
 timeout_secs = 10
 headers = {"h1"="v1","h2"="v2"}
@@ -221,7 +225,11 @@ method = "post"
 [sinks.http_receiver.encoding]
 codec = "json"
+[sinks.http_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.http_receiver.request]
+retry_attempts = 17
 timeout_secs = 10
 headers = {"h1"="v1","h2"="v2"}
@@ -239,7 +247,7 @@ token = "token-for-custom-http"
 			Name: "http-receiver",
 			URL:  "https://my-logstore.com",
 			OutputTypeSpec: logging.OutputTypeSpec{Http: &logging.Http{
-				Timeout: "50",
+				Timeout: 50,
 				Headers: map[string]string{
 					"k1": "v1",
 					"k2": "v2",
@@ -323,7 +331,11 @@ method = "post"
 [sinks.http_receiver.encoding]
 codec = "json"
+[sinks.http_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.http_receiver.request]
+retry_attempts = 17
 timeout_secs = 50
 headers = {"k1"="v1","k2"="v2"}
@@ -341,7 +353,7 @@ token = "token-for-custom-http"
 			Name: "http-receiver",
 			URL:  "https://my-logstore.com",
 			OutputTypeSpec: logging.OutputTypeSpec{Http: &logging.Http{
-				Timeout: "50",
+				Timeout: 50,
 				Headers: map[string]string{
 					"k1": "v1",
 					"k2": "v2",
@@ -425,7 +437,11 @@ method = "post"
 [sinks.http_receiver.encoding]
 codec = "json"
+[sinks.http_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.http_receiver.request]
+retry_attempts = 17
 timeout_secs = 50
 headers = {"k1"="v1","k2"="v2"}
@@ -443,7 +459,7 @@ token = "token-for-custom-http"
 			Name: "http-receiver",
 			URL:  "https://my-logstore.com",
 			OutputTypeSpec: logging.OutputTypeSpec{Http: &logging.Http{
-				Timeout: "50",
+				Timeout: 50,
 				Headers: map[string]string{
 					"k1": "v1",
 					"k2": "v2",
@@ -533,7 +549,11 @@ method = "post"
 [sinks.http_receiver.encoding]
 codec = "json"
+[sinks.http_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.http_receiver.request]
+retry_attempts = 17
 timeout_secs = 50
 headers = {"k1"="v1","k2"="v2"}
@@ -688,7 +708,11 @@ method = "post"
 [sinks.http_receiver.encoding]
 codec = "json"
+[sinks.http_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.http_receiver.request]
+retry_attempts = 17
 timeout_secs = 10
 headers = {"h1"="v1","h2"="v2"}
@@ -787,7 +811,11 @@ method = "post"
 [sinks.http_receiver.encoding]
 codec = "json"
+[sinks.http_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.http_receiver.request]
+retry_attempts = 17
 timeout_secs = 10
 
 # Basic Auth Config
diff --git a/internal/generator/vector/output/kafka/kafka.go b/internal/generator/vector/output/kafka/kafka.go
index 34ab30e0b..9d7235f34 100644
--- a/internal/generator/vector/output/kafka/kafka.go
+++ b/internal/generator/vector/output/kafka/kafka.go
@@ -2,6 +2,7 @@ package kafka
 
 import (
 	"fmt"
+	"github.com/openshift/cluster-logging-operator/internal/generator/vector/output"
 	"net/url"
 	"strings"
 
@@ -47,20 +48,21 @@ topic = {{.Topic}}
 }
 
 func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element {
+	id := vectorhelpers.FormatComponentID(o.Name)
 	if genhelper.IsDebugOutput(op) {
 		return []Element{
-			Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), vectorhelpers.MakeInputs(inputs...)),
+			Debug(id, vectorhelpers.MakeInputs(inputs...)),
 		}
 	}
-	outputName := vectorhelpers.FormatComponentID(o.Name)
-	dedottedID := normalize.ID(outputName, "dedot")
+	dedottedID := normalize.ID(id, "dedot")
 	brokers, genTlsConf := Brokers(o)
 	return MergeElements(
 		[]Element{
 			normalize.DedotLabels(dedottedID, inputs),
 			Output(o, []string{dedottedID}, secret, op, brokers),
 			Encoding(o, op),
+			output.NewBuffer(id),
 		},
 		TLSConf(o, secret, op, genTlsConf),
 		SASLConf(o, secret),
diff --git a/internal/generator/vector/output/kafka/kafka_test.go b/internal/generator/vector/output/kafka/kafka_test.go
index 29133efbd..428f5c84b 100644
--- a/internal/generator/vector/output/kafka/kafka_test.go
+++ b/internal/generator/vector/output/kafka/kafka_test.go
@@ -107,6 +107,8 @@ topic = "build_complete"
 codec = "json"
 timestamp_format = "rfc3339"
 
+[sinks.kafka_receiver.buffer]
+when_full = "drop_newest"
 
 # SASL Config
 [sinks.kafka_receiver.sasl]
@@ -205,6 +207,9 @@ topic = "build_complete"
 codec = "json"
 timestamp_format = "rfc3339"
 
+[sinks.kafka_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.kafka_receiver.tls]
 enabled = true
 key_file = "/var/run/ocp-collector/secrets/kafka-receiver-1/tls.key"
@@ -308,6 +313,9 @@ topic = "build_complete"
 codec = "json"
 timestamp_format = "rfc3339"
 
+[sinks.kafka_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.kafka_receiver.tls]
 enabled = true
 key_file = "/var/run/ocp-collector/secrets/kafka-receiver-1/tls.key"
@@ -403,6 +411,9 @@ topic = "topic"
 codec = "json"
 timestamp_format = "rfc3339"
 
+[sinks.kafka_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.kafka_receiver.tls]
 enabled = true
 key_file = "/var/run/ocp-collector/secrets/kafka-receiver-1/tls.key"
@@ -497,6 +508,9 @@ topic = "topic"
 codec = "json"
 timestamp_format = "rfc3339"
 
+[sinks.kafka_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.kafka_receiver.tls]
 enabled = true
 key_file = "/var/run/ocp-collector/secrets/kafka-receiver-1/tls.key"
@@ -589,6 +603,9 @@ topic = "topic"
 codec = "json"
 timestamp_format = "rfc3339"
 
+[sinks.kafka_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.kafka_receiver.librdkafka_options]
 "enable.ssl.certificate.verification" = "false"
 [sinks.kafka_receiver.tls]
@@ -678,6 +695,9 @@ topic = "topic"
 codec = "json"
 timestamp_format = "rfc3339"
 
+[sinks.kafka_receiver.buffer]
+when_full = "drop_newest"
+
 [sinks.kafka_receiver.tls]
 enabled = true
 key_pass = "junk"
@@ -753,6 +773,9 @@ topic = "topic"
 
 [sinks.kafka_receiver.encoding]
 codec = "json"
 timestamp_format = "rfc3339"
+
+[sinks.kafka_receiver.buffer]
+when_full = "drop_newest"
 `,
 		}),
 		Entry("with plain TLS - no secret", helpers.ConfGenerateTest{
@@ -825,6 +848,9 @@ topic = "topic"
 
 [sinks.kafka_receiver.encoding]
 codec = "json"
 timestamp_format = "rfc3339"
+
+[sinks.kafka_receiver.buffer]
+when_full = "drop_newest"
 `,
 		}),
 		Entry("without security", helpers.ConfGenerateTest{
@@ -897,6 +923,9 @@ topic = "topic"
 
 [sinks.kafka_receiver.encoding]
 codec = "json"
 timestamp_format = "rfc3339"
+
+[sinks.kafka_receiver.buffer]
+when_full = "drop_newest"
 `,
 		}),
 	)
diff --git a/internal/generator/vector/output/loki/loki.go b/internal/generator/vector/output/loki/loki.go
index 89ffafd61..55bf331c7 100644
--- a/internal/generator/vector/output/loki/loki.go
+++ b/internal/generator/vector/output/loki/loki.go
@@ -2,6 +2,7 @@ package loki
 
 import (
 	"fmt"
+	"github.com/openshift/cluster-logging-operator/internal/generator/vector/output"
 	"strings"
 
 	"github.com/openshift/cluster-logging-operator/internal/generator/vector/helpers"
@@ -107,20 +108,22 @@ func (l LokiLabels) Template() string {
 }
 
 func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element {
+	id := vectorhelpers.FormatComponentID(o.Name)
 	if genhelper.IsDebugOutput(op) {
 		return []Element{
-			Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), vectorhelpers.MakeInputs(inputs...)),
+			Debug(id, vectorhelpers.MakeInputs(inputs...)),
 		}
 	}
-	outputName := helpers.FormatComponentID(o.Name)
-	componentID := fmt.Sprintf("%s_%s", outputName, "remap")
-	dedottedID := normalize.ID(outputName, "dedot")
+	componentID := fmt.Sprintf("%s_%s", id, "remap")
+	dedottedID := normalize.ID(id, "dedot")
 	return MergeElements(
 		[]Element{
 			CleanupFields(componentID, inputs),
 			normalize.DedotLabels(dedottedID, []string{componentID}),
 			Output(o, []string{dedottedID}),
 			Encoding(o),
+			output.NewBuffer(id),
+			output.NewRequest(id),
 			Labels(o),
 		},
 		TLSConf(o, secret, op),
diff --git a/internal/generator/vector/output/loki/loki_conf_test.go b/internal/generator/vector/output/loki/loki_conf_test.go
index 0684653ee..8c64e9ca0 100644
--- a/internal/generator/vector/output/loki/loki_conf_test.go
+++ b/internal/generator/vector/output/loki/loki_conf_test.go
@@ -141,6 +141,12 @@ healthcheck.enabled = false
 [sinks.loki_receiver.encoding]
 codec = "json"
 
+[sinks.loki_receiver.buffer]
+when_full = "drop_newest"
+
+[sinks.loki_receiver.request]
+retry_attempts = 17
+
 [sinks.loki_receiver.labels]
 kubernetes_container_name = "{{kubernetes.container_name}}"
 kubernetes_host = "${VECTOR_SELF_NODE_NAME}"
@@ -245,6 +251,12 @@ healthcheck.enabled = false
 [sinks.loki_receiver.encoding]
 codec = "json"
 
+[sinks.loki_receiver.buffer]
+when_full = "drop_newest"
+
+[sinks.loki_receiver.request]
+retry_attempts = 17
+
 [sinks.loki_receiver.labels]
 kubernetes_container_name = "{{kubernetes.container_name}}"
 kubernetes_host = "${VECTOR_SELF_NODE_NAME}"
@@ -348,6 +360,12 @@ tenant_id = "{{foo.bar.baz}}"
 [sinks.loki_receiver.encoding]
 codec = "json"
 
+[sinks.loki_receiver.buffer]
+when_full = "drop_newest"
+
+[sinks.loki_receiver.request]
+retry_attempts = 17
+
 [sinks.loki_receiver.labels]
 kubernetes_container_name = "{{kubernetes.container_name}}"
 kubernetes_host = "${VECTOR_SELF_NODE_NAME}"
@@ -449,6 +467,12 @@ healthcheck.enabled = false
 [sinks.loki_receiver.encoding]
 codec = "json"
 
+[sinks.loki_receiver.buffer]
+when_full = "drop_newest"
+
+[sinks.loki_receiver.request]
+retry_attempts = 17
+
 [sinks.loki_receiver.labels]
 kubernetes_container_name = "{{kubernetes.container_name}}"
 kubernetes_host = "${VECTOR_SELF_NODE_NAME}"
@@ -551,6 +575,12 @@ healthcheck.enabled = false
 [sinks.loki_receiver.encoding]
 codec = "json"
 
+[sinks.loki_receiver.buffer]
+when_full = "drop_newest"
+
+[sinks.loki_receiver.request]
+retry_attempts = 17
+
 [sinks.loki_receiver.labels]
 kubernetes_container_name = "{{kubernetes.container_name}}"
 kubernetes_host = "${VECTOR_SELF_NODE_NAME}"
@@ -643,6 +673,12 @@ healthcheck.enabled = false
 [sinks.loki_receiver.encoding]
 codec = "json"
 
+[sinks.loki_receiver.buffer]
+when_full = "drop_newest"
+
+[sinks.loki_receiver.request]
+retry_attempts = 17
+
 [sinks.loki_receiver.labels]
 kubernetes_container_name = "{{kubernetes.container_name}}"
 kubernetes_host = "${VECTOR_SELF_NODE_NAME}"
@@ -743,6 +779,13 @@ out_of_order_action = "accept"
 healthcheck.enabled = false
 [sinks.loki_receiver.encoding]
 codec = "json"
+
+[sinks.loki_receiver.buffer]
+when_full = "drop_newest"
+
+[sinks.loki_receiver.request]
+retry_attempts = 17
+
 [sinks.loki_receiver.labels]
 kubernetes_container_name = "{{kubernetes.container_name}}"
 kubernetes_host = "${VECTOR_SELF_NODE_NAME}"
@@ -851,6 +894,12 @@ healthcheck.enabled = false
 [sinks.default_loki_apps.encoding]
 codec = "json"
 
+[sinks.default_loki_apps.buffer]
+when_full = "drop_newest"
+
+[sinks.default_loki_apps.request]
+retry_attempts = 17
+
 [sinks.default_loki_apps.labels]
 kubernetes_container_name = "{{kubernetes.container_name}}"
 kubernetes_host = "${VECTOR_SELF_NODE_NAME}"
diff --git a/internal/generator/vector/output/request.go b/internal/generator/vector/output/request.go
new file mode 100644
index 000000000..d7e3d6355
--- /dev/null
+++ b/internal/generator/vector/output/request.go
@@ -0,0 +1,49 @@
+package output
+
+import (
+	"github.com/openshift/cluster-logging-operator/internal/generator/helpers"
+	"github.com/openshift/cluster-logging-operator/internal/generator/utils"
+	"github.com/openshift/cluster-logging-operator/internal/generator/vector/elements"
+)
+
+type Request struct {
+	ComponentID   string
+	RetryAttempts int
+	Concurrency   helpers.OptionalPair
+	TimeoutSecs   helpers.OptionalPair
+	headers       map[string]string
+}
+
+// NewRequest section for an output
+// Ref: LOG-4536 for RetryAttempts default
+func NewRequest(id string) *Request {
+	return &Request{
+		ComponentID:   id,
+		RetryAttempts: 17,
+		Concurrency:   helpers.NewOptionalPair("concurrency", nil),
+		TimeoutSecs:   helpers.NewOptionalPair("timeout_secs", nil),
+	}
+}
+
+func (r *Request) Name() string {
+	return "request"
+}
+
+func (r *Request) Template() string {
+	return `{{define "` + r.Name() + `" -}}
+[sinks.{{.ComponentID}}.request]
+retry_attempts = {{.RetryAttempts}}
+{{ .Concurrency -}}
+{{ .TimeoutSecs }}
+{{kv .Headers }}
+{{end}}
+`
+}
+
+func (r *Request) Headers() elements.KeyVal {
+	return elements.KV("headers", utils.ToHeaderStr(r.headers, "%q=%q"))
+}
+
+func (r *Request) SetHeaders(headers map[string]string) {
+	r.headers = headers
+}
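To see how `Request` assembles its stanza, here is a hand-wired render. Note the `kv` template func belongs to the generator framework and renders the headers `KeyVal` element; a pass-through stub stands in for it below (an assumption for illustration only), as does the local struct:

```go
package main

import (
	"os"
	"text/template"

	"github.com/openshift/cluster-logging-operator/internal/generator/helpers"
)

// request is a local stand-in for output.Request with headers simplified.
type request struct {
	ComponentID   string
	RetryAttempts int
	Concurrency   helpers.OptionalPair
	TimeoutSecs   helpers.OptionalPair
	Headers       string // stand-in for the KeyVal element
}

func main() {
	r := request{
		ComponentID:   "es_1",
		RetryAttempts: 17,
		Concurrency:   helpers.NewOptionalPair("concurrency", nil),
		TimeoutSecs:   helpers.NewOptionalPair("timeout_secs", 2147483648),
	}
	tmpl := `[sinks.{{.ComponentID}}.request]
retry_attempts = {{.RetryAttempts}}
{{.Concurrency -}}
{{.TimeoutSecs}}
{{kv .Headers}}`
	t := template.Must(template.New("request").
		Funcs(template.FuncMap{"kv": func(s string) string { return s }}).
		Parse(tmpl))
	// Unset pairs print as "" and disappear; output matches the fixtures:
	//   [sinks.es_1.request]
	//   retry_attempts = 17
	//   timeout_secs = 2147483648
	_ = t.Execute(os.Stdout, r)
}
```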
"drop_newest" + +[sinks.splunk_hec.request] +retry_attempts = 17 + [sinks.splunk_hec.tls] verify_certificate = false verify_hostname = false @@ -119,6 +138,13 @@ default_token = "" timestamp_key = "@timestamp" [sinks.splunk_hec.encoding] codec = "json" + +[sinks.splunk_hec.buffer] +when_full = "drop_newest" + +[sinks.splunk_hec.request] +retry_attempts = 17 + [sinks.splunk_hec.tls] verify_certificate = false verify_hostname = false @@ -135,6 +161,12 @@ timestamp_key = "@timestamp" [sinks.splunk_hec.encoding] codec = "json" +[sinks.splunk_hec.buffer] +when_full = "drop_newest" + +[sinks.splunk_hec.request] +retry_attempts = 17 + [sinks.splunk_hec.tls] key_pass = "junk" ` diff --git a/internal/generator/vector/output/syslog/syslog.go b/internal/generator/vector/output/syslog/syslog.go index 9d3bfcd6f..f04abe977 100644 --- a/internal/generator/vector/output/syslog/syslog.go +++ b/internal/generator/vector/output/syslog/syslog.go @@ -85,9 +85,10 @@ severity = "{{.Severity}}" } func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element { + id := vectorhelpers.FormatComponentID(o.Name) if genhelper.IsDebugOutput(op) { return []Element{ - Debug(vectorhelpers.FormatComponentID(o.Name), vectorhelpers.MakeInputs(inputs...)), + Debug(id, vectorhelpers.MakeInputs(inputs...)), } } u, _ := url.Parse(o.URL) diff --git a/internal/generator/vector/outputs_test.go b/internal/generator/vector/outputs_test.go index b7d561dbb..78b16cb5b 100644 --- a/internal/generator/vector/outputs_test.go +++ b/internal/generator/vector/outputs_test.go @@ -106,6 +106,12 @@ healthcheck.enabled = false [sinks.default_loki_apps.encoding] codec = "json" +[sinks.default_loki_apps.buffer] +when_full = "drop_newest" + +[sinks.default_loki_apps.request] +retry_attempts = 17 + [sinks.default_loki_apps.labels] kubernetes_container_name = "{{kubernetes.container_name}}" kubernetes_host = "${VECTOR_SELF_NODE_NAME}" diff --git a/internal/generator/vector/pipelines_to_output_test.go b/internal/generator/vector/pipelines_to_output_test.go index 9275b2d24..55d3f76d6 100644 --- a/internal/generator/vector/pipelines_to_output_test.go +++ b/internal/generator/vector/pipelines_to_output_test.go @@ -111,6 +111,12 @@ healthcheck.enabled = false [sinks.loki.encoding] codec = "json" +[sinks.loki.buffer] +when_full = "drop_newest" + +[sinks.loki.request] +retry_attempts = 17 + [sinks.loki.labels] kubernetes_container_name = "{{kubernetes.container_name}}" kubernetes_host = "${VECTOR_SELF_NODE_NAME}" @@ -239,6 +245,12 @@ healthcheck.enabled = false [sinks.loki.encoding] codec = "json" +[sinks.loki.buffer] +when_full = "drop_newest" + +[sinks.loki.request] +retry_attempts = 17 + [sinks.loki.labels] kubernetes_container_name = "{{kubernetes.container_name}}" kubernetes_host = "${VECTOR_SELF_NODE_NAME}"