Skip to content

Commit

Permalink
GH-1631: fix missing @sendto for cglib proxies
Browse files Browse the repository at this point in the history
Resolves #1631

GH-1631: add test

Fix imports.
  • Loading branch information
Andrey Dyomin authored and garyrussell committed Nov 23, 2020
1 parent 79a9e9a commit 65cec04
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,9 @@ private void setupReplyTo(InvocableHandlerMethod handler) {
replyTo = extractSendTo(method.toString(), ann);
}
if (ann == null) {
ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class);
replyTo = extractSendTo(this.getBean().getClass().getSimpleName(), ann);
Class<?> beanType = handler.getBeanType();
ann = AnnotationUtils.getAnnotation(beanType, SendTo.class);
replyTo = extractSendTo(beanType.getSimpleName(), ann);
}
if (ann != null && replyTo == null) {
replyTo = AdapterUtils.getDefaultReplyTopicExpression();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
import javax.validation.ValidationException;
import javax.validation.constraints.Max;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand All @@ -62,13 +66,20 @@
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Role;
import org.springframework.context.event.EventListener;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.core.convert.converter.Converter;
import org.springframework.data.web.JsonPath;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
Expand Down Expand Up @@ -161,6 +172,8 @@
"annotated38", "annotated38reply" })
public class EnableKafkaIntegrationTests {

private static final Log logger = LogFactory.getLog(EnableKafkaIntegrationTests.class);

private static final String DEFAULT_TEST_GROUP_ID = "testAnnot";

@Autowired
Expand Down Expand Up @@ -1207,6 +1220,13 @@ public IfaceListener<String> ifaceListener() {
return new IfaceListenerImpl();
}

@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ProxyListenerPostProcessor proxyListenerPostProcessor() {
return new ProxyListenerPostProcessor();
}

@Bean
public MultiListenerBean multiListener() {
return new MultiListenerBean();
Expand Down Expand Up @@ -1883,6 +1903,27 @@ public void registerSeekCallback(ConsumerSeekCallback callback) {

}

static class ProxyListenerPostProcessor implements BeanPostProcessor {

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if ("multiListenerSendTo".equals(beanName)) {
ProxyFactory proxyFactory = new ProxyFactory(bean);
proxyFactory.setProxyTargetClass(true);
proxyFactory.addAdvice(new MethodInterceptor() {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
logger.info(String.format("Proxy listener for %s.$s",
invocation.getMethod().getDeclaringClass(), invocation.getMethod().getName()));
return invocation.proceed();
}
});
return proxyFactory.getProxy();
}
return bean;
}
}

public static class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {

private final CountDownLatch latch1 = new CountDownLatch(10);
Expand Down

0 comments on commit 65cec04

Please sign in to comment.