Skip to content

Commit

Permalink
GH-2117: Make ListenerContainerFactoryConfigurer ctor public
Browse files Browse the repository at this point in the history
Resolves GH-2117

Make many inner methods protected

Review protected methods' signatures to make maintenance easier in the future
  • Loading branch information
tomazfernandes authored and garyrussell committed Feb 21, 2022
1 parent 6f63a57 commit 3c9e00f
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.springframework.kafka.retrytopic;

import java.time.Clock;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -94,7 +93,7 @@ public class ListenerContainerFactoryConfigurer {

private final Clock clock;

ListenerContainerFactoryConfigurer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
public ListenerContainerFactoryConfigurer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
Expand All @@ -116,7 +115,7 @@ public class ListenerContainerFactoryConfigurer {
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
return isCached(containerFactory)
? containerFactory
: addToCache(doConfigure(containerFactory, configuration.backOffValues));
: addToCache(doConfigure(containerFactory, configuration, true));
}

/**
Expand All @@ -126,14 +125,14 @@ public class ListenerContainerFactoryConfigurer {
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
* @return the configured factory instance.
* @deprecated in favor of
* {@link #decorateFactoryWithoutBackOffValues(ConcurrentKafkaListenerContainerFactory, Configuration)}.
* {@link #decorateFactoryWithoutSettingContainerProperties(ConcurrentKafkaListenerContainerFactory, Configuration)}.
*/
@Deprecated
public ConcurrentKafkaListenerContainerFactory<?, ?> configureWithoutBackOffValues(
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
return isCached(containerFactory)
? containerFactory
: doConfigure(containerFactory, Collections.emptyList());
: doConfigure(containerFactory, configuration, false);
}

/**
Expand All @@ -144,7 +143,7 @@ public class ListenerContainerFactoryConfigurer {
*/
public KafkaListenerContainerFactory<?> decorateFactory(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
Configuration configuration) {
return new RetryTopicListenerContainerFactoryDecorator(factory, configuration.backOffValues);
return new RetryTopicListenerContainerFactoryDecorator(factory, configuration, true);
}

/**
Expand All @@ -154,18 +153,19 @@ public KafkaListenerContainerFactory<?> decorateFactory(ConcurrentKafkaListenerC
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
* @return the decorated factory instance.
*/
public KafkaListenerContainerFactory<?> decorateFactoryWithoutBackOffValues(
public KafkaListenerContainerFactory<?> decorateFactoryWithoutSettingContainerProperties(
ConcurrentKafkaListenerContainerFactory<?, ?> factory, Configuration configuration) {
return new RetryTopicListenerContainerFactoryDecorator(factory, Collections.emptyList());
return new RetryTopicListenerContainerFactoryDecorator(factory, configuration, false);
}

private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, List<Long> backOffValues) {
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration,
boolean isSetContainerProperties) {

containerFactory
.setContainerCustomizer(container -> setupBackoffAwareMessageListenerAdapter(container, backOffValues));
.setContainerCustomizer(container -> setupBackoffAwareMessageListenerAdapter(container, configuration, isSetContainerProperties));
containerFactory
.setCommonErrorHandler(createErrorHandler(this.deadLetterPublishingRecovererFactory.create()));
.setCommonErrorHandler(createErrorHandler(this.deadLetterPublishingRecovererFactory.create(), configuration));
return containerFactory;
}

Expand All @@ -191,7 +191,8 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerC
this.errorHandlerCustomizer = errorHandlerCustomizer;
}

private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer,
Configuration configuration) {
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
new FixedBackOff(0, 0));
errorHandler.setCommitRecovered(true);
Expand All @@ -200,52 +201,52 @@ private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer dead
return errorHandler;
}

