Skip to content

Commit

Permalink
Partition list, range with initial offsets
Browse files Browse the repository at this point in the history
- support partition lists and lists of ranges with `@TopicPartition.partitionOffset()`.
  • Loading branch information
garyrussell authored and artembilan committed Nov 23, 2020
1 parent e666153 commit f60a554
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
Expand Down Expand Up @@ -564,7 +565,7 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
() -> "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
List<TopicPartitionOffset> result = new ArrayList<>();
for (String partition : partitions) {
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result, null, false, false);
}
if (partitionOffsets.length == 1 && partitionOffsets[0].partition().equals("*")) {
result.forEach(tpo -> {
Expand All @@ -576,19 +577,8 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
for (PartitionOffset partitionOffset : partitionOffsets) {
Assert.isTrue(!partitionOffset.partition().equals("*"), () ->
"Partition wildcard '*' is only allowed in a single @PartitionOffset in " + result);
TopicPartitionOffset topicPartitionOffset =
new TopicPartitionOffset((String) topic,
resolvePartition(topic, partitionOffset),
resolveInitialOffset(topic, partitionOffset),
isRelative(topic, partitionOffset));
if (!result.contains(topicPartitionOffset)) {
result.add(topicPartitionOffset);
}
else {
throw new IllegalArgumentException(
String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
topicPartitionOffset));
}
resolvePartitionAsInteger((String) topic, resolveExpression(partitionOffset.partition()), result,
resolveInitialOffset(topic, partitionOffset), isRelative(topic, partitionOffset), true);
}
}
Assert.isTrue(result.size() > 0, () -> "At least one partition required for " + topic);
Expand Down Expand Up @@ -673,18 +663,27 @@ else if (resolvedValue instanceof Iterable) {

@SuppressWarnings("unchecked")
private void resolvePartitionAsInteger(String topic, Object resolvedValue,
List<TopicPartitionOffset> result) {
List<TopicPartitionOffset> result, @Nullable Long offset, boolean isRelative, boolean checkDups) {

if (resolvedValue instanceof String[]) {
for (Object object : (String[]) resolvedValue) {
resolvePartitionAsInteger(topic, object, result);
resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups);
}
}
else if (resolvedValue instanceof String) {
Assert.state(StringUtils.hasText((String) resolvedValue),
() -> "partition in @TopicPartition for topic '" + topic + "' cannot be empty");
result.addAll(parsePartitions((String) resolvedValue)
.map(part -> new TopicPartitionOffset(topic, part))
.collect(Collectors.toList()));
List<TopicPartitionOffset> collected = parsePartitions((String) resolvedValue)
.map(part -> new TopicPartitionOffset(topic, part, offset, isRelative))
.collect(Collectors.toList());
if (checkDups) {
collected.forEach(tpo -> {
Assert.state(!result.contains(tpo), () ->
String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
tpo));
});
}
result.addAll(collected);
}
else if (resolvedValue instanceof Integer[]) {
for (Integer partition : (Integer[]) resolvedValue) {
Expand All @@ -696,7 +695,7 @@ else if (resolvedValue instanceof Integer) {
}
else if (resolvedValue instanceof Iterable) {
for (Object object : (Iterable<Object>) resolvedValue) {
resolvePartitionAsInteger(topic, object, result);
resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups);
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
* The partition within the topic to listen on. Property place holders and SpEL
* expressions are supported, which must resolve to Integer (or String that can be
* parsed as Integer). '*' indicates that the initial offset will be applied to all
* partitions in the encompassing {@link TopicPartition}
* partitions in the encompassing {@link TopicPartition} The string can contain a
* comma-delimited list of partitions, or ranges of partitions (e.g.
* {@code 0-5, 7, 10-15}, in which case, the offset will be applied to all of those
* partitions.
* @return partition within the topic.
*/
String partition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -102,9 +100,21 @@ void parsePartitions() {
TopicPartitionOffset[] topicPartitions = registry.getListenerContainer("pp")
.getContainerProperties()
.getTopicPartitions();
List<Integer> collected = Arrays.stream(topicPartitions).map(tp -> tp.getPartition())
.collect(Collectors.toList());
Stream<Integer> collected = Arrays.stream(topicPartitions)
.map(tp -> tp.getPartition());
assertThat(collected).containsExactly(0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15);

assertThat(Arrays.stream(this.registry.getListenerContainer("ppo")
.getContainerProperties()
.getTopicPartitions())).containsExactly(
new TopicPartitionOffset("foo", 0),
new TopicPartitionOffset("foo", 1),
new TopicPartitionOffset("foo", 2),
new TopicPartitionOffset("foo", 3));
assertThat(Arrays.stream(this.registry.getListenerContainer("ppo")
.getContainerProperties()
.getTopicPartitions())
.map(tpo -> tpo.getOffset())).containsExactly(0L, 0L, 1L, 1L);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
Expand Down Expand Up @@ -147,6 +157,13 @@ public void foo(String in) {
public void bar(String in) {
}

@KafkaListener(id = "ppo", autoStartup = "false",
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
partitionOffsets = { @PartitionOffset(partition = "0-1", initialOffset = "0"),
@PartitionOffset(partition = "#{'2-3'}", initialOffset = "1") }))
public void baz(String in) {
}

@SuppressWarnings({ "rawtypes" })
@Bean
public ConsumerFactory consumerFactory() {
Expand Down
17 changes: 17 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,23 @@ public void process(String in) {

The range is inclusive; the example above will assign partitions `0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15`.

The same technique can be used when specifying initial offsets:

====
[source, java]
----
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
----
====

The initial offset will be applied to all 6 partitions.

====== Manual Acknowledgment

When using manual `AckMode`, you can also provide the listener with the `Acknowledgment`.
Expand Down

0 comments on commit f60a554

Please sign in to comment.