Skip to content

Commit

Permalink
GH-1554: Partition wildcard for initial offsets
Browse files Browse the repository at this point in the history
Resolves #1554

When using manual assignment it was difficult to specify the initial
offset for multiple partitions - especially when dynamically determined.

Add a wildcard to indicate the offset should be applied to all partitions.

**cherry-pick to 2.5.x**

* Call onPartitionsAssigned() with manual assignment

Resolves #1553

* Fix test class name.
# Conflicts:
#	src/reference/asciidoc/whats-new.adoc
  • Loading branch information
garyrussell authored and artembilan committed Jul 31, 2020
1 parent dee102f commit 4587d7e
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 39 deletions.
Expand Up @@ -555,22 +555,32 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
for (String partition : partitions) {
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
}

for (PartitionOffset partitionOffset : partitionOffsets) {
TopicPartitionOffset topicPartitionOffset =
new TopicPartitionOffset((String) topic,
resolvePartition(topic, partitionOffset),
resolveInitialOffset(topic, partitionOffset),
isRelative(topic, partitionOffset));
if (!result.contains(topicPartitionOffset)) {
result.add(topicPartitionOffset);
}
else {
throw new IllegalArgumentException(
String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
topicPartitionOffset));
if (partitionOffsets.length == 1 && partitionOffsets[0].partition().equals("*")) {
result.forEach(tpo -> {
tpo.setOffset(resolveInitialOffset(tpo.getTopic(), partitionOffsets[0]));
tpo.setRelativeToCurrent(isRelative(tpo.getTopic(), partitionOffsets[0]));
});
}
else {
for (PartitionOffset partitionOffset : partitionOffsets) {
Assert.isTrue(!partitionOffset.partition().equals("*"), () ->
"Partition wildcard '*' is only allowed in a single @PartitionOffset in " + result);
TopicPartitionOffset topicPartitionOffset =
new TopicPartitionOffset((String) topic,
resolvePartition(topic, partitionOffset),
resolveInitialOffset(topic, partitionOffset),
isRelative(topic, partitionOffset));
if (!result.contains(topicPartitionOffset)) {
result.add(topicPartitionOffset);
}
else {
throw new IllegalArgumentException(
String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
topicPartitionOffset));
}
}
}
Assert.isTrue(result.size() > 0, () -> "At least one partition required for " + topic);
return result;
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,9 +31,10 @@
public @interface PartitionOffset {

/**
* The partition within the topic to listen on.
* Property place holders and SpEL expressions are supported,
* which must resolve to Integer (or String that can be parsed as Integer).
* The partition within the topic to listen on. Property place holders and SpEL
* expressions are supported, which must resolve to Integer (or String that can be
* parsed as Integer). '*' indicates that the initial offset will be applied to all
* partitions in the encompassing {@link TopicPartition}
* @return partition within the topic.
*/
String partition();
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,8 @@
String[] partitions() default {};

/**
* The partitions with initial offsets within the topic.
* The partitions with initial offsets within the topic. There must only be one
* instance of {@link PartitionOffset} if its 'partition' property is '*'.
* Partitions specified here can't be duplicated in the {@link #partitions()}.
* @return the {@link PartitionOffset} array.
*/
Expand Down
Expand Up @@ -18,13 +18,15 @@

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -890,7 +892,7 @@ private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscr
else {
List<TopicPartitionOffset> topicPartitionsToAssign =
Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
this.definedPartitions = new HashMap<>(topicPartitionsToAssign.size());
this.definedPartitions = new LinkedHashMap<>(topicPartitionsToAssign.size());
for (TopicPartitionOffset topicPartition : topicPartitionsToAssign) {
this.definedPartitions.put(topicPartition.getTopicPartition(),
new OffsetMetadata(topicPartition.getOffset(), topicPartition.isRelativeToCurrent(),
Expand Down Expand Up @@ -2108,7 +2110,7 @@ private void initPartitionsIfNeeded() {
* called until we poll() the consumer. Users can use a ConsumerAwareRebalanceListener
* or a ConsumerSeekAware listener in that case.
*/
Map<TopicPartition, OffsetMetadata> partitions = new HashMap<>(this.definedPartitions);
Map<TopicPartition, OffsetMetadata> partitions = new LinkedHashMap<>(this.definedPartitions);
Set<TopicPartition> beginnings = partitions.entrySet().stream()
.filter(e -> SeekPosition.BEGINNING.equals(e.getValue().seekPosition))
.map(Entry::getKey)
Expand Down Expand Up @@ -2153,6 +2155,12 @@ else if (metadata.relativeToCurrent) {
}
}
}
if (this.consumerSeekAwareListener != null) {
this.consumerSeekAwareListener.onPartitionsAssigned(partitions.keySet().stream()
.map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp)))
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())),
this.seekCallback);
}
}

