Skip to content

Commit

Permalink
GH-3288: Add Kotlin DSL transform(Transformer)
Browse files Browse the repository at this point in the history
Fixes #3288

* For better end-user experience with Kotlin DSL and get a gain from
existing `Transformer` implementations add a `transform(Transformer)`
EI-method into the `KotlinIntegrationFlowDefinition`
* Also add `filter(MessageSelector)` for any out-of-the-box `MessageSelector`

**Cherry-pick to 5.3.x**
  • Loading branch information
artembilan authored and garyrussell committed May 27, 2020
1 parent 8d1dd0b commit e981691
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 6 deletions.
Expand Up @@ -50,6 +50,7 @@ import org.springframework.integration.transformer.ClaimCheckOutTransformer
import org.springframework.integration.transformer.HeaderFilter
import org.springframework.integration.transformer.MessageTransformingHandler
import org.springframework.integration.transformer.MethodInvokingTransformer
import org.springframework.integration.transformer.Transformer
import org.springframework.messaging.Message
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.MessageHandler
Expand Down Expand Up @@ -287,8 +288,20 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
this.delegate.controlBus(endpointConfigurer)
}


/**
* Populate the [Transformer] EI Pattern specific [MessageHandler] implementation
* for the provided `Transformer` instance.
* @since 5.3.1
*/
fun transform(transformer: Transformer,
endpointConfigurer: GenericEndpointSpec<MessageTransformingHandler>.() -> Unit = {}) {

this.delegate.transform(transformer, Consumer { endpointConfigurer(it) })
}

/**
* Populate the `Transformer` EI Pattern specific [MessageHandler] implementation
* Populate the [Transformer] EI Pattern specific [MessageHandler] implementation
* for the SpEL [Expression].
*/
fun transform(expression: String,
Expand All @@ -298,15 +311,15 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
}

/**
* Populate the `MessageTransformingHandler` for the [MethodInvokingTransformer]
* Populate the [MessageTransformingHandler] for the [MethodInvokingTransformer]
* to invoke the service method at runtime.
*/
fun transform(service: Any, methodName: String? = null) {
this.delegate.transform(service, methodName)
}

/**
* Populate the `MessageTransformingHandler` for the [MethodInvokingTransformer]
* Populate the [MessageTransformingHandler] for the [MethodInvokingTransformer]
* to invoke the service method at runtime.
*/
fun transform(service: Any, methodName: String?,
Expand Down Expand Up @@ -358,9 +371,23 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
*/
fun filter(messageProcessorSpec: MessageProcessorSpec<*>,
filterConfigurer: KotlinFilterEndpointSpec.() -> Unit = {}) {

this.delegate.filter(messageProcessorSpec) { filterConfigurer(KotlinFilterEndpointSpec(it)) }
}


/**
* Populate a [MessageFilter] with the provided [MessageSelector].
* In addition accept options for the integration endpoint using [KotlinFilterEndpointSpec].
* @since 5.3.1
*/
fun filter(messageSelector: MessageSelector,
filterConfigurer: KotlinFilterEndpointSpec.() -> Unit = {}) {

this.delegate.filter(Message::class.java, messageSelector,
Consumer { filterConfigurer(KotlinFilterEndpointSpec(it)) })
}

/**
* Populate a [ServiceActivatingHandler] for the selected protocol specific
* [MessageHandler] implementation from `Namespace Factory`:
Expand Down
Expand Up @@ -21,6 +21,7 @@ import assertk.assertions.isEqualTo
import assertk.assertions.isGreaterThanOrEqualTo
import assertk.assertions.isInstanceOf
import assertk.assertions.isNotNull
import assertk.assertions.isTrue
import assertk.assertions.size
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.BeanFactory
Expand All @@ -37,6 +38,7 @@ import org.springframework.integration.dsl.context.IntegrationFlowContext
import org.springframework.integration.endpoint.MessageProcessorMessageSource
import org.springframework.integration.handler.LoggingHandler
import org.springframework.integration.scheduling.PollerMetadata
import org.springframework.integration.selector.UnexpiredMessageSelector
import org.springframework.integration.support.MessageBuilder
import org.springframework.integration.test.util.OnlyOnceTrigger
import org.springframework.messaging.Message
Expand Down Expand Up @@ -88,11 +90,12 @@ class KotlinDslTests {

@Autowired
@Qualifier("functionGateway")
private lateinit var upperCaseFunction: Function<String, String>
private lateinit var upperCaseFunction: Function<ByteArray, String>

@Test
fun `uppercase function`() {
assertThat(this.upperCaseFunction.apply("test")).isEqualTo("TEST")
assertThat(beanFactory.containsBean("objectToStringTransformer")).isTrue()
assertThat(this.upperCaseFunction.apply("test".toByteArray())).isEqualTo("TEST")
}

@Autowired
Expand Down Expand Up @@ -225,7 +228,8 @@ class KotlinDslTests {

@Bean
fun functionFlow() =
integrationFlow<Function<String, String>>({ beanName("functionGateway") }) {
integrationFlow<Function<ByteArray, String>>({ beanName("functionGateway") }) {
transform(Transformers.objectToString()) { id("objectToStringTransformer") }
transform<String> { it.toUpperCase() }
split<Message<*>> { it.payload }
split<String>({ it }) { id("splitterEndpoint") }
Expand All @@ -240,6 +244,7 @@ class KotlinDslTests {
fun functionFlow2() =
integrationFlow<Function<*, *>> {
transform<String> { it.toLowerCase() }
filter(UnexpiredMessageSelector())
route<Message<*>, Any?>({ null }) { defaultOutputToParentFlow() }
route<Message<*>> { m -> m.headers.replyChannel }
}
Expand Down

0 comments on commit e981691

Please sign in to comment.