Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
Fix NPE for retryTemp!=null but recoveryCall==null
Browse files Browse the repository at this point in the history
Starting with version `3.0.x`, the `KafkaInboundGateway` and
`KafkaMessageDrivenChannelAdapter` rely on the
`RetryContext.getRetryCount()` for the
`IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT` message header, but
the `RetryContext` is obtained from the `TheadLocal` `attributesHolder`,
which is populated from the `RetryListener.open()` only if
`recoveryCallback != null`

* Fix `KafkaInboundGateway` and `KafkaMessageDrivenChannelAdapter` to
populate `TheadLocal` `attributesHolder` with the `RetryContext`
from the `RetryListener.open()` when only `retryTemplate != null`

**Cherry-pick to `master` & 3.0.x**

# Conflicts:
#	src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java
#	src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java
  • Loading branch information
ukeller authored and artembilan committed Jul 18, 2019
1 parent 54ed9be commit 7f135ba
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
*
* @author Gary Russell
* @author Artem Bilan
* @author Urs Keller
*
* @since 3.0.2
*
Expand Down Expand Up @@ -272,8 +273,7 @@ public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment
}
}
else {
KafkaInboundGateway.this.logger.debug("Converter returned a null message for: "
+ record);
KafkaInboundGateway.this.logger.debug("Converter returned a null message for: " + record);
}
}

Expand Down Expand Up @@ -324,7 +324,7 @@ private Message<?> enhanceReply(Message<?> message, Message<?> reply) {

@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
if (KafkaInboundGateway.this.recoveryCallback != null) {
if (KafkaInboundGateway.this.retryTemplate != null) {
attributesHolder.set(context);
}
return true;
Expand All @@ -333,6 +333,7 @@ public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {

attributesHolder.remove();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -69,6 +69,7 @@
* @author Marius Bogoevici
* @author Gary Russell
* @author Artem Bilan
* @author Urs Keller
*
*/
public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSupport implements OrderlyShutdownCapable,
Expand Down Expand Up @@ -449,7 +450,7 @@ private Message<?> addDeliveryAttemptHeader(Message<?> message) {

@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
if (KafkaMessageDrivenChannelAdapter.this.recoveryCallback != null) {
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
attributesHolder.set(context);
}
return true;
Expand All @@ -458,6 +459,7 @@ public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {

attributesHolder.remove();
}

Expand Down Expand Up @@ -504,7 +506,7 @@ public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowl

@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
if (KafkaMessageDrivenChannelAdapter.this.recoveryCallback != null) {
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
attributesHolder.set(context);
}
return true;
Expand All @@ -513,6 +515,7 @@ public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {

attributesHolder.remove();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 the original author or authors.
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,13 +58,17 @@
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.NoBackOffPolicy;
import org.springframework.retry.listener.RetryListenerSupport;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

/**
*
* @author Gary Russell
* @author Urs Keller
*
* @since 3.0.2
*
Expand All @@ -83,9 +87,11 @@ public class InboundGatewayTests {

private static String topic6 = "testTopic6";

private static String topic7 = "testTopic7";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5,
topic6);
topic6, topic7);

@Rule
public Log4j2LevelAdjuster adjuster = Log4j2LevelAdjuster.trace().categories("org.apache.kafka.clients",
Expand Down Expand Up @@ -322,4 +328,61 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
gateway.stop();
}

@Test
public void testInboundRetryErrorRecoverWithoutRecocveryCallback() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("replyHandler4", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<Integer, String> cf2 = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf2.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic7);

Map<String, Object> props = KafkaTestUtils.consumerProps("test4", "false", embeddedKafka);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic7);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic7);
KafkaInboundGateway<Integer, String, String> gateway = new KafkaInboundGateway<>(container, template);
MessageChannel out = new DirectChannel() {

@Override
protected boolean doSend(Message<?> message, long timeout) {
throw new RuntimeException("intended");
}

};
gateway.setRequestChannel(out);
gateway.setBeanFactory(mock(BeanFactory.class));
gateway.setReplyTimeout(30_000);
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(5);
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(new NoBackOffPolicy());
final CountDownLatch retryCountLatch = new CountDownLatch(retryPolicy.getMaxAttempts());
retryTemplate.registerListener(new RetryListenerSupport() {

@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
retryCountLatch.countDown();
}
});
gateway.setRetryTemplate(retryTemplate);
gateway.afterPropertiesSet();
gateway.start();
ContainerTestUtils.waitForAssignment(container, 2);

template.sendDefault(0, 1487694048607L, 1, "foo");

assertThat(retryCountLatch.await(10, TimeUnit.SECONDS)).isTrue();

gateway.stop();
consumer.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.listener.RetryListenerSupport;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

Expand All @@ -90,6 +93,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Biju Kunjummen
* @author Urs Keller
*
* @since 2.0
*
Expand All @@ -106,8 +110,10 @@ public class MessageDrivenAdapterTests {

private static String topic5 = "testTopic5";

private static String topic6 = "testTopic6";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5);
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5, topic6);

@Test
public void testInboundRecord() throws Exception {
Expand Down Expand Up @@ -246,6 +252,61 @@ protected boolean doSend(Message<?> message, long timeout) {
adapter.stop();
}


/**
* the recovery callback is not mandatory, if not set and retries are exhausted the last throwable is rethrown
* to the consumer.
*/
@Test
public void testInboundRecordRetryRecoverWithoutRecoveryCallback() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps("test6", "true", embeddedKafka);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic6);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);

KafkaMessageDrivenChannelAdapter<Integer, String> adapter = new KafkaMessageDrivenChannelAdapter<>(container);
MessageChannel out = new DirectChannel() {

@Override
protected boolean doSend(Message<?> message, long timeout) {
throw new RuntimeException("intended");
}

};
adapter.setOutputChannel(out);
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(2);
retryTemplate.setRetryPolicy(retryPolicy);
final CountDownLatch retryCountLatch = new CountDownLatch(retryPolicy.getMaxAttempts());
retryTemplate.registerListener(new RetryListenerSupport() {

@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
retryCountLatch.countDown();
}
});
adapter.setRetryTemplate(retryTemplate);

adapter.afterPropertiesSet();
adapter.start();
ContainerTestUtils.waitForAssignment(container, 2);

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic6);
template.sendDefault(1, "foo");

assertThat(retryCountLatch.await(10, TimeUnit.SECONDS)).isTrue();

adapter.stop();
pf.destroy();
}

@Test
public void testInboundRecordNoRetryRecover() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps("test5", "true", embeddedKafka);
Expand Down

0 comments on commit 7f135ba

Please sign in to comment.