Skip to content

Commit

Permalink
GH-2702: RetryableTopic with asyncAcks Back Port
Browse files Browse the repository at this point in the history
Resolves #2707

When using `asyncAcks` with manual ack modes, the `DefaultErrorHandler`
must have `seekAfterError` set to `false`; this required user configuration.

The framework now unconditionally sets the property when it configures a container
using a manual ack mode.

In addition, the default DLT handler was not compatible with any manual ack mode,
regardless of the `asyncAcks` setting.

Add `Acknowledgment` to the `LoggingDltListenerHandlerMethod`.

Also tested with reporter's reproducer.

**cherry-pick to 2.9.x (will require instanceof polishing for Java 8)**

* Only supply `NoOpAck` with explicit `@NonNull` param annotation.
  • Loading branch information
garyrussell committed Jun 14, 2023
1 parent 8e1575b commit 4401f96
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 22 deletions.
16 changes: 16 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,22 @@ IMPORTANT: You can set the `AckMode` mode you prefer, but `RECORD` is suggested.

IMPORTANT: At this time this functionality doesn't support class level `@KafkaListener` annotations

When using a manual `AckMode` with `asyncAcks` set to true, the `DefaultErrorHandler` must be configured with `seekAfterError` set to `false`.
Starting with versions 2.9.10, 3.0.8, this will be set to true unconditionally for such configurations.
With earlier versions, it was necessary to override the `RetryConfigurationSupport.configureCustomizers()` method to set the property to `true`.

====
[source, java]
----
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeErrorHandler(eh -> eh.setSeekAfterError(false));
}
----
====

In addition, before those versions, using the default (logging) DLT handler was not compatible with any kind of manual `AckMode`, regardless of the `asyncAcks` property.

==== Back Off Delay Precision

===== Overview and Guarantees
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
Expand Down Expand Up @@ -84,6 +85,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private static final SpelExpressionParser PARSER = new SpelExpressionParser();

private static final Acknowledgment NO_OP_ACK = new NoOpAck();

/**
* Message used when no conversion is needed.
*/
Expand Down Expand Up @@ -120,6 +123,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private boolean hasAckParameter;

private boolean noOpAck;

private boolean hasMetadataParameter;

