
Don't continue with messages on error #1114

Closed
stphngrtz opened this issue Oct 26, 2017 · 13 comments

Comments

@stphngrtz

Hello everyone! I've built a simple message consumer that will throw an exception if the payload contains the string "error". With default settings, the following happens: (kafka, topic with only 1 partition)

  • send "error"
  • send "hello"
  • consume "error", fail
  • retry, fail
  • retry, fail
  • print exception stacktrace
  • consume "hello"
  • ack message, increment offset

What I want is the system to stop consuming messages on error. Doing retries is fine but please don't consume "hello". I don't think this is a very unique request but I haven't found a solution yet. Maybe you guys are able help? I've already tried my luck at stackoverflow, but without success.
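The consumer described above can be sketched roughly like this (class and binding names are assumptions), using the Spring Cloud Stream @StreamListener programming model of that era:

```java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

// Rough sketch (assumed names) of the consumer in question: it fails on any
// payload containing "error", otherwise processes the message normally.
@EnableBinding(Sink.class)
public class DummyConsumer {

    @StreamListener(Sink.INPUT)
    public void receive(String payload) {
        if (payload.contains("error")) {
            // Poisoned message: with default settings this is retried a few
            // times, the stacktrace is logged, and the offset moves on.
            throw new RuntimeException("cannot process: " + payload);
        }
        System.out.println("received: " + payload);
    }
}
```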

@stphngrtz
Author

This issue is similar to #542. If, after three retries, the "error" message is put on a DLQ, then I'm fine with continuing with "hello". If no DLQ is configured, I would like to stop.

@olegz olegz self-assigned this Oct 27, 2017
@olegz
Contributor

olegz commented Oct 27, 2017

@stphngrtz Thank you for bringing this up. We will definitely consider it; however...
What's the end game (in your opinion)? Assume the consumer stops: what condition do you envision would resume the consumption of messages?

@stphngrtz
Author

With a restart of the application.

If it's an exception because of a bug in the business code, the bug would need to be fixed and the application recompiled. If it's a technical exception that could not be resolved with retries but is to be fixed without changes to the code, a crashed database for example, simply restart the service as soon as the database is back online.

@olegz
Contributor

olegz commented Oct 27, 2017

So, what you're advocating is: if a "poisoned message" has been received, stop the consumer, go through a resolution process (whatever that may be), and restart after the problem is resolved.
But with Kafka it may not be as clean-cut as it seems, since in the case of multiple consumers Kafka will rebalance and the subsequent message will still be consumed by another consumer.
Thoughts?
In fact, come to think of it, it will be similar for Rabbit as well.

@stphngrtz
Author

If another consumer of the same group were able to consume the message without an exception, then I wouldn't be in such a hurry to fix the first one ;) If not, the partition would still be blocked until some consumer is able to process the blocking message. That is exactly what I need.

Imagine a user-created and a user-updated event in a CQRS-like system. How should user-updated modify the view if user-created has failed?

@stphngrtz
Author

Good morning. Is there any update on this issue? If I'm able to help in any way, please let me know.

@olegz
Contributor

olegz commented Nov 9, 2017

@stphngrtz sorry for the delay. Yes we are definitely looking into that. It is part of the greater effort of better managing lifecycle of some of the core components.
We are hoping that this and few other related issues will be part of the 2.0 release.

@stphngrtz
Author

Just in case anyone is facing a similar issue and cannot wait until this feature has been added to Spring Cloud Stream: with plain Spring Kafka it is very simple to stop the consumer.

@KafkaListener(id = "dummy-listener", topics = "dummy-topic", groupId = "dummy-group", errorHandler = "errorHandler")
void receive(Message<String> message, Acknowledgment acknowledgment) {
    // Simulate a poisoned message.
    if (message.getPayload().contains("error"))
        throw new RuntimeException();

    log.info("received: {} [{}]", message.getPayload(), message.getHeaders());
    acknowledgment.acknowledge(); // manual ack: the offset is only committed on success
}

@Autowired
KafkaListenerEndpointRegistry registry;

@Bean
KafkaListenerErrorHandler errorHandler() {
    return (message, e) -> {
        log.info("error handler for message: {} [{}], exception: {}", message.getPayload(), message.getHeaders(), e.getMessage());
        // Stop the listener container so no further messages are consumed.
        // Since the failed offset was never acknowledged, the message is
        // redelivered after a restart.
        MessageListenerContainer listenerContainer = registry.getListenerContainer("dummy-listener");
        listenerContainer.stop();
        throw new RuntimeException(e);
    };
}

application.yml:

spring.kafka:
  consumer:
    enableAutoCommit: false
    maxPollRecords: 1
  listener.ackMode: manual

You could write a @CustomKafkaListener with a CustomKafkaListenerAnnotationBeanPostProcessor to hide all this behind the curtain.

Spring Cloud Stream is not using the KafkaListenerEndpointRegistry. I've decided to use plain Spring Kafka until this feature has been added to Spring Cloud Stream.

@garyrussell
Contributor

It's best not to stop the container on the consumer thread - it causes a delay because stop() waits for a while until the container actually stops.

See how we do it with an executor in the new ContainerStoppingErrorHandler, available in spring-kafka 2.1.
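For illustration, wiring the ContainerStoppingErrorHandler into a listener container factory might look like the sketch below; the bean and factory names are assumptions, and the exact setter location varied across spring-kafka versions:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerStoppingErrorHandler;

public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // On a listener exception, the handler stops the container from a
        // separate executor thread (avoiding the stop() delay on the consumer
        // thread). The failed record's offset is never committed, so it is
        // redelivered when the container is started again.
        factory.getContainerProperties().setErrorHandler(new ContainerStoppingErrorHandler());
        return factory;
    }
}
```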

@rahulsom

Strangely enough, I'm seeing what @stphngrtz is asking for work correctly in my app. My only problem was that the Spring Boot actuator does not automatically expose this. I ended up adding a health check that marks the application as DOWN if any of those exceptions occur.
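A health check along those lines could be sketched as a custom actuator HealthIndicator; the class name and the reporting hook are assumptions, not @rahulsom's actual code:

```java
import java.util.concurrent.atomic.AtomicReference;

import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

// Hypothetical sketch: reports DOWN once a listener has hit an unrecoverable
// error, so orchestration tooling can notice and restart the application.
@Component
public class ConsumerHealthIndicator implements HealthIndicator {

    private final AtomicReference<Exception> lastError = new AtomicReference<>();

    // To be called from the listener's error-handling path when a poisoned
    // message is detected.
    public void reportFailure(Exception e) {
        lastError.set(e);
    }

    @Override
    public Health health() {
        Exception e = lastError.get();
        return e == null ? Health.up().build() : Health.down(e).build();
    }
}
```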

@stphngrtz
Author

@garyrussell thanks for pointing this out. With Spring Boot 2 M7 you get Spring Kafka 2.1.0.RC1, which lacks the ContainerStoppingErrorHandler. That reminds me that I need to check for updates more often.

@garyrussell
Contributor

I need to check for updates more often.

You might want to consider subscribing to the release blog feed at https://spring.io/blog/category/releases.atom

The blog post "Spring for Apache Kafka 2.1.0.RELEASE (and 1.3.2, 2.0.2) Available" mentions the new error handlers.

@olegz olegz added this to the 2.1.x milestone Feb 23, 2018
@spring-cloud spring-cloud deleted a comment from messaoudi-mounir Oct 23, 2018
@olegz olegz modified the milestones: 2.2.x, 2.3.x May 8, 2019
@pitinga

pitinga commented Sep 16, 2021

Would it be possible to achieve this?

  • send "error"
  • consume "error", fail
  • consume "hello"
  • retry, fail (after wait 5s)
  • retry, fail (after wait 5s)
  • print exception stacktrace
  • ack message, increment offset

While the consumer waits to retry the error message, it continues consuming other messages.
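This is essentially what spring-kafka's non-blocking retries (available since spring-kafka 2.7 via @RetryableTopic) provide: failed records are republished to delayed retry topics while the main partition keeps moving. A hedged sketch, with topic and group names assumed:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;

public class RetryingConsumer {

    // Each failure sends the record to a retry topic consumed after a 5s
    // back-off; after the attempts are exhausted it goes to a dead-letter
    // topic, and the original partition is never blocked.
    @RetryableTopic(attempts = "3", backoff = @Backoff(delay = 5000))
    @KafkaListener(topics = "dummy-topic", groupId = "dummy-group")
    void receive(String payload) {
        if (payload.contains("error")) {
            throw new RuntimeException("cannot process: " + payload);
        }
        System.out.println("received: " + payload);
    }
}
```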

@olegz olegz closed this as completed Jul 25, 2022