diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java index f29c81bb353..417353682d5 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java @@ -460,7 +460,7 @@ public B controlBus() { * @see GenericEndpointSpec */ public B controlBus(Consumer> endpointConfigurer) { - return this.handle(new ServiceActivatingHandler(new ExpressionCommandMessageProcessor( + return handle(new ServiceActivatingHandler(new ExpressionCommandMessageProcessor( new ControlBusMethodFilter())), endpointConfigurer); } @@ -537,7 +537,9 @@ public B transform(Object service, String methodName, } /** - * Populate the {@link MessageTransformingHandler} instance for the provided {@link GenericTransformer}. + * Populate the {@link MessageTransformingHandler} instance for the provided + * {@link GenericTransformer}. Use {@link #transform(Class, GenericTransformer)} if + * you need to access the entire message. * @param genericTransformer the {@link GenericTransformer} to populate. * @param the source type - 'transform from'. * @param the target type - 'transform to'. @@ -589,11 +591,16 @@ public B transform(MessageProcessorSpec messageProcessorSpec, } /** - * Populate the {@link MessageTransformingHandler} instance for the provided {@link GenericTransformer} - * for the specific {@code payloadType} to convert at runtime. - * @param payloadType the {@link Class} for expected payload type. + * Populate the {@link MessageTransformingHandler} instance for the provided + * {@link GenericTransformer} for the specific {@code payloadType} to convert at + * runtime. + * Use {@link #transform(Class, GenericTransformer)} if you need access to the + * entire message. + * @param payloadType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the transformer. + * Conversion to this type will be attempted, if necessary. * @param genericTransformer the {@link GenericTransformer} to populate. - * @param

the payload type - 'transform from'. + * @param

the payload type - 'transform from' or {@code Message.class}. * @param the target type - 'transform to'. * @return the current {@link IntegrationFlowDefinition}. * @see MethodInvokingTransformer @@ -604,10 +611,14 @@ public B transform(Class

payloadType, GenericTransformer generic } /** - * Populate the {@link MessageTransformingHandler} instance for the provided {@link GenericTransformer}. - * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * Populate the {@link MessageTransformingHandler} instance for the provided + * {@link GenericTransformer}. In addition accept options for the integration endpoint + * using {@link GenericEndpointSpec}. Use + * {@link #transform(Class, GenericTransformer, Consumer)} if you need to access the + * entire message. * @param genericTransformer the {@link GenericTransformer} to populate. - * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint + * options. * @param the source type - 'transform from'. * @param the target type - 'transform to'. * @return the current {@link IntegrationFlowDefinition}. @@ -617,17 +628,20 @@ public B transform(Class

payloadType, GenericTransformer generic */ public B transform(GenericTransformer genericTransformer, Consumer> endpointConfigurer) { - return this.transform(null, genericTransformer, endpointConfigurer); + + return transform(null, genericTransformer, endpointConfigurer); } /** * Populate the {@link MessageTransformingHandler} instance for the provided {@link GenericTransformer} * for the specific {@code payloadType} to convert at runtime. * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. - * @param payloadType the {@link Class} for expected payload type. + * @param payloadType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the transformer. + * Conversion to this type will be attempted, if necessary. * @param genericTransformer the {@link GenericTransformer} to populate. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. - * @param

the payload type - 'transform from'. + * @param

the payload type - 'transform from', or {@code Message.class}. * @param the target type - 'transform to'. * @return the current {@link IntegrationFlowDefinition}. * @see MethodInvokingTransformer @@ -724,6 +738,8 @@ public B filter(Object service, String methodName, Consumer * .filter("World"::equals) * } * + * Use {@link #filter(Class, GenericSelector)} if you need to access the entire + * message. * @param genericSelector the {@link GenericSelector} to use. * @param

the source payload type. * @return the current {@link IntegrationFlowDefinition}. @@ -779,14 +795,16 @@ public B filter(MessageProcessorSpec messageProcessorSpec, Consumer p.after(new Date())) * } * - * @param payloadType the {@link Class} for desired {@code payload} type. + * @param payloadType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the selector. + * Conversion to this type will be attempted, if necessary. * @param genericSelector the {@link GenericSelector} to use. - * @param

the source payload type. + * @param

the source payload type or {@code Message.class}. * @return the current {@link IntegrationFlowDefinition}. * @see LambdaMessageProcessor */ public

B filter(Class

payloadType, GenericSelector

genericSelector) { - return this.filter(payloadType, genericSelector, null); + return filter(payloadType, genericSelector, null); } /** @@ -799,6 +817,8 @@ public

B filter(Class

payloadType, GenericSelector

genericSelector) { * .filter("World"::equals, e -> e.autoStartup(false)) * } * + * Use {@link #filter(Class, GenericSelector, Consumer)} if you need to access the entire + * message. * @param genericSelector the {@link GenericSelector} to use. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @param

the source payload type. @@ -819,10 +839,12 @@ public

B filter(GenericSelector

genericSelector, Consumer p.after(new Date()), e -> e.autoStartup(false)) * } * - * @param payloadType the {@link Class} for desired {@code payload} type. + * @param payloadType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the selector. + * Conversion to this type will be attempted, if necessary. * @param genericSelector the {@link GenericSelector} to use. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. - * @param

the source payload type. + * @param

the source payload type or {@code Message.class}. * @return the current {@link IntegrationFlowDefinition}. * @see LambdaMessageProcessor * @see FilterEndpointSpec @@ -880,7 +902,7 @@ public B handle(MessageHandler messageHandler) { * @return the current {@link IntegrationFlowDefinition}. */ public B handle(String beanName, String methodName) { - return this.handle(beanName, methodName, null); + return handle(beanName, methodName, null); } /** @@ -955,6 +977,8 @@ public B handle(Object service, String methodName, * .handle((p, h) -> p / 2) * } * + * Use {@link #handle(Class, GenericHandler)} if you need to access the entire + * message. * @param handler the handler to invoke. * @param

the payload type to expect. * @return the current {@link IntegrationFlowDefinition}. @@ -975,6 +999,8 @@ public

B handle(GenericHandler

handler) { * .handle((p, h) -> p / 2, e -> e.autoStartup(false)) * } * + * Use {@link #handle(Class, GenericHandler, Consumer)} if you need to access the entire + * message. * @param handler the handler to invoke. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @param

the payload type to expect. @@ -984,7 +1010,7 @@ public

B handle(GenericHandler

handler) { */ public

B handle(GenericHandler

handler, Consumer> endpointConfigurer) { - return this.handle(null, handler, endpointConfigurer); + return handle(null, handler, endpointConfigurer); } /** @@ -997,15 +1023,16 @@ public

B handle(GenericHandler

handler, * .handle(Integer.class, (p, h) -> p / 2) * } * - * @param payloadType the expected payload type. - * The accepted payload can be converted to this one at runtime + * @param payloadType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the handler. + * Conversion to this type will be attempted, if necessary. * @param handler the handler to invoke. - * @param

the payload type to expect. + * @param

the payload type to expect, or {@code Message.class}. * @return the current {@link IntegrationFlowDefinition}. * @see LambdaMessageProcessor */ public

B handle(Class

payloadType, GenericHandler

handler) { - return this.handle(payloadType, handler, null); + return handle(payloadType, handler, null); } /** @@ -1019,11 +1046,12 @@ public

B handle(Class

payloadType, GenericHandler

handler) { * .handle(Integer.class, (p, h) -> p / 2, e -> e.autoStartup(false)) * } * - * @param payloadType the expected payload type. - * The accepted payload can be converted to this one at runtime + * @param payloadType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the handler. + * Conversion to this type will be attempted, if necessary. * @param handler the handler to invoke. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. - * @param

the payload type to expect. + * @param

the payload type to expect or {@code Message.class}. * @return the current {@link IntegrationFlowDefinition}. * @see LambdaMessageProcessor */ @@ -1036,7 +1064,7 @@ public

B handle(Class

payloadType, GenericHandler

handler, else { serviceActivatingHandler = new ServiceActivatingHandler(handler, ClassUtils.HANDLER_HANDLE_METHOD); } - return this.handle(serviceActivatingHandler, endpointConfigurer); + return handle(serviceActivatingHandler, endpointConfigurer); } /** @@ -1241,9 +1269,9 @@ public B enrichHeaders(MapBuilder headers, /** * Accept a {@link Map} of values to be used for the - * {@link org.springframework.messaging.Message} header enrichment. + * {@link Message} header enrichment. * {@code values} can apply an {@link org.springframework.expression.Expression} - * to be evaluated against a request {@link org.springframework.messaging.Message}. + * to be evaluated against a request {@link Message}. * @param headers the Map of headers to enrich. * @return the current {@link IntegrationFlowDefinition}. */ @@ -1253,9 +1281,9 @@ public B enrichHeaders(Map headers) { /** * Accept a {@link Map} of values to be used for the - * {@link org.springframework.messaging.Message} header enrichment. + * {@link Message} header enrichment. * {@code values} can apply an {@link org.springframework.expression.Expression} - * to be evaluated against a request {@link org.springframework.messaging.Message}. + * to be evaluated against a request {@link Message}. * @param headers the Map of headers to enrich. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @return the current {@link IntegrationFlowDefinition}. @@ -1296,7 +1324,7 @@ public B enrichHeaders(Consumer headerEnricherConfigurer) { * @return the current {@link IntegrationFlowDefinition}. */ public B split() { - return this.split((Consumer>) null); + return split((Consumer>) null); } /** @@ -1314,7 +1342,7 @@ public B split() { * @see SplitterEndpointSpec */ public B split(Consumer> endpointConfigurer) { - return this.split(new DefaultMessageSplitter(), endpointConfigurer); + return split(new DefaultMessageSplitter(), endpointConfigurer); } /** @@ -1398,7 +1426,7 @@ public B split(Object service, String methodName, * @return the current {@link IntegrationFlowDefinition}. */ public B split(String beanName, String methodName) { - return this.split(beanName, methodName, null); + return split(beanName, methodName, null); } /** @@ -1474,9 +1502,11 @@ public B split(MessageProcessorSpec messageProcessorSpec, * new Foo(rs.getInt(1), rs.getString(2))))) * } * - * @param payloadType the expected payload type. Used at runtime to convert received payload type to. + * @param payloadType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the splitter. + * Conversion to this type will be attempted, if necessary. * @param splitter the splitter {@link Function}. - * @param

the payload type. + * @param

the payload type or {@code Message.class}. * @return the current {@link IntegrationFlowDefinition}. * @see LambdaMessageProcessor */ @@ -1528,20 +1558,23 @@ public

B split(Function splitter, * , e -> e.applySequence(false)) * } * - * @param payloadType the expected payload type. Used at runtime to convert received payload type to. + * @param payloadType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the splitter. + * Conversion to this type will be attempted, if necessary. * @param splitter the splitter {@link Function}. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. - * @param

the payload type. + * @param

the payload type or {@code Message.class}. * @return the current {@link IntegrationFlowDefinition}. * @see LambdaMessageProcessor * @see SplitterEndpointSpec */ public

B split(Class

payloadType, Function splitter, Consumer> endpointConfigurer) { + MethodInvokingSplitter split = isLambda(splitter) ? new MethodInvokingSplitter(new LambdaMessageProcessor(splitter, payloadType)) : new MethodInvokingSplitter(splitter, ClassUtils.FUNCTION_APPLY_METHOD); - return this.split(split, endpointConfigurer); + return split(split, endpointConfigurer); } /** @@ -1631,7 +1664,7 @@ public B headerFilter(String headersToRemove, boolean patternMatch) { */ public B headerFilter(HeaderFilter headerFilter, Consumer> endpointConfigurer) { - return this.transform(headerFilter, endpointConfigurer); + return transform(headerFilter, endpointConfigurer); } /** @@ -1655,7 +1688,7 @@ public B claimCheckIn(MessageStore messageStore) { */ public B claimCheckIn(MessageStore messageStore, Consumer> endpointConfigurer) { - return this.transform(new ClaimCheckInTransformer(messageStore), endpointConfigurer); + return transform(new ClaimCheckInTransformer(messageStore), endpointConfigurer); } /** @@ -1696,7 +1729,7 @@ public B claimCheckOut(MessageStore messageStore, boolean removeMessage, Consumer> endpointConfigurer) { ClaimCheckOutTransformer claimCheckOutTransformer = new ClaimCheckOutTransformer(messageStore); claimCheckOutTransformer.setRemoveMessage(removeMessage); - return this.transform(claimCheckOutTransformer, endpointConfigurer); + return transform(claimCheckOutTransformer, endpointConfigurer); } /** @@ -1861,6 +1894,7 @@ public B route(String expression, Consumer p.equals("foo") || p.equals("bar") ? new String[] {"foo", "bar"} : null) * } * + * Use {@link #route(Class, Function)} if you need to access the entire message. * @param router the {@link Function} to use. * @param the source payload type. * @param the target result type. @@ -1879,9 +1913,11 @@ public B route(Function router) { * .route(Integer.class, p -> p % 2 == 0) * } * - * @param payloadType the expected payload type. + * @param payloadType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the splitter. + * Conversion to this type will be attempted, if necessary. * @param router the {@link Function} to use. - * @param the source payload type. + * @param the source payload type or {@code Message.class}. * @param the target result type. * @return the current {@link IntegrationFlowDefinition}. * @see LambdaMessageProcessor @@ -1904,6 +1940,7 @@ public B route(Class payloadType, Function router) { * .applySequence(false)) * } * + * Use {@link #route(Class, Function, Consumer)} if you need to access the entire message. * @param router the {@link Function} to use. * @param routerConfigurer the {@link Consumer} to provide {@link MethodInvokingRouter} options. * @param the source payload type. @@ -1928,16 +1965,19 @@ public B route(Function router, Consumer - * @param payloadType the expected payload type. + * @param payloadType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the splitter. + * Conversion to this type will be attempted, if necessary. * @param router the {@link Function} to use. * @param routerConfigurer the {@link Consumer} to provide {@link MethodInvokingRouter} options. - * @param

the source payload type. + * @param

the source payload type or {@code Message.class}. * @param the target result type. * @return the current {@link IntegrationFlowDefinition}. * @see LambdaMessageProcessor */ public B route(Class

payloadType, Function router, Consumer> routerConfigurer) { + MethodInvokingRouter methodInvokingRouter = isLambda(router) ? new MethodInvokingRouter(new LambdaMessageProcessor(router, payloadType)) : new MethodInvokingRouter(router, ClassUtils.FUNCTION_APPLY_METHOD); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/LambdaMessageProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/LambdaMessageProcessor.java index 0deef01ef06..dde3c0a8146 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/LambdaMessageProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/LambdaMessageProcessor.java @@ -22,6 +22,9 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; @@ -39,11 +42,14 @@ * - functional interface implementations. * * @author Artem Bilan + * @author Gary Russell * * @since 5.0 */ public class LambdaMessageProcessor implements MessageProcessor, BeanFactoryAware { + private static final Log logger = LogFactory.getLog(LambdaMessageProcessor.class); + private final Object target; private final Method method; @@ -128,6 +134,12 @@ else if (Map.class.isAssignableFrom(parameterType)) { return this.method.invoke(this.target, args); } catch (InvocationTargetException e) { + if (e.getTargetException() instanceof ClassCastException) { + logger.error("Could not invoke the method due to a class cast exception, if using a lambda in the DSL, " + + "consider using an overloaded EIP method that takes a Class argument to explicitly " + + "specify the type. An example of when this often occurs is if the lambda is configured to " + + "receive a Message argument.", e.getCause()); + } throw new MessageHandlingException(message, e.getCause()); } catch (Exception e) { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/LambdaMessageProcessorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/LambdaMessageProcessorTests.java index f8b62a00026..91420d6f2ed 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/LambdaMessageProcessorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/LambdaMessageProcessorTests.java @@ -16,6 +16,7 @@ package org.springframework.integration.dsl; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; @@ -67,6 +68,15 @@ public Message transform(Message source) { assertSame(testMessage, result); } + @Test + public void testMessageAsArgumentLambda() { + LambdaMessageProcessor lmp = new LambdaMessageProcessor( + (GenericTransformer, Message>) source -> messageTransformer(source), null); + lmp.setBeanFactory(mock(BeanFactory.class)); + GenericMessage testMessage = new GenericMessage<>("foo"); + assertThatThrownBy(() -> lmp.processMessage(testMessage)).hasCauseExactlyInstanceOf(ClassCastException.class); + } + private void handle(GenericHandler h) { LambdaMessageProcessor lmp = new LambdaMessageProcessor(h, String.class); lmp.setBeanFactory(mock(BeanFactory.class)); diff --git a/src/reference/asciidoc/dsl.adoc b/src/reference/asciidoc/dsl.adoc index e8c5bb0c4ba..911a7dff6c8 100644 --- a/src/reference/asciidoc/dsl.adoc +++ b/src/reference/asciidoc/dsl.adoc @@ -108,6 +108,30 @@ The above example composes a sequence of `Filter -> Transformer -> Service Activ The flow is 'one way', that is it does not provide a a reply message but simply prints the payload to STDOUT. The endpoints are automatically wired together using direct channels. +[[java-dsl-class-cast]] +.Lambdas And `Message` Arguments +IMPORTANT: When using lambdas in EIP methods, the "input" argument is generally the message payload. +If you wish to access the entire message, use one of the overloaded methods that take a `Class` as the first parameter. +For example, this won't work: + +==== +[source, java] +---- +., Foo>transform(m -> newFooFromMessage(m)) +---- +==== + +This will fail at runtime with a `ClassCastException` because the lambda doesn't retain the argument type and the framework will attempt to cast the payload to a `Message`. + +Instead, use: + +==== +[source, java] +---- +.(Message.class, m -> newFooFromMessage(m)) +---- +==== + [[java-dsl-channels]] === Message Channels @@ -258,6 +282,8 @@ Nevertheless, the DSL parser takes care about bean declarations for inline objec See `Transformers` Java Docs for more information and supported factory methods. +Also see <>. + [[java-dsl-inbound-adapters]] === Inbound Channel Adapters @@ -360,6 +386,8 @@ public IntegrationFlow recipientListFlow() { The `.defaultOutputToParentFlow()` of the `.routeToRecipients()` allows to make the router's `defaultOutput` as a gateway to continue a process for the unmatched messages in the main flow. +Also see <>. + [[java-dsl-splitters]] === Splitters @@ -383,6 +411,8 @@ public IntegrationFlow splitFlow() { This creates a splitter that splits a message containing a comma delimited String. Note: the `getT2()` method comes from `Tuple` `Collection` which is the result of `EndpointSpec.get()` and represents a pair of `ConsumerEndpointFactoryBean` and `DefaultMessageSplitter` for the example above. +Also see <>. + [[java-dsl-aggregators]] === Aggregators and Resequencers @@ -458,6 +488,8 @@ public IntegrationFlow integerFlow() { Of course we register some custom `BytesToIntegerConverter` within `ConversionService` and get rid of that additional `.transform()`. +Also see <>. + [[java-dsl-log]] === Operator log()