Skip to content

Commit

Permalink
Improve functions support
Browse files Browse the repository at this point in the history
* Add `functions-support.adoc` chapter
* Add more tests
* Improve `InboundChannelAdapterAnnotationPostProcessor` to support
Kotlin `Function0`
* Add `FunctionsTests.kt`
* Reformat Kotlin classes to use tabs
* Upgrade to Kotlin `1.2.71`

* Add `What's New` bullet

Doc Polishing
  • Loading branch information
artembilan authored and garyrussell committed Oct 9, 2018
1 parent e7bc060 commit 7176690
Show file tree
Hide file tree
Showing 12 changed files with 559 additions and 190 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
buildscript {
ext.kotlinVersion = '1.2.61'
ext.kotlinVersion = '1.2.71'
repositories {
maven { url 'https://repo.spring.io/plugins-release' }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 the original author or authors.
* Copyright 2014-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,6 +34,7 @@
import org.springframework.integration.util.MessagingAnnotationUtils;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;

/**
Expand All @@ -42,11 +43,27 @@
* @author Artem Bilan
* @author Gary Russell
* @author Oleg Zhurakousky
*
* @since 4.0
*/
public class InboundChannelAdapterAnnotationPostProcessor extends
AbstractMethodAnnotationPostProcessor<InboundChannelAdapter> {

private static final Class<?> kotlinFunction0Class;

static {
Class<?> kotlinClass = null;
try {
kotlinClass = ClassUtils.forName("kotlin.jvm.functions.Function0", ClassUtils.getDefaultClassLoader());
}
catch (ClassNotFoundException e) {
//Ignore: assume no Kotlin in classpath
}
finally {
kotlinFunction0Class = kotlinClass;
}
}

public InboundChannelAdapterAnnotationPostProcessor(ConfigurableListableBeanFactory beanFactory) {
super(beanFactory);
}
Expand All @@ -58,7 +75,8 @@ protected String getInputChannelAttribute() {

@Override
public Object postProcess(Object bean, String beanName, Method method, List<Annotation> annotations) {
String channelName = MessagingAnnotationUtils.resolveAttribute(annotations, AnnotationUtils.VALUE, String.class);
String channelName = MessagingAnnotationUtils
.resolveAttribute(annotations, AnnotationUtils.VALUE, String.class);
Assert.hasText(channelName, "The channel ('value' attribute of @InboundChannelAdapter) can't be empty.");

MessageSource<?> messageSource = null;
Expand Down Expand Up @@ -86,23 +104,33 @@ private MessageSource<?> createMessageSource(Object bean, String beanName, Metho
MessageSource<?> messageSource = null;
if (AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) {
Object target = this.resolveTargetBeanFromMethodWithBeanAnnotation(method);
Assert.isTrue(target instanceof MessageSource || target instanceof Supplier, "The '" + this.annotationType + "' on @Bean method " +
"level is allowed only for: " + MessageSource.class.getName() + " or " + Supplier.class.getName() + " beans");
Class<?> targetClass = target.getClass();
Assert.isTrue(MessageSource.class.isAssignableFrom(targetClass) ||
Supplier.class.isAssignableFrom(targetClass) ||
(kotlinFunction0Class == null || kotlinFunction0Class.isAssignableFrom(targetClass)),
"The '" + this.annotationType + "' on @Bean method " + "level is allowed only for: "
+ MessageSource.class.getName() + " or " + Supplier.class.getName()
+ (kotlinFunction0Class != null ? " or " + kotlinFunction0Class.getName() : "") + " beans");
if (target instanceof MessageSource<?>) {
messageSource = (MessageSource<?>) target;
}
else {
else if (target instanceof Supplier<?>) {
method = ReflectionUtils.findMethod(Supplier.class, "get");
bean = target;
}
else if (kotlinFunction0Class != null) {
method = ReflectionUtils.findMethod(kotlinFunction0Class, "invoke");
bean = target;
}
}
if (messageSource == null) {
MethodInvokingMessageSource methodInvokingMessageSource = new MethodInvokingMessageSource();
methodInvokingMessageSource.setObject(bean);
methodInvokingMessageSource.setMethod(method);
String messageSourceBeanName = this.generateHandlerBeanName(beanName, method);
this.beanFactory.registerSingleton(messageSourceBeanName, methodInvokingMessageSource);
messageSource = (MessageSource<?>) this.beanFactory.initializeBean(methodInvokingMessageSource, messageSourceBeanName);
messageSource = (MessageSource<?>) this.beanFactory
.initializeBean(methodInvokingMessageSource, messageSourceBeanName);
}
return messageSource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

import org.aopalliance.aop.Advice;
import org.aopalliance.intercept.MethodInterceptor;
Expand Down Expand Up @@ -526,10 +527,20 @@ public interface ControlBusGateway {
@EnableIntegration
public static class SupplierContextConfiguration1 {

@Bean
public Function<String, String> toUpperCaseFunction() {
return String::toUpperCase;
}

@Bean
public Supplier<String> stringSupplier() {
return () -> "foo";
}

@Bean
public IntegrationFlow supplierFlow() {
return IntegrationFlows.from(() -> "foo")
.<String, String>transform(String::toUpperCase)
return IntegrationFlows.from(stringSupplier())
.transform(toUpperCaseFunction())
.channel("suppliedChannel")
.get();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd">
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

<context:annotation-config/>

<beans:bean class="org.springframework.integration.handler.ServiceActivatorDefaultFrameworkMethodTests$FunctionConfiguration"/>

<message-history/>

Expand Down Expand Up @@ -76,4 +82,6 @@
<queue />
</channel>

<service-activator input-channel="processorViaFunctionChannel" ref="functionAsService"/>

</beans:beans>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,13 +31,15 @@
import static org.springframework.integration.test.matcher.HeaderMatcher.hasHeaderKey;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
Expand Down Expand Up @@ -107,6 +109,9 @@ public class ServiceActivatorDefaultFrameworkMethodTests {
@Autowired
private PollableChannel errorChannel;

@Autowired
private MessageChannel processorViaFunctionChannel;

@Test
public void testGateway() {
QueueChannel replyChannel = new QueueChannel();
Expand Down Expand Up @@ -280,6 +285,16 @@ public void testAsyncErrorNoHeader() {
assertEquals("test", ((MessagingException) error.getPayload()).getFailedMessage().getPayload());
}

@Test
public void testFunctionFromXml() {
QueueChannel replyChannel = new QueueChannel();
Message<?> message = MessageBuilder.withPayload("test").setReplyChannel(replyChannel).build();
this.processorViaFunctionChannel.send(message);
Message<?> reply = replyChannel.receive(0);
assertNotNull(reply);
assertEquals("TEST", reply.getPayload());
}

public static void throwIllegalStateException(String message) {
throw new IllegalStateException(message);
}
Expand Down Expand Up @@ -316,6 +331,7 @@ public void handleMessage(Message<?> requestMessage) {
assertTrue(StackTraceUtils.isFrameContainingXBeforeFrameContainingY("AbstractSubscribableChannel",
"MethodInvokerHelper", st)); // close to the metal
}

}

private static class TestMessageProcessor implements MessageProcessor<String> {
Expand All @@ -331,6 +347,7 @@ public void setPrefix(String prefix) {
public String processMessage(Message<?> message) {
return prefix + ":" + message.getPayload();
}

}

private static class AsyncService {
Expand All @@ -341,11 +358,21 @@ private static class AsyncService {

@SuppressWarnings("unused")
public ListenableFuture<String> process(String payload) {
this.future = new SettableListenableFuture<String>();
this.future = new SettableListenableFuture<>();
this.payload = payload;
return this.future;
}

}

public static class FunctionConfiguration {

@Bean
public Function<String, String> functionAsService() {
return String::toUpperCase;
}

}


}

0 comments on commit 7176690

Please sign in to comment.