
fix(kafka source): Add commit_interval_ms option#944

Merged
3 commits merged into master from unknown repository
Oct 16, 2019

Conversation

@ghost

ghost commented Sep 27, 2019

Closes #865

This PR implements simple committing of offsets to Kafka. Previously the offsets were only stored but not committed.
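For reference, the new option might be set in a Vector source config roughly like this (a sketch: the broker address, group id, and topic are hypothetical placeholders; the interval value matches the one used in the tests later in this thread):

```toml
[sources.my_kafka]
  type = "kafka"                        # Vector's Kafka source
  bootstrap_servers = "localhost:9092"  # hypothetical broker address
  group_id = "vector-consumer-group"    # hypothetical consumer group
  topics = ["logs"]                     # hypothetical topic list
  commit_interval_ms = 5000             # the option this PR adds: commit offsets every 5 s
```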

I'm not very happy with the implementation, though. While it attempts to guarantee "at least once" delivery, the number of duplicates after a restart of Vector is quite high (on the order of commit_interval * throughput).

I think the approach from rdkafka's example with autocommit and storing offsets only for processed messages would be more reliable, but it requires a way to mark each message as processed (stored to the disk buffer or delivered to sinks) so that offsets are stored only for marked messages.
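The "mark each message as processed" bookkeeping could be modeled like this. This is a standalone sketch of the idea, not Vector's or rdkafka's actual code, and all names are hypothetical: the tracker advances a per-partition "safe to store" offset only once every earlier message has been acknowledged, even if acknowledgments arrive out of order.

```rust
use std::collections::{BTreeSet, HashMap};

/// Tracks per-partition acknowledgments and yields only offsets that are
/// safe to store: every message up to and including them has been processed.
struct AckTracker {
    /// Next offset expected to become safe, per partition.
    next: HashMap<i32, i64>,
    /// Acknowledged offsets that arrived out of order, per partition.
    pending: HashMap<i32, BTreeSet<i64>>,
}

impl AckTracker {
    fn new() -> Self {
        AckTracker { next: HashMap::new(), pending: HashMap::new() }
    }

    /// Record that a message at `offset` was delivered to this consumer.
    /// Seeds the expected offset on the first delivery for a partition.
    fn deliver(&mut self, partition: i32, offset: i64) {
        self.next.entry(partition).or_insert(offset);
    }

    /// Mark `offset` on `partition` as processed. Returns the highest offset
    /// that is now safe to store for that partition, if it advanced.
    fn ack(&mut self, partition: i32, offset: i64) -> Option<i64> {
        let next = self.next.get_mut(&partition)?;
        let pending = self.pending.entry(partition).or_default();
        pending.insert(offset);
        let mut advanced = None;
        // Consume the contiguous run of acknowledged offsets, if any.
        while pending.remove(&*next) {
            advanced = Some(*next);
            *next += 1;
        }
        advanced
    }
}

fn main() {
    let mut t = AckTracker::new();
    t.deliver(0, 100);
    assert_eq!(t.ack(0, 101), None);      // 100 is still unprocessed
    assert_eq!(t.ack(0, 100), Some(101)); // now 100 and 101 are both safe
    assert_eq!(t.ack(0, 102), Some(102));
    println!("ok");
}
```

With this shape, the offset actually stored (and later committed) never runs ahead of processing, which is what "at least once" needs.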

Signed-off-by: Alexander Rodin <rodin.alexander@gmail.com>
@binarylogic
Contributor

Thanks, @a-rodin. Is it worth opening a follow-up issue to represent the improvement?

@ghost
Author

ghost commented Sep 28, 2019

@binarylogic I've read more of the topology and fanout code and now think that no additional issue is needed, but this PR has to be reworked to do the following:

  • enable auto-commit with manual offset storing;
  • store the offset of the previous message for each ConsumerStream the next time the poll function is called.

In that case messages would be delivered to at least one of the sinks, and after a restart there would be no more duplicates than there are threads.
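The second bullet, "store the previous message's offset on the next poll", can be sketched as a tiny state machine. This is a hypothetical standalone model of the bookkeeping, not the actual ConsumerStream code; the rdkafka integration is omitted:

```rust
/// Remembers the coordinates of the last polled message and releases them
/// for storing only when the next message is polled. By that point the
/// previous message has certainly been handed off downstream.
struct DeferredStore {
    previous: Option<(i32, i64)>, // (partition, offset) of the last polled message
}

impl DeferredStore {
    fn new() -> Self {
        DeferredStore { previous: None }
    }

    /// Called on each poll with the new message's coordinates.
    /// Returns the previous message's coordinates, which are now safe to store.
    fn on_poll(&mut self, partition: i32, offset: i64) -> Option<(i32, i64)> {
        self.previous.replace((partition, offset))
    }
}

fn main() {
    let mut d = DeferredStore::new();
    assert_eq!(d.on_poll(0, 10), None);            // first poll: nothing to store yet
    assert_eq!(d.on_poll(0, 11), Some((0, 10)));   // second poll releases the first offset
    assert_eq!(d.on_poll(1, 5), Some((0, 11)));
    println!("ok");
}
```

The trade-off is visible in the model: the last message before a shutdown never gets its offset stored, so it is redelivered after a restart, which bounds duplicates at roughly one per stream.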

@lukesteensen
Member

@a-rodin what do you think about simply setting the enable.auto.commit option? It seems to me like it would have roughly the same effect as this change.

In the longer term, tracking message acknowledgments and propagating them back to sources is something we'll definitely want to explore.

@ghost
Author

ghost commented Oct 1, 2019

@lukesteensen If we enable auto commit, then we need to store offsets manually. At the moment this solution seems the cleanest to me.

With both autocommit and automatic offset storing, messages are lost on each restart (I tested this and observed the losses).

@ghost
Author

ghost commented Oct 1, 2019

I did a few tests. In all of them, Vector read data from a Kafka topic and wrote it to a file for around 28 seconds. The commit interval was set to 5 seconds.

These are the results:

| Experiment # | Offset storing | Committing | Offset increment | Delivered messages | Excessive messages |
|---|---|---|---|---|---|
| 1 | Automatic | Automatic | 2017984 | 2017984 | 0 |
| 2 | Automatic | Automatic | 1982684 | 1982683 | -1 |
| 3 | Automatic | Automatic | 1908673 | 1908672 | -1 |
| 4 | Manual, immediate | Automatic | 1932715 | 1932725 | 10 |
| 5 | Manual, immediate | Automatic | 1970351 | 1970353 | 2 |
| 6 | Manual, immediate | Automatic | 1989981 | 1989987 | 6 |
| 7 | Manual, immediate | Manual, after next message | 1911748 | 2075986 | 164238 |
| 8 | Manual, immediate | Manual, after next message | 1976111 | 2243403 | 267292 |
| 9 | Manual, immediate | Manual, after next message | 2684911 | 3022136 | 337225 |

Meaning of the columns:

  • "Offset storing" - "automatic" corresponds to enable.auto.offset.store set to true, "manual, immediate" corresponds to enable.auto.offset.store set to false and storing them like here: https://github.com/timberio/vector/blob/ec73082da655d5e17c7023fef3b5c1893a4d7bf4/src/sources/kafka.rs#L107-L109
  • "Committing" - "automatic" means enable.auto.commit set to true, "manual, after next message" means what this PR introduces
  • "Offset increment" - increment of sum of offsets for all partitions for Vector's consumer group after running each test
  • "Delivered messages" - number of messages in the output file
  • "Excessive messages" - difference between "Delivered messages" and "Offset increment"
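To make the sign convention in the last column explicit, it is simply delivered messages minus offset increment: positive values are duplicates, negative values are lost messages. Checked against rows 2, 4, and 7 of the table:

```rust
/// "Excessive messages" as defined above: delivered minus offset increment.
/// Positive values mean duplicates; negative values mean lost messages.
fn excessive(offset_increment: i64, delivered: i64) -> i64 {
    delivered - offset_increment
}

fn main() {
    assert_eq!(excessive(1982684, 1982683), -1);     // row 2: autocommit loses one message
    assert_eq!(excessive(1932715, 1932725), 10);     // row 4: manual store, a few duplicates
    assert_eq!(excessive(1911748, 2075986), 164238); // row 7: manual commit, many duplicates
    println!("ok");
}
```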

From the results it looks like using autocommit and automatic offset storing together results in losing one message. I actually expected that "Manual, immediate"/"Automatic" would also lose the last message, but that doesn't seem to happen.

My idea in the previous comment was to replace the "manual, immediate" offset storing strategy with "manual, on next poll", which would store the offset of a message only when the next poll on the stream happens.

@lukesteensen
Member

How were you shutting Vector down for these tests? I'd hope that a clean shutdown would give us a chance to ack the final message before exiting, without any loss or duplicates.

Do you think the difference between auto and manual commit is that auto will do a commit on shutdown? That seems like the best outcome in your tests, but we could maybe adjust the manual strategy.

@ghost
Author

ghost commented Oct 8, 2019

@lukesteensen I was pressing Ctrl-C once each time, which should send SIGINT.

Do you think the difference between auto and manual commit is that auto will do a commit on shutdown?

It should commit on shutdown, because the Drop implementation calls the rd_kafka_consumer_close function, which is documented to commit final offsets.

That seems like the best outcome in your tests, but we could maybe adjust the manual strategy.

Actually, I'm leaning towards this approach (manual/auto) now because it performs well in tests and is simple to maintain. I explored the approach from my previous comment, but it turns out to be cumbersome to implement (it requires either moving offsets between threads or calling unsafe functions) without corresponding safe APIs in the rdkafka crate.
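In librdkafka configuration terms, the combination being settled on here ("manual, immediate" storing with automatic committing) corresponds roughly to the following consumer properties, with the application storing each message's offset right after processing it. This is a sketch; the interval value is the one used in the tests above:

```
enable.auto.commit=true          # librdkafka's background thread commits the stored offsets
auto.commit.interval.ms=5000     # commit interval used in the tests above
enable.auto.offset.store=false   # offsets are stored manually, right after processing
```

Because only processed messages ever have their offsets stored, an unclean restart can at worst redeliver up to one commit interval's worth of messages, and never lose any.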

@lukesteensen
Member

Actually, I'm leaning towards this approach (manual/auto) now because it performs well in tests and is simple to maintain.

👍 This sounds good to me

Signed-off-by: Alexander Rodin <rodin.alexander@gmail.com>
Signed-off-by: Alexander Rodin <rodin.alexander@gmail.com>
@binarylogic
Contributor

@a-rodin just checking progress on this. Anything blocking merging?

@ghost
Author

ghost commented Oct 14, 2019

@binarylogic I think this is ready to merge. I've tested it manually and it works.

@binarylogic
Contributor

Perfect! @lukesteensen mind approving if everything is good?

@ghost ghost merged commit a3c7c75 into vectordotdev:master Oct 16, 2019
This pull request was closed.


Development

Successfully merging this pull request may close these issues.

Topic offsets for Kafka source are not always committed

3 participants