
Duplicate events are indexed with different _id field #312

Closed
dannyk81 opened this issue Oct 25, 2017 · 41 comments

@dannyk81
Contributor

dannyk81 commented Oct 25, 2017

Problem

We've experienced a few incidents in the past few weeks where an issue on our Elasticsearch cluster causes errors in fluentd when pushing logs (specifically, we've been exceeding the allowed queue length on Elasticsearch, which sends rejection errors). fluentd retries these at certain intervals; however, each retry is submitted with a new _id value.

During this condition, Elasticsearch seems to index the messages (with some delay), but since fluentd receives an error, it retries again and again; as a result we end up with many duplicate events (each with a unique _id).

I've found a related issue filed here: https://bugzilla.redhat.com/show_bug.cgi?id=1491401

I'm not certain whether this is the plugin's fault, or whether fluentd's output buffer is responsible for this behavior.

We were able to come up with the following workaround:

  1. We've defined a new filter using fluent-plugin-genhashvalue that generates a _hash field from several keys in each record (hopefully providing a unique MD5 hash per event)
  2. Added id_key _hash to the output configuration

The above solved the duplicate issue we'd been experiencing.
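For reference, here is a minimal sketch of that workaround. The keys value is illustrative, and the parameter names should be checked against the fluent-plugin-genhashvalue README:

<filter **>
  @type genhashvalue
  keys message           # illustrative; choose record keys that uniquely identify an event
  hash_type md5          # md5/sha1/sha256/sha512
  set_key _hash          # field that receives the generated hash
  inc_time_as_key true
  inc_tag_as_key true
</filter>

<match **>
  @type elasticsearch
  id_key _hash           # use the generated hash as the document _id
</match>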

Steps to replicate

This is triggered when Elasticsearch sends rejections because its bulk queue limit has been reached.

Using Fluentd and ES plugin versions

  • Fluentd v0.12.35
  • ES plugin 1.10.1
  • Elasticsearch 5.5.0
@tschroeder-zendesk

This has been a major issue for me as well. I ended up using a similar workaround, but it would be ideal if this could be fixed.

@cosmo0920
Collaborator

cosmo0920 commented Nov 14, 2017

We've defined a new filter using fluent-plugin-genhashvalue that generates a _hash field from several keys in each record (hopefully providing a unique MD5 hash per event)
Added id_key _hash to the output configuration

Fluentd plugins are meant to be used in combination.

This issue is caused by heavy traffic congestion in Elasticsearch.
I don't see a way to fix this issue on the plugin side, but @dannyk81's workaround is pretty reasonable.
Could you add your workaround to the documentation?
It might be helpful for users running heavily congested EFK stacks.

@cosmo0920
Collaborator

By default, all records inserted into ElasticSearch get a random _id. This option allows to use a field in the record as an identifier.

The current documentation does not explain how to prevent duplicated events.

@dannyk81
Contributor Author

Thanks for the reply @cosmo0920!

I had always assumed that events being retried would use the same _id; however, as we've observed and you confirmed, that is not the case.

I wonder if there is a way to store the _id value in the retry buffer so that subsequent retries are sent with the same _id value.

I will submit a PR adding this workaround to the README, as you suggested; it might help others in similar situations. Though I have to say it is not a "pretty" fix: the additional hash calculations consume resources, and you need to carefully select the keys for the different sources to be used for the hash, or you'll get dangerous hash collisions and lose events.

After reviewing this bugzilla, it seems to me that the root cause may lie in fluentd's own output buffer mechanism, but I don't have enough experience with its inner workings to be certain. Perhaps you could review it and share your point of view?

@cosmo0920
Collaborator

After reviewing this bugzilla, it seems to me that the root cause may lie in fluentd's own output buffer mechanism, but I don't have enough experience with its inner workings to be certain. Perhaps you could review it and share your point of view?

Fluentd's output and its associated buffer plugin follow an at-least-once delivery rule on some output routes, not an exactly-once rule.
In short, the Fluentd Elasticsearch output plugin keeps sending records until it receives a success response.

To solve this on the ES plugin side, it needs a hash-generation mechanism along these lines:

shaxxxhash(record) + rand(8) + Timestamp (subsecond-precision integer)

which will prevent hash value collisions.
I think this hash calculation should be implemented in a separate helper module or a new plugin, like fluent-plugin-aws-elasticsearch.
What do you think about this?
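A minimal Ruby sketch of that formula (the digest choice and method name are assumptions, not the plugin's actual code):

require "digest/sha1"
require "securerandom"

# Proposed ID: a digest of the record, plus random bytes, plus a
# subsecond timestamp, so two distinct events cannot share an _id.
def generate_hash_id(record)
  seed = record.to_s + SecureRandom.hex(8) + Time.now.strftime("%s%N")
  Digest::SHA1.hexdigest(seed)
end

Note that the random component means the value must be stored in the record before it is buffered; otherwise every retry mints a new _id, which is exactly the duplication this issue is about.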

dannyk81 pushed a commit to dannyk81/fluent-plugin-elasticsearch that referenced this issue Nov 14, 2017
Instructions on how to avoid duplicate events on highly congested Elasticsearch clusters, relates to uken#312
@dannyk81
Contributor Author

I think the above will be a great enhancement!

I don't think a separate plugin should be used for this, as this is a common Elasticsearch problem (under certain conditions); a helper module sounds good!

In any case, I've submitted #317 with some instructions on how to work around this issue in the meantime, for your consideration.

@cosmo0920
Collaborator

I've released a patch for this issue as 2.0.1.rc.1.
If you have further issues with it, please add your comments here.
Cc: @portante , @richm , @lukas-vlcek

@dannyk81
Contributor Author

Thank you @cosmo0920 for this work, and everyone for contributing! We plan to test this release ASAP and will report back on how things behave.

@dannyk81
Contributor Author

@cosmo0920 I just realized that >v2.0.0 is for fluentd 0.14; however, we are still using fluentd 0.12. Is it possible to port this to the 1.x.x branch?

@cosmo0920
Collaborator

cosmo0920 commented Nov 17, 2017

I ported this work for v0.12 in #323.
Could you try this one?

@dannyk81
Contributor Author

Thank you! Could I trouble you to build & publish an RC release for this as well? It will just be easier for us to integrate it.

@cosmo0920
Collaborator

I've published this work for v0.12 as v1.10.3.rc.1.
Could you try v1.10.3.rc.1 on your environment?

@dannyk81
Contributor Author

dannyk81 commented Nov 17, 2017

Yes! thanks, we'll upgrade and let you know!

@dannyk81
Contributor Author

dannyk81 commented Nov 20, 2017

Hi @cosmo0920

I just installed version 1.11.0 in our test cluster. Unfortunately, there seems to be an issue; I receive the following errors from Elasticsearch:

Nov 20 18:48:09 elasticbuffer-01 docker-fluentd[21550]: 2017-11-20 18:48:09 +0000 [debug]: Elasticsearch rejected document: {"index"=>{"_index"=>"kubernetes-2017.11.20", "_type"=>"fluentd", "_id"=>"AV_awlRFP4qClgdBOQGe", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters."}}}
Nov 20 18:48:09 elasticbuffer-01 docker-fluentd[21550]: 2017-11-20 18:48:09 +0000 [debug]: Elasticsearch rejected document: {"index"=>{"_index"=>"kubernetes-2017.11.20", "_type"=>"fluentd", "_id"=>"AV_awlRFP4qClgdBOQGf", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters."}}}
Nov 20 18:48:09 elasticbuffer-01 docker-fluentd[21550]: 2017-11-20 18:48:09 +0000 [debug]: Elasticsearch rejected document: {"index"=>{"_index"=>"kubernetes-2017.11.20", "_type"=>"fluentd", "_id"=>"AV_awlRFP4qClgdBOQGg", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters."}}}
Nov 20 18:48:09 elasticbuffer-01 docker-fluentd[21550]: 2017-11-20 18:48:09 +0000 [debug]: Elasticsearch rejected document: {"index"=>{"_index"=>"kubernetes-2017.11.20", "_type"=>"fluentd", "_id"=>"AV_awlRFP4qClgdBOQGh", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters."}}}
Nov 20 18:48:09 elasticbuffer-01 docker-fluentd[21550]: 2017-11-20 18:48:09 +0000 [debug]: Elasticsearch rejected document: {"index"=>{"_index"=>"kubernetes-2017.11.20", "_type"=>"fluentd", "_id"=>"AV_awlRFP4qClgdBOQGi", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters."}}}
Nov 20 18:48:09 elasticbuffer-01 docker-fluentd[21550]: 2017-11-20 18:48:09 +0000 [debug]: Elasticsearch rejected document: {"index"=>{"_index"=>"kubernetes-2017.11.20", "_type"=>"fluentd", "_id"=>"AV_awlRFP4qClgdBOQGj", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters."}}}

Here's my output config:

<match **>
  @type elasticsearch
  hosts elasticin-01:9200,elasticin-02:9200
  include_tag_key true
  tag_key tag
  <hash>
    hash_id_key _id    # storing generated hash id key
  </hash>
  target_index_key target_index
  logstash_format true
  num_threads 8
  reload_on_failure true
  buffer_type file
  buffer_path /fluentd/log/buffer
  buffer_chunk_limit 8m
  flush_interval 5s
  retry_wait 20s
  disable_retry_limit false
  retry_limit 3
  buffer_queue_limit 256
  <secondary>
    @type file # or forward
    path /fluentd/log/forward-failed
  </secondary>
</match>

Seems like we are using the API incorrectly here?

@dannyk81
Contributor Author

dannyk81 commented Nov 20, 2017

The following change in the configuration seems to have solved the error for me:

   include_tag_key true
   tag_key tag
   <hash>
-    hash_id_key _id    # storing generated hash id key
+    hash_id_key _hash    # storing generated hash id key
   </hash>
+  id_key _hash
   target_index_key target_index
   logstash_format true
   num_threads 8

Seems like you cannot inject the _id directly into the record, but it works when it is stored in another key (_hash in my case) used by the id_key parameter.

What do you think? Is this a correct approach? I assume the value of _hash will not be overwritten by a new hash value during retries, correct?

@lukas-vlcek

I think the ES client needs to use the document PUT API, equivalent to curl -XPUT index_name/type/document_id, where document_id is the desired _id in this case. The same applies to the Bulk API, where the _id is defined in the action line preceding the document itself.
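For illustration, a hedged sketch of what that looks like on the wire (the index, type, and _id values are made up):

POST /_bulk
{ "index" : { "_index" : "kubernetes-2017.11.20", "_type" : "fluentd", "_id" : "my-unique-id-1" } }
{ "message" : "first log line", "tag" : "kubernetes" }

Because the _id travels in the action line, resubmitting the same payload updates the existing document instead of indexing a duplicate.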

@dannyk81
Contributor Author

It seems that my configuration above is generating duplicate events in Elasticsearch; I can find many identical documents differing only in their _id (and _hash) values.

I'm going to revert to our previous configuration for now.

@portante

I think the bulk index operations need to use "create" instead of "index", and then you also need error handling to detect and ignore 409 errors on "create" operations (see https://github.com/ViaQ/fluent-plugin-elasticsearch/blob/upstream-handle-bulk-errors/lib/fluent/plugin/out_elasticsearch.rb#L397).
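A minimal Ruby sketch of that error handling, assuming the item layout of an Elasticsearch bulk response (the helper name is hypothetical):

# Each item in a bulk response looks like
#   {"create" => {"_id" => "...", "status" => 409, "error" => {...}}}
# A 409 on "create" means the document already exists, i.e. a retried
# duplicate that is safe to drop rather than re-queue.
def ignorable_bulk_error?(item)
  op, result = item.first
  op == "create" && result["status"] == 409
end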

@cosmo0920
Collaborator

cosmo0920 commented Nov 21, 2017

We can possibly handle duplicate events by using the "create" operation in v1.11.0 and v2.1.0:

Could you try the write_operation create configuration?

@dannyk81
Contributor Author

I can try, but what about the <hash> config? This configuration fails:

  <hash>
    hash_id_key _id    # storing generated hash id key
  </hash>

Error:

Nov 20 18:48:09 elasticbuffer-01 docker-fluentd[21550]: 2017-11-20 18:48:09 +0000 [debug]: Elasticsearch rejected document: {"index"=>{"_index"=>"kubernetes-2017.11.20", "_type"=>"fluentd", "_id"=>"AV_awlRFP4qClgdBOQGe", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters."}}}

So, should I try this:

<match **>
  @type elasticsearch
  hosts elasticin-01:9200,elasticin-02:9200
  include_tag_key true
  tag_key tag
  <hash>
    hash_id_key _hash    # storing generated hash id key
  </hash>
  id_key _hash
  write_operation create
  target_index_key target_index
  logstash_format true
  num_threads 8
  reload_on_failure true
  buffer_type file
  buffer_path /fluentd/log/buffer
  buffer_chunk_limit 8m
  flush_interval 5s
  retry_wait 20s
  disable_retry_limit false
  retry_limit 3
  buffer_queue_limit 256
  <secondary>
    @type file # or forward
    path /fluentd/log/forward-failed
  </secondary>
</match>

@cosmo0920
Collaborator

I can try, but what about the <hash> config? This configuration fails:

hash_id_key _id # storing generated hash id key

Error:

Nov 20 18:48:09 elasticbuffer-01 docker-fluentd[21550]: 2017-11-20 18:48:09 +0000 [debug]: Elasticsearch rejected document: {"index"=>{"_index"=>"kubernetes-2017.11.20", "_type"=>"fluentd", "_id"=>"AV_awlRFP4qClgdBOQGe", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters."}}}

Sorry, this is my mistake; it's a bug in my patch.
I'm planning to change the default hash_id_key to _hash and to raise an error in the configure phase when @hash_config.hash_id_key != id_key.
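A minimal sketch of that validation, with names assumed from this thread rather than taken from the actual patch:

# Fail fast at startup when the generated hash field and id_key disagree,
# since the generated hash is only useful if it becomes the document _id.
def configure(conf)
  super
  if @hash_config && @hash_config.hash_id_key != @id_key
    raise Fluent::ConfigError,
          "hash_id_key (#{@hash_config.hash_id_key}) must match id_key (#{@id_key})"
  end
end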

@dannyk81
Contributor Author

Ok, got it.

So this config is correct?

<match **>
  @type elasticsearch
  hosts elasticin-01:9200,elasticin-02:9200
  include_tag_key true
  tag_key tag
  <hash>
    hash_id_key _hash    # storing generated hash id key
  </hash>
  id_key _hash
  write_operation create
  target_index_key target_index
  logstash_format true
  num_threads 8
  reload_on_failure true
  buffer_type file
  buffer_path /fluentd/log/buffer
  buffer_chunk_limit 8m
  flush_interval 5s
  retry_wait 20s
  disable_retry_limit false
  retry_limit 3
  buffer_queue_limit 256
  <secondary>
    @type file # or forward
    path /fluentd/log/forward-failed
  </secondary>
</match>

@cosmo0920
Collaborator

Yes, it is.

@dannyk81
Contributor Author

@cosmo0920

We've hit the same problem with the above setup: many duplicate documents are ingested into Elasticsearch with different _id values. For now, we've reverted to generating the _hash with fluent-plugin-genhashvalue.

Any ideas? Is it possible the plugin overwrites the value of _hash when the events are retried?

@cosmo0920
Collaborator

Any ideas? Is it possible the plugin overwrites the value of _hash when the events are retried?

I think that retrying the write to Elasticsearch may overwrite the value of _hash.
Currently, the ES plugin does not check whether the _hash key already exists in the record: https://github.com/uken/fluent-plugin-elasticsearch/blob/master/lib/fluent/plugin/generate_hash_id_support.rb#L18
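A minimal sketch of the missing check (method and variable names assumed, not verified against the linked file):

require "base64"
require "securerandom"

# Only generate a hash when the record does not already carry one,
# so a retried chunk keeps the _id it was assigned the first time.
def generate_hash_id_key(record)
  key = @hash_config.hash_id_key
  record[key] = Base64.strict_encode64(SecureRandom.uuid) unless record.key?(key)
  record
end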

@dannyk81
Contributor Author

Ok, so it seems we need to check whether the key already exists, and if it does, not create a new one.

Recalling our discussion in the PR, it seems that if the hash were reproducible, this wouldn't be an issue at this point (as the retry would produce the same hash), wouldn't it? Although I still prefer the random method of generating the hash, to avoid collisions.

@portante

Wouldn't we want the ES output plugin to create a bulk request payload with the _id of each record in place, and on error just re-submit that same bulk request without regenerating the payload? Then you don't need the expensive calculation of a unique ID for each JSON document, right?
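A rough Ruby sketch of that idea (every name here is hypothetical; this is not the plugin's actual write path):

# Build the bulk body (including _ids) once per buffer chunk and cache it,
# so a retry re-sends the identical byte string instead of minting new _ids.
# @bulk_bodies is assumed to be a Hash initialized in #start.
def write(chunk)
  body = (@bulk_bodies[chunk.unique_id] ||= build_bulk_body(chunk))
  client.bulk(body: body)
  @bulk_bodies.delete(chunk.unique_id)  # drop the cache only after success
end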

@dannyk81
Contributor Author

I'm going to try a different "workaround" for now: since using fluent-plugin-genhashvalue is a big headache for us, I'll use fluent-plugin-add-uuid instead.

<filter **>
  @type adduuid
  key _hash
</filter>

and use the _hash key as id_key in the ES plugin.
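On the output side, that pairing would look roughly like this (hosts reused from the earlier config, write_operation create as suggested above):

<match **>
  @type elasticsearch
  hosts elasticin-01:9200,elasticin-02:9200
  id_key _hash             # take the document _id from the UUID set by the filter
  write_operation create   # a retried duplicate now gets a 409 instead of a second copy
  logstash_format true
</match>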

@cosmo0920
Collaborator

cosmo0920 commented Nov 22, 2017

How about separating the Generate Hash ID module into a fluent-plugin-filter-elasticsearch-genid plugin?
It would be the same as the above plugin, but I can write test code for it. (The above plugin does not have tests.)

@cosmo0920
Collaborator

How about separating the Generate Hash ID module into a fluent-plugin-filter-elasticsearch-genid plugin?
It would be the same as the above plugin, but I can write test code for it. (The above plugin does not have tests.)

Ah, this plugin should be bundled with this ES plugin, as filter_elasticsearch_genid.
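Usage would then look something like this (a sketch; the parameter name is assumed to mirror the existing <hash> section):

<filter **>
  @type elasticsearch_genid
  hash_id_key _hash    # storing generated hash id key
</filter>

<match **>
  @type elasticsearch
  id_key _hash
  write_operation create
</match>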

@dannyk81
Contributor Author

Sounds good to me!

@cosmo0920
Collaborator

I've published the #331 work as v2.2.0.rc.1 and the #332 work as v1.12.0.rc.1.

@dannyk81
Contributor Author

👍 I'm going to test it ASAP!

@cosmo0920
Collaborator

I'm looking forward to your test results.
Sorry for the inconvenience with my patches. 😣

@dannyk81
Contributor Author

1.12.0.rc.1 is already deployed; so far so good. I will keep you posted!

I appreciate your hard work 😃

@dannyk81
Contributor Author

Things are looking stable! No issues as far as I can see.

@cosmo0920
Collaborator

Awesome. What good news!

@dannyk81
Contributor Author

Do you plan to do a GA release of v1.12.0 and v2.2.0?

@cosmo0920
Collaborator

I’m planning a GA release of v1.12.0 and v2.2.0 next week, because today is Japan’s Labor Thanksgiving Day and tomorrow is my PTO.
I’ll be back next Monday.

@dannyk81
Contributor Author

Thanks @cosmo0920, have a great weekend 😃

@cosmo0920
Collaborator

I've released v1.12.0 and v2.2.0.
Thank you for your report, @dannyk81 !
