GH-1690: Polish Quick Start Docs
Resolves #1690

**cherry-pick to 2.6.x (fix kafka-clients version to 2.6.1)**

(cherry picked from commit 5e46e02)
garyrussell authored and artembilan committed Jan 26, 2021
1 parent e978ae4 commit afbecd2
Showing 1 changed file with 94 additions and 146 deletions.
src/reference/asciidoc/quick-tour.adoc
@@ -1,13 +1,13 @@
[[quick-tour]]
=== Quick Tour for the Impatient

This is the five-minute tour to get started with Spring Kafka.
=== Quick Tour

Prerequisites: You must install and run Apache Kafka.
Then you must grab the spring-kafka JAR and all of its dependencies.
Then you must put the spring-kafka JAR and all of its dependencies on your class path.
The easiest way to do that is to declare a dependency in your build tool.
The following example shows how to do so with Maven:

If you are not using Spring Boot, declare the `spring-kafka` jar as a dependency in your project.

.Maven
====
[source,xml,subs="+attributes"]
----
@@ -19,17 +19,17 @@ The following example shows how to do so with Maven:
----
====

The following example shows how to do so with Gradle:

.Gradle
====
[source,groovy,subs="+attributes"]
----
compile 'org.springframework.kafka:spring-kafka:{project-version}'
----
====

IMPORTANT: When using Spring Boot, omit the version and Boot will automatically bring in the correct version that is compatible with your Boot version:
IMPORTANT: When using Spring Boot (and you haven't used start.spring.io to create your project), omit the version and Boot will automatically bring in the correct version that is compatible with your Boot version:

.Maven
====
[source,xml,subs="+attributes"]
----
@@ -40,114 +40,110 @@ IMPORTANT: When using Spring Boot, omit the version and Boot will automatically
----
====

The following example shows how to do so with Gradle:

.Gradle
====
[source,groovy,subs="+attributes"]
----
compile 'org.springframework.kafka:spring-kafka'
----
====

However, the quickest way to get started is to use https://start.spring.io[start.spring.io] (or the wizards in Spring Tool Suite and IntelliJ IDEA) and create a project, selecting 'Spring for Apache Kafka' as a dependency.

[[compatibility]]
==== Compatibility

This quick tour works with the following versions:

* Apache Kafka Clients 2.4.1
* Apache Kafka Clients 2.6.1
* Spring Framework 5.3.x
* Minimum Java version: 8

==== A Very, Very Quick Example
==== Getting Started

As the following example shows, you can use plain Java to send and receive a message:
The simplest way to get started is to use https://start.spring.io[start.spring.io] (or the wizards in Spring Tool Suite and IntelliJ IDEA) and create a project, selecting 'Spring for Apache Kafka' as a dependency.
Refer to the https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-kafka[Spring Boot documentation] for more information about its opinionated auto-configuration of the infrastructure beans.

Here is a minimal consumer application.

===== Spring Boot Consumer App

.Application
====
[source,java]
[source, java]
----
@Test
public void testAutoCommit() throws Exception {
logger.info("Start auto");
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
final CountDownLatch latch = new CountDownLatch(4);
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
logger.info("received: " + message);
latch.countDown();
}
});
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();
Thread.sleep(1000); // wait a bit for the container to start
KafkaTemplate<Integer, String> template = createTemplate();
template.setDefaultTopic("topic1");
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertTrue(latch.await(60, TimeUnit.SECONDS));
container.stop();
logger.info("Stop auto");
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
}
----
====

[source, java]
.application.properties
====
[source, properties]
----
private KafkaMessageListenerContainer<Integer, String> createContainer(
ContainerProperties containerProps) {
Map<String, Object> props = consumerProps();
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<Integer, String>(props);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
}
spring.kafka.consumer.auto-offset-reset=earliest
----
====
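Boot's defaults assume a broker listening on `localhost:9092`; when that is not the case, the connection can be configured in the same file. A sketch with a commonly used Boot property (the value shown is only an illustration for a local broker):

====
[source, properties]
----
# illustrative value; point this at your own broker(s)
spring.kafka.bootstrap-servers=localhost:9092
----
====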

private KafkaTemplate<Integer, String> createTemplate() {
Map<String, Object> senderProps = senderProps();
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
return template;
}
The `NewTopic` bean causes the topic to be created on the broker; it is not needed if the topic already exists.
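`TopicBuilder` can also set topic-level configuration. A minimal sketch, assuming its `compact()` and `config()` methods (present in recent spring-kafka versions) and a hypothetical topic name:

====
[source, java]
----
@Bean
public NewTopic compactedTopic() {
    return TopicBuilder.name("topic1.compacted") // hypothetical name
            .partitions(10)
            .replicas(1)
            .compact() // sets cleanup.policy=compact
            .config(TopicConfig.SEGMENT_BYTES_CONFIG, "1073741824")
            .build();
}
----
====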

private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
===== Spring Boot Producer App

.Application
====
[source, java]
----
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic1", "test");
};
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
----
====
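`KafkaTemplate.send()` is asynchronous; in this version line it returns a `ListenableFuture<SendResult<...>>`, so the runner may exit before the broker acknowledges the record. A hedged sketch of attaching a callback to observe the outcome:

====
[source, java]
----
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
    return args -> template.send("topic1", "test")
            .addCallback(
                result -> System.out.println("sent: " + result.getRecordMetadata()),
                ex -> System.err.println("send failed: " + ex.getMessage()));
}
----
====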

==== With Java Configuration
==== With Java Configuration (No Spring Boot)

IMPORTANT: Spring for Apache Kafka is designed to be used in a Spring Application Context.
For example, if you create the listener container yourself outside of a Spring context, not all functions will work unless you satisfy all of the `...Aware` interfaces that the container implements.

You can do the same work as appears in the previous example with Spring configuration in Java.
The following example shows how to do so:
Here is an example of an application that does not use Spring Boot.

====
[source,java]
@@ -180,13 +176,15 @@ public class Config {
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
@Bean
public Map<String, Object> consumerConfigs() {
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
return props;
}
@@ -198,13 +196,15 @@ public class Config {
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
return new DefaultKafkaProducerFactory<>(senderProps());
}
@Bean
public Map<String, Object> producerConfigs() {
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
...
return props;
}
@@ -216,7 +216,9 @@ public class Config {
}
----
====

====
[source, java]
----
public class Listener {
@@ -232,58 +234,4 @@ public class Listener {
----
====
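A sketch of bootstrapping this configuration without Boot, assuming `Config` also exposes a `KafkaTemplate` bean and that the listener reads the same topic (the topic name and key here are illustrative):

====
[source, java]
----
public static void main(String[] args) throws Exception {
    try (AnnotationConfigApplicationContext context =
            new AnnotationConfigApplicationContext(Config.class)) {
        KafkaTemplate<Integer, String> template = context.getBean(KafkaTemplate.class);
        template.send("topic1", 1, "hello").get(10, TimeUnit.SECONDS); // block until acked
    }
}
----
====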

==== Even Quicker, with Spring Boot

Spring Boot can make things even simpler.
The following Spring Boot application sends three messages to a topic, receives them, and stops:

====
[source, java]
----
@SpringBootApplication
public class Application implements CommandLineRunner {
public static Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Autowired
private KafkaTemplate<String, String> template;
private final CountDownLatch latch = new CountDownLatch(3);
@Override
public void run(String... args) throws Exception {
this.template.send("myTopic", "foo1");
this.template.send("myTopic", "foo2");
this.template.send("myTopic", "foo3");
latch.await(60, TimeUnit.SECONDS);
logger.info("All received");
}
@KafkaListener(topics = "myTopic")
public void listen(ConsumerRecord<?, ?> cr) throws Exception {
logger.info(cr.toString());
latch.countDown();
}
}
----
====

Boot takes care of most of the configuration.
When we use a local broker, the only properties we need are the following:

.application.properties
====
[source]
----
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=earliest
----
====

We need the first property because we are using group management to assign topic partitions to consumers, so we need a group.
The second property ensures the new consumer group gets the messages we sent, because the container might start after the sends have completed.
As you can see, you have to define several infrastructure beans when not using Spring Boot.
