Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
Revert "Apply SI-Kotlin-DSL in tests"
Browse files Browse the repository at this point in the history
This reverts commit 0ceb761.
  • Loading branch information
garyrussell committed Oct 30, 2019
1 parent 8d8b54b commit c47edbf
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 59 deletions.
7 changes: 2 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ ext {
jacksonVersion = '2.10.0'
junitJupiterVersion = '5.5.2'
log4jVersion = '2.12.1'
springIntegrationVersion = '5.2.1.BUILD-SNAPSHOT'
springIntegrationKotlinVersion = '0.0.2.BUILD-SNAPSHOT'
springKafkaVersion = '2.3.1.BUILD-SNAPSHOT'
springIntegrationVersion = '5.2.0.RELEASE'
springKafkaVersion = '2.3.0.RELEASE'

idPrefix = 'kafka'

Expand Down Expand Up @@ -103,13 +102,11 @@ dependencies {
compile "org.springframework.kafka:spring-kafka:$springKafkaVersion"

testCompile "org.springframework.kafka:spring-kafka-test:$springKafkaVersion"
testCompile "org.springframework.integration:spring-integration-kotlin-dsl:$springIntegrationKotlinVersion"
testCompile 'org.springframework.integration:spring-integration-test'
testCompile "com.willowtreeapps.assertk:assertk-jvm:$assertkVersion"
testCompile 'org.jetbrains.kotlin:kotlin-reflect'
testCompile 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
testCompile 'org.junit.jupiter:junit-jupiter-api'

testRuntime 'org.junit.jupiter:junit-jupiter-engine'
testRuntime 'org.junit.platform:junit-platform-launcher'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ import org.springframework.integration.MessageRejectedException
import org.springframework.integration.channel.QueueChannel
import org.springframework.integration.config.EnableIntegration
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.dsl.Pollers
import org.springframework.integration.dsl.kotlin.filterReified
import org.springframework.integration.dsl.kotlin.integrationFlow
import org.springframework.integration.dsl.kotlin.split
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler
Expand Down Expand Up @@ -89,7 +87,7 @@ import java.util.stream.Stream
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = [KafkaDslKotlinTests.TEST_TOPIC1, KafkaDslKotlinTests.TEST_TOPIC2,
KafkaDslKotlinTests.TEST_TOPIC3, KafkaDslKotlinTests.TEST_TOPIC4, KafkaDslKotlinTests.TEST_TOPIC5])
KafkaDslKotlinTests.TEST_TOPIC3, KafkaDslKotlinTests.TEST_TOPIC4, KafkaDslKotlinTests.TEST_TOPIC5])
class KafkaDslKotlinTests {

companion object {
Expand Down Expand Up @@ -161,8 +159,8 @@ class KafkaDslKotlinTests {
assertThat(receive!!.payload).isEqualTo("FOO")
val headers = receive.headers
assertThat(headers.containsKey(KafkaHeaders.ACKNOWLEDGMENT)).isTrue()
val acknowledgment = headers[KafkaHeaders.ACKNOWLEDGMENT] as Acknowledgment
acknowledgment.acknowledge()
val acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment::class.java)
acknowledgment?.acknowledge()
assertThat(headers[KafkaHeaders.RECEIVED_TOPIC]).isEqualTo(TEST_TOPIC1)
assertThat(headers[KafkaHeaders.RECEIVED_MESSAGE_KEY]).isEqualTo(i + 1)
assertThat(headers[KafkaHeaders.RECEIVED_PARTITION_ID]).isEqualTo(0)
Expand All @@ -178,8 +176,8 @@ class KafkaDslKotlinTests {
assertThat(receive!!.payload).isEqualTo("FOO")
val headers = receive.headers
assertThat(headers.containsKey(KafkaHeaders.ACKNOWLEDGMENT)).isTrue()
val acknowledgment = headers[KafkaHeaders.ACKNOWLEDGMENT] as Acknowledgment
acknowledgment.acknowledge()
val acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment::class.java)
acknowledgment?.acknowledge()
assertThat(headers[KafkaHeaders.RECEIVED_TOPIC]).isEqualTo(TEST_TOPIC2)
assertThat(headers[KafkaHeaders.RECEIVED_MESSAGE_KEY]).isEqualTo(i + 1)
assertThat(headers[KafkaHeaders.RECEIVED_PARTITION_ID]).isEqualTo(0)
Expand Down Expand Up @@ -234,7 +232,7 @@ class KafkaDslKotlinTests {
@Bean
fun consumerFactory(): ConsumerFactory<Int, String> {
val props = KafkaTestUtils.consumerProps("test1", "false", this.embeddedKafka)
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
return DefaultKafkaConsumerFactory(props)
}

Expand All @@ -243,7 +241,7 @@ class KafkaDslKotlinTests {

@Bean
fun topic1ListenerFromKafkaFlow() =
integrationFlow(
IntegrationFlows.from(
Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
.configureListenerContainer {
Expand All @@ -253,52 +251,50 @@ class KafkaDslKotlinTests {
.recoveryCallback(ErrorMessageSendingRecoverer(errorChannel(),
RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(RetryTemplate())
.filterInRetry(true)) {
it.filterReified<Message<*>>(
{ m -> (m.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY] as Int) < 101 },
{ f -> f.throwExceptionOnRejection(true) })
.transform<String, String> { it.toUpperCase() }
.channel { c -> c.queue("listeningFromKafkaResults1") }
}
.filterInRetry(true))
.filter(Message::class.java, { m -> m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer::class.java)!! < 101 },
{ f -> f.throwExceptionOnRejection(true) })
.transform<String, String> { it.toUpperCase() }
.channel { c -> c.queue("listeningFromKafkaResults1") }
.get()

@Bean
fun topic2ListenerFromKafkaFlow() =
integrationFlow(
IntegrationFlows.from(
Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC2)
.configureListenerContainer { it.ackMode(ContainerProperties.AckMode.MANUAL) }
.recoveryCallback(ErrorMessageSendingRecoverer(errorChannel(),
RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(RetryTemplate())
.filterInRetry(true)) {
it.filterReified<Message<*>>(
{ m -> (m.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY] as Int) < 101 },
{ it.throwExceptionOnRejection(true) })
.transform<String, String> { it.toUpperCase() }
.channel { c -> c.queue("listeningFromKafkaResults2") }
}
.filterInRetry(true))
.filter(Message::class.java,
{ m -> m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer::class.java)!! < 101 },
{ it.throwExceptionOnRejection(true) })
.transform<String, String> { it.toUpperCase() }
.channel { c -> c.queue("listeningFromKafkaResults2") }
.get()

@Bean
fun producerFactory(): DefaultKafkaProducerFactory<Int, String> {
val props = KafkaTestUtils.producerProps(this.embeddedKafka)
props[ProducerConfig.MAX_BLOCK_MS_CONFIG] = "10000"
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
return DefaultKafkaProducerFactory(props)
}

@Bean
fun sendToKafkaFlow() =
IntegrationFlow {
it.split<String>({ p -> Stream.generate { p }.limit(101) })
.publishSubscribeChannel {
it
.subscribe {
it.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')")
) { it.id("kafkaProducer1") }
}
.subscribe {
it.handle(
IntegrationFlow { f ->
f.split<String>({ p -> Stream.generate { p }.limit(101) }, null)
.publishSubscribeChannel { c ->
c.subscribe { sf ->
sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')")
) { it.id("kafkaProducer1") }
}
.subscribe { sf ->
sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
.timestamp<Any> { 1487694048644L }
) { it.id("kafkaProducer2") }
Expand All @@ -314,20 +310,21 @@ class KafkaDslKotlinTests {
.messageKey<Any> { m -> m.headers[IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER] }
.headerMapper(mapper())
.sync(true)
.partitionId<Any> { 0 }
.partitionId<Any> { _ -> 0 }
.topicExpression("headers[kafka_topic] ?: '$topic'")
.configureKafkaTemplate { it.id("kafkaTemplate:$topic") }
.configureKafkaTemplate { t -> t.id("kafkaTemplate:$topic") }


@Bean
fun sourceFlow() =
integrationFlow(Kafka.inboundChannelAdapter(consumerFactory(), ConsumerProperties(TEST_TOPIC3)),
{ e -> e.poller(Pollers.fixedDelay(100)) }) {
it.handle { m ->
this.fromSource = m.payload
this.sourceFlowLatch.countDown()
}
}
IntegrationFlows
.from(Kafka.inboundChannelAdapter(consumerFactory(), ConsumerProperties(TEST_TOPIC3)))
{ e -> e.poller(Pollers.fixedDelay(100)) }
.handle { p ->
this.fromSource = p.getPayload()
this.sourceFlowLatch.countDown()
}
.get()

@Bean
fun replyingKafkaTemplate() =
Expand All @@ -338,10 +335,10 @@ class KafkaDslKotlinTests {

@Bean
fun outboundGateFlow() =
integrationFlow<Gate> {
it.handle(Kafka.outboundGateway(replyingKafkaTemplate())
.sync(true))
}
IntegrationFlows.from(Gate::class.java)
.handle(Kafka.outboundGateway(replyingKafkaTemplate())
.sync(true))
.get()

private fun replyContainer(): GenericMessageListenerContainer<Int, String> {
val containerProperties = ContainerProperties(TEST_TOPIC5)
Expand All @@ -362,9 +359,10 @@ class KafkaDslKotlinTests {

@Bean
fun serverGateway() =
integrationFlow(Kafka.inboundGateway(consumerFactory(), containerProperties(), producerFactory())) {
it.transform<String, String> { it.toUpperCase() }
}
IntegrationFlows.from(
Kafka.inboundGateway(consumerFactory(), containerProperties(), producerFactory()))
.transform<String, String> { it.toUpperCase() }
.get()

private fun containerProperties() =
ContainerProperties(TEST_TOPIC4)
Expand Down

0 comments on commit c47edbf

Please sign in to comment.