Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions samples/sample-04/kafka-4-reactor/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar

### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/

### VS Code ###
.vscode/
108 changes: 108 additions & 0 deletions samples/sample-04/kafka-4-reactor/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>net.gprussell</groupId>
<artifactId>kafka-4-reactor</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-4-reactor</name>
<description>cdc</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.0.BUILD-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</pluginRepository>
</pluginRepositories>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example;

import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.stereotype.Component;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderResult;

@SpringBootApplication
public class Kafka4ReactorApplication {


private static final Logger logger = LoggerFactory.getLogger(Kafka4ReactorApplication.class);


public static void main(String[] args) {
SpringApplication.run(Kafka4ReactorApplication.class, args);
}

@Bean
public ReactiveKafkaProducerTemplate<String, String> template(KafkaProperties properties) {
SenderOptions<String, String> senderOptions = SenderOptions.create(properties.buildProducerProperties());
return new ReactiveKafkaProducerTemplate<>(senderOptions);
}

@Bean
public NewTopic topic() {
return TopicBuilder.name("skReactorTopic").partitions(10).replicas(1).build();
}

@Bean
public ApplicationRunner runner(ReactiveKafkaProducerTemplate<String, String> template,
KafkaListenerEndpointRegistry registry) {

return args -> IntStream.range(0, 10).forEach(i -> {
Mono<SenderResult<Void>> send = template.send("skReactorTopic", "foo", "bar" + i);
send.subscribe(sr -> logger.info(sr.recordMetadata().toString()));
});
}

}

@Component
class Listener {

private static final Logger logger = LoggerFactory.getLogger(Listener.class);

@KafkaListener(topics = "skReactorTopic", concurrency = "2")
public Mono<Void> listen(Flux<ReceiverRecord<String, String>> flux) {
return flux.doOnNext(record -> {
logger.info(record.key() + ":" + record.value() + "@" + record.offset());
record.receiverOffset().acknowledge();
}).then();
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=skReactor
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Kafka4ReactorApplicationTests {

@Test
void contextLoads() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -90,6 +91,8 @@
import org.springframework.util.StringUtils;
import org.springframework.validation.Validator;

import reactor.core.publisher.Flux;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this Reactor becomes as not a optional dependency any more...
Is it really what we are aiming now ?


/**
* Bean post-processor that registers methods annotated with {@link KafkaListener}
* to be invoked by a Kafka message listener container created under the covers
Expand Down Expand Up @@ -288,6 +291,7 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
multiMethods.addAll(methodsWithHandler);
}
Iterator<Method> iterator = annotatedMethods.keySet().iterator();
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
Expand Down Expand Up @@ -457,6 +461,10 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
}
Class<?>[] parameterTypes = endpoint.getMethod().getParameterTypes();
if (parameterTypes.length == 1 && parameterTypes[0].equals(Flux.class)) {
endpoint.setReactive(true);
}
this.registrar.registerEndpoint(endpoint, factory);
if (StringUtils.hasText(beanRef)) {
this.listenerScope.removeListener(beanRef);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void setConsumerFactory(ConsumerFactory<? super K, ? super V> consumerFac
this.consumerFactory = consumerFactory;
}

@Override
public ConsumerFactory<? super K, ? super V> getConsumerFactory() {
return this.consumerFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>

private Properties consumerProperties;

private boolean reactive;

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
Expand Down Expand Up @@ -411,6 +413,15 @@ public void setConsumerProperties(Properties consumerProperties) {
this.consumerProperties = consumerProperties;
}

public void setReactive(boolean reactive) {
this.reactive = reactive;
}

@Override
public boolean isReactive() {
return this.reactive;
}

@Override
public void afterPropertiesSet() {
boolean topicsEmpty = getTopics().isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Collection;
import java.util.regex.Pattern;

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.TopicPartitionInitialOffset;

Expand Down Expand Up @@ -72,4 +73,8 @@ public interface KafkaListenerContainerFactory<C extends MessageListenerContaine
*/
C createContainer(Pattern topicPattern);

default ConsumerFactory<?, ?> getConsumerFactory() {
throw new UnsupportedOperationException("This factory does not support this method");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ default Properties getConsumerProperties() {
return null;
}

default boolean isReactive() {
return false;
}

/**
* Setup the specified message listener container with the model
* defined by this endpoint.
Expand Down
Loading