From 4587d7e56e23a815a8a9ba432ba75dda9f87b739 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 31 Jul 2020 15:29:30 -0400 Subject: [PATCH] GH-1554: Partition wildcard for initial offsets Resolves https://github.com/spring-projects/spring-kafka/issues/1554 When using manual assignment it was difficult to specify the initial offset for multiple partitions - especially when dynamically determined. Add a wildcard to indicate the offset should be applied to all partitions. **cherry-pick to 2.5.x** * Call onPartitionsAssigned() with manual assignment Resolves https://github.com/spring-projects/spring-kafka/issues/1553 * Fix test class name. # Conflicts: # src/reference/asciidoc/whats-new.adoc --- ...kaListenerAnnotationBeanPostProcessor.java | 38 ++-- .../kafka/annotation/PartitionOffset.java | 9 +- .../kafka/annotation/TopicPartition.java | 5 +- .../KafkaMessageListenerContainer.java | 12 +- .../kafka/support/TopicPartitionOffset.java | 26 ++- .../ManualAssignmentInitialSeekTests.java | 172 ++++++++++++++++++ .../SeekToCurrentOnErrorRecordModeTests.java | 21 ++- src/reference/asciidoc/kafka.adoc | 29 ++- src/reference/asciidoc/tips.adoc | 4 +- 9 files changed, 277 insertions(+), 39 deletions(-) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAssignmentInitialSeekTests.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index f9b351878a..31be2c6d51 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -555,22 +555,32 @@ private List resolveTopicPartitionsList(TopicPartition top for (String partition : partitions) { resolvePartitionAsInteger((String) topic, resolveExpression(partition), result); } - - for (PartitionOffset partitionOffset : partitionOffsets) { - TopicPartitionOffset topicPartitionOffset = - new TopicPartitionOffset((String) topic, - resolvePartition(topic, partitionOffset), - resolveInitialOffset(topic, partitionOffset), - isRelative(topic, partitionOffset)); - if (!result.contains(topicPartitionOffset)) { - result.add(topicPartitionOffset); - } - else { - throw new IllegalArgumentException( - String.format("@TopicPartition can't have the same partition configuration twice: [%s]", - topicPartitionOffset)); + if (partitionOffsets.length == 1 && partitionOffsets[0].partition().equals("*")) { + result.forEach(tpo -> { + tpo.setOffset(resolveInitialOffset(tpo.getTopic(), partitionOffsets[0])); + tpo.setRelativeToCurrent(isRelative(tpo.getTopic(), partitionOffsets[0])); + }); + } + else { + for (PartitionOffset partitionOffset : partitionOffsets) { + Assert.isTrue(!partitionOffset.partition().equals("*"), () -> + "Partition wildcard '*' is only allowed in a single @PartitionOffset in " + result); + TopicPartitionOffset topicPartitionOffset = + new TopicPartitionOffset((String) topic, + resolvePartition(topic, partitionOffset), + resolveInitialOffset(topic, partitionOffset), + isRelative(topic, partitionOffset)); + if (!result.contains(topicPartitionOffset)) { + result.add(topicPartitionOffset); + } + else { + throw new IllegalArgumentException( + String.format("@TopicPartition can't have the same partition configuration twice: [%s]", + topicPartitionOffset)); + } } } + Assert.isTrue(result.size() > 0, () -> "At least one partition required for " + topic); return result; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/PartitionOffset.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/PartitionOffset.java index 776497d126..f56f7df772 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/PartitionOffset.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/PartitionOffset.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,9 +31,10 @@ public @interface PartitionOffset { /** - * The partition within the topic to listen on. - * Property place holders and SpEL expressions are supported, - * which must resolve to Integer (or String that can be parsed as Integer). + * The partition within the topic to listen on. Property place holders and SpEL + * expressions are supported, which must resolve to Integer (or String that can be + * parsed as Integer). '*' indicates that the initial offset will be applied to all + * partitions in the encompassing {@link TopicPartition} * @return partition within the topic. */ String partition(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/TopicPartition.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/TopicPartition.java index 2f12a7705e..e2818aa7c6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/TopicPartition.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/TopicPartition.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,7 +50,8 @@ String[] partitions() default {}; /** - * The partitions with initial offsets within the topic. + * The partitions with initial offsets within the topic. There must only be one + * instance of {@link PartitionOffset} if its 'partition' property is '*'. * Partitions specified here can't be duplicated in the {@link #partitions()}. * @return the {@link PartitionOffset} array. */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 2eb2f56427..f92113962d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -18,6 +18,7 @@ import java.nio.ByteBuffer; import java.time.Duration; +import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -25,6 +26,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; @@ -890,7 +892,7 @@ private void subscribeOrAssignTopics(final Consumer subscr else { List topicPartitionsToAssign = Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions); - this.definedPartitions = new HashMap<>(topicPartitionsToAssign.size()); + this.definedPartitions = new LinkedHashMap<>(topicPartitionsToAssign.size()); for (TopicPartitionOffset topicPartition : topicPartitionsToAssign) { this.definedPartitions.put(topicPartition.getTopicPartition(), new OffsetMetadata(topicPartition.getOffset(), topicPartition.isRelativeToCurrent(), @@ -2108,7 +2110,7 @@ private void initPartitionsIfNeeded() { * called until we poll() the consumer. Users can use a ConsumerAwareRebalanceListener * or a ConsumerSeekAware listener in that case. */ - Map partitions = new HashMap<>(this.definedPartitions); + Map partitions = new LinkedHashMap<>(this.definedPartitions); Set beginnings = partitions.entrySet().stream() .filter(e -> SeekPosition.BEGINNING.equals(e.getValue().seekPosition)) .map(Entry::getKey) @@ -2153,6 +2155,12 @@ else if (metadata.relativeToCurrent) { } } } + if (this.consumerSeekAwareListener != null) { + this.consumerSeekAwareListener.onPartitionsAssigned(partitions.keySet().stream() + .map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp))) + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())), + this.seekCallback); + } } private void logReset(TopicPartition topicPartition, long newOffset) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java b/spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java index d455570633..18db8a24e2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -70,11 +70,11 @@ public enum SeekPosition { private final TopicPartition topicPartition; - private final Long offset; + private final SeekPosition position; - private final boolean relativeToCurrent; + private Long offset; - private final SeekPosition position; + private boolean relativeToCurrent; /** * Construct an instance with no initial offset management. @@ -171,10 +171,28 @@ public Long getOffset() { return this.offset; } + /** + * Set the offset. + * @param offset the offset. + * @since 2.5.5 + */ + public void setOffset(Long offset) { + this.offset = offset; + } + public boolean isRelativeToCurrent() { return this.relativeToCurrent; } + /** + * Set whether the offset is relative to the current position. + * @param relativeToCurrent true for relative to current. + * @since 2.5.5 + */ + public void setRelativeToCurrent(boolean relativeToCurrent) { + this.relativeToCurrent = relativeToCurrent; + } + public SeekPosition getPosition() { return this.position; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAssignmentInitialSeekTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAssignmentInitialSeekTests.java new file mode 100644 index 0000000000..3ec19f733e --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAssignmentInitialSeekTests.java @@ -0,0 +1,172 @@ +/* + * Copyright 2017-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.PartitionOffset; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties.AckMode; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Gary Russell + * @since 2.0.1 + * + */ +@SpringJUnitConfig +@DirtiesContext +public class ManualAssignmentInitialSeekTests { + + @SuppressWarnings("rawtypes") + @Autowired + private Consumer consumer; + + @Autowired + private Config config; + + @Autowired + private KafkaListenerEndpointRegistry registry; + + /* + * Deliver 6 records from three partitions, fail on the second record second + * partition, first attempt; verify partition 0,1 committed and a total of 7 records + * handled after seek. + */ + @SuppressWarnings("unchecked") + @Test + public void discardRemainingRecordsFromPollAndSeek() throws Exception { + assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue(); + this.registry.stop(); + assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + InOrder inOrder = inOrder(this.consumer); + inOrder.verify(this.consumer).assign(any(Collection.class)); + inOrder.verify(this.consumer).seekToBeginning(any()); + inOrder.verify(this.consumer, atLeastOnce()).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + assertThat(this.config.registerSeekCallbackCalled).isTrue(); + assertThat(this.config.partitionsAssignedCalled).isTrue(); + assertThat(this.config.assignments).hasSize(3); + } + + @Configuration + @EnableKafka + public static class Config extends AbstractConsumerSeekAware { + + final CountDownLatch pollLatch = new CountDownLatch(1); + + final CountDownLatch closeLatch = new CountDownLatch(1); + + volatile boolean registerSeekCallbackCalled; + + volatile boolean partitionsAssignedCalled; + + volatile Map assignments; + + @KafkaListener(groupId = "grp", + topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo", + partitions = "#{'0,1,2'.split(',')}", + partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))) + public void foo(String in) { + } + + @SuppressWarnings({ "rawtypes" }) + @Bean + public ConsumerFactory consumerFactory() { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + final Consumer consumer = consumer(); + given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides())) + .willReturn(consumer); + return consumerFactory; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + public Consumer consumer() { + final Consumer consumer = mock(Consumer.class); + willAnswer(i -> { + this.pollLatch.countDown(); + try { + Thread.sleep(50); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return new ConsumerRecords(Collections.emptyMap()); + }).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + willAnswer(i -> { + this.closeLatch.countDown(); + return null; + }).given(consumer).close(); + return consumer; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory()); + factory.setErrorHandler(new SeekToCurrentErrorHandler()); + factory.getContainerProperties().setAckMode(AckMode.RECORD); + factory.getContainerProperties().setDeliveryAttemptHeader(true); + return factory; + } + + @Override + public void registerSeekCallback(ConsumerSeekCallback callback) { + super.registerSeekCallback(callback); + this.registerSeekCallbackCalled = true; + } + + @Override + public void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) { + super.onPartitionsAssigned(assignments, callback); + this.partitionsAssignedCalled = true; + this.assignments = assignments; + callback.seekToBeginning(assignments.keySet()); + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTests.java index 4b5f6df05b..70d8055be9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTests.java @@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -51,6 +50,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.PartitionOffset; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.ConsumerFactory; @@ -94,7 +94,10 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception { this.registry.stop(); assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue(); InOrder inOrder = inOrder(this.consumer); - inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); + inOrder.verify(this.consumer).assign(any(Collection.class)); + inOrder.verify(this.consumer).seek(new TopicPartition("foo", 0), 0L); + inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 0L); + inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 0L); inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); inOrder.verify(this.consumer).commitSync( Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(1L)), @@ -137,11 +140,14 @@ public static class Config { final CountDownLatch closeLatch = new CountDownLatch(1); - final CountDownLatch commitLatch = new CountDownLatch(7); + final CountDownLatch commitLatch = new CountDownLatch(6); int count; - @KafkaListener(topics = "foo", groupId = "grp") + @KafkaListener(groupId = "grp", + topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo", + partitions = "#{'0,1,2'.split(',')}", + partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))) public void foo(String in, @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) { this.contents.add(in); this.deliveries.add(delivery); @@ -168,11 +174,6 @@ public Consumer consumer() { final TopicPartition topicPartition0 = new TopicPartition("foo", 0); final TopicPartition topicPartition1 = new TopicPartition("foo", 1); final TopicPartition topicPartition2 = new TopicPartition("foo", 2); - willAnswer(i -> { - ((ConsumerRebalanceListener) i.getArgument(1)).onPartitionsAssigned( - Collections.singletonList(topicPartition1)); - return null; - }).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); Map> records1 = new LinkedHashMap<>(); records1.put(topicPartition0, Arrays.asList( new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "foo"), @@ -198,7 +199,7 @@ public Consumer consumer() { return new ConsumerRecords(records2); default: try { - Thread.sleep(500); + Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 629bfeb69d..5e056bca4b 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -1209,6 +1209,9 @@ public void listen(String data) { ---- ==== +[[manual-assignment]] +====== Explicit Partition Assignment + You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets). The following example shows how to do so: @@ -1230,6 +1233,29 @@ You can specify each partition in the `partitions` or `partitionOffsets` attribu As with most annotation properties, you can use SpEL expressions; for an example of how to generate a large list of partitions, see <>. +Starting with version 2.5.5, you can apply an initial offset to all assigned partitions: + +==== +[source, java] +---- +@KafkaListener(id = "thing3", topicPartitions = + { @TopicPartition(topic = "topic1", partitions = { "0", "1" }, + partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")) + }) +public void listen(ConsumerRecord record) { + ... +} +---- +==== + +The `*` wildcard represents all partitions in the `partitions` attribute. +There must only be one `@PartitionOffset` with the wildcard in each `@TopicPartition`. + +In addition, when the listener implements `ConsumerSeekAware`, `onPartitionsAssigned` is now called, even when using manual assignment. +This allows, for example, any arbitrary seek operations at that time. + +====== Manual Acknowledgment + When using manual `AckMode`, you can also provide the listener with the `Acknowledgment`. The following example also shows how to use a different container factory. @@ -2632,8 +2658,7 @@ When using group management, `onPartitionsAssigned` is called when partitions ar You can use this method, for example, for setting initial offsets for the partitions, by calling the callback. You can also use this method to associate this thread's callback with the assigned partitions (see the example below). You must use the callback argument, not the one passed into `registerSeekCallback`. -This method is never called if you explicitly assign partitions yourself. -Use the `TopicPartitionOffset` in that case. +Starting with version 2.5.5, this method is called, even when using <>. `onPartitionsRevoked` is called when the container is stopped or Kafka revokes assignments. You should discard this thread's callback and remove any associations to the revoked partitions. diff --git a/src/reference/asciidoc/tips.adoc b/src/reference/asciidoc/tips.adoc index 501f2e4ad6..146a75fa6a 100644 --- a/src/reference/asciidoc/tips.adoc +++ b/src/reference/asciidoc/tips.adoc @@ -11,7 +11,8 @@ The following is an example of how to use the power of a SpEL expression to crea [source, java] ---- @KafkaListener(topicPartitions = @TopicPartition(topic = "compacted", - partitions = "#{@finder.partitions('compacted')}")) + partitions = "#{@finder.partitions('compacted')}"), + partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))) public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) { ... } @@ -43,6 +44,7 @@ public static class PartitionFinder { Using this in conjunction with `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG=earliest` will load all records each time the application is started. You should also set the container's `AckMode` to `MANUAL` to prevent the container from committing offsets for a `null` consumer group. +Howewever, starting with version 2.5.5, as shown above, you can apply an initial offset to all partitions; see <> for more information. [[ex-jdbc-sync]] === Example of Transaction Synchronization