
GH 920 - Topic-based retry support #1664

Merged
16 commits merged on Feb 16, 2021

Conversation

tomazfernandes
Contributor

@tomazfernandes tomazfernandes commented Jan 3, 2021

Please refer to the RetryTopicConfigurer class' JavaDoc for an overview of the functionalities: https://github.com/tomazfernandes/spring-kafka/blob/GH-920/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

I've separated the code into 5 commits:

  • Pausing partitions in the MessageListenerContainer -> so that we don't pause the entire consumer and end up backing off the other partitions' messages more than we should
  • BackOff Manager and ListenerAdapter -> Functionality to read a timestamp header and manage the partition's consumption by listening to events
  • RetryTopic functionality -> Adds the topics / consumers configuration functionality
  • A few style checks I've missed
  • Some improvements to the javadoc
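
For context, a minimal sketch of how an application consumes the feature, assuming the annotation-based API shown later in this thread (the listener id, topic name, backoff values and LOG field are illustrative only; the complete example further down shows the full class and factory setup):

@RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000, multiplier = 2.0))
@KafkaListener(id = "my-listener", topics = "my-topic")
public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
	// a failing record is forwarded, without blocking the main consumer, through
	// auto-created retry topics (my-topic-retry-1000, my-topic-retry-2000, ...) and finally to my-topic-dlt
	LOG.info(in + " from " + topic);
	throw new RuntimeException("simulated failure so the record gets retried");
}

@DltHandler
public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
	// invoked once all retry attempts are exhausted
	LOG.info(in + " reached the DLT " + topic);
}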

@garyrussell
Contributor

Thanks, @tomazfernandes. We prefer rebasing PRs rather than adding merge commits (we'll rebase at the end anyway).

@tomazfernandes
Contributor Author

Thanks, @tomazfernandes. We prefer rebasing PRs rather than adding merge commits (we'll rebase at the end anyway).

Ok @garyrussell, I actually tried that, but couldn't push or force push to this branch; it seemed to me to be because of the open PR (I'm not really experienced with rebasing). Should I try squashing this merge / conflict resolution and rebasing again? Or maybe just leave the conflict there as it was?

@garyrussell
Contributor

No, don't worry; we'll squash and rebase before merging to master anyway; weird that you couldn't force push, though.

@garyrussell
Contributor

Hi @tomazfernandes I finally found some time to take a quick look at this.

It is very impressive and seems to be a complete solution and will be a valuable addition to the framework. I like the approach.

It's a huge amount of code to review, though, so it will take some time.

That said, I am inclined to merge it as-is and, perhaps, document it as an "experimental" feature, at least initially; I am sure we can get it into next month's 2.7.0-M2 milestone, as long as you have time to address a few issues.

  • First, a couple of style issues: we wrap our javadocs at column 90 (when possible) and code at 120 (when possible).

  • Before the milestone, we will need at least some test cases; but the more coverage, the better; we don't want to trigger our Sonar coverage gate (we're currently a little short of 80%, the gate is currently 70%).

  • We will need to document the feature in src/reference/asciidoc - it probably deserves a whole new chapter, rather than sprinkling stuff throughout the other sections (aside from documenting the new container properties).

We would not need the docs for M2, but the sooner, the better. Our only strict asciidoctor rule is one-sentence-per-line, but you can see the other docs for examples.

Thanks again for such a significant contribution!!

@tomazfernandes
Contributor Author

tomazfernandes commented Jan 22, 2021

Hi @garyrussell, that's awesome news! I'm really glad you liked it, thank you very much, I'm really excited about this. Sure, I'll address these issues and implement the tests, no problem. There are a couple of things I've been meaning to improve as well, so I'll get to it.

Also, if you have any suggestions please let me know.

What would be a good timeframe for me to finish the changes in order to get into M2?

@garyrussell
Contributor

M2 is currently scheduled for Feb 17: https://github.com/spring-projects/spring-kafka/milestone/135

So, we have a few weeks.

@tomazfernandes
Contributor Author

Sounds perfect @garyrussell. Thank you very much for the opportunity.

@tomazfernandes
Contributor Author

Hi @garyrussell, just a quick update. I've implemented some changes and improvements, as well as more than 90% test coverage, with both integration and unit tests.

What's missing is updating the javadocs and addressing the style changes, as well as the documentation, which I plan on doing this week. Unfortunately I had covid and that set me back a couple of weeks, otherwise I'd probably have everything ready by now.

I'll commit the code as is so you can take a look if you want - it won't build with Gradle due to checkstyle issues, but it should build and run normally on IntelliJ or by disabling checkstyle.

What would be the deadline for committing the javadoc / style adjustments in order to make it into M2?

Thanks!

@garyrussell
Contributor

garyrussell commented Feb 8, 2021

@tomazfernandes Thanks. The PR needs to be clean and reviews complete by end of day February 16.

The reference manual (which has now moved to spring-kafka-docs/src/main/asciidoc) should be lowest priority and can miss M2 if necessary (although it would be nice to have).

FYI, we are not working this Friday (12th) and next Monday (15th).

@garyrussell
Contributor

Sorry to read about your Covid encounter; I hope you are and remain well.

@tomazfernandes
Contributor Author

Thanks for your concern Gary, it wasn't a fun ride at all, but thankfully not as bad as it could have been. I'm well now.

About the code: I'll clean up the PR; I think I should have it done by tomorrow. Then I'll work on the documentation until the 16th, but hopefully I'll have it ready sooner. Of course, if you feel that's too close to the M2 release date we can push it back to the following release, although I'd really like it if we can make it into M2.

@tomazfernandes
Contributor Author

Hi @garyrussell, I've cleaned the PR and formatted the javadocs and code the way you asked. If you could take a look, I think it should be fine now.

I'll start working on the documentation soon to have it ready for M2, please let me know if there's anything else that needs to be done or anything I might have missed.

Thanks again for the opportunity!

@garyrussell
Contributor

garyrussell commented Feb 9, 2021

Please rebase to master; thanks. (Or merge; you said you had problems rebasing before).

@tomazfernandes
Contributor Author

@garyrussell, seems like this time I got the rebase right. I'll start working on the documentation. Thanks.

@tomazfernandes
Contributor Author

Hi @garyrussell, I've created the documentation for the non-blocking retry functionality and updated the relevant parts of the existing documentation. Hope it's ok; please let me know if there's anything else needed.

Thanks!


@garyrussell garyrussell left a comment


Excellent work; thanks!

Just a few doc polishing comments and a suggestion to change one constant name.

Thanks again; I think this can go into next week's M2.

(Review suggestions on spring-kafka-docs/src/main/asciidoc/kafka.adoc and spring-kafka-docs/src/main/asciidoc/retrytopic.adoc; all outdated/resolved.)
@tomazfernandes
Contributor Author

tomazfernandes commented Feb 11, 2021

Thanks a lot @garyrussell! I'll make these adjustments today after my 9-5.

Also, there are a few other improvements / functionalities I'd like to add, such as:

  • make DLT and DLT processing optional
  • include the possibility of configuring a global timeout for the retries
  • include the ability to configure retries from the application.properties files
  • some other minor things

Do you think it's worth trying to get those into M2, or maybe we should go with what we have now and create a new PR afterwards for the next release (considering all goes well)? I'd probably have it done by Tuesday (since you won't be working tomorrow and next Monday).


@garyrussell garyrussell left a comment


@tomazfernandes

make DLT and DLT processing optional

I thought about that too, while reading the docs, but then I thought it doesn't hurt to log it - they can always use a different application to consume from the DLT. But I guess it would be nice to have it configurable.

include the ability to configure retries from the application.properties files

That would have to be a PR submitted to the Boot team, after they upgrade to 2.7.0.M2 (next Wednesday).

Do you think it's worth trying to get those into M2

It's up to you, if they are small enough, I still have Tuesday to review.

I would, however, suggest a new PR(s) after we merge this, rather than adding to this (rather large) one.

Another thought I just had is we should at least offer an option to use a different groupId for the retry container Vs. the main container - otherwise a rebalance on a retry topic will cause an unnecessary rebalance on the main topic.

We also may want to use a different container for each tier of retries, for the same reason.

But we can discuss these after M2.

Thanks again.

@tomazfernandes
Contributor Author

tomazfernandes commented Feb 11, 2021

@garyrussell thanks for your comments, I agree on the DLT part.

For the group id, it gets suffixed with the retry topic's suffix, as long as the user provides a groupId for the main topic. You can check it out in RetryTopicConfigurer.java:431; that's one of the functions for the customiser added to the KafkaListenerABPP processListener method. So each retry topic / DLT should get its own consumer group.

I think there might be some minor adjustments to make in that logic to cover all possibilities of replicating the main endpoint's configuration; maybe we can look into that some time after M2. But it should be good for most use cases.
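
To illustrate the group id handling described above, a rough sketch (the topic, group and suffix values are hypothetical, but they mirror the shape of the ones that show up in the logs later in this thread):

@RetryableTopic(attempts = "3", backoff = @Backoff(delay = 1000, multiplier = 2.0))
@KafkaListener(groupId = "orders-group", topics = "orders")
public void listen(String in) {
	// each generated endpoint gets its own suffixed consumer group, roughly:
	//   orders            -> orders-group
	//   orders-retry-1000 -> orders-group-retry-1000
	//   orders-retry-2000 -> orders-group-retry-2000
	//   orders-dlt        -> orders-group-dlt
	throw new RuntimeException("fail so the record is retried");
}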

Also, I think I should be able to retrieve the application.properties from the ApplicationContext in order to create a retry configuration, shouldn't I? Or even create a bean for that purpose. Maybe there's a more "Springy" way to do that, which is probably the case for other places in the code as well; feel free to point them out as you see them. I'll take a look into how they're handled internally by the Spring Boot app.

As for the other improvements, I think I'll try to code them by Tuesday and maybe commit them to a separate branch, if this PR hasn't been merged by then. Then you can decide whether or not it's small enough for you to review and merge into M2, or if it goes to the release after that - I'll be good with either. Also, maybe I can separate the commits so it doesn't have to be an all-or-nothing decision.

Thanks a lot again!

@tomazfernandes
Contributor Author

tomazfernandes commented Feb 11, 2021

I'll add this groupId part to the documentation when I make the adjustments later today.

@garyrussell
Contributor

Also, I think I should be able to retrieve the application.properties from the ApplicationContext in order to create a retry configuration, shouldn't I? Or even create a bean for that purpose.

Right, the properties are in the environment, but it's a separation of concerns; application.properties/.yml is processed by Boot's auto configuration in KafkaAnnotationDrivenConfiguration and KafkaProperties. I think it would be too confusing for automatic configuration via properties to be handled in two different places.

Also, Boot has much goodness in its property mapping - e.g. some-timeout: 5s vs. 5000ms for delays, etc. - and completion hints/validation in IDE editors; we don't want to reinvent all that here and, if it's not supported, it would be inconvenient for users.

I am sure the Boot team will be receptive to a PR submitted to enhance the auto configuration for this important enhancement.

@tomazfernandes
Contributor Author

Sounds great @garyrussell, I'll look into adding it to Spring Boot's properties then, probably after M2. Thank you very much for your support.

Commits added to the PR:
…orresponding events to MessageListenerContainer interface and its implementations.
…picConfigurer.java in the retrytopic package for more information.
- Changes in dependency management
- RetryTopicBootstrapper
- Some refactorings
Many improvements
Test implementations
@Duncol

Duncol commented Apr 2, 2021

Hi Tomaz, thanks for the quick response! I am using spring-kafka 2.7.0-M2. This is not a requirement (the requirement is far less frequent TBH); I just wanted to check how this behaves and stumbled across this weird, nearly linear 'magic' 10s delay, which I don't know where it comes from. I've tried setting this to 60s and at the start it seems quite fine, but sometimes it happened to be quite spontaneous:
[screenshot]
(looks like the 2nd and 3rd msgs are delayed by the configured backoff + 10s)

but when message throughput is less 'quiet' (seems like less than 10s in between), I get an almost exact 10s delay for each:
[screenshot]

Maybe my configuration is not sufficient somehow? I've provided solely the @Backoff(60000L) for this case. Is there something to be configured for Kafka perhaps, or more in the retry feature itself?

@garyrussell
Contributor

@Duncol Try upgrading to 2.7.0-RC1; with M2, it was tightly coupled to the poll timeout and idle interval; I saw similar issues with M2 and @tomazfernandes made some improvements that are included in RC1.

@Duncol

Duncol commented Apr 12, 2021

@garyrussell @tomazfernandes I've upgraded to RC1, but now it does not seem to retry at all. I still get the KafkaBackoffException (more concise in RC1), logging the approx backoff, but nothing happens after that (it needs a restart of the service to kick off the next (single) retry). M2 works well for my configuration (except the aforementioned issues with backoff times) and the only things I've changed were M2 -> RC1 and the config data types (int/long -> String) in @RetryableTopic.

More info about my approach: my retry is based on RuntimeExceptions (exception thrown -> should retry, no exception -> no further retries). The first retry is initiated in a try/catch around the core service; the catch block sends a wrapped message to the first retry topic via KafkaTemplate. When the first retry topic is exhausted and the wrapped msg lands in the DLT, I pass that msg to another retry topic (via KafkaTemplate, as for the initial retry topic). After the second retry topic 'completes', the DLT just logs the msg, with no further passing.

@tomazfernandes
Contributor Author

Hi @Duncol, thanks a lot for bringing this up! There's indeed a bug when we use the same factory for the @KafkaListener and @RetryableTopic annotations. It'll be fixed ASAP, but for now, as a workaround, if you specify a different factory instance for the @RetryableTopic annotation it should work.

This scenario will be added to our integration tests so that it doesn't happen again.

Please let us know how it turns out.

Thanks again!

@garyrussell garyrussell added this to the 2.7.0 milestone Apr 12, 2021
@garyrussell garyrussell removed this from the 2.7.0 milestone Apr 12, 2021
@garyrussell
Contributor

@tomazfernandes There still seems to be something amiss - when I changed my test app to use a different factory, I see this

2021-04-12 16:49:44.152  INFO 35392 --- [   kgh920-0-C-1] com.example.demo.Kgh920Application       : foo from kgh920
2021-04-12 16:49:44.657  INFO 35392 --- [etry-1000-0-C-1] com.example.demo.Kgh920Application       : foo from kgh920-retry-1000
2021-04-12 16:49:45.167  INFO 35392 --- [etry-2000-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kgh920-retry-2000-1, groupId=kgh920-retry-2000] Seeking to offset 3 for partition kgh920-retry-2000-0
2021-04-12 16:49:45.167  WARN 35392 --- [etry-2000-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic kgh920-retry-2000 is not ready for consumption, backing off for approx. 490 millis.
2021-04-12 16:49:47.175  INFO 35392 --- [etry-2000-0-C-1] com.example.demo.Kgh920Application       : foo from kgh920-retry-2000
2021-04-12 16:49:47.681  INFO 35392 --- [etry-4000-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kgh920-retry-4000-4, groupId=kgh920-retry-4000] Seeking to offset 3 for partition kgh920-retry-4000-0
2021-04-12 16:49:47.681  WARN 35392 --- [etry-4000-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic kgh920-retry-4000 is not ready for consumption, backing off for approx. 1494 millis.
2021-04-12 16:49:49.688  INFO 35392 --- [etry-4000-0-C-1] com.example.demo.Kgh920Application       : foo from kgh920-retry-4000
2021-04-12 16:49:50.200  INFO 35392 --- [etry-8000-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kgh920-retry-8000-5, groupId=kgh920-retry-8000] Seeking to offset 3 for partition kgh920-retry-8000-0
2021-04-12 16:49:50.200  WARN 35392 --- [etry-8000-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic kgh920-retry-8000 is not ready for consumption, backing off for approx. 3488 millis.
2021-04-12 16:49:53.699  INFO 35392 --- [etry-8000-0-C-1] com.example.demo.Kgh920Application       : foo from kgh920-retry-8000
2021-04-12 16:49:54.210  INFO 35392 --- [gh920-dlt-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kgh920-dlt-6, groupId=kgh920-dlt] Seeking to offset 3 for partition kgh920-dlt-0
2021-04-12 16:49:54.210  WARN 35392 --- [gh920-dlt-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic kgh920-dlt is not ready for consumption, backing off for approx. 7489 millis.
2021-04-12 16:50:01.705  INFO 35392 --- [gh920-dlt-0-C-1] com.example.demo.Kgh920Application       : foo from kgh920-dlt
@RetryableTopic(attempts = "5", backoff = @Backoff(delay = 1000, multiplier = 2.0),
			listenerContainerFactory = "retryFactory")
@KafkaListener(id = "kgh920", topics = "kgh920")
public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
	LOG.info(in + " from " + topic);
	throw new RuntimeException("test");
}

The first retry is +500ms instead of 1s, the next retry is +2s (correct), the next is +2.5s instead of 4, the next retry is +4s instead of 8.

I then see 8 seconds before it goes to the DLT - in earlier versions, I am sure that it went straight to the DLT after the 8 second delivery attempt failed.

@tomazfernandes
Contributor Author

@garyrussell, there are two things going on there: the first is a bug I've found now where the current topic's delay is being used instead of the next topic's - so it'd wrongly be 0, 1s, 2s, 4s, 8s.

The +-500ms differences are related to the poll timeout, which has to be a lot smaller in the retry topics if we have low backoffs such as 1s - there's not much time to go through the whole pause -> partition idle event -> resume container -> resume consumer process, considering it takes about 500ms to get there in the first place. I've changed the default configuration accordingly in the latest PR regarding this.

I've already fixed this bug and will submit a PR after I run the tests.

If you can test this out again tomorrow with the fixes, that'll be great; I think everything should work as expected.

I'll also open the PR for the factories bug.

Thanks!

@Duncol

Duncol commented Apr 14, 2021

@tomazfernandes The separate ListenerFactory works, thanks for the quick response, I can move forward now :). Just one side question (maybe more for you, @garyrussell) - what is the planned release date of spring-kafka 2.7.0?

@garyrussell
Contributor

@Duncol Later today https://github.com/spring-projects/spring-kafka/milestones

@Duncol

Duncol commented Apr 16, 2021

Found something strange when writing an IT for my retry feature - seems like messages addressed to the -retry topics are doubled (except the first one)? I might be raising a false alarm due to some misunderstanding of the deeper internals (still learning Kafka), but thought it would be worth mentioning.
[screenshot]

@tomazfernandes
Contributor Author

Found something strange when writing an IT for my retry feature - seems like messages addressed to the -retry topics are doubled (except the first one)? I might be raising a false alarm due to some misunderstanding of the deeper internals (still learning Kafka), but thought it would be worth mentioning.
[screenshot]

Hi @Duncol! Can you share the code where you’re getting this list from? Is it a batch consumer?

Thanks for mentioning, your feedback is very important for us!

@Duncol

Duncol commented Apr 16, 2021

@tomazfernandes (just mind it's a constantly improved WIP and I'm looking for a cleaner way to register the RecordInterceptor just for tests :) )
[screenshot]

This is defined in a @TestConfiguration class.
The previous screenshot shows this collection after all retries.
retry/auxRetry are the separate listener container factories for @KafkaListener/@RetryableTopic.

Everything starts as a single message sent via KafkaTemplate
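
Since the screenshot isn't reproduced here, a hedged sketch of what such a test-only interceptor registration could look like (the bean, field and type parameters are assumptions, not the actual code from the screenshot):

@TestConfiguration
static class RetryTestConfig {

	// shared collection the interceptor writes to; tests assert on it after all retries
	final List<ConsumerRecord<String, String>> consumerRecords = new CopyOnWriteArrayList<>();

	@Bean
	ConcurrentKafkaListenerContainerFactory<String, String> auxRetryFactory(
			ConsumerFactory<String, String> consumerFactory) {

		ConcurrentKafkaListenerContainerFactory<String, String> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory);
		// records every consumed record; if several factories share this same collection,
		// a record consumed by two listeners will show up twice
		factory.setRecordInterceptor(record -> {
			this.consumerRecords.add(record);
			return record;
		});
		return factory;
	}
}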

@tomazfernandes
Contributor Author

Hmm, that’s strange. What’s the “callbackKafkaListenerContainerFactory” for?

The retry topic mechanism relies on the battle-tested dead letter publishing recoverer to forward messages, and we didn't see any behavior like this before.

The only possibility I see from the feature side would be if we’re registering two consumers per topic, which again never happened in our tests.

So what comes to mind is this:
Can you check if you have two consumer instances for each topic? You might be able to notice that by putting a breakpoint in the pollAndInvoke method in the KafkaMessageListenerContainer class and checking the instance id for each message consumption.

The other possibility I see would be if you're for some reason registering two interceptors for the same factory instance; I'm not really sure how to check for that, but a breakpoint in the interceptor assignment will probably do.

@garyrussell, any thoughts on this?

@artembilan
Member

According to your screenshot all the interceptors are placing their records into the same consumerRecords collection.
So, it might not be a surprise to see the same data in the tests when you produce a record into a topic.

Rings a bell?

@Duncol

Duncol commented Apr 16, 2021

@tomazfernandes 'callbackKafkaListenerContainerFactory' is for our main @KafkaListener. I'm not placing the @RetryableTopic directly over this listener - instead, I have a separate handler with two listening methods (each having @KafkaListener + @RetryableTopic over them). I also have one method with @DltHandler.
It goes something like this:

(external service) --msg-->

@KafkaListener(containerFactory = "callbackKafkaListenerContainerFactory") --on-error-->

@KafkaListener(topics = "first-topic", containerFactory = "callbackRetryKafkaListenerContainerFactory")
@RetryableTopic(listenerContainerFactory = callbackRetryAuxKafkaListenerContainerFactory) --retry-exhaustion-->

@DltHandler --send-to-next-retry-topic-->

@KafkaListener(topics = "first-topic", containerFactory = "callbackRetryKafkaListenerContainerFactory")
@RetryableTopic(listenerContainerFactory = callbackRetryAuxKafkaListenerContainerFactory) --retry-exhaustion-->

@DltHandler (do nothing more)

callbackRetryKafkaListenerContainerFactory and callbackRetryAuxKafkaListenerContainerFactory share the same ConsumerFactory.
Each of those methods has its own logging which proves that the retry count is correct (i.e. no doubled messages).
I could perhaps place @RetryableTopic directly over the:

@KafkaListener(containerFactory = "callbackKafkaListenerContainerFactory")
But I wanted to decouple the code this way

@Duncol

Duncol commented Apr 16, 2021

@artembilan so on each retry, the message goes through both @KafkaListener's and @RetryableTopic's listenerContainerFactory (and thus the configured consumer)? Such a scenario would match my outcome. I would expect that only the initial msg arrival is consumed by the consumer configured for callbackRetryKafkaListenerContainerFactory (i.e. @KafkaListener) and the retries just utilize callbackRetryAuxKafkaListenerContainerFactory's (i.e. @RetryableTopic) consumer for single message consumption. Is this the way the retry works?

@tomazfernandes
Contributor Author

Hmm, well, then you have two listeners per topic, hence two instances of the same message in your collection... That's the expected behavior, right?

I didn’t understand what exactly you’re trying to achieve with this pattern: a single @KafkaListener method with @RetryableTopic should suffice.

Also, unless that's somehow a requirement, you don't need to handle forwarding to the next topic manually in the DLT method; instead, you should let the exception go all the way back to the listener (outside of any try/catch) and the framework will handle message forwarding for you.

Makes sense?

@garyrussell
Contributor

@Duncol Perhaps you misunderstood @tomazfernandes when we had that bug (needing two factories). This is what he meant...

@SpringBootApplication
public class Kgh920Application {


	private static final Logger LOG = LoggerFactory.getLogger(Kgh920Application.class);


	public static void main(String[] args) {
		SpringApplication.run(Kgh920Application.class, args);
	}

	@RetryableTopic(attempts = "5", backoff = @Backoff(delay = 1000, multiplier = 2.0),
			listenerContainerFactory = "retryFactory")
	@KafkaListener(id = "kgh920", topics = "kgh920")
	public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
		LOG.info(in + " from " + topic);
		throw new RuntimeException("test");
	}

	@DltHandler
	public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
		LOG.info(in + " from " + topic);
	}

	@Bean
	ConcurrentKafkaListenerContainerFactory<?, ?> retryFactory(
			ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
			ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
			KafkaProperties kafkaProps) {

		ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
		configurer.configure(factory, kafkaConsumerFactory
				.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(kafkaProps.buildConsumerProperties())));
		return factory;
	}

}

i.e. specify a different factory on the retry annotation.

Using a different factory is no longer needed now that the bug has been fixed, but if you still want to do it, it needs to go on the retry annotation.

@Duncol

Duncol commented Apr 17, 2021

Hmm, well, then you have two listeners per topic, hence two instances of the same message in your collection... That's the expected behavior, right?

I didn’t understand what exactly you’re trying to achieve with this pattern: a single @KafkaListener method with @RetryableTopic should suffice.

Also, unless that's somehow a requirement, you don't need to handle forwarding to the next topic manually in the DLT method; instead, you should let the exception go all the way back to the listener (outside of any try/catch) and the framework will handle message forwarding for you.

Makes sense?

Given the fact that there is a '-retry' topic created (single topic strategy) for the retry, I assumed that the listener for the initial topic (without the '-retry' suffix) is somehow consuming messages from the '-retry' topics, thus the duplicate msg. I'll check it with the fixes; thanks a lot again, I much appreciate the work you are doing!

The manual forward to the next topic is due to a different backoff/attempts requirement. Can it be reconfigured on the same retry somehow, as a 'second tier' approach maybe?

@tomazfernandes
Contributor Author

Hi @Duncol, sorry, I totally missed this message, was cleaning up the inbox and found it now.

How is the feature working for you, is it behaving as expected?

Can you share more details on your retry requirements, such as the number of attempts, delays, etc.? Maybe we can work something out to include this second tier.

Thanks and sorry again for taking this long to reply.

@Duncol

Duncol commented May 11, 2021

@tomazfernandes No worries. In the simplest words: we want to initiate a retry on a particular issue (exception). The retry should happen every 1m for the first 30m and then switch to an 'every 1h' mode for the next 48h. After that, it should land in the DLT. It works nearly as expected with my current solution (incorporating the DLT with a simple 'if statement') for the interval switch (next topic), but one thing that is not working right is the fact that the first retry happens immediately when the message reaches each topic (i.e. the backoff seems to be applied only in between retries). I assume it's due to the fact that I am sending a new msg to the separate topic with @RetryableTopic. I've done it this way to decouple the logic a bit + have an entirely different msg model for passing some metadata. Do you see maybe some solutions for this case?

Thanks a lot for this feature and your persistent help again!

@tomazfernandes
Contributor Author

tomazfernandes commented May 11, 2021

Hi @Duncol, I'm glad the feature is working out for you! This feature is not prepared to handle 2 tiers of delays, but it can handle delaying the message arrival in the first topic of the second tier very easily.

All consumers within the Retry Topic feature are wrapped in a KafkaBackoffAwareMessageListenerAdapter, which looks for a back off timestamp header and backs off until that timestamp if it's in the future. So pretty much all you have to do is add the back off timestamp header to the message you send to the other topic and it will back off until the time you specify on the header.

It should be something like this:
message.headers().add(RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP, timeMessageShouldBeConsumed);

Where timeMessageShouldBeConsumed should be a byte[] with the milliseconds from epoch when the message should be consumed (such as System.currentTimeMillis() + Duration.of(1, ChronoUnit.HOURS).toMillis()). The timestamp header can be customised, but that adds extra complexity; I can give you more details if that's something that interests you.
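
Putting that together, a small sketch of forwarding a record to a second-tier topic with the back off header set (the topic name is hypothetical, and the exact byte encoding of the timestamp is an assumption on my part; a BigInteger byte array is used here):

public void forwardToHourlyTier(KafkaTemplate<String, String> kafkaTemplate, String key, String payload) {
	// consume the record no earlier than one hour from now
	long timeMessageShouldBeConsumed = System.currentTimeMillis() + Duration.of(1, ChronoUnit.HOURS).toMillis();
	ProducerRecord<String, String> record = new ProducerRecord<>("orders-hourly-retry", key, payload);
	record.headers().add(RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP,
			BigInteger.valueOf(timeMessageShouldBeConsumed).toByteArray());
	kafkaTemplate.send(record);
}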

Let me know if that works out for you!

And thanks again for the feedback!

@Duncol

Duncol commented Jun 2, 2021

Hi @tomazfernandes, sorry for the lack of response, but I was quite occupied with some other tasks. I'll try to implement your suggestion (which looks quite nice and seems like a nearly golden bullet for our case - and for any further granular adjustments that we might need regarding changing the back-off) and let you know ASAP. One other thing I've stumbled across which might be interesting is that it seems that @RetryableTopic creates an additional consumer with the same clientId. This causes the app to throw an exception (logged as a WARN) regarding initializing an MBean ('Already Exists'):

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=retry-30m-0
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[kafka-clients-2.6.0.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814) ~[kafka-clients-2.6.0.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632) ~[kafka-clients-2.6.0.jar:?]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:366) ~[spring-kafka-2.7.0-M2.jar:2.7.0-M2]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:334) ~[spring-kafka-2.7.0-M2.jar:2.7.0-M2]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:310) ~[spring-kafka-2.7.0-M2.jar:2.7.0-M2]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:277) ~[spring-kafka-2.7.0-M2.jar:2.7.0-M2]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:254) ~[spring-kafka-2.7.0-M2.jar:2.7.0-M2]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.7.0-M2.jar:2.7.0-M2]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:317) ~[spring-kafka-2.7.0-M2.jar:2.7.0-M2]
	at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:384) ~[spring-kafka-2.7.0-M2.jar:2.7.0-M2]
	at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:206) ~[spring-kafka-2.7.0-M2.jar:2.7.0-M2]
	at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:384) ~[spring-kafka-2.7.0-M2.jar:2.7.0-M2]
	at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312) ~[spring-kafka-2.7.0-M2.jar:2.7.0-M2]
	at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257) ~[spring-kafka-2.7.0-M2.jar:2.7.0-M2]
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.3.jar:5.3.3]
	at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.3.jar:5.3.3]
	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.3.jar:5.3.3]
	at java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]
	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.3.jar:5.3.3]
	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.3.jar:5.3.3]
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:940) ~[spring-context-5.3.3.jar:5.3.3]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:591) ~[spring-context-5.3.3.jar:5.3.3]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:767) ~[spring-boot-2.4.2.jar:2.4.2]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.4.2.jar:2.4.2]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.2.jar:2.4.2]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.2.jar:2.4.2]
	at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:123) ~[spring-boot-test-2.4.2.jar:2.4.2]
	at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99) ~[spring-test-5.3.3.jar:5.3.3]
	at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:124) ~[spring-test-5.3.3.jar:5.3.3]
	at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:124) ~[spring-test-5.3.3.jar:5.3.3]
	at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190) ~[spring-test-5.3.3.jar:5.3.3]
	at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132) ~[spring-test-5.3.3.jar:5.3.3]
	at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:244) ~[spring-test-5.3.3.jar:5.3.3]
	at org.springframework.test.context.junit.jupiter.SpringExtension.postProcessTestInstance(SpringExtension.java:138) ~[spring-test-5.3.3.jar:5.3.3]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$6(ClassBasedTestDescriptor.java:350) ~[junit-jupiter-engine-5.7.0.jar:5.7.0]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.executeAndMaskThrowable(ClassBasedTestDescriptor.java:355) ~[junit-jupiter-engine-5.7.0.jar:5.7.0]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$7(ClassBasedTestDescriptor.java:350) ~[junit-jupiter-engine-5.7.0.jar:5.7.0]
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) ~[?:?]
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[?:?]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
	at java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:312) ~[?:?]
	at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:735) ~[?:?]
	at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734) ~[?:?]
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658) ~[?:?]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeTestInstancePostProcessors(ClassBasedTestDescriptor.java:349) ~[junit-jupiter-engine-5.7.0.jar:5.7.0]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$instantiateAndPostProcessTestInstance$4(ClassBasedTestDescriptor.java:270) ~[junit-jupiter-engine-5.7.0.jar:5.7.0]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateAndPostProcessTestInstance(ClassBasedTestDescriptor.java:269) ~[junit-jupiter-engine-5.7.0.jar:5.7.0]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$2(ClassBasedTestDescriptor.java:259) ~[junit-jupiter-engine-5.7.0.jar:5.7.0]
	at java.util.Optional.orElseGet(Optional.java:362) ~[?:?]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$3(ClassBasedTestDescriptor.java:258) ~[junit-jupiter-engine-5.7.0.jar:5.7.0]
	at org.junit.jupiter.engine.execution.TestInstancesProvider.getTestInstances(TestInstancesProvider.java:31) ~[junit-jupiter-engine-5.7.0.jar:5.7.0]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$prepare$0(TestMethodTestDescriptor.java:101) ~[junit-jupiter-engine-5.7.0.jar:5.7.0]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:100) ~[junit-jupiter-engine-5.7.0.jar:5.7.0]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:65) ~[junit-jupiter-engine-5.7.0.jar:5.7.0]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$prepare$1(NodeTestTask.java:111) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.prepare(NodeTestTask.java:111) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:79) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at java.util.ArrayList.forEach(ArrayList.java:1511) ~[?:?]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at java.util.ArrayList.forEach(ArrayList.java:1511) ~[?:?]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) ~[junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) [junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) [junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) [junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32) [junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) [junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51) [junit-platform-engine-1.7.0.jar:1.7.0]
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:170) [junit-platform-launcher-1.2.0.jar:1.2.0]
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:154) [junit-platform-launcher-1.2.0.jar:1.2.0]
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:90) [junit-platform-launcher-1.2.0.jar:1.2.0]
	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:142) [surefire-junit-platform-2.22.0.jar:2.22.0]
	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:117) [surefire-junit-platform-2.22.0.jar:2.22.0]
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:383) [surefire-booter-2.22.0.jar:2.22.0]
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:344) [surefire-booter-2.22.0.jar:2.22.0]
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:125) [surefire-booter-2.22.0.jar:2.22.0]
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:417) [surefire-booter-2.22.0.jar:2.22.0]

I've tried a separate listenerContainerFactory for the @RetryableTopic with an explicit clientId, different from the one I set for the particular @KafkaListener, but the one from @KafkaListener takes precedence. When I comment out the @RetryableTopic, those additional consumers are not being registered and the problem disappears.

Have you encountered something similar maybe? Those WARNs do not break anything, but as I've read, they may cut off some metrics and they pollute the logfile a bit :)

@tomazfernandes
Contributor Author

Hi @Duncol, thanks for bringing this up!

I'm having trouble reproducing your issue. When we specify a clientIdPrefix in the @KafkaListener annotation, the prefix gets suffixed by the topic's suffix (e.g. retry-250). And when we don't, Kafka's ConsumerConfig class has a monotonically increasing number that it appends to the consumer's id so that they're unique at least within the same app instance (line 576). In my tests all consumers end up with different client ids.

Which Spring Kafka version are you using? Can you share more details on how we can reproduce the issue?

Maybe @garyrussell has something to add?

Thanks again for your input!

@garyrussell
Contributor

I can't reproduce it either; when I enable info logging for sample-04 I get these client ids when I add clientIdPrefix = "test" to the listener:

client.id = test-retry-10000-0
client.id = test-retry-4000-0
client.id = test-0
client.id = test-retry-2000-0
client.id = test-dlt-0
client.id = test-retry-8000-0
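
For reference, a sketch of the listener shape that produces client ids like those (the backoff values here are illustrative, not necessarily the ones used in sample-04):

@RetryableTopic(attempts = "5", backoff = @Backoff(delay = 2000, multiplier = 2.0),
		listenerContainerFactory = "retryFactory")
@KafkaListener(id = "kgh920", clientIdPrefix = "test", topics = "kgh920")
public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
	LOG.info(in + " from " + topic);
	throw new RuntimeException("test");
}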

The exception seems to indicate you have multiple listeners with the same clientIdPrefix.

If you can put together a minimal example that exhibits the behavior, we can take a look.
