Skip to content

Commit

Permalink
GH-2069: Same factory for retryable and normal endpoints (#2108)
Browse files Browse the repository at this point in the history
* GH-2069: Same factory for retryable and normal endpoints

Resolves #2069

Originally the retryable topic's ListenerContainerFactory configuration would interfere with non-retryable topics.

Now we do not set the feature's factory configurations if the container is not retryable.

* Add and update javadoc as requested in code review
  • Loading branch information
tomazfernandes authored and garyrussell committed Feb 18, 2022
1 parent 2cb2adf commit a110259
Show file tree
Hide file tree
Showing 7 changed files with 651 additions and 19 deletions.
23 changes: 21 additions & 2 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Expand Up @@ -668,8 +668,6 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> templ

By default the RetryTopic configuration will use the provided factory from the `@KafkaListener` annotation, but you can specify a different one to be used to create the retry topic and dlt listener containers.

IMPORTANT: The provided factory will be configured for the retry topic functionality, so you should not use the same factory for both retrying and non-retrying topics. You can however share the same factory between many retry topic configurations.

For the `@RetryableTopic` annotation you can provide the factory's bean name, and using the `RetryTopicConfiguration` bean you can either provide the bean name or the instance itself.

====
Expand Down Expand Up @@ -703,6 +701,27 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>
----
====


IMPORTANT: Since 2.8.3 you can use the same factory for retryable and non-retryable topics.

If you need to revert the factory configuration behavior to prior 2.8.3, you can replace the standard `RetryTopicConfigurer` bean and set `useLegacyFactoryConfigurer` to `true`, such as:

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER)
public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
ListenerContainerFactoryResolver containerFactoryResolver,
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
BeanFactory beanFactory,
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
RetryTopicConfigurer retryTopicConfigurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver, listenerContainerFactoryConfigurer, beanFactory, retryTopicNamesProviderFactory);
retryTopicConfigurer.useLegacyFactoryConfigurer(true);
return retryTopicConfigurer;
}
----
[[change-kboe-logging-level]]
==== Changing KafkaBackOffException Logging Level
Expand Down
Expand Up @@ -23,13 +23,16 @@
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Pattern;

import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
Expand All @@ -38,18 +41,23 @@
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.Assert;
import org.springframework.util.backoff.FixedBackOff;

/**
*
* Configures the provided {@link ConcurrentKafkaListenerContainerFactory} with a
* {@link DefaultErrorHandler}, the {@link DeadLetterPublishingRecoverer} created by
* the {@link DeadLetterPublishingRecovererFactory}.
* Decorates the provided {@link ConcurrentKafkaListenerContainerFactory} to add a
* {@link DefaultErrorHandler} and the {@link DeadLetterPublishingRecoverer}
* created by the {@link DeadLetterPublishingRecovererFactory}.
*
* Mind that the same factory can be used by many different
* {@link org.springframework.kafka.annotation.RetryableTopic}s but should not be shared
* with non retryable topics as some of their configurations will be overriden.
* Also sets {@link ContainerProperties#setIdlePartitionEventInterval(Long)}
* and {@link ContainerProperties#setPollTimeout(long)} if its defaults haven't
* been overridden by the user.
*
* Since 2.8.3 these configurations don't interfere with the provided factory
* instance itself, so the same factory instance can be shared among retryable and
* non-retryable endpoints.
*
* @author Tomaz Fernandes
* @since 2.7
Expand Down Expand Up @@ -95,20 +103,62 @@ public class ListenerContainerFactoryConfigurer {
this.clock = clock;
}

/**
* Configures the provided {@link ConcurrentKafkaListenerContainerFactory}.
* @param containerFactory the factory instance to be configured.
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
* @return the configured factory instance.
* @deprecated in favor of
* {@link #decorateFactory(ConcurrentKafkaListenerContainerFactory, Configuration)}.
*/
@Deprecated
public ConcurrentKafkaListenerContainerFactory<?, ?> configure(
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
return isCached(containerFactory)
? containerFactory
: addToCache(doConfigure(containerFactory, configuration.backOffValues));
}

/**
* Configures the provided {@link ConcurrentKafkaListenerContainerFactory}.
* Meant to be used for the main endpoint, this method ignores the provided backOff values.
* @param containerFactory the factory instance to be configured.
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
* @return the configured factory instance.
* @deprecated in favor of
* {@link #decorateFactoryWithoutBackOffValues(ConcurrentKafkaListenerContainerFactory, Configuration)}.
*/
@Deprecated
public ConcurrentKafkaListenerContainerFactory<?, ?> configureWithoutBackOffValues(
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
return isCached(containerFactory)
? containerFactory
: doConfigure(containerFactory, Collections.emptyList());
}

