
Duplicated messages when kafka-monitor is restarted #129

Closed
jlisam opened this issue Aug 2, 2018 · 5 comments


jlisam commented Aug 2, 2018

I had to redeploy kafka-monitor and started seeing duplicated messages in the metrics. I understand how kafka-monitor classifies a message as duplicated by comparing the indices it produces and consumes; however, I was wondering what the expected behavior on a restart is. I do notice that it is trying to catch up with the nextIndex:

[2018-08-02 23:30:34,313] INFO single-cluster-monitor/Current index=197338, nextIndex=686555 for partition=9 (com.linkedin.kmf.services.ConsumeService)
[2018-08-02 23:30:34,414] INFO single-cluster-monitor/Current index=197339, nextIndex=686555 for partition=9 (com.linkedin.kmf.services.ConsumeService)
[2018-08-02 23:30:34,515] INFO single-cluster-monitor/Current index=197340, nextIndex=686555 for partition=9 (com.linkedin.kmf.services.ConsumeService)
[2018-08-02 23:30:34,615] INFO single-cluster-monitor/Current index=197341, nextIndex=686555 for partition=9 (com.linkedin.kmf.services.ConsumeService)
[2018-08-02 23:30:34,716] INFO single-cluster-monitor/Current index=197342, nextIndex=686555 for partition=9 (com.linkedin.kmf.services.ConsumeService)

It has been taking a while, so I am wondering whether this is expected. I haven't dug very deep into the code, but I noticed this: https://github.com/linkedin/kafka-monitor/blob/master/src/main/java/com/linkedin/kmf/services/ProduceService.java#L167:L177. However, I don't really understand how that helps with the restart.

logs from the producer:

{"topic":"hydro-monitor-topic","time":1533253727144,"index":207774,"producerId":"kmf-producer","content":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}
{"topic":"hydro-monitor-topic","time":1533253727144,"index":207753,"producerId":"kmf-producer","content":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}
{"topic":"hydro-monitor-topic","time":1533253727097,"index":208174,"producerId":"kmf-producer","content":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}
{"topic":"hydro-monitor-topic","time":1533253727198,"index":208175,"producerId":"kmf-producer","content":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}
{"topic":"hydro-monitor-topic","time":1533253727097,"index":208171,"producerId":"kmf-producer","content":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}
{"topic":"hydro-monitor-topic","time":1533253727198,"index":208172,"producerId":"kmf-producer","content":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}

To me it looks like the producer needs a way to recover the last index it produced and start from there on a restart (correct me if I am wrong). Let me know if there is anything else you would like me to provide.

Thank you 🙇


jlisam commented Aug 3, 2018

An update on the above: I restarted again, and this time the indices make sense. The consumer picks up from the latest produced index.

[2018-08-03 00:38:32,485] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=4 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,486] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=7 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,486] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=0 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,487] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=6 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,487] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=12 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,488] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=8 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,488] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=15 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,488] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=14 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,490] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=3 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,491] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=11 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,493] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=5 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,493] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=13 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,500] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=1 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,500] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=9 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,513] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=2 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,514] INFO single-cluster-monitor/Current index=94, nextIndex=-1 for partition=10 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,586] INFO single-cluster-monitor/Current index=95, nextIndex=95 for partition=7 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,587] INFO single-cluster-monitor/Current index=95, nextIndex=95 for partition=4 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,587] INFO single-cluster-monitor/Current index=95, nextIndex=95 for partition=0 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,588] INFO single-cluster-monitor/Current index=95, nextIndex=95 for partition=6 (com.linkedin.kmf.services.ConsumeService)
[2018-08-03 00:38:32,588] INFO single-cluster-monitor/Current index=95, nextIndex=95 for partition=15 (com.linkedin.kmf.services.ConsumeService)

lindong28 commented:

Hey @jlisam,

Thanks for raising the issue. Here is the expected behavior after kafka-monitor is restarted:

  • For any given partition, ProduceService will send a message with index = 0. Say the offset of this produced message is offset1. Then the producer will send messages with the index sequence (offset1, offset1 + 1, offset1 + 2, ...). So the sequence of indexes of messages from ProduceService is (0, offset1, offset1 + 1, ...).

  • For any given partition, ConsumeService starts consuming from the log end offset, which means that ConsumeService will not consume any message that was produced before it started. Say the sequence of indexes of the messages it has consumed is (m1, m2, m3, ...). ConsumeService will discard the first index m1 and start checking from the second index m2. It expects the indexes from m2 onward to be consecutive; in other words, it expects the sequence of indexes to be (m1, m2, m2 + 1, m2 + 2, ...). If there is a message whose index is smaller than the previous index (not counting the first message), it is considered a message duplication.
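The producer-side index sequence described above can be sketched as follows (a hypothetical illustration with invented names, not the actual ProduceService code):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the produced index sequence: the first message after a
// (re)start carries index 0; once the broker assigns it offset1, the
// producer continues with offset1, offset1 + 1, offset1 + 2, ...
public class ProducedIndexSequence {
    /** Returns the first n indexes the producer would send, given the
     *  offset (offset1) that the initial index-0 message landed at. */
    public static List<Long> firstN(long offset1, int n) {
        List<Long> indexes = new ArrayList<>();
        indexes.add(0L);                   // initial probe message
        for (int i = 0; i < n - 1; i++) {
            indexes.add(offset1 + i);      // resume from the probe's offset
        }
        return indexes;
    }
}
```

So for offset1 = 5, the first four indexes would be 0, 5, 6, 7 — the (0, offset1, offset1 + 1, ...) shape described above.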

So it looks like the logic is correct for a kafka-monitor restart. I think we can prove that, if there is no message loss or duplication on the Kafka side, ConsumeService should always see consecutively incrementing indexes, right?
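The consume-side check can be sketched roughly as below (a hedged sketch; the class and method names are invented, and the real logic lives in com.linkedin.kmf.services.ConsumeService). Note how the first message after a restart is accepted regardless of its index, matching the `nextIndex=-1` lines in the logs above:

```java
// Per-partition sketch of the duplicate/loss classification described above.
public class IndexChecker {
    private long nextIndex = -1; // -1 until the first message is seen

    /** Classifies one consumed message index as "ok", "duplicate", or "loss". */
    public String onMessage(long index) {
        if (nextIndex == -1) {
            // First message after a (re)start: accept any index and sync to it.
            nextIndex = index + 1;
            return "ok";
        }
        if (index < nextIndex) {
            return "duplicate";    // index went backwards: message seen before
        }
        if (index > nextIndex) {
            nextIndex = index + 1;
            return "loss";         // indexes were skipped
        }
        nextIndex = index + 1;     // the expected consecutive index arrived
        return "ok";
    }
}
```

In this sketch, nextIndex holds the index expected for the message currently being checked, analogous to the `Current index=95, nextIndex=95` lines in the log above.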

Given that you actually observed something weird, there may be a problem somewhere. I currently don't have an explanation for it.


jlisam commented Aug 7, 2018

Thanks @lindong28 for the prompt response! I will try to track this down. So far I can't reproduce it locally, but when my instance is running in Kubernetes, I almost always hit this case.

lindong28 commented:

@jlisam Thanks for reporting this issue! Since the LinkedIn Kafka team does not use Kubernetes, it is hard for us to investigate if the issue only happens when Kafka Monitor runs on Kubernetes. We will try to fix it if you or other users can provide more specific information on what is going wrong with Kafka Monitor to help us debug it.

lindong28 commented:

I will close this issue. Please feel free to re-open it if there is more actionable information for us.
