diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java index 29fc61810a..98bd72904c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java @@ -20,6 +20,9 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.springframework.expression.ParserContext; +import org.springframework.expression.common.TemplateParserContext; +import org.springframework.kafka.support.KafkaHeaders; import org.springframework.lang.Nullable; /** @@ -31,6 +34,12 @@ */ public final class AdapterUtils { + /** + * Parser context for runtime SpEL using ! as the template prefix. + * @since 2.2.15 + */ + public static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}"); + private AdapterUtils() { } @@ -67,4 +76,14 @@ public static ConsumerRecordMetadata buildConsumerRecordMetadata(Object data) { record.serializedValueSize()), record.timestampType()); } + /** + * Return the default expression when no SendTo value is present. + * @return the expression. + * @since 2.2.15 + */ + public static String getDefaultReplyTopicExpression() { + return PARSER_CONTEXT.getExpressionPrefix() + "source.headers['" + + KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix(); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java index 7c2b30ea28..e676fc64ae 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java @@ -34,8 +34,6 @@ import org.springframework.core.MethodParameter; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.expression.Expression; -import org.springframework.expression.ParserContext; -import org.springframework.expression.common.TemplateParserContext; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.kafka.KafkaException; import org.springframework.kafka.support.KafkaUtils; @@ -61,8 +59,6 @@ public class DelegatingInvocableHandler { private static final SpelExpressionParser PARSER = new SpelExpressionParser(); - private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}"); - private final List handlers; private final ConcurrentMap, InvocableHandlerMethod> cachedHandlers = new ConcurrentHashMap<>(); @@ -205,16 +201,20 @@ protected InvocableHandlerMethod getHandlerForPayload(Class pa private void setupReplyTo(InvocableHandlerMethod handler) { String replyTo = null; Method method = handler.getMethod(); + SendTo ann = null; if (method != null) { - SendTo ann = AnnotationUtils.getAnnotation(method, SendTo.class); + ann = AnnotationUtils.getAnnotation(method, SendTo.class); replyTo = extractSendTo(method.toString(), ann); } - if (replyTo == null) { - SendTo ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class); + if (ann == null) { + ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class); replyTo = extractSendTo(this.getBean().getClass().getSimpleName(), ann); } + if (ann != null && replyTo == null) { + replyTo = AdapterUtils.getDefaultReplyTopicExpression(); + } if (replyTo != null) { - this.handlerSendTo.put(handler, PARSER.parseExpression(replyTo, PARSER_CONTEXT)); + this.handlerSendTo.put(handler, PARSER.parseExpression(replyTo, AdapterUtils.PARSER_CONTEXT)); } this.handlerReturnsMessage.put(handler, KafkaUtils.returnTypeMessageOrCollectionOf(method)); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 520fa1d075..377908f769 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -39,9 +39,7 @@ import org.springframework.core.log.LogAccessor; import org.springframework.expression.BeanResolver; import org.springframework.expression.Expression; -import org.springframework.expression.ParserContext; import org.springframework.expression.common.LiteralExpression; -import org.springframework.expression.common.TemplateParserContext; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.expression.spel.support.StandardTypeConverter; @@ -84,8 +82,6 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS private static final SpelExpressionParser PARSER = new SpelExpressionParser(); - private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}"); - /** * Message used when no conversion is needed. */ @@ -201,11 +197,10 @@ public boolean isConversionNeeded() { public void setReplyTopic(String replyTopicParam) { String replyTopic = replyTopicParam; if (!StringUtils.hasText(replyTopic)) { - replyTopic = PARSER_CONTEXT.getExpressionPrefix() + "source.headers['" - + KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix(); + replyTopic = AdapterUtils.getDefaultReplyTopicExpression(); } - if (replyTopic.contains(PARSER_CONTEXT.getExpressionPrefix())) { - this.replyTopicExpression = PARSER.parseExpression(replyTopic, PARSER_CONTEXT); + if (replyTopic.contains(AdapterUtils.PARSER_CONTEXT.getExpressionPrefix())) { + this.replyTopicExpression = PARSER.parseExpression(replyTopic, AdapterUtils.PARSER_CONTEXT); } else { this.replyTopicExpression = new LiteralExpression(replyTopic); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index 39fd441688..c6c050b4c9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -103,7 +103,8 @@ ReplyingKafkaTemplateTests.E_REPLY, ReplyingKafkaTemplateTests.E_REQUEST, ReplyingKafkaTemplateTests.F_REPLY, ReplyingKafkaTemplateTests.F_REQUEST, ReplyingKafkaTemplateTests.G_REPLY, ReplyingKafkaTemplateTests.G_REQUEST, - ReplyingKafkaTemplateTests.H_REPLY, ReplyingKafkaTemplateTests.H_REQUEST }) + ReplyingKafkaTemplateTests.H_REPLY, ReplyingKafkaTemplateTests.H_REQUEST, + ReplyingKafkaTemplateTests.I_REPLY, ReplyingKafkaTemplateTests.I_REQUEST }) public class ReplyingKafkaTemplateTests { public static final String A_REPLY = "aReply"; @@ -138,6 +139,10 @@ public class ReplyingKafkaTemplateTests { public static final String H_REQUEST = "hRequest"; + public static final String I_REPLY = "iReply"; + + public static final String I_REQUEST = "iRequest"; + @Autowired private EmbeddedKafkaBroker embeddedKafka; @@ -202,6 +207,24 @@ public void testMultiListenerMessageReturn() throws Exception { } } + @Test + public void testHandlerReturn() throws Exception { + ReplyingKafkaTemplate template = createTemplate(I_REPLY); + try { + template.setDefaultReplyTimeout(Duration.ofSeconds(30)); + ProducerRecord record = new ProducerRecord<>(I_REQUEST, "foo"); + record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, I_REPLY.getBytes())); + RequestReplyFuture future = template.sendAndReceive(record); + future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok + ConsumerRecord consumerRecord = future.get(30, TimeUnit.SECONDS); + assertThat(consumerRecord.value()).isEqualTo("FOO"); + } + finally { + template.stop(); + template.destroy(); + } + } + @Test public void testMessageReturnNoHeadersProvidedByListener() throws Exception { ReplyingKafkaTemplate template = createTemplate(H_REPLY); @@ -628,6 +651,11 @@ public MultiMessageReturn mmr() { return new MultiMessageReturn(); } + @Bean + public HandlerReturn handlerReturn() { + return new HandlerReturn(); + } + @KafkaListener(id = "def1", topics = { D_REQUEST, E_REQUEST, F_REQUEST }) @SendTo // default REPLY_TOPIC header public String dListener1(String in) { @@ -678,4 +706,15 @@ public Message listen1(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] re } + @KafkaListener(topics = I_REQUEST, groupId = I_REQUEST) + public static class HandlerReturn { + + @KafkaHandler + @SendTo + public String listen1(String in) { + return in.toUpperCase(); + } + + } + }