GH-945: enable.auto.commit=false by default
Resolves #945

Turn off auto commit by default.
garyrussell authored and artembilan committed Feb 27, 2019
1 parent 1eead9d commit 637738b
Showing 17 changed files with 138 additions and 38 deletions.
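With this change, when neither the ConsumerFactory configuration nor the container-level consumer properties set enable.auto.commit, the listener container now forces it to false instead of inheriting Kafka's default of true. An application that still wants broker auto-commit can opt in explicitly on the factory. A minimal sketch follows; the bootstrap server, group id and deserializers are illustrative assumptions, not part of this commit:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // assumption
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Explicit opt-in; without this entry the container now sets enable.auto.commit=false.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);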
File: KafkaTestUtils.java
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 the original author or authors.
* Copyright 2016-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
@@ -50,6 +51,8 @@ public final class KafkaTestUtils {

private static final Log logger = LogFactory.getLog(KafkaTestUtils.class); // NOSONAR

private static Properties defaults;

private KafkaTestUtils() {
// private ctor
}
@@ -249,4 +252,18 @@ public static <T> T getPropertyValue(Object root, String propertyPath, Class<T>
return (T) value;
}

/**
* Return a {@link Properties} object equal to the default consumer property overrides.
* Useful when matching arguments in Mockito tests.
* @return the default properties.
* @since 2.2.5
*/
public static Properties defaultPropertyOverrides() {
if (defaults == null) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
defaults = props;
}
return defaults;
}
}
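The helper above exists because the container now passes a Properties object containing enable.auto.commit=false to ConsumerFactory.createConsumer(...), so Mockito stubs that previously matched on new Properties() no longer match. A minimal sketch of the intended usage, mirroring the test changes below (group id, client-id prefix and suffix are illustrative):

import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

import org.apache.kafka.clients.consumer.Consumer;

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.test.utils.KafkaTestUtils;

@SuppressWarnings("unchecked")
ConsumerFactory<Integer, String> consumerFactory = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
// Match the default property overrides the container now supplies when creating the consumer.
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
		.willReturn(consumer);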
File: KafkaMessageListenerContainer.java
@@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -417,7 +418,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final boolean wantsFullRecords;

private final boolean autoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
private final boolean autoCommit;

private final boolean isManualAck = this.containerProperties.getAckMode().equals(AckMode.MANUAL);

@@ -493,15 +494,14 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

@SuppressWarnings(UNCHECKED)
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
Assert.state(!this.isAnyManualAck || !this.autoCommit,
() -> "Consumer cannot be configured for auto commit for ackMode "
+ this.containerProperties.getAckMode());
Properties consumerProperties = new Properties(this.containerProperties.getConsumerProperties());
this.autoCommit = determineAutoCommit(consumerProperties);
this.consumer =
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix,
this.containerProperties.getConsumerProperties());
consumerProperties);

this.transactionTemplate = determineTransactionTemplate();
subscribeOrAssignTopics(this.consumer);
@@ -572,6 +572,27 @@ private TransactionTemplate determineTransactionTemplate() {
: null;
}

private boolean determineAutoCommit(Properties consumerProperties) {
boolean autoCommit;
String autoCommitOverride = consumerProperties.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
if (!KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties()
.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
&& autoCommitOverride == null) {
consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
autoCommit = false;
}
else if (autoCommitOverride != null) {
autoCommit = Boolean.parseBoolean(autoCommitOverride);
}
else {
autoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
}
Assert.state(!this.isAnyManualAck || !autoCommit,
() -> "Consumer cannot be configured for auto commit for ackMode "
+ this.containerProperties.getAckMode());
return autoCommit;
}

private Duration determineSyncCommitTimeout() {
if (this.containerProperties.getSyncCommitTimeout() != null) {
return this.containerProperties.getSyncCommitTimeout();
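The new determineAutoCommit(...) method gives a container-level enable.auto.commit override the highest precedence, then an explicit setting in the consumer factory configuration, and only when neither is present does it force enable.auto.commit=false. A minimal sketch of such a container-level override, matching the pattern used in the tests below (the topic name is illustrative):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.springframework.kafka.listener.ContainerProperties;

ContainerProperties containerProps = new ContainerProperties("some-topic");
Properties overrides = new Properties();
// Highest precedence: wins over both the factory configuration and the new false default.
overrides.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
containerProps.setConsumerProperties(overrides);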
(next changed file)
@@ -23,7 +23,6 @@
import static org.mockito.Mockito.mock;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,6 +33,7 @@
import org.junit.jupiter.api.Test;

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.test.utils.KafkaTestUtils;

/**
* @author Gary Russell
@@ -55,7 +55,8 @@ public void testCorrectContainerForConsumerError() throws InterruptedException {
Thread.sleep(100);
return new ConsumerRecords<>(Collections.emptyMap());
}).given(consumer).poll(any());
given(consumerFactory.createConsumer("grp", "", "-0", new Properties())).willReturn(consumer);
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
.willReturn(consumer);
ContainerProperties containerProperties = new ContainerProperties("foo");
containerProperties.setGroupId("grp");
containerProperties.setMessageListener((MessageListener) record -> { });
File: ConcurrentMessageListenerContainerTests.java
@@ -30,6 +30,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
@@ -43,6 +44,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.junit.ClassRule;
import org.junit.Test;
@@ -105,7 +107,18 @@ public class ConcurrentMessageListenerContainerTests {
public void testAutoCommit() throws Exception {
this.logger.info("Start auto");
Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
AtomicReference<Properties> overrides = new AtomicReference<>();
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {

@Override
protected KafkaConsumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
String clientIdSuffixArg, Properties properties) {

overrides.set(properties);
return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
}

};
ContainerProperties containerProps = new ContainerProperties(topic1);
containerProps.setLogContainerConfig(true);

@@ -172,14 +185,26 @@ public void testAutoCommit() throws Exception {
assertThat(children).contains((KafkaMessageListenerContainer<Integer, String>) e.getSource());
}
});
assertThat(overrides.get().getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)).isNull();
this.logger.info("Stop auto");
}

@Test
public void testAutoCommitWithRebalanceListener() throws Exception {
this.logger.info("Start auto");
Map<String, Object> props = KafkaTestUtils.consumerProps("test10", "true", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
Map<String, Object> props = KafkaTestUtils.consumerProps("test10", "false", embeddedKafka);
AtomicReference<Properties> overrides = new AtomicReference<>();
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {

@Override
protected KafkaConsumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
String clientIdSuffixArg, Properties properties) {

overrides.set(properties);
return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
}

};
ContainerProperties containerProps = new ContainerProperties(topic1);

final CountDownLatch latch = new CountDownLatch(4);
@@ -189,6 +214,9 @@ public void testAutoCommitWithRebalanceListener() throws Exception {
listenerThreadNames.add(Thread.currentThread().getName());
latch.countDown();
});
Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
containerProps.setConsumerProperties(consumerProperties);
final CountDownLatch rebalancePartitionsAssignedLatch = new CountDownLatch(2);
final CountDownLatch rebalancePartitionsRevokedLatch = new CountDownLatch(2);
containerProps.setConsumerRebalanceListener(new ConsumerRebalanceListener() {
@@ -231,14 +259,27 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
assertThat(threadName).contains("-C-");
}
container.stop();
assertThat(overrides.get().getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)).isEqualTo("true");
this.logger.info("Stop auto");
}

@Test
public void testAfterListenCommit() throws Exception {
this.logger.info("Start manual");
Map<String, Object> props = KafkaTestUtils.consumerProps("test2", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
props.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
AtomicReference<Properties> overrides = new AtomicReference<>();
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {

@Override
protected KafkaConsumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
String clientIdSuffixArg, Properties properties) {

overrides.set(properties);
return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
}

};
ContainerProperties containerProps = new ContainerProperties(topic2);

final CountDownLatch latch = new CountDownLatch(4);
@@ -267,6 +308,7 @@ public void testAfterListenCommit() throws Exception {
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
container.stop();
this.logger.info("Stop manual");
assertThat(overrides.get().getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)).isEqualTo("false");
}

@Test
@@ -456,6 +498,7 @@ public ConsumerRecords<Integer, String> answer(InvocationOnMock invocation) thro
public void testListenerException() throws Exception {
this.logger.info("Start exception");
Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka);
props.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic6);
containerProps.setAckCount(23);
@@ -467,6 +510,9 @@ public void testListenerException() throws Exception {
latch.countDown();
throw new RuntimeException("intended");
});
Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
containerProps.setConsumerProperties(consumerProperties);

ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
(next changed file)
@@ -32,7 +32,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -55,6 +54,7 @@
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

@@ -136,7 +136,8 @@ public void foo(List<String> in) {
public ConsumerFactory consumerFactory() {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = consumer();
given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", new Properties())).willReturn(consumer);
given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
.willReturn(consumer);
return consumerFactory;
}

(next changed file)
@@ -31,7 +31,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -54,6 +53,7 @@
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

@@ -133,7 +133,8 @@ public void foo(String in) {
public ConsumerFactory consumerFactory() {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = consumer();
given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", new Properties())).willReturn(consumer);
given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
.willReturn(consumer);
return consumerFactory;
}

(next changed file)
@@ -32,7 +32,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -56,6 +55,7 @@
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

@@ -147,7 +147,8 @@ public void foo(String in) {
public ConsumerFactory consumerFactory() {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = consumer();
given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", new Properties())).willReturn(consumer);
given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
.willReturn(consumer);
return consumerFactory;
}

(next changed file)
@@ -1028,8 +1028,8 @@ public void testSeekAutoCommit() throws Exception {
@Test
public void testSeekAutoCommitDefault() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps("test15", "true", embeddedKafka);
props.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); // test true by default
testSeekGuts(props, topic15, true);
props.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); // test false by default
testSeekGuts(props, topic15, false);
}

@Test
(next changed file)
@@ -32,7 +32,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -56,6 +55,7 @@
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

@@ -137,7 +137,8 @@ public void foo(String in) {
public ConsumerFactory consumerFactory() {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = consumer();
given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", new Properties())).willReturn(consumer);
given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
.willReturn(consumer);
return consumerFactory;
}

(remaining changed files not loaded)
