Skip to content

Commit

Permalink
Fix Kotlin DSL delegation (#8658)
Browse files Browse the repository at this point in the history
The `ConsumerEndpointSpec` extensions for Kotlin
don't delegate to the provided `endpointFactoryBean`

* Introduce `KotlinConsumerEndpointSpec` extension for `ConsumerEndpointSpec`
with the proper delegation to the provided spec
* Use `KotlinConsumerEndpointSpec` in the Kotlin-specific `Spec` classes

**Cherry-pick to `6.1.x`, `6.0.x` & `5.5.x`**
  • Loading branch information
artembilan committed Jun 26, 2023
1 parent af95829 commit 65c7e5d
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 11 deletions.
Expand Up @@ -28,9 +28,8 @@ import org.springframework.messaging.MessageChannel
*
* @since 5.3
*/
abstract class AbstractKotlinRouterSpec<S : AbstractRouterSpec<S, R>, R : AbstractMessageRouter>(
open val delegate: AbstractRouterSpec<S, R>)
: ConsumerEndpointSpec<S, R>(delegate.handler) {
abstract class AbstractKotlinRouterSpec<S : AbstractRouterSpec<S, R>, R : AbstractMessageRouter>(override val delegate: S)
: KotlinConsumerEndpointSpec<S, R>(delegate) {

fun ignoreSendFailures(ignoreSendFailures: Boolean) {
this.delegate.ignoreSendFailures(ignoreSendFailures)
Expand Down
@@ -0,0 +1,115 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.dsl

import org.aopalliance.aop.Advice
import org.aopalliance.intercept.MethodInterceptor
import org.reactivestreams.Publisher
import org.springframework.integration.scheduling.PollerMetadata
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHandler
import org.springframework.scheduling.TaskScheduler
import org.springframework.transaction.TransactionManager
import org.springframework.transaction.interceptor.TransactionInterceptor
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

/**
* A [ConsumerEndpointSpec] wrapped for Kotlin DSL.
*
* @property delegate the [ConsumerEndpointSpec] this instance is delegating to.
*
* @author Artem Bilan
*
* @since 5.5.19
*/
abstract class KotlinConsumerEndpointSpec<S : ConsumerEndpointSpec<S, H>, H : MessageHandler>(open val delegate: S)
: ConsumerEndpointSpec<S, H>(delegate.handler) {

override fun phase(phase: Int): S {
return this.delegate.phase(phase)
}

override fun autoStartup(autoStartup: Boolean): S {
return this.delegate.autoStartup(autoStartup)
}

override fun poller(pollerMetadata: PollerMetadata): S {
return this.delegate.poller(pollerMetadata)
}

override fun reactive(): S {
return this.delegate.reactive()
}

fun reactive(reactiveCustomizer: (Flux<Message<*>>) -> Publisher<Message<*>>) {
this.delegate.reactive(reactiveCustomizer)
}

override fun role(role: String): S {
return this.delegate.role(role)
}

override fun taskScheduler(taskScheduler: TaskScheduler): S {
return this.delegate.taskScheduler(taskScheduler)
}

override fun handleMessageAdvice(vararg interceptors: MethodInterceptor?): S {
return this.delegate.handleMessageAdvice(*interceptors)
}

override fun advice(vararg advice: Advice?): S {
return this.delegate.advice(*advice)
}

override fun transactional(transactionManager: TransactionManager): S {
return this.delegate.transactional(transactionManager)
}

override fun transactional(transactionManager: TransactionManager, handleMessageAdvice: Boolean): S {
return this.delegate.transactional(transactionManager, handleMessageAdvice)
}

override fun transactional(transactionInterceptor: TransactionInterceptor): S {
return this.delegate.transactional(transactionInterceptor)
}

override fun transactional(): S {
return this.delegate.transactional()
}

override fun transactional(handleMessageAdvice: Boolean): S {
return this.delegate.transactional(handleMessageAdvice)
}

fun <T : Any?, V : Any?> customizeMonoReply(replyCustomizer: (Message<*>, Mono<T>) -> Publisher<V>) {
this.delegate.customizeMonoReply(replyCustomizer)
}

override fun id(id: String?): S {
return this.delegate.id(id)
}

override fun poller(pollerMetadataSpec: PollerSpec): S {
return this.delegate.poller(pollerMetadataSpec)
}

fun poller(pollers: (PollerFactory) -> PollerSpec) {
this.delegate.poller(pollers)
}

}
Expand Up @@ -30,8 +30,8 @@ import org.springframework.messaging.MessageChannel
*
* @since 5.3
*/
class KotlinEnricherSpec(val delegate: EnricherSpec)
: ConsumerEndpointSpec<EnricherSpec, ContentEnricher>(delegate.handler) {
class KotlinEnricherSpec(override val delegate: EnricherSpec)
: KotlinConsumerEndpointSpec<EnricherSpec, ContentEnricher>(delegate) {

fun requestChannel(requestChannel: MessageChannel) {
this.delegate.requestChannel(requestChannel)
Expand Down
Expand Up @@ -28,8 +28,8 @@ import org.springframework.messaging.MessageChannel
*
* @since 5.3
*/
class KotlinFilterEndpointSpec(val delegate: FilterEndpointSpec)
: ConsumerEndpointSpec<FilterEndpointSpec, MessageFilter>(delegate.handler) {
class KotlinFilterEndpointSpec(override val delegate: FilterEndpointSpec)
: KotlinConsumerEndpointSpec<FilterEndpointSpec, MessageFilter>(delegate) {

fun throwExceptionOnRejection(throwExceptionOnRejection: Boolean) {
this.delegate.throwExceptionOnRejection(throwExceptionOnRejection)
Expand Down
Expand Up @@ -28,8 +28,8 @@ import org.springframework.messaging.MessageChannel
*
* @since 5.3
*/
class KotlinSplitterEndpointSpec<H : AbstractMessageSplitter>(val delegate: SplitterEndpointSpec<H>)
: ConsumerEndpointSpec<KotlinSplitterEndpointSpec<H>, H>(delegate.handler) {
class KotlinSplitterEndpointSpec<H : AbstractMessageSplitter>(override val delegate: SplitterEndpointSpec<H>)
: KotlinConsumerEndpointSpec<SplitterEndpointSpec<H>, H>(delegate) {

fun applySequence(applySequence: Boolean) {
this.delegate.applySequence(applySequence)
Expand Down
Expand Up @@ -30,6 +30,7 @@ import org.springframework.integration.channel.QueueChannel
import org.springframework.integration.config.EnableIntegration
import org.springframework.integration.core.MessagingTemplate
import org.springframework.integration.dsl.context.IntegrationFlowContext
import org.springframework.integration.endpoint.AbstractEndpoint
import org.springframework.integration.endpoint.MessageProcessorMessageSource
import org.springframework.integration.handler.LoggingHandler
import org.springframework.integration.scheduling.PollerMetadata
Expand Down Expand Up @@ -69,7 +70,7 @@ class KotlinDslTests {

@Test
fun `convert extension`() {
assertThat(this.beanFactory.containsBean("kotlinConverter"))
assertThat(this.beanFactory.containsBean("kotlinConverter")).isTrue()

val replyChannel = QueueChannel()
val date = Date()
Expand All @@ -93,6 +94,8 @@ class KotlinDslTests {
@Test
fun `uppercase function`() {
assertThat(beanFactory.containsBean("objectToStringTransformer")).isTrue()
assertThat(this.beanFactory.containsBean("splitterEndpoint")).isTrue()
assertThat(this.beanFactory.getBean("splitterEndpoint", AbstractEndpoint::class.java).phase).isEqualTo(257)
assertThat(this.upperCaseFunction.apply("test".toByteArray())).isEqualTo("TEST")
}

Expand Down Expand Up @@ -258,7 +261,10 @@ class KotlinDslTests {
}
transform<String> { it.uppercase() }
split<Message<*>> { it.payload }
split<String>({ it }) { id("splitterEndpoint") }
split<String>({ it }) {
id("splitterEndpoint")
phase(257)
}
resequence()
aggregate {
id("aggregator")
Expand Down

0 comments on commit 65c7e5d

Please sign in to comment.