diff --git a/internal/generator/vector/conf_test.go b/internal/generator/vector/conf_test.go
index 08fdf42abc..a4dd1b5c8f 100644
--- a/internal/generator/vector/conf_test.go
+++ b/internal/generator/vector/conf_test.go
@@ -29,6 +29,9 @@ import (
 //go:embed conf_test/complex.toml
 var ExpectedComplexToml string
 
+//go:embed conf_test/complex_threshold.toml
+var ExpectedThresholdToml string
+
 //go:embed conf_test/complex_es_no_ver.toml
 var ExpectedComplexEsNoVerToml string
 
@@ -337,6 +340,53 @@ var _ = Describe("Testing Complete Config Generation", func() {
 			},
 			ExpectedConf: ExpectedComplexOTELToml,
 		}),
+
+		Entry("with complex spec with MaxRecordsPerSecond", testhelpers.ConfGenerateTest{
+			Options: generator.Options{
+				generator.ClusterTLSProfileSpec: tls.GetClusterTLSProfileSpec(nil),
+			},
+			CLFSpec: logging.ClusterLogForwarderSpec{
+				Inputs: []logging.InputSpec{
+					{
+						Name: "limited-rates-1",
+						Application: &logging.Application{
+							Namespaces: []string{"test-project-1", "test-project-2"},
+							ContainerLimit: &logging.LimitSpec{
+								MaxRecordsPerSecond: 0,
+							},
+						},
+					},
+					{
+						Name: "app-logs",
+						Application: &logging.Application{
+							Namespaces: []string{"test-project-3", "test-project-4"},
+						},
+					},
+					{
+						Name: "limited-rates-2",
+						Application: &logging.Application{
+							Namespaces: []string{"test-project-5"},
+							ContainerLimit: &logging.LimitSpec{
+								MaxRecordsPerSecond: 10,
+							},
+						},
+					},
+				},
+				Pipelines: []logging.PipelineSpec{
+					{
+						InputRefs: []string{
+							"limited-rates-1",
+							"limited-rates-2",
+							"app-logs",
+							logging.InputNameInfrastructure,
+						},
+						OutputRefs: []string{"default"},
+						Name:       "to-default",
+					},
+				},
+			},
+			ExpectedConf: ExpectedThresholdToml,
+		}),
 	)
 
 	DescribeTable("Generate full vector.toml with custom data dir", namedForwarder,
diff --git a/internal/generator/vector/conf_test/complex_threshold.toml b/internal/generator/vector/conf_test/complex_threshold.toml
new file mode 100644
index 0000000000..154e3258f6
--- /dev/null
+++ b/internal/generator/vector/conf_test/complex_threshold.toml
@@ -0,0 +1,246 @@
+expire_metrics_secs = 60
+
+
+[api]
+enabled = true
+
+# Logs from containers (including openshift containers)
+[sources.raw_container_logs]
+type = "kubernetes_logs"
+max_read_bytes = 3145728
+glob_minimum_cooldown_ms = 15000
+auto_partial_merge = true
+exclude_paths_glob_patterns = ["/var/log/pods/openshift-logging_logfilesmetricexporter-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_*/loki*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log", "/var/log/pods/openshift-logging_*/gateway/*.log", "/var/log/pods/openshift-logging_*/opa/*.log", "/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.tmp"]
+pod_annotation_fields.pod_labels = "kubernetes.labels"
+pod_annotation_fields.pod_namespace = "kubernetes.namespace_name"
+pod_annotation_fields.pod_annotations = "kubernetes.annotations"
+pod_annotation_fields.pod_uid = "kubernetes.pod_id"
+pod_annotation_fields.pod_node_name = "hostname"
+namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id"
+rotate_wait_ms = 5000
+
+[sources.raw_journal_logs]
+type = "journald"
+journal_directory = "/var/log/journal"
+
+[sources.internal_metrics]
+type = "internal_metrics"
+
+[transforms.container_logs]
+type = "remap"
+inputs = ["raw_container_logs"]
+source = '''
+  .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"
+  if !exists(.level) {
+    .level = "default"
+    if match!(.message, r'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"|<warn>') {
+      .level = "warn"
+    } else if match!(.message, r'Error|ERROR|^E[0-9]+|level=error|Value:error|"level":"error"|<error>') {
+      .level = "error"
+    } else if match!(.message, r'Critical|CRITICAL|^C[0-9]+|level=critical|Value:critical|"level":"critical"|<critical>') {
+      .level = "critical"
+    } else if match!(.message, r'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"|<debug>') {
+      .level = "debug"
+    } else if match!(.message, r'Notice|NOTICE|^N[0-9]+|level=notice|Value:notice|"level":"notice"|<notice>') {
+      .level = "notice"
+    } else if match!(.message, r'Alert|ALERT|^A[0-9]+|level=alert|Value:alert|"level":"alert"|<alert>') {
+      .level = "alert"
+    } else if match!(.message, r'Emergency|EMERGENCY|^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"|<emergency>') {
+      .level = "emergency"
+    } else if match!(.message, r'(?i)\b(?:info)\b|^I[0-9]+|level=info|Value:info|"level":"info"|<info>') {
+      .level = "info"
+    }
+  }
+  pod_name = string!(.kubernetes.pod_name)
+  if starts_with(pod_name, "eventrouter-") {
+    parsed, err = parse_json(.message)
+    if err != null {
+      log("Unable to process EventRouter log: " + err, level: "info")
+    } else {
+      ., err = merge(.,parsed)
+      if err == null && exists(.event) && is_object(.event) {
+        if exists(.verb) {
+          .event.verb = .verb
+          del(.verb)
+        }
+        .kubernetes.event = del(.event)
+        .message = del(.kubernetes.event.message)
+        set!(., ["@timestamp"], .kubernetes.event.metadata.creationTimestamp)
+        del(.kubernetes.event.metadata.creationTimestamp)
+        . = compact(., nullish: true)
+      } else {
+        log("Unable to merge EventRouter log message into record: " + err, level: "info")
+      }
+    }
+  }
+  del(.source_type)
+  del(.stream)
+  del(.kubernetes.pod_ips)
+  del(.kubernetes.node_labels)
+  del(.timestamp_end)
+  ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
+'''
+
+[transforms.drop_journal_logs]
+type = "filter"
+inputs = ["raw_journal_logs"]
+condition = ".PRIORITY != \"7\" && .PRIORITY != 7"
+
+[transforms.journal_logs]
+type = "remap"
+inputs = ["drop_journal_logs"]
+source = '''
+  .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"
+
+  .tag = ".journal.system"
+
+  del(.source_type)
+  del(._CPU_USAGE_NSEC)
+  del(.__REALTIME_TIMESTAMP)
+  del(.__MONOTONIC_TIMESTAMP)
+  del(._SOURCE_REALTIME_TIMESTAMP)
+  del(.JOB_RESULT)
+  del(.JOB_TYPE)
+  del(.TIMESTAMP_BOOTTIME)
+  del(.TIMESTAMP_MONOTONIC)
+
+  if .PRIORITY == "8" || .PRIORITY == 8 {
+    .level = "trace"
+  } else {
+    priority = to_int!(.PRIORITY)
+    .level, err = to_syslog_level(priority)
+    if err != null {
+      log("Unable to determine level from PRIORITY: " + err, level: "error")
+      log(., level: "error")
+      .level = "unknown"
+    } else {
+      del(.PRIORITY)
+    }
+  }
+
+  .hostname = del(.host)
+
+  # systemd's kernel-specific metadata.
+  # .systemd.k = {}
+  if exists(.KERNEL_DEVICE) { .systemd.k.KERNEL_DEVICE = del(.KERNEL_DEVICE) }
+  if exists(.KERNEL_SUBSYSTEM) { .systemd.k.KERNEL_SUBSYSTEM = del(.KERNEL_SUBSYSTEM) }
+  if exists(.UDEV_DEVLINK) { .systemd.k.UDEV_DEVLINK = del(.UDEV_DEVLINK) }
+  if exists(.UDEV_DEVNODE) { .systemd.k.UDEV_DEVNODE = del(.UDEV_DEVNODE) }
+  if exists(.UDEV_SYSNAME) { .systemd.k.UDEV_SYSNAME = del(.UDEV_SYSNAME) }
+
+  # trusted journal fields, fields that are implicitly added by the journal and cannot be altered by client code.
+  .systemd.t = {}
+  if exists(._AUDIT_LOGINUID) { .systemd.t.AUDIT_LOGINUID = del(._AUDIT_LOGINUID) }
+  if exists(._BOOT_ID) { .systemd.t.BOOT_ID = del(._BOOT_ID) }
+  if exists(._AUDIT_SESSION) { .systemd.t.AUDIT_SESSION = del(._AUDIT_SESSION) }
+  if exists(._CAP_EFFECTIVE) { .systemd.t.CAP_EFFECTIVE = del(._CAP_EFFECTIVE) }
+  if exists(._CMDLINE) { .systemd.t.CMDLINE = del(._CMDLINE) }
+  if exists(._COMM) { .systemd.t.COMM = del(._COMM) }
+  if exists(._EXE) { .systemd.t.EXE = del(._EXE) }
+  if exists(._GID) { .systemd.t.GID = del(._GID) }
+  if exists(._HOSTNAME) { .systemd.t.HOSTNAME = .hostname }
+  if exists(._LINE_BREAK) { .systemd.t.LINE_BREAK = del(._LINE_BREAK) }
+  if exists(._MACHINE_ID) { .systemd.t.MACHINE_ID = del(._MACHINE_ID) }
+  if exists(._PID) { .systemd.t.PID = del(._PID) }
+  if exists(._SELINUX_CONTEXT) { .systemd.t.SELINUX_CONTEXT = del(._SELINUX_CONTEXT) }
+  if exists(._SOURCE_REALTIME_TIMESTAMP) { .systemd.t.SOURCE_REALTIME_TIMESTAMP = del(._SOURCE_REALTIME_TIMESTAMP) }
+  if exists(._STREAM_ID) { .systemd.t.STREAM_ID = ._STREAM_ID }
+  if exists(._SYSTEMD_CGROUP) { .systemd.t.SYSTEMD_CGROUP = del(._SYSTEMD_CGROUP) }
+  if exists(._SYSTEMD_INVOCATION_ID) {.systemd.t.SYSTEMD_INVOCATION_ID = ._SYSTEMD_INVOCATION_ID}
+  if exists(._SYSTEMD_OWNER_UID) { .systemd.t.SYSTEMD_OWNER_UID = del(._SYSTEMD_OWNER_UID) }
+  if exists(._SYSTEMD_SESSION) { .systemd.t.SYSTEMD_SESSION = del(._SYSTEMD_SESSION) }
+  if exists(._SYSTEMD_SLICE) { .systemd.t.SYSTEMD_SLICE = del(._SYSTEMD_SLICE) }
+  if exists(._SYSTEMD_UNIT) { .systemd.t.SYSTEMD_UNIT = del(._SYSTEMD_UNIT) }
+  if exists(._SYSTEMD_USER_UNIT) { .systemd.t.SYSTEMD_USER_UNIT = del(._SYSTEMD_USER_UNIT) }
+  if exists(._TRANSPORT) { .systemd.t.TRANSPORT = del(._TRANSPORT) }
+  if exists(._UID) { .systemd.t.UID = del(._UID) }
+
+  # fields that are directly passed from clients and stored in the journal.
+  .systemd.u = {}
+  if exists(.CODE_FILE) { .systemd.u.CODE_FILE = del(.CODE_FILE) }
+  if exists(.CODE_FUNC) { .systemd.u.CODE_FUNCTION = del(.CODE_FUNC) }
+  if exists(.CODE_LINE) { .systemd.u.CODE_LINE = del(.CODE_LINE) }
+  if exists(.ERRNO) { .systemd.u.ERRNO = del(.ERRNO) }
+  if exists(.MESSAGE_ID) { .systemd.u.MESSAGE_ID = del(.MESSAGE_ID) }
+  if exists(.SYSLOG_FACILITY) { .systemd.u.SYSLOG_FACILITY = del(.SYSLOG_FACILITY) }
+  if exists(.SYSLOG_IDENTIFIER) { .systemd.u.SYSLOG_IDENTIFIER = del(.SYSLOG_IDENTIFIER) }
+  if exists(.SYSLOG_PID) { .systemd.u.SYSLOG_PID = del(.SYSLOG_PID) }
+  if exists(.RESULT) { .systemd.u.RESULT = del(.RESULT) }
+  if exists(.UNIT) { .systemd.u.UNIT = del(.UNIT) }
+
+  .time = format_timestamp!(.timestamp, format: "%FT%T%:z")
+
+  ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
+'''
+
+[transforms.route_container_logs]
+type = "route"
+inputs = ["container_logs"]
+route.app = '!((starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube"))'
+route.infra = '(starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube")'
+
+# Set log_type to "application"
+[transforms.application]
+type = "remap"
+inputs = ["route_container_logs.app"]
+source = '''
+  .log_type = "application"
+'''
+
+# Set log_type to "infrastructure"
+[transforms.infrastructure]
+type = "remap"
+inputs = ["route_container_logs.infra","journal_logs"]
+source = '''
+  .log_type = "infrastructure"
+'''
+
+[transforms.route_application_logs]
+type = "route"
+inputs = ["application"]
+route.app-logs = '(.kubernetes.namespace_name == "test-project-3") || (.kubernetes.namespace_name == "test-project-4")'
+route.limited-rates-1 = '(.kubernetes.namespace_name == "test-project-1") || (.kubernetes.namespace_name == "test-project-2")'
+route.limited-rates-2 = '.kubernetes.namespace_name == "test-project-5"'
+
+
+[transforms.source_throttle_limited-rates-1]
+type = "throttle"
+inputs = ["route_application_logs.limited-rates-1"]
+window_secs = 1
+threshold = 0
+key_field = "{{ file }}"
+
+
+[transforms.source_throttle_limited-rates-2]
+type = "throttle"
+inputs = ["route_application_logs.limited-rates-2"]
+window_secs = 1
+threshold = 10
+key_field = "{{ file }}"
+
+[transforms.to_default_user_defined]
+type = "remap"
+inputs = ["source_throttle_limited-rates-1","source_throttle_limited-rates-2","route_application_logs.app-logs","infrastructure"]
+source = '''
+  .
+'''
+
+[transforms.add_nodename_to_metric]
+type = "remap"
+inputs = ["internal_metrics"]
+source = '''
+.tags.hostname = get_env_var!("VECTOR_SELF_NODE_NAME")
+'''
+
+[sinks.prometheus_output]
+type = "prometheus_exporter"
+inputs = ["add_nodename_to_metric"]
+address = "[::]:24231"
+default_namespace = "collector"
+
+[sinks.prometheus_output.tls]
+enabled = true
+key_file = "/etc/collector/metrics/tls.key"
+crt_file = "/etc/collector/metrics/tls.crt"
+min_tls_version = "VersionTLS12"
+ciphersuites = "TLS_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,DHE-RSA-AES128-GCM-SHA256,DHE-RSA-AES256-GCM-SHA384"
\ No newline at end of file
diff --git a/internal/generator/vector/inputs.go b/internal/generator/vector/inputs.go
index 8b6b5af8df..f43ceb929d 100644
--- a/internal/generator/vector/inputs.go
+++ b/internal/generator/vector/inputs.go
@@ -151,7 +151,7 @@ func Inputs(spec *logging.ClusterLogForwarderSpec, o Options) []Element {
 	userDefined := spec.InputMap()
 
 	for _, inRef := range keys {
-		if input, ok := userDefined[inRef]; ok && input.HasPolicy() && input.GetMaxRecordsPerSecond() > 0 {
-			// Vector Throttle component cannot have zero threshold
+		if input, ok := userDefined[inRef]; ok && input.HasPolicy() && input.GetMaxRecordsPerSecond() >= 0 {
+			// a zero threshold is valid: the generated throttle drops every record from the input
 			el = append(el, AddThrottle(input)...)
 		}
diff --git a/internal/generator/vector/pipelines.go b/internal/generator/vector/pipelines.go
index 592307e9fe..c65ce6e461 100644
--- a/internal/generator/vector/pipelines.go
+++ b/internal/generator/vector/pipelines.go
@@ -28,7 +28,7 @@ func Pipelines(spec *logging.ClusterLogForwarderSpec, op generator.Options) []ge
 		for _, inputName := range p.InputRefs {
 			input, isUserDefined := userDefinedInputs[inputName]
 			if isUserDefined {
-				if input.HasPolicy() && input.GetMaxRecordsPerSecond() > 0 {
-					// if threshold is zero, then don't add to pipeline
+				if input.HasPolicy() && input.GetMaxRecordsPerSecond() >= 0 {
+					// any input with a rate policy, including a zero threshold, is consumed via its throttle transform
 					inputs = append(inputs, fmt.Sprintf(UserDefinedSourceThrottle, input.Name))
 				} else {
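
For reference, the zero-threshold case can be seen end to end in a small standalone sketch. The throttleTOML helper below is hypothetical and not part of this patch (the generator itself goes through AddThrottle and UserDefinedSourceThrottle), but the TOML it prints mirrors the source_throttle_limited-rates-1 block in the expected fixture above:

package main

import "fmt"

// throttleTOML mimics the shape of the throttle transform the generator
// emits for a rate-limited input. Hypothetical helper for illustration only.
func throttleTOML(inputName string, maxRecordsPerSecond int64) string {
	return fmt.Sprintf(`[transforms.source_throttle_%s]
type = "throttle"
inputs = ["route_application_logs.%s"]
window_secs = 1
threshold = %d
key_field = "{{ file }}"
`, inputName, inputName, maxRecordsPerSecond)
}

func main() {
	// Before this patch the generator skipped the throttle when the limit
	// was 0 (the `> 0` check); with `>= 0` it emits threshold = 0, which
	// makes Vector's throttle drop every record from that input.
	fmt.Print(throttleTOML("limited-rates-1", 0))
	fmt.Print(throttleTOML("limited-rates-2", 10))
}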