Auto Config Non-Blocking Retries with asyncAcks #2702

Closed
wapkch opened this issue Jun 10, 2023 · 5 comments · Fixed by #2706

Comments

@wapkch

wapkch commented Jun 10, 2023

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.0.7

Describe the bug

When asyncAcks is used together with @RetryableTopic, the @KafkaListener is never invoked for the retried message.

To Reproduce

@RestController
@SpringBootApplication
public class SpringkafkaAsyncacksDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringkafkaAsyncacksDemoApplication.class, args);
    }

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @PostMapping(path = "/send/{what}")
    public void sendFoo(@PathVariable String what) {
        this.template.send("topic1", what);
    }

    @KafkaListener(id = "fooGroup", topics = "topic1")
    @RetryableTopic(attempts = "5", backoff = @Backoff(delay = 2_000, maxDelay = 10_000, multiplier = 2))
    public void onMessage(ConsumerRecord<String, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        if (record.value().equals("fail")) {
            throw new RuntimeException("failed");
        }
        acknowledgment.acknowledge();
    }

}

application.properties

spring.kafka.listener.async-acks=true
spring.kafka.retry.topic.enabled=true
spring.kafka.listener.ack-mode=manual

  1. Run a local Kafka server.
  2. Start the application.
  3. Send a POST request to http://localhost:8080/send/fail; the @KafkaListener is never invoked for the retried message.

Expected behavior

I expect non-blocking retries to work with asyncAcks.

I know that using asyncAcks in the case above is pointless, but asynchronous processing in a @KafkaListener is sometimes useful. For example:

@Component
public class FooConsumer {

    @KafkaListener(id = "fooGroup", topics = "topic1")
    @RetryableTopic(attempts = "5", backoff = @Backoff(delay = 2_000, maxDelay = 10_000, multiplier = 2))
    public void onMessage(ConsumerRecord<String, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        threadPoolTaskExecutor.submit(() -> {
            try {
                doConsume(record);
                acknowledgment.acknowledge();
            } catch (Exception ex) {
                container.getCommonErrorHandler().handleOne(new RuntimeException(), record, consumer, container);
            }
        });
    }

}

Sample

https://github.com/wapkch/springkafka-asyncacks-demo

@garyrussell
Contributor

I will look at your example but, with this code:

        threadPoolTaskExecutor.submit(() -> {
            try {
                doConsume(record);
                acknowledgment.acknowledge();
            } catch (Exception ex) {
                container.getCommonErrorHandler().handleOne(new RuntimeException(), record, consumer, container);
            }
        });

You must call acknowledgment.acknowledge(); after the error handler handles the error.
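
For context on why every record must eventually be acknowledged: with asyncAcks, the container accepts acknowledgments in any order but defers the offset commit until the missing acknowledgments arrive. The sketch below is a simplified plain-Java model of that bookkeeping, not the framework's implementation. The class DeferredCommitTracker and its acknowledge method are hypothetical names introduced only for illustration.

```java
import java.util.TreeSet;

/** Simplified model (an assumption, not Spring's code) of deferring out-of-order acks. */
final class DeferredCommitTracker {

    // Offsets that were acknowledged but cannot be committed yet because of a gap.
    private final TreeSet<Long> acked = new TreeSet<>();

    // Lowest offset that has not been acknowledged yet.
    private long nextToCommit;

    DeferredCommitTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /** Records an ack and returns the committable position (the next offset to consume). */
    synchronized long acknowledge(long offset) {
        acked.add(offset);
        // Advance only across a gap-free run of acknowledged offsets.
        while (acked.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit;
    }

    public static void main(String[] args) {
        DeferredCommitTracker tracker = new DeferredCommitTracker(0);
        System.out.println(tracker.acknowledge(2)); // 0: offsets 0 and 1 are still pending
        System.out.println(tracker.acknowledge(0)); // 1: offset 1 is still pending
        System.out.println(tracker.acknowledge(1)); // 3: the gap is closed
    }
}
```

In this model, a record that fails on the executor thread and is never acknowledged leaves a permanent gap, so the committable position stops advancing. That is the situation the acknowledge() call after the error handler avoids.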

@garyrussell
Contributor

garyrussell commented Jun 12, 2023

@wapkch After further review, the use of asyncAcks with @RetryableTopic requires the error handler seekAfterError property to be false. We need to document this limitation.

In addition, the built-in DLT handler is not compatible with MANUAL acks.

See the following changes I made to your example app to make it work...

@RestController
@SpringBootApplication
public class SpringkafkaAsyncacksDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringkafkaAsyncacksDemoApplication.class, args);
    }

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @GetMapping(path = "/send/{what}")
    public void sendFoo(@PathVariable String what) {
        this.template.send("topic1", what);
    }
    
}

@Component
class Listener {

    @KafkaListener(id = "fooGroup", topics = "topic1")
    @RetryableTopic(attempts = "2", backoff = @Backoff(delay = 2_000, maxDelay = 2_000, multiplier = 2))
    public void onMessage(ConsumerRecord<String, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        System.out.println(record.topic() + ":" + record.value());
        if (record.value().equals("fail")) {
            throw new RuntimeException("failed");
        }
        acknowledgment.acknowledge();
    }

    @DltHandler
    void dlt(ConsumerRecord<String, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        System.out.println(record.topic() + ":" + record.value());
        acknowledgment.acknowledge();
    }

}

@Configuration
class Config extends RetryTopicConfigurationSupport {

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        customizersConfigurer.customizeErrorHandler(eh -> eh.setSeekAfterError(false));
    }

    @Bean
    TaskScheduler sched() {
        return new ThreadPoolTaskScheduler();
    }

}

garyrussell self-assigned this on Jun 13, 2023
garyrussell changed the title from "Non-Blocking Retries don't work with asyncAcks" to "Auto Config Non-Blocking Retries with asyncAcks" on Jun 13, 2023
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jun 13, 2023
Resolves spring-projects#2702

When using `asyncAcks` with manual ack modes, the `DefaultErrorHandler`
must have `seekAfterError` set to `false`; this required user configuration.

The framework now unconditionally sets the property when it configures a container
using a manual ack mode.

In addition, the default DLT handler was not compatible with any manual ack mode,
regardless of the `asyncAcks` setting.

Add `Acknowledgment` to the `LoggingDltListenerHandlerMethod`.

**cherry-pick to 2.9.x**
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jun 13, 2023
Resolves spring-projects#2702

When using `asyncAcks` with manual ack modes, the `DefaultErrorHandler`
must have `seekAfterError` set to `false`; this required user configuration.

The framework now unconditionally sets the property when it configures a container
using a manual ack mode.

In addition, the default DLT handler was not compatible with any manual ack mode,
regardless of the `asyncAcks` setting.

Add `Acknowledgment` to the `LoggingDltListenerHandlerMethod`.

Also tested with reporter's reproducer.

**cherry-pick to 2.9.x (will require instanceof polishing for Java 8)**
artembilan pushed a commit that referenced this issue Jun 14, 2023
Resolves #2702

When using `asyncAcks` with manual ack modes, the `DefaultErrorHandler`
must have `seekAfterError` set to `false`; this required user configuration.

The framework now unconditionally sets the property when it configures a container
using a manual ack mode.

In addition, the default DLT handler was not compatible with any manual ack mode,
regardless of the `asyncAcks` setting.

Add `Acknowledgment` to the `LoggingDltListenerHandlerMethod`.

Also tested with reporter's reproducer.

**cherry-pick to 2.9.x (will require instanceof polishing for Java 8)**

* Only supply `NoOpAck` with explicit `@NonNull` param annotation.
garyrussell added a commit that referenced this issue Jun 14, 2023
Resolves #2707

When using `asyncAcks` with manual ack modes, the `DefaultErrorHandler`
must have `seekAfterError` set to `false`; this required user configuration.

The framework now unconditionally sets the property when it configures a container
using a manual ack mode.

In addition, the default DLT handler was not compatible with any manual ack mode,
regardless of the `asyncAcks` setting.

Add `Acknowledgment` to the `LoggingDltListenerHandlerMethod`.

Also tested with reporter's reproducer.

**cherry-pick to 2.9.x (will require instanceof polishing for Java 8)**

* Only supply `NoOpAck` with explicit `@NonNull` param annotation.
@wapkch
Author

wapkch commented Jun 15, 2023

Thanks!

@pkgonan

pkgonan commented Jul 6, 2023

@garyrussell
Hi.

I am using the latest Spring Kafka version, 3.0.8.

When a @KafkaListener fetches data with MANUAL acks, if ack cannot be done, the @KafkaListener does not fetch any more data.

How should I solve this?

I don't want to use a DLQ or a retry topic; I want this to work with blocking retries.

Is there any code I can refer to?

After reading the blog post below, I understand that records that have already been read are held inside Spring Kafka, so they are not fetched again. In that case, though, there seems to be a major bug: if no new data is sent to the topic, the old data can never be consumed until a rebalance occurs. What do you think?

https://jessyt.tistory.com/167

@garyrussell
Contributor

Don't ask questions on closed issues, especially unrelated ones; use Stack Overflow or the discussions tab for Q&A.

It is not clear what you mean by "ack cannot be done".

You have to throw an exception so the error handler will take care of redelivering the same record.

Please provide an MCRE so I can understand your question better.
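
To illustrate the point about throwing an exception: in Spring for Apache Kafka, a CommonErrorHandler such as DefaultErrorHandler redelivers the failed record. The sketch below is a plain-Java model of that blocking-retry behavior, not the framework's implementation. BlockingRetryModel, deliver, and the parameter names are hypothetical and exist only for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Simplified model (an assumption, not Spring's code) of blocking retries. */
final class BlockingRetryModel {

    /**
     * Redelivers the same record until the listener returns normally or the
     * attempts are exhausted, then hands the record to the recoverer.
     * Returns the number of delivery attempts that were made.
     */
    static <T> int deliver(T record, Consumer<T> listener, Consumer<T> recoverer,
            int maxAttempts, long backOffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                listener.accept(record);
                return attempt; // success: nothing to retry
            } catch (RuntimeException ex) {
                // The exception is the signal that the record must be redelivered.
                if (attempt >= maxAttempts) {
                    recoverer.accept(record);
                    return attempt;
                }
                try {
                    Thread.sleep(backOffMillis); // blocks the consuming thread
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted during back-off", ie);
                }
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        int attempts = deliver("fail-twice",
                value -> {
                    if (calls.incrementAndGet() < 3) {
                        throw new RuntimeException("failed");
                    }
                },
                value -> System.out.println("giving up on " + value),
                5, 10L);
        System.out.println("succeeded on attempt " + attempts); // succeeded on attempt 3
    }
}
```

A listener that swallows the failure and simply skips the acknowledgment gives this loop nothing to react to, which is why the exception has to propagate.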

This issue was closed.