private boolean messageReturnType;
Expand Down Expand Up @@ -334,25 +339,29 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, @Nullable A
protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message,
Consumer<?, ?> consumer) {

Acknowledgment ack = acknowledgment;
if (ack == null && this.noOpAck) {
ack = NO_OP_ACK;
}
try {
if (data instanceof List && !this.isConsumerRecordList) {
return this.handlerMethod.invoke(message, acknowledgment, consumer);
return this.handlerMethod.invoke(message, ack, consumer);
}
else {
if (this.hasMetadataParameter) {
return this.handlerMethod.invoke(message, data, acknowledgment, consumer,
return this.handlerMethod.invoke(message, data, ack, consumer,
AdapterUtils.buildConsumerRecordMetadata(data));
}
else {
return this.handlerMethod.invoke(message, data, acknowledgment, consumer);
return this.handlerMethod.invoke(message, data, ack, consumer);
}
}
}
catch (org.springframework.messaging.converter.MessageConversionException ex) {
throw checkAckArg(acknowledgment, message, new MessageConversionException("Cannot handle message", ex));
throw checkAckArg(ack, message, new MessageConversionException("Cannot handle message", ex));
}
catch (MethodArgumentNotValidException ex) {
throw checkAckArg(acknowledgment, message, ex);
throw checkAckArg(ack, message, ex);
}
catch (MessagingException ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
Expand Down Expand Up @@ -588,6 +597,9 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
boolean isNotConvertible = parameterIsType(parameterType, ConsumerRecord.class);
boolean isAck = parameterIsType(parameterType, Acknowledgment.class);
this.hasAckParameter |= isAck;
if (isAck) {
this.noOpAck |= methodParameter.getParameterAnnotation(NonNull.class) != null;
}
isNotConvertible |= isAck;
boolean isConsumer = parameterIsType(parameterType, Consumer.class);
isNotConvertible |= isConsumer;
Expand Down Expand Up @@ -767,4 +779,12 @@ public Object getResult() {

}

static class NoOpAck implements Acknowledgment {

@Override
public void acknowledge() {
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
Expand Down Expand Up @@ -208,12 +209,23 @@ private class RetryTopicListenerContainerFactoryDecorator
return decorate(this.delegate.createListenerContainer(endpoint));
}

private ConcurrentMessageListenerContainer<?, ?> decorate(ConcurrentMessageListenerContainer<?, ?> listenerContainer) {
private ConcurrentMessageListenerContainer<?, ?> decorate(
ConcurrentMessageListenerContainer<?, ?> listenerContainer) {

CommonErrorHandler errorHandler = createErrorHandler(
ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory.create(),
this.configuration);
if (listenerContainer.getContainerProperties().isAsyncAcks()) {
AckMode ackMode = listenerContainer.getContainerProperties().getAckMode();
if ((AckMode.MANUAL.equals(ackMode) || AckMode.MANUAL_IMMEDIATE.equals(ackMode))
&& errorHandler instanceof DefaultErrorHandler) {
((DefaultErrorHandler) errorHandler).setSeekAfterError(false);
}
}
listenerContainer
.setCommonErrorHandler(createErrorHandler(
ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory.create(),
this.configuration));
setupBackoffAwareMessageListenerAdapter(listenerContainer, this.configuration, this.isSetContainerProperties);
.setCommonErrorHandler(errorHandler);
setupBackoffAwareMessageListenerAdapter(listenerContainer, this.configuration,
this.isSetContainerProperties);
return listenerContainer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.kafka.support.TopicForRetryable;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;


Expand Down Expand Up @@ -448,15 +450,17 @@ static class LoggingDltListenerHandlerMethod {
public static final String DEFAULT_DLT_METHOD_NAME = "logMessage";

@SuppressWarnings("deprecation")
public void logMessage(Object message) {
public void logMessage(Object message, @NonNull Acknowledgment ack) {
if (message instanceof ConsumerRecord) {
LOGGER.info(() -> "Received message in dlt listener: "
+ ListenerUtils.recordToString((ConsumerRecord<?, ?>) message));
}
else {
LOGGER.info(() -> "Received message in dlt listener.");
}
ack.acknowledge();
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
Expand Down Expand Up @@ -147,7 +148,7 @@ void shouldCallBackoffManagerIfBackoffHeaderIsPresentAndFirstMethodIsCalled() {
then(kafkaConsumerBackoffManager).should(times(1))
.backOffIfNecessary(context);

then(delegate).should(times(1)).onMessage(data, null, null);
then(delegate).should(times(1)).onMessage(eq(data), any(), isNull());
}

@Test
Expand All @@ -159,7 +160,7 @@ void shouldWrapExceptionInTimestampedException() {
given(kafkaConsumerBackoffManager.createContext(originalTimestamp, listenerId, topicPartition, null))
.willReturn(context);
RuntimeException thrownException = new RuntimeException();
willThrow(thrownException).given(delegate).onMessage(data, null, null);
willThrow(thrownException).given(delegate).onMessage(eq(data), any(), isNull());

KafkaBackoffAwareMessageListenerAdapter<Object, Object> backoffAwareMessageListenerAdapter =
new KafkaBackoffAwareMessageListenerAdapter<>(delegate, kafkaConsumerBackoffManager, listenerId, clock);
Expand All @@ -175,7 +176,7 @@ void shouldWrapExceptionInTimestampedException() {
then(kafkaConsumerBackoffManager).should(times(1))
.backOffIfNecessary(context);

then(delegate).should(times(1)).onMessage(data, null, null);
then(delegate).should(times(1)).onMessage(eq(data), any(), isNull());
}

@Test
Expand Down Expand Up @@ -224,7 +225,7 @@ void shouldCallBackoffManagerIfBackoffHeaderIsPresentAndThirdMethodIsCalled() {
then(kafkaConsumerBackoffManager).should(times(1))
.backOffIfNecessary(context);

then(delegate).should(times(1)).onMessage(data, null, consumer);
then(delegate).should(times(1)).onMessage(eq(data), any(), eq(consumer));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import java.lang.reflect.Method;
Expand Down Expand Up @@ -52,6 +53,7 @@
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.kafka.test.condition.LogLevels;
import org.springframework.test.util.ReflectionTestUtils;
Expand Down Expand Up @@ -367,7 +369,7 @@ void shouldLogConsumerRecordMessage() {
ListenerUtils.setLogOnlyMetadata(true);
RetryTopicConfigurer.LoggingDltListenerHandlerMethod method =
new RetryTopicConfigurer.LoggingDltListenerHandlerMethod();
method.logMessage(consumerRecordMessage);
method.logMessage(consumerRecordMessage, mock(Acknowledgment.class));
then(consumerRecordMessage).should().topic();
ListenerUtils.setLogOnlyMetadata(false);
}
Expand All @@ -376,7 +378,7 @@ void shouldLogConsumerRecordMessage() {
void shouldNotLogObjectMessage() {
RetryTopicConfigurer.LoggingDltListenerHandlerMethod method =
new RetryTopicConfigurer.LoggingDltListenerHandlerMethod();
method.logMessage(objectMessage);
method.logMessage(objectMessage, mock(Acknowledgment.class));
then(objectMessage).shouldHaveNoInteractions();
}

Expand Down
Loading

0 comments on commit 4401f96

Please sign in to comment.