diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java index 5a46d715ea..f71d4a5c83 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java @@ -180,8 +180,9 @@ private void setupReplyTo(InvocableHandlerMethod handler) { replyTo = extractSendTo(method.toString(), ann); } if (ann == null) { - ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class); - replyTo = extractSendTo(this.getBean().getClass().getSimpleName(), ann); + Class beanType = handler.getBeanType(); + ann = AnnotationUtils.getAnnotation(beanType, SendTo.class); + replyTo = extractSendTo(beanType.getSimpleName(), ann); } if (ann != null && replyTo == null) { replyTo = AdapterUtils.getDefaultReplyTopicExpression(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 8c8d733c7a..5b8efbd9b9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -47,6 +47,10 @@ import javax.validation.ValidationException; import javax.validation.constraints.Max; +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -62,14 +66,21 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.springframework.aop.framework.ProxyFactory; +import org.springframework.beans.BeansException; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.Role; import org.springframework.context.event.EventListener; import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; import org.springframework.core.MethodParameter; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; import org.springframework.core.convert.converter.Converter; import org.springframework.data.web.JsonPath; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; @@ -163,6 +174,8 @@ "annotated38", "annotated38reply", "annotated39"}) public class EnableKafkaIntegrationTests { + private static final Log logger = LogFactory.getLog(EnableKafkaIntegrationTests.class); + private static final String DEFAULT_TEST_GROUP_ID = "testAnnot"; @Autowired @@ -1218,6 +1231,13 @@ public IfaceListener ifaceListener() { return new IfaceListenerImpl(); } + @Bean + @Order(Ordered.HIGHEST_PRECEDENCE) + @Role(BeanDefinition.ROLE_INFRASTRUCTURE) + public ProxyListenerPostProcessor proxyListenerPostProcessor() { + return new ProxyListenerPostProcessor(); + } + @Bean public MultiListenerBean multiListener() { return new MultiListenerBean(); @@ -1936,6 +1956,27 @@ public void registerSeekCallback(ConsumerSeekCallback callback) { } + static class ProxyListenerPostProcessor implements BeanPostProcessor { + + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + if ("multiListenerSendTo".equals(beanName)) { + ProxyFactory proxyFactory = new ProxyFactory(bean); + proxyFactory.setProxyTargetClass(true); + proxyFactory.addAdvice(new MethodInterceptor() { + @Override + public Object invoke(MethodInvocation invocation) throws Throwable { + logger.info(String.format("Proxy listener for %s.$s", + invocation.getMethod().getDeclaringClass(), invocation.getMethod().getName())); + return invocation.proceed(); + } + }); + return proxyFactory.getProxy(); + } + return bean; + } + } + public static class SeekToLastOnIdleListener extends AbstractConsumerSeekAware { private final CountDownLatch latch1 = new CountDownLatch(10);