50 changes: 50 additions & 0 deletions internal/generator/vector/conf_test.go
@@ -29,6 +29,9 @@ import (
//go:embed conf_test/complex.toml
var ExpectedComplexToml string

//go:embed conf_test/complex_threshold.toml
var ExpectedThresholdToml string

//go:embed conf_test/complex_es_no_ver.toml
var ExpectedComplexEsNoVerToml string

@@ -337,6 +340,53 @@ var _ = Describe("Testing Complete Config Generation", func() {
},
ExpectedConf: ExpectedComplexOTELToml,
}),

Entry("with complex spec with MaxRecordsPerSecond", testhelpers.ConfGenerateTest{
Options: generator.Options{
generator.ClusterTLSProfileSpec: tls.GetClusterTLSProfileSpec(nil),
},
CLFSpec: logging.ClusterLogForwarderSpec{
Inputs: []logging.InputSpec{
{
Name: "limited-rates-1",
Application: &logging.Application{
Namespaces: []string{"test-project-1", "test-project-2"},
ContainerLimit: &logging.LimitSpec{
MaxRecordsPerSecond: 0,
},
},
},
{
Name: "app-logs",
Application: &logging.Application{
Namespaces: []string{"test-project-3", "test-project-4"},
},
},
{
Name: "limited-rates-2",
Application: &logging.Application{
Namespaces: []string{"test-project-5"},
ContainerLimit: &logging.LimitSpec{
MaxRecordsPerSecond: 10,
},
},
},
},
Pipelines: []logging.PipelineSpec{
{
InputRefs: []string{
"limited-rates-1",
"limited-rates-2",
"app-logs",
logging.InputNameInfrastructure,
},
OutputRefs: []string{"default"},
Name: "to-default",
},
},
},
ExpectedConf: ExpectedThresholdToml,
}),
)

DescribeTable("Generate full vector.toml with custom data dir", namedForwarder,
246 changes: 246 additions & 0 deletions internal/generator/vector/conf_test/complex_threshold.toml
@@ -0,0 +1,246 @@
expire_metrics_secs = 60


[api]
enabled = true

# Logs from containers (including openshift containers)
[sources.raw_container_logs]
type = "kubernetes_logs"
max_read_bytes = 3145728
glob_minimum_cooldown_ms = 15000
auto_partial_merge = true
exclude_paths_glob_patterns = ["/var/log/pods/openshift-logging_logfilesmetricexporter-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_*/loki*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log", "/var/log/pods/openshift-logging_*/gateway/*.log", "/var/log/pods/openshift-logging_*/opa/*.log", "/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.tmp"]
pod_annotation_fields.pod_labels = "kubernetes.labels"
pod_annotation_fields.pod_namespace = "kubernetes.namespace_name"
pod_annotation_fields.pod_annotations = "kubernetes.annotations"
pod_annotation_fields.pod_uid = "kubernetes.pod_id"
pod_annotation_fields.pod_node_name = "hostname"
namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id"
rotate_wait_ms = 5000

[sources.raw_journal_logs]
type = "journald"
journal_directory = "/var/log/journal"

[sources.internal_metrics]
type = "internal_metrics"

[transforms.container_logs]
type = "remap"
inputs = ["raw_container_logs"]
source = '''
.openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"
if !exists(.level) {
.level = "default"
if match!(.message, r'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"|<warn>') {
.level = "warn"
} else if match!(.message, r'Error|ERROR|^E[0-9]+|level=error|Value:error|"level":"error"|<error>') {
.level = "error"
} else if match!(.message, r'Critical|CRITICAL|^C[0-9]+|level=critical|Value:critical|"level":"critical"|<critical>') {
.level = "critical"
} else if match!(.message, r'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"|<debug>') {
.level = "debug"
} else if match!(.message, r'Notice|NOTICE|^N[0-9]+|level=notice|Value:notice|"level":"notice"|<notice>') {
.level = "notice"
} else if match!(.message, r'Alert|ALERT|^A[0-9]+|level=alert|Value:alert|"level":"alert"|<alert>') {
.level = "alert"
} else if match!(.message, r'Emergency|EMERGENCY|^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"|<emergency>') {
.level = "emergency"
} else if match!(.message, r'(?i)\b(?:info)\b|^I[0-9]+|level=info|Value:info|"level":"info"|<info>') {
.level = "info"
}
}
pod_name = string!(.kubernetes.pod_name)
if starts_with(pod_name, "eventrouter-") {
parsed, err = parse_json(.message)
if err != null {
log("Unable to process EventRouter log: " + err, level: "info")
} else {
., err = merge(.,parsed)
if err == null && exists(.event) && is_object(.event) {
if exists(.verb) {
.event.verb = .verb
del(.verb)
}
.kubernetes.event = del(.event)
.message = del(.kubernetes.event.message)
set!(., ["@timestamp"], .kubernetes.event.metadata.creationTimestamp)
del(.kubernetes.event.metadata.creationTimestamp)
. = compact(., nullish: true)
} else {
log("Unable to merge EventRouter log message into record: " + err, level: "info")
}
}
}
del(.source_type)
del(.stream)
del(.kubernetes.pod_ips)
del(.kubernetes.node_labels)
del(.timestamp_end)
ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
'''

[transforms.drop_journal_logs]
type = "filter"
inputs = ["raw_journal_logs"]
condition = ".PRIORITY != \"7\" && .PRIORITY != 7"

[transforms.journal_logs]
type = "remap"
inputs = ["drop_journal_logs"]
source = '''
.openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"

.tag = ".journal.system"

del(.source_type)
del(._CPU_USAGE_NSEC)
del(.__REALTIME_TIMESTAMP)
del(.__MONOTONIC_TIMESTAMP)
del(._SOURCE_REALTIME_TIMESTAMP)
del(.JOB_RESULT)
del(.JOB_TYPE)
del(.TIMESTAMP_BOOTTIME)
del(.TIMESTAMP_MONOTONIC)

if .PRIORITY == "8" || .PRIORITY == 8 {
.level = "trace"
} else {
priority = to_int!(.PRIORITY)
.level, err = to_syslog_level(priority)
if err != null {
log("Unable to determine level from PRIORITY: " + err, level: "error")
log(., level: "error")
.level = "unknown"
} else {
del(.PRIORITY)
}
}

.hostname = del(.host)

# systemd’s kernel-specific metadata.
# .systemd.k = {}
if exists(.KERNEL_DEVICE) { .systemd.k.KERNEL_DEVICE = del(.KERNEL_DEVICE) }
if exists(.KERNEL_SUBSYSTEM) { .systemd.k.KERNEL_SUBSYSTEM = del(.KERNEL_SUBSYSTEM) }
if exists(.UDEV_DEVLINK) { .systemd.k.UDEV_DEVLINK = del(.UDEV_DEVLINK) }
if exists(.UDEV_DEVNODE) { .systemd.k.UDEV_DEVNODE = del(.UDEV_DEVNODE) }
if exists(.UDEV_SYSNAME) { .systemd.k.UDEV_SYSNAME = del(.UDEV_SYSNAME) }

# trusted journal fields, fields that are implicitly added by the journal and cannot be altered by client code.
.systemd.t = {}
if exists(._AUDIT_LOGINUID) { .systemd.t.AUDIT_LOGINUID = del(._AUDIT_LOGINUID) }
if exists(._BOOT_ID) { .systemd.t.BOOT_ID = del(._BOOT_ID) }
if exists(._AUDIT_SESSION) { .systemd.t.AUDIT_SESSION = del(._AUDIT_SESSION) }
if exists(._CAP_EFFECTIVE) { .systemd.t.CAP_EFFECTIVE = del(._CAP_EFFECTIVE) }
if exists(._CMDLINE) { .systemd.t.CMDLINE = del(._CMDLINE) }
if exists(._COMM) { .systemd.t.COMM = del(._COMM) }
if exists(._EXE) { .systemd.t.EXE = del(._EXE) }
if exists(._GID) { .systemd.t.GID = del(._GID) }
if exists(._HOSTNAME) { .systemd.t.HOSTNAME = .hostname }
if exists(._LINE_BREAK) { .systemd.t.LINE_BREAK = del(._LINE_BREAK) }
if exists(._MACHINE_ID) { .systemd.t.MACHINE_ID = del(._MACHINE_ID) }
if exists(._PID) { .systemd.t.PID = del(._PID) }
if exists(._SELINUX_CONTEXT) { .systemd.t.SELINUX_CONTEXT = del(._SELINUX_CONTEXT) }
if exists(._SOURCE_REALTIME_TIMESTAMP) { .systemd.t.SOURCE_REALTIME_TIMESTAMP = del(._SOURCE_REALTIME_TIMESTAMP) }
if exists(._STREAM_ID) { .systemd.t.STREAM_ID = ._STREAM_ID }
if exists(._SYSTEMD_CGROUP) { .systemd.t.SYSTEMD_CGROUP = del(._SYSTEMD_CGROUP) }
if exists(._SYSTEMD_INVOCATION_ID) {.systemd.t.SYSTEMD_INVOCATION_ID = ._SYSTEMD_INVOCATION_ID}
if exists(._SYSTEMD_OWNER_UID) { .systemd.t.SYSTEMD_OWNER_UID = del(._SYSTEMD_OWNER_UID) }
if exists(._SYSTEMD_SESSION) { .systemd.t.SYSTEMD_SESSION = del(._SYSTEMD_SESSION) }
if exists(._SYSTEMD_SLICE) { .systemd.t.SYSTEMD_SLICE = del(._SYSTEMD_SLICE) }
if exists(._SYSTEMD_UNIT) { .systemd.t.SYSTEMD_UNIT = del(._SYSTEMD_UNIT) }
if exists(._SYSTEMD_USER_UNIT) { .systemd.t.SYSTEMD_USER_UNIT = del(._SYSTEMD_USER_UNIT) }
if exists(._TRANSPORT) { .systemd.t.TRANSPORT = del(._TRANSPORT) }
if exists(._UID) { .systemd.t.UID = del(._UID) }

# fields that are directly passed from clients and stored in the journal.
.systemd.u = {}
if exists(.CODE_FILE) { .systemd.u.CODE_FILE = del(.CODE_FILE) }
if exists(.CODE_FUNC) { .systemd.u.CODE_FUNCTION = del(.CODE_FUNC) }
if exists(.CODE_LINE) { .systemd.u.CODE_LINE = del(.CODE_LINE) }
if exists(.ERRNO) { .systemd.u.ERRNO = del(.ERRNO) }
if exists(.MESSAGE_ID) { .systemd.u.MESSAGE_ID = del(.MESSAGE_ID) }
if exists(.SYSLOG_FACILITY) { .systemd.u.SYSLOG_FACILITY = del(.SYSLOG_FACILITY) }
if exists(.SYSLOG_IDENTIFIER) { .systemd.u.SYSLOG_IDENTIFIER = del(.SYSLOG_IDENTIFIER) }
if exists(.SYSLOG_PID) { .systemd.u.SYSLOG_PID = del(.SYSLOG_PID) }
if exists(.RESULT) { .systemd.u.RESULT = del(.RESULT) }
if exists(.UNIT) { .systemd.u.UNIT = del(.UNIT) }

.time = format_timestamp!(.timestamp, format: "%FT%T%:z")

ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
'''

[transforms.route_container_logs]
type = "route"
inputs = ["container_logs"]
route.app = '!((starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube"))'
route.infra = '(starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube")'

# Set log_type to "application"
[transforms.application]
type = "remap"
inputs = ["route_container_logs.app"]
source = '''
.log_type = "application"
'''

# Set log_type to "infrastructure"
[transforms.infrastructure]
type = "remap"
inputs = ["route_container_logs.infra","journal_logs"]
source = '''
.log_type = "infrastructure"
'''

[transforms.route_application_logs]
type = "route"
inputs = ["application"]
route.app-logs = '(.kubernetes.namespace_name == "test-project-3") || (.kubernetes.namespace_name == "test-project-4")'
route.limited-rates-1 = '(.kubernetes.namespace_name == "test-project-1") || (.kubernetes.namespace_name == "test-project-2")'
route.limited-rates-2 = '.kubernetes.namespace_name == "test-project-5"'


[transforms.source_throttle_limited-rates-1]
type = "throttle"
inputs = ["route_application_logs.limited-rates-1"]
window_secs = 1
threshold = 0
key_field = "{{ file }}"


[transforms.source_throttle_limited-rates-2]
type = "throttle"
inputs = ["route_application_logs.limited-rates-2"]
window_secs = 1
threshold = 10
key_field = "{{ file }}"

[transforms.to_default_user_defined]
type = "remap"
inputs = ["source_throttle_limited-rates-1","source_throttle_limited-rates-2","route_application_logs.app-logs","infrastructure"]
source = '''
.
'''

[transforms.add_nodename_to_metric]
type = "remap"
inputs = ["internal_metrics"]
source = '''
.tags.hostname = get_env_var!("VECTOR_SELF_NODE_NAME")
'''

[sinks.prometheus_output]
type = "prometheus_exporter"
inputs = ["add_nodename_to_metric"]
address = "[::]:24231"
default_namespace = "collector"

[sinks.prometheus_output.tls]
enabled = true
key_file = "/etc/collector/metrics/tls.key"
crt_file = "/etc/collector/metrics/tls.crt"
min_tls_version = "VersionTLS12"
ciphersuites = "TLS_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,DHE-RSA-AES128-GCM-SHA256,DHE-RSA-AES256-GCM-SHA384"
2 changes: 1 addition & 1 deletion internal/generator/vector/inputs.go
@@ -151,7 +151,7 @@ func Inputs(spec *logging.ClusterLogForwarderSpec, o Options) []Element {

userDefined := spec.InputMap()
for _, inRef := range keys {
-	if input, ok := userDefined[inRef]; ok && input.HasPolicy() && input.GetMaxRecordsPerSecond() > 0 {
+	if input, ok := userDefined[inRef]; ok && input.HasPolicy() && input.GetMaxRecordsPerSecond() >= 0 {
// Vector Throttle component cannot have zero threshold
el = append(el, AddThrottle(input)...)
}
2 changes: 1 addition & 1 deletion internal/generator/vector/pipelines.go
@@ -28,7 +28,7 @@ func Pipelines(spec *logging.ClusterLogForwarderSpec, op generator.Options) []ge
for _, inputName := range p.InputRefs {
input, isUserDefined := userDefinedInputs[inputName]
if isUserDefined {
-	if input.HasPolicy() && input.GetMaxRecordsPerSecond() > 0 {
+	if input.HasPolicy() && input.GetMaxRecordsPerSecond() >= 0 {
// if threshold is zero, then don't add to pipeline
inputs = append(inputs, fmt.Sprintf(UserDefinedSourceThrottle, input.Name))
} else {
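
Taken together, the two relaxed conditions above mean any user-defined input that carries a rate-limit policy is now given a throttle component, including the MaxRecordsPerSecond = 0 case that the new complex_threshold.toml fixture renders as threshold = 0. A minimal sketch of that shared guard, assuming the same logging API package imported by the files above (needsThrottle is a hypothetical name for illustration, not a helper added in this change):

// needsThrottle mirrors the relaxed check used in Inputs() and Pipelines():
// a user-defined input is throttled whenever it declares a policy, even when
// the configured rate is zero, which the generator emits as threshold = 0.
func needsThrottle(input *logging.InputSpec) bool {
	return input.HasPolicy() && input.GetMaxRecordsPerSecond() >= 0
}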