diff --git a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt index b0ad7695f3d..f562e74be45 100644 --- a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt +++ b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt @@ -50,6 +50,7 @@ import org.springframework.integration.transformer.ClaimCheckOutTransformer import org.springframework.integration.transformer.HeaderFilter import org.springframework.integration.transformer.MessageTransformingHandler import org.springframework.integration.transformer.MethodInvokingTransformer +import org.springframework.integration.transformer.Transformer import org.springframework.messaging.Message import org.springframework.messaging.MessageChannel import org.springframework.messaging.MessageHandler @@ -287,8 +288,20 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ this.delegate.controlBus(endpointConfigurer) } + + /** + * Populate the [Transformer] EI Pattern specific [MessageHandler] implementation + * for the provided `Transformer` instance. + * @since 5.3.1 + */ + fun transform(transformer: Transformer, + endpointConfigurer: GenericEndpointSpec.() -> Unit = {}) { + + this.delegate.transform(transformer, Consumer { endpointConfigurer(it) }) + } + /** - * Populate the `Transformer` EI Pattern specific [MessageHandler] implementation + * Populate the [Transformer] EI Pattern specific [MessageHandler] implementation * for the SpEL [Expression]. */ fun transform(expression: String, @@ -298,7 +311,7 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ } /** - * Populate the `MessageTransformingHandler` for the [MethodInvokingTransformer] + * Populate the [MessageTransformingHandler] for the [MethodInvokingTransformer] * to invoke the service method at runtime. */ fun transform(service: Any, methodName: String? = null) { @@ -306,7 +319,7 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ } /** - * Populate the `MessageTransformingHandler` for the [MethodInvokingTransformer] + * Populate the [MessageTransformingHandler] for the [MethodInvokingTransformer] * to invoke the service method at runtime. */ fun transform(service: Any, methodName: String?, @@ -358,9 +371,23 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ */ fun filter(messageProcessorSpec: MessageProcessorSpec<*>, filterConfigurer: KotlinFilterEndpointSpec.() -> Unit = {}) { + this.delegate.filter(messageProcessorSpec) { filterConfigurer(KotlinFilterEndpointSpec(it)) } } + + /** + * Populate a [MessageFilter] with the provided [MessageSelector]. + * In addition accept options for the integration endpoint using [KotlinFilterEndpointSpec]. + * @since 5.3.1 + */ + fun filter(messageSelector: MessageSelector, + filterConfigurer: KotlinFilterEndpointSpec.() -> Unit = {}) { + + this.delegate.filter(Message::class.java, messageSelector, + Consumer { filterConfigurer(KotlinFilterEndpointSpec(it)) }) + } + /** * Populate a [ServiceActivatingHandler] for the selected protocol specific * [MessageHandler] implementation from `Namespace Factory`: diff --git a/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt b/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt index a58c50f5d48..c68f78b3237 100644 --- a/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt +++ b/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt @@ -21,6 +21,7 @@ import assertk.assertions.isEqualTo import assertk.assertions.isGreaterThanOrEqualTo import assertk.assertions.isInstanceOf import assertk.assertions.isNotNull +import assertk.assertions.isTrue import assertk.assertions.size import org.junit.jupiter.api.Test import org.springframework.beans.factory.BeanFactory @@ -37,6 +38,7 @@ import org.springframework.integration.dsl.context.IntegrationFlowContext import org.springframework.integration.endpoint.MessageProcessorMessageSource import org.springframework.integration.handler.LoggingHandler import org.springframework.integration.scheduling.PollerMetadata +import org.springframework.integration.selector.UnexpiredMessageSelector import org.springframework.integration.support.MessageBuilder import org.springframework.integration.test.util.OnlyOnceTrigger import org.springframework.messaging.Message @@ -88,11 +90,12 @@ class KotlinDslTests { @Autowired @Qualifier("functionGateway") - private lateinit var upperCaseFunction: Function + private lateinit var upperCaseFunction: Function @Test fun `uppercase function`() { - assertThat(this.upperCaseFunction.apply("test")).isEqualTo("TEST") + assertThat(beanFactory.containsBean("objectToStringTransformer")).isTrue() + assertThat(this.upperCaseFunction.apply("test".toByteArray())).isEqualTo("TEST") } @Autowired @@ -225,7 +228,8 @@ class KotlinDslTests { @Bean fun functionFlow() = - integrationFlow>({ beanName("functionGateway") }) { + integrationFlow>({ beanName("functionGateway") }) { + transform(Transformers.objectToString()) { id("objectToStringTransformer") } transform { it.toUpperCase() } split> { it.payload } split({ it }) { id("splitterEndpoint") } @@ -240,6 +244,7 @@ class KotlinDslTests { fun functionFlow2() = integrationFlow> { transform { it.toLowerCase() } + filter(UnexpiredMessageSelector()) route, Any?>({ null }) { defaultOutputToParentFlow() } route> { m -> m.headers.replyChannel } }