/**
* Decorates the provided {@link ConcurrentKafkaListenerContainerFactory}.
* @param factory the factory instance to be decorated.
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
* @return the decorated factory instance.
*/
public KafkaListenerContainerFactory<?> decorateFactory(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
Configuration configuration) {
return new RetryTopicListenerContainerFactoryDecorator(factory, configuration.backOffValues);
}

/**
* Decorates the provided {@link ConcurrentKafkaListenerContainerFactory}.
* Meant to be used for the main endpoint, this method ignores the provided backOff values.
* @param factory the factory instance to be decorated.
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
* @return the decorated factory instance.
*/
public KafkaListenerContainerFactory<?> decorateFactoryWithoutBackOffValues(
ConcurrentKafkaListenerContainerFactory<?, ?> factory, Configuration configuration) {
return new RetryTopicListenerContainerFactoryDecorator(factory, Collections.emptyList());
}

private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, List<Long> backOffValues) {

Expand Down Expand Up @@ -217,6 +267,45 @@ private <T> T checkAndCast(Object obj, Class<T> clazz) {
return (T) obj;
}

private class RetryTopicListenerContainerFactoryDecorator implements KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<?, ?>> {

private final ConcurrentKafkaListenerContainerFactory<?, ?> delegate;
private final List<Long> backOffValues;

RetryTopicListenerContainerFactoryDecorator(ConcurrentKafkaListenerContainerFactory<?, ?> delegate, List<Long> backOffValues) {
this.delegate = delegate;
this.backOffValues = backOffValues;
}

@Override
public ConcurrentMessageListenerContainer<?, ?> createListenerContainer(KafkaListenerEndpoint endpoint) {
return decorate(this.delegate.createListenerContainer(endpoint));
}

private ConcurrentMessageListenerContainer<?, ?> decorate(ConcurrentMessageListenerContainer<?, ?> listenerContainer) {
setupBackoffAwareMessageListenerAdapter(listenerContainer, this.backOffValues);
listenerContainer
.setCommonErrorHandler(createErrorHandler(
ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory.create()));
return listenerContainer;
}

@Override
public ConcurrentMessageListenerContainer<?, ?> createContainer(TopicPartitionOffset... topicPartitions) {
return decorate(this.delegate.createContainer(topicPartitions));
}

@Override
public ConcurrentMessageListenerContainer<?, ?> createContainer(String... topics) {
return decorate(this.delegate.createContainer(topics));
}

@Override
public ConcurrentMessageListenerContainer<?, ?> createContainer(Pattern topicPattern) {
return decorate(this.delegate.createContainer(topicPattern));
}
}

static class Configuration {

private final List<Long> backOffValues;
Expand Down
Expand Up @@ -220,6 +220,8 @@ public class RetryTopicConfigurer {

private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;

private boolean useLegacyFactoryConfigurer = false;

@Deprecated
public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
ListenerContainerFactoryResolver containerFactoryResolver,
Expand All @@ -229,6 +231,14 @@ public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
this(destinationTopicProcessor, containerFactoryResolver, listenerContainerFactoryConfigurer, beanFactory, new SuffixingRetryTopicNamesProviderFactory());
}

/**
* Create an instance with the provided properties.
* @param destinationTopicProcessor the destination topic processor.
* @param containerFactoryResolver the container factory resolver.
* @param listenerContainerFactoryConfigurer the container factory configurer.
* @param beanFactory the bean factory.
* @param retryTopicNamesProviderFactory the retry topic names factory.
*/
@Autowired
public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
ListenerContainerFactoryResolver containerFactoryResolver,
Expand Down Expand Up @@ -298,7 +308,7 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEn
RetryTopicConfiguration configuration, DestinationTopicProcessor.Context context,
DestinationTopic.Properties destinationTopicProperties) {

ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory =
KafkaListenerContainerFactory<?> resolvedFactory =
destinationTopicProperties.isMainEndpoint()
? resolveAndConfigureFactoryForMainEndpoint(factory, defaultFactoryBeanName, configuration)
: resolveAndConfigureFactoryForRetryEndpoint(factory, defaultFactoryBeanName, configuration);
Expand Down Expand Up @@ -361,25 +371,32 @@ private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(EndpointHandl
return dltEndpointHandlerMethod != null ? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
}

private ConcurrentKafkaListenerContainerFactory<?, ?> resolveAndConfigureFactoryForMainEndpoint(
private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForMainEndpoint(
KafkaListenerContainerFactory<?> providedFactory,
String defaultFactoryBeanName, RetryTopicConfiguration configuration) {
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = this.containerFactoryResolver
.resolveFactoryForMainEndpoint(providedFactory, defaultFactoryBeanName,
configuration.forContainerFactoryResolver());
return this.listenerContainerFactoryConfigurer
.configureWithoutBackOffValues(resolvedFactory, configuration.forContainerFactoryConfigurer());

return this.useLegacyFactoryConfigurer
? this.listenerContainerFactoryConfigurer
.configureWithoutBackOffValues(resolvedFactory, configuration.forContainerFactoryConfigurer())
: this.listenerContainerFactoryConfigurer
.decorateFactoryWithoutBackOffValues(resolvedFactory, configuration.forContainerFactoryConfigurer());
}

private ConcurrentKafkaListenerContainerFactory<?, ?> resolveAndConfigureFactoryForRetryEndpoint(
private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForRetryEndpoint(
KafkaListenerContainerFactory<?> providedFactory,
String defaultFactoryBeanName,
RetryTopicConfiguration configuration) {
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory =
this.containerFactoryResolver.resolveFactoryForRetryEndpoint(providedFactory, defaultFactoryBeanName,
configuration.forContainerFactoryResolver());
return this.listenerContainerFactoryConfigurer
.configure(resolvedFactory, configuration.forContainerFactoryConfigurer());
return this.useLegacyFactoryConfigurer
? this.listenerContainerFactoryConfigurer.configure(resolvedFactory,
configuration.forContainerFactoryConfigurer())
: this.listenerContainerFactoryConfigurer
.decorateFactory(resolvedFactory, configuration.forContainerFactoryConfigurer());
}

private void throwIfMultiMethodEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEndpoint) {
Expand All @@ -396,6 +413,17 @@ public static EndpointHandlerMethod createHandlerMethodWith(Object bean, Method
return new EndpointHandlerMethod(bean, method);
}

/**
* Set to true if you want the {@link ListenerContainerFactoryConfigurer} to
* behave as before 2.8.3.
* @param useLegacyFactoryConfigurer Whether to use the legacy factory configuration.
* @deprecated for removal after the deprecated legacy configuration methods are removed.
*/
@Deprecated
public void useLegacyFactoryConfigurer(boolean useLegacyFactoryConfigurer) {
this.useLegacyFactoryConfigurer = useLegacyFactoryConfigurer;
}

public interface EndpointProcessor extends Consumer<MethodKafkaListenerEndpoint<?, ?>> {

default void process(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;

Expand All @@ -46,6 +47,8 @@

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.ContainerCustomizer;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
Expand Down Expand Up @@ -132,6 +135,9 @@ class ListenerContainerFactoryConfigurerTests {
@Mock
private RetryTopicConfiguration configuration;

@Mock
private KafkaListenerEndpoint endpoint;

private final long backOffValue = 2000L;

private final ListenerContainerFactoryConfigurer.Configuration lcfcConfiguration =
Expand Down Expand Up @@ -360,6 +366,44 @@ void shouldSetupMessageListenerAdapter() {
then(this.configurerContainerCustomizer).should(times(1)).accept(container);
}

@Test
void shouldDecorateFactory() {

// given
given(container.getContainerProperties()).willReturn(containerProperties);
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
given(containerProperties.getMessageListener()).willReturn(listener);
RecordHeaders headers = new RecordHeaders();
headers.add(RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP, originalTimestampBytes);
given(data.headers()).willReturn(headers);
String testListenerId = "testListenerId";
given(container.getListenerId()).willReturn(testListenerId);
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
willReturn(container).given(containerFactory).createListenerContainer(endpoint);

// when
ListenerContainerFactoryConfigurer configurer =
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
deadLetterPublishingRecovererFactory, clock);
configurer.setContainerCustomizer(configurerContainerCustomizer);
KafkaListenerContainerFactory<?> factory = configurer
.decorateFactory(containerFactory, configuration.forContainerFactoryConfigurer());
factory.createListenerContainer(endpoint);

// then
then(container).should(times(1)).setupMessageListener(listenerAdapterCaptor.capture());
KafkaBackoffAwareMessageListenerAdapter<?, ?> listenerAdapter =
(KafkaBackoffAwareMessageListenerAdapter<?, ?>) listenerAdapterCaptor.getValue();
listenerAdapter.onMessage(data, ack, consumer);

then(this.kafkaConsumerBackoffManager).should(times(1))
.createContext(anyLong(), listenerIdCaptor.capture(), any(TopicPartition.class), eq(consumer));
assertThat(listenerIdCaptor.getValue()).isEqualTo(testListenerId);
then(listener).should(times(1)).onMessage(data, ack, consumer);

then(this.configurerContainerCustomizer).should(times(1)).accept(container);
}

@Test
void shouldCacheFactoryInstances() {

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -224,9 +224,9 @@ void shouldConfigureRetryEndpoints() {

willReturn(containerFactory).given(containerFactoryResolver).resolveFactoryForRetryEndpoint(containerFactory,
defaultFactoryBeanName, factoryResolverConfig);
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).configure(containerFactory,
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).decorateFactory(containerFactory,
lcfcConfiguration);
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).configureWithoutBackOffValues(containerFactory,
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).decorateFactoryWithoutBackOffValues(containerFactory,
lcfcConfiguration);

RetryTopicConfigurer configurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver,
Expand Down

0 comments on commit a110259

Please sign in to comment.