
use the chunk to iterate records in the error case #1

Open: wants to merge 6 commits into base: sanitize_bulk

Conversation

richm commented Apr 26, 2018

Pass the chunk to the error handler. The handler will process each record in the chunk in the same way the plugin handles it, even calling process_message() on a deep copy of the record. All of the error handling has been moved into the error condition in send_bulk, so there is no error-handling code in the regular, non-error code path. The only thing left is bulk_message_count, which must be computed by the plugin and passed to the error handling code. The ElasticsearchErrorHandler instance variables @records and @bulk_message_count are now unused, so I got rid of them. I made sure that the tests cover all of the odd corner cases.
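To make the shape concrete, here is a minimal sketch of the flow described above. It is not the actual diff; the method signatures and the handle_error name are assumptions for illustration:

    # Hypothetical sketch, not the PR's code: error handling runs only when
    # the bulk response reports errors, so the happy path pays nothing extra.
    def send_bulk(bulk_body, chunk, bulk_message_count)
      response = client.bulk(body: bulk_body)
      return unless response['errors']
      # Only now do we re-walk the chunk; the error handler re-derives each
      # record (process_message on a deep copy) to match it against the
      # per-item results in the bulk response.
      @error_handler.handle_error(response, chunk, bulk_message_count)
    end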

(check all that apply)

  • tests added
  • tests passing
  • README updated (if needed)
  • README Table of Contents updated (if needed)
  • History.md and version in gemspec are untouched
  • backward compatible
  • feature works in elasticsearch_dynamic (not required but recommended)

Commit: instead of resubmitting the entire request
richm commented Apr 26, 2018

note - this relies on some major hacks to the fluentd config - it adds

@label @RETRY_ES

to our elasticsearch output plugins, then defines a config file with that label to call the es plugins again. Not sure if that means we will have yet another set of queues for each of those plugins... but I suspect so.
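For concreteness, a hypothetical fluentd config sketch of that routing hack; only the @label @RETRY_ES wiring comes from the comment above, and the match patterns and settings are made up:

    # Hypothetical sketch: @label on the first es output routes the events it
    # re-emits (the retry stream) to the @RETRY_ES label instead of the top level.
    <match app.**>
      @type elasticsearch
      @label @RETRY_ES
      # ... normal elasticsearch settings ...
    </match>

    <label @RETRY_ES>
      <match **>
        @type elasticsearch
        # ... same elasticsearch settings, invoked again for retried records ...
      </match>
    </label>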

jcantrill (Owner) left a comment

I fail to see in these code changes how we properly associate incoming chunk records with responses from the bulk request. Is that because we spin through the chunk and call process_message again, after having done that already in write_obj? I'm not sure we have any memory savings given we are duplicating records anyway.

    next unless rawrecord.is_a? Hash
    begin
      # we need a deep copy for process_message to alter
      processrecord = Marshal.load(Marshal.dump(rawrecord))
jcantrill (Owner):

Does rawrecord.dup not perform the same function without serializing and deserializing?

richm (Author):

> Is that because we spin through the chunk and process_message again after having done that already in write_obj

Yes. And in the tests I added, you can see that it will correctly associate messages from the chunk with the responses from Elasticsearch.

> I'm not sure we have any memory savings given we are duplicating records anyway

In the error case, no, we are not saving any memory. In the regular case we are: this patch adds no code and no extra memory use to the regular, non-error path, which is what @portante was concerned about. In the error case, yes, we use additional memory; we pay the price there to avoid paying it in the non-error case.

> Does rawrecord.dup not perform the same function without serializing and deserializing?

No, it does a shallow copy, and we need a deep copy here.
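For illustration (not from the PR), a minimal Ruby example of the difference; the record contents are made up:

    # dup copies only the top-level Hash; nested hashes are shared.
    record = { 'kubernetes' => { 'labels' => { 'app' => 'web' } } }
    shallow = record.dup
    shallow['kubernetes']['labels']['app'] = 'changed'
    record['kubernetes']['labels']['app']  # => "changed" (original mutated)

    # Marshal round-trip copies the whole structure.
    record = { 'kubernetes' => { 'labels' => { 'app' => 'web' } } }
    deep = Marshal.load(Marshal.dump(record))
    deep['kubernetes']['labels']['app'] = 'changed'
    record['kubernetes']['labels']['app']  # => "web" (original untouched)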

richm (Author):

BTW, I've also tested this code with logging, and it passes the bulk rejection test.

richm commented Apr 28, 2018

This makes test_elasticsearch_error_handler.rb pass; however, it causes a failure in

/home/rmeggins/fluent-plugin-elasticsearch/test/plugin/test_out_elasticsearch.rb:1413:in `test_bulk_error'

Not sure how to fix this. I need to dig into the test driver code and find out how the log records are supposed to flow; I might have to provide a test function or otherwise stub out the call to router.emit_stream(tag, e.retry_stream) at out_elasticsearch.rb:433.

It also causes failures in two other tests (because we redirect the errors now?):

Failure: test_uses_custom_time_key_format_logs_an_error[default](ElasticsearchOutput)

Failure: test_uses_custom_time_key_format_logs_an_error[custom_tag](ElasticsearchOutput)
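Regarding stubbing out router.emit_stream: one possible shape, sketched under the assumption that the v0.14-style test driver exposes the plugin instance and its router via driver.instance. This is a guess, not code from this PR:

    # Hypothetical sketch: replace router.emit_stream on the plugin instance
    # so the test can capture the retried stream instead of re-routing it.
    emitted = []
    driver.instance.router.define_singleton_method(:emit_stream) do |tag, es|
      emitted << [tag, es]
    end
    # ... run the driver so the bulk error path fires, then assert on emitted ...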

jcantrill (Owner) commented:

@richm I was having difficulties finding how to verify this method. A method exists for expecting records but not the stream, and it appears the driver is reset somewhere between the start and finish of the specific test.

jcantrill force-pushed the sanitize_bulk branch 3 times, most recently from bf0a317 to 614320e on May 1, 2018 18:42
jcantrill pushed a commit that referenced this pull request Jun 22, 2021