Add Tip for Assigning All Partitions Manually
* Polishing - add ackMode note
garyrussell authored and artembilan committed May 14, 2019
1 parent 1eb5f53 commit 2ace8bf
Showing 3 changed files with 53 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/reference/asciidoc/index.adoc
@@ -43,6 +43,10 @@ include::streams.adoc[]

include::testing.adoc[]

== Tips and Tricks

include::tips.adoc[]

== Spring Integration

This part of the reference guide shows how to use the `spring-integration-kafka` module of Spring Integration.
4 changes: 4 additions & 0 deletions src/reference/asciidoc/kafka.adoc
@@ -1010,6 +1010,8 @@ public void listen(ConsumerRecord<?, ?> record) {

You can specify each partition in the `partitions` or `partitionOffsets` attribute but not both.

As with most annotation properties, you can use SpEL expressions; for an example of how to generate a large list of partitions, see <<assign-all-parts>>.

When using manual `AckMode`, you can also provide the listener with the `Acknowledgment`.
The following example also shows how to use a different container factory.
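
A minimal sketch of such a listener, assuming a manual-ack container factory bean (the listener id, topic, and the factory bean name `kafkaManualAckListenerContainerFactory` are illustrative):

====
[source, java]
----
@KafkaListener(id = "cat", topics = "myTopic",
        containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    // process the record, then commit its offset explicitly
    ack.acknowledge();
}
----
====

The `Acknowledgment` parameter is only populated when the container is configured with a manual `AckMode`; with other ack modes the container commits offsets itself.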

@@ -2876,6 +2878,8 @@ static class MultiListenerBean {

Note that the argument is `null`, not `KafkaNull`.

TIP: See <<assign-all-parts>>.

[[annotation-error-handling]]
==== Handling Exceptions

45 changes: 45 additions & 0 deletions src/reference/asciidoc/tips.adoc
@@ -0,0 +1,45 @@
[[assign-all-parts]]
=== Manually Assigning All Partitions

If you always want to read all records from all partitions (for example, when using a compacted topic to load a distributed cache), it can be useful to manually assign the partitions instead of using Kafka's group management.
Doing so can be unwieldy when there are many partitions, because you have to list each one.
It is also a problem if the number of partitions changes over time, because you would have to recompile your application each time the partition count changes.

The following is an example of how to use the power of a SpEL expression to create the partition list dynamically when the application starts:

====
[source, java]
----
@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
            partitions = "#{@finder.partitions('compacted')}"))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    ...
}

@Bean
public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {
    return new PartitionFinder(consumerFactory);
}

public static class PartitionFinder {

    private final ConsumerFactory<String, String> consumerFactory;

    public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    public String[] partitions(String topic) {
        // Open a short-lived consumer just to discover the topic's partitions;
        // try-with-resources ensures the consumer is closed afterwards
        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
            return consumer.partitionsFor(topic).stream()
                    .map(pi -> "" + pi.partition())
                    .toArray(String[]::new);
        }
    }

}
----
====

Using this in conjunction with `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG=earliest` will load all records each time the application is started.
You should also set the container's `AckMode` to `MANUAL` to prevent the container from committing offsets for a `null` consumer group.
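
A factory configuration along these lines might look like the following (a sketch; the bean name and the way the `earliest` reset policy is applied are illustrative, not part of this commit):

====
[source, java]
----
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // MANUAL ack mode stops the container from committing offsets,
    // which would fail here because there is no consumer group
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
----
====

The `ConsumerFactory` itself would be built with `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` set to `earliest` so that every start reads the topic from the beginning.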
