Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 12.1.0
- Add drop_error_types config option to not retry after certain error types [#1228](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1228)

## 12.0.7
- Support both, encoded and non encoded api-key formats on plugin configuration [#1223](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1223)

Expand Down
19 changes: 19 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ Please check out <<plugins-{type}s-{plugin}-obsolete-options>> for details.
| <<plugins-{type}s-{plugin}-doc_as_upsert>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-document_id>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-document_type>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-drop_error_types>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-ecs_compatibility>> | <<string,string>>|No
| <<plugins-{type}s-{plugin}-failure_type_logging_whitelist>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-healthcheck_path>> |<<string,string>>|No
Expand Down Expand Up @@ -644,6 +645,24 @@ If you don't set a value for this option:
- for elasticsearch clusters 8.x: no value will be used;
- for elasticsearch clusters 7.x: the value of '_doc' will be used.

[id="plugins-{type}s-{plugin}-drop_error_types"]
===== `drop_error_types`

* Value type is <<array,array>>
* Default value is `[]`

Lists the set of error types for which individual bulk request actions will not be retried. Unless an individual - document level - action returns 409 or an error from this list, failures will be retried indefinitely.
A warning message will be logged indicating that the action failed, unless the error type is
listed in the <<plugins-{type}s-{plugin}-silence_errors_in_log>> config option.
Note that the events are not added to the Dead Letter Queue (DLQ), regardless of whether it is enabled.

[source,ruby]
output {
elasticsearch {
drop_error_types => ["index_closed_exception"]
}
}

[id="plugins-{type}s-{plugin}-ecs_compatibility"]
===== `ecs_compatibility`

Expand Down
4 changes: 4 additions & 0 deletions lib/logstash/plugin_mixins/elasticsearch/api_configs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ module APIConfigs
# if enabled, failed index name interpolation events go into dead letter queue.
:dlq_on_failed_indexname_interpolation => { :validate => :boolean, :default => true },

# Failures on actions from a bulk request will not be retried for these error types; the events will be dropped.
# The events won't be added to the DLQ either.
:drop_error_types => { :validate => :string, :list => true, :default => [] },

# Obsolete Settings
:ssl => { :obsolete => "Set 'ssl_enabled' instead." },
:ssl_certificate_verification => { :obsolete => "Set 'ssl_verification_mode' instead." },
Expand Down
6 changes: 4 additions & 2 deletions lib/logstash/plugin_mixins/elasticsearch/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,18 @@ def submit(actions)

status = action_props["status"]
error = action_props["error"]
type = error["type"] if error
action = actions[idx]

# Retry logic: If it is success, we move on. If it is a failure, we have 3 paths:
# Retry logic: If it is success, we move on. If it is a failure, we have the following paths:
# - For 409, we log and drop. there is nothing we can do
# - For any error types set in the 'drop_error_types' config, log and drop.
# - For a mapping error, we send to dead letter queue for a human to intervene at a later point.
# - For everything else there's mastercard. Yep, and we retry indefinitely. This should fix #572 and other transient network issues
if DOC_SUCCESS_CODES.include?(status)
@document_level_metrics.increment(:successes)
next
elsif DOC_CONFLICT_CODE == status
elsif DOC_CONFLICT_CODE == status || @drop_error_types.include?(type)
@document_level_metrics.increment(:non_retryable_failures)
@logger.warn "Failed action", status: status, action: action, response: response if log_failure_type?(error)
next
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '12.0.7'
s.version = '12.1.0'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
74 changes: 74 additions & 0 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,80 @@
end
end

describe 'drop_error_types' do

let(:error_type) { 'index_closed_exception' }

let(:options) { super().merge('drop_error_types' => [error_type]) }

let(:events) { [ LogStash::Event.new("foo" => "bar") ] }

let(:dlq_writer) { subject.instance_variable_get(:@dlq_writer) }

let(:error_code) { 403 }

let(:event_action_tuples) { subject.map_events(events) }

let(:bulk_response) do
{
"took"=>1, "ingest_took"=>11, "errors"=>true, "items"=>
[{
"index"=>{"_index"=>"bar", "_type"=>"_doc", "_id"=>'bar', "status" => error_code,
"error"=>{"type" => error_type, "reason" => "TEST" }
}
}]
}
end

before(:each) do
allow(subject.client).to receive(:bulk_send).and_return(bulk_response)
end

context 'DLQ is enabled' do

let(:options) { super().merge("dlq_custom_codes" => [403]) }

it 'does not write the event to the DLQ' do
expect(dlq_writer).not_to receive(:write)
subject.send(:submit, event_action_tuples)
end
end

context 'DLQ is not enabled' do

before(:each) do
allow(subject).to receive(:dlq_enabled?).and_return(false)
end

it 'does not retry indexing the event' do
expect(subject).to receive(:submit).with(event_action_tuples).once.and_call_original
subject.send(:retrying_submit, event_action_tuples)
end
end

context 'the error type is not in `silence_errors_in_log`' do

it 'logs the error' do
expect(subject.logger).to receive(:warn).with(a_string_including("Failed action"), anything)
subject.send(:submit, event_action_tuples)
end
end

context 'the error type is in `silence_errors_in_log`' do

let(:options) { super().merge('silence_errors_in_log' => [error_type]) }

before(:each) do
# ensure that neither warn nor info is called on the logger by using a test double
subject.instance_variable_set("@logger", double('logger'))
end

it 'does not log the error' do
subject.send(:submit, event_action_tuples)
end
end
end

describe "custom headers" do
let(:manticore_options) { subject.client.pool.adapter.manticore.instance_variable_get(:@options) }

Expand Down