From 2f048be46c8a1ee2f0238d3f9fa0cac2a87d872e Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 27 Aug 2020 10:17:50 -0400 Subject: [PATCH] Fix Request/Reply with ConsumerRecord Message conversion is bypassed when consuming the raw `ConsumerRecord`. However, the request message is needed when returning a result (for reply topic determination, correlation, etc). **I will do the backports; I expect conflicts in the test.** --- .../MessagingMessageListenerAdapter.java | 2 +- .../ReplyingKafkaTemplateTests.java | 31 ++++++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 35b47d8bd5..7930d268ef 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -579,7 +579,7 @@ else if (methodParameter.hasParameterAnnotation(Header.class)) { } } - if (notConvertibleParameters == method.getParameterCount()) { + if (notConvertibleParameters == method.getParameterCount() && method.getReturnType().equals(void.class)) { this.conversionNeeded = false; } boolean validParametersForBatch = method.getGenericParameterTypes().length <= allowedBatchParameters; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index 96f60dc39f..2699dd91a4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -106,7 +106,8 @@ ReplyingKafkaTemplateTests.D_REPLY, ReplyingKafkaTemplateTests.D_REQUEST, ReplyingKafkaTemplateTests.E_REPLY, ReplyingKafkaTemplateTests.E_REQUEST, ReplyingKafkaTemplateTests.F_REPLY, ReplyingKafkaTemplateTests.F_REQUEST, - ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST }) + ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST, + ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST }) public class ReplyingKafkaTemplateTests { public static final String A_REPLY = "aReply"; @@ -141,6 +142,10 @@ public class ReplyingKafkaTemplateTests { public static final String J_REQUEST = "jRequest"; + public static final String K_REPLY = "kReply"; + + public static final String K_REQUEST = "kRequest"; + @Autowired private EmbeddedKafkaBroker embeddedKafka; @@ -187,6 +192,24 @@ public void testGood() throws Exception { } } + @Test + void testConsumerRecord() throws Exception { + ReplyingKafkaTemplate template = createTemplate(K_REPLY); + try { + template.setDefaultReplyTimeout(Duration.ofSeconds(30)); + Headers headers = new RecordHeaders(); + ProducerRecord record = new ProducerRecord<>(K_REQUEST, null, null, null, "foo", headers); + RequestReplyFuture future = template.sendAndReceive(record); + future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok + ConsumerRecord consumerRecord = future.get(30, TimeUnit.SECONDS); + assertThat(consumerRecord.value()).isEqualTo("FOO"); + } + finally { + template.stop(); + template.destroy(); + } + } + @Test public void testBadDeserialize() throws Exception { ReplyingKafkaTemplate template = createTemplate(J_REPLY, true); @@ -669,6 +692,12 @@ public void gListener(Message in) { public String handleJ(String in) throws InterruptedException { return in.toUpperCase(); } + @KafkaListener(id = K_REQUEST, topics = { K_REQUEST }) + + @SendTo + public String handleK(ConsumerRecord in) { + return in.value().toUpperCase(); + } }