Skip to content

Commit

Permalink
GH-650: Pause/Resume doc example
Browse files Browse the repository at this point in the history
Resolves #650
  • Loading branch information
garyrussell authored and artembilan committed Jul 11, 2018
1 parent 0ffccba commit 10db6e9
Showing 1 changed file with 59 additions and 0 deletions.
59 changes: 59 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Expand Up @@ -1476,6 +1476,65 @@ However, the consumers might not have actually paused yet; `isConsumerPaused()`

In addition, also since _2.1.5_, `ConsumerPausedEvent` s and `ConsumerResumedEvent` s are published with the container as the `source` property and the `TopicPatition` s involved in the `partitions` s property.

This simple Spring Boot application demonstrates using the container registry to get a reference to a `@KafkaListener` method's container and pausing/resuming its consumers, as well as receiving the corresponding events.

[source, java]
----
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "foo");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "bar");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return new NewTopic("pause.resume.topic", 2, (short) 1);
}
}
----

With results:

[source]
----
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
foo
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
bar
----

[[serdes]]
==== Serialization/Deserialization and Message Conversion

Expand Down

0 comments on commit 10db6e9

Please sign in to comment.