private void logReset(TopicPartition topicPartition, long newOffset) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,11 +70,11 @@ public enum SeekPosition {

private final TopicPartition topicPartition;

private final Long offset;
private final SeekPosition position;

private final boolean relativeToCurrent;
private Long offset;

private final SeekPosition position;
private boolean relativeToCurrent;

/**
* Construct an instance with no initial offset management.
Expand Down Expand Up @@ -171,10 +171,28 @@ public Long getOffset() {
return this.offset;
}

/**
* Set the offset.
* @param offset the offset.
* @since 2.5.5
*/
public void setOffset(Long offset) {
this.offset = offset;
}

public boolean isRelativeToCurrent() {
return this.relativeToCurrent;
}

/**
* Set whether the offset is relative to the current position.
* @param relativeToCurrent true for relative to current.
* @since 2.5.5
*/
public void setRelativeToCurrent(boolean relativeToCurrent) {
this.relativeToCurrent = relativeToCurrent;
}

public SeekPosition getPosition() {
return this.position;
}
Expand Down
@@ -0,0 +1,172 @@
/*
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Gary Russell
* @since 2.0.1
*
*/
@SpringJUnitConfig
@DirtiesContext
public class ManualAssignmentInitialSeekTests {

@SuppressWarnings("rawtypes")
@Autowired
private Consumer consumer;

@Autowired
private Config config;

@Autowired
private KafkaListenerEndpointRegistry registry;

/*
* Deliver 6 records from three partitions, fail on the second record second
* partition, first attempt; verify partition 0,1 committed and a total of 7 records
* handled after seek.
*/
@SuppressWarnings("unchecked")
@Test
public void discardRemainingRecordsFromPollAndSeek() throws Exception {
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
this.registry.stop();
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
InOrder inOrder = inOrder(this.consumer);
inOrder.verify(this.consumer).assign(any(Collection.class));
inOrder.verify(this.consumer).seekToBeginning(any());
inOrder.verify(this.consumer, atLeastOnce()).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
assertThat(this.config.registerSeekCallbackCalled).isTrue();
assertThat(this.config.partitionsAssignedCalled).isTrue();
assertThat(this.config.assignments).hasSize(3);
}

@Configuration
@EnableKafka
public static class Config extends AbstractConsumerSeekAware {

final CountDownLatch pollLatch = new CountDownLatch(1);

final CountDownLatch closeLatch = new CountDownLatch(1);

volatile boolean registerSeekCallbackCalled;

volatile boolean partitionsAssignedCalled;

volatile Map<TopicPartition, Long> assignments;

@KafkaListener(groupId = "grp",
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
partitions = "#{'0,1,2'.split(',')}",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void foo(String in) {
}

@SuppressWarnings({ "rawtypes" })
@Bean
public ConsumerFactory consumerFactory() {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = consumer();
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
.willReturn(consumer);
return consumerFactory;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Consumer consumer() {
final Consumer consumer = mock(Consumer.class);
willAnswer(i -> {
this.pollLatch.countDown();
try {
Thread.sleep(50);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return new ConsumerRecords(Collections.emptyMap());
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
willAnswer(i -> {
this.closeLatch.countDown();
return null;
}).given(consumer).close();
return consumer;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setErrorHandler(new SeekToCurrentErrorHandler());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.getContainerProperties().setDeliveryAttemptHeader(true);
return factory;
}

@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
super.registerSeekCallback(callback);
this.registerSeekCallbackCalled = true;
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
super.onPartitionsAssigned(assignments, callback);
this.partitionsAssignedCalled = true;
this.assignments = assignments;
callback.seekToBeginning(assignments.keySet());
}

}

}

0 comments on commit 4587d7e

Please sign in to comment.