Skip to content

Commit

Permalink
GH-1519: Fix @sendto on @KafkaHandler
Browse files Browse the repository at this point in the history
Resolves #1519

An empty `@SendTo` on a `@KafkaListener` method means send the reply
to the `KafkaHeaders.REPLY_TOPIC` header.

This default was not applied for class-level `@KafkaListener`s.

**backport to 2.4.x, 2.3.x, 2.2.x**

(I will do the back ports, because I expect conflicts).
  • Loading branch information
garyrussell committed Nov 23, 2020
1 parent 724ac8a commit c52d736
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener.adapter;

import org.springframework.expression.ParserContext;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.kafka.support.KafkaHeaders;

/**
* Utilities for listener adapters.
*
* @author Gary Russell
* @since 2.3.13
*
*/
public final class AdapterUtils {

/**
* Parser context for runtime SpEL using ! as the template prefix.
* @since 2.3.13
*/
public static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");

private AdapterUtils() {
}

/**
* Return the default expression when no SendTo value is present.
* @return the expression.
* @since 2.3.13
*/
public static String getDefaultReplyTopicExpression() {
return PARSER_CONTEXT.getExpressionPrefix() + "source.headers['"
+ KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.expression.Expression;
import org.springframework.expression.ParserContext;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.support.KafkaUtils;
Expand All @@ -59,8 +57,6 @@ public class DelegatingInvocableHandler {

private static final SpelExpressionParser PARSER = new SpelExpressionParser();

private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");

private final List<InvocableHandlerMethod> handlers;

private final ConcurrentMap<Class<?>, InvocableHandlerMethod> cachedHandlers = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -178,16 +174,20 @@ protected InvocableHandlerMethod getHandlerForPayload(Class<? extends Object> pa
private void setupReplyTo(InvocableHandlerMethod handler) {
String replyTo = null;
Method method = handler.getMethod();
SendTo ann = null;
if (method != null) {
SendTo ann = AnnotationUtils.getAnnotation(method, SendTo.class);
ann = AnnotationUtils.getAnnotation(method, SendTo.class);
replyTo = extractSendTo(method.toString(), ann);
}
if (replyTo == null) {
SendTo ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class);
if (ann == null) {
ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class);
replyTo = extractSendTo(this.getBean().getClass().getSimpleName(), ann);
}
if (ann != null && replyTo == null) {
replyTo = AdapterUtils.getDefaultReplyTopicExpression();
}
if (replyTo != null) {
this.handlerSendTo.put(handler, PARSER.parseExpression(replyTo, PARSER_CONTEXT));
this.handlerSendTo.put(handler, PARSER.parseExpression(replyTo, AdapterUtils.PARSER_CONTEXT));
}
this.handlerReturnsMessage.put(handler, KafkaUtils.returnTypeMessageOrCollectionOf(method));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
import org.springframework.core.log.LogAccessor;
import org.springframework.expression.BeanResolver;
import org.springframework.expression.Expression;
import org.springframework.expression.ParserContext;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeConverter;
Expand Down Expand Up @@ -84,8 +82,6 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private static final SpelExpressionParser PARSER = new SpelExpressionParser();

private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");

/**
* Message used when no conversion is needed.
*/
Expand Down Expand Up @@ -199,11 +195,10 @@ public boolean isConversionNeeded() {
public void setReplyTopic(String replyTopicParam) {
String replyTopic = replyTopicParam;
if (!StringUtils.hasText(replyTopic)) {
replyTopic = PARSER_CONTEXT.getExpressionPrefix() + "source.headers['"
+ KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix();
replyTopic = AdapterUtils.getDefaultReplyTopicExpression();
}
if (replyTopic.contains(PARSER_CONTEXT.getExpressionPrefix())) {
this.replyTopicExpression = PARSER.parseExpression(replyTopic, PARSER_CONTEXT);
if (replyTopic.contains(AdapterUtils.PARSER_CONTEXT.getExpressionPrefix())) {
this.replyTopicExpression = PARSER.parseExpression(replyTopic, AdapterUtils.PARSER_CONTEXT);
}
else {
this.replyTopicExpression = new LiteralExpression(replyTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@
ReplyingKafkaTemplateTests.D_REPLY, ReplyingKafkaTemplateTests.D_REQUEST,
ReplyingKafkaTemplateTests.E_REPLY, ReplyingKafkaTemplateTests.E_REQUEST,
ReplyingKafkaTemplateTests.F_REPLY, ReplyingKafkaTemplateTests.F_REQUEST,
ReplyingKafkaTemplateTests.G_REPLY, ReplyingKafkaTemplateTests.G_REQUEST,
ReplyingKafkaTemplateTests.I_REPLY, ReplyingKafkaTemplateTests.I_REQUEST,
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST,
ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST })
public class ReplyingKafkaTemplateTests {
Expand Down Expand Up @@ -137,6 +139,10 @@ public class ReplyingKafkaTemplateTests {

public static final String G_REQUEST = "gRequest";

public static final String I_REPLY = "iReply";

public static final String I_REQUEST = "iRequest";

public static final String J_REPLY = "jReply";

public static final String J_REQUEST = "jRequest";
Expand Down Expand Up @@ -246,6 +252,24 @@ public void testMultiListenerMessageReturn() throws Exception {
}
}

@Test
public void testHandlerReturn() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(I_REPLY);
try {
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
ProducerRecord<Integer, String> record = new ProducerRecord<>(I_REQUEST, "foo");
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, I_REPLY.getBytes()));
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
assertThat(consumerRecord.value()).isEqualTo("FOO");
}
finally {
template.stop();
template.destroy();
}
}

@Test
public void testGoodDefaultReplyHeaders() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(
Expand Down Expand Up @@ -633,6 +657,11 @@ public MultiMessageReturn mmr() {
return new MultiMessageReturn();
}

@Bean
public HandlerReturn handlerReturn() {
return new HandlerReturn();
}

@KafkaListener(id = "def1", topics = { D_REQUEST, E_REQUEST, F_REQUEST })
@SendTo // default REPLY_TOPIC header
public String dListener1(String in) {
Expand Down Expand Up @@ -685,6 +714,7 @@ public Message<?> listen1(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] re

}


public static class BadDeser implements Deserializer<Object> {

@Override
Expand All @@ -699,4 +729,15 @@ public Object deserialize(String topic, Headers headers, byte[] data) {

}

@KafkaListener(topics = I_REQUEST, groupId = I_REQUEST)
public static class HandlerReturn {

@KafkaHandler
@SendTo
public String listen1(String in) {
return in.toUpperCase();
}

}

}

0 comments on commit c52d736

Please sign in to comment.