diff --git a/CHANGELOG.md b/CHANGELOG.md index 88779ffc..cac004e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 12.1.0 +- Add drop_error_types config option to not retry after certain error types [#1228](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1228) + ## 12.0.7 - Support both, encoded and non encoded api-key formats on plugin configuration [#1223](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1223) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 1cb062d1..3611c04b 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -373,6 +373,7 @@ Please check out <> for details. | <> |<>|No | <> |<>|No | <> |<>|No +| <<plugins-{type}s-{plugin}-drop_error_types>> |<<array,array>>|No | <> | <>|No | <> |<>|No | <> |<>|No @@ -644,6 +645,24 @@ If you don't set a value for this option: - for elasticsearch clusters 8.x: no value will be used; - for elasticsearch clusters 7.x: the value of '_doc' will be used. +[id="plugins-{type}s-{plugin}-drop_error_types"] +===== `drop_error_types` + + * Value type is <<array,array>> + * Default value is `[]` + +Lists the set of error types for which individual bulk request actions will not be retried. Unless an individual - document level - action returns 409 or an error from this list, failures will be retried indefinitely. +A warning message will be logged indicating that the action failed, unless the error type is +listed in the <<plugins-{type}s-{plugin}-silence_errors_in_log>> config option. +Note that the events are not added to the Dead Letter Queue (DLQ), regardless of whether it is enabled. 
+ +[source,ruby] + output { + elasticsearch { + drop_error_types => ["index_closed_exception"] + } + } + [id="plugins-{type}s-{plugin}-ecs_compatibility"] ===== `ecs_compatibility` diff --git a/lib/logstash/plugin_mixins/elasticsearch/api_configs.rb b/lib/logstash/plugin_mixins/elasticsearch/api_configs.rb index 29cd4fb8..b8773b2e 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/api_configs.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/api_configs.rb @@ -204,6 +204,10 @@ module APIConfigs # if enabled, failed index name interpolation events go into dead letter queue. :dlq_on_failed_indexname_interpolation => { :validate => :boolean, :default => true }, + # Failures on actions from a bulk request will not be retried for these error types; the events will be dropped. + # The events won't be added to the DLQ either. + :drop_error_types => { :validate => :string, :list => true, :default => [] }, + # Obsolete Settings :ssl => { :obsolete => "Set 'ssl_enabled' instead." }, :ssl_certificate_verification => { :obsolete => "Set 'ssl_verification_mode' instead." }, diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index 977ef204..8e7f9240 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -278,16 +278,18 @@ def submit(actions) status = action_props["status"] error = action_props["error"] + type = error["type"] if error action = actions[idx] - # Retry logic: If it is success, we move on. If it is a failure, we have 3 paths: + # Retry logic: If it is success, we move on. If it is a failure, we have the following paths: # - For 409, we log and drop. there is nothing we can do + # - For any error types set in the 'drop_error_types' config, log and drop. # - For a mapping error, we send to dead letter queue for a human to intervene at a later point. # - For everything else there's mastercard. Yep, and we retry indefinitely. 
This should fix #572 and other transient network issues if DOC_SUCCESS_CODES.include?(status) @document_level_metrics.increment(:successes) next - elsif DOC_CONFLICT_CODE == status + elsif DOC_CONFLICT_CODE == status || @drop_error_types.include?(type) @document_level_metrics.increment(:non_retryable_failures) @logger.warn "Failed action", status: status, action: action, response: response if log_failure_type?(error) next diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 8569ae72..f884bdeb 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '12.0.7' + s.version = '12.1.0' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
This gem is not a stand-alone program" diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 5d1d6628..7a71e2e8 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -1499,6 +1499,80 @@ end end + describe 'drop_error_types' do + + let(:error_type) { 'index_closed_exception' } + + let(:options) { super().merge('drop_error_types' => [error_type]) } + + let(:events) { [ LogStash::Event.new("foo" => "bar") ] } + + let(:dlq_writer) { subject.instance_variable_get(:@dlq_writer) } + + let(:error_code) { 403 } + + let(:event_action_tuples) { subject.map_events(events) } + + let(:bulk_response) do + { + "took"=>1, "ingest_took"=>11, "errors"=>true, "items"=> + [{ + "index"=>{"_index"=>"bar", "_type"=>"_doc", "_id"=>'bar', "status" => error_code, + "error"=>{"type" => error_type, "reason" => "TEST" } + } + }] + } + end + + before(:each) do + allow(subject.client).to receive(:bulk_send).and_return(bulk_response) + end + + context 'DLQ is enabled' do + + let(:options) { super().merge("dlq_custom_codes" => [403]) } + + it 'does not write the event to the DLQ' do + expect(dlq_writer).not_to receive(:write) + subject.send(:submit, event_action_tuples) + end + end + + context 'DLQ is not enabled' do + + before(:each) do + allow(subject).to receive(:dlq_enabled?).and_return(false) + end + + it 'does not retry indexing the event' do + expect(subject).to receive(:submit).with(event_action_tuples).once.and_call_original + subject.send(:retrying_submit, event_action_tuples) + end + end + + context 'the error type is not in `silence_errors_in_log`' do + + it 'logs the error' do + expect(subject.logger).to receive(:warn).with(a_string_including("Failed action"), anything) + subject.send(:submit, event_action_tuples) + end + end + + context 'the error type is in `silence_errors_in_log`' do + + let(:options) { super().merge('silence_errors_in_log' => [error_type]) } + + before(:each) do + # 
ensure that neither warn nor info is called on the logger by using a test double + subject.instance_variable_set("@logger", double('logger')) + end + + it 'does not log the error' do + subject.send(:submit, event_action_tuples) + end + end + end + describe "custom headers" do let(:manticore_options) { subject.client.pool.adapter.manticore.instance_variable_get(:@options) }