@@ -204,6 +204,7 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
setTransactionIdPrefix(txId);
this.configs.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
}
+this.configs.put("internal.auto.downgrade.txn.commit", true);
}

@Override
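Reviewer note: the added line sets the internal kafka-clients flag from the KIP-447 work; with it, a producer committing offsets via the new `sendOffsetsToTransaction(Map, ConsumerGroupMetadata)` overload downgrades the transactional offset commit automatically when the brokers predate 2.5, instead of failing. A minimal sketch of a factory that picks this up (broker address and serializers are assumptions, not part of this PR):

```java
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
pf.setTransactionIdPrefix("tx."); // transactional producers; the constructor above has
        // already added internal.auto.downgrade.txn.commit=true to the configs
```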
@@ -239,7 +239,7 @@ public enum EOSMode {

private boolean deliveryAttemptHeader;

-private EOSMode eosMode;
+private EOSMode eosMode = EOSMode.BETA;

private TransactionDefinition transactionDefinition;

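Reviewer note: initializing the field makes `EOSMode.BETA` (fencing via consumer group metadata, per KIP-447) the default in place of `ALPHA` (fencing via `transactional.id`). A sketch of explicitly restoring the old mode, e.g. while brokers are still being upgraded (topic name is a placeholder):

```java
ContainerProperties containerProps = new ContainerProperties("someTopic"); // placeholder topic
containerProps.setEosMode(ContainerProperties.EOSMode.ALPHA); // pre-2.5 fencing behavior
```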
@@ -22,6 +22,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -37,6 +38,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -118,7 +120,7 @@ void testLatestOnlyTx() throws InterruptedException {
willAnswer(inv -> {
latch.countDown();
return null;
-}).given(producer).sendOffsetsToTransaction(any(), anyString());
+}).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
props.setTransactionManager(tm);
this.registry.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -196,6 +198,7 @@ public Consumer consumer() {
this.closeLatch.countDown();
return null;
}).given(consumer).close();
willReturn(new ConsumerGroupMetadata("")).given(consumer).groupMetadata();
return consumer;
}

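Reviewer note: the test changes in this and the following files all follow one pattern. The container now fetches `Consumer.groupMetadata()` and passes it to `Producer.sendOffsetsToTransaction(Map, ConsumerGroupMetadata)`, so every mocked consumer must stub that method or the container would receive `null`. The pattern, reduced to a sketch (the group id here is arbitrary):

```java
Consumer<Integer, String> consumer = mock(Consumer.class);
willReturn(new ConsumerGroupMetadata("grp")).given(consumer).groupMetadata(); // "grp" is arbitrary
// the container then effectively does:
// producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
```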
@@ -24,6 +24,7 @@
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

@@ -41,6 +42,7 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -117,7 +119,7 @@ void discardRemainingRecordsFromPollAndSeek() throws Exception {
offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
assertThat(this.config.ehException).isInstanceOf(ListenerExecutionFailedException.class);
assertThat(((ListenerExecutionFailedException) this.config.ehException).getGroupId()).isEqualTo(CONTAINER_ID);
@@ -254,6 +256,7 @@ public Consumer consumer() {
this.closeLatch.countDown();
return null;
}).given(consumer).close();
+willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
return consumer;
}

@@ -275,7 +278,6 @@ public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consum
});
factory.setBatchListener(true);
factory.getContainerProperties().setTransactionManager(tm());
-factory.getContainerProperties().setSubBatchPerPartition(false);
factory.setMissingTopicsFatal(false);
return factory;
}
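Reviewer note: the switch to `any(ConsumerGroupMetadata.class)` forces the `eq(...)` wrappers seen in these verifications; Mockito does not allow mixing raw values with argument matchers in a single call:

```java
// Raw value plus matcher would throw InvalidUseOfMatchersException:
// inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, any(ConsumerGroupMetadata.class));
// Wrapping the map in eq(...) routes every argument through a matcher:
inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
```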
@@ -1,5 +1,5 @@
/*
-* Copyright 2017-2019 the original author or authors.
+* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,9 +18,11 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

Expand All @@ -37,6 +39,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -105,17 +108,17 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
inOrder.verify(this.producer).beginTransaction();
Map<TopicPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(1L));
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).beginTransaction();
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
inOrder.verify(this.producer).beginTransaction();
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
inOrder.verify(this.producer).beginTransaction();
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 1L);
@@ -125,17 +128,17 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
offsets.clear();
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).beginTransaction();
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(1L));
inOrder.verify(this.producer).beginTransaction();
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).beginTransaction();
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
assertThat(this.config.count).isEqualTo(7);
@@ -224,6 +227,7 @@ public Consumer consumer() {
this.closeLatch.countDown();
return null;
}).given(consumer).close();
+willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
return consumer;
}

@@ -1,5 +1,5 @@
/*
-* Copyright 2017-2019 the original author or authors.
+* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,9 +19,11 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

@@ -38,6 +40,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -106,31 +109,31 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
inOrder.verify(this.producer).beginTransaction();
Map<TopicPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(1L));
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 1L);
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 0L);
inOrder.verify(this.producer).abortTransaction();
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
offsets.clear();
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(1L));
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
assertThat(this.config.count).isEqualTo(7);
@@ -225,6 +228,7 @@ public Consumer consumer() {
this.closeLatch.countDown();
return null;
}).given(consumer).close();
+willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
return consumer;
}

@@ -18,9 +18,11 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

@@ -37,6 +39,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -101,20 +104,20 @@ public void abortSecondBatch() throws Exception {
inOrder.verify(this.producer).beginTransaction();
Map<TopicPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
inOrder.verify(this.producer).beginTransaction();
inOrder.verify(this.producer).abortTransaction();
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 0L);
offsets.clear();
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).beginTransaction();
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).beginTransaction();
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
assertThat(this.config.contents).containsExactly("foo", "bar", "baz", "qux", "fiz", "buz", "baz", "qux");
assertThat(this.config.transactionSuffix).isNotNull();
@@ -204,6 +207,7 @@ public Consumer consumer() {
this.closeLatch.countDown();
return null;
}).given(consumer).close();
+willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
return consumer;
}

@@ -18,9 +18,11 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

@@ -37,6 +39,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -101,17 +104,17 @@ public void threeTransactionsForThreeSubBatches() throws Exception {
inOrder.verify(this.producer).beginTransaction();
Map<TopicPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).beginTransaction();
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).beginTransaction();
-inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
+inOrder.verify(this.producer).sendOffsetsToTransaction(eq(offsets), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).commitTransaction();
assertThat(this.config.contents).containsExactly("foo", "bar", "baz", "qux", "fiz", "buz");
assertThat(this.config.transactionSuffix).isNotNull();
@@ -190,6 +193,7 @@ public Consumer consumer() {
this.closeLatch.countDown();
return null;
}).given(consumer).close();
+willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
return consumer;
}

@@ -200,6 +204,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.BATCH);
factory.getContainerProperties().setTransactionManager(tm());
+factory.getContainerProperties().setSubBatchPerPartition(true);
factory.setBatchListener(true);
return factory;
}
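Reviewer note: this factory now opts in to sub-batches explicitly. Together with the `setSubBatchPerPartition(false)` removal in the earlier test, this suggests the container default flipped: with `EOSMode.BETA` a polled batch is apparently no longer split into one transaction per partition unless requested. The opt-in, sketched against the factory bean above:

```java
factory.setBatchListener(true);
factory.getContainerProperties().setSubBatchPerPartition(true); // keep one transaction per partition
```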
@@ -23,6 +23,7 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
@@ -494,6 +495,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception {
public void testRollbackRecord() throws Exception {
logger.info("Start testRollbackRecord");
Map<String, Object> props = KafkaTestUtils.consumerProps("txTest1", "false", embeddedKafka);
+// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); test fallback to EOSMode.ALPHA
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
@@ -502,6 +504,7 @@ public void testRollbackRecord() throws Exception {
containerProps.setPollTimeout(10_000);

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
+// senderProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
pf.setTransactionIdPrefix("rr.");
@@ -674,7 +677,8 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
verify(afterRollbackProcessor).clearThreadState();
verify(dlTemplate).send(any(ProducerRecord.class));
verify(dlTemplate).sendOffsetsToTransaction(
-Collections.singletonMap(new TopicPartition(topic3, 0), new OffsetAndMetadata(1L)));
+eq(Collections.singletonMap(new TopicPartition(topic3, 0), new OffsetAndMetadata(1L))),
+any(ConsumerGroupMetadata.class));
logger.info("Stop testMaxAttempts");
}

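Reviewer note: `KafkaTemplate` mirrors the producer-level change, so the dead-letter template verification above now expects the `ConsumerGroupMetadata` overload. A hedged usage sketch, with names borrowed from the test (the listener-side `consumer` variable is an assumption):

```java
// inside a listener running in a Kafka transaction; 'consumer' polled the failed record
dlTemplate.sendOffsetsToTransaction(
        Collections.singletonMap(new TopicPartition(topic3, 0), new OffsetAndMetadata(1L)),
        consumer.groupMetadata());
```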