Skip to content

Commit

Permalink
Allow Functions as Service Activators (#2575)
Browse files Browse the repository at this point in the history
* Allow Functions as Service Activators

* Fix `MessagingMethodInvokerHelper` to extract canonical method for the
`Function` and `Consumer` beans
* Demonstrate in test how `Function` and `Consumer` can be configured
with the Messaging annotations

* Don't mutate method argument
  • Loading branch information
artembilan authored and garyrussell committed Oct 2, 2018
1 parent 6cb8c24 commit 8b4d1e6
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -676,7 +678,8 @@ private boolean contentTypeIsJson(Message<?> message) {
}

private Map<String, Map<Class<?>, HandlerMethod>> findHandlerMethodsForTarget(final Object targetObject,
final Class<? extends Annotation> annotationType, final String methodName, final boolean requiresReply) {
final Class<? extends Annotation> annotationType, final String methodNameToUse,
final boolean requiresReply) {

Map<String, Map<Class<?>, HandlerMethod>> handlerMethods = new HashMap<>();

Expand All @@ -687,6 +690,25 @@ private Map<String, Map<Class<?>, HandlerMethod>> findHandlerMethodsForTarget(fi
final AtomicReference<Class<?>> ambiguousFallbackType = new AtomicReference<>();
final AtomicReference<Class<?>> ambiguousFallbackMessageGenericType = new AtomicReference<>();
final Class<?> targetClass = getTargetClass(targetObject);

final String methodName;

if (methodNameToUse == null) {
if (Function.class.isAssignableFrom(targetClass)) {
methodName = "apply";
}
else if (Consumer.class.isAssignableFrom(targetClass)) {
methodName = "accept";
}
else {
methodName = null;
}
}
else {
methodName = methodNameToUse;
}


MethodFilter methodFilter = new UniqueMethodFilter(targetClass);
ReflectionUtils.doWithMethods(targetClass, method1 -> {
boolean matchesAnnotation = false;
Expand Down Expand Up @@ -874,7 +896,8 @@ private void findSingleSpecifMethodOnInterfacesIfProxy(final Object targetObject
}
Method theMethod = targetMethod.get();
if (theMethod != null) {
theMethod = org.springframework.util.ClassUtils.getMostSpecificMethod(theMethod, targetObject.getClass());
theMethod = org.springframework.util.ClassUtils
.getMostSpecificMethod(theMethod, targetObject.getClass());
InvocableHandlerMethod invocableHandlerMethod =
this.messageHandlerMethodFactory.createInvocableHandlerMethod(targetObject, theMethod);
HandlerMethod handlerMethod = new HandlerMethod(invocableHandlerMethod, this.canProcessMessageList);
Expand Down Expand Up @@ -1250,6 +1273,7 @@ private void setExclusiveTargetParameterType(TypeDescriptor targetParameterType,

this.exclusiveMethodParameter = methodParameter;
}

}

public static class ParametersWrapper {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 the original author or authors.
* Copyright 2014-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,16 +18,19 @@

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

Expand All @@ -36,6 +39,7 @@
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
Expand Down Expand Up @@ -66,6 +70,7 @@
import org.springframework.integration.config.EnableMessageHistory;
import org.springframework.integration.core.MessageSelector;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.filter.ExpressionEvaluatingSelector;
Expand All @@ -78,6 +83,7 @@
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
Expand All @@ -86,6 +92,7 @@
* @author Artem Bilan
* @author Gary Russell
* @author Oleg Zhurakousky
*
* @since 4.0
*/
@ContextConfiguration(classes = MessagingAnnotationsWithBeanAnnotationTests.ContextConfiguration.class)
Expand Down Expand Up @@ -125,14 +132,29 @@ public class MessagingAnnotationsWithBeanAnnotationTests {
@Autowired
private PollableChannel counterErrorChannel;

@Autowired
private MessageChannel functionServiceChannel;

@Autowired
private MessageChannel functionMessageServiceChannel;

@Autowired
private MessageChannel consumerServiceChannel;

@Autowired
private List<String> stringCollector;

@Autowired
private MessageChannel messageConsumerServiceChannel;

@Test
public void testMessagingAnnotationsFlow() {
Stream.of(this.sourcePollingChannelAdapters).forEach(a -> a.start());
Stream.of(this.sourcePollingChannelAdapters).forEach(AbstractEndpoint::start);
//this.sourcePollingChannelAdapter.start();
for (int i = 0; i < 10; i++) {
Message<?> receive = this.discardChannel.receive(10000);
assertNotNull(receive);
assertTrue(((Integer) receive.getPayload()) % 2 == 0);
assertEquals(0, ((Integer) receive.getPayload()) % 2);

receive = this.counterErrorChannel.receive(10000);
assertNotNull(receive);
Expand All @@ -145,8 +167,8 @@ public void testMessagingAnnotationsFlow() {
" rejected Message"));

}
for (Message<?> message : collector) {
assertFalse(((Integer) message.getPayload()) % 2 == 0);
for (Message<?> message : this.collector) {
assertNotEquals(0, ((Integer) message.getPayload()) % 2);
MessageHistory messageHistory = MessageHistory.read(message);
assertNotNull(messageHistory);
String messageHistoryString = messageHistory.toString();
Expand All @@ -163,6 +185,43 @@ public void testMessagingAnnotationsFlow() {
assertNull(this.skippedChannel);
assertNull(this.skippedChannel2);
assertNull(this.skippedMessageSource);

QueueChannel replyChannel = new QueueChannel();

Message<String> message = MessageBuilder.withPayload("foo")
.setReplyChannel(replyChannel)
.build();

this.functionServiceChannel.send(message);

Message<?> receive = replyChannel.receive(10_000);

assertNotNull(receive);
assertEquals("FOO", receive.getPayload());

message = MessageBuilder.withPayload("BAR")
.setReplyChannel(replyChannel)
.build();

this.functionMessageServiceChannel.send(message);

receive = replyChannel.receive(10_000);

assertNotNull(receive);
assertEquals("bar", receive.getPayload());

this.consumerServiceChannel.send(new GenericMessage<>("baz"));

assertFalse(this.stringCollector.isEmpty());
assertEquals("baz", this.stringCollector.iterator().next());

this.collector.clear();

this.messageConsumerServiceChannel.send(new GenericMessage<>("123"));

assertFalse(this.collector.isEmpty());
Message<?> next = this.collector.iterator().next();
assertEquals("123", next.getPayload());
}

@Test
Expand Down Expand Up @@ -201,7 +260,7 @@ public MessageSource<Integer> counterMessageSource(final AtomicInteger counter)
@InboundChannelAdapter(value = "routerChannel", autoStartup = "false",
poller = @Poller(fixedRate = "10", maxMessagesPerPoll = "1", errorChannel = "counterErrorChannel"))
public Supplier<Integer> counterMessageSupplier(final AtomicInteger counter) {
return () -> counter.incrementAndGet();
return counter::incrementAndGet;
}

@Bean
Expand Down Expand Up @@ -275,7 +334,7 @@ public PollableChannel discardChannel() {

@Bean
public List<Message<?>> collector() {
return new ArrayList<Message<?>>();
return new ArrayList<>();
}

@Bean
Expand All @@ -286,8 +345,7 @@ public MessageChannel serviceChannel() {
@Bean
@ServiceActivator(inputChannel = "serviceChannel")
public MessageHandler service() {
final List<Message<?>> collector = this.collector();
return message -> collector.add(message);
return collector()::add;
}

@Bean
Expand Down Expand Up @@ -323,6 +381,50 @@ public MessageSource<?> skippedMessageSource() {
return () -> new GenericMessage<>("foo");
}


@Bean
@Transformer(inputChannel = "functionServiceChannel")
public Function<String, String> functionAsService() {
return String::toUpperCase;
}

@Bean
@ServiceActivator(inputChannel = "functionMessageServiceChannel")
public Function<Message<String>, String> messageFunctionAsService() {
return new Function<Message<String>, String>() { // Has to be interface for proper type inferring

@Override
public String apply(Message<String> m) {
return m.getPayload().toLowerCase();
}

};
}

@Bean
public List<String> stringCollector() {
return new ArrayList<>();
}

@Bean
@ServiceActivator(inputChannel = "consumerServiceChannel")
public Consumer<String> consumerAsService() {
return stringCollector()::add;
}

@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
public Consumer<Message<?>> messageConsumerAsService() {
return new Consumer<Message<?>>() { // Has to be interface for proper type inferring

@Override
public void accept(Message<?> e) {
collector().add(e);
}

};
}

}

@Configuration
Expand All @@ -337,5 +439,4 @@ public MessageHandler splitter() {

}


}

0 comments on commit 8b4d1e6

Please sign in to comment.