Skip to content

Commit

Permalink
LOG-2826: optimize fluent conf for throughput
Browse files Browse the repository at this point in the history
  • Loading branch information
jcantrill authored and openshift-cherrypick-robot committed Aug 11, 2022
1 parent 4687642 commit e1246c5
Show file tree
Hide file tree
Showing 15 changed files with 339 additions and 1,142 deletions.
6 changes: 2 additions & 4 deletions hack/testing-olm/test-040-eventrouter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@ spec:
memory: 1Gi
cpu: 100m
collection:
logs:
type: "fluentd"
fluentd: {}
type: "fluentd"
EOL

deploy_eventrouter
Expand All @@ -152,7 +150,7 @@ espod=$(oc -n $LOGGING_NS get pods -l component=elasticsearch -o jsonpath={.item
os::log::info "Testing Elasticsearch pod ${espod}..."
os::cmd::try_until_text "oc -n $LOGGING_NS exec -c elasticsearch ${espod} -- es_util --query=/ --request HEAD --head --output /dev/null --write-out %{response_code}" "200" "$(( 1*$minute ))"

warn_nonformatted 'infra'
warn_nonformatted 'infra-*'

evpod=$(oc -n $LOGGING_NS get pods -l component=eventrouter -o jsonpath={.items[0].metadata.name})

Expand Down
1 change: 1 addition & 0 deletions internal/collector/fluentd/visitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func CollectorVisitor(collectorContainer *v1.Container, podSpec *v1.PodSpec) {
},
},
},
v1.EnvVar{Name: "RUBY_GC_HEAP_OLDOBJECT_LIMIT_FACTOR", Value: "0.9"},
)
collectorContainer.VolumeMounts = append(collectorContainer.VolumeMounts,
v1.VolumeMount{Name: certsVolumeName, ReadOnly: true, MountPath: certsVolumePath},
Expand Down
4 changes: 0 additions & 4 deletions internal/generator/fluentd/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ func Conf(clspec *logging.CollectionSpec, secrets map[string]*corev1.Secret, clf
Sources(clspec, clfspec, op),
"Set of all input sources",
},
{
PrometheusMetrics(clfspec, op),
"Section to add measurement, and dispatch to Concat or Ingress pipelines",
},
{
Concat(clfspec, op),
`Concat pipeline section`,
Expand Down
159 changes: 26 additions & 133 deletions internal/generator/fluentd/conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type systemd
@id systemd-input
@label @MEASURE
@label @INGRESS
path '/var/log/journal'
<storage>
@type local
Expand All @@ -145,36 +145,28 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type tail
@id container-input
path "/var/log/pods/**/*.log"
exclude_path ["/var/log/pods/openshift-logging_collector-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log"]
path "/var/log/pods/*/*/*.log"
exclude_path ["/var/log/pods/openshift-logging_collector-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log", "/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.tmp"]
pos_file "/var/lib/fluentd/pos/es-containers.log.pos"
refresh_interval 5
rotate_wait 5
tag kubernetes.*
read_from_head "true"
skip_refresh_on_startup true
@label @MEASURE
@label @CONCAT
<parse>
@type multi_format
<pattern>
format json
time_format '%Y-%m-%dT%H:%M:%S.%N%Z'
keep_time_key true
</pattern>
<pattern>
format regexp
expression /^(?<time>[^\s]+) (?<stream>stdout|stderr)( (?<logtag>.))? (?<log>.*)$/
time_format '%Y-%m-%dT%H:%M:%S.%N%:z'
keep_time_key true
</pattern>
@type regexp
expression /^(?<@timestamp>[^\s]+) (?<stream>stdout|stderr) (?<logtag>[F|P]) (?<message>.*)$/
time_key '@timestamp'
keep_time_key true
</parse>
</source>
# linux audit logs
<source>
@type tail
@id audit-input
@label @MEASURE
@label @INGRESS
path "/var/log/audit/audit.log"
pos_file "/var/lib/fluentd/pos/audit.log.pos"
tag linux-audit.log
Expand All @@ -187,7 +179,7 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type tail
@id k8s-audit-input
@label @MEASURE
@label @INGRESS
path "/var/log/kube-apiserver/audit.log"
pos_file "/var/lib/fluentd/pos/kube-apiserver.audit.log.pos"
tag k8s-audit.log
Expand All @@ -204,7 +196,7 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type tail
@id openshift-audit-input
@label @MEASURE
@label @INGRESS
path /var/log/oauth-apiserver/audit.log,/var/log/openshift-apiserver/audit.log
pos_file /var/lib/fluentd/pos/oauth-apiserver.audit.log
tag openshift-audit.log
Expand All @@ -221,7 +213,7 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type tail
@id ovn-audit-input
@label @MEASURE
@label @INGRESS
path "/var/log/ovn/acl-audit-log.log"
pos_file "/var/lib/fluentd/pos/acl-audit-log.pos"
tag ovn-audit.log
Expand All @@ -233,72 +225,11 @@ var _ = Describe("Testing Complete Config Generation", func() {
</parse>
</source>
# Increment Prometheus metrics
<label @MEASURE>
<filter **>
@type record_transformer
enable_ruby
<record>
msg_size ${record.to_s.length}
</record>
</filter>
<filter **>
@type prometheus
<metric>
name cluster_logging_collector_input_record_total
type counter
desc The total number of incoming records
<labels>
tag ${tag}
hostname ${hostname}
</labels>
</metric>
</filter>
<filter **>
@type prometheus
<metric>
name cluster_logging_collector_input_record_bytes
type counter
desc The total bytes of incoming records
key msg_size
<labels>
tag ${tag}
hostname ${hostname}
</labels>
</metric>
</filter>
<filter **>
@type record_transformer
remove_keys msg_size
</filter>
# Journal Logs go to INGRESS pipeline
<match journal>
@type relabel
@label @INGRESS
</match>
# Audit Logs go to INGRESS pipeline
<match *audit.log>
@type relabel
@label @INGRESS
</match>
# Kubernetes Logs go to CONCAT pipeline
<match kubernetes.**>
@type relabel
@label @CONCAT
</match>
</label>
# Concat log lines of container logs, and send to INGRESS pipeline
<label @CONCAT>
<filter kubernetes.**>
@type concat
key log
key message
partial_key logtag
partial_value P
separator ''
Expand Down Expand Up @@ -402,49 +333,23 @@ var _ = Describe("Testing Complete Config Generation", func() {
</rule>
</match>
# Invoke kubernetes apiserver to get kunbernetes metadata
# Invoke kubernetes apiserver to get kubernetes metadata
<filter kubernetes.**>
@id kubernetes-metadata
@type kubernetes_metadata
kubernetes_url 'https://kubernetes.default.svc'
annotation_match ["^containerType\.logging\.openshift\.io\/.*$"]
allow_orphans false
cache_size '1000'
de_dot false
use_journal 'nil'
ssl_partial_chain 'true'
</filter>
# Parse Json fields for container, journal and eventrouter logs
<filter kubernetes.journal.**>
@type parse_json_field
merge_json_log 'false'
preserve_json_log 'true'
json_fields 'log,MESSAGE'
</filter>
<filter kubernetes.var.log.pods.**>
@type parse_json_field
merge_json_log 'false'
preserve_json_log 'true'
json_fields 'log,MESSAGE'
</filter>
<filter kubernetes.var.log.pods.**_eventrouter-**>
@type parse_json_field
merge_json_log true
preserve_json_log true
json_fields 'log,MESSAGE'
</filter>
# Clean kibana log fields
<filter **kibana**>
@type record_transformer
enable_ruby
<record>
log ${record['err'] || record['msg'] || record['MESSAGE'] || record['log']}
</record>
remove_keys req,res,msg,name,level,v,pid,err
json_fields 'message'
</filter>
# Fix level field in audit logs
Expand All @@ -466,21 +371,12 @@ var _ = Describe("Testing Complete Config Generation", func() {
<filter **>
@type viaq_data_model
enable_flatten_labels true
elasticsearch_index_prefix_field 'viaq_index_name'
enable_prune_empty_fields false
default_keep_fields CEE,time,@timestamp,aushape,ci_job,collectd,docker,fedora-ci,file,foreman,geoip,hostname,ipaddr4,ipaddr6,kubernetes,level,message,namespace_name,namespace_uuid,offset,openstack,ovirt,pid,pipeline_metadata,rsyslog,service,systemd,tags,testcase,tlog,viaq_msg_id
extra_keep_fields ''
keep_empty_fields 'message'
use_undefined false
undefined_name 'undefined'
rename_time true
rename_time_if_missing false
src_time_name 'time'
dest_time_name '@timestamp'
pipeline_type 'collector'
undefined_to_string 'false'
undefined_dot_replace_char 'UNUSED'
undefined_max_num_fields '-1'
process_kubernetes_events 'false'
process_kubernetes_events false
<level>
name warn
match 'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"'
Expand All @@ -501,31 +397,21 @@ var _ = Describe("Testing Complete Config Generation", func() {
name debug
match 'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"'
</level>
<formatter>
tag "system.var.log**"
type sys_var_log
remove_keys host,pid,ident
</formatter>
<formatter>
tag "journal.system**"
type sys_journal
remove_keys log,stream,MESSAGE,_SOURCE_REALTIME_TIMESTAMP,__REALTIME_TIMESTAMP,CONTAINER_ID,CONTAINER_ID_FULL,CONTAINER_NAME,PRIORITY,_BOOT_ID,_CAP_EFFECTIVE,_CMDLINE,_COMM,_EXE,_GID,_HOSTNAME,_MACHINE_ID,_PID,_SELINUX_CONTEXT,_SYSTEMD_CGROUP,_SYSTEMD_SLICE,_SYSTEMD_UNIT,_TRANSPORT,_UID,_AUDIT_LOGINUID,_AUDIT_SESSION,_SYSTEMD_OWNER_UID,_SYSTEMD_SESSION,_SYSTEMD_USER_UNIT,CODE_FILE,CODE_FUNCTION,CODE_LINE,ERRNO,MESSAGE_ID,RESULT,UNIT,_KERNEL_DEVICE,_KERNEL_SUBSYSTEM,_UDEV_SYSNAME,_UDEV_DEVNODE,_UDEV_DEVLINK,SYSLOG_FACILITY,SYSLOG_IDENTIFIER,SYSLOG_PID
</formatter>
<formatter>
tag "kubernetes.journal.container**"
type k8s_journal
remove_keys 'log,stream,MESSAGE,_SOURCE_REALTIME_TIMESTAMP,__REALTIME_TIMESTAMP,CONTAINER_ID,CONTAINER_ID_FULL,CONTAINER_NAME,PRIORITY,_BOOT_ID,_CAP_EFFECTIVE,_CMDLINE,_COMM,_EXE,_GID,_HOSTNAME,_MACHINE_ID,_PID,_SELINUX_CONTEXT,_SYSTEMD_CGROUP,_SYSTEMD_SLICE,_SYSTEMD_UNIT,_TRANSPORT,_UID,_AUDIT_LOGINUID,_AUDIT_SESSION,_SYSTEMD_OWNER_UID,_SYSTEMD_SESSION,_SYSTEMD_USER_UNIT,CODE_FILE,CODE_FUNCTION,CODE_LINE,ERRNO,MESSAGE_ID,RESULT,UNIT,_KERNEL_DEVICE,_KERNEL_SUBSYSTEM,_UDEV_SYSNAME,_UDEV_DEVNODE,_UDEV_DEVLINK,SYSLOG_FACILITY,SYSLOG_IDENTIFIER,SYSLOG_PID'
</formatter>
<formatter>
tag "kubernetes.var.log.pods.**_eventrouter-** k8s-audit.log** openshift-audit.log** ovn-audit.log**"
type k8s_json_file
remove_keys log,stream,CONTAINER_ID_FULL,CONTAINER_NAME
remove_keys stream
process_kubernetes_events 'true'
</formatter>
<formatter>
tag "kubernetes.var.log.pods**"
type k8s_json_file
remove_keys log,stream,CONTAINER_ID_FULL,CONTAINER_NAME
remove_keys stream
</formatter>
</filter>
Expand Down Expand Up @@ -626,6 +512,10 @@ var _ = Describe("Testing Complete Config Generation", func() {
# Viaq Data Model
<filter **>
@type viaq_data_model
enable_openshift_model false
enable_prune_empty_fields false
rename_time false
undefined_dot_replace_char UNUSED
elasticsearch_index_prefix_field 'viaq_index_name'
<elasticsearch_index_name>
enabled 'true'
Expand All @@ -649,6 +539,9 @@ var _ = Describe("Testing Complete Config Generation", func() {
<filter **>
@type viaq_data_model
enable_prune_labels true
enable_openshift_model false
rename_time false
undefined_dot_replace_char UNUSED
prune_labels_exclusions app.kubernetes.io/name,app.kubernetes.io/instance,app.kubernetes.io/version,app.kubernetes.io/component,app.kubernetes.io/part-of,app.kubernetes.io/managed-by,app.kubernetes.io/created-by
</filter>
Expand Down

0 comments on commit e1246c5

Please sign in to comment.