evaluate bulk request failures and reroute failed messages #405
Conversation
lgtm
# RetryRecordsError privides a list of records to be
# put back in the pipeline for cases where a bulk request
# failed (e.g some records succeede while others failed
s/succeede/succeed/, and missing trailing ")".
chunk.msgpack_each do |time, record|
  @error.records += 1
  next unless record.is_a? Hash
  begin
    process_message(tag, meta, header, time, record, bulk_message)
    records << { time: time, record: record }
Won't this use more memory on the non-error path as well, which we hope would be the fast path? Is there a way we can just pass the "chunk" object and, only when we find some errors, index into the chunk object to pull out the original records to re-submit?
In general, it seems the non-error case should not have to shoulder any of the burden of error handling, even if the error path becomes more costly to compensate.
In fact, you have to use the raw record from the chunk: the resubmitted record has to look exactly like the raw record from the chunk so it can be reprocessed. So yes, we need the chunk, and in the error handling code we'll need to iterate the records using chunk.msgpack_each do |time, record|.
I agree with your observation, but Rich is correct: we are basing the functionality on the assumption that the order of records submitted is the same as the order of items in the response. This assumption is based upon the Elasticsearch documentation, as there is no other way to correlate the request and response without ids or some hashing. Keeping the separate list is also necessary so that it contains only the records that could be properly deserialized.
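To make that positional assumption concrete, a minimal sketch (the names retry_records and the success check are illustrative, not the PR's exact code; it assumes the handler's response and records locals):

# Sketch: assuming Elasticsearch returns bulk items in submission order,
# the item at position i in the response corresponds to records[i].
retry_records = []
response['items'].each_with_index do |item, index|
  op = item.keys.first                 # 'index', 'create', etc.
  status = item[op]['status']
  next if (200...300).cover?(status)   # success: nothing to retry
  retry_records << records[index]      # positional correlation
end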
But records ends up holding the records in the same order as chunk.msgpack_each emits them, right? If that is the case, why do we need to construct a separate array on the normal path? Why not pass chunk in and, only when we encounter errors, construct the list to handle them?
We have to iterate the chunk in the error handling. We need to make sure we get the same records that were sent, which means repeating a couple of the record-processing steps: omit records that are not Hash objects, and omit records that were sent to the dlq, i.e. not included in the bulk index request. We can't do any more processing on the records; records resubmitted to fluentd must be the raw records directly from the queue.
Again, we can't iterate the chunk; we have to iterate the list of records that were actually submitted.
We need to use the chunk to get the raw, unprocessed record to resubmit.
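A rough sketch of that replay idea, assuming a hypothetical failed_positions set computed beforehand from the bulk response:

# Sketch: re-read raw records from the chunk, mirroring the submit-path
# filtering so positions line up with the bulk response items.
resubmit = []
position = 0
chunk.msgpack_each do |time, record|
  next unless record.is_a? Hash              # same filter as the submit path
  resubmit << [time, record] if failed_positions.include?(position)
  position += 1
end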
Force-pushed from 7854291 to 38a7aef (compare)
Handling bad records inside the ES plugin is not good practice; this behavior will cause confusion. I think this should be done with the secondary output feature.
@cosmo0920 I don't understand what you are saying or your requested changes. This PR does nothing with 'bad record handling' that is not already in the plugin. It provides a way to reprocess a failed bulk request by resubmitting to the pipeline only the records that failed.
We see errors in production right now where one failed insert from the bulk causes every message from the bulk insert to be attempted again, adding undue load on Elasticsearch.
I don't have an OpenShift environment, but I guess that if the ES plugin can handle
@cosmo0920 I'm still not understanding what you are saying regarding a 'secondary' option. The flaw is between this plugin and fluentd: there is currently no way to partially accept the result of submitting a chunk. It is unrelated to OpenShift; it is a flaw in how this plugin sends to ES. If only one record fails, the exception bubbles up to fluentd and the entire chunk is retried. You can demonstrate this with any ES instance by restricting its memory and cpu and then pounding it with a huge message load. You should see failures due to memory exhaustion or the like. There is no need to push to a secondary output; the records just need to be sent back into the queue for processing, which I believe is what this does.
@cosmo0920, another way to think about this is that the Elasticsearch plugin is constructing a set of instructions for indexing the individual records of a chunk. Each record has its own success/failure status. That is not the case with other plugins, like secure_forward, where the entire chunk is transmitted as a whole and success/failure applies to the entire chunk, not to each record in it.
@cosmo0920 I'm going to try to set up a meeting with you, but please consider this scenario, which is the case we have seen with some of our configurations of Elasticsearch:
The flow of fluentd with this output plugin is:
The current flow between fluentd and this plugin has no logic to resubmit ONLY failed messages. You have to resubmit the entire request or none at all, which forces you to resubmit messages that were already indexed. This means resubmitting every successful record, every duplicate record, and every failed record. In our configuration this means 8Mb message chunks from up to 1000 or more instances of fluentd. EVERY record that already exists in Elasticsearch is reserialized and retransmitted across the network because possibly one single record in the bulk request failed.

Regarding secondary output: I believe the proposed change is well within what is reasonable based upon fluentd's guidelines. Each output destination needs to be handled according to the characteristics of the target destination. In the case of Elasticsearch, this change addresses how to properly interact with bulk requests. Other output plugins may not need such handling; there is no 'one size fits all'.
Force-pushed from 38a7aef to 196060f (compare)
Unfortunately, I don't have enough knowledge about ES cluster operation. We can change the message size of each chunk. Should we provide one-by-one insert in the ES plugin?
Please review https://www.elastic.co/guide/en/elasticsearch/guide/2.x/_monitoring_individual_nodes.html#_threadpool_section. We are trying to implement the algorithm described there: wait for a bit, then resubmit only the rejected actions.
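A hedged sketch of that algorithm (backoff_seconds and resubmit are placeholders, and it assumes the handler's response and records locals; rejected bulk actions surface as HTTP 429 / es_rejected_execution_exception):

# Sketch: back off briefly, then resubmit only the rejected actions.
rejected = []
response['items'].each_with_index do |item, index|
  op = item.keys.first
  rejected << records[index] if item[op]['status'] == 429   # thread pool rejection
end
unless rejected.empty?
  sleep(backoff_seconds)   # "wait for a bit"; tuning is deployment-specific
  resubmit(rejected)       # placeholder for re-enqueueing into the pipeline
end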
@richm Thank you for the information! It is reasonable to add this feature to the master branch, not the v0.12 branch, because:
Force-pushed from 196060f to 04ebac8 (compare)
Ok, but this is a pretty critical bug: anyone running Elasticsearch in a non-trivial environment will run into this situation sooner or later, and there is no good way to mitigate it without fluent-plugin-elasticsearch handling the backpressure correctly.
Force-pushed from 04ebac8 to 7fbee95 (compare)
@cosmo0920 I rebased and fixed up the tests. Please reconsider merging this change; as @richm stated, it will affect anyone who has a non-trivial environment, and it's not properly solvable by tweaking buffer settings. We will otherwise need to carry a patch ourselves. cc @portante, please review to see if I have properly addressed your error handling concerns.
def initialize(plugin, records = 0, bulk_message_count = 0)
  @plugin = plugin
  @records = records
  @bulk_message_count = bulk_message_count
end
- def handle_error(response)
+ def handle_error(response, tag, records)
pass chunk instead of records
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cannot pass chunk instead of records. This code is executed after we have deserialized the individual records in the chunk to confirm they don't throw exceptions like 'utf-8' problems. The chunk records are not representative of those we submitted to Elasticsearch.
  stats = Hash.new(0)
- response['items'].each do |item|
+ response['items'].each_with_index do |item, index|
Also iterate the chunk: pull the next item from the chunk.
You'll also have to implement the next unless record.is_a? Hash logic, and any logic in process_message that omits the record from the bulk index request. That will need to be refactored into a method callable from both places, as sketched below. You have to guarantee that the record you read from the chunk is the same one being processed in this error handling loop.
Same here. We've already processed the chunk. There is no reason to do so here too.
@@ -41,13 +43,20 @@ def handle_error(response)
    stats[:successes] += 1
  when CREATE_OP == write_operation && 409 == status
    stats[:duplicates] += 1
  when 400 == status
    stats[:bad_argument] += 1
    record = records[index]
this will be the record from the chunk instead
else
  retry_records << records[index]
this will be the record from the chunk instead
if item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('type')
  type = item[write_operation]['error']['type']
else
  # When we don't have a type field, something changed in the API
  # expected return values (ES 2.x)
  stats[:errors_bad_resp] += 1
  record = records[index]
this will be the record from the chunk instead
Now, https://docs.fluentd.org/ points to the v1 documentation by default. I don't think it is worth adding this backpressure feature to the v0.12 branch at this point.
I decided to approve this PR because:
- With the current implementation, users cannot avoid a flood of resubmissions of entire chunks.
if records.length != response['items'].length
  raise ElasticsearchError, "The number of records submitted do not match the number returned. Unable to process bulk response"
end
retry_stream = Fluent::MultiEventStream.new
@jcantrill I replaced retry_records with retry_stream, which is a MultiEventStream instance.
I think we can create an event stream for resubmission within the error handler.
What do you think?
Seems reasonable
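For reference, the shape of that approach might look like the following sketch; the exact class definition in the PR may differ:

require 'fluent/event'

# Error type that carries the stream of events to retry.
class RetryStreamError < StandardError
  attr_reader :retry_stream
  def initialize(retry_stream)
    @retry_stream = retry_stream
    super('failed records queued for retry')
  end
end

# In the error handler: collect failed events, then raise.
retry_stream = Fluent::MultiEventStream.new
# retry_stream.add(time, record) for each record to retry
raise RetryStreamError.new(retry_stream) unless retry_stream.empty?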
- @error.handle_error(response) if response['errors']
+ @error.handle_error(response, tag, records) if response['errors']
+ rescue RetryStreamError => e
+   router.emit_stream(tag, e.retry_stream)
@jcantrill
When we create retry_stream as a MultiEventStream instance in the error handler, we can pass the created stream on directly, which keeps the resubmission code simple.
Is there any concern about this?
Only one, which was brought up by @richm. He noted that when we resubmit records, I assume with the same tag, they go through the entire pipeline. This means that if you have multiple filters or send messages to multiple outputs, you may be reprocessing (e.g. adding fields again, contacting other servers to get metadata) and duplicating messages to other destinations (e.g. if we copied to, say, a file). @richm and I talked about routing the messages in a similar way as @ERROR, by providing a label like @RETRY or something. In this case, I recommend we have a configurable label for records we wish to retry:
config_param: :retry_es_label, default: @RETRY_ES
...
router.emit_stream(@retry_es_label, e.retry_stream)
...
I'm not sure how to do this in the code, but it was easy to add a @label @RETRY_ES to our es plugin config, then add a <label @RETRY_ES> section.
Current Fluentd v0.12 (and v1) cannot specify a label at runtime; it is determined from the configuration file at boot.
Instead, we can reroute by adding a prefix/suffix to the tag, like:
config_param: :retry_es_tag_prefix, default: "retrying."
...
router.emit_stream(@retry_es_tag_prefix + tag, e.retry_stream)
...
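With that approach, the retried events would be caught by a tag-based match section; a fluent.conf sketch, assuming the default prefix above:

<match retrying.**>
  @type elasticsearch
  # ... elasticsearch connection settings ...
</match>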
ok, so either the pipeline must be reentrant (that is, no label is required and it is ok for records to be resubmitted to the entire pipeline), or add some sort of @label @RETRY_ES to the fluent-plugin-elasticsearch config, with a <label @RETRY_ES> section in your fluent.conf, which guarantees that only records which should be retried for submission to elasticsearch go into this label's processing.
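A sketch of that wiring in fluent.conf (plugin settings elided; @RETRY_ES is the example label name used above):

<match **>
  @type elasticsearch
  @label @RETRY_ES          # failed records are emitted into this label
  # ...
</match>

<label @RETRY_ES>
  <match **>
    @type elasticsearch     # retry path: records skip the main pipeline
    # ...
  </match>
</label>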
@cosmo0920 Can we not assume that users will have to specify the label in their config pipeline, and that the 'configuration' is simply them telling us which label to use? Will routing fail if the label does not exist at runtime?
Yes, routing will fail at startup if there is a @label @FOO declaration without a <label @FOO>, or vice versa, in the fluent.conf.
@label is an optional configuration element.
If no label is specified, events are re-emitted into the same route, causing an infinite busy loop.
Force-pushed from 23480a3 to 0d79c23 (compare)
- def send_bulk(data)
+ # send_bulk given a specific bulk request, the original tag,
+ # chunk, and bulk_message_count
+ def send_bulk(data, tag, chunk, bulk_message_count)
Mismatched arguments between caller and callee.
Is this intentional?
no, not intentional - good catch
Force-pushed from 301a628 to bf0a317 (compare)
instead of resubmitting the entire request
Tests are passing and I reviewed it again.
@cosmo0920 We'll also work on a subsequent PR to allow a 'retry' tag, which would let you reroute messages through a different pipeline. For the one we use, we want to skip sections of the pipeline and simply resubmit the message, bypassing specific filters.
Yup. We should still work on the retrying mechanism.
Uplift the "evaluate bulk request failures and reroute failed messages" patch to v1.
@@ -41,13 +59,19 @@ def handle_error(response)
    stats[:successes] += 1
  when CREATE_OP == write_operation && 409 == status
    stats[:duplicates] += 1
  when 400 == status
    stats[:bad_argument] += 1
    @plugin.router.emit_error_event(tag, time, rawrecord, '400 - Rejected by Elasticsearch')
This and the other emit_error_event usage below seem to be problematic. The router API expects the error to be an Exception. This is causing errors in the logs:
2018-05-08 14:45:51 +0000 [warn]: #0 failed to flush the buffer. retry_time=0 next_retry_seconds=2018-05-08 14:45:52 +0000 chunk="56bb2d75cc799064fe3005a7ce548c43" error_class=NoMethodError error="undefined method `backtrace' for \"400 - Rejected by Elasticsearch\":String"
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.0.2/lib/fluent/root_agent.rb:304:in `emit_error_event'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.0.2/lib/fluent/event_router.rb:102:in `emit_error_event'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-2.10.0/lib/fluent/plugin/elasticsearch_error_handler.rb:64:in `block in handle_error'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.0.2/lib/fluent/event.rb:323:in `each'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.0.2/lib/fluent/event.rb:323:in `block in each'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.0.2/lib/fluent/plugin/buffer/memory_chunk.rb:80:in `open'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.0.2/lib/fluent/plugin/buffer/memory_chunk.rb:80:in `open'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.0.2/lib/fluent/event.rb:322:in `each'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-2.10.0/lib/fluent/plugin/elasticsearch_error_handler.rb:27:in `handle_error'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-2.10.0/lib/fluent/plugin/out_elasticsearch.rb:510:in `send_bulk'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-2.10.0/lib/fluent/plugin/out_elasticsearch.rb:408:in `write'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.0.2/lib/fluent/plugin/output.rb:1093:in `try_flush'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.0.2/lib/fluent/plugin/output.rb:1318:in `flush_thread_run'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.0.2/lib/fluent/plugin/output.rb:439:in `block (2 levels) in start'
2018-05-08 14:45:51 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.0.2/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
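A sketch of the kind of fix the traceback suggests: wrap the message in an Exception (emit_error_event calls error.backtrace), rather than passing a bare String. ElasticsearchError already exists in this code, so reusing it is one option:

class ElasticsearchError < StandardError; end

@plugin.router.emit_error_event(tag, time, rawrecord,
  ElasticsearchError.new('400 - Rejected by Elasticsearch'))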
This PR adds logic to evaluate a bulk request failure and to resubmit to the pipeline only those records that failed. Previously, all records in a bulk submission were resubmitted to Elasticsearch even if there was only one failure in the entire request, causing undue processing on both the collector and aggregator.