diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
index 682630445e..9e16658a84 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
@@ -204,6 +204,7 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
 			setTransactionIdPrefix(txId);
 			this.configs.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
 		}
+		this.configs.put("internal.auto.downgrade.txn.commit", true);
 	}

 	@Override
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
index 730a6715d4..57f558a1ca 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
@@ -239,7 +239,7 @@ public enum EOSMode {

 	private boolean deliveryAttemptHeader;

-	private EOSMode eosMode;
+	private EOSMode eosMode = EOSMode.BETA;

 	private TransactionDefinition transactionDefinition;
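Note: the two main-source hunks above are the heart of this change — `EOSMode.BETA` becomes the default, and the producer factory unconditionally sets `internal.auto.downgrade.txn.commit` so the 2.6 client can fall back to `ALPHA` against older brokers. A minimal sketch of how an application could pin the previous `ALPHA` behavior after this change (the bean wiring and generic types are illustrative assumptions, and the `setEosMode` setter name is inferred from the `eosMode` property shown above):

[source, java]
----
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // BETA is now the default; select ALPHA explicitly to keep the old
    // transactional.id fencing behavior.
    factory.getContainerProperties().setEosMode(EOSMode.ALPHA);
    return factory;
}
----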
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java
index a691da95dc..d173acc35e 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java
@@ -22,6 +22,7 @@
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -37,6 +38,7 @@
 import java.util.concurrent.TimeUnit;

 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -118,7 +120,7 @@ void testLatestOnlyTx() throws InterruptedException {
 		willAnswer(inv -> {
 			latch.countDown();
 			return null;
-		}).given(producer).sendOffsetsToTransaction(any(), anyString());
+		}).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
 		props.setTransactionManager(tm);
 		this.registry.start();
 		assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -196,6 +198,7 @@ public Consumer consumer() {
 				this.closeLatch.countDown();
 				return null;
 			}).given(consumer).close();
+			willReturn(new ConsumerGroupMetadata("")).given(consumer).groupMetadata();
 			return consumer;
 		}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandlerTests.java
index a1a50f6d5e..7019025422 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandlerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandlerTests.java
@@ -24,6 +24,7 @@
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -41,6 +42,7 @@
 import java.util.concurrent.atomic.AtomicReference;

 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -117,7 +119,7 @@ void discardRemainingRecordsFromPollAndSeek() throws Exception {
 		offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
 		offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
 		offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		assertThat(this.config.ehException).isInstanceOf(ListenerExecutionFailedException.class);
 		assertThat(((ListenerExecutionFailedException) this.config.ehException).getGroupId()).isEqualTo(CONTAINER_ID);
@@ -254,6 +256,7 @@ public Consumer consumer() {
 				this.closeLatch.countDown();
 				return null;
 			}).given(consumer).close();
+			willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
 			return consumer;
 		}
@@ -275,7 +278,6 @@ public void handle(Exception thrownException, ConsumerRecords data, Consum
 			});
 			factory.setBatchListener(true);
 			factory.getContainerProperties().setTransactionManager(tm());
-			factory.getContainerProperties().setSubBatchPerPartition(false);
 			factory.setMissingTopicsFatal(false);
 			return factory;
 		}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java
index 5202c8c935..99b3668438 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2019 the original author or authors.
+ * Copyright 2017-2020 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,9 +18,11 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -37,6 +39,7 @@
 import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -105,17 +108,17 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
 		inOrder.verify(this.producer).beginTransaction();
 		Map<TopicPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
 		offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(1L));
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
 		inOrder.verify(this.producer).beginTransaction();
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
 		inOrder.verify(this.producer).beginTransaction();
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		inOrder.verify(this.producer).beginTransaction();
 		inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 1L);
@@ -125,17 +128,17 @@
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
 		inOrder.verify(this.producer).beginTransaction();
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(1L));
 		inOrder.verify(this.producer).beginTransaction();
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
 		inOrder.verify(this.producer).beginTransaction();
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
 		assertThat(this.config.count).isEqualTo(7);
@@ -224,6 +227,7 @@ public Consumer consumer() {
 				this.closeLatch.countDown();
 				return null;
 			}).given(consumer).close();
+			willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
 			return consumer;
 		}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java
index 0c65a85321..b0ef7238cc 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2019 the original author or authors.
+ * Copyright 2017-2020 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,9 +19,11 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -38,6 +40,7 @@
 import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -106,15 +109,15 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
 		inOrder.verify(this.producer).beginTransaction();
 		Map<TopicPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
 		offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(1L));
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 1L);
 		inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 0L);
@@ -122,15 +125,15 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
 		inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(1L));
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
 		assertThat(this.config.count).isEqualTo(7);
@@ -225,6 +228,7 @@ public Consumer consumer() {
 				this.closeLatch.countDown();
 				return null;
 			}).given(consumer).close();
+			willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
 			return consumer;
 		}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java
index df3dbe0b6e..6fac3d5a53 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java
@@ -18,9 +18,11 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -37,6 +39,7 @@
 import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -101,7 +104,7 @@ public void abortSecondBatch() throws Exception {
 		inOrder.verify(this.producer).beginTransaction();
 		Map<TopicPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
 		offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		inOrder.verify(this.producer).beginTransaction();
 		inOrder.verify(this.producer).abortTransaction();
@@ -109,12 +112,12 @@
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
 		inOrder.verify(this.producer).beginTransaction();
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
 		inOrder.verify(this.producer).beginTransaction();
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		assertThat(this.config.contents).containsExactly("foo", "bar", "baz", "qux", "fiz", "buz", "baz", "qux");
 		assertThat(this.config.transactionSuffix).isNotNull();
@@ -204,6 +207,7 @@ public Consumer consumer() {
 				this.closeLatch.countDown();
 				return null;
 			}).given(consumer).close();
+			willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
 			return consumer;
 		}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java
index 515733b38b..a0fb3cd1c8 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java
@@ -18,9 +18,11 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -37,6 +39,7 @@
 import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -101,17 +104,17 @@ public void threeTransactionsForThreeSubBatches() throws Exception {
 		inOrder.verify(this.producer).beginTransaction();
 		Map<TopicPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
 		offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
 		inOrder.verify(this.producer).beginTransaction();
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		offsets.clear();
 		offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
 		inOrder.verify(this.producer).beginTransaction();
-		inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
 		inOrder.verify(this.producer).commitTransaction();
 		assertThat(this.config.contents).containsExactly("foo", "bar", "baz", "qux", "fiz", "buz");
 		assertThat(this.config.transactionSuffix).isNotNull();
@@ -190,6 +193,7 @@ public Consumer consumer() {
 				this.closeLatch.countDown();
 				return null;
 			}).given(consumer).close();
+			willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
 			return consumer;
 		}
@@ -200,6 +204,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
 			factory.setConsumerFactory(consumerFactory());
 			factory.getContainerProperties().setAckMode(AckMode.BATCH);
 			factory.getContainerProperties().setTransactionManager(tm());
+			factory.getContainerProperties().setSubBatchPerPartition(true);
 			factory.setBatchListener(true);
 			return factory;
 		}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
index 4dd1760380..b951180f54 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
@@ -23,6 +23,7 @@
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.BDDMockito.willAnswer;
@@ -494,6 +495,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception {
 	public void testRollbackRecord() throws Exception {
 		logger.info("Start testRollbackRecord");
 		Map<String, Object> props = KafkaTestUtils.consumerProps("txTest1", "false", embeddedKafka);
+//		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); test fallback to EOSMode.ALPHA
 		props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
 		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 		DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props);
@@ -502,6 +504,7 @@ public void testRollbackRecord() throws Exception {
 		containerProps.setPollTimeout(10_000);

 		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
+//		senderProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 		senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
 		DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps);
 		pf.setTransactionIdPrefix("rr.");
@@ -674,7 +677,8 @@ public void accept(ConsumerRecord record, Exception exception) {
 		verify(afterRollbackProcessor).clearThreadState();
 		verify(dlTemplate).send(any(ProducerRecord.class));
 		verify(dlTemplate).sendOffsetsToTransaction(
-				Collections.singletonMap(new TopicPartition(topic3, 0), new OffsetAndMetadata(1L)));
+				eq(Collections.singletonMap(new TopicPartition(topic3, 0), new OffsetAndMetadata(1L))),
+				any(ConsumerGroupMetadata.class));
 		logger.info("Stop testMaxAttempts");
 	}
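Note on the test churn above: every one of these edits tracks the same `kafka-clients` 2.5 API change. `Producer.sendOffsetsToTransaction(Map, String)` is superseded by an overload that takes a `ConsumerGroupMetadata`, which carries the member/generation information the broker needs for fetch-offset-request fencing; hence the mocks now stub `Consumer.groupMetadata()`. A sketch of the new call pattern using the plain client API (topic name, generic types, and error handling are illustrative and simplified):

[source, java]
----
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

void consumeProcessProduce(Consumer<String, String> consumer, Producer<String, String> producer) {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(new TopicPartition("someTopic", 0), new OffsetAndMetadata(42L));

    producer.beginTransaction();
    producer.send(new ProducerRecord<>("someTopic", "key", "value"));
    // 2.5+ overload: the consumer's group metadata replaces the plain group id,
    // letting the broker fence zombie producers (EOSMode.BETA).
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
    // Real code must abort on failure; fatal exceptions (e.g. ProducerFencedException)
    // require closing the producer rather than aborting the transaction.
}
----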
diff --git a/src/reference/asciidoc/changes-since-1.0.adoc b/src/reference/asciidoc/changes-since-1.0.adoc
index 42a4cb86d3..87c0307326 100644
--- a/src/reference/asciidoc/changes-since-1.0.adoc
+++ b/src/reference/asciidoc/changes-since-1.0.adoc
@@ -1,4 +1,122 @@
 [[migration]]
+=== Changes between 2.4 and 2.5
+
+This section covers the changes made from version 2.4 to version 2.5.
+For changes in earlier versions, see <>.
+
+Also see <>.
+
+[[x25-factory-listeners]]
+==== Consumer/Producer Factory Changes
+
+The default consumer and producer factories can now invoke a callback whenever a consumer or producer is created or closed.
+Implementations for native Micrometer metrics are provided.
+See <> for more information.
+
+You can now change bootstrap server properties at runtime, enabling failover to another Kafka cluster.
+See <> for more information.
+
+[[x25-streams-listeners]]
+==== `StreamsBuilderFactoryBean` Changes
+
+The factory bean can now invoke a callback whenever a `KafkaStreams` is created or destroyed.
+An implementation for native Micrometer metrics is provided.
+See <> for more information.
+
+[[x25-kafka-client]]
+==== Kafka Client Version
+
+This version requires the 2.5.0 `kafka-clients`.
+
+==== Class/Package Changes
+
+`SeekUtils` has been moved from the `o.s.k.support` package to `o.s.k.listener`.
+
+[[x25-delivery]]
+==== Delivery Attempts Header
+
+There is now an option to add a header which tracks delivery attempts when using certain error handlers and after rollback processors.
+See <> for more information.
+
+[[x25-message-return]]
+==== @KafkaListener Changes
+
+Default reply headers will now be populated automatically if needed when a `@KafkaListener` return type is `Message<?>`.
+See <> for more information.
+
+The `KafkaHeaders.RECEIVED_MESSAGE_KEY` is no longer populated with a `null` value when the incoming record has a `null` key; the header is omitted altogether.
+
+`@KafkaListener` methods can now specify a `ConsumerRecordMetadata` parameter instead of using discrete headers for metadata such as topic, partition, etc.
+See <> for more information.
+
+[[x25-container]]
+==== Listener Container Changes
+
+The `assignmentCommitOption` container property is now `LATEST_ONLY_NO_TX` by default.
+See <> for more information.
+
+The `subBatchPerPartition` container property is now `true` by default when using transactions.
+See <> for more information.
+
+A new `RecoveringBatchErrorHandler` is now provided.
+See <> for more information.
+
+Static group membership is now supported.
+See <> for more information.
+
+When incremental/cooperative rebalancing is configured, if offsets fail to commit with a non-fatal `RebalanceInProgressException`, the container will attempt to re-commit the offsets for the partitions that remain assigned to this instance after the rebalance is completed.
+
+The default error handler is now the `SeekToCurrentErrorHandler` for record listeners and the `RecoveringBatchErrorHandler` for batch listeners.
+See <> for more information.
+
+You can now control the level at which exceptions intentionally thrown by standard error handlers are logged.
+See <> for more information.
+
+The `getAssignmentsByClientId()` method has been added, making it easier to determine which consumers in a concurrent container are assigned which partition(s).
+See <> for more information.
+
+You can now suppress logging entire `ConsumerRecord` s in error and debug logs.
+See `onlyLogRecordMetadata` in <>.
+
+[[x25-template]]
+==== KafkaTemplate Changes
+
+The `KafkaTemplate` can now maintain Micrometer timers.
+See <> for more information.
+
+The `KafkaTemplate` can now be configured with `ProducerConfig` properties to override those in the producer factory.
+See <> for more information.
+
+A `RoutingKafkaTemplate` has now been provided.
+See <> for more information.
+
+You can now use `KafkaSendCallback` instead of `ListenableFutureCallback` to get a narrower exception, making it easier to extract the failed `ProducerRecord`.
+See <> for more information.
+
+[[x25-string-serializer]]
+==== Kafka String Serializer/Deserializer
+
+New `ToStringSerializer`/`ParseStringDeserializer` s as well as an associated `SerDe` are now provided.
+See <> for more information.
+
+[[x25-json-deser]]
+==== JsonDeserializer
+
+The `JsonDeserializer` now has more flexibility to determine the deserialization type.
+See <> for more information.
+
+[[x25-delegate-serde]]
+==== Delegating Serializer/Deserializer
+
+The `DelegatingSerializer` can now handle "standard" types, when the outbound record has no header.
+See <> for more information.
+
+[[x25-testing]]
+==== Testing Changes
+
+The `KafkaTestUtils.consumerProps()` helper method now sets `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` to `earliest` by default.
+See <> for more information.
+
 === Changes between 2.3 and 2.4

 [[kafka-client-2.4]]
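For the Testing Changes entry above, usage looks like this (a sketch; `embeddedKafka` would be an `EmbeddedKafkaBroker` supplied by `@EmbeddedKafka` or the test framework, and the group id is illustrative):

[source, java]
----
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

Map<String, Object> consumerProps(EmbeddedKafkaBroker embeddedKafka) {
    // group id "myGroup", auto-commit "false" - the same helper the tests in this diff use
    Map<String, Object> props = KafkaTestUtils.consumerProps("myGroup", "false", embeddedKafka);
    // As of 2.5 the map contains AUTO_OFFSET_RESET_CONFIG = "earliest" by default.
    assert "earliest".equals(props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
    return props;
}
----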
diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc
index ea42202c9b..a41979d0a7 100644
--- a/src/reference/asciidoc/kafka.adoc
+++ b/src/reference/asciidoc/kafka.adoc
@@ -3178,12 +3178,12 @@ If the listener successfully processes the record (or multiple records, when usi
 If the listener throws an exception, the transaction is rolled back and the consumer is repositioned so that the rolled-back record(s) can be retrieved on the next poll.
 See <> for more information and for handling records that repeatedly fail.

-Using transactions enables exactly once semantics (EOS).
+Using transactions enables Exactly Once Semantics (EOS).

 This means that, for a `read->process->write` sequence, it is guaranteed that the **sequence** is completed exactly once.
 (The read and process have at-least-once semantics.)

-Kafka version 2.5 now supports two EOS modes:
+Spring for Apache Kafka version 2.5 and later supports two EOS modes:

 * `ALPHA` - aka `transactional.id` fencing (since version 0.11.0.0)
 * `BETA` - aka fetch-offset-request fencing (since version 2.5)
@@ -3193,18 +3193,18 @@ Spring manages this by using a `Producer` for each `group.id/topic/partition`; w
 With mode `BETA`, it is not necessary to have a producer for each `group.id/topic/partition` because consumer metadata is sent along with the offsets to the transaction and the broker can determine if the producer is fenced using that information instead.

-To configure the container to use mode `BETA`, set the container property `EOSMode` to `BETA`.
+Starting with version 2.6, the default `EOSMode` is `BETA`.

-IMPORTANT: With `BETA`, your brokers must be version 2.5 or later, unless you are using `kafka-clients` version 2.6; the producer in that version can automatically fall back to `ALPHA` if the broker does not support `BETA`.
-To enable the fall back in the 2.6 client; set `internal.auto.downgrade.txn.commit=true`.
+To revert to the previous behavior, set the container property `EOSMode` to `ALPHA`.

-You should also set the `DefaultKafkaConsumerFactory` `producerPerConsumerPartition` property to `false`, to reduce the number of producers needed.
+IMPORTANT: With `BETA`, your brokers must be version 2.5 or later; however, with `kafka-clients` version 2.6, the producer automatically falls back to `ALPHA` if the broker does not support `BETA`.
+The `DefaultKafkaProducerFactory` is configured to enable that behavior.
+If your brokers are earlier than 2.5, be sure to leave the `DefaultKafkaProducerFactory` `producerPerConsumerPartition` property set to `true` and, if you are using a batch listener, set `subBatchPerPartition` to `true`.

-If your brokers are upgraded to 2.5, you can immediately enable `BETA` mode.
-
-If your brokers are older than 2.5, when the 2.6 client is available, you can enable `BETA` mode, but leave the `producerPerConsumerPartition` to its default `true` and enable fall back as discussed above.
 When your brokers are upgraded to 2.5 or later, the producer will automatically switch to using mode `BETA`, but the number of producers will remain as before.
-You can then do a rolling upgrade of your application with `producerPerConsumerPartition` set to `false` to reduce the number of producers.
+You can then do a rolling upgrade of your application with `producerPerConsumerPartition` set to `false` to reduce the number of producers; you should also no longer set the `subBatchPerPartition` container property.
+
+If your brokers are already 2.5 or newer, you should set the `DefaultKafkaProducerFactory` `producerPerConsumerPartition` property to `false`, to reduce the number of producers needed.

 When using `BETA` mode, it is no longer necessary to set the `subBatchPerPartition` to `true`; it will default to `false` when the `EOSMode` is `BETA`.
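Putting the updated documentation above into code — a sketch of the producer-factory settings for a deployment whose brokers are already 2.5 or newer (bootstrap address, prefix, and types are illustrative):

[source, java]
----
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

DefaultKafkaProducerFactory<String, String> transactionalProducerFactory() {
    Map<String, Object> senderProps = new HashMap<>();
    senderProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092"); // illustrative
    senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
    pf.setTransactionIdPrefix("tx.");
    // With BETA fencing and 2.5+ brokers, one transactional producer suffices;
    // disabling producer-per-consumer-partition shrinks the producer cache.
    pf.setProducerPerConsumerPartition(false);
    return pf;
}
----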
diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc
index 52348c69d6..d4ed4a8a84 100644
--- a/src/reference/asciidoc/whats-new.adoc
+++ b/src/reference/asciidoc/whats-new.adoc
@@ -1,117 +1,14 @@
-=== What's New in 2.5 Since 2.4
+=== What's New in 2.6 Since 2.5

-This section covers the changes made from version 2.4 to version 2.5.
+This section covers the changes made from version 2.5 to version 2.6.
 For changes in earlier versions, see <>.

-Also see <>.
-
-[[x25-factory-listeners]]
-==== Consumer/Producer Factory Changes
-
-The default consumer and producer factories can now invoke a callback whenever a consumer or producer is created or closed.
-Implementations for native Micrometer metrics are provided.
-See <> for more information.
-
-You can now change bootstrap server properties at runtime, enabling failover to another Kafka cluster.
-See <> for more information.
-
-[[x25-streams-listeners]]
-==== `StreamsBuilderFactoryBean` Changes
-
-The factory bean can now invoke a callback whenever a `KafkaStreams` created or destroyed.
-An Implementation for native Micrometer metrics is provided.
-See <> for more information.
-
-[[x25-kafka-client]]
+[[x26-kafka-client]]
 ==== Kafka Client Version

-This version requires the 2.5.0 `kafka-clients`.
-
-==== Class/Package Changes
-
-`SeekUtils` has been moved from the `o.s.k.support` package to `o.s.k.listener`.
-
-[[x25-delivery]]
-==== Delivery Attempts Header
+This version requires the 2.6.0 `kafka-clients`.

-There is now an option to to add a header which tracks delivery attempts when using certain error handlers and after rollback processors.
-See <> for more information.
-
-[[x25-message-return]]
-==== @KafkaListener Changes
-
-Default reply headers will now be populated automatically if needed when a `@KafkaListener` return type is `Message`.
-See <> for more information.
-
-The `KafkaHeaders.RECEIVED_MESSAGE_KEY` is no longer populated with a `null` value when the incoming record has a `null` key; the header is omitted altogether.
-
-`@KafkaListener` methods can now specify a `ConsumerRecordMetadata` parameter instead of using discrete headers for metadata such as topic, partition, etc.
-See <> for more information.
-
-[[x25-container]]
 ==== Listener Container Changes

-The `assignmentCommitOption` container property is now `LATEST_ONLY_NO_TX` by default.
-See <> for more information.
-
-The `subBatchPerPartition` container property is now `true` by default when using transactions.
-See <> for more information.
-
-A new `RecoveringBatchErrorHandler` is now provided.
-See <> for more information.
-
-Static group membership is now supported.
-See <> for more information.
-
-When incremental/cooperative rebalancing is configured, if offsets fail to commit with a non-fatal `RebalanceInProgressException`, the container will attempt to re-commit the offsets for the partitions that remain assigned to this instance after the rebalance is completed.
-
-The default error handler is now the `SeekToCurrentErrorHandler` for record listeners and `RecoveringBatchErrorHandler` for batch listeners.
-See <> for more information.
-
-You can now control the level at which exceptions intentionally thrown by standard error handlers are logged.
-See <> for more information.
-
-The `getAssignmentsByClientId()` method has been added, making it easier to determine which consumers in a concurrent container are assigned which partition(s).
-See <> for more information.
-
-You can now suppress logging entire `ConsumerRecord` s in error, debug logs etc.
-See `onlyLogRecordMetadata` in <>.
-
-[[x25-template]]
-==== KafkaTemplate Changes
-
-The `KafkaTemplate` can now maintain micrometer timers.
-See <> for more information.
-
-The `KafkaTemplate` can now be configured with `ProducerConfig` properties to override those in the producer factory.
-See <> for more information.
-
-A `RoutingKafkaTemplate` has now been provided.
-See <> for more information.
-
-You can now use `KafkaSendCallback` instead of `ListenerFutureCallback` to get a narrower exception, making it easier to extract the failed `ProducerRecord`.
-See <> for more information.
-
-[[x25-string-serializer]]
-==== Kafka String Serializer/Deserializer
-
-New `ToStringSerializer`/`StringDeserializer` s as well as an associated `SerDe` are now provided.
-See <> for more information.
-
-[[x25-json-deser]]
-==== JsonDeserializer
-
-The `JsonDeserializer` now has more flexibility to determine the deserialization type.
-See <> for more information.
-
-[[x25-delegate-serde]]
-==== Delegating Serializer/Deserializer
-
-The `DelegatingSerializer` can now handle "standard" types, when the outbound record has no header.
-See <> for more information.
-
-[[x25-testing]]
-==== Testing Changes
-
-The `KafkaTestUtils.consumerProps()` helper record now sets `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` to `earliest` by default.
-See <> for more information.
+The default `EOSMode` is now `BETA`.
+See <> for more information.