Secor loses events during kafka rebalancing #395

Closed
alexivpx opened this issue Feb 23, 2018 · 6 comments
Comments

@alexivpx

Referring to the issue first posted here, as I'm experiencing exactly the same behaviour:
https://groups.google.com/forum/#!topic/secor-users/CzTlwiemKXU

I'm using Secor with the following setup:

secor.file.reader.writer.factory=com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory
secor.message.parser.class=com.pinterest.secor.parser.SplitByFieldMessageParser
kafka.consumer.auto.offset.reset=smallest
kafka.partition.assignment.strategy=roundrobin
kafka.dual.commit.enabled=false
kafka.offsets.storage=kafka

Everything works just fine as long as the number of running machines doesn't change, but when rebalancing happens (caused by the configured autoscaling / Spot Instances), Secor loses some data as a result, which is a very severe issue!

The issue explained (Quote):
You can find more details about the problem at the bottom of the message, but I think that the root cause of the problem is the logic of the DelimitedTextFileReader.

It takes the initial offset from the filename (com/pinterest/secor/io/impl/DelimitedTextFileReaderWriterFactory.java:73) and then increments it by one for each of the read messages (com/pinterest/secor/io/impl/DelimitedTextFileReaderWriterFactory.java:91).
However, this is not always correct. If output partitioning is enabled the messages from the same kafka partition may go into different output partitions. As a result, secor may delete the messages it was not supposed to delete during file trim (com/pinterest/secor/uploader/Uploader.java:157).
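For illustration, here is a minimal sketch of the inference described above (not the actual Secor source; the class and method names are invented): the reader seeds its offset from the trailing digits of the file name and then assumes each line it reads occupies the next consecutive Kafka offset.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

class InferredOffsetReader {
    private final BufferedReader reader;
    private long nextOffset;

    InferredOffsetReader(String path, long startOffsetFromFileName) throws IOException {
        this.reader = new BufferedReader(new FileReader(path));
        this.nextOffset = startOffsetFromFileName; // e.g. 500 from ..._00000000000000000500
    }

    // Returns {assumedOffset, line}. The offset is only an assumption: when the
    // file holds just a subset of the partition's messages (output partitioning),
    // the real Kafka offsets are not consecutive from the file's start offset.
    String[] next() throws IOException {
        String line = reader.readLine();
        if (line == null) {
            return null;
        }
        return new String[] { String.valueOf(nextOffset++), line };
    }
}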

I think pretty much all file readers have the same issue except these two:

  • MessagePackSequenceFileReaderWriterFactory
  • SequenceFileReaderWriterFactory

I’m going to provide an example to be more specific about what is wrong with this logic.

  • Let's say the last committed offset is 500
  • And we have two sets of messages in partition #7
    • 100 messages at Nov 26 23:59
    • and another 100 messages at Nov 27 00:01
  • the output partitioning will put these messages into two files:
    • <path_to_local_secor_files>/<topic_name>/dt=2017-11-26/1_7_00000000000000000500
    • <path_to_local_secor_files>/<topic_name>/dt=2017-11-27/1_7_00000000000000000500
      If the DelimitedTextFileReader ever reads the second file, the offsets of the messages are going to be incorrect (500-599 instead of 600-699), as the small sketch below illustrates.
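To put numbers on it, a tiny hypothetical illustration of replaying that second file with the inference sketched above:

public class OffsetMismatchExample {
    public static void main(String[] args) {
        long startOffsetFromFileName = 500; // parsed from 1_7_00000000000000000500
        long actualFirstOffset = 600;       // first message of the Nov 27 batch
        for (int i = 0; i < 100; i++) {
            long inferred = startOffsetFromFileName + i;
            long actual = actualFirstOffset + i;
            System.out.println("inferred=" + inferred + " actual=" + actual); // off by 100 every time
        }
    }
}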

This is getting even worse for me because I’m using SplitByFieldMessageParser to split messages into different files. The messages are always spread across different files, so the offsets of the messages read from files are always wrong, and whenever Secor does a file trim we lose a bunch of messages.

Here is a relevant sequence from my log files:

#1 2017-11-13 11:56:47,512 [Thread-5] (com.pinterest.secor.uploader.Uploader) [DEBUG] Uploading for: TopicPartition{mTopic='events', mPartition=14} 
#2 2017-11-13 13:30:33,584 [Thread-4] (com.pinterest.secor.writer.MessageWriter) [DEBUG] appended message ParsedMessage{topic='events', kafkaPartition=14, offset=103446894, kafkaKey=97c4b9932, payload=… 
#3 2017-11-13 14:11:44,020 [Thread-4] (com.pinterest.secor.writer.MessageWriter) [DEBUG] appended message ParsedMessage{topic='events', kafkaPartition=14, offset=103451046, kafkaKey=97c4b9932, payload=… 
#4 2017-11-13 19:11:57,845 [Thread-4] (com.pinterest.secor.uploader.Uploader) [DEBUG] previous committed offset count -1 is lower than committed offset 103422938 is lower than or equal to last seen offset 103488063. Trimming files in topic events partition 14 
#5 2017-11-13 19:11:58,482 [Thread-4] (com.pinterest.secor.common.FileRegistry) [INFO] Deleting writer for path /mnt/secor_data/message_logs/partition/5_15/events/job_created/dt=2017-11-13/1_14_00000000000103415482.gz 
#6 2017-11-13 19:11:58,500 [Thread-4] (com.pinterest.secor.uploader.Uploader) [INFO] removed file /mnt/secor_data/message_logs/partition/5_15/events/job_created/dt=2017-11-13/1_14_00000000000103415482.gz 

So the file was uploaded by Thread-5 (#1), but then the ownership switched and Thread-4 continued to write messages into a file with the same name (#2 and #3). A couple of seconds later Secor noticed that the partition had already been committed with a higher offset and trimmed the files (#4).
However, those messages were never copied to the new files and were never uploaded to S3.

Does it make sense to you guys, or am I just missing something very important here?
Do you have any idea how I can fix this?

@HenryCaiHaiying
Contributor

HenryCaiHaiying commented Feb 26, 2018 via email

@jmbutter

jmbutter commented Aug 12, 2019

I believe the issue is that only SequenceFileReaderWriterFactory and MessagePackSequenceFileReaderWriterFactory actually store the offset of each message in the files, while the other formats simply infer message offsets from the starting offset of the file, which is obviously not correct if any file partitioning is used.
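For contrast, a rough sketch of why the sequence-file formats are safe, assuming (as the comment above implies) that the Kafka offset is stored as the LongWritable key of each record, so it is read back exactly rather than inferred from the file name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

class StoredOffsetReaderSketch {
    static void readAll(String path) throws Exception {
        Configuration conf = new Configuration();
        try (SequenceFile.Reader reader =
                 new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(path)))) {
            LongWritable key = new LongWritable();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                long offset = key.get(); // the real Kafka offset, read from the record itself
                System.out.println("offset=" + offset + " bytes=" + value.getLength());
            }
        }
    }
}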

I believe this issue could be resolved in a couple ways:

  1. Instead of relying on trimFiles to resolve this, the consumer could seek back to the last committed offset after deleting all local files for that partition (see the sketch after this list). This gets a little complex in that the message iterator may be in the middle of a batch of messages, so care needs to be taken to handle this properly.
  2. A new UploadManager could be implemented that takes SequenceFiles and converts them to the appropriate output format immediately before upload, allowing the trim logic to work correctly on rebalance. This is very easy, but it does make file-size based uploading inaccurate and has performance implications.
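A minimal sketch of option 1, assuming a plain KafkaConsumer for the partition is at hand (Secor's actual consumer wiring differs, and deleteLocalFiles is a hypothetical helper): after dropping the local files, rewind to the last committed offset so nothing is skipped.

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class RewindOnTrimSketch {
    static void rewindToLastCommitted(KafkaConsumer<byte[], byte[]> consumer,
                                      TopicPartition partition) {
        // Local files for the partition are assumed to be deleted already
        // (deleteLocalFiles(partition) would be a hypothetical helper, not Secor API).
        OffsetAndMetadata committed = consumer.committed(partition);
        long target = (committed != null) ? committed.offset() : 0L;
        consumer.seek(partition, target); // replay everything since the last commit
    }
}

Care would still be needed for any batch the message iterator is already holding, per the caveat in 1).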

I've taken the route of 2) initially to get us going quickly, but would like to circle back around and implement 1) in the long run, if that seems like a reasonable approach. I'd be happy to put up a PR as time allows.

@HenryCaiHaiying
Contributor

HenryCaiHaiying commented Aug 16, 2019 via email

@ms309

ms309 commented Oct 1, 2019

@jmbutter @HenryCaiHaiying When is this fix going to land in an official release or the master branch? I am also facing the same issue, where data loss occurs during rebalancing.

@GrigorievNick

I also have one here.

@HenryCaiHaiying
Contributor

With the new SecorConsumerRebalanceListener and RebalanceHandler, it looks like we will be cleaning up files on partition reassignment.

To use these classes, build Secor with mvn -Pkafka-2.0.0.