private void setupBackoffAwareMessageListenerAdapter(ConcurrentMessageListenerContainer<?, ?> container,
List<Long> backOffValues) {
protected void setupBackoffAwareMessageListenerAdapter(ConcurrentMessageListenerContainer<?, ?> container,
Configuration configuration, boolean isSetContainerProperties) {
AcknowledgingConsumerAwareMessageListener<?, ?> listener = checkAndCast(container.getContainerProperties()
.getMessageListener(), AcknowledgingConsumerAwareMessageListener.class);

configurePollTimeoutAndIdlePartitionInterval(container, backOffValues);
if (isSetContainerProperties && !configuration.backOffValues.isEmpty()) {
configurePollTimeoutAndIdlePartitionInterval(container, configuration);
}

container.setupMessageListener(new KafkaBackoffAwareMessageListenerAdapter<>(listener,
this.kafkaConsumerBackoffManager, container.getListenerId(), this.clock)); // NOSONAR

this.containerCustomizer.accept(container);
}

private void configurePollTimeoutAndIdlePartitionInterval(ConcurrentMessageListenerContainer<?, ?> container,
List<Long> backOffValues) {
if (backOffValues.isEmpty()) {
return;
}
protected void configurePollTimeoutAndIdlePartitionInterval(ConcurrentMessageListenerContainer<?, ?> container,
Configuration configuration) {

ContainerProperties containerProperties = container.getContainerProperties();

long pollTimeoutValue = getPollTimeoutValue(containerProperties, backOffValues);
long pollTimeoutValue = getPollTimeoutValue(containerProperties, configuration);
long idlePartitionEventInterval = getIdlePartitionInterval(containerProperties, pollTimeoutValue);

LOGGER.debug(() -> "pollTimeout and idlePartitionEventInterval for back off values "
+ backOffValues + " will be set to " + pollTimeoutValue
+ configuration.backOffValues + " will be set to " + pollTimeoutValue
+ " and " + idlePartitionEventInterval);

containerProperties
.setIdlePartitionEventInterval(idlePartitionEventInterval);
containerProperties.setPollTimeout(pollTimeoutValue);
}

private long getIdlePartitionInterval(ContainerProperties containerProperties, long pollTimeoutValue) {
protected long getIdlePartitionInterval(ContainerProperties containerProperties, long pollTimeoutValue) {
Long idlePartitionEventInterval = containerProperties.getIdlePartitionEventInterval();
return idlePartitionEventInterval != null && idlePartitionEventInterval > 0
? idlePartitionEventInterval
: pollTimeoutValue;
}

private long getPollTimeoutValue(ContainerProperties containerProperties, List<Long> backOffValues) {
if (containerProperties.getPollTimeout() != ContainerProperties.DEFAULT_POLL_TIMEOUT) {
protected long getPollTimeoutValue(ContainerProperties containerProperties, Configuration configuration) {
if (containerProperties.getPollTimeout() != ContainerProperties.DEFAULT_POLL_TIMEOUT
|| configuration.backOffValues.isEmpty()) {
return containerProperties.getPollTimeout();
}

Long lowestBackOff = backOffValues
Long lowestBackOff = configuration.backOffValues
.stream()
.min(Comparator.naturalOrder())
.orElseThrow(() -> new IllegalArgumentException("No back off values found!"));
Expand All @@ -267,14 +268,19 @@ private <T> T checkAndCast(Object obj, Class<T> clazz) {
return (T) obj;
}

private class RetryTopicListenerContainerFactoryDecorator implements KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<?, ?>> {
private class RetryTopicListenerContainerFactoryDecorator
implements KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<?, ?>> {

private final ConcurrentKafkaListenerContainerFactory<?, ?> delegate;
private final List<Long> backOffValues;
private final Configuration configuration;
private final boolean isSetContainerProperties;

RetryTopicListenerContainerFactoryDecorator(ConcurrentKafkaListenerContainerFactory<?, ?> delegate, List<Long> backOffValues) {
RetryTopicListenerContainerFactoryDecorator(ConcurrentKafkaListenerContainerFactory<?, ?> delegate,
Configuration configuration,
boolean isSetContainerProperties) {
this.delegate = delegate;
this.backOffValues = backOffValues;
this.configuration = configuration;
this.isSetContainerProperties = isSetContainerProperties;
}

@Override
Expand All @@ -283,10 +289,11 @@ private class RetryTopicListenerContainerFactoryDecorator implements KafkaListen
}

private ConcurrentMessageListenerContainer<?, ?> decorate(ConcurrentMessageListenerContainer<?, ?> listenerContainer) {
setupBackoffAwareMessageListenerAdapter(listenerContainer, this.backOffValues);
listenerContainer
.setCommonErrorHandler(createErrorHandler(
ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory.create()));
ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory.create(),
this.configuration));
setupBackoffAwareMessageListenerAdapter(listenerContainer, this.configuration, this.isSetContainerProperties);
return listenerContainer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForMainEndpoi
? this.listenerContainerFactoryConfigurer
.configureWithoutBackOffValues(resolvedFactory, configuration.forContainerFactoryConfigurer())
: this.listenerContainerFactoryConfigurer
.decorateFactoryWithoutBackOffValues(resolvedFactory, configuration.forContainerFactoryConfigurer());
.decorateFactoryWithoutSettingContainerProperties(resolvedFactory,
configuration.forContainerFactoryConfigurer());
}

@SuppressWarnings("deprecation")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ void shouldConfigureRetryEndpoints() {
defaultFactoryBeanName, factoryResolverConfig);
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).decorateFactory(containerFactory,
lcfcConfiguration);
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).decorateFactoryWithoutBackOffValues(containerFactory,
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).decorateFactoryWithoutSettingContainerProperties(containerFactory,
lcfcConfiguration);

RetryTopicConfigurer configurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver,
Expand Down

0 comments on commit 3c9e00f

Please sign in to comment.