From 7d6a87f3aa9f5793bd33262f9180803f83048e94 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 13 Jun 2019 16:03:04 -0400 Subject: [PATCH 1/4] Reactive @KafkaListener --- samples/sample-04/kafka-4-reactor/.gitignore | 29 +++++ samples/sample-04/kafka-4-reactor/pom.xml | 108 ++++++++++++++++++ .../com/example/Kafka4ReactorApplication.java | 77 +++++++++++++ .../src/main/resources/application.properties | 3 + .../Kafka4ReactorApplicationTests.java | 13 +++ ...kaListenerAnnotationBeanPostProcessor.java | 12 ++ .../config/KafkaListenerEndpointRegistry.java | 4 + .../listener/reactive/ReactorAdapter.java | 101 ++++++++++++++++ .../kafka/listener/reactive/package-info.java | 4 + 9 files changed, 351 insertions(+) create mode 100644 samples/sample-04/kafka-4-reactor/.gitignore create mode 100644 samples/sample-04/kafka-4-reactor/pom.xml create mode 100644 samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java create mode 100644 samples/sample-04/kafka-4-reactor/src/main/resources/application.properties create mode 100644 samples/sample-04/kafka-4-reactor/src/test/java/com/example/Kafka4ReactorApplicationTests.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/package-info.java diff --git a/samples/sample-04/kafka-4-reactor/.gitignore b/samples/sample-04/kafka-4-reactor/.gitignore new file mode 100644 index 0000000000..153c9335eb --- /dev/null +++ b/samples/sample-04/kafka-4-reactor/.gitignore @@ -0,0 +1,29 @@ +HELP.md +/target/ +!.mvn/wrapper/maven-wrapper.jar + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +/build/ + +### VS Code ### +.vscode/ diff --git a/samples/sample-04/kafka-4-reactor/pom.xml b/samples/sample-04/kafka-4-reactor/pom.xml new file mode 100644 index 0000000000..4257a4a849 --- /dev/null +++ b/samples/sample-04/kafka-4-reactor/pom.xml @@ -0,0 +1,108 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.2.0.BUILD-SNAPSHOT + + + net.gprussell + kafka-4-reactor + 0.0.1-SNAPSHOT + kafka-4-reactor + cdc + + + 1.8 + + + + + org.springframework.boot + spring-boot-starter + + + org.springframework.kafka + spring-kafka + 2.3.0.BUILD-SNAPSHOT + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + junit + junit + + + + + org.springframework.kafka + spring-kafka-test + test + + + io.projectreactor + reactor-core + + + io.projectreactor.kafka + reactor-kafka + 1.2.0.BUILD-SNAPSHOT + + + io.projectreactor + reactor-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot + + true + + + + spring-milestones + Spring Milestones + https://repo.spring.io/milestone + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + spring-milestones + Spring Milestones + https://repo.spring.io/milestone + + + + diff --git a/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java b/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java new file mode 100644 index 0000000000..445a573fd1 --- /dev/null +++ b/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java @@ -0,0 +1,77 @@ +package com.example; + +import java.lang.reflect.Method; +import java.util.stream.IntStream; + +import org.apache.kafka.clients.admin.NewTopic; + +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate; +import org.springframework.kafka.listener.reactive.ReactorAdapter; +import org.springframework.stereotype.Component; + +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.kafka.receiver.ReceiverRecord; +import reactor.kafka.sender.SenderOptions; +import reactor.kafka.sender.SenderResult; + +@SpringBootApplication +public class Kafka4ReactorApplication { + + public static void main(String[] args) { + SpringApplication.run(Kafka4ReactorApplication.class, args); + } + + @Bean + public ReactorAdapter adapter(Listener listener, KafkaProperties properties) + throws NoSuchMethodException, SecurityException { + + // This will be done by the bean post processor + Method method = Listener.class.getDeclaredMethod("listen", Flux.class); + ReactorAdapter adapter = new ReactorAdapter(listener, method, "skReactorTopic"); + adapter.setConfigs(properties.buildConsumerProperties()); + return adapter; + } + + @Bean + public ReactiveKafkaProducerTemplate template(KafkaProperties properties) { + SenderOptions senderOptions = SenderOptions.create(properties.buildProducerProperties()); + return new ReactiveKafkaProducerTemplate<>(senderOptions); + } + + @Bean + public NewTopic topic() { + return TopicBuilder.name("skReactorTopic").partitions(1).replicas(1).build(); + } + + @Bean + public ApplicationRunner runner(ReactiveKafkaProducerTemplate template) { + return args -> IntStream.range(0, 10).forEach(i -> { + Mono> send = template.send("skReactorTopic", "foo", "bar" + i); + send.subscribe(sr -> System.out.println(sr.recordMetadata())); + }); + } + +} + +@Component +class Listener { + + @KafkaListener(topics = "skReactorTopic") + public Disposable listen(Flux> flux) { + return flux.subscribe(record -> { + System.out.println(record.key() + ":" + record.value() + "@" + record.offset()); + record.receiverOffset().acknowledge(); + }); + } + +} + diff --git a/samples/sample-04/kafka-4-reactor/src/main/resources/application.properties b/samples/sample-04/kafka-4-reactor/src/main/resources/application.properties new file mode 100644 index 0000000000..5e3bd48646 --- /dev/null +++ b/samples/sample-04/kafka-4-reactor/src/main/resources/application.properties @@ -0,0 +1,3 @@ +spring.kafka.consumer.enable-auto-commit=false +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.group-id=skReactor diff --git a/samples/sample-04/kafka-4-reactor/src/test/java/com/example/Kafka4ReactorApplicationTests.java b/samples/sample-04/kafka-4-reactor/src/test/java/com/example/Kafka4ReactorApplicationTests.java new file mode 100644 index 0000000000..1e3ff0db8c --- /dev/null +++ b/samples/sample-04/kafka-4-reactor/src/test/java/com/example/Kafka4ReactorApplicationTests.java @@ -0,0 +1,13 @@ +package com.example; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class Kafka4ReactorApplicationTests { + + @Test + void contextLoads() { + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 4af17e11b4..95c507cf9a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -90,6 +91,8 @@ import org.springframework.util.StringUtils; import org.springframework.validation.Validator; +import reactor.core.publisher.Flux; + /** * Bean post-processor that registers methods annotated with {@link KafkaListener} * to be invoked by a Kafka message listener container created under the covers @@ -288,6 +291,15 @@ public Object postProcessAfterInitialization(final Object bean, final String bea AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null); multiMethods.addAll(methodsWithHandler); } + // TODO + Iterator iterator = annotatedMethods.keySet().iterator(); + while (iterator.hasNext()) { + Method next = iterator.next(); + if (next.getParameterTypes()[0].equals(Flux.class)) { + annotatedMethods.remove(next); + } + } + // end TODO if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(bean.getClass()); this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass()); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java index 923e5cecea..0998cf598a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java @@ -269,6 +269,10 @@ public void stop() { @Override public void stop(Runnable callback) { Collection listenerContainersToStop = getListenerContainers(); + if (listenerContainersToStop.size() == 0) { + callback.run(); + return; + } AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainersToStop.size(), callback); for (MessageListenerContainer listenerContainer : listenerContainersToStop) { if (listenerContainer.isRunning()) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java new file mode 100644 index 0000000000..a7cc4a4bc8 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java @@ -0,0 +1,101 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.reactive; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.context.SmartLifecycle; +import org.springframework.kafka.annotation.KafkaListener; + +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.kafka.receiver.KafkaReceiver; +import reactor.kafka.receiver.ReceiverOptions; + +/** + * Adapter for {@link KafkaListener} with a {@link Flux} parameter. + * + * @author Gary Russell + * @since 2.3 + * + */ +public class ReactorAdapter implements SmartLifecycle { + + private final Object bean; + + private final Method method; + + private final List topics; + + private final Map configs = new HashMap<>(); + + private Disposable disposable; + + private volatile boolean running; + + public ReactorAdapter(Object bean, Method method, String... topics) { + this.bean = bean; + this.method = method; + this.method.setAccessible(true); + this.topics = Arrays.asList(topics); + } + + public void setConfigs(Map configs) { + this.configs.putAll(configs); + } + + @Override + public void start() { + ReceiverOptions options = ReceiverOptions.create(this.configs) + .subscription(this.topics); + Flux flux = KafkaReceiver.create(options).receive(); + /* + * If the method's parameter is not a ReceiverRecord, we could map + * record.value() using Spring's ConversionService. + */ + try { + Object result = this.method.invoke(this.bean, flux); + if (result instanceof Disposable) { + this.disposable = (Disposable) result; + } + this.running = true; + } + catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + e.printStackTrace(); + } + } + + @Override + public void stop() { + if (this.disposable != null && !this.disposable.isDisposed()) { + this.disposable.dispose(); + this.disposable = null; + } + this.running = false; + } + + @Override + public boolean isRunning() { + return this.running; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/package-info.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/package-info.java new file mode 100644 index 0000000000..f2c2876d6e --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides classes for reactive listeners. + */ +package org.springframework.kafka.listener.reactive; From 833e1e4ea2107f54c76afb29ec7be0969add690c Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 14 Jun 2019 10:46:31 -0400 Subject: [PATCH 2/4] Polish comment --- .../kafka/listener/reactive/ReactorAdapter.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java index a7cc4a4bc8..32fe2ad067 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java @@ -69,8 +69,10 @@ public void start() { .subscription(this.topics); Flux flux = KafkaReceiver.create(options).receive(); /* - * If the method's parameter is not a ReceiverRecord, we could map + * If the method's parameter is not a Flux, we could map * record.value() using Spring's ConversionService. + * We could also, say, have Flux> and map + * the key/value (again with conversion if needed). */ try { Object result = this.method.invoke(this.bean, flux); From 2649b725b0ca13ffe83bb9b83aef9a7e9267a08d Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 17 Jun 2019 12:23:19 -0400 Subject: [PATCH 3/4] PR Comments; return `Mono` instead of `Disposable`. --- .../com/example/Kafka4ReactorApplication.java | 23 +++++++++++++++---- .../listener/reactive/ReactorAdapter.java | 5 ++-- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java b/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java index 445a573fd1..92239ace4d 100644 --- a/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java +++ b/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java @@ -1,3 +1,19 @@ +/* + * Copyright 2018-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.example; import java.lang.reflect.Method; @@ -16,7 +32,6 @@ import org.springframework.kafka.listener.reactive.ReactorAdapter; import org.springframework.stereotype.Component; -import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.kafka.receiver.ReceiverRecord; @@ -66,11 +81,11 @@ public ApplicationRunner runner(ReactiveKafkaProducerTemplate te class Listener { @KafkaListener(topics = "skReactorTopic") - public Disposable listen(Flux> flux) { - return flux.subscribe(record -> { + public Mono listen(Flux> flux) { + return flux.doOnNext(record -> { System.out.println(record.key() + ":" + record.value() + "@" + record.offset()); record.receiverOffset().acknowledge(); - }); + }).then(); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java index 32fe2ad067..3b93d24bbd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java @@ -28,6 +28,7 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; @@ -76,8 +77,8 @@ public void start() { */ try { Object result = this.method.invoke(this.bean, flux); - if (result instanceof Disposable) { - this.disposable = (Disposable) result; + if (result instanceof Mono) { + this.disposable = ((Mono) result).subscribe(); } this.running = true; } From ca26e06c5c25fe527fdc910fd98b79a6b3ee2293 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 17 Jun 2019 13:56:27 -0400 Subject: [PATCH 4/4] Support concurrency, group.id, client.id and create the adapters in the registry. --- .../com/example/Kafka4ReactorApplication.java | 34 +++++------ ...kaListenerAnnotationBeanPostProcessor.java | 12 ++-- ...AbstractKafkaListenerContainerFactory.java | 1 + .../config/AbstractKafkaListenerEndpoint.java | 11 ++++ .../config/KafkaListenerContainerFactory.java | 5 ++ .../kafka/config/KafkaListenerEndpoint.java | 4 ++ .../config/KafkaListenerEndpointRegistry.java | 61 ++++++++++++++++++- .../listener/reactive/ReactorAdapter.java | 38 +++++++++++- 8 files changed, 138 insertions(+), 28 deletions(-) diff --git a/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java b/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java index 92239ace4d..c32b6723e3 100644 --- a/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java +++ b/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java @@ -16,10 +16,11 @@ package com.example; -import java.lang.reflect.Method; import java.util.stream.IntStream; import org.apache.kafka.clients.admin.NewTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; @@ -27,9 +28,9 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate; -import org.springframework.kafka.listener.reactive.ReactorAdapter; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; @@ -41,21 +42,14 @@ @SpringBootApplication public class Kafka4ReactorApplication { + + private static final Logger logger = LoggerFactory.getLogger(Kafka4ReactorApplication.class); + + public static void main(String[] args) { SpringApplication.run(Kafka4ReactorApplication.class, args); } - @Bean - public ReactorAdapter adapter(Listener listener, KafkaProperties properties) - throws NoSuchMethodException, SecurityException { - - // This will be done by the bean post processor - Method method = Listener.class.getDeclaredMethod("listen", Flux.class); - ReactorAdapter adapter = new ReactorAdapter(listener, method, "skReactorTopic"); - adapter.setConfigs(properties.buildConsumerProperties()); - return adapter; - } - @Bean public ReactiveKafkaProducerTemplate template(KafkaProperties properties) { SenderOptions senderOptions = SenderOptions.create(properties.buildProducerProperties()); @@ -64,14 +58,16 @@ public ReactiveKafkaProducerTemplate template(KafkaProperties pr @Bean public NewTopic topic() { - return TopicBuilder.name("skReactorTopic").partitions(1).replicas(1).build(); + return TopicBuilder.name("skReactorTopic").partitions(10).replicas(1).build(); } @Bean - public ApplicationRunner runner(ReactiveKafkaProducerTemplate template) { + public ApplicationRunner runner(ReactiveKafkaProducerTemplate template, + KafkaListenerEndpointRegistry registry) { + return args -> IntStream.range(0, 10).forEach(i -> { Mono> send = template.send("skReactorTopic", "foo", "bar" + i); - send.subscribe(sr -> System.out.println(sr.recordMetadata())); + send.subscribe(sr -> logger.info(sr.recordMetadata().toString())); }); } @@ -80,10 +76,12 @@ public ApplicationRunner runner(ReactiveKafkaProducerTemplate te @Component class Listener { - @KafkaListener(topics = "skReactorTopic") + private static final Logger logger = LoggerFactory.getLogger(Listener.class); + + @KafkaListener(topics = "skReactorTopic", concurrency = "2") public Mono listen(Flux> flux) { return flux.doOnNext(record -> { - System.out.println(record.key() + ":" + record.value() + "@" + record.offset()); + logger.info(record.key() + ":" + record.value() + "@" + record.offset()); record.receiverOffset().acknowledge(); }).then(); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 95c507cf9a..661a7a059e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -291,15 +291,7 @@ public Object postProcessAfterInitialization(final Object bean, final String bea AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null); multiMethods.addAll(methodsWithHandler); } - // TODO Iterator iterator = annotatedMethods.keySet().iterator(); - while (iterator.hasNext()) { - Method next = iterator.next(); - if (next.getParameterTypes()[0].equals(Flux.class)) { - annotatedMethods.remove(next); - } - } - // end TODO if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(bean.getClass()); this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass()); @@ -469,6 +461,10 @@ protected void processListener(MethodKafkaListenerEndpoint endpoint, Kafka if (StringUtils.hasText(errorHandlerBeanName)) { endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class)); } + Class[] parameterTypes = endpoint.getMethod().getParameterTypes(); + if (parameterTypes.length == 1 && parameterTypes[0].equals(Flux.class)) { + endpoint.setReactive(true); + } this.registrar.registerEndpoint(endpoint, factory); if (StringUtils.hasText(beanRef)) { this.listenerScope.removeListener(beanRef); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index fce1564294..70dc88eb42 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -109,6 +109,7 @@ public void setConsumerFactory(ConsumerFactory consumerFac this.consumerFactory = consumerFactory; } + @Override public ConsumerFactory getConsumerFactory() { return this.consumerFactory; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index eb92662c46..fceb493796 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -113,6 +113,8 @@ public abstract class AbstractKafkaListenerEndpoint private Properties consumerProperties; + private boolean reactive; + @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; @@ -411,6 +413,15 @@ public void setConsumerProperties(Properties consumerProperties) { this.consumerProperties = consumerProperties; } + public void setReactive(boolean reactive) { + this.reactive = reactive; + } + + @Override + public boolean isReactive() { + return this.reactive; + } + @Override public void afterPropertiesSet() { boolean topicsEmpty = getTopics().isEmpty(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java index 8a64dbba03..49f8040c2c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.regex.Pattern; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.support.TopicPartitionInitialOffset; @@ -72,4 +73,8 @@ public interface KafkaListenerContainerFactory getConsumerFactory() { + throw new UnsupportedOperationException("This factory does not support this method"); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java index c3ec65c0b7..8e17b903d6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java @@ -115,6 +115,10 @@ default Properties getConsumerProperties() { return null; } + default boolean isReactive() { + return false; + } + /** * Setup the specified message listener container with the model * defined by this endpoint. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java index 0998cf598a..c1ce7982be 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -26,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanInitializationException; @@ -40,6 +42,9 @@ import org.springframework.core.log.LogAccessor; import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.listener.reactive.ReactorAdapter; +import org.springframework.kafka.support.JavaUtils; +import org.springframework.kafka.support.TopicPartitionInitialOffset; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -73,6 +78,8 @@ public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifec private final Map listenerContainers = new ConcurrentHashMap(); + private final Map reactorAdapters = new ConcurrentHashMap<>(); + private int phase = AbstractMessageListenerContainer.DEFAULT_PHASE; private ConfigurableApplicationContext applicationContext; @@ -81,6 +88,8 @@ public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifec private volatile boolean running; + private int reactive; + @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (applicationContext instanceof ConfigurableApplicationContext) { @@ -144,7 +153,53 @@ public Collection getAllListenerContainers() { * @see #registerListenerContainer(KafkaListenerEndpoint, KafkaListenerContainerFactory, boolean) */ public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory factory) { - registerListenerContainer(endpoint, factory, false); + if (endpoint.isReactive()) { + registerReactiveAdapter(endpoint, factory); + } + else { + registerListenerContainer(endpoint, factory, false); + } + } + + private void registerReactiveAdapter(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory factory) { + Integer concurrency = endpoint.getConcurrency(); + if (concurrency == null) { + concurrency = 1; + } + Assert.isInstanceOf(MethodKafkaListenerEndpoint.class, endpoint); + MethodKafkaListenerEndpoint methodEndpoint = (MethodKafkaListenerEndpoint) endpoint; + for (int i = 0; i < concurrency; i++) { + ReactorAdapter adapter; + if (endpoint.getTopics() != null) { + adapter = new ReactorAdapter(methodEndpoint.getBean(), methodEndpoint.getMethod(), + endpoint.getTopics().toArray(new String[0])); + } + else if (endpoint.getTopicPattern() != null) { + adapter = new ReactorAdapter(methodEndpoint.getBean(), methodEndpoint.getMethod(), + endpoint.getTopicPattern()); + } + else if (endpoint.getTopicPartitions() != null) { + adapter = new ReactorAdapter(methodEndpoint.getBean(), methodEndpoint.getMethod(), + endpoint.getTopicPartitions().toArray(new TopicPartitionInitialOffset[0])); + } + else { + throw new IllegalStateException("topics, topicPattern, or topicPartitions is required"); + } + Map configs = new HashMap<>(factory.getConsumerFactory().getConfigurationProperties()); + int instance = i; + JavaUtils.INSTANCE + .acceptIfNotNull(ConsumerConfig.GROUP_ID_CONFIG, endpoint.getGroupId(), + (key, value) -> configs.put(key, value)) + .acceptIfNotNull(ConsumerConfig.CLIENT_ID_CONFIG, endpoint.getClientIdPrefix(), + (key, value) -> configs.put(key, value + "-" + instance)); + adapter.setConfigs(configs); + String id = endpoint.getId(); + if (id == null) { + id = "reactiveListener#" + this.reactive + "#" + instance; + } + this.reactorAdapters.put(id, adapter); + } + this.reactive++; } /** @@ -255,6 +310,9 @@ public void start() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { startIfNecessary(listenerContainer); } + this.reactorAdapters.values().forEach(adapter -> { + adapter.start(); + }); this.running = true; } @@ -263,6 +321,7 @@ public void stop() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { listenerContainer.stop(); } + this.reactorAdapters.values().forEach(adapter -> adapter.stop()); this.running = false; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java index 3b93d24bbd..0b5dd962e7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java @@ -22,9 +22,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; import org.springframework.context.SmartLifecycle; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.TopicPartitionInitialOffset; import reactor.core.Disposable; import reactor.core.publisher.Flux; @@ -47,9 +49,15 @@ public class ReactorAdapter implements SmartLifecycle { private final List topics; + private final Pattern topicPattern; + + private final List topicPartitions; + private final Map configs = new HashMap<>(); - private Disposable disposable; + private String id; + + private volatile Disposable disposable; private volatile boolean running; @@ -58,12 +66,40 @@ public ReactorAdapter(Object bean, Method method, String... topics) { this.method = method; this.method.setAccessible(true); this.topics = Arrays.asList(topics); + this.topicPattern = null; + this.topicPartitions = null; + } + + public ReactorAdapter(Object bean, Method method, Pattern topicPattern) { + this.bean = bean; + this.method = method; + this.method.setAccessible(true); + this.topics = null; + this.topicPattern = topicPattern; + this.topicPartitions = null; + } + + public ReactorAdapter(Object bean, Method method, TopicPartitionInitialOffset... topics) { + this.bean = bean; + this.method = method; + this.method.setAccessible(true); + this.topics = null; + this.topicPattern = null; + this.topicPartitions = Arrays.asList(topics); } public void setConfigs(Map configs) { this.configs.putAll(configs); } + public String getId() { + return this.id; + } + + public void setId(String id) { + this.id = id; + } + @Override public void start() { ReceiverOptions options = ReceiverOptions.create(this.configs)