Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();

private final GenericMessageListener<?> genericListener;

private final MessageListener<K, V> listener;

private final BatchMessageListener<K, V> batchListener;
Expand Down Expand Up @@ -311,6 +313,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
}
this.consumer = consumer;
GenericErrorHandler<?> errHandler = this.containerProperties.getGenericErrorHandler();
this.genericListener = listener;
if (listener instanceof BatchMessageListener) {
this.listener = null;
this.batchListener = (BatchMessageListener<K, V>) listener;
Expand Down Expand Up @@ -391,7 +394,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
KafkaMessageListenerContainer.this.getContainerProperties().getCommitCallback());
}
}
if (ListenerConsumer.this.listener instanceof ConsumerSeekAware) {
if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
seekPartitions(partitions, false);
}
if (this.consumerAwareListener != null) {
Expand Down Expand Up @@ -431,10 +434,10 @@ public void seekToEnd(String topic, int partition) {

};
if (idle) {
((ConsumerSeekAware) ListenerConsumer.this.listener).onIdleContainer(current, callback);
((ConsumerSeekAware) ListenerConsumer.this.genericListener).onIdleContainer(current, callback);
}
else {
((ConsumerSeekAware) ListenerConsumer.this.listener).onPartitionsAssigned(current, callback);
((ConsumerSeekAware) ListenerConsumer.this.genericListener).onPartitionsAssigned(current, callback);
}
}

Expand Down Expand Up @@ -466,8 +469,8 @@ public boolean isLongLived() {

@Override
public void run() {
if (this.listener instanceof ConsumerSeekAware) {
((ConsumerSeekAware) this.listener).registerSeekCallback(this);
if (this.genericListener instanceof ConsumerSeekAware) {
((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
}
this.count = 0;
this.last = System.currentTimeMillis();
Expand Down Expand Up @@ -500,7 +503,7 @@ public void run() {
publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener
? this.consumer : null);
lastAlertAt = now;
if (this.listener instanceof ConsumerSeekAware) {
if (this.genericListener instanceof ConsumerSeekAware) {
seekPartitions(getAssignedPartitions(), true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,13 @@ public class KafkaMessageListenerContainerTests {

private static String topic14 = "testTopic14";

private static String topic15 = "testTopic15";

private static String topic16 = "testTopic16";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic3, topic4, topic5,
topic6, topic7, topic8, topic9, topic10, topic11, topic12, topic13, topic14);
topic6, topic7, topic8, topic9, topic10, topic11, topic12, topic13, topic14, topic15, topic16);

@Rule
public TestName testName = new TestName();
Expand Down Expand Up @@ -724,15 +728,62 @@ public void testSeekAutoCommit() throws Exception {

@Test
public void testSeekAutoCommitDefault() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps("test12", "true", embeddedKafka);
Map<String, Object> props = KafkaTestUtils.consumerProps("test15", "true", embeddedKafka);
props.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); // test true by default
testSeekGuts(props, topic12, true);
testSeekGuts(props, topic15, true);
}

@Test
public void testSeekBatch() throws Exception {
logger.info("Start seek batch seek");
Map<String, Object> props = KafkaTestUtils.consumerProps("test16", "true", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic16);
final CountDownLatch registerLatch = new CountDownLatch(1);
final CountDownLatch assignedLatch = new CountDownLatch(1);
final CountDownLatch idleLatch = new CountDownLatch(1);
class Listener implements BatchMessageListener<Integer, String>, ConsumerSeekAware {

@Override
public void onMessage(List<ConsumerRecord<Integer, String>> data) {
// empty
}

@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
registerLatch.countDown();
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignedLatch.countDown();
}

@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
idleLatch.countDown();
}

}
Listener messageListener = new Listener();
containerProps.setMessageListener(messageListener);
containerProps.setSyncCommits(true);
containerProps.setAckOnError(false);
containerProps.setIdleEventInterval(10L);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
containerProps);
container.setBeanName("testBatchSeek");
container.start();
assertThat(registerLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(assignedLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(idleLatch.await(10, TimeUnit.SECONDS)).isTrue();
container.stop();
}

private void testSeekGuts(Map<String, Object> props, String topic, boolean autoCommit) throws Exception {
logger.info("Start seek " + topic);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic11);
ContainerProperties containerProps = new ContainerProperties(topic);
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(6));
final AtomicBoolean seekInitial = new AtomicBoolean();
final CountDownLatch idleLatch = new CountDownLatch(1);
Expand All @@ -749,10 +800,10 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
messageThread = Thread.currentThread();
latch.get().countDown();
if (latch.get().getCount() == 2 && !seekInitial.get()) {
callback.seekToEnd(topic11, 0);
callback.seekToBeginning(topic11, 0);
callback.seek(topic11, 0, 1);
callback.seek(topic11, 1, 1);
callback.seekToEnd(topic, 0);
callback.seekToBeginning(topic, 0);
callback.seek(topic, 0, 1);
callback.seek(topic, 1, 1);
}
}

Expand Down Expand Up @@ -792,7 +843,7 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC

KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
containerProps);
container.setBeanName("testRecordAcks");
container.setBeanName("testSeek" + topic);
container.start();
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.autoCommit", Boolean.class))
.isEqualTo(autoCommit);
Expand All @@ -801,7 +852,7 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic11);
template.setDefaultTopic(topic);
template.sendDefault(0, 0, "foo");
template.sendDefault(1, 0, "bar");
template.sendDefault(0, 0, "baz");
Expand Down Expand Up @@ -843,11 +894,11 @@ public void publishEvent(ApplicationEvent event) {
ArgumentCaptor<Collection<TopicPartition>> captor = ArgumentCaptor.forClass(Collection.class);
verify(consumer).seekToBeginning(captor.capture());
TopicPartition next = captor.getValue().iterator().next();
assertThat(next.topic()).isEqualTo(topic11);
assertThat(next.topic()).isEqualTo(topic);
assertThat(next.partition()).isEqualTo(0);
verify(consumer).seekToEnd(captor.capture());
next = captor.getValue().iterator().next();
assertThat(next.topic()).isEqualTo(topic11);
assertThat(next.topic()).isEqualTo(topic);
assertThat(next.partition()).isEqualTo(0);
logger.info("Stop seek");
}
Expand Down