Skip to content

Commit

Permalink
Improve subBatchPerPartition determination
Browse files Browse the repository at this point in the history
Previously, the `subBatchPartition` was coerced when the `EOSMode`
property was set.
This mechanism relied on `BeanUtils.copyProperties` so it was not
obvious how the default was set to false in 2.6.

Remove the coercion and add explicit code in the determination to check
the `EOSMode`.
  • Loading branch information
garyrussell authored and artembilan committed Nov 13, 2020
1 parent 19bea7f commit 5c77905
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -682,21 +682,18 @@ public EOSMode getEosMode() {
* Set the exactly once semantics mode. When {@link EOSMode#ALPHA} a producer per
* group/topic/partition is used (enabling 'transactional.id fencing`).
* {@link EOSMode#BETA} enables fetch-offset-request fencing, and requires brokers 2.5
* or later. In the 2.6 client, the default will be BETA because the 2.6 client can
* or later. With the 2.6 client, the default is now BETA because the 2.6 client can
* automatically fall back to ALPHA.
* @param eosMode the mode; default ALPHA.
* @since 2.5
*/
public void setEosMode(EOSMode eosMode) {
if (eosMode == null) {
this.eosMode = EOSMode.ALPHA;
this.eosMode = EOSMode.ALPHA; // TODO change this in 2.7 to an assertion
}
else {
this.eosMode = eosMode;
}
if (this.eosMode.equals(EOSMode.BETA) && this.subBatchPerPartition == null) {
this.subBatchPerPartition = false;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,13 @@ private void checkGroupInstance(Properties properties, ConsumerFactory<K, V> con

private boolean setupSubBatchPerPartition() {
Boolean subBatching = this.containerProperties.getSubBatchPerPartition();
return subBatching == null ? this.transactionManager != null : subBatching;
if (subBatching != null) {
return subBatching;
}
if (this.transactionManager == null) {
return false;
}
return this.eosMode.equals(EOSMode.ALPHA);
}

private DeliveryAttemptAware setupDeliveryAttemptAware() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -55,10 +56,15 @@
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.transaction.PlatformTransactionManager;

/**
* @author Gary Russell
Expand All @@ -67,6 +73,7 @@
*/
@SpringJUnitConfig
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(topics = "sbpp")
public class SubBatchPerPartitionTests {

private static final String CONTAINER_ID = "container";
Expand All @@ -81,6 +88,9 @@ public class SubBatchPerPartitionTests {
@Autowired
private KafkaListenerEndpointRegistry registry;

@Autowired
private EmbeddedKafkaBroker broker;

/*
* Deliver 6 records from three partitions, fail on the second record second
* partition.
Expand All @@ -98,7 +108,7 @@ void discardRemainingRecordsFromPollAndSeek() throws Exception {
inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(this.consumer, times(3)).commitSync(any(), eq(Duration.ofSeconds(60)));
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(this.consumer, atLeastOnce()).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux", "fiz", "buz");
this.registry.stop();
}
Expand All @@ -116,11 +126,64 @@ void withFilter() throws Exception {
inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(this.consumer, times(3)).commitSync(any(), eq(Duration.ofSeconds(60)));
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(this.consumer, atLeastOnce()).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
assertThat(this.config.filtered).contains("bar", "qux", "buz");
this.registry.stop();
}

@Test
void defaults() {
Map<String, Object> props = KafkaTestUtils.consumerProps("sbpp", "false", this.broker);
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties("sbpp");
containerProps.setMessageListener(mock(MessageListener.class));
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.subBatchPerPartition"))
.isEqualTo(Boolean.FALSE);
container.stop();

containerProps = new ContainerProperties("sbpp");
containerProps.setMessageListener(mock(MessageListener.class));
containerProps.setSubBatchPerPartition(true);
container = new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.subBatchPerPartition"))
.isEqualTo(Boolean.TRUE);
container.stop();

containerProps = new ContainerProperties("sbpp");
containerProps.setMessageListener(mock(MessageListener.class));
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
containerProps.setEosMode(EOSMode.ALPHA);
container = new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.subBatchPerPartition"))
.isEqualTo(Boolean.TRUE);
container.stop();

containerProps = new ContainerProperties("sbpp");
containerProps.setMessageListener(mock(MessageListener.class));
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
containerProps.setEosMode(EOSMode.BETA);
container = new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.subBatchPerPartition"))
.isEqualTo(Boolean.FALSE);
container.stop();

// default is BETA
containerProps = new ContainerProperties("sbpp");
containerProps.setMessageListener(mock(MessageListener.class));
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
container = new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.subBatchPerPartition"))
.isEqualTo(Boolean.FALSE);
container.stop();
}

@Configuration
@EnableKafka
public static class Config {
Expand Down

0 comments on commit 5c77905

Please sign in to comment.