diff --git a/samples/sample-04/kafka-4-reactor/.gitignore b/samples/sample-04/kafka-4-reactor/.gitignore
new file mode 100644
index 0000000000..153c9335eb
--- /dev/null
+++ b/samples/sample-04/kafka-4-reactor/.gitignore
@@ -0,0 +1,29 @@
+HELP.md
+/target/
+!.mvn/wrapper/maven-wrapper.jar
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+/build/
+
+### VS Code ###
+.vscode/
diff --git a/samples/sample-04/kafka-4-reactor/pom.xml b/samples/sample-04/kafka-4-reactor/pom.xml
new file mode 100644
index 0000000000..4257a4a849
--- /dev/null
+++ b/samples/sample-04/kafka-4-reactor/pom.xml
@@ -0,0 +1,108 @@
+
+
+ 4.0.0
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.2.0.BUILD-SNAPSHOT
+
+
+ net.gprussell
+ kafka-4-reactor
+ 0.0.1-SNAPSHOT
+ kafka-4-reactor
+ cdc
+
+
+ 1.8
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ org.springframework.kafka
+ spring-kafka
+ 2.3.0.BUILD-SNAPSHOT
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.junit.vintage
+ junit-vintage-engine
+
+
+ junit
+ junit
+
+
+
+
+ org.springframework.kafka
+ spring-kafka-test
+ test
+
+
+ io.projectreactor
+ reactor-core
+
+
+ io.projectreactor.kafka
+ reactor-kafka
+ 1.2.0.BUILD-SNAPSHOT
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/libs-snapshot
+
+ true
+
+
+
+ spring-milestones
+ Spring Milestones
+ https://repo.spring.io/milestone
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+ spring-milestones
+ Spring Milestones
+ https://repo.spring.io/milestone
+
+
+
+
diff --git a/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java b/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java
new file mode 100644
index 0000000000..c32b6723e3
--- /dev/null
+++ b/samples/sample-04/kafka-4-reactor/src/main/java/com/example/Kafka4ReactorApplication.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example;
+
+import java.util.stream.IntStream;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.config.TopicBuilder;
+import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.kafka.receiver.ReceiverRecord;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.sender.SenderResult;
+
+@SpringBootApplication
+public class Kafka4ReactorApplication {
+
+
+ private static final Logger logger = LoggerFactory.getLogger(Kafka4ReactorApplication.class);
+
+
+ public static void main(String[] args) {
+ SpringApplication.run(Kafka4ReactorApplication.class, args);
+ }
+
+ @Bean
+ public ReactiveKafkaProducerTemplate template(KafkaProperties properties) {
+ SenderOptions senderOptions = SenderOptions.create(properties.buildProducerProperties());
+ return new ReactiveKafkaProducerTemplate<>(senderOptions);
+ }
+
+ @Bean
+ public NewTopic topic() {
+ return TopicBuilder.name("skReactorTopic").partitions(10).replicas(1).build();
+ }
+
+ @Bean
+ public ApplicationRunner runner(ReactiveKafkaProducerTemplate template,
+ KafkaListenerEndpointRegistry registry) {
+
+ return args -> IntStream.range(0, 10).forEach(i -> {
+ Mono> send = template.send("skReactorTopic", "foo", "bar" + i);
+ send.subscribe(sr -> logger.info(sr.recordMetadata().toString()));
+ });
+ }
+
+}
+
+@Component
+class Listener {
+
+ private static final Logger logger = LoggerFactory.getLogger(Listener.class);
+
+ @KafkaListener(topics = "skReactorTopic", concurrency = "2")
+ public Mono listen(Flux> flux) {
+ return flux.doOnNext(record -> {
+ logger.info(record.key() + ":" + record.value() + "@" + record.offset());
+ record.receiverOffset().acknowledge();
+ }).then();
+ }
+
+}
+
diff --git a/samples/sample-04/kafka-4-reactor/src/main/resources/application.properties b/samples/sample-04/kafka-4-reactor/src/main/resources/application.properties
new file mode 100644
index 0000000000..5e3bd48646
--- /dev/null
+++ b/samples/sample-04/kafka-4-reactor/src/main/resources/application.properties
@@ -0,0 +1,3 @@
+spring.kafka.consumer.enable-auto-commit=false
+spring.kafka.consumer.auto-offset-reset=earliest
+spring.kafka.consumer.group-id=skReactor
diff --git a/samples/sample-04/kafka-4-reactor/src/test/java/com/example/Kafka4ReactorApplicationTests.java b/samples/sample-04/kafka-4-reactor/src/test/java/com/example/Kafka4ReactorApplicationTests.java
new file mode 100644
index 0000000000..1e3ff0db8c
--- /dev/null
+++ b/samples/sample-04/kafka-4-reactor/src/test/java/com/example/Kafka4ReactorApplicationTests.java
@@ -0,0 +1,13 @@
+package com.example;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class Kafka4ReactorApplicationTests {
+
+ @Test
+ void contextLoads() {
+ }
+
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
index 4af17e11b4..661a7a059e 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
@@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -90,6 +91,8 @@
import org.springframework.util.StringUtils;
import org.springframework.validation.Validator;
+import reactor.core.publisher.Flux;
+
/**
* Bean post-processor that registers methods annotated with {@link KafkaListener}
* to be invoked by a Kafka message listener container created under the covers
@@ -288,6 +291,7 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
multiMethods.addAll(methodsWithHandler);
}
+ Iterator iterator = annotatedMethods.keySet().iterator();
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
@@ -457,6 +461,10 @@ protected void processListener(MethodKafkaListenerEndpoint, ?> endpoint, Kafka
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
}
+ Class>[] parameterTypes = endpoint.getMethod().getParameterTypes();
+ if (parameterTypes.length == 1 && parameterTypes[0].equals(Flux.class)) {
+ endpoint.setReactive(true);
+ }
this.registrar.registerEndpoint(endpoint, factory);
if (StringUtils.hasText(beanRef)) {
this.listenerScope.removeListener(beanRef);
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
index fce1564294..70dc88eb42 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
@@ -109,6 +109,7 @@ public void setConsumerFactory(ConsumerFactory super K, ? super V> consumerFac
this.consumerFactory = consumerFactory;
}
+ @Override
public ConsumerFactory super K, ? super V> getConsumerFactory() {
return this.consumerFactory;
}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java
index eb92662c46..fceb493796 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java
@@ -113,6 +113,8 @@ public abstract class AbstractKafkaListenerEndpoint
private Properties consumerProperties;
+ private boolean reactive;
+
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
@@ -411,6 +413,15 @@ public void setConsumerProperties(Properties consumerProperties) {
this.consumerProperties = consumerProperties;
}
+ public void setReactive(boolean reactive) {
+ this.reactive = reactive;
+ }
+
+ @Override
+ public boolean isReactive() {
+ return this.reactive;
+ }
+
@Override
public void afterPropertiesSet() {
boolean topicsEmpty = getTopics().isEmpty();
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java
index 8a64dbba03..49f8040c2c 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java
@@ -19,6 +19,7 @@
import java.util.Collection;
import java.util.regex.Pattern;
+import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
@@ -72,4 +73,8 @@ public interface KafkaListenerContainerFactory getConsumerFactory() {
+ throw new UnsupportedOperationException("This factory does not support this method");
+ }
+
}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java
index c3ec65c0b7..8e17b903d6 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java
@@ -115,6 +115,10 @@ default Properties getConsumerProperties() {
return null;
}
+ default boolean isReactive() {
+ return false;
+ }
+
/**
* Setup the specified message listener container with the model
* defined by this endpoint.
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java
index 923e5cecea..c1ce7982be 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java
@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -26,6 +27,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.LogFactory;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
@@ -40,6 +42,9 @@
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
+import org.springframework.kafka.listener.reactive.ReactorAdapter;
+import org.springframework.kafka.support.JavaUtils;
+import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
@@ -73,6 +78,8 @@ public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifec
private final Map listenerContainers =
new ConcurrentHashMap();
+ private final Map reactorAdapters = new ConcurrentHashMap<>();
+
private int phase = AbstractMessageListenerContainer.DEFAULT_PHASE;
private ConfigurableApplicationContext applicationContext;
@@ -81,6 +88,8 @@ public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifec
private volatile boolean running;
+ private int reactive;
+
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (applicationContext instanceof ConfigurableApplicationContext) {
@@ -144,7 +153,53 @@ public Collection getAllListenerContainers() {
* @see #registerListenerContainer(KafkaListenerEndpoint, KafkaListenerContainerFactory, boolean)
*/
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory> factory) {
- registerListenerContainer(endpoint, factory, false);
+ if (endpoint.isReactive()) {
+ registerReactiveAdapter(endpoint, factory);
+ }
+ else {
+ registerListenerContainer(endpoint, factory, false);
+ }
+ }
+
+ private void registerReactiveAdapter(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory> factory) {
+ Integer concurrency = endpoint.getConcurrency();
+ if (concurrency == null) {
+ concurrency = 1;
+ }
+ Assert.isInstanceOf(MethodKafkaListenerEndpoint.class, endpoint);
+ MethodKafkaListenerEndpoint, ?> methodEndpoint = (MethodKafkaListenerEndpoint, ?>) endpoint;
+ for (int i = 0; i < concurrency; i++) {
+ ReactorAdapter adapter;
+ if (endpoint.getTopics() != null) {
+ adapter = new ReactorAdapter(methodEndpoint.getBean(), methodEndpoint.getMethod(),
+ endpoint.getTopics().toArray(new String[0]));
+ }
+ else if (endpoint.getTopicPattern() != null) {
+ adapter = new ReactorAdapter(methodEndpoint.getBean(), methodEndpoint.getMethod(),
+ endpoint.getTopicPattern());
+ }
+ else if (endpoint.getTopicPartitions() != null) {
+ adapter = new ReactorAdapter(methodEndpoint.getBean(), methodEndpoint.getMethod(),
+ endpoint.getTopicPartitions().toArray(new TopicPartitionInitialOffset[0]));
+ }
+ else {
+ throw new IllegalStateException("topics, topicPattern, or topicPartitions is required");
+ }
+ Map configs = new HashMap<>(factory.getConsumerFactory().getConfigurationProperties());
+ int instance = i;
+ JavaUtils.INSTANCE
+ .acceptIfNotNull(ConsumerConfig.GROUP_ID_CONFIG, endpoint.getGroupId(),
+ (key, value) -> configs.put(key, value))
+ .acceptIfNotNull(ConsumerConfig.CLIENT_ID_CONFIG, endpoint.getClientIdPrefix(),
+ (key, value) -> configs.put(key, value + "-" + instance));
+ adapter.setConfigs(configs);
+ String id = endpoint.getId();
+ if (id == null) {
+ id = "reactiveListener#" + this.reactive + "#" + instance;
+ }
+ this.reactorAdapters.put(id, adapter);
+ }
+ this.reactive++;
}
/**
@@ -255,6 +310,9 @@ public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
startIfNecessary(listenerContainer);
}
+ this.reactorAdapters.values().forEach(adapter -> {
+ adapter.start();
+ });
this.running = true;
}
@@ -263,12 +321,17 @@ public void stop() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
listenerContainer.stop();
}
+ this.reactorAdapters.values().forEach(adapter -> adapter.stop());
this.running = false;
}
@Override
public void stop(Runnable callback) {
Collection listenerContainersToStop = getListenerContainers();
+ if (listenerContainersToStop.size() == 0) {
+ callback.run();
+ return;
+ }
AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainersToStop.size(), callback);
for (MessageListenerContainer listenerContainer : listenerContainersToStop) {
if (listenerContainer.isRunning()) {
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java
new file mode 100644
index 0000000000..0b5dd962e7
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/ReactorAdapter.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener.reactive;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.springframework.context.SmartLifecycle;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.TopicPartitionInitialOffset;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverOptions;
+
+/**
+ * Adapter for {@link KafkaListener} with a {@link Flux} parameter.
+ *
+ * @author Gary Russell
+ * @since 2.3
+ *
+ */
+public class ReactorAdapter implements SmartLifecycle {
+
+ private final Object bean;
+
+ private final Method method;
+
+ private final List topics;
+
+ private final Pattern topicPattern;
+
+ private final List topicPartitions;
+
+ private final Map configs = new HashMap<>();
+
+ private String id;
+
+ private volatile Disposable disposable;
+
+ private volatile boolean running;
+
+ public ReactorAdapter(Object bean, Method method, String... topics) {
+ this.bean = bean;
+ this.method = method;
+ this.method.setAccessible(true);
+ this.topics = Arrays.asList(topics);
+ this.topicPattern = null;
+ this.topicPartitions = null;
+ }
+
+ public ReactorAdapter(Object bean, Method method, Pattern topicPattern) {
+ this.bean = bean;
+ this.method = method;
+ this.method.setAccessible(true);
+ this.topics = null;
+ this.topicPattern = topicPattern;
+ this.topicPartitions = null;
+ }
+
+ public ReactorAdapter(Object bean, Method method, TopicPartitionInitialOffset... topics) {
+ this.bean = bean;
+ this.method = method;
+ this.method.setAccessible(true);
+ this.topics = null;
+ this.topicPattern = null;
+ this.topicPartitions = Arrays.asList(topics);
+ }
+
+ public void setConfigs(Map configs) {
+ this.configs.putAll(configs);
+ }
+
+ public String getId() {
+ return this.id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public void start() {
+ ReceiverOptions, ?> options = ReceiverOptions.create(this.configs)
+ .subscription(this.topics);
+ Flux> flux = KafkaReceiver.create(options).receive();
+ /*
+ * If the method's parameter is not a Flux, we could map
+ * record.value() using Spring's ConversionService.
+ * We could also, say, have Flux> and map
+ * the key/value (again with conversion if needed).
+ */
+ try {
+ Object result = this.method.invoke(this.bean, flux);
+ if (result instanceof Mono) {
+ this.disposable = ((Mono>) result).subscribe();
+ }
+ this.running = true;
+ }
+ catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (this.disposable != null && !this.disposable.isDisposed()) {
+ this.disposable.dispose();
+ this.disposable = null;
+ }
+ this.running = false;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.running;
+ }
+
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/package-info.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/package-info.java
new file mode 100644
index 0000000000..f2c2876d6e
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/reactive/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Provides classes for reactive listeners.
+ */
+package org.springframework.kafka.listener.reactive;