42 changes: 35 additions & 7 deletions pkg/generators/forwarding/fluentd/output_fluentd_buffer.go
@@ -43,7 +43,11 @@ func (olc *outputLabelConf) TotalLimitSize() string {

func (olc *outputLabelConf) OverflowAction() string {
	if hasBufferConfig(olc.forwarder) {
-		return string(olc.forwarder.Fluentd.Buffer.OverflowAction)
+		oa := string(olc.forwarder.Fluentd.Buffer.OverflowAction)
+
+		if oa != "" {
+			return oa
+		}
	}

	switch olc.Target.Type {
@@ -56,23 +60,35 @@ func (olc *outputLabelConf) OverflowAction() string {

func (olc *outputLabelConf) FlushThreadCount() string {
	if hasBufferConfig(olc.forwarder) {
-		return fmt.Sprintf("%d", olc.forwarder.Fluentd.Buffer.FlushThreadCount)
+		ftc := olc.forwarder.Fluentd.Buffer.FlushThreadCount
+
+		if ftc > 0 {
+			return fmt.Sprintf("%d", ftc)
+		}
	}

	return defaultFlushThreadCount
}

func (olc *outputLabelConf) FlushMode() string {
	if hasBufferConfig(olc.forwarder) {
-		return string(olc.forwarder.Fluentd.Buffer.FlushMode)
+		fm := string(olc.forwarder.Fluentd.Buffer.FlushMode)
+
+		if fm != "" {
+			return fm
+		}
	}

	return defaultFlushMode
}

func (olc *outputLabelConf) FlushInterval() string {
	if hasBufferConfig(olc.forwarder) {
-		return string(olc.forwarder.Fluentd.Buffer.FlushInterval)
+		fi := string(olc.forwarder.Fluentd.Buffer.FlushInterval)
+
+		if fi != "" {
+			return fi
+		}
	}

	switch olc.Target.Type {
@@ -85,23 +101,35 @@ func (olc *outputLabelConf) FlushInterval() string {

func (olc *outputLabelConf) RetryWait() string {
	if hasBufferConfig(olc.forwarder) {
-		return string(olc.forwarder.Fluentd.Buffer.RetryWait)
+		rw := string(olc.forwarder.Fluentd.Buffer.RetryWait)
+
+		if rw != "" {
+			return rw
+		}
	}

	return defaultRetryWait
}

func (olc *outputLabelConf) RetryType() string {
	if hasBufferConfig(olc.forwarder) {
-		return string(olc.forwarder.Fluentd.Buffer.RetryType)
+		rt := string(olc.forwarder.Fluentd.Buffer.RetryType)
+
+		if rt != "" {
+			return rt
+		}
	}

	return defaultRetryType
}

func (olc *outputLabelConf) RetryMaxInterval() string {
	if hasBufferConfig(olc.forwarder) {
-		return string(olc.forwarder.Fluentd.Buffer.RetryMaxInterval)
+		rmi := string(olc.forwarder.Fluentd.Buffer.RetryMaxInterval)
+
+		if rmi != "" {
+			return rmi
+		}
	}

	return defaultRetryMaxInterval
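The pattern introduced above is the same for every tunable: a value from the forwarder's optional Fluentd buffer spec overrides the built-in default only when it is actually set (a non-empty string, or a thread count greater than zero); otherwise the helper falls through to the per-output default. Below is a minimal standalone sketch of that fallback rule, not taken from the repository — the type and function names (bufferSpec, pick) are illustrative stand-ins.

package main

import "fmt"

// bufferSpec stands in for loggingv1.FluentdBufferSpec; only two fields are
// included, and both are assumed to be plain strings for this sketch.
type bufferSpec struct {
	OverflowAction string
	FlushMode      string
}

// pick returns the spec value when it is set, otherwise the supplied default —
// the same "only override when non-empty" rule the generator helpers now apply.
func pick(value, def string) string {
	if value != "" {
		return value
	}
	return def
}

func main() {
	empty := bufferSpec{}
	tuned := bufferSpec{OverflowAction: "throw_exception"}

	fmt.Println(pick(empty.OverflowAction, "block")) // "block": an empty spec keeps the default
	fmt.Println(pick(tuned.OverflowAction, "block")) // "throw_exception": an explicit value wins
	fmt.Println(pick(tuned.FlushMode, "interval"))   // "interval": an unset field still defaults
}
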
121 changes: 121 additions & 0 deletions pkg/generators/forwarding/fluentd/output_fluentd_buffer_test.go
@@ -35,6 +35,127 @@ var _ = Describe("Generating fluentd config", func() {
		}
	})

+	Context("for empty forwarder buffer spec", func() {
+		JustBeforeEach(func() {
+			var err error
+			generator, err = NewConfigGenerator(false, false, true)
+			Expect(err).To(BeNil())
+			Expect(generator).ToNot(BeNil())
+
+			customForwarderSpec = &loggingv1.ForwarderSpec{
+				Fluentd: &loggingv1.FluentdForwarderSpec{
+					Buffer: &loggingv1.FluentdBufferSpec{},
+				},
+			}
+
+			outputs = []loggingv1.OutputSpec{
+				{
+					Type: loggingv1.OutputTypeElasticsearch,
+					Name: "other-elasticsearch",
+					URL: "http://es.svc.messaging.cluster.local:9654",
+				},
+			}
+		})
+
+		It("should provide a default buffer configuration", func() {
+			esConf := `
+<label @OTHER_ELASTICSEARCH>
+  <match retry_other_elasticsearch>
+    @type copy
+    <store>
+      @type elasticsearch
+      @id retry_other_elasticsearch
+      host es.svc.messaging.cluster.local
+      port 9654
+      verify_es_version_at_startup false
+      scheme http
+      target_index_key viaq_index_name
+      id_key viaq_msg_id
+      remove_keys viaq_index_name
+      user fluentd
+      password changeme
+
+      type_name _doc
+      write_operation create
+      reload_connections 'true'
+      # https://github.com/uken/fluent-plugin-elasticsearch#reload-after
+      reload_after '200'
+      # https://github.com/uken/fluent-plugin-elasticsearch#sniffer-class-name
+      sniffer_class_name 'Fluent::Plugin::ElasticsearchSimpleSniffer'
+      reload_on_failure false
+      # 2 ^ 31
+      request_timeout 2147483648
+      <buffer>
+        @type file
+        path '/var/lib/fluentd/retry_other_elasticsearch'
+        flush_mode interval
+        flush_interval 1s
+        flush_thread_count 2
+        flush_at_shutdown true
+        retry_type exponential_backoff
+        retry_wait 1s
+        retry_max_interval 300s
+        retry_forever true
+        queued_chunks_limit_size "#{ENV['BUFFER_QUEUE_LIMIT'] || '32' }"
+        total_limit_size "#{ENV['TOTAL_LIMIT_SIZE'] || 8589934592 }" #8G
+        chunk_limit_size "#{ENV['BUFFER_SIZE_LIMIT'] || '8m'}"
+        overflow_action block
+      </buffer>
+    </store>
+  </match>
+  <match **>
+    @type copy
+    <store>
+      @type elasticsearch
+      @id other_elasticsearch
+      host es.svc.messaging.cluster.local
+      port 9654
+      verify_es_version_at_startup false
+      scheme http
+      target_index_key viaq_index_name
+      id_key viaq_msg_id
+      remove_keys viaq_index_name
+      user fluentd
+      password changeme
+
+      type_name _doc
+      retry_tag retry_other_elasticsearch
+      write_operation create
+      reload_connections 'true'
+      # https://github.com/uken/fluent-plugin-elasticsearch#reload-after
+      reload_after '200'
+      # https://github.com/uken/fluent-plugin-elasticsearch#sniffer-class-name
+      sniffer_class_name 'Fluent::Plugin::ElasticsearchSimpleSniffer'
+      reload_on_failure false
+      # 2 ^ 31
+      request_timeout 2147483648
+      <buffer>
+        @type file
+        path '/var/lib/fluentd/other_elasticsearch'
+        flush_mode interval
+        flush_interval 1s
+        flush_thread_count 2
+        flush_at_shutdown true
+        retry_type exponential_backoff
+        retry_wait 1s
+        retry_max_interval 300s
+        retry_forever true
+        queued_chunks_limit_size "#{ENV['BUFFER_QUEUE_LIMIT'] || '32' }"
+        total_limit_size "#{ENV['TOTAL_LIMIT_SIZE'] || 8589934592 }" #8G
+        chunk_limit_size "#{ENV['BUFFER_SIZE_LIMIT'] || '8m'}"
+        overflow_action block
+      </buffer>
+    </store>
+  </match>
+</label>`
+
+			results, err := generator.generateOutputLabelBlocks(outputs, defaultForwarderSpec)
+			Expect(err).To(BeNil())
+			Expect(len(results)).To(Equal(1))
+			Expect(results[0]).To(EqualTrimLines(esConf))
+		})
+	})
+
	Context("for output elasticsearch", func() {
		JustBeforeEach(func() {
			var err error
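The test above pins the behavior for a completely empty FluentdBufferSpec: every parameter in the generated <buffer> sections keeps its default. For contrast, under the non-empty checks added in this PR, a spec that set only one field would change only that parameter in the rendered config while the rest stayed at the defaults shown in esConf. A two-line, hypothetical continuation of the earlier standalone sketch's main function illustrates this; the values are examples, not taken from the repository.

	partial := bufferSpec{FlushMode: "immediate"}
	fmt.Println(pick(partial.FlushMode, "interval"))   // "immediate": the single explicit override wins
	fmt.Println(pick(partial.OverflowAction, "block")) // "block": unset fields keep their defaults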