From 6f6f32320d79b255159c2d9510fe7badf7aba8ea Mon Sep 17 00:00:00 2001 From: onobc Date: Sun, 19 Oct 2025 20:50:27 -0500 Subject: [PATCH 1/3] Remove Spring Pulsar Reactive support This removes the auto-configuration for Spring Pulsar Reactive. Signed-off-by: onobc --- .../antora/AntoraAsciidocAttributes.java | 1 - .../antora-asciidoc-attributes.properties | 3 - .../antora/AntoraAsciidocAttributesTests.java | 1 - documentation/spring-boot-docs/build.gradle | 1 - .../antora/modules/ROOT/pages/redirect.adoc | 4 - .../reference/pages/messaging/pulsar.adoc | 71 +-- .../pulsar/readingreactive/MyBean.java | 51 -- .../pulsar/receivingreactive/MyBean.java | 33 -- .../pulsar/sendingreactive/MyBean.java | 35 -- .../pulsar/readingreactive/MyBean.kt | 45 -- .../pulsar/receivingreactive/MyBean.kt | 33 -- .../pulsar/sendingreactive/MyBean.kt | 29 - module/spring-boot-pulsar/build.gradle | 1 - ...lsarAutoConfigurationIntegrationTests.java | 3 +- .../autoconfigure/PulsarConfiguration.java | 4 +- .../PulsarReactiveAutoConfiguration.java | 218 ------- .../PulsarReactivePropertiesMapper.java | 111 ---- ...ot.autoconfigure.AutoConfiguration.imports | 1 - .../PulsarAutoConfigurationTests.java | 17 - ...ulsarContainerFactoryCustomizersTests.java | 15 +- .../PulsarReactiveAutoConfigurationTests.java | 555 ------------------ .../PulsarReactivePropertiesMapperTests.java | 151 ----- .../spring-boot-dependencies/build.gradle | 11 - settings.gradle | 2 - .../build.gradle | 1 - .../pulsar/SamplePulsarApplicationTests.java | 11 - .../smoketest/pulsar/ReactiveAppConfig.java | 64 -- .../build.gradle | 32 - .../build.gradle | 35 -- 29 files changed, 9 insertions(+), 1530 deletions(-) delete mode 100644 documentation/spring-boot-docs/src/main/java/org/springframework/boot/docs/messaging/pulsar/readingreactive/MyBean.java delete mode 100644 documentation/spring-boot-docs/src/main/java/org/springframework/boot/docs/messaging/pulsar/receivingreactive/MyBean.java delete mode 100644 documentation/spring-boot-docs/src/main/java/org/springframework/boot/docs/messaging/pulsar/sendingreactive/MyBean.java delete mode 100644 documentation/spring-boot-docs/src/main/kotlin/org/springframework/boot/docs/messaging/pulsar/readingreactive/MyBean.kt delete mode 100644 documentation/spring-boot-docs/src/main/kotlin/org/springframework/boot/docs/messaging/pulsar/receivingreactive/MyBean.kt delete mode 100644 documentation/spring-boot-docs/src/main/kotlin/org/springframework/boot/docs/messaging/pulsar/sendingreactive/MyBean.kt delete mode 100644 module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactiveAutoConfiguration.java delete mode 100644 module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactivePropertiesMapper.java delete mode 100644 module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactiveAutoConfigurationTests.java delete mode 100644 module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactivePropertiesMapperTests.java delete mode 100644 smoke-test/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/ReactiveAppConfig.java delete mode 100644 starter/spring-boot-starter-pulsar-reactive-test/build.gradle delete mode 100644 starter/spring-boot-starter-pulsar-reactive/build.gradle diff --git a/buildSrc/src/main/java/org/springframework/boot/build/antora/AntoraAsciidocAttributes.java b/buildSrc/src/main/java/org/springframework/boot/build/antora/AntoraAsciidocAttributes.java index c0f6a06d644a..39ee2273c9d6 100644 --- a/buildSrc/src/main/java/org/springframework/boot/build/antora/AntoraAsciidocAttributes.java +++ b/buildSrc/src/main/java/org/springframework/boot/build/antora/AntoraAsciidocAttributes.java @@ -178,7 +178,6 @@ private void addVersionAttributes(Map attributes, Map mockDependencyVersions(String version) { addMockJacksonCoreVersion(versions, "jackson-databind", version); addMockJacksonCoreVersion(versions, "jackson-databind", version); versions.put("org.apache.pulsar:pulsar-client-api", version); - versions.put("org.apache.pulsar:pulsar-client-reactive-api", version); versions.put("tools.jackson.dataformat:jackson-dataformat-xml", version); return versions; } diff --git a/documentation/spring-boot-docs/build.gradle b/documentation/spring-boot-docs/build.gradle index 9dde07734023..4ecbf36f0f60 100644 --- a/documentation/spring-boot-docs/build.gradle +++ b/documentation/spring-boot-docs/build.gradle @@ -204,7 +204,6 @@ dependencies { implementation("org.springframework.kafka:spring-kafka") implementation("org.springframework.kafka:spring-kafka-test") implementation("org.springframework.pulsar:spring-pulsar") - implementation("org.springframework.pulsar:spring-pulsar-reactive") implementation("org.springframework.restdocs:spring-restdocs-mockmvc") implementation("org.springframework.restdocs:spring-restdocs-webtestclient") implementation("org.springframework.security:spring-security-config") diff --git a/documentation/spring-boot-docs/src/docs/antora/modules/ROOT/pages/redirect.adoc b/documentation/spring-boot-docs/src/docs/antora/modules/ROOT/pages/redirect.adoc index 1e6f5e5cb645..9e1e5c724e89 100644 --- a/documentation/spring-boot-docs/src/docs/antora/modules/ROOT/pages/redirect.adoc +++ b/documentation/spring-boot-docs/src/docs/antora/modules/ROOT/pages/redirect.adoc @@ -1781,15 +1781,11 @@ * xref:reference:messaging/pulsar.adoc#messaging.pulsar.additional-properties[#messaging.pulsar.additional-properties] * xref:reference:messaging/pulsar.adoc#messaging.pulsar.admin.auth[#messaging.pulsar.admin.auth] * xref:reference:messaging/pulsar.adoc#messaging.pulsar.admin[#messaging.pulsar.admin] -* xref:reference:messaging/pulsar.adoc#messaging.pulsar.connecting-reactive[#messaging.pulsar.connecting-reactive] * xref:reference:messaging/pulsar.adoc#messaging.pulsar.connecting.auth[#messaging.pulsar.connecting.auth] * xref:reference:messaging/pulsar.adoc#messaging.pulsar.connecting.ssl[#messaging.pulsar.connecting.ssl] * xref:reference:messaging/pulsar.adoc#messaging.pulsar.connecting[#messaging.pulsar.connecting] -* xref:reference:messaging/pulsar.adoc#messaging.pulsar.reading-reactive[#messaging.pulsar.reading-reactive] * xref:reference:messaging/pulsar.adoc#messaging.pulsar.reading[#messaging.pulsar.reading] -* xref:reference:messaging/pulsar.adoc#messaging.pulsar.receiving-reactive[#messaging.pulsar.receiving-reactive] * xref:reference:messaging/pulsar.adoc#messaging.pulsar.receiving[#messaging.pulsar.receiving] -* xref:reference:messaging/pulsar.adoc#messaging.pulsar.sending-reactive[#messaging.pulsar.sending-reactive] * xref:reference:messaging/pulsar.adoc#messaging.pulsar.sending[#messaging.pulsar.sending] * xref:reference:messaging/pulsar.adoc#messaging.pulsar[#messaging.pulsar] * xref:reference:messaging/rsocket.adoc#messaging.rsocket.messaging[#boot-features-rsocket-messaging] diff --git a/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc b/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc index cdbe4e50a506..ad5c918fe11a 100644 --- a/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc +++ b/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc @@ -3,11 +3,9 @@ https://pulsar.apache.org/[Apache Pulsar] is supported by providing auto-configuration of the {url-spring-pulsar-site}[Spring for Apache Pulsar] project. -Spring Boot will auto-configure and register the classic (imperative) Spring for Apache Pulsar components when `org.springframework.pulsar:spring-pulsar` is on the classpath. -It will do the same for the reactive components when `org.springframework.pulsar:spring-pulsar-reactive` is on the classpath. - -There are `spring-boot-starter-pulsar` and `spring-boot-starter-pulsar-reactive` starters for conveniently collecting the dependencies for imperative and reactive use, respectively. +Spring Boot will auto-configure and register the Spring for Apache Pulsar components when `org.springframework.pulsar:spring-pulsar` is on the classpath. +There is the `spring-boot-starter-pulsar` starter for conveniently collecting the dependencies for use. [[messaging.pulsar.connecting]] @@ -68,15 +66,6 @@ You can follow {url-spring-pulsar-docs}/reference/pulsar/pulsar-client.html#tls- For complete details on the client and authentication see the Spring for Apache Pulsar {url-spring-pulsar-docs}/reference/pulsar/pulsar-client.html[reference documentation]. -[[messaging.pulsar.connecting-reactive]] -== Connecting to Pulsar Reactively - -When the Reactive auto-configuration is activated, Spring Boot will auto-configure and register a javadoc:org.apache.pulsar.reactive.client.api.ReactivePulsarClient[] bean. - -The javadoc:org.apache.pulsar.reactive.client.api.ReactivePulsarClient[] adapts an instance of the previously described javadoc:org.apache.pulsar.client.api.PulsarClient[]. -Therefore, follow the previous section to configure the javadoc:org.apache.pulsar.client.api.PulsarClient[] used by the javadoc:org.apache.pulsar.reactive.client.api.ReactivePulsarClient[]. - - [[messaging.pulsar.admin]] == Connecting to Pulsar Administration @@ -118,27 +107,6 @@ You can also pass in a javadoc:org.springframework.pulsar.core.ProducerBuilderCu If you need more control over the message being sent, you can pass in a javadoc:org.springframework.pulsar.core.TypedMessageBuilderCustomizer[] when sending a message. - - -[[messaging.pulsar.sending-reactive]] -== Sending a Message Reactively - -When the Reactive auto-configuration is activated, Spring's javadoc:org.springframework.pulsar.reactive.core.ReactivePulsarTemplate[] is auto-configured, and you can use it to send messages, as shown in the following example: - -include-code::MyBean[] - -The javadoc:org.springframework.pulsar.reactive.core.ReactivePulsarTemplate[] relies on a javadoc:org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory[] to actually create the underlying sender. -Spring Boot auto-configuration also provides this sender factory, which by default, caches the producers that it creates. -You can configure the sender factory and cache settings by specifying any of the `spring.pulsar.producer.\*` and `spring.pulsar.producer.cache.*` prefixed application properties. - -If you need more control over the sender factory configuration, consider registering one or more javadoc:org.springframework.pulsar.reactive.core.ReactiveMessageSenderBuilderCustomizer[] beans. -These customizers are applied to all created senders. -You can also pass in a javadoc:org.springframework.pulsar.reactive.core.ReactiveMessageSenderBuilderCustomizer[] when sending a message to only affect the current sender. - -If you need more control over the message being sent, you can pass in a javadoc:org.springframework.pulsar.reactive.core.MessageSpecBuilderCustomizer[] when sending a message. - - - [[messaging.pulsar.receiving]] == Receiving a Message @@ -156,23 +124,6 @@ You can also customize a single listener by setting the `consumerCustomizer` att If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer>` beans. -[[messaging.pulsar.receiving-reactive]] -== Receiving a Message Reactively - -When the Apache Pulsar infrastructure is present and the Reactive auto-configuration is activated, any bean can be annotated with javadoc:org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener[format=annotation] to create a reactive listener endpoint. -The following component creates a reactive listener endpoint on the `someTopic` topic: - -include-code::MyBean[] - -Spring Boot auto-configuration provides all the components necessary for javadoc:org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener[], such as the javadoc:org.springframework.pulsar.reactive.config.ReactivePulsarListenerContainerFactory[] and the consumer factory it uses to construct the underlying reactive Pulsar consumers. -You can configure these components by specifying any of the `spring.pulsar.listener.\*` and `spring.pulsar.consumer.*` prefixed application properties. - -If you need more control over the configuration of the consumer factory, consider registering one or more javadoc:org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer[] beans. -These customizers are applied to all consumers created by the factory, and therefore all javadoc:org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener[format=annotation] instances. -You can also customize a single listener by setting the `consumerCustomizer` attribute of the javadoc:org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener[format=annotation] annotation. - -If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer>` beans. - [[messaging.pulsar.reading]] == Reading a Message @@ -194,22 +145,6 @@ You can also customize a single listener by setting the `readerCustomizer` attri If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer>` beans. -[[messaging.pulsar.reading-reactive]] -== Reading a Message Reactively - -When the Apache Pulsar infrastructure is present and the Reactive auto-configuration is activated, Spring's javadoc:org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory[] is provided, and you can use it to create a reader in order to read messages in a reactive fashion. -The following component creates a reader using the provided factory and reads a single message from 5 minutes ago from the `someTopic` topic: - -include-code::MyBean[] - -Spring Boot auto-configuration provides this reader factory which can be customized by setting any of the `spring.pulsar.reader.*` prefixed application properties. - -If you need more control over the reader factory configuration, consider passing in one or more javadoc:org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer[] instances when using the factory to create a reader. - -If you need more control over the reader factory configuration, consider registering one or more javadoc:org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer[] beans. -These customizers are applied to all created readers. -You can also pass one or more javadoc:org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer[] when creating a reader to only apply the customizations to the created reader. - TIP: For more details on any of the above components and to discover other available features, see the Spring for Apache Pulsar {url-spring-pulsar-docs}[reference documentation]. @@ -219,8 +154,6 @@ TIP: For more details on any of the above components and to discover other avail Spring for Apache Pulsar supports transactions when using javadoc:org.springframework.pulsar.core.PulsarTemplate[] and javadoc:org.springframework.pulsar.annotation.PulsarListener[format=annotation]. -NOTE: Transactions are not currently supported when using the reactive variants. - Setting the configprop:spring.pulsar.transaction.enabled[] property to `true` will: * Configure a javadoc:org.springframework.pulsar.transaction.PulsarTransactionManager[] bean diff --git a/documentation/spring-boot-docs/src/main/java/org/springframework/boot/docs/messaging/pulsar/readingreactive/MyBean.java b/documentation/spring-boot-docs/src/main/java/org/springframework/boot/docs/messaging/pulsar/readingreactive/MyBean.java deleted file mode 100644 index 18c00102ab9c..000000000000 --- a/documentation/spring-boot-docs/src/main/java/org/springframework/boot/docs/messaging/pulsar/readingreactive/MyBean.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.docs.messaging.pulsar.readingreactive; - -import java.time.Instant; -import java.util.List; - -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.reactive.client.api.StartAtSpec; -import reactor.core.publisher.Mono; - -import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer; -import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory; -import org.springframework.stereotype.Component; - -@Component -public class MyBean { - - private final ReactivePulsarReaderFactory pulsarReaderFactory; - - public MyBean(ReactivePulsarReaderFactory pulsarReaderFactory) { - this.pulsarReaderFactory = pulsarReaderFactory; - } - - @SuppressWarnings("unused") - public void someMethod() { - ReactiveMessageReaderBuilderCustomizer readerBuilderCustomizer = (readerBuilder) -> readerBuilder - .topic("someTopic") - .startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5))); - Mono> message = this.pulsarReaderFactory - .createReader(Schema.STRING, List.of(readerBuilderCustomizer)) - .readOne(); - // ... - } - -} diff --git a/documentation/spring-boot-docs/src/main/java/org/springframework/boot/docs/messaging/pulsar/receivingreactive/MyBean.java b/documentation/spring-boot-docs/src/main/java/org/springframework/boot/docs/messaging/pulsar/receivingreactive/MyBean.java deleted file mode 100644 index cffd084f837f..000000000000 --- a/documentation/spring-boot-docs/src/main/java/org/springframework/boot/docs/messaging/pulsar/receivingreactive/MyBean.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.docs.messaging.pulsar.receivingreactive; - -import reactor.core.publisher.Mono; - -import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; -import org.springframework.stereotype.Component; - -@Component -public class MyBean { - - @ReactivePulsarListener(topics = "someTopic") - public Mono processMessage(String content) { - // ... - return Mono.empty(); - } - -} diff --git a/documentation/spring-boot-docs/src/main/java/org/springframework/boot/docs/messaging/pulsar/sendingreactive/MyBean.java b/documentation/spring-boot-docs/src/main/java/org/springframework/boot/docs/messaging/pulsar/sendingreactive/MyBean.java deleted file mode 100644 index 0bf90a6258c6..000000000000 --- a/documentation/spring-boot-docs/src/main/java/org/springframework/boot/docs/messaging/pulsar/sendingreactive/MyBean.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.docs.messaging.pulsar.sendingreactive; - -import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate; -import org.springframework.stereotype.Component; - -@Component -public class MyBean { - - private final ReactivePulsarTemplate pulsarTemplate; - - public MyBean(ReactivePulsarTemplate pulsarTemplate) { - this.pulsarTemplate = pulsarTemplate; - } - - public void someMethod() { - this.pulsarTemplate.send("someTopic", "Hello").subscribe(); - } - -} diff --git a/documentation/spring-boot-docs/src/main/kotlin/org/springframework/boot/docs/messaging/pulsar/readingreactive/MyBean.kt b/documentation/spring-boot-docs/src/main/kotlin/org/springframework/boot/docs/messaging/pulsar/readingreactive/MyBean.kt deleted file mode 100644 index c213deb8db8e..000000000000 --- a/documentation/spring-boot-docs/src/main/kotlin/org/springframework/boot/docs/messaging/pulsar/readingreactive/MyBean.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* -* Copyright 2012-present the original author or authors. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* https://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.springframework.boot.docs.messaging.pulsar.readingreactive - -import org.apache.pulsar.client.api.Schema -import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder -import org.apache.pulsar.reactive.client.api.StartAtSpec -import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer -import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory -import org.springframework.stereotype.Component -import java.time.Instant - -@Suppress("UNUSED_PARAMETER", "UNUSED_VARIABLE") -@Component -class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory) { - - fun someMethod() { - val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer { - readerBuilder: ReactiveMessageReaderBuilder -> - readerBuilder - .topic("someTopic") - .startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5))) - } - val message = pulsarReaderFactory - .createReader(Schema.STRING, listOf(readerBuilderCustomizer)) - .readOne() - // ... - } - -} - diff --git a/documentation/spring-boot-docs/src/main/kotlin/org/springframework/boot/docs/messaging/pulsar/receivingreactive/MyBean.kt b/documentation/spring-boot-docs/src/main/kotlin/org/springframework/boot/docs/messaging/pulsar/receivingreactive/MyBean.kt deleted file mode 100644 index ec8a120ca7ed..000000000000 --- a/documentation/spring-boot-docs/src/main/kotlin/org/springframework/boot/docs/messaging/pulsar/receivingreactive/MyBean.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.boot.docs.messaging.pulsar.receivingreactive - -import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener -import org.springframework.stereotype.Component -import reactor.core.publisher.Mono - -@Component -@Suppress("UNUSED_PARAMETER") -class MyBean { - - @ReactivePulsarListener(topics = ["someTopic"]) - fun processMessage(content: String?): Mono { - // ... - return Mono.empty() - } - -} - diff --git a/documentation/spring-boot-docs/src/main/kotlin/org/springframework/boot/docs/messaging/pulsar/sendingreactive/MyBean.kt b/documentation/spring-boot-docs/src/main/kotlin/org/springframework/boot/docs/messaging/pulsar/sendingreactive/MyBean.kt deleted file mode 100644 index a70024285ea0..000000000000 --- a/documentation/spring-boot-docs/src/main/kotlin/org/springframework/boot/docs/messaging/pulsar/sendingreactive/MyBean.kt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.boot.docs.messaging.pulsar.sendingreactive - -import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate -import org.springframework.stereotype.Component - -@Component -class MyBean(private val pulsarTemplate: ReactivePulsarTemplate) { - - fun someMethod() { - pulsarTemplate.send("someTopic", "Hello").subscribe() - } - -} - diff --git a/module/spring-boot-pulsar/build.gradle b/module/spring-boot-pulsar/build.gradle index b1f9a35b2504..2c1d2699064c 100644 --- a/module/spring-boot-pulsar/build.gradle +++ b/module/spring-boot-pulsar/build.gradle @@ -32,7 +32,6 @@ dependencies { optional(project(":core:spring-boot-autoconfigure")) optional(project(":core:spring-boot-docker-compose")) optional(project(":core:spring-boot-testcontainers")) - optional("org.springframework.pulsar:spring-pulsar-reactive") optional("org.testcontainers:testcontainers-pulsar") dockerTestImplementation(project(":core:spring-boot-test")) diff --git a/module/spring-boot-pulsar/src/dockerTest/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationIntegrationTests.java b/module/spring-boot-pulsar/src/dockerTest/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationIntegrationTests.java index 84e54c4bfe8e..04fddcb59e80 100644 --- a/module/spring-boot-pulsar/src/dockerTest/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationIntegrationTests.java +++ b/module/spring-boot-pulsar/src/dockerTest/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationIntegrationTests.java @@ -27,7 +27,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.pulsar.autoconfigure.PulsarAutoConfiguration; -import org.springframework.boot.pulsar.autoconfigure.PulsarReactiveAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.testsupport.container.TestImage; import org.springframework.context.annotation.Configuration; @@ -76,7 +75,7 @@ void sendAndReceive(@Autowired TestService testService) throws InterruptedExcept } @Configuration(proxyBeanMethods = false) - @ImportAutoConfiguration({ PulsarAutoConfiguration.class, PulsarReactiveAutoConfiguration.class }) + @ImportAutoConfiguration({ PulsarAutoConfiguration.class }) @Import(TestService.class) static class TestConfiguration { diff --git a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfiguration.java b/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfiguration.java index 5763b5d17cea..3e10968f9715 100644 --- a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfiguration.java +++ b/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfiguration.java @@ -56,9 +56,7 @@ import org.springframework.util.Assert; /** - * Common configuration used by both {@link PulsarAutoConfiguration} and - * {@link PulsarReactiveAutoConfiguration}. A separate configuration class is used so that - * {@link PulsarAutoConfiguration} can be excluded for reactive only application. + * Common configuration used by {@link PulsarAutoConfiguration}. * * @author Chris Bono * @author Phillip Webb diff --git a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactiveAutoConfiguration.java b/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactiveAutoConfiguration.java deleted file mode 100644 index e0e2086f36e0..000000000000 --- a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactiveAutoConfiguration.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.pulsar.autoconfigure; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; - -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory; -import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider; -import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder; -import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder; -import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder; -import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache; -import org.apache.pulsar.reactive.client.api.ReactivePulsarClient; -import org.apache.pulsar.reactive.client.producercache.CaffeineShadedProducerCacheProvider; -import org.jspecify.annotations.Nullable; - -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBooleanProperty; -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.springframework.boot.util.LambdaSafe; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; -import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames; -import org.springframework.pulsar.core.PulsarTopicBuilder; -import org.springframework.pulsar.core.SchemaResolver; -import org.springframework.pulsar.core.TopicResolver; -import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; -import org.springframework.pulsar.reactive.config.annotation.EnableReactivePulsar; -import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory; -import org.springframework.pulsar.reactive.core.DefaultReactivePulsarReaderFactory; -import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory; -import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory.Builder; -import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer; -import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer; -import org.springframework.pulsar.reactive.core.ReactiveMessageSenderBuilderCustomizer; -import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory; -import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory; -import org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory; -import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate; -import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties; - -/** - * {@link EnableAutoConfiguration Auto-configuration} for Spring for Apache Pulsar - * Reactive. - * - * @author Chris Bono - * @author Christophe Bornet - * @since 4.0.0 - */ -@AutoConfiguration(after = PulsarAutoConfiguration.class) -@ConditionalOnClass({ PulsarClient.class, ReactivePulsarClient.class, ReactivePulsarTemplate.class }) -@Import(PulsarConfiguration.class) -public final class PulsarReactiveAutoConfiguration { - - private final PulsarProperties properties; - - private final PulsarReactivePropertiesMapper propertiesMapper; - - PulsarReactiveAutoConfiguration(PulsarProperties properties) { - this.properties = properties; - this.propertiesMapper = new PulsarReactivePropertiesMapper(properties); - } - - @Bean - @ConditionalOnMissingBean - ReactivePulsarClient reactivePulsarClient(PulsarClient pulsarClient) { - return AdaptedReactivePulsarClientFactory.create(pulsarClient); - } - - @Bean - @ConditionalOnMissingBean(ProducerCacheProvider.class) - @ConditionalOnClass(CaffeineShadedProducerCacheProvider.class) - @ConditionalOnBooleanProperty(name = "spring.pulsar.producer.cache.enabled", matchIfMissing = true) - CaffeineShadedProducerCacheProvider reactivePulsarProducerCacheProvider() { - PulsarProperties.Producer.Cache properties = this.properties.getProducer().getCache(); - return new CaffeineShadedProducerCacheProvider(properties.getExpireAfterAccess(), Duration.ofMinutes(10), - properties.getMaximumSize(), properties.getInitialCapacity()); - } - - @Bean - @ConditionalOnMissingBean - @ConditionalOnBooleanProperty(name = "spring.pulsar.producer.cache.enabled", matchIfMissing = true) - ReactiveMessageSenderCache reactivePulsarMessageSenderCache( - ObjectProvider producerCacheProvider) { - return reactivePulsarMessageSenderCache(producerCacheProvider.getIfAvailable()); - } - - private ReactiveMessageSenderCache reactivePulsarMessageSenderCache( - @Nullable ProducerCacheProvider producerCacheProvider) { - return (producerCacheProvider != null) ? AdaptedReactivePulsarClientFactory.createCache(producerCacheProvider) - : AdaptedReactivePulsarClientFactory.createCache(); - } - - @Bean - @ConditionalOnMissingBean(ReactivePulsarSenderFactory.class) - DefaultReactivePulsarSenderFactory reactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient, - ObjectProvider reactiveMessageSenderCache, TopicResolver topicResolver, - ObjectProvider> customizersProvider, - ObjectProvider topicBuilderProvider) { - List> customizers = new ArrayList<>(); - customizers.add(this.propertiesMapper::customizeMessageSenderBuilder); - customizers.addAll(customizersProvider.orderedStream().toList()); - List> lambdaSafeCustomizers = List - .of((builder) -> applyMessageSenderBuilderCustomizers(customizers, builder)); - Builder senderFactoryBuilder = DefaultReactivePulsarSenderFactory.builderFor(reactivePulsarClient) - .withDefaultConfigCustomizers(lambdaSafeCustomizers) - .withTopicResolver(topicResolver); - reactiveMessageSenderCache.ifAvailable(senderFactoryBuilder::withMessageSenderCache); - topicBuilderProvider.ifAvailable(senderFactoryBuilder::withTopicBuilder); - return senderFactoryBuilder.build(); - } - - @SuppressWarnings("unchecked") - private void applyMessageSenderBuilderCustomizers(List> customizers, - ReactiveMessageSenderBuilder builder) { - LambdaSafe.callbacks(ReactiveMessageSenderBuilderCustomizer.class, customizers, builder) - .invoke((customizer) -> customizer.customize(builder)); - } - - @Bean - @ConditionalOnMissingBean(ReactivePulsarConsumerFactory.class) - DefaultReactivePulsarConsumerFactory reactivePulsarConsumerFactory( - ReactivePulsarClient pulsarReactivePulsarClient, - ObjectProvider> customizersProvider, - ObjectProvider topicBuilderProvider) { - List> customizers = new ArrayList<>(); - customizers.add(this.propertiesMapper::customizeMessageConsumerBuilder); - customizers.addAll(customizersProvider.orderedStream().toList()); - List> lambdaSafeCustomizers = List - .of((builder) -> applyMessageConsumerBuilderCustomizers(customizers, builder)); - DefaultReactivePulsarConsumerFactory consumerFactory = new DefaultReactivePulsarConsumerFactory<>( - pulsarReactivePulsarClient, lambdaSafeCustomizers); - topicBuilderProvider.ifAvailable(consumerFactory::setTopicBuilder); - return consumerFactory; - } - - @SuppressWarnings("unchecked") - private void applyMessageConsumerBuilderCustomizers(List> customizers, - ReactiveMessageConsumerBuilder builder) { - LambdaSafe.callbacks(ReactiveMessageConsumerBuilderCustomizer.class, customizers, builder) - .invoke((customizer) -> customizer.customize(builder)); - } - - @Bean - @ConditionalOnMissingBean(name = "reactivePulsarListenerContainerFactory") - DefaultReactivePulsarListenerContainerFactory reactivePulsarListenerContainerFactory( - ReactivePulsarConsumerFactory reactivePulsarConsumerFactory, SchemaResolver schemaResolver, - TopicResolver topicResolver, PulsarContainerFactoryCustomizers containerFactoryCustomizers) { - ReactivePulsarContainerProperties containerProperties = new ReactivePulsarContainerProperties<>(); - containerProperties.setSchemaResolver(schemaResolver); - containerProperties.setTopicResolver(topicResolver); - this.propertiesMapper.customizeContainerProperties(containerProperties); - DefaultReactivePulsarListenerContainerFactory containerFactory = new DefaultReactivePulsarListenerContainerFactory<>( - reactivePulsarConsumerFactory, containerProperties); - containerFactoryCustomizers.customize(containerFactory); - return containerFactory; - } - - @Bean - @ConditionalOnMissingBean(ReactivePulsarReaderFactory.class) - DefaultReactivePulsarReaderFactory reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient, - ObjectProvider> customizersProvider, - ObjectProvider topicBuilderProvider) { - List> customizers = new ArrayList<>(); - customizers.add(this.propertiesMapper::customizeMessageReaderBuilder); - customizers.addAll(customizersProvider.orderedStream().toList()); - List> lambdaSafeCustomizers = List - .of((builder) -> applyMessageReaderBuilderCustomizers(customizers, builder)); - DefaultReactivePulsarReaderFactory readerFactory = new DefaultReactivePulsarReaderFactory<>( - reactivePulsarClient, lambdaSafeCustomizers); - topicBuilderProvider.ifAvailable(readerFactory::setTopicBuilder); - return readerFactory; - } - - @SuppressWarnings("unchecked") - private void applyMessageReaderBuilderCustomizers(List> customizers, - ReactiveMessageReaderBuilder builder) { - LambdaSafe.callbacks(ReactiveMessageReaderBuilderCustomizer.class, customizers, builder) - .invoke((customizer) -> customizer.customize(builder)); - } - - @Bean - @ConditionalOnMissingBean - ReactivePulsarTemplate pulsarReactiveTemplate(ReactivePulsarSenderFactory reactivePulsarSenderFactory, - SchemaResolver schemaResolver, TopicResolver topicResolver) { - return new ReactivePulsarTemplate<>(reactivePulsarSenderFactory, schemaResolver, topicResolver); - } - - @Configuration(proxyBeanMethods = false) - @EnableReactivePulsar - @ConditionalOnMissingBean( - name = PulsarAnnotationSupportBeanNames.REACTIVE_PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) - static class EnableReactivePulsarConfiguration { - - } - -} diff --git a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactivePropertiesMapper.java b/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactivePropertiesMapper.java deleted file mode 100644 index fedd97e2a69e..000000000000 --- a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactivePropertiesMapper.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.pulsar.autoconfigure; - -import java.util.ArrayList; - -import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder; -import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder; -import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder; - -import org.springframework.boot.context.properties.PropertyMapper; -import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties; - -/** - * Helper class used to map reactive {@link PulsarProperties} to various builder - * customizers. - * - * @author Chris Bono - * @author Phillip Webb - * @author Vedran Pavic - */ -final class PulsarReactivePropertiesMapper { - - private final PulsarProperties properties; - - PulsarReactivePropertiesMapper(PulsarProperties properties) { - this.properties = properties; - } - - void customizeMessageSenderBuilder(ReactiveMessageSenderBuilder builder) { - PulsarProperties.Producer properties = this.properties.getProducer(); - PropertyMapper map = PropertyMapper.get(); - map.from(properties::getName).to(builder::producerName); - map.from(properties::getTopicName).to(builder::topic); - map.from(properties::getSendTimeout).to(builder::sendTimeout); - map.from(properties::getMessageRoutingMode).to(builder::messageRoutingMode); - map.from(properties::getHashingScheme).to(builder::hashingScheme); - map.from(properties::isBatchingEnabled).to(builder::batchingEnabled); - map.from(properties::isChunkingEnabled).to(builder::chunkingEnabled); - map.from(properties::getCompressionType).to(builder::compressionType); - map.from(properties::getAccessMode).to(builder::accessMode); - } - - void customizeMessageConsumerBuilder(ReactiveMessageConsumerBuilder builder) { - PulsarProperties.Consumer properties = this.properties.getConsumer(); - PropertyMapper map = PropertyMapper.get(); - map.from(properties::getName).to(builder::consumerName); - map.from(properties::getTopics).as(ArrayList::new).to(builder::topics); - map.from(properties::getTopicsPattern).to(builder::topicsPattern); - map.from(properties::getPriorityLevel).to(builder::priorityLevel); - map.from(properties::isReadCompacted).to(builder::readCompacted); - map.from(properties::getDeadLetterPolicy).as(DeadLetterPolicyMapper::map).to(builder::deadLetterPolicy); - map.from(properties::isRetryEnable).to(builder::retryLetterTopicEnable); - customizerMessageConsumerBuilderSubscription(builder); - } - - private void customizerMessageConsumerBuilderSubscription(ReactiveMessageConsumerBuilder builder) { - PulsarProperties.Consumer.Subscription properties = this.properties.getConsumer().getSubscription(); - PropertyMapper map = PropertyMapper.get(); - map.from(properties::getName).to(builder::subscriptionName); - map.from(properties::getInitialPosition).to(builder::subscriptionInitialPosition); - map.from(properties::getMode).to(builder::subscriptionMode); - map.from(properties::getTopicsMode).to(builder::topicsPatternSubscriptionMode); - map.from(properties::getType).to(builder::subscriptionType); - } - - void customizeContainerProperties(ReactivePulsarContainerProperties containerProperties) { - customizePulsarContainerConsumerSubscriptionProperties(containerProperties); - customizePulsarContainerListenerProperties(containerProperties); - } - - private void customizePulsarContainerConsumerSubscriptionProperties( - ReactivePulsarContainerProperties containerProperties) { - PulsarProperties.Consumer.Subscription properties = this.properties.getConsumer().getSubscription(); - PropertyMapper map = PropertyMapper.get(); - map.from(properties::getType).to(containerProperties::setSubscriptionType); - map.from(properties::getName).to(containerProperties::setSubscriptionName); - } - - private void customizePulsarContainerListenerProperties(ReactivePulsarContainerProperties containerProperties) { - PulsarProperties.Listener properties = this.properties.getListener(); - PropertyMapper map = PropertyMapper.get(); - map.from(properties::getSchemaType).to(containerProperties::setSchemaType); - map.from(properties::getConcurrency).to(containerProperties::setConcurrency); - } - - void customizeMessageReaderBuilder(ReactiveMessageReaderBuilder builder) { - PulsarProperties.Reader properties = this.properties.getReader(); - PropertyMapper map = PropertyMapper.get(); - map.from(properties::getName).to(builder::readerName); - map.from(properties::getTopics).to(builder::topics); - map.from(properties::getSubscriptionName).to(builder::subscriptionName); - map.from(properties::getSubscriptionRolePrefix).to(builder::generatedSubscriptionNamePrefix); - map.from(properties::isReadCompacted).to(builder::readCompacted); - } - -} diff --git a/module/spring-boot-pulsar/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/module/spring-boot-pulsar/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 97d91ee0c43a..acf667fa1d2b 100644 --- a/module/spring-boot-pulsar/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/module/spring-boot-pulsar/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,2 +1 @@ org.springframework.boot.pulsar.autoconfigure.PulsarAutoConfiguration -org.springframework.boot.pulsar.autoconfigure.PulsarReactiveAutoConfiguration diff --git a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfigurationTests.java b/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfigurationTests.java index ea9919a97b12..50a5b8d1e17b 100644 --- a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfigurationTests.java +++ b/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfigurationTests.java @@ -72,7 +72,6 @@ import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings; -import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; import org.springframework.pulsar.transaction.PulsarAwareTransactionManager; import org.springframework.test.util.ReflectionTestUtils; @@ -597,14 +596,6 @@ void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { @TestConfiguration(proxyBeanMethods = false) static class ListenerContainerFactoryCustomizersConfig { - @Bean - @Order(50) - PulsarContainerFactoryCustomizer> customizerIgnored() { - return (containerFactory) -> { - throw new IllegalStateException("should-not-have-matched"); - }; - } - @Bean @Order(200) PulsarContainerFactoryCustomizer> customizerFoo() { @@ -723,14 +714,6 @@ ReaderBuilderCustomizer customizerBar() { @TestConfiguration(proxyBeanMethods = false) static class ReaderContainerFactoryCustomizersConfig { - @Bean - @Order(50) - PulsarContainerFactoryCustomizer> customizerIgnored() { - return (containerFactory) -> { - throw new IllegalStateException("should-not-have-matched"); - }; - } - @Bean @Order(200) PulsarContainerFactoryCustomizer> customizerFoo() { diff --git a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarContainerFactoryCustomizersTests.java b/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarContainerFactoryCustomizersTests.java index b6893ba84f4c..d2cb181c1eff 100644 --- a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarContainerFactoryCustomizersTests.java +++ b/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarContainerFactoryCustomizersTests.java @@ -26,10 +26,8 @@ import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory; import org.springframework.pulsar.config.ListenerContainerFactory; import org.springframework.pulsar.config.PulsarContainerFactory; -import org.springframework.pulsar.config.PulsarListenerContainerFactory; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.listener.PulsarContainerProperties; -import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.BDDMockito.then; @@ -74,12 +72,12 @@ void customizeShouldCheckGeneric() { assertThat(list.get(1).getCount()).isZero(); assertThat(list.get(2).getCount()).isZero(); - customizers.customize(mock(ConcurrentPulsarListenerContainerFactory.class)); + customizers.customize(mock(ListenerContainerFactory.class)); assertThat(list.get(0).getCount()).isEqualTo(2); assertThat(list.get(1).getCount()).isOne(); - assertThat(list.get(2).getCount()).isOne(); + assertThat(list.get(2).getCount()).isZero(); - customizers.customize(mock(DefaultReactivePulsarListenerContainerFactory.class)); + customizers.customize(mock(ConcurrentPulsarListenerContainerFactory.class)); assertThat(list.get(0).getCount()).isEqualTo(3); assertThat(list.get(1).getCount()).isEqualTo(2); assertThat(list.get(2).getCount()).isOne(); @@ -101,7 +99,7 @@ public void customize(ConcurrentPulsarListenerContainerFactory containerFacto } /** - * Test customizer that will match all {@link PulsarListenerContainerFactory}. + * Test customizer that will match all {@link PulsarContainerFactory}. * * @param the container factory type */ @@ -121,10 +119,7 @@ int getCount() { } /** - * Test customizer that will match both - * {@link ConcurrentPulsarListenerContainerFactory} and - * {@link DefaultReactivePulsarListenerContainerFactory} as they both extend - * {@link ListenerContainerFactory}. + * Test customizer that will match all {@link ListenerContainerFactory}. */ static class TestPulsarListenersContainerFactoryCustomizer extends TestCustomizer> { diff --git a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactiveAutoConfigurationTests.java b/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactiveAutoConfigurationTests.java deleted file mode 100644 index 64174ba01426..000000000000 --- a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactiveAutoConfigurationTests.java +++ /dev/null @@ -1,555 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.pulsar.autoconfigure; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Supplier; - -import com.github.benmanes.caffeine.cache.Caffeine; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.common.schema.SchemaType; -import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider; -import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder; -import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder; -import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder; -import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache; -import org.apache.pulsar.reactive.client.api.ReactivePulsarClient; -import org.apache.pulsar.reactive.client.producercache.CaffeineShadedProducerCacheProvider; -import org.assertj.core.api.AbstractObjectAssert; -import org.assertj.core.api.InstanceOfAssertFactories; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -import org.springframework.boot.autoconfigure.AutoConfigurations; -import org.springframework.boot.test.context.FilteredClassLoader; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.boot.test.context.assertj.AssertableApplicationContext; -import org.springframework.boot.test.context.runner.ApplicationContextRunner; -import org.springframework.context.annotation.Bean; -import org.springframework.core.annotation.Order; -import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; -import org.springframework.pulsar.core.DefaultSchemaResolver; -import org.springframework.pulsar.core.DefaultTopicResolver; -import org.springframework.pulsar.core.PulsarAdministration; -import org.springframework.pulsar.core.PulsarTopicBuilder; -import org.springframework.pulsar.core.SchemaResolver; -import org.springframework.pulsar.core.TopicResolver; -import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; -import org.springframework.pulsar.reactive.config.ReactivePulsarListenerContainerFactory; -import org.springframework.pulsar.reactive.config.ReactivePulsarListenerEndpointRegistry; -import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarBootstrapConfiguration; -import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerAnnotationBeanPostProcessor; -import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory; -import org.springframework.pulsar.reactive.core.DefaultReactivePulsarReaderFactory; -import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory; -import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer; -import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer; -import org.springframework.pulsar.reactive.core.ReactiveMessageSenderBuilderCustomizer; -import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory; -import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory; -import org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory; -import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate; -import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; - -/** - * Tests for {@link PulsarReactiveAutoConfiguration}. - * - * @author Chris Bono - * @author Christophe Bornet - * @author Phillip Webb - */ -class PulsarReactiveAutoConfigurationTests { - - private static final String INTERNAL_PULSAR_LISTENER_ANNOTATION_PROCESSOR = "org.springframework.pulsar.config.internalReactivePulsarListenerAnnotationProcessor"; - - private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() - .withConfiguration(AutoConfigurations.of(PulsarReactiveAutoConfiguration.class)) - .withBean(PulsarClient.class, () -> mock(PulsarClient.class)); - - @Test - void whenPulsarNotOnClasspathAutoConfigurationIsSkipped() { - new ApplicationContextRunner().withConfiguration(AutoConfigurations.of(PulsarReactiveAutoConfiguration.class)) - .withClassLoader(new FilteredClassLoader(PulsarClient.class)) - .run((context) -> assertThat(context).doesNotHaveBean(PulsarReactiveAutoConfiguration.class)); - } - - @Test - void whenReactivePulsarNotOnClasspathAutoConfigurationIsSkipped() { - this.contextRunner.withClassLoader(new FilteredClassLoader(ReactivePulsarClient.class)) - .run((context) -> assertThat(context).doesNotHaveBean(PulsarReactiveAutoConfiguration.class)); - } - - @Test - void whenReactiveSpringPulsarNotOnClasspathAutoConfigurationIsSkipped() { - this.contextRunner.withClassLoader(new FilteredClassLoader(ReactivePulsarTemplate.class)) - .run((context) -> assertThat(context).doesNotHaveBean(PulsarReactiveAutoConfiguration.class)); - } - - @Test - void whenCustomPulsarListenerAnnotationProcessorDefinedAutoConfigurationIsSkipped() { - this.contextRunner.withBean(INTERNAL_PULSAR_LISTENER_ANNOTATION_PROCESSOR, String.class, () -> "bean") - .run((context) -> assertThat(context).doesNotHaveBean(ReactivePulsarBootstrapConfiguration.class)); - } - - @Test - void autoConfiguresBeans() { - this.contextRunner.run((context) -> assertThat(context).hasSingleBean(PulsarConfiguration.class) - .hasSingleBean(PulsarClient.class) - .hasSingleBean(PulsarTopicBuilder.class) - .hasSingleBean(PulsarAdministration.class) - .hasSingleBean(DefaultSchemaResolver.class) - .hasSingleBean(DefaultTopicResolver.class) - .hasSingleBean(ReactivePulsarClient.class) - .hasSingleBean(CaffeineShadedProducerCacheProvider.class) - .hasSingleBean(ReactiveMessageSenderCache.class) - .hasSingleBean(DefaultReactivePulsarSenderFactory.class) - .hasSingleBean(ReactivePulsarTemplate.class) - .hasSingleBean(DefaultReactivePulsarConsumerFactory.class) - .hasSingleBean(DefaultReactivePulsarListenerContainerFactory.class) - .hasSingleBean(ReactivePulsarListenerAnnotationBeanPostProcessor.class) - .hasSingleBean(ReactivePulsarListenerEndpointRegistry.class)); - } - - @Test - void topicDefaultsCanBeDisabled() { - this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") - .run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class)); - } - - @Test - @SuppressWarnings("rawtypes") - void injectsExpectedBeansIntoReactivePulsarClient() { - this.contextRunner.run((context) -> { - PulsarClient pulsarClient = context.getBean(PulsarClient.class); - assertThat(context).hasNotFailed() - .getBean(ReactivePulsarClient.class) - .extracting("reactivePulsarResourceAdapter") - .extracting("pulsarClientSupplier", InstanceOfAssertFactories.type(Supplier.class)) - .extracting(Supplier::get) - .isSameAs(pulsarClient); - }); - } - - @ParameterizedTest - @ValueSource(classes = { ReactivePulsarClient.class, ProducerCacheProvider.class, ReactiveMessageSenderCache.class, - ReactivePulsarSenderFactory.class, ReactivePulsarConsumerFactory.class, ReactivePulsarReaderFactory.class, - ReactivePulsarTemplate.class }) - void whenHasUserDefinedBeanDoesNotAutoConfigureBean(Class beanClass) { - T bean = mock(beanClass); - this.contextRunner.withBean(beanClass.getName(), beanClass, () -> bean) - .run((context) -> assertThat(context).getBean(beanClass).isSameAs(bean)); - } - - @Nested - class SenderFactoryTests { - - private final ApplicationContextRunner contextRunner = PulsarReactiveAutoConfigurationTests.this.contextRunner; - - @Test - void injectsExpectedBeans() { - ReactivePulsarClient client = mock(ReactivePulsarClient.class); - ReactiveMessageSenderCache cache = mock(ReactiveMessageSenderCache.class); - this.contextRunner.withPropertyValues("spring.pulsar.producer.topic-name=test-topic") - .withBean("customReactivePulsarClient", ReactivePulsarClient.class, () -> client) - .withBean("customReactiveMessageSenderCache", ReactiveMessageSenderCache.class, () -> cache) - .run((context) -> { - DefaultReactivePulsarSenderFactory senderFactory = context - .getBean(DefaultReactivePulsarSenderFactory.class); - assertThat(senderFactory) - .extracting("reactivePulsarClient", InstanceOfAssertFactories.type(ReactivePulsarClient.class)) - .isSameAs(client); - assertThat(senderFactory) - .extracting("reactiveMessageSenderCache", - InstanceOfAssertFactories.type(ReactiveMessageSenderCache.class)) - .isSameAs(cache); - assertThat(senderFactory) - .extracting("topicResolver", InstanceOfAssertFactories.type(TopicResolver.class)) - .isSameAs(context.getBean(TopicResolver.class)); - assertThat(senderFactory).extracting("topicBuilder").isNotNull(); - }); - } - - @Test - void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() { - this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") - .run((context) -> assertThat((DefaultReactivePulsarSenderFactory) context - .getBean(DefaultReactivePulsarSenderFactory.class)).extracting("topicBuilder").isNull()); - } - - @Test - void injectsExpectedBeansIntoReactiveMessageSenderCache() { - ProducerCacheProvider provider = mock(ProducerCacheProvider.class); - this.contextRunner.withBean("customProducerCacheProvider", ProducerCacheProvider.class, () -> provider) - .run((context) -> assertThat(context).getBean(ReactiveMessageSenderCache.class) - .extracting("cacheProvider", InstanceOfAssertFactories.type(ProducerCacheProvider.class)) - .isSameAs(provider)); - } - - @Test - void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { - this.contextRunner.withPropertyValues("spring.pulsar.producer.name=fromPropsCustomizer") - .withUserConfiguration(ReactiveMessageSenderBuilderCustomizerConfig.class) - .run((context) -> { - DefaultReactivePulsarSenderFactory producerFactory = context - .getBean(DefaultReactivePulsarSenderFactory.class); - Customizers, ReactiveMessageSenderBuilder> customizers = Customizers - .of(ReactiveMessageSenderBuilder.class, ReactiveMessageSenderBuilderCustomizer::customize); - assertThat(customizers.fromField(producerFactory, "defaultConfigCustomizers")).callsInOrder( - ReactiveMessageSenderBuilder::producerName, "fromPropsCustomizer", "fromCustomizer1", - "fromCustomizer2"); - }); - } - - @TestConfiguration(proxyBeanMethods = false) - static class ReactiveMessageSenderBuilderCustomizerConfig { - - @Bean - @Order(200) - ReactiveMessageSenderBuilderCustomizer customizerFoo() { - return (builder) -> builder.producerName("fromCustomizer2"); - } - - @Bean - @Order(100) - ReactiveMessageSenderBuilderCustomizer customizerBar() { - return (builder) -> builder.producerName("fromCustomizer1"); - } - - } - - } - - @Nested - class TemplateTests { - - private final ApplicationContextRunner contextRunner = PulsarReactiveAutoConfigurationTests.this.contextRunner; - - @Test - @SuppressWarnings("rawtypes") - void injectsExpectedBeans() { - ReactivePulsarSenderFactory senderFactory = mock(ReactivePulsarSenderFactory.class); - SchemaResolver schemaResolver = mock(SchemaResolver.class); - this.contextRunner - .withBean("customReactivePulsarSenderFactory", ReactivePulsarSenderFactory.class, () -> senderFactory) - .withBean("schemaResolver", SchemaResolver.class, () -> schemaResolver) - .run((context) -> assertThat(context).getBean(ReactivePulsarTemplate.class).satisfies((template) -> { - assertThat(template).extracting("reactiveMessageSenderFactory").isSameAs(senderFactory); - assertThat(template).extracting("schemaResolver").isSameAs(schemaResolver); - })); - } - - } - - @Nested - class ConsumerFactoryTests { - - private final ApplicationContextRunner contextRunner = PulsarReactiveAutoConfigurationTests.this.contextRunner; - - @Test - void injectsExpectedBeans() { - ReactivePulsarClient client = mock(ReactivePulsarClient.class); - PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); - this.contextRunner.withBean("customReactivePulsarClient", ReactivePulsarClient.class, () -> client) - .withBean("customTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) - .run((context) -> { - ReactivePulsarConsumerFactory consumerFactory = context - .getBean(DefaultReactivePulsarConsumerFactory.class); - assertThat(consumerFactory) - .extracting("reactivePulsarClient", InstanceOfAssertFactories.type(ReactivePulsarClient.class)) - .isSameAs(client); - assertThat(consumerFactory) - .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) - .isSameAs(topicBuilder); - }); - } - - @Test - void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() { - this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") - .run((context) -> assertThat( - (ReactivePulsarConsumerFactory) context.getBean(DefaultReactivePulsarConsumerFactory.class)) - .extracting("topicBuilder") - .isNull()); - } - - @Test - void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { - this.contextRunner.withPropertyValues("spring.pulsar.consumer.name=fromPropsCustomizer") - .withUserConfiguration(ReactiveMessageConsumerBuilderCustomizerConfig.class) - .run((context) -> { - DefaultReactivePulsarConsumerFactory consumerFactory = context - .getBean(DefaultReactivePulsarConsumerFactory.class); - Customizers, ReactiveMessageConsumerBuilder> customizers = Customizers - .of(ReactiveMessageConsumerBuilder.class, ReactiveMessageConsumerBuilderCustomizer::customize); - assertThat(customizers.fromField(consumerFactory, "defaultConfigCustomizers")).callsInOrder( - ReactiveMessageConsumerBuilder::consumerName, "fromPropsCustomizer", "fromCustomizer1", - "fromCustomizer2"); - }); - } - - @TestConfiguration(proxyBeanMethods = false) - static class ReactiveMessageConsumerBuilderCustomizerConfig { - - @Bean - @Order(200) - ReactiveMessageConsumerBuilderCustomizer customizerFoo() { - return (builder) -> builder.consumerName("fromCustomizer2"); - } - - @Bean - @Order(100) - ReactiveMessageConsumerBuilderCustomizer customizerBar() { - return (builder) -> builder.consumerName("fromCustomizer1"); - } - - } - - } - - @Nested - class ListenerTests { - - private final ApplicationContextRunner contextRunner = PulsarReactiveAutoConfigurationTests.this.contextRunner; - - @Test - void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { - ReactivePulsarListenerContainerFactory listenerContainerFactory = mock( - ReactivePulsarListenerContainerFactory.class); - this.contextRunner - .withBean("reactivePulsarListenerContainerFactory", ReactivePulsarListenerContainerFactory.class, - () -> listenerContainerFactory) - .run((context) -> assertThat(context).getBean(ReactivePulsarListenerContainerFactory.class) - .isSameAs(listenerContainerFactory)); - } - - @Test - void whenHasUserDefinedReactivePulsarListenerAnnotationBeanPostProcessorDoesNotAutoConfigureBean() { - ReactivePulsarListenerAnnotationBeanPostProcessor listenerAnnotationBeanPostProcessor = mock( - ReactivePulsarListenerAnnotationBeanPostProcessor.class); - this.contextRunner.withBean(INTERNAL_PULSAR_LISTENER_ANNOTATION_PROCESSOR, - ReactivePulsarListenerAnnotationBeanPostProcessor.class, () -> listenerAnnotationBeanPostProcessor) - .run((context) -> assertThat(context).getBean(ReactivePulsarListenerAnnotationBeanPostProcessor.class) - .isSameAs(listenerAnnotationBeanPostProcessor)); - } - - @Test - void whenHasCustomProperties() { - List properties = new ArrayList<>(); - properties.add("spring.pulsar.listener.schema-type=avro"); - this.contextRunner.withPropertyValues(properties.toArray(String[]::new)).run((context) -> { - DefaultReactivePulsarListenerContainerFactory factory = context - .getBean(DefaultReactivePulsarListenerContainerFactory.class); - assertThat(factory.getContainerProperties().getSchemaType()).isEqualTo(SchemaType.AVRO); - }); - } - - @Test - void injectsExpectedBeans() { - ReactivePulsarConsumerFactory consumerFactory = mock(ReactivePulsarConsumerFactory.class); - SchemaResolver schemaResolver = mock(SchemaResolver.class); - this.contextRunner - .withBean("customReactivePulsarConsumerFactory", ReactivePulsarConsumerFactory.class, - () -> consumerFactory) - .withBean("schemaResolver", SchemaResolver.class, () -> schemaResolver) - .run((context) -> { - DefaultReactivePulsarListenerContainerFactory containerFactory = context - .getBean(DefaultReactivePulsarListenerContainerFactory.class); - assertThat(containerFactory).extracting("consumerFactory").isSameAs(consumerFactory); - assertThat(containerFactory) - .extracting(DefaultReactivePulsarListenerContainerFactory::getContainerProperties) - .extracting(ReactivePulsarContainerProperties::getSchemaResolver) - .isSameAs(schemaResolver); - }); - } - - @Test - void whenHasUserDefinedFactoryCustomizersAppliesInCorrectOrder() { - this.contextRunner.withUserConfiguration(ListenerContainerFactoryCustomizersConfig.class) - .run((context) -> assertThat(context).getBean(DefaultReactivePulsarListenerContainerFactory.class) - .hasFieldOrPropertyWithValue("containerProperties.subscriptionName", ":bar:foo")); - } - - @TestConfiguration(proxyBeanMethods = false) - static class ListenerContainerFactoryCustomizersConfig { - - @Bean - @Order(50) - PulsarContainerFactoryCustomizer> customizerIgnored() { - return (containerFactory) -> { - throw new IllegalStateException("should-not-have-matched"); - }; - } - - @Bean - @Order(200) - PulsarContainerFactoryCustomizer> customizerFoo() { - return (containerFactory) -> appendToSubscriptionName(containerFactory, ":foo"); - } - - @Bean - @Order(100) - PulsarContainerFactoryCustomizer> customizerBar() { - return (containerFactory) -> appendToSubscriptionName(containerFactory, ":bar"); - } - - private void appendToSubscriptionName(DefaultReactivePulsarListenerContainerFactory containerFactory, - String valueToAppend) { - String subscriptionName = containerFactory.getContainerProperties().getSubscriptionName(); - String updatedValue = (subscriptionName != null) ? subscriptionName + valueToAppend : valueToAppend; - containerFactory.getContainerProperties().setSubscriptionName(updatedValue); - } - - } - - } - - @Nested - class ReaderFactoryTests { - - private final ApplicationContextRunner contextRunner = PulsarReactiveAutoConfigurationTests.this.contextRunner; - - @Test - void injectsExpectedBeans() { - ReactivePulsarClient client = mock(ReactivePulsarClient.class); - PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); - this.contextRunner.withPropertyValues("spring.pulsar.reader.name=test-reader") - .withBean("customReactivePulsarClient", ReactivePulsarClient.class, () -> client) - .withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) - .run((context) -> { - DefaultReactivePulsarReaderFactory readerFactory = context - .getBean(DefaultReactivePulsarReaderFactory.class); - assertThat(readerFactory) - .extracting("reactivePulsarClient", InstanceOfAssertFactories.type(ReactivePulsarClient.class)) - .isSameAs(client); - assertThat(readerFactory) - .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) - .isSameAs(topicBuilder); - }); - } - - @Test - void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() { - this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") - .run((context) -> assertThat((DefaultReactivePulsarReaderFactory) context - .getBean(DefaultReactivePulsarReaderFactory.class)).extracting("topicBuilder").isNull()); - } - - @Test - void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { - this.contextRunner.withPropertyValues("spring.pulsar.reader.name=fromPropsCustomizer") - .withUserConfiguration(ReactiveMessageReaderBuilderCustomizerConfig.class) - .run((context) -> { - DefaultReactivePulsarReaderFactory readerFactory = context - .getBean(DefaultReactivePulsarReaderFactory.class); - Customizers, ReactiveMessageReaderBuilder> customizers = Customizers - .of(ReactiveMessageReaderBuilder.class, ReactiveMessageReaderBuilderCustomizer::customize); - assertThat(customizers.fromField(readerFactory, "defaultConfigCustomizers")).callsInOrder( - ReactiveMessageReaderBuilder::readerName, "fromPropsCustomizer", "fromCustomizer1", - "fromCustomizer2"); - }); - } - - @TestConfiguration(proxyBeanMethods = false) - static class ReactiveMessageReaderBuilderCustomizerConfig { - - @Bean - @Order(200) - ReactiveMessageReaderBuilderCustomizer customizerFoo() { - return (builder) -> builder.readerName("fromCustomizer2"); - } - - @Bean - @Order(100) - ReactiveMessageReaderBuilderCustomizer customizerBar() { - return (builder) -> builder.readerName("fromCustomizer1"); - } - - } - - } - - @Nested - class SenderCacheAutoConfigurationTests { - - private final ApplicationContextRunner contextRunner = PulsarReactiveAutoConfigurationTests.this.contextRunner; - - @Test - void whenNoPropertiesEnablesCaching() { - this.contextRunner.run(this::assertCaffeineProducerCacheProvider); - } - - @Test - void whenCachingEnabledEnablesCaching() { - this.contextRunner.withPropertyValues("spring.pulsar.producer.cache.enabled=true") - .run(this::assertCaffeineProducerCacheProvider); - } - - @Test - void whenCachingDisabledDoesNotEnableCaching() { - this.contextRunner.withPropertyValues("spring.pulsar.producer.cache.enabled=false") - .run((context) -> assertThat(context).doesNotHaveBean(ProducerCacheProvider.class) - .doesNotHaveBean(ReactiveMessageSenderCache.class)); - } - - @Test - void whenCachingEnabledAndCaffeineNotOnClasspathStillUsesCaffeine() { - // The reactive client shades Caffeine - it should still be used - this.contextRunner.withClassLoader(new FilteredClassLoader(Caffeine.class)) - .withPropertyValues("spring.pulsar.producer.cache.enabled=true") - .run(this::assertCaffeineProducerCacheProvider); - } - - @Test - void whenCachingEnabledAndNoCacheProviderAvailable() { - // The reactive client uses a shaded caffeine cache provider as its internal - // cache - this.contextRunner.withClassLoader(new FilteredClassLoader(CaffeineShadedProducerCacheProvider.class)) - .withPropertyValues("spring.pulsar.producer.cache.enabled=true") - .run((context) -> assertThat(context).doesNotHaveBean(ProducerCacheProvider.class) - .getBean(ReactiveMessageSenderCache.class) - .extracting("cacheProvider") - .isExactlyInstanceOf(CaffeineShadedProducerCacheProvider.class)); - } - - @Test - void whenCustomCachingPropertiesCreatesConfiguredBean() { - this.contextRunner - .withPropertyValues("spring.pulsar.producer.cache.expire-after-access=100s", - "spring.pulsar.producer.cache.maximum-size=5150", - "spring.pulsar.producer.cache.initial-capacity=200") - .run((context) -> assertCaffeineProducerCacheProvider(context).extracting("cache.cache") - .hasFieldOrPropertyWithValue("expiresAfterAccessNanos", Duration.ofSeconds(100).toNanos()) - .hasFieldOrPropertyWithValue("maximum", 5150L)); - } - - private AbstractObjectAssert assertCaffeineProducerCacheProvider( - AssertableApplicationContext context) { - return assertThat(context).hasSingleBean(ReactiveMessageSenderCache.class) - .getBean(ProducerCacheProvider.class) - .isExactlyInstanceOf(CaffeineShadedProducerCacheProvider.class); - } - - } - -} diff --git a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactivePropertiesMapperTests.java b/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactivePropertiesMapperTests.java deleted file mode 100644 index 9c8a6bf74c27..000000000000 --- a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarReactivePropertiesMapperTests.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.pulsar.autoconfigure; - -import java.time.Duration; -import java.util.List; -import java.util.regex.Pattern; - -import org.apache.pulsar.client.api.CompressionType; -import org.apache.pulsar.client.api.DeadLetterPolicy; -import org.apache.pulsar.client.api.HashingScheme; -import org.apache.pulsar.client.api.MessageRoutingMode; -import org.apache.pulsar.client.api.ProducerAccessMode; -import org.apache.pulsar.client.api.RegexSubscriptionMode; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.client.api.SubscriptionMode; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.schema.SchemaType; -import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder; -import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder; -import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder; -import org.junit.jupiter.api.Test; - -import org.springframework.boot.pulsar.autoconfigure.PulsarProperties.Consumer; -import org.springframework.boot.pulsar.autoconfigure.PulsarProperties.Consumer.Subscription; -import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.BDDMockito.then; -import static org.mockito.Mockito.mock; - -/** - * Tests for {@link PulsarReactivePropertiesMapper}. - * - * @author Chris Bono - * @author Phillip Webb - * @author Vedran Pavic - */ -class PulsarReactivePropertiesMapperTests { - - @Test - @SuppressWarnings("unchecked") - void customizeMessageSenderBuilder() { - PulsarProperties properties = new PulsarProperties(); - properties.getProducer().setName("name"); - properties.getProducer().setTopicName("topicname"); - properties.getProducer().setSendTimeout(Duration.ofSeconds(1)); - properties.getProducer().setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - properties.getProducer().setHashingScheme(HashingScheme.JavaStringHash); - properties.getProducer().setBatchingEnabled(false); - properties.getProducer().setChunkingEnabled(true); - properties.getProducer().setCompressionType(CompressionType.SNAPPY); - properties.getProducer().setAccessMode(ProducerAccessMode.Exclusive); - ReactiveMessageSenderBuilder builder = mock(ReactiveMessageSenderBuilder.class); - new PulsarReactivePropertiesMapper(properties).customizeMessageSenderBuilder(builder); - then(builder).should().producerName("name"); - then(builder).should().topic("topicname"); - then(builder).should().sendTimeout(Duration.ofSeconds(1)); - then(builder).should().messageRoutingMode(MessageRoutingMode.RoundRobinPartition); - then(builder).should().hashingScheme(HashingScheme.JavaStringHash); - then(builder).should().batchingEnabled(false); - then(builder).should().chunkingEnabled(true); - then(builder).should().compressionType(CompressionType.SNAPPY); - then(builder).should().accessMode(ProducerAccessMode.Exclusive); - } - - @Test - @SuppressWarnings("unchecked") - void customizeMessageConsumerBuilder() { - PulsarProperties properties = new PulsarProperties(); - List topics = List.of("mytopic"); - Pattern topisPattern = Pattern.compile("my-pattern"); - properties.getConsumer().setName("name"); - properties.getConsumer().setTopics(topics); - properties.getConsumer().setTopicsPattern(topisPattern); - properties.getConsumer().setPriorityLevel(123); - properties.getConsumer().setReadCompacted(true); - Consumer.DeadLetterPolicy deadLetterPolicy = new Consumer.DeadLetterPolicy(); - deadLetterPolicy.setDeadLetterTopic("my-dlt"); - deadLetterPolicy.setMaxRedeliverCount(1); - properties.getConsumer().setDeadLetterPolicy(deadLetterPolicy); - properties.getConsumer().setRetryEnable(false); - Subscription subscriptionProperties = properties.getConsumer().getSubscription(); - subscriptionProperties.setName("subname"); - subscriptionProperties.setInitialPosition(SubscriptionInitialPosition.Earliest); - subscriptionProperties.setMode(SubscriptionMode.NonDurable); - subscriptionProperties.setTopicsMode(RegexSubscriptionMode.NonPersistentOnly); - subscriptionProperties.setType(SubscriptionType.Key_Shared); - ReactiveMessageConsumerBuilder builder = mock(ReactiveMessageConsumerBuilder.class); - new PulsarReactivePropertiesMapper(properties).customizeMessageConsumerBuilder(builder); - then(builder).should().consumerName("name"); - then(builder).should().topics(topics); - then(builder).should().topicsPattern(topisPattern); - then(builder).should().priorityLevel(123); - then(builder).should().readCompacted(true); - then(builder).should().deadLetterPolicy(new DeadLetterPolicy(1, null, "my-dlt", null, null, null)); - then(builder).should().retryLetterTopicEnable(false); - then(builder).should().subscriptionName("subname"); - then(builder).should().subscriptionInitialPosition(SubscriptionInitialPosition.Earliest); - then(builder).should().subscriptionMode(SubscriptionMode.NonDurable); - then(builder).should().topicsPatternSubscriptionMode(RegexSubscriptionMode.NonPersistentOnly); - then(builder).should().subscriptionType(SubscriptionType.Key_Shared); - } - - @Test - void customizeContainerProperties() { - PulsarProperties properties = new PulsarProperties(); - properties.getConsumer().getSubscription().setType(SubscriptionType.Shared); - properties.getConsumer().getSubscription().setName("my-subscription"); - properties.getListener().setSchemaType(SchemaType.AVRO); - properties.getListener().setConcurrency(10); - ReactivePulsarContainerProperties containerProperties = new ReactivePulsarContainerProperties<>(); - new PulsarReactivePropertiesMapper(properties).customizeContainerProperties(containerProperties); - assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared); - assertThat(containerProperties.getSubscriptionName()).isEqualTo("my-subscription"); - assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO); - assertThat(containerProperties.getConcurrency()).isEqualTo(10); - } - - @Test - @SuppressWarnings("unchecked") - void customizeMessageReaderBuilder() { - List topics = List.of("my-topic"); - PulsarProperties properties = new PulsarProperties(); - properties.getReader().setName("name"); - properties.getReader().setTopics(topics); - properties.getReader().setSubscriptionName("subname"); - properties.getReader().setSubscriptionRolePrefix("srp"); - ReactiveMessageReaderBuilder builder = mock(ReactiveMessageReaderBuilder.class); - new PulsarReactivePropertiesMapper(properties).customizeMessageReaderBuilder(builder); - then(builder).should().readerName("name"); - then(builder).should().topics(topics); - then(builder).should().subscriptionName("subname"); - then(builder).should().generatedSubscriptionNamePrefix("srp"); - } - -} diff --git a/platform/spring-boot-dependencies/build.gradle b/platform/spring-boot-dependencies/build.gradle index 1887c8bd7528..951e68d76b6c 100644 --- a/platform/spring-boot-dependencies/build.gradle +++ b/platform/spring-boot-dependencies/build.gradle @@ -1796,15 +1796,6 @@ bom { releaseNotes("https://pulsar.apache.org/release-notes/versioned/pulsar-{version}") } } - library("Pulsar Reactive", "0.7.0") { - group("org.apache.pulsar") { - bom("pulsar-client-reactive-bom") - } - links { - site("https://github.com/apache/pulsar-client-reactive") - releaseNotes("https://github.com/apache/pulsar-client-reactive/releases/tag/v{version}") - } - } library("Quartz", "2.5.0") { group("org.quartz-scheduler") { modules = [ @@ -2207,8 +2198,6 @@ bom { "spring-boot-starter-opentelemetry-test", "spring-boot-starter-pulsar", "spring-boot-starter-pulsar-test", - "spring-boot-starter-pulsar-reactive", - "spring-boot-starter-pulsar-reactive-test", "spring-boot-starter-quartz", "spring-boot-starter-quartz-test", "spring-boot-starter-r2dbc", diff --git a/settings.gradle b/settings.gradle index bbcd67e46da5..df53ce99d64e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -310,8 +310,6 @@ include "starter:spring-boot-starter-opentelemetry-test" include "starter:spring-boot-starter-parent" include "starter:spring-boot-starter-pulsar" include "starter:spring-boot-starter-pulsar-test" -include "starter:spring-boot-starter-pulsar-reactive" -include "starter:spring-boot-starter-pulsar-reactive-test" include "starter:spring-boot-starter-quartz" include "starter:spring-boot-starter-quartz-test" include "starter:spring-boot-starter-r2dbc" diff --git a/smoke-test/spring-boot-smoke-test-pulsar/build.gradle b/smoke-test/spring-boot-smoke-test-pulsar/build.gradle index 18b205190021..fdcb45cfe400 100644 --- a/smoke-test/spring-boot-smoke-test-pulsar/build.gradle +++ b/smoke-test/spring-boot-smoke-test-pulsar/build.gradle @@ -23,7 +23,6 @@ description = "Spring Boot Pulsar smoke test" dependencies { implementation(project(":starter:spring-boot-starter-pulsar")) - implementation(project(":starter:spring-boot-starter-pulsar-reactive")) dockerTestImplementation(project(":starter:spring-boot-starter-test")) dockerTestImplementation(project(":test-support:spring-boot-docker-test-support")) diff --git a/smoke-test/spring-boot-smoke-test-pulsar/src/dockerTest/java/smoketest/pulsar/SamplePulsarApplicationTests.java b/smoke-test/spring-boot-smoke-test-pulsar/src/dockerTest/java/smoketest/pulsar/SamplePulsarApplicationTests.java index bb17f94f05c6..adafc8b7f098 100644 --- a/smoke-test/spring-boot-smoke-test-pulsar/src/dockerTest/java/smoketest/pulsar/SamplePulsarApplicationTests.java +++ b/smoke-test/spring-boot-smoke-test-pulsar/src/dockerTest/java/smoketest/pulsar/SamplePulsarApplicationTests.java @@ -78,15 +78,4 @@ class ImperativePulsarApplication extends PulsarApplication { } - @Nested - @SpringBootTest - @ActiveProfiles("smoketest-pulsar-reactive") - class ReactivePulsarApplication extends PulsarApplication { - - ReactivePulsarApplication() { - super("REACTIVE"); - } - - } - } diff --git a/smoke-test/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/ReactiveAppConfig.java b/smoke-test/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/ReactiveAppConfig.java deleted file mode 100644 index 2a23f45bac8b..000000000000 --- a/smoke-test/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/ReactiveAppConfig.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package smoketest.pulsar; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.pulsar.reactive.client.api.MessageSpec; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import org.springframework.boot.ApplicationRunner; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; -import org.springframework.pulsar.core.PulsarTopic; -import org.springframework.pulsar.core.PulsarTopicBuilder; -import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; -import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate; - -@Configuration(proxyBeanMethods = false) -@Profile("smoketest-pulsar-reactive") -class ReactiveAppConfig { - - private static final Log logger = LogFactory.getLog(ReactiveAppConfig.class); - - private static final String TOPIC = "pulsar-reactive-smoke-test-topic"; - - @Bean - PulsarTopic pulsarTestTopic() { - return new PulsarTopicBuilder().name(TOPIC).numberOfPartitions(1).build(); - } - - @Bean - ApplicationRunner sendMessagesToPulsarTopic(ReactivePulsarTemplate template) { - return (args) -> Flux.range(0, 10) - .map((i) -> new SampleMessage(i, "message:" + i)) - .map(MessageSpec::of) - .as((msgs) -> template.send(TOPIC, msgs)) - .doOnNext((sendResult) -> logger - .info("++++++PRODUCE REACTIVE:(" + sendResult.getMessageSpec().getValue().id() + ")------")) - .subscribe(); - } - - @ReactivePulsarListener(topics = TOPIC) - Mono consumeMessagesFromPulsarTopic(SampleMessage msg) { - logger.info("++++++CONSUME REACTIVE:(" + msg.id() + ")------"); - return Mono.empty(); - } - -} diff --git a/starter/spring-boot-starter-pulsar-reactive-test/build.gradle b/starter/spring-boot-starter-pulsar-reactive-test/build.gradle deleted file mode 100644 index 409b244fb475..000000000000 --- a/starter/spring-boot-starter-pulsar-reactive-test/build.gradle +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the License); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -plugins { - id "org.springframework.boot.starter" -} - -description = "Starter for testing Spring for Apache Pulsar Reactive" - -dependencies { - api(project(":starter:spring-boot-starter-pulsar-reactive")) - api(project(":starter:spring-boot-starter-test")) -} - -checkRuntimeClasspathForConflicts { - ignore { name -> name.startsWith("org/bouncycastle/") || - name.matches("^org/apache/pulsar/.*/package-info.class\$") || - name.equals("findbugsExclude.xml") } -} diff --git a/starter/spring-boot-starter-pulsar-reactive/build.gradle b/starter/spring-boot-starter-pulsar-reactive/build.gradle deleted file mode 100644 index 80fcfbc05249..000000000000 --- a/starter/spring-boot-starter-pulsar-reactive/build.gradle +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the License); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -plugins { - id "org.springframework.boot.starter" -} - -description = "Starter for using Spring for Apache Pulsar Reactive" - -dependencies { - api(project(":starter:spring-boot-starter")) - - api(project(":module:spring-boot-pulsar")) - api(project(":module:spring-boot-reactor")) - api("org.springframework.pulsar:spring-pulsar-reactive") -} - -checkRuntimeClasspathForConflicts { - ignore { name -> name.startsWith("org/bouncycastle/") || - name.matches("^org/apache/pulsar/.*/package-info.class\$") || - name.equals("findbugsExclude.xml") } -} From 5798f046f020938c0de4e3748b18559d8685933a Mon Sep 17 00:00:00 2001 From: onobc Date: Sun, 19 Oct 2025 21:12:03 -0500 Subject: [PATCH 2/3] Remove Spring Pulsar Reactive support Consolidates the PulsarConfiguration into the PulsarAutoConfiguration because there is no longer a need to factor out the common components between Spring Pulsar and Spring Pulsar Reactive. Signed-off-by: onobc --- .../PulsarAutoConfiguration.java | 154 ++++++++++++- .../autoconfigure/PulsarConfiguration.java | 207 ------------------ 2 files changed, 153 insertions(+), 208 deletions(-) delete mode 100644 module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfiguration.java diff --git a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfiguration.java b/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfiguration.java index 710605de4ea5..4e68ea71d63c 100644 --- a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfiguration.java +++ b/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfiguration.java @@ -19,23 +19,33 @@ import java.util.ArrayList; import java.util.List; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.schema.SchemaType; +import org.jspecify.annotations.Nullable; import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBooleanProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.pulsar.autoconfigure.PulsarProperties.Defaults.SchemaInfo; +import org.springframework.boot.pulsar.autoconfigure.PulsarProperties.Defaults.TypeMapping; import org.springframework.boot.thread.Threading; import org.springframework.boot.util.LambdaSafe; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Scope; import org.springframework.core.env.Environment; import org.springframework.core.task.VirtualThreadTaskExecutor; import org.springframework.pulsar.annotation.EnablePulsar; @@ -44,10 +54,17 @@ import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames; import org.springframework.pulsar.core.CachingPulsarProducerFactory; import org.springframework.pulsar.core.ConsumerBuilderCustomizer; +import org.springframework.pulsar.core.DefaultPulsarClientFactory; import org.springframework.pulsar.core.DefaultPulsarConsumerFactory; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; import org.springframework.pulsar.core.DefaultPulsarReaderFactory; +import org.springframework.pulsar.core.DefaultSchemaResolver; +import org.springframework.pulsar.core.DefaultTopicResolver; import org.springframework.pulsar.core.ProducerBuilderCustomizer; +import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer; +import org.springframework.pulsar.core.PulsarAdministration; +import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; +import org.springframework.pulsar.core.PulsarClientFactory; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarReaderFactory; @@ -55,11 +72,17 @@ import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.core.SchemaResolver; +import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer; import org.springframework.pulsar.core.TopicResolver; +import org.springframework.pulsar.function.PulsarFunction; +import org.springframework.pulsar.function.PulsarFunctionAdministration; +import org.springframework.pulsar.function.PulsarSink; +import org.springframework.pulsar.function.PulsarSource; import org.springframework.pulsar.listener.PulsarContainerProperties; import org.springframework.pulsar.reader.PulsarReaderContainerProperties; import org.springframework.pulsar.transaction.PulsarAwareTransactionManager; import org.springframework.pulsar.transaction.PulsarTransactionManager; +import org.springframework.util.Assert; /** * {@link EnableAutoConfiguration Auto-configuration} for Apache Pulsar. @@ -73,7 +96,6 @@ */ @AutoConfiguration @ConditionalOnClass({ PulsarClient.class, PulsarTemplate.class }) -@Import(PulsarConfiguration.class) public final class PulsarAutoConfiguration { private final PulsarProperties properties; @@ -85,6 +107,136 @@ public final class PulsarAutoConfiguration { this.propertiesMapper = new PulsarPropertiesMapper(properties); } + @Bean + @ConditionalOnMissingBean(PulsarConnectionDetails.class) + PropertiesPulsarConnectionDetails pulsarConnectionDetails() { + return new PropertiesPulsarConnectionDetails(this.properties); + } + + @Bean + @ConditionalOnMissingBean(PulsarClientFactory.class) + DefaultPulsarClientFactory pulsarClientFactory(PulsarConnectionDetails connectionDetails, + ObjectProvider customizersProvider) { + List allCustomizers = new ArrayList<>(); + allCustomizers.add((builder) -> this.propertiesMapper.customizeClientBuilder(builder, connectionDetails)); + allCustomizers.addAll(customizersProvider.orderedStream().toList()); + DefaultPulsarClientFactory clientFactory = new DefaultPulsarClientFactory( + (clientBuilder) -> applyClientBuilderCustomizers(allCustomizers, clientBuilder)); + return clientFactory; + } + + private void applyClientBuilderCustomizers(List customizers, + ClientBuilder clientBuilder) { + customizers.forEach((customizer) -> customizer.customize(clientBuilder)); + } + + @Bean + @ConditionalOnMissingBean + PulsarClient pulsarClient(PulsarClientFactory clientFactory) { + return clientFactory.createClient(); + } + + @Bean + @ConditionalOnMissingBean + PulsarAdministration pulsarAdministration(PulsarConnectionDetails connectionDetails, + ObjectProvider pulsarAdminBuilderCustomizers) { + List allCustomizers = new ArrayList<>(); + allCustomizers.add((builder) -> this.propertiesMapper.customizeAdminBuilder(builder, connectionDetails)); + allCustomizers.addAll(pulsarAdminBuilderCustomizers.orderedStream().toList()); + return new PulsarAdministration((adminBuilder) -> applyAdminBuilderCustomizers(allCustomizers, adminBuilder)); + } + + private void applyAdminBuilderCustomizers(List customizers, + PulsarAdminBuilder adminBuilder) { + customizers.forEach((customizer) -> customizer.customize(adminBuilder)); + } + + @Bean + @ConditionalOnMissingBean(SchemaResolver.class) + DefaultSchemaResolver pulsarSchemaResolver(ObjectProvider> schemaResolverCustomizers) { + DefaultSchemaResolver schemaResolver = new DefaultSchemaResolver(); + addCustomSchemaMappings(schemaResolver, this.properties.getDefaults().getTypeMappings()); + applySchemaResolverCustomizers(schemaResolverCustomizers.orderedStream().toList(), schemaResolver); + return schemaResolver; + } + + private void addCustomSchemaMappings(DefaultSchemaResolver schemaResolver, + @Nullable List typeMappings) { + if (typeMappings != null) { + typeMappings.forEach((typeMapping) -> addCustomSchemaMapping(schemaResolver, typeMapping)); + } + } + + private void addCustomSchemaMapping(DefaultSchemaResolver schemaResolver, TypeMapping typeMapping) { + SchemaInfo schemaInfo = typeMapping.schemaInfo(); + if (schemaInfo != null) { + Class messageType = typeMapping.messageType(); + SchemaType schemaType = schemaInfo.schemaType(); + Class messageKeyType = schemaInfo.messageKeyType(); + Schema schema = getSchema(schemaResolver, schemaType, messageType, messageKeyType); + schemaResolver.addCustomSchemaMapping(typeMapping.messageType(), schema); + } + } + + private Schema getSchema(DefaultSchemaResolver schemaResolver, SchemaType schemaType, Class messageType, + @Nullable Class messageKeyType) { + Schema schema = schemaResolver.resolveSchema(schemaType, messageType, messageKeyType).orElseThrow(); + Assert.state(schema != null, "'schema' must not be null"); + return schema; + } + + @SuppressWarnings("unchecked") + private void applySchemaResolverCustomizers(List> customizers, + DefaultSchemaResolver schemaResolver) { + LambdaSafe.callbacks(SchemaResolverCustomizer.class, customizers, schemaResolver) + .invoke((customizer) -> customizer.customize(schemaResolver)); + } + + @Bean + @ConditionalOnMissingBean(TopicResolver.class) + DefaultTopicResolver pulsarTopicResolver() { + DefaultTopicResolver topicResolver = new DefaultTopicResolver(); + List typeMappings = this.properties.getDefaults().getTypeMappings(); + if (typeMappings != null) { + typeMappings.forEach((typeMapping) -> addCustomTopicMapping(topicResolver, typeMapping)); + } + return topicResolver; + } + + private void addCustomTopicMapping(DefaultTopicResolver topicResolver, TypeMapping typeMapping) { + String topicName = typeMapping.topicName(); + if (topicName != null) { + topicResolver.addCustomTopicMapping(typeMapping.messageType(), topicName); + } + } + + @Bean + @ConditionalOnMissingBean + @ConditionalOnBooleanProperty(name = "spring.pulsar.function.enabled", matchIfMissing = true) + PulsarFunctionAdministration pulsarFunctionAdministration(PulsarAdministration pulsarAdministration, + ObjectProvider pulsarFunctions, ObjectProvider pulsarSinks, + ObjectProvider pulsarSources) { + PulsarProperties.Function properties = this.properties.getFunction(); + return new PulsarFunctionAdministration(pulsarAdministration, pulsarFunctions, pulsarSinks, pulsarSources, + properties.isFailFast(), properties.isPropagateFailures(), properties.isPropagateStopFailures()); + } + + @Bean + @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) + @ConditionalOnMissingBean + @ConditionalOnBooleanProperty(name = "spring.pulsar.defaults.topic.enabled", matchIfMissing = true) + PulsarTopicBuilder pulsarTopicBuilder() { + return new PulsarTopicBuilder(TopicDomain.persistent, this.properties.getDefaults().getTopic().getTenant(), + this.properties.getDefaults().getTopic().getNamespace()); + } + + @Bean + @ConditionalOnMissingBean + PulsarContainerFactoryCustomizers pulsarContainerFactoryCustomizers( + ObjectProvider> customizers) { + return new PulsarContainerFactoryCustomizers(customizers.orderedStream().toList()); + } + @Bean @ConditionalOnMissingBean(PulsarProducerFactory.class) @ConditionalOnBooleanProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = false) diff --git a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfiguration.java b/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfiguration.java deleted file mode 100644 index 3e10968f9715..000000000000 --- a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfiguration.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.pulsar.autoconfigure; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.pulsar.client.admin.PulsarAdminBuilder; -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.common.naming.TopicDomain; -import org.apache.pulsar.common.schema.SchemaType; -import org.jspecify.annotations.Nullable; - -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.config.ConfigurableBeanFactory; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBooleanProperty; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.boot.pulsar.autoconfigure.PulsarProperties.Defaults.SchemaInfo; -import org.springframework.boot.pulsar.autoconfigure.PulsarProperties.Defaults.TypeMapping; -import org.springframework.boot.util.LambdaSafe; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Scope; -import org.springframework.pulsar.core.DefaultPulsarClientFactory; -import org.springframework.pulsar.core.DefaultSchemaResolver; -import org.springframework.pulsar.core.DefaultTopicResolver; -import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer; -import org.springframework.pulsar.core.PulsarAdministration; -import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; -import org.springframework.pulsar.core.PulsarClientFactory; -import org.springframework.pulsar.core.PulsarTopicBuilder; -import org.springframework.pulsar.core.SchemaResolver; -import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer; -import org.springframework.pulsar.core.TopicResolver; -import org.springframework.pulsar.function.PulsarFunction; -import org.springframework.pulsar.function.PulsarFunctionAdministration; -import org.springframework.pulsar.function.PulsarSink; -import org.springframework.pulsar.function.PulsarSource; -import org.springframework.util.Assert; - -/** - * Common configuration used by {@link PulsarAutoConfiguration}. - * - * @author Chris Bono - * @author Phillip Webb - */ -@Configuration(proxyBeanMethods = false) -@EnableConfigurationProperties(PulsarProperties.class) -class PulsarConfiguration { - - private final PulsarProperties properties; - - private final PulsarPropertiesMapper propertiesMapper; - - PulsarConfiguration(PulsarProperties properties) { - this.properties = properties; - this.propertiesMapper = new PulsarPropertiesMapper(properties); - } - - @Bean - @ConditionalOnMissingBean(PulsarConnectionDetails.class) - PropertiesPulsarConnectionDetails pulsarConnectionDetails() { - return new PropertiesPulsarConnectionDetails(this.properties); - } - - @Bean - @ConditionalOnMissingBean(PulsarClientFactory.class) - DefaultPulsarClientFactory pulsarClientFactory(PulsarConnectionDetails connectionDetails, - ObjectProvider customizersProvider) { - List allCustomizers = new ArrayList<>(); - allCustomizers.add((builder) -> this.propertiesMapper.customizeClientBuilder(builder, connectionDetails)); - allCustomizers.addAll(customizersProvider.orderedStream().toList()); - DefaultPulsarClientFactory clientFactory = new DefaultPulsarClientFactory( - (clientBuilder) -> applyClientBuilderCustomizers(allCustomizers, clientBuilder)); - return clientFactory; - } - - private void applyClientBuilderCustomizers(List customizers, - ClientBuilder clientBuilder) { - customizers.forEach((customizer) -> customizer.customize(clientBuilder)); - } - - @Bean - @ConditionalOnMissingBean - PulsarClient pulsarClient(PulsarClientFactory clientFactory) { - return clientFactory.createClient(); - } - - @Bean - @ConditionalOnMissingBean - PulsarAdministration pulsarAdministration(PulsarConnectionDetails connectionDetails, - ObjectProvider pulsarAdminBuilderCustomizers) { - List allCustomizers = new ArrayList<>(); - allCustomizers.add((builder) -> this.propertiesMapper.customizeAdminBuilder(builder, connectionDetails)); - allCustomizers.addAll(pulsarAdminBuilderCustomizers.orderedStream().toList()); - return new PulsarAdministration((adminBuilder) -> applyAdminBuilderCustomizers(allCustomizers, adminBuilder)); - } - - private void applyAdminBuilderCustomizers(List customizers, - PulsarAdminBuilder adminBuilder) { - customizers.forEach((customizer) -> customizer.customize(adminBuilder)); - } - - @Bean - @ConditionalOnMissingBean(SchemaResolver.class) - DefaultSchemaResolver pulsarSchemaResolver(ObjectProvider> schemaResolverCustomizers) { - DefaultSchemaResolver schemaResolver = new DefaultSchemaResolver(); - addCustomSchemaMappings(schemaResolver, this.properties.getDefaults().getTypeMappings()); - applySchemaResolverCustomizers(schemaResolverCustomizers.orderedStream().toList(), schemaResolver); - return schemaResolver; - } - - private void addCustomSchemaMappings(DefaultSchemaResolver schemaResolver, - @Nullable List typeMappings) { - if (typeMappings != null) { - typeMappings.forEach((typeMapping) -> addCustomSchemaMapping(schemaResolver, typeMapping)); - } - } - - private void addCustomSchemaMapping(DefaultSchemaResolver schemaResolver, TypeMapping typeMapping) { - SchemaInfo schemaInfo = typeMapping.schemaInfo(); - if (schemaInfo != null) { - Class messageType = typeMapping.messageType(); - SchemaType schemaType = schemaInfo.schemaType(); - Class messageKeyType = schemaInfo.messageKeyType(); - Schema schema = getSchema(schemaResolver, schemaType, messageType, messageKeyType); - schemaResolver.addCustomSchemaMapping(typeMapping.messageType(), schema); - } - } - - private Schema getSchema(DefaultSchemaResolver schemaResolver, SchemaType schemaType, Class messageType, - @Nullable Class messageKeyType) { - Schema schema = schemaResolver.resolveSchema(schemaType, messageType, messageKeyType).orElseThrow(); - Assert.state(schema != null, "'schema' must not be null"); - return schema; - } - - @SuppressWarnings("unchecked") - private void applySchemaResolverCustomizers(List> customizers, - DefaultSchemaResolver schemaResolver) { - LambdaSafe.callbacks(SchemaResolverCustomizer.class, customizers, schemaResolver) - .invoke((customizer) -> customizer.customize(schemaResolver)); - } - - @Bean - @ConditionalOnMissingBean(TopicResolver.class) - DefaultTopicResolver pulsarTopicResolver() { - DefaultTopicResolver topicResolver = new DefaultTopicResolver(); - List typeMappings = this.properties.getDefaults().getTypeMappings(); - if (typeMappings != null) { - typeMappings.forEach((typeMapping) -> addCustomTopicMapping(topicResolver, typeMapping)); - } - return topicResolver; - } - - private void addCustomTopicMapping(DefaultTopicResolver topicResolver, TypeMapping typeMapping) { - String topicName = typeMapping.topicName(); - if (topicName != null) { - topicResolver.addCustomTopicMapping(typeMapping.messageType(), topicName); - } - } - - @Bean - @ConditionalOnMissingBean - @ConditionalOnBooleanProperty(name = "spring.pulsar.function.enabled", matchIfMissing = true) - PulsarFunctionAdministration pulsarFunctionAdministration(PulsarAdministration pulsarAdministration, - ObjectProvider pulsarFunctions, ObjectProvider pulsarSinks, - ObjectProvider pulsarSources) { - PulsarProperties.Function properties = this.properties.getFunction(); - return new PulsarFunctionAdministration(pulsarAdministration, pulsarFunctions, pulsarSinks, pulsarSources, - properties.isFailFast(), properties.isPropagateFailures(), properties.isPropagateStopFailures()); - } - - @Bean - @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) - @ConditionalOnMissingBean - @ConditionalOnBooleanProperty(name = "spring.pulsar.defaults.topic.enabled", matchIfMissing = true) - PulsarTopicBuilder pulsarTopicBuilder() { - return new PulsarTopicBuilder(TopicDomain.persistent, this.properties.getDefaults().getTopic().getTenant(), - this.properties.getDefaults().getTopic().getNamespace()); - } - - @Bean - @ConditionalOnMissingBean - PulsarContainerFactoryCustomizers pulsarContainerFactoryCustomizers( - ObjectProvider> customizers) { - return new PulsarContainerFactoryCustomizers(customizers.orderedStream().toList()); - } - -} From 2bd9f058a19fe1593e665266edb799479ecb5580 Mon Sep 17 00:00:00 2001 From: onobc Date: Mon, 20 Oct 2025 09:16:13 -0500 Subject: [PATCH 3/3] Fix PulsarConfigurationTests due to previous commit consolidation. Signed-off-by: onobc --- .../PulsarAutoConfiguration.java | 3 +- .../PulsarAutoConfigurationTests.java | 376 +++++++++++++++- .../PulsarConfigurationTests.java | 421 ------------------ 3 files changed, 377 insertions(+), 423 deletions(-) delete mode 100644 module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfigurationTests.java diff --git a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfiguration.java b/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfiguration.java index 4e68ea71d63c..176dafe4432a 100644 --- a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfiguration.java +++ b/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfiguration.java @@ -38,13 +38,13 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBooleanProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.pulsar.autoconfigure.PulsarProperties.Defaults.SchemaInfo; import org.springframework.boot.pulsar.autoconfigure.PulsarProperties.Defaults.TypeMapping; import org.springframework.boot.thread.Threading; import org.springframework.boot.util.LambdaSafe; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Scope; import org.springframework.core.env.Environment; import org.springframework.core.task.VirtualThreadTaskExecutor; @@ -96,6 +96,7 @@ */ @AutoConfiguration @ConditionalOnClass({ PulsarClient.class, PulsarTemplate.class }) +@EnableConfigurationProperties(PulsarProperties.class) public final class PulsarAutoConfiguration { private final PulsarProperties properties; diff --git a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfigurationTests.java b/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfigurationTests.java index 50a5b8d1e17b..8cc83b3eb5ff 100644 --- a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfigurationTests.java +++ b/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfigurationTests.java @@ -16,24 +16,36 @@ package org.springframework.boot.pulsar.autoconfigure; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.client.impl.AutoClusterFailover; +import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaType; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.assertj.core.api.ThrowingConsumer; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledForJreRange; import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentMatchers; +import org.mockito.InOrder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -62,7 +74,10 @@ import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; import org.springframework.pulsar.core.ProducerBuilderCustomizer; +import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer; import org.springframework.pulsar.core.PulsarAdministration; +import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; +import org.springframework.pulsar.core.PulsarClientFactory; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarReaderFactory; @@ -70,12 +85,16 @@ import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.core.SchemaResolver; +import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer; import org.springframework.pulsar.core.TopicResolver; +import org.springframework.pulsar.function.PulsarFunctionAdministration; import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings; import org.springframework.pulsar.transaction.PulsarAwareTransactionManager; import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; /** @@ -123,7 +142,7 @@ void whenCustomPulsarReaderAnnotationProcessorDefinedAutoConfigurationIsSkipped( @Test void autoConfiguresBeans() { - this.contextRunner.run((context) -> assertThat(context).hasSingleBean(PulsarConfiguration.class) + this.contextRunner.run((context) -> assertThat(context).hasSingleBean(PulsarAutoConfiguration.class) .hasSingleBean(PulsarConnectionDetails.class) .hasSingleBean(DefaultPulsarClientFactory.class) .hasSingleBean(PulsarClient.class) @@ -149,6 +168,361 @@ void topicDefaultsCanBeDisabled() { .run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class)); } + @Test + void whenHasUserDefinedConnectionDetailsBeanDoesNotAutoConfigureBean() { + PulsarConnectionDetails customConnectionDetails = mock(PulsarConnectionDetails.class); + this.contextRunner + .withBean("customPulsarConnectionDetails", PulsarConnectionDetails.class, () -> customConnectionDetails) + .run((context) -> assertThat(context).getBean(PulsarConnectionDetails.class) + .isSameAs(customConnectionDetails)); + } + + @Test + void whenHasUserDefinedContainerFactoryCustomizersBeanDoesNotAutoConfigureBean() { + PulsarContainerFactoryCustomizers customizers = mock(PulsarContainerFactoryCustomizers.class); + this.contextRunner + .withBean("customContainerFactoryCustomizers", PulsarContainerFactoryCustomizers.class, () -> customizers) + .run((context) -> assertThat(context).getBean(PulsarContainerFactoryCustomizers.class) + .isSameAs(customizers)); + } + + @Nested + class ClientTests { + + @Test + void whenHasUserDefinedClientFactoryBeanDoesNotAutoConfigureBean() { + PulsarClientFactory customFactory = mock(PulsarClientFactory.class); + // client factory 'createClient' used as input to producer factory + given(customFactory.createClient()).willReturn(mock(PulsarClient.class)); + new ApplicationContextRunner().withConfiguration(AutoConfigurations.of(PulsarAutoConfiguration.class)) + .withBean("customPulsarClientFactory", PulsarClientFactory.class, () -> customFactory) + .run((context) -> assertThat(context).getBean(PulsarClientFactory.class).isSameAs(customFactory)); + } + + @Test + void whenHasUserDefinedClientBeanDoesNotAutoConfigureBean() { + PulsarClient customClient = mock(PulsarClient.class); + new ApplicationContextRunner().withConfiguration(AutoConfigurations.of(PulsarAutoConfiguration.class)) + .withBean("customPulsarClient", PulsarClient.class, () -> customClient) + .run((context) -> assertThat(context).getBean(PulsarClient.class).isSameAs(customClient)); + } + + @Test + void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { + PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class); + given(connectionDetails.getBrokerUrl()).willReturn("connectiondetails"); + PulsarAutoConfigurationTests.this.contextRunner + .withUserConfiguration( + PulsarAutoConfigurationTests.ClientTests.PulsarClientBuilderCustomizersConfig.class) + .withBean(PulsarConnectionDetails.class, () -> connectionDetails) + .withPropertyValues("spring.pulsar.client.service-url=properties") + .run((context) -> { + DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class); + Customizers customizers = Customizers + .of(ClientBuilder.class, PulsarClientBuilderCustomizer::customize); + assertThat(customizers.fromField(clientFactory, "customizer")).callsInOrder( + ClientBuilder::serviceUrl, "connectiondetails", "fromCustomizer1", "fromCustomizer2"); + }); + } + + @Test + void whenHasUserDefinedFailoverPropertiesAddsToClient() { + PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class); + given(connectionDetails.getBrokerUrl()).willReturn("connectiondetails"); + PulsarAutoConfigurationTests.this.contextRunner + .withBean(PulsarConnectionDetails.class, () -> connectionDetails) + .withPropertyValues("spring.pulsar.client.service-url=properties", + "spring.pulsar.client.failover.backup-clusters[0].service-url=backup-cluster-1", + "spring.pulsar.client.failover.delay=15s", + "spring.pulsar.client.failover.switch-back-delay=30s", + "spring.pulsar.client.failover.check-interval=5s", + "spring.pulsar.client.failover.backup-clusters[1].service-url=backup-cluster-2", + "spring.pulsar.client.failover.backup-clusters[1].authentication.plugin-class-name=" + + MockAuthentication.class.getName(), + "spring.pulsar.client.failover.backup-clusters[1].authentication.param.token=1234") + .run((context) -> { + DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class); + PulsarProperties pulsarProperties = context.getBean(PulsarProperties.class); + ClientBuilder target = mock(ClientBuilder.class); + BiConsumer customizeAction = PulsarClientBuilderCustomizer::customize; + PulsarClientBuilderCustomizer pulsarClientBuilderCustomizer = (PulsarClientBuilderCustomizer) ReflectionTestUtils + .getField(clientFactory, "customizer"); + customizeAction.accept(pulsarClientBuilderCustomizer, target); + InOrder ordered = inOrder(target); + ordered.verify(target).serviceUrlProvider(ArgumentMatchers.any(AutoClusterFailover.class)); + assertThat(pulsarProperties.getClient().getFailover().getDelay()).isEqualTo(Duration.ofSeconds(15)); + assertThat(pulsarProperties.getClient().getFailover().getSwitchBackDelay()) + .isEqualTo(Duration.ofSeconds(30)); + assertThat(pulsarProperties.getClient().getFailover().getCheckInterval()) + .isEqualTo(Duration.ofSeconds(5)); + assertThat(pulsarProperties.getClient().getFailover().getBackupClusters().size()).isEqualTo(2); + }); + } + + @TestConfiguration(proxyBeanMethods = false) + static class PulsarClientBuilderCustomizersConfig { + + @Bean + @Order(200) + PulsarClientBuilderCustomizer customizerFoo() { + return (builder) -> builder.serviceUrl("fromCustomizer2"); + } + + @Bean + @Order(100) + PulsarClientBuilderCustomizer customizerBar() { + return (builder) -> builder.serviceUrl("fromCustomizer1"); + } + + } + + } + + @Nested + class AdministrationTests { + + private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; + + @Test + void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { + PulsarAdministration pulsarAdministration = mock(PulsarAdministration.class); + this.contextRunner + .withBean("customPulsarAdministration", PulsarAdministration.class, () -> pulsarAdministration) + .run((context) -> assertThat(context).getBean(PulsarAdministration.class) + .isSameAs(pulsarAdministration)); + } + + @Test + void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { + PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class); + given(connectionDetails.getAdminUrl()).willReturn("connectiondetails"); + this.contextRunner + .withUserConfiguration( + PulsarAutoConfigurationTests.AdministrationTests.PulsarAdminBuilderCustomizersConfig.class) + .withBean(PulsarConnectionDetails.class, () -> connectionDetails) + .withPropertyValues("spring.pulsar.admin.service-url=property") + .run((context) -> { + PulsarAdministration pulsarAdmin = context.getBean(PulsarAdministration.class); + Customizers customizers = Customizers + .of(PulsarAdminBuilder.class, PulsarAdminBuilderCustomizer::customize); + assertThat(customizers.fromField(pulsarAdmin, "adminCustomizers")).callsInOrder( + PulsarAdminBuilder::serviceHttpUrl, "connectiondetails", "fromCustomizer1", + "fromCustomizer2"); + }); + } + + @TestConfiguration(proxyBeanMethods = false) + static class PulsarAdminBuilderCustomizersConfig { + + @Bean + @Order(200) + PulsarAdminBuilderCustomizer customizerFoo() { + return (builder) -> builder.serviceHttpUrl("fromCustomizer2"); + } + + @Bean + @Order(100) + PulsarAdminBuilderCustomizer customizerBar() { + return (builder) -> builder.serviceHttpUrl("fromCustomizer1"); + } + + } + + } + + @Nested + class SchemaResolverTests { + + private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; + + @Test + void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { + SchemaResolver schemaResolver = mock(SchemaResolver.class); + this.contextRunner.withBean("customSchemaResolver", SchemaResolver.class, () -> schemaResolver) + .run((context) -> assertThat(context).getBean(SchemaResolver.class).isSameAs(schemaResolver)); + } + + @Test + void whenHasUserDefinedSchemaResolverCustomizer() { + SchemaResolverCustomizer customizer = (schemaResolver) -> schemaResolver + .addCustomSchemaMapping(PulsarAutoConfigurationTests.TestRecord.class, Schema.STRING); + this.contextRunner.withBean("schemaResolverCustomizer", SchemaResolverCustomizer.class, () -> customizer) + .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) + .satisfies(customSchemaMappingOf(PulsarAutoConfigurationTests.TestRecord.class, Schema.STRING))); + } + + @Test + void whenHasDefaultsTypeMappingForPrimitiveAddsToSchemaResolver() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + + PulsarAutoConfigurationTests.TestRecord.CLASS_NAME); + properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=STRING"); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) + .satisfies(customSchemaMappingOf(PulsarAutoConfigurationTests.TestRecord.class, Schema.STRING))); + } + + @Test + void whenHasDefaultsTypeMappingForStructAddsToSchemaResolver() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + + PulsarAutoConfigurationTests.TestRecord.CLASS_NAME); + properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=JSON"); + Schema expectedSchema = Schema.JSON(PulsarAutoConfigurationTests.TestRecord.class); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) + .satisfies(customSchemaMappingOf(PulsarAutoConfigurationTests.TestRecord.class, expectedSchema))); + } + + @Test + void whenHasDefaultsTypeMappingForKeyValueAddsToSchemaResolver() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + + PulsarAutoConfigurationTests.TestRecord.CLASS_NAME); + properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=key-value"); + properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type=java.lang.String"); + Schema expectedSchema = Schema.KeyValue(Schema.STRING, + Schema.JSON(PulsarAutoConfigurationTests.TestRecord.class), KeyValueEncodingType.INLINE); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) + .satisfies(customSchemaMappingOf(PulsarAutoConfigurationTests.TestRecord.class, expectedSchema))); + } + + private ThrowingConsumer customSchemaMappingOf(Class messageType, + Schema expectedSchema) { + return (resolver) -> assertThat(resolver.getCustomSchemaMapping(messageType)) + .hasValueSatisfying(schemaEqualTo(expectedSchema)); + } + + private Consumer> schemaEqualTo(Schema expected) { + return (actual) -> assertThat(actual.getSchemaInfo()).isEqualTo(expected.getSchemaInfo()); + } + + } + + @Nested + class TopicResolverTests { + + private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; + + @Test + void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { + TopicResolver topicResolver = mock(TopicResolver.class); + this.contextRunner.withBean("customTopicResolver", TopicResolver.class, () -> topicResolver) + .run((context) -> assertThat(context).getBean(TopicResolver.class).isSameAs(topicResolver)); + } + + @Test + void whenHasDefaultsTypeMappingAddsToSchemaResolver() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + + PulsarAutoConfigurationTests.TestRecord.CLASS_NAME); + properties.add("spring.pulsar.defaults.type-mappings[0].topic-name=foo-topic"); + properties.add("spring.pulsar.defaults.type-mappings[1].message-type=java.lang.String"); + properties.add("spring.pulsar.defaults.type-mappings[1].topic-name=string-topic"); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(TopicResolver.class) + .asInstanceOf(InstanceOfAssertFactories.type(DefaultTopicResolver.class)) + .satisfies((resolver) -> { + assertThat(resolver.getCustomTopicMapping(PulsarAutoConfigurationTests.TestRecord.class)) + .hasValue("foo-topic"); + assertThat(resolver.getCustomTopicMapping(String.class)).hasValue("string-topic"); + })); + } + + } + + @Nested + class TopicBuilderTests { + + private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; + + @Test + void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { + PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); + this.contextRunner.withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) + .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class).isSameAs(topicBuilder)); + } + + @Test + void whenHasDefaultsTopicDisabledPropertyDoesNotCreateBean() { + this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") + .run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class)); + } + + @Test + void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.defaults.topic.tenant=my-tenant"); + properties.add("spring.pulsar.defaults.topic.namespace=my-namespace"); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class) + .asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) + .satisfies((topicBuilder) -> { + assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultTenant", "my-tenant"); + assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultNamespace", "my-namespace"); + })); + } + + @Test + void beanHasScopePrototype() { + this.contextRunner.run((context) -> assertThat(context.getBean(PulsarTopicBuilder.class)) + .isNotSameAs(context.getBean(PulsarTopicBuilder.class))); + } + + } + + @Nested + class FunctionAdministrationTests { + + private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; + + @Test + void whenNoPropertiesAddsFunctionAdministrationBean() { + this.contextRunner.run((context) -> assertThat(context).getBean(PulsarFunctionAdministration.class) + .hasFieldOrPropertyWithValue("failFast", Boolean.TRUE) + .hasFieldOrPropertyWithValue("propagateFailures", Boolean.TRUE) + .hasFieldOrPropertyWithValue("propagateStopFailures", Boolean.FALSE) + .hasNoNullFieldsOrProperties() // ensures object providers set + .extracting("pulsarAdministration") + .isSameAs(context.getBean(PulsarAdministration.class))); + } + + @Test + void whenHasFunctionPropertiesAppliesPropertiesToBean() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.function.fail-fast=false"); + properties.add("spring.pulsar.function.propagate-failures=false"); + properties.add("spring.pulsar.function.propagate-stop-failures=true"); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(PulsarFunctionAdministration.class) + .hasFieldOrPropertyWithValue("failFast", Boolean.FALSE) + .hasFieldOrPropertyWithValue("propagateFailures", Boolean.FALSE) + .hasFieldOrPropertyWithValue("propagateStopFailures", Boolean.TRUE)); + } + + @Test + void whenHasFunctionDisabledPropertyDoesNotCreateBean() { + this.contextRunner.withPropertyValues("spring.pulsar.function.enabled=false") + .run((context) -> assertThat(context).doesNotHaveBean(PulsarFunctionAdministration.class)); + } + + @Test + void whenHasCustomFunctionAdministrationBean() { + PulsarFunctionAdministration functionAdministration = mock(PulsarFunctionAdministration.class); + this.contextRunner.withBean(PulsarFunctionAdministration.class, () -> functionAdministration) + .run((context) -> assertThat(context).getBean(PulsarFunctionAdministration.class) + .isSameAs(functionAdministration)); + } + + } + + record TestRecord() { + + private static final String CLASS_NAME = TestRecord.class.getName(); + + } + @Nested class ProducerFactoryTests { diff --git a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfigurationTests.java b/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfigurationTests.java deleted file mode 100644 index 1f37f474b142..000000000000 --- a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfigurationTests.java +++ /dev/null @@ -1,421 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.pulsar.autoconfigure; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -import org.apache.pulsar.client.admin.PulsarAdminBuilder; -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.AutoClusterFailover; -import org.apache.pulsar.common.schema.KeyValueEncodingType; -import org.assertj.core.api.InstanceOfAssertFactories; -import org.assertj.core.api.ThrowingConsumer; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentMatchers; -import org.mockito.InOrder; - -import org.springframework.boot.autoconfigure.AutoConfigurations; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.boot.test.context.runner.ApplicationContextRunner; -import org.springframework.context.annotation.Bean; -import org.springframework.core.annotation.Order; -import org.springframework.pulsar.core.DefaultPulsarClientFactory; -import org.springframework.pulsar.core.DefaultSchemaResolver; -import org.springframework.pulsar.core.DefaultTopicResolver; -import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer; -import org.springframework.pulsar.core.PulsarAdministration; -import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; -import org.springframework.pulsar.core.PulsarClientFactory; -import org.springframework.pulsar.core.PulsarTopicBuilder; -import org.springframework.pulsar.core.SchemaResolver; -import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer; -import org.springframework.pulsar.core.TopicResolver; -import org.springframework.pulsar.function.PulsarFunctionAdministration; -import org.springframework.test.util.ReflectionTestUtils; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; - -/** - * Tests for {@link PulsarConfiguration}. - * - * @author Chris Bono - * @author Alexander Preuß - * @author Soby Chacko - * @author Phillip Webb - * @author Swamy Mavuri - */ -class PulsarConfigurationTests { - - private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() - .withConfiguration(AutoConfigurations.of(PulsarConfiguration.class)) - .withBean(PulsarClient.class, () -> mock(PulsarClient.class)); - - @Test - void whenHasUserDefinedConnectionDetailsBeanDoesNotAutoConfigureBean() { - PulsarConnectionDetails customConnectionDetails = mock(PulsarConnectionDetails.class); - this.contextRunner - .withBean("customPulsarConnectionDetails", PulsarConnectionDetails.class, () -> customConnectionDetails) - .run((context) -> assertThat(context).getBean(PulsarConnectionDetails.class) - .isSameAs(customConnectionDetails)); - } - - @Test - void whenHasUserDefinedContainerFactoryCustomizersBeanDoesNotAutoConfigureBean() { - PulsarContainerFactoryCustomizers customizers = mock(PulsarContainerFactoryCustomizers.class); - this.contextRunner - .withBean("customContainerFactoryCustomizers", PulsarContainerFactoryCustomizers.class, () -> customizers) - .run((context) -> assertThat(context).getBean(PulsarContainerFactoryCustomizers.class) - .isSameAs(customizers)); - } - - @Nested - class ClientTests { - - @Test - void whenHasUserDefinedClientFactoryBeanDoesNotAutoConfigureBean() { - PulsarClientFactory customFactory = mock(PulsarClientFactory.class); - new ApplicationContextRunner().withConfiguration(AutoConfigurations.of(PulsarConfiguration.class)) - .withBean("customPulsarClientFactory", PulsarClientFactory.class, () -> customFactory) - .run((context) -> assertThat(context).getBean(PulsarClientFactory.class).isSameAs(customFactory)); - } - - @Test - void whenHasUserDefinedClientBeanDoesNotAutoConfigureBean() { - PulsarClient customClient = mock(PulsarClient.class); - new ApplicationContextRunner().withConfiguration(AutoConfigurations.of(PulsarConfiguration.class)) - .withBean("customPulsarClient", PulsarClient.class, () -> customClient) - .run((context) -> assertThat(context).getBean(PulsarClient.class).isSameAs(customClient)); - } - - @Test - void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { - PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class); - given(connectionDetails.getBrokerUrl()).willReturn("connectiondetails"); - PulsarConfigurationTests.this.contextRunner - .withUserConfiguration(PulsarClientBuilderCustomizersConfig.class) - .withBean(PulsarConnectionDetails.class, () -> connectionDetails) - .withPropertyValues("spring.pulsar.client.service-url=properties") - .run((context) -> { - DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class); - Customizers customizers = Customizers - .of(ClientBuilder.class, PulsarClientBuilderCustomizer::customize); - assertThat(customizers.fromField(clientFactory, "customizer")).callsInOrder( - ClientBuilder::serviceUrl, "connectiondetails", "fromCustomizer1", "fromCustomizer2"); - }); - } - - @Test - void whenHasUserDefinedFailoverPropertiesAddsToClient() { - PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class); - given(connectionDetails.getBrokerUrl()).willReturn("connectiondetails"); - PulsarConfigurationTests.this.contextRunner.withBean(PulsarConnectionDetails.class, () -> connectionDetails) - .withPropertyValues("spring.pulsar.client.service-url=properties", - "spring.pulsar.client.failover.backup-clusters[0].service-url=backup-cluster-1", - "spring.pulsar.client.failover.delay=15s", - "spring.pulsar.client.failover.switch-back-delay=30s", - "spring.pulsar.client.failover.check-interval=5s", - "spring.pulsar.client.failover.backup-clusters[1].service-url=backup-cluster-2", - "spring.pulsar.client.failover.backup-clusters[1].authentication.plugin-class-name=" - + MockAuthentication.class.getName(), - "spring.pulsar.client.failover.backup-clusters[1].authentication.param.token=1234") - .run((context) -> { - DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class); - PulsarProperties pulsarProperties = context.getBean(PulsarProperties.class); - ClientBuilder target = mock(ClientBuilder.class); - BiConsumer customizeAction = PulsarClientBuilderCustomizer::customize; - PulsarClientBuilderCustomizer pulsarClientBuilderCustomizer = (PulsarClientBuilderCustomizer) ReflectionTestUtils - .getField(clientFactory, "customizer"); - customizeAction.accept(pulsarClientBuilderCustomizer, target); - InOrder ordered = inOrder(target); - ordered.verify(target).serviceUrlProvider(ArgumentMatchers.any(AutoClusterFailover.class)); - assertThat(pulsarProperties.getClient().getFailover().getDelay()).isEqualTo(Duration.ofSeconds(15)); - assertThat(pulsarProperties.getClient().getFailover().getSwitchBackDelay()) - .isEqualTo(Duration.ofSeconds(30)); - assertThat(pulsarProperties.getClient().getFailover().getCheckInterval()) - .isEqualTo(Duration.ofSeconds(5)); - assertThat(pulsarProperties.getClient().getFailover().getBackupClusters().size()).isEqualTo(2); - }); - } - - @TestConfiguration(proxyBeanMethods = false) - static class PulsarClientBuilderCustomizersConfig { - - @Bean - @Order(200) - PulsarClientBuilderCustomizer customizerFoo() { - return (builder) -> builder.serviceUrl("fromCustomizer2"); - } - - @Bean - @Order(100) - PulsarClientBuilderCustomizer customizerBar() { - return (builder) -> builder.serviceUrl("fromCustomizer1"); - } - - } - - } - - @Nested - class AdministrationTests { - - private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner; - - @Test - void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { - PulsarAdministration pulsarAdministration = mock(PulsarAdministration.class); - this.contextRunner - .withBean("customPulsarAdministration", PulsarAdministration.class, () -> pulsarAdministration) - .run((context) -> assertThat(context).getBean(PulsarAdministration.class) - .isSameAs(pulsarAdministration)); - } - - @Test - void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { - PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class); - given(connectionDetails.getAdminUrl()).willReturn("connectiondetails"); - this.contextRunner.withUserConfiguration(PulsarAdminBuilderCustomizersConfig.class) - .withBean(PulsarConnectionDetails.class, () -> connectionDetails) - .withPropertyValues("spring.pulsar.admin.service-url=property") - .run((context) -> { - PulsarAdministration pulsarAdmin = context.getBean(PulsarAdministration.class); - Customizers customizers = Customizers - .of(PulsarAdminBuilder.class, PulsarAdminBuilderCustomizer::customize); - assertThat(customizers.fromField(pulsarAdmin, "adminCustomizers")).callsInOrder( - PulsarAdminBuilder::serviceHttpUrl, "connectiondetails", "fromCustomizer1", - "fromCustomizer2"); - }); - } - - @TestConfiguration(proxyBeanMethods = false) - static class PulsarAdminBuilderCustomizersConfig { - - @Bean - @Order(200) - PulsarAdminBuilderCustomizer customizerFoo() { - return (builder) -> builder.serviceHttpUrl("fromCustomizer2"); - } - - @Bean - @Order(100) - PulsarAdminBuilderCustomizer customizerBar() { - return (builder) -> builder.serviceHttpUrl("fromCustomizer1"); - } - - } - - } - - @Nested - class SchemaResolverTests { - - private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner; - - @Test - void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { - SchemaResolver schemaResolver = mock(SchemaResolver.class); - this.contextRunner.withBean("customSchemaResolver", SchemaResolver.class, () -> schemaResolver) - .run((context) -> assertThat(context).getBean(SchemaResolver.class).isSameAs(schemaResolver)); - } - - @Test - void whenHasUserDefinedSchemaResolverCustomizer() { - SchemaResolverCustomizer customizer = (schemaResolver) -> schemaResolver - .addCustomSchemaMapping(TestRecord.class, Schema.STRING); - this.contextRunner.withBean("schemaResolverCustomizer", SchemaResolverCustomizer.class, () -> customizer) - .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) - .satisfies(customSchemaMappingOf(TestRecord.class, Schema.STRING))); - } - - @Test - void whenHasDefaultsTypeMappingForPrimitiveAddsToSchemaResolver() { - List properties = new ArrayList<>(); - properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + TestRecord.CLASS_NAME); - properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=STRING"); - this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) - .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) - .satisfies(customSchemaMappingOf(TestRecord.class, Schema.STRING))); - } - - @Test - void whenHasDefaultsTypeMappingForStructAddsToSchemaResolver() { - List properties = new ArrayList<>(); - properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + TestRecord.CLASS_NAME); - properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=JSON"); - Schema expectedSchema = Schema.JSON(TestRecord.class); - this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) - .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) - .satisfies(customSchemaMappingOf(TestRecord.class, expectedSchema))); - } - - @Test - void whenHasDefaultsTypeMappingForKeyValueAddsToSchemaResolver() { - List properties = new ArrayList<>(); - properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + TestRecord.CLASS_NAME); - properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=key-value"); - properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type=java.lang.String"); - Schema expectedSchema = Schema.KeyValue(Schema.STRING, Schema.JSON(TestRecord.class), - KeyValueEncodingType.INLINE); - this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) - .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) - .satisfies(customSchemaMappingOf(TestRecord.class, expectedSchema))); - } - - private ThrowingConsumer customSchemaMappingOf(Class messageType, - Schema expectedSchema) { - return (resolver) -> assertThat(resolver.getCustomSchemaMapping(messageType)) - .hasValueSatisfying(schemaEqualTo(expectedSchema)); - } - - private Consumer> schemaEqualTo(Schema expected) { - return (actual) -> assertThat(actual.getSchemaInfo()).isEqualTo(expected.getSchemaInfo()); - } - - } - - @Nested - class TopicResolverTests { - - private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner; - - @Test - void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { - TopicResolver topicResolver = mock(TopicResolver.class); - this.contextRunner.withBean("customTopicResolver", TopicResolver.class, () -> topicResolver) - .run((context) -> assertThat(context).getBean(TopicResolver.class).isSameAs(topicResolver)); - } - - @Test - void whenHasDefaultsTypeMappingAddsToSchemaResolver() { - List properties = new ArrayList<>(); - properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + TestRecord.CLASS_NAME); - properties.add("spring.pulsar.defaults.type-mappings[0].topic-name=foo-topic"); - properties.add("spring.pulsar.defaults.type-mappings[1].message-type=java.lang.String"); - properties.add("spring.pulsar.defaults.type-mappings[1].topic-name=string-topic"); - this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) - .run((context) -> assertThat(context).getBean(TopicResolver.class) - .asInstanceOf(InstanceOfAssertFactories.type(DefaultTopicResolver.class)) - .satisfies((resolver) -> { - assertThat(resolver.getCustomTopicMapping(TestRecord.class)).hasValue("foo-topic"); - assertThat(resolver.getCustomTopicMapping(String.class)).hasValue("string-topic"); - })); - } - - } - - @Nested - class TopicBuilderTests { - - private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner; - - @Test - void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { - PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); - this.contextRunner.withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) - .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class).isSameAs(topicBuilder)); - } - - @Test - void whenHasDefaultsTopicDisabledPropertyDoesNotCreateBean() { - this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") - .run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class)); - } - - @Test - void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() { - List properties = new ArrayList<>(); - properties.add("spring.pulsar.defaults.topic.tenant=my-tenant"); - properties.add("spring.pulsar.defaults.topic.namespace=my-namespace"); - this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) - .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class) - .asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) - .satisfies((topicBuilder) -> { - assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultTenant", "my-tenant"); - assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultNamespace", "my-namespace"); - })); - } - - @Test - void beanHasScopePrototype() { - this.contextRunner.run((context) -> assertThat(context.getBean(PulsarTopicBuilder.class)) - .isNotSameAs(context.getBean(PulsarTopicBuilder.class))); - } - - } - - @Nested - class FunctionAdministrationTests { - - private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner; - - @Test - void whenNoPropertiesAddsFunctionAdministrationBean() { - this.contextRunner.run((context) -> assertThat(context).getBean(PulsarFunctionAdministration.class) - .hasFieldOrPropertyWithValue("failFast", Boolean.TRUE) - .hasFieldOrPropertyWithValue("propagateFailures", Boolean.TRUE) - .hasFieldOrPropertyWithValue("propagateStopFailures", Boolean.FALSE) - .hasNoNullFieldsOrProperties() // ensures object providers set - .extracting("pulsarAdministration") - .isSameAs(context.getBean(PulsarAdministration.class))); - } - - @Test - void whenHasFunctionPropertiesAppliesPropertiesToBean() { - List properties = new ArrayList<>(); - properties.add("spring.pulsar.function.fail-fast=false"); - properties.add("spring.pulsar.function.propagate-failures=false"); - properties.add("spring.pulsar.function.propagate-stop-failures=true"); - this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) - .run((context) -> assertThat(context).getBean(PulsarFunctionAdministration.class) - .hasFieldOrPropertyWithValue("failFast", Boolean.FALSE) - .hasFieldOrPropertyWithValue("propagateFailures", Boolean.FALSE) - .hasFieldOrPropertyWithValue("propagateStopFailures", Boolean.TRUE)); - } - - @Test - void whenHasFunctionDisabledPropertyDoesNotCreateBean() { - this.contextRunner.withPropertyValues("spring.pulsar.function.enabled=false") - .run((context) -> assertThat(context).doesNotHaveBean(PulsarFunctionAdministration.class)); - } - - @Test - void whenHasCustomFunctionAdministrationBean() { - PulsarFunctionAdministration functionAdministration = mock(PulsarFunctionAdministration.class); - this.contextRunner.withBean(PulsarFunctionAdministration.class, () -> functionAdministration) - .run((context) -> assertThat(context).getBean(PulsarFunctionAdministration.class) - .isSameAs(functionAdministration)); - } - - } - - record TestRecord() { - - private static final String CLASS_NAME = TestRecord.class.getName(); - - } - -}