
When using AckMode.MANUAL_IMMEDIATE, ack always fails for existing few messages on topic. #62

Closed
shrikantpatel opened this issue May 6, 2016 · 8 comments

@shrikantpatel

I am using a ConcurrentMessageListenerContainer with concurrency set to 1 for now (in the future it may be greater than 1), and I have set container.setAckMode(AckMode.MANUAL_IMMEDIATE).

I acknowledge manually in onMessage():

    container.setMessageListener(new AcknowledgingMessageListener<K, V>() {

        @Override
        public void onMessage(ConsumerRecord<K, V> message, Acknowledgment ack) {
            logger.warn("**************************");
            logger.warn("event id - " + message.value()); // event payload carried in the record value
            logger.warn("**************************");
            ack.acknowledge();
        }

    });

When there are already messages on the topic, the acknowledgment succeeds for the first few messages, but it fails for the remaining ones. Once the consumer catches up, acknowledgments for subsequent messages (event id 7, i.e. offset 8 onwards in the log below) do not fail.

Why does this happen? Am I doing anything wrong?

2016-05-06 00:10:39.946 INFO 275200 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.9.0.1
2016-05-06 00:10:39.946 INFO 275200 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 23c69d62a0cabf06
2016-05-06 00:10:40.214 INFO 275200 --- [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2016-05-06 00:10:40.323 INFO 275200 --- [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[my-topic-0]
2016-05-06 00:10:40.402 INFO 275200 --- [ kafka-1] o.a.k.c.consumer.internals.Fetcher : Fetch offset 87 is out of range, resetting offset
2016-05-06 00:10:40.511 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.511 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 0
2016-05-06 00:10:40.511 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 1
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 2
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 3
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 4
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 5
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 6
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 7
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 ERROR 275200 --- [ kafka-1] o.a.k.c.consumer.OffsetCommitCallback : Commit failed for {my-topic-0=OffsetAndMetadata{offset=3, metadata=''}}

org.apache.kafka.clients.consumer.internals.SendFailedException: null

2016-05-06 00:10:40.574 ERROR 275200 --- [ kafka-1] o.a.k.c.consumer.OffsetCommitCallback : Commit failed for {my-topic-0=OffsetAndMetadata{offset=4, metadata=''}}

org.apache.kafka.clients.consumer.internals.SendFailedException: null

2016-05-06 00:10:40.574 ERROR 275200 --- [ kafka-1] o.a.k.c.consumer.OffsetCommitCallback : Commit failed for {my-topic-0=OffsetAndMetadata{offset=5, metadata=''}}

org.apache.kafka.clients.consumer.internals.SendFailedException: null

2016-05-06 00:10:40.574 ERROR 275200 --- [ kafka-1] o.a.k.c.consumer.OffsetCommitCallback : Commit failed for {my-topic-0=OffsetAndMetadata{offset=6, metadata=''}}

org.apache.kafka.clients.consumer.internals.SendFailedException: null

2016-05-06 00:10:40.574 ERROR 275200 --- [ kafka-1] o.a.k.c.consumer.OffsetCommitCallback : Commit failed for {my-topic-0=OffsetAndMetadata{offset=7, metadata=''}}

org.apache.kafka.clients.consumer.internals.SendFailedException: null

2016-05-06 00:10:40.574 ERROR 275200 --- [ kafka-1] o.a.k.c.consumer.OffsetCommitCallback : Commit failed for {my-topic-0=OffsetAndMetadata{offset=8, metadata=''}}

org.apache.kafka.clients.consumer.internals.SendFailedException: null

@garyrussell
Contributor

Can you provide some more information and a test case that reproduces this issue? I just added this test case: it adds 4 messages, starts the container, sends 4 more messages, and waits for all 8 to be delivered. I see all the commits working OK...

2016-05-06 11:04:49,045 [testManualExisting-0-kafka-1] DEBUG: OffsetCommitCallback - Commits for {testTopic7-0=OffsetAndMetadata{offset=1, metadata=''}} completed
2016-05-06 11:04:49,045 [testManualExisting-1-kafka-1] DEBUG: OffsetCommitCallback - Commits for {testTopic7-1=OffsetAndMetadata{offset=1, metadata=''}} completed
2016-05-06 11:04:49,046 [testManualExisting-0-kafka-1] DEBUG: OffsetCommitCallback - Commits for {testTopic7-0=OffsetAndMetadata{offset=2, metadata=''}} completed
2016-05-06 11:04:49,046 [testManualExisting-1-kafka-1] DEBUG: OffsetCommitCallback - Commits for {testTopic7-1=OffsetAndMetadata{offset=2, metadata=''}} completed
...
2016-05-06 11:04:49,075 [testManualExisting-0-kafka-1] DEBUG: OffsetCommitCallback - Commits for {testTopic7-0=OffsetAndMetadata{offset=3, metadata=''}} completed
2016-05-06 11:04:49,075 [testManualExisting-1-kafka-1] DEBUG: OffsetCommitCallback - Commits for {testTopic7-1=OffsetAndMetadata{offset=3, metadata=''}} completed
2016-05-06 11:04:49,076 [testManualExisting-1-kafka-1] DEBUG: OffsetCommitCallback - Commits for {testTopic7-1=OffsetAndMetadata{offset=4, metadata=''}} completed
2016-05-06 11:04:49,076 [testManualExisting-0-kafka-1] DEBUG: OffsetCommitCallback - Commits for {testTopic7-0=OffsetAndMetadata{offset=4, metadata=''}} completed

@shrikantpatel
Author

I was busy today; I should have something by Monday.

@shrikantpatel
Author

archive_name.tar.gz

Gary,

Attached is the Spring Boot producer and consumer app. The consumer has a unit test, SpringKakfaSubscriberTest, with two logs: "Complete log from Unit Test.txt" and "Relevant log from Unit Test.txt". My observations differ from yours. I changed my unit test to almost match yours, and I am surprised that I get different results.

The producer app publishes to a local Kafka broker on the "my-topic" topic, and the consumer subscribes to this topic.
I start the producer, let it publish a few messages on the topic, and then shut it down. Then I bring up the subscriber app.

There are 3 log files:

Consumer_Initial_Start_Log.txt is the output when the consumer comes up for the first time. It commits successfully up to offset 2; subsequent offsets fail, even though it reads all the messages.

I shut down the consumer and bring it up again.

Consumer_Second_Start_Log.txt is the output of this stage; it starts from offset 3 and fails at offset 5. Again it reads all the messages, but the commits start failing at some point.

I shut down the consumer and bring it up again.

Consumer_Third_Start_Log.txt is the output of this stage; it starts from offset 5 and fails at offset 7.

It is like a sliding window that moves a little forward at each restart.

@garyrussell
Contributor

Interesting -

2016-05-12 14:46:25.494  INFO 63017 --- [        kafka-1] c.shri.subscriber.SpringKafkaSubscriber  : received: key - 25 value - value 25 offset - 160
2016-05-12 14:46:25.497 ERROR 63017 --- [        kafka-1] o.a.k.c.consumer.OffsetCommitCallback    : Commit failed for {my-topic-0=OffsetAndMetadata{offset=160, metadata=''}}

org.apache.kafka.clients.consumer.internals.SendFailedException: null

2016-05-12 14:46:25.497 ERROR 63017 --- [        kafka-1] o.a.k.c.consumer.OffsetCommitCallback    : Commit failed for {my-topic-0=OffsetAndMetadata{offset=161, metadata=''}}

org.apache.kafka.clients.consumer.internals.SendFailedException: null

2016-05-12 14:46:35.208  INFO 63017 --- [        kafka-1] c.shri.subscriber.SpringKafkaSubscriber  : received: key - 0 value - value 0 offset - 161
2016-05-12 14:46:36.003  INFO 63017 --- [        kafka-1] c.shri.subscriber.SpringKafkaSubscriber  : received: key - 1 value - value 1 offset - 162
2016-05-12 14:46:37.005  INFO 63017 --- [        kafka-1] c.shri.subscriber.SpringKafkaSubscriber  : received: key - 2 value - value 2 offset - 163

Unfortunately, the exception is pretty useless; I did find this discussion:

http://grokbase.com/t/kafka/users/163ecj6a23/new-client-commitasync-sendfailedexception

where they acknowledge that async commits are a bit flaky; unfortunately, the JIRA issue it points to doesn't exist.

Investigating...
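The behavior under discussion is a property of the plain kafka-clients API: `commitAsync()` returns immediately and reports a send failure only through its callback without retrying, while `commitSync()` blocks, retrying retriable errors until the commit succeeds or a fatal error is thrown. A minimal sketch against the raw `KafkaConsumer` API (broker address, topic, and group id are placeholders; this needs a running broker, so it is illustrative only):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitModes {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("group.id", "demo-group");              // placeholder group id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            for (ConsumerRecord<String, String> record : consumer.poll(1000L)) {
                // Commit the offset AFTER the record just processed.
                Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));

                // Async: returns immediately; a failure (e.g. SendFailedException)
                // surfaces only in the callback and the commit is NOT retried.
                consumer.commitAsync(offsets, (committed, exception) -> {
                    if (exception != null) {
                        System.err.println("Commit failed for " + committed + ": " + exception);
                    }
                });

                // Sync alternative: blocks until the commit succeeds, retrying
                // retriable errors, and throws on fatal ones.
                // consumer.commitSync(offsets);
            }
        }
    }
}
```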

@garyrussell
Contributor

@shrikantpatel I just changed your test to use the new MANUAL_IMMEDIATE_SYNC (available in 1.0.0.BUILD-SNAPSHOT) and it seems to be more stable.
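On the application side this is a one-line change, using the same setter as in the original report (a sketch; MANUAL_IMMEDIATE_SYNC was only available in 1.0.0.BUILD-SNAPSHOT at the time):

```java
// Same container setup as in the original report; only the ack mode changes.
// MANUAL_IMMEDIATE_SYNC commits synchronously, avoiding the flaky commitAsync() path.
container.setAckMode(AckMode.MANUAL_IMMEDIATE_SYNC);
```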

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue May 12, 2016

Fixes spring-projects#62
Resolves spring-projects#72

See the discussion on spring-projects#62: `commitAsync()` is not currently reliable.
Use `commitSync()` by default; add a `syncCommits` property to the containers
(default true).

Also allow a user-injected commit callback (spring-projects#72)
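Based on that commit message, a hedged sketch of the new container options (the setter names `setSyncCommits` and `setCommitCallback` are inferred from the properties described in the commit; verify against the released 1.0.0 API):

```java
// Commit synchronously (the new default per the commit message); set to false
// to restore the old commitAsync() behaviour.
container.setSyncCommits(true);

// Inject a commit callback to observe async commit results yourself (GH-72).
container.setCommitCallback((offsets, exception) -> {
    if (exception != null) {
        logger.error("Commit failed for " + offsets, exception);
    }
});
```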
@garyrussell
Contributor

I found the JIRA Issue - looks like it's fixed, but not until 0.10.0.0, sigh.

@shrikantpatel
Author

@garyrussell Thanks for the update.

I see that MANUAL_IMMEDIATE_SYNC is scheduled for 1.0.0.RC1; any idea when it will be released?

(I can use the trunk for now, but our process will not allow this in a non-dev environment.)

@garyrussell
Contributor

Your process is good 😄

RC1 should be out by early next week with GA before the end of the month.
