Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

import org.springframework.expression.ParserContext;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.lang.Nullable;

/**
Expand All @@ -31,6 +34,12 @@
*/
public final class AdapterUtils {

/**
* Parser context for runtime SpEL using ! as the template prefix.
* @since 2.2.15
*/
public static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");

private AdapterUtils() {
}

Expand Down Expand Up @@ -67,4 +76,14 @@ public static ConsumerRecordMetadata buildConsumerRecordMetadata(Object data) {
record.serializedValueSize()), record.timestampType());
}

/**
* Return the default expression when no SendTo value is present.
* @return the expression.
* @since 2.2.15
*/
public static String getDefaultReplyTopicExpression() {
return PARSER_CONTEXT.getExpressionPrefix() + "source.headers['"
+ KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.expression.Expression;
import org.springframework.expression.ParserContext;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.support.KafkaUtils;
Expand All @@ -61,8 +59,6 @@ public class DelegatingInvocableHandler {

private static final SpelExpressionParser PARSER = new SpelExpressionParser();

private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");

private final List<InvocableHandlerMethod> handlers;

private final ConcurrentMap<Class<?>, InvocableHandlerMethod> cachedHandlers = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -205,16 +201,20 @@ protected InvocableHandlerMethod getHandlerForPayload(Class<? extends Object> pa
private void setupReplyTo(InvocableHandlerMethod handler) {
String replyTo = null;
Method method = handler.getMethod();
SendTo ann = null;
if (method != null) {
SendTo ann = AnnotationUtils.getAnnotation(method, SendTo.class);
ann = AnnotationUtils.getAnnotation(method, SendTo.class);
replyTo = extractSendTo(method.toString(), ann);
}
if (replyTo == null) {
SendTo ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class);
if (ann == null) {
ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class);
replyTo = extractSendTo(this.getBean().getClass().getSimpleName(), ann);
}
if (ann != null && replyTo == null) {
replyTo = AdapterUtils.getDefaultReplyTopicExpression();
}
if (replyTo != null) {
this.handlerSendTo.put(handler, PARSER.parseExpression(replyTo, PARSER_CONTEXT));
this.handlerSendTo.put(handler, PARSER.parseExpression(replyTo, AdapterUtils.PARSER_CONTEXT));
}
this.handlerReturnsMessage.put(handler, KafkaUtils.returnTypeMessageOrCollectionOf(method));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
import org.springframework.core.log.LogAccessor;
import org.springframework.expression.BeanResolver;
import org.springframework.expression.Expression;
import org.springframework.expression.ParserContext;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeConverter;
Expand Down Expand Up @@ -84,8 +82,6 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private static final SpelExpressionParser PARSER = new SpelExpressionParser();

private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");

/**
* Message used when no conversion is needed.
*/
Expand Down Expand Up @@ -201,11 +197,10 @@ public boolean isConversionNeeded() {
public void setReplyTopic(String replyTopicParam) {
String replyTopic = replyTopicParam;
if (!StringUtils.hasText(replyTopic)) {
replyTopic = PARSER_CONTEXT.getExpressionPrefix() + "source.headers['"
+ KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix();
replyTopic = AdapterUtils.getDefaultReplyTopicExpression();
}
if (replyTopic.contains(PARSER_CONTEXT.getExpressionPrefix())) {
this.replyTopicExpression = PARSER.parseExpression(replyTopic, PARSER_CONTEXT);
if (replyTopic.contains(AdapterUtils.PARSER_CONTEXT.getExpressionPrefix())) {
this.replyTopicExpression = PARSER.parseExpression(replyTopic, AdapterUtils.PARSER_CONTEXT);
}
else {
this.replyTopicExpression = new LiteralExpression(replyTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@
ReplyingKafkaTemplateTests.E_REPLY, ReplyingKafkaTemplateTests.E_REQUEST,
ReplyingKafkaTemplateTests.F_REPLY, ReplyingKafkaTemplateTests.F_REQUEST,
ReplyingKafkaTemplateTests.G_REPLY, ReplyingKafkaTemplateTests.G_REQUEST,
ReplyingKafkaTemplateTests.H_REPLY, ReplyingKafkaTemplateTests.H_REQUEST })
ReplyingKafkaTemplateTests.H_REPLY, ReplyingKafkaTemplateTests.H_REQUEST,
ReplyingKafkaTemplateTests.I_REPLY, ReplyingKafkaTemplateTests.I_REQUEST })
public class ReplyingKafkaTemplateTests {

public static final String A_REPLY = "aReply";
Expand Down Expand Up @@ -138,6 +139,10 @@ public class ReplyingKafkaTemplateTests {

public static final String H_REQUEST = "hRequest";

public static final String I_REPLY = "iReply";

public static final String I_REQUEST = "iRequest";

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

Expand Down Expand Up @@ -202,6 +207,24 @@ public void testMultiListenerMessageReturn() throws Exception {
}
}

@Test
public void testHandlerReturn() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(I_REPLY);
try {
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
ProducerRecord<Integer, String> record = new ProducerRecord<>(I_REQUEST, "foo");
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, I_REPLY.getBytes()));
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
assertThat(consumerRecord.value()).isEqualTo("FOO");
}
finally {
template.stop();
template.destroy();
}
}

@Test
public void testMessageReturnNoHeadersProvidedByListener() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(H_REPLY);
Expand Down Expand Up @@ -628,6 +651,11 @@ public MultiMessageReturn mmr() {
return new MultiMessageReturn();
}

@Bean
public HandlerReturn handlerReturn() {
return new HandlerReturn();
}

@KafkaListener(id = "def1", topics = { D_REQUEST, E_REQUEST, F_REQUEST })
@SendTo // default REPLY_TOPIC header
public String dListener1(String in) {
Expand Down Expand Up @@ -678,4 +706,15 @@ public Message<?> listen1(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] re

}

@KafkaListener(topics = I_REQUEST, groupId = I_REQUEST)
public static class HandlerReturn {

@KafkaHandler
@SendTo
public String listen1(String in) {
return in.toUpperCase();
}

}

}