Skip to content

Commit

Permalink
Log error for class cast exception on lambda
Browse files Browse the repository at this point in the history
**cherry-pick to 5.0.x**

* Fix test

* Polish log message.

* Polishing - docs and javadocs.

# Conflicts:
#	spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java
  • Loading branch information
garyrussell authored and artembilan committed Nov 19, 2018
1 parent 47a4b28 commit ffaa4ee
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 46 deletions.
Expand Up @@ -460,7 +460,7 @@ public B controlBus() {
* @see GenericEndpointSpec
*/
public B controlBus(Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
return this.handle(new ServiceActivatingHandler(new ExpressionCommandMessageProcessor(
return handle(new ServiceActivatingHandler(new ExpressionCommandMessageProcessor(
new ControlBusMethodFilter())), endpointConfigurer);
}

Expand Down Expand Up @@ -537,7 +537,9 @@ public B transform(Object service, String methodName,
}

/**
* Populate the {@link MessageTransformingHandler} instance for the provided {@link GenericTransformer}.
* Populate the {@link MessageTransformingHandler} instance for the provided
* {@link GenericTransformer}. Use {@link #transform(Class, GenericTransformer)} if
* you need to access the entire message.
* @param genericTransformer the {@link GenericTransformer} to populate.
* @param <S> the source type - 'transform from'.
* @param <T> the target type - 'transform to'.
Expand Down Expand Up @@ -589,11 +591,16 @@ public B transform(MessageProcessorSpec<?> messageProcessorSpec,
}

/**
* Populate the {@link MessageTransformingHandler} instance for the provided {@link GenericTransformer}
* for the specific {@code payloadType} to convert at runtime.
* @param payloadType the {@link Class} for expected payload type.
* Populate the {@link MessageTransformingHandler} instance for the provided
* {@link GenericTransformer} for the specific {@code payloadType} to convert at
* runtime.
* Use {@link #transform(Class, GenericTransformer)} if you need access to the
* entire message.
* @param payloadType the {@link Class} for expected payload type. It can also be
* {@code Message.class} if you wish to access the entire message in the transformer.
* Conversion to this type will be attempted, if necessary.
* @param genericTransformer the {@link GenericTransformer} to populate.
* @param <P> the payload type - 'transform from'.
* @param <P> the payload type - 'transform from' or {@code Message.class}.
* @param <T> the target type - 'transform to'.
* @return the current {@link IntegrationFlowDefinition}.
* @see MethodInvokingTransformer
Expand All @@ -604,10 +611,14 @@ public <P, T> B transform(Class<P> payloadType, GenericTransformer<P, T> generic
}

/**
* Populate the {@link MessageTransformingHandler} instance for the provided {@link GenericTransformer}.
* In addition accept options for the integration endpoint using {@link GenericEndpointSpec}.
* Populate the {@link MessageTransformingHandler} instance for the provided
* {@link GenericTransformer}. In addition accept options for the integration endpoint
* using {@link GenericEndpointSpec}. Use
* {@link #transform(Class, GenericTransformer, Consumer)} if you need to access the
* entire message.
* @param genericTransformer the {@link GenericTransformer} to populate.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint
* options.
* @param <S> the source type - 'transform from'.
* @param <T> the target type - 'transform to'.
* @return the current {@link IntegrationFlowDefinition}.
Expand All @@ -617,17 +628,20 @@ public <P, T> B transform(Class<P> payloadType, GenericTransformer<P, T> generic
*/
public <S, T> B transform(GenericTransformer<S, T> genericTransformer,
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
return this.transform(null, genericTransformer, endpointConfigurer);

return transform(null, genericTransformer, endpointConfigurer);
}

/**
* Populate the {@link MessageTransformingHandler} instance for the provided {@link GenericTransformer}
* for the specific {@code payloadType} to convert at runtime.
* In addition accept options for the integration endpoint using {@link GenericEndpointSpec}.
* @param payloadType the {@link Class} for expected payload type.
* @param payloadType the {@link Class} for expected payload type. It can also be
* {@code Message.class} if you wish to access the entire message in the transformer.
* Conversion to this type will be attempted, if necessary.
* @param genericTransformer the {@link GenericTransformer} to populate.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @param <P> the payload type - 'transform from'.
* @param <P> the payload type - 'transform from', or {@code Message.class}.
* @param <T> the target type - 'transform to'.
* @return the current {@link IntegrationFlowDefinition}.
* @see MethodInvokingTransformer
Expand Down Expand Up @@ -724,6 +738,8 @@ public B filter(Object service, String methodName, Consumer<FilterEndpointSpec>
* .filter("World"::equals)
* }
* </pre>
* Use {@link #filter(Class, GenericSelector)} if you need to access the entire
* message.
* @param genericSelector the {@link GenericSelector} to use.
* @param <P> the source payload type.
* @return the current {@link IntegrationFlowDefinition}.
Expand Down Expand Up @@ -779,14 +795,16 @@ public B filter(MessageProcessorSpec<?> messageProcessorSpec, Consumer<FilterEnd
* .filter(Date.class, p -> p.after(new Date()))
* }
* </pre>
* @param payloadType the {@link Class} for desired {@code payload} type.
* @param payloadType the {@link Class} for expected payload type. It can also be
* {@code Message.class} if you wish to access the entire message in the selector.
* Conversion to this type will be attempted, if necessary.
* @param genericSelector the {@link GenericSelector} to use.
* @param <P> the source payload type.
* @param <P> the source payload type or {@code Message.class}.
* @return the current {@link IntegrationFlowDefinition}.
* @see LambdaMessageProcessor
*/
public <P> B filter(Class<P> payloadType, GenericSelector<P> genericSelector) {
return this.filter(payloadType, genericSelector, null);
return filter(payloadType, genericSelector, null);
}

/**
Expand All @@ -799,6 +817,8 @@ public <P> B filter(Class<P> payloadType, GenericSelector<P> genericSelector) {
* .filter("World"::equals, e -> e.autoStartup(false))
* }
* </pre>
* Use {@link #filter(Class, GenericSelector, Consumer)} if you need to access the entire
* message.
* @param genericSelector the {@link GenericSelector} to use.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @param <P> the source payload type.
Expand All @@ -819,10 +839,12 @@ public <P> B filter(GenericSelector<P> genericSelector, Consumer<FilterEndpointS
* .filter(Date.class, p -> p.after(new Date()), e -> e.autoStartup(false))
* }
* </pre>
* @param payloadType the {@link Class} for desired {@code payload} type.
* @param payloadType the {@link Class} for expected payload type. It can also be
* {@code Message.class} if you wish to access the entire message in the selector.
* Conversion to this type will be attempted, if necessary.
* @param genericSelector the {@link GenericSelector} to use.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @param <P> the source payload type.
* @param <P> the source payload type or {@code Message.class}.
* @return the current {@link IntegrationFlowDefinition}.
* @see LambdaMessageProcessor
* @see FilterEndpointSpec
Expand Down Expand Up @@ -880,7 +902,7 @@ public B handle(MessageHandler messageHandler) {
* @return the current {@link IntegrationFlowDefinition}.
*/
public B handle(String beanName, String methodName) {
return this.handle(beanName, methodName, null);
return handle(beanName, methodName, null);
}

/**
Expand Down Expand Up @@ -955,6 +977,8 @@ public B handle(Object service, String methodName,
* .<Integer>handle((p, h) -> p / 2)
* }
* </pre>
* Use {@link #handle(Class, GenericHandler)} if you need to access the entire
* message.
* @param handler the handler to invoke.
* @param <P> the payload type to expect.
* @return the current {@link IntegrationFlowDefinition}.
Expand All @@ -975,6 +999,8 @@ public <P> B handle(GenericHandler<P> handler) {
* .<Integer>handle((p, h) -> p / 2, e -> e.autoStartup(false))
* }
* </pre>
* Use {@link #handle(Class, GenericHandler, Consumer)} if you need to access the entire
* message.
* @param handler the handler to invoke.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @param <P> the payload type to expect.
Expand All @@ -984,7 +1010,7 @@ public <P> B handle(GenericHandler<P> handler) {
*/
public <P> B handle(GenericHandler<P> handler,
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
return this.handle(null, handler, endpointConfigurer);
return handle(null, handler, endpointConfigurer);
}

/**
Expand All @@ -997,15 +1023,16 @@ public <P> B handle(GenericHandler<P> handler,
* .handle(Integer.class, (p, h) -> p / 2)
* }
* </pre>
* @param payloadType the expected payload type.
* The accepted payload can be converted to this one at runtime
* @param payloadType the {@link Class} for expected payload type. It can also be
* {@code Message.class} if you wish to access the entire message in the handler.
* Conversion to this type will be attempted, if necessary.
* @param handler the handler to invoke.
* @param <P> the payload type to expect.
* @param <P> the payload type to expect, or {@code Message.class}.
* @return the current {@link IntegrationFlowDefinition}.
* @see LambdaMessageProcessor
*/
public <P> B handle(Class<P> payloadType, GenericHandler<P> handler) {
return this.handle(payloadType, handler, null);
return handle(payloadType, handler, null);
}

/**
Expand All @@ -1019,11 +1046,12 @@ public <P> B handle(Class<P> payloadType, GenericHandler<P> handler) {
* .handle(Integer.class, (p, h) -> p / 2, e -> e.autoStartup(false))
* }
* </pre>
* @param payloadType the expected payload type.
* The accepted payload can be converted to this one at runtime
* @param payloadType the {@link Class} for expected payload type. It can also be
* {@code Message.class} if you wish to access the entire message in the handler.
* Conversion to this type will be attempted, if necessary.
* @param handler the handler to invoke.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @param <P> the payload type to expect.
* @param <P> the payload type to expect or {@code Message.class}.
* @return the current {@link IntegrationFlowDefinition}.
* @see LambdaMessageProcessor
*/
Expand All @@ -1036,7 +1064,7 @@ public <P> B handle(Class<P> payloadType, GenericHandler<P> handler,
else {
serviceActivatingHandler = new ServiceActivatingHandler(handler, ClassUtils.HANDLER_HANDLE_METHOD);
}
return this.handle(serviceActivatingHandler, endpointConfigurer);
return handle(serviceActivatingHandler, endpointConfigurer);
}

/**
Expand Down Expand Up @@ -1241,9 +1269,9 @@ public B enrichHeaders(MapBuilder<?, String, Object> headers,

/**
* Accept a {@link Map} of values to be used for the
* {@link org.springframework.messaging.Message} header enrichment.
* {@link Message} header enrichment.
* {@code values} can apply an {@link org.springframework.expression.Expression}
* to be evaluated against a request {@link org.springframework.messaging.Message}.
* to be evaluated against a request {@link Message}.
* @param headers the Map of headers to enrich.
* @return the current {@link IntegrationFlowDefinition}.
*/
Expand All @@ -1253,9 +1281,9 @@ public B enrichHeaders(Map<String, Object> headers) {

/**
* Accept a {@link Map} of values to be used for the
* {@link org.springframework.messaging.Message} header enrichment.
* {@link Message} header enrichment.
* {@code values} can apply an {@link org.springframework.expression.Expression}
* to be evaluated against a request {@link org.springframework.messaging.Message}.
* to be evaluated against a request {@link Message}.
* @param headers the Map of headers to enrich.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @return the current {@link IntegrationFlowDefinition}.
Expand Down Expand Up @@ -1296,7 +1324,7 @@ public B enrichHeaders(Consumer<HeaderEnricherSpec> headerEnricherConfigurer) {
* @return the current {@link IntegrationFlowDefinition}.
*/
public B split() {
return this.split((Consumer<SplitterEndpointSpec<DefaultMessageSplitter>>) null);
return split((Consumer<SplitterEndpointSpec<DefaultMessageSplitter>>) null);
}

/**
Expand All @@ -1314,7 +1342,7 @@ public B split() {
* @see SplitterEndpointSpec
*/
public B split(Consumer<SplitterEndpointSpec<DefaultMessageSplitter>> endpointConfigurer) {
return this.split(new DefaultMessageSplitter(), endpointConfigurer);
return split(new DefaultMessageSplitter(), endpointConfigurer);
}

/**
Expand Down Expand Up @@ -1398,7 +1426,7 @@ public B split(Object service, String methodName,
* @return the current {@link IntegrationFlowDefinition}.
*/
public B split(String beanName, String methodName) {
return this.split(beanName, methodName, null);
return split(beanName, methodName, null);
}

/**
Expand Down Expand Up @@ -1474,9 +1502,11 @@ public B split(MessageProcessorSpec<?> messageProcessorSpec,
* new Foo(rs.getInt(1), rs.getString(2)))))
* }
* </pre>
* @param payloadType the expected payload type. Used at runtime to convert received payload type to.
* @param payloadType the {@link Class} for expected payload type. It can also be
* {@code Message.class} if you wish to access the entire message in the splitter.
* Conversion to this type will be attempted, if necessary.
* @param splitter the splitter {@link Function}.
* @param <P> the payload type.
* @param <P> the payload type or {@code Message.class}.
* @return the current {@link IntegrationFlowDefinition}.
* @see LambdaMessageProcessor
*/
Expand Down Expand Up @@ -1528,20 +1558,23 @@ public <P> B split(Function<P, ?> splitter,
* , e -> e.applySequence(false))
* }
* </pre>
* @param payloadType the expected payload type. Used at runtime to convert received payload type to.
* @param payloadType the {@link Class} for expected payload type. It can also be
* {@code Message.class} if you wish to access the entire message in the splitter.
* Conversion to this type will be attempted, if necessary.
* @param splitter the splitter {@link Function}.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @param <P> the payload type.
* @param <P> the payload type or {@code Message.class}.
* @return the current {@link IntegrationFlowDefinition}.
* @see LambdaMessageProcessor
* @see SplitterEndpointSpec
*/
public <P> B split(Class<P> payloadType, Function<P, ?> splitter,
Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {

MethodInvokingSplitter split = isLambda(splitter)
? new MethodInvokingSplitter(new LambdaMessageProcessor(splitter, payloadType))
: new MethodInvokingSplitter(splitter, ClassUtils.FUNCTION_APPLY_METHOD);
return this.split(split, endpointConfigurer);
return split(split, endpointConfigurer);
}

/**
Expand Down Expand Up @@ -1631,7 +1664,7 @@ public B headerFilter(String headersToRemove, boolean patternMatch) {
*/
public B headerFilter(HeaderFilter headerFilter,
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
return this.transform(headerFilter, endpointConfigurer);
return transform(headerFilter, endpointConfigurer);
}

/**
Expand All @@ -1655,7 +1688,7 @@ public B claimCheckIn(MessageStore messageStore) {
*/
public B claimCheckIn(MessageStore messageStore,
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
return this.transform(new ClaimCheckInTransformer(messageStore), endpointConfigurer);
return transform(new ClaimCheckInTransformer(messageStore), endpointConfigurer);
}

/**
Expand Down Expand Up @@ -1696,7 +1729,7 @@ public B claimCheckOut(MessageStore messageStore, boolean removeMessage,
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
ClaimCheckOutTransformer claimCheckOutTransformer = new ClaimCheckOutTransformer(messageStore);
claimCheckOutTransformer.setRemoveMessage(removeMessage);
return this.transform(claimCheckOutTransformer, endpointConfigurer);
return transform(claimCheckOutTransformer, endpointConfigurer);
}

/**
Expand Down Expand Up @@ -1861,6 +1894,7 @@ public <T> B route(String expression, Consumer<RouterSpec<T, ExpressionEvaluatin
* .route(p -> p.equals("foo") || p.equals("bar") ? new String[] {"foo", "bar"} : null)
* }
* </pre>
* Use {@link #route(Class, Function)} if you need to access the entire message.
* @param router the {@link Function} to use.
* @param <S> the source payload type.
* @param <T> the target result type.
Expand All @@ -1879,9 +1913,11 @@ public <S, T> B route(Function<S, T> router) {
* .route(Integer.class, p -> p % 2 == 0)
* }
* </pre>
* @param payloadType the expected payload type.
* @param payloadType the {@link Class} for expected payload type. It can also be
* {@code Message.class} if you wish to access the entire message in the splitter.
* Conversion to this type will be attempted, if necessary.
* @param router the {@link Function} to use.
* @param <S> the source payload type.
* @param <S> the source payload type or {@code Message.class}.
* @param <T> the target result type.
* @return the current {@link IntegrationFlowDefinition}.
* @see LambdaMessageProcessor
Expand All @@ -1904,6 +1940,7 @@ public <S, T> B route(Class<S> payloadType, Function<S, T> router) {
* .applySequence(false))
* }
* </pre>
* Use {@link #route(Class, Function, Consumer)} if you need to access the entire message.
* @param router the {@link Function} to use.
* @param routerConfigurer the {@link Consumer} to provide {@link MethodInvokingRouter} options.
* @param <S> the source payload type.
Expand All @@ -1928,16 +1965,19 @@ public <S, T> B route(Function<S, T> router, Consumer<RouterSpec<T, MethodInvoki
* .applySequence(false))
* }
* </pre>
* @param payloadType the expected payload type.
* @param payloadType the {@link Class} for expected payload type. It can also be
* {@code Message.class} if you wish to access the entire message in the splitter.
* Conversion to this type will be attempted, if necessary.
* @param router the {@link Function} to use.
* @param routerConfigurer the {@link Consumer} to provide {@link MethodInvokingRouter} options.
* @param <P> the source payload type.
* @param <P> the source payload type or {@code Message.class}.
* @param <T> the target result type.
* @return the current {@link IntegrationFlowDefinition}.
* @see LambdaMessageProcessor
*/
public <P, T> B route(Class<P> payloadType, Function<P, T> router,
Consumer<RouterSpec<T, MethodInvokingRouter>> routerConfigurer) {

MethodInvokingRouter methodInvokingRouter = isLambda(router)
? new MethodInvokingRouter(new LambdaMessageProcessor(router, payloadType))
: new MethodInvokingRouter(router, ClassUtils.FUNCTION_APPLY_METHOD);
Expand Down

0 comments on commit ffaa4ee

Please sign in to